上一篇演示了通过jedis的api进行发布订阅消息的案例,但是只是单订阅方和消费方,本篇演示多个消息发布方,单个消息订阅方的场景。
import org.junit.Test;
import redis.clients.jedis.JedisPubSub;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @Author fanchunshuai
* @Date 2020/1/3 19
* @Description:
* 参考 : https://blog.csdn.net/yuan52007298/article/details/99320123
* 多个消息发布方,单个消息订阅方
*/
public class MultSubPubTest extends ClusterTest
{
private static final String WEIXIN = "WEIXIN";
private static final String QQ = "QQ";
/**
* 再启动发布方
*/
@Test
public void testPublish(){
//多个消息发布方
Long count = cluster.publish(WEIXIN,"您有一条微信技术文章.");
System.out.println("count = "+count);
Long count2 = cluster.publish(QQ,"在吗?张三,有个问题请教.");
System.out.println("count2 = "+count2);
}
/**
* 先启动订阅方
* 如果发布方晚于订阅方启动,订阅方无法知道消息
*/
@Test
public void testSubscribe(){
//这种方式存在阻塞,redis重启会导致订阅中断,使用下面的线程池方式
//cluster.subscribe(new WeixinSubscribe(),TOPIC);
//下面使用第二种方式
try {
//单个消息订阅方
Executors.newSingleThreadScheduledExecutor().
scheduleWithFixedDelay(new Thread(()->{
System.out.println("恢复订阅........");
cluster.subscribe(new MutilSubscribeOne(),QQ,WEIXIN);
},"jedis subscribe thread"),0L,1000L, TimeUnit.MILLISECONDS).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
/**
* 自定义订阅类
*/
class MutilSubscribeOne extends JedisPubSub{
@Override
public void onMessage(String channel, String message) {
System.out.println("channel = "+channel+",message = "+message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("channel = "+channel+",subscribedChannels = "+subscribedChannels);
}
}