主要有以下四个步骤:
①建立发布者,通过频道(mychannel)发布消息
package com.cqh.PubSub; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; /** * Created by yl1794 on 2018/3/28. */ //建立发布者,通过频道(mychannel)发布消息 public class Publisher extends Thread{ private final JedisPool jedisPool; public Publisher(JedisPool jedisPool) { this.jedisPool = jedisPool; } @Override public void run(){ BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); Jedis jedis = jedisPool.getResource(); //连接池中取出一个连接 while (true) { String line; try { line = reader.readLine(); if (!"quit".equals(line)) { jedis.publish("mychannel", line); //从通过mychannel 频道发布消息 System.out.println(String.format("发布消息成功!channel: %s, message: %s", "mychannel", line)); } else { break; } } catch (IOException e) { e.printStackTrace(); } } } }
②建立消息监听类,并重写了JedisPubSub的一些相关方法
package com.cqh.PubSub; import redis.clients.jedis.JedisPubSub; /** * Created by yl1794 on 2018/3/28. */ //建立消息监听类,并重写了JedisPubSub的一些相关方法 public class MsgListener extends JedisPubSub{ public MsgListener(){} @Override public void onMessage(String channel, String message) { //收到消息会调用 System.out.println(String.format("收到消息成功! channel: %s, message: %s", channel, message)); this.unsubscribe(); } @Override public void onSubscribe(String channel, int subscribedChannels) { //订阅频道会调用 System.out.println(String.format("订阅频道成功! channel: %s, subscribedChannels %d", channel, subscribedChannels)); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { //取消订阅会调用 System.out.println(String.format("取消订阅频道! channel: %s, subscribedChannels: %d", channel, subscribedChannels)); } }
③建立订阅者,订阅者去订阅频道(mychannel)
package com.cqh.PubSub; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; /** * Created by yl1794 on 2018/3/28. */ //建立订阅者,订阅者去订阅频道(mychannel) public class Subscriber extends Thread { private final JedisPool jedisPool; private final MsgListener msgListener = new MsgListener(); private final String channel = "mychannel"; public Subscriber(JedisPool jedisPool) { super("Subscriber"); this.jedisPool = jedisPool; } @Override public void run() { Jedis jedis = null; try { jedis = jedisPool.getResource(); //取出一个连接 jedis.subscribe(msgListener, channel); //通过subscribe的api去订阅,参数是订阅者和频道名 //注意:subscribe是一个阻塞的方法,在取消订阅该频道前,会一直阻塞在这,无法执行后续的代码 //这里在msgListener的onMessage方法里面收到消息后,调用了this.unsubscribe();来取消订阅,才会继续执行 System.out.println("继续执行后续代码。。。"); } catch (Exception e) { System.out.println(String.format("subsrcibe channel error, %s", e)); } finally { if (jedis != null) { jedis.close(); } } } }
④测试类
package com.cqh.PubSub; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; /** * Created by yl1794 on 2018/3/28. */ //测试类,键盘输入消息 public class TestPubSub { public static void main( String[] args ) { // 连接redis服务端 JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379); Publisher publisher = new Publisher(jedisPool); //发布者 publisher.start(); Subscriber subscriber = new Subscriber(jedisPool); //订阅者 subscriber.start(); } }
⑤结果