使用Redis实现MQ

        常见的消息中间间有很多,比如ActiveMQ,RabbitMQ,Kafka等,这篇博客主要写一下用Redis实现MQ的功能。

        MQ实现的功能简单说就是将生产出来的消息来消费,具体的消费模式有点对点消费,发布\订阅模式的消费,Redis中也实现了点对点,发布\订阅这种方式,具体代码展示。

        点对点消费者:

public class RedisQueueConsumer {

    @Test
    public  void consumer() {

        Jedis jedis = new Jedis("xxx.xxx.xxx.xxx",6379);
        while (true)
        {
            //使用redis的rpop来获取对应的key的value
            String value = jedis.rpop("key");
            while(value != null)
            {
                System.out.println(value);
                break;
            }

        }

    }
}

        点对点生产者:

public class RedisQueueTest {

    @org.junit.Test
    public  void  producer()
    {
        Jedis jedis = new Jedis("xxx.xxx.xxx.xxx",6379);
        // 生产消息
        for (int i = 1;i <= 10;i++)
        {
            //主要使用redis中的lpush
            jedis.lpush("key", "这是第" + i + "个消息");
        }

        jedis.close();
    }

}

        先启动消费者,然后生产者生产消息,结果图:

        发布\订阅模式:

        先编写监听代码,继承JedisPubSub,然后重写里面的onMessage方法:


public class Subscriber  extends JedisPubSub {
    public Subscriber(){}
    @Override
    public void onMessage(String channel, String message) {
        System.out.println(message);
    }
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        super.onSubscribe(channel, subscribedChannels);
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        super.onUnsubscribe(channel, subscribedChannels);
    }
}

        消费者:

public class RedisTopicConsumer {
    @Test
    public  void consumer() {

        Subscriber subscriber = new Subscriber();

        Jedis jedis = new Jedis("xxx.xxx.xxx.xx",6379);
        while (true)
        {
            //监听消息
            jedis.subscribe(subscriber,"key");
        }
    }
}

        生产者:

public class RedisTopicTest {
    @org.junit.Test
    public  void  producer()
    {
        Jedis jedis = new Jedis("xxx.xxx.xxx.xxx",6379);
        // 生产消息
        for (int i = 1;i <= 10;i++)
        {
            //取消息
            jedis.publish("key", "这是第" + i + "个消息");
        }

        jedis.close();
    }
}

       启动消费者,打开生产者生产消息,消息被订阅消费:

        整个过程,实际上就是利用Redis的功能,生产消息使用lpush入队,取消息就是rpop出队;生产消息publish发布消息到指定的频道,subscribe来订阅具体的消息。

猜你喜欢

转载自blog.csdn.net/qq_41061437/article/details/108517630