前言
MQ这种中间件是今年的一个学习计划,其实如果单纯从使用角度来说MQ确实比较容易,但是也缺少一个完整的总结。这次的总结不打算参考其他资料了,主要会以RabbitMQ的官网为准。其实这一个系列前几篇可能更多的是官方文档的一个自我翻译。
安装教程
简介
RabbitMQ是一个接受并发送消息的中间件,你可以把它类比为一个邮局,当你想寄信件的时候,你会将邮件放入到邮箱。邮递员确会帮你将邮件送到目的地,RabbitMQ其实就干了邮箱,邮局和邮递员的活儿。唯一不同的是,RabbitMQ不处理纸质消息,而是处理二进制数字化的文本消息。(这一段来自RabbitMQ的官网,自我翻译的)
安装教程可以参考这篇大牛的博客,需要提醒的是在安装RabbitMQ之前,需要先安装Erlang。RabbitMQ安装完整教程
hello world
在实现Hello World程序的时候,我们会实现两个Java程序,一个是消息生产者,一个是消息消费者。生产者发送一条消息,消费者消费收到这条消息并将其打印出来,仅此而已。如下图所示:
其中的P表示生产者,C表示消费者,中间的就是队列(queue) 。
生产者
生产者连接到RabbitMQ,然后发送一条消息之后就退出。
private static final String QUEUE_NAME = "hello_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
先只给出连接的代码,newConnection这个操作,封装了底层的Socket连接,之后,我们创建了一个Channel,这个是我们操作的最多的api对象。
为了发送消息,我们需要声明一个队列,之后我们将消息发送到这个队列中。
//声明一个队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "producer send message";
//将消息发送到指定的queue中
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
完整的生产者代码如下:
package com.learn.rabbitmq.helloworld.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* author:liman
* createtime:2019/10/11
*/
public class Sender {
private static final String QUEUE_NAME = "hello_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "producer send message";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("message send success!");
}
}
消费者
消费者是一个监听的角色,监听是否有从RabbitMQ的消息,这一点不像生产者,因此在打印消息之前需要保持消费者是运行的。
消费者也需要获取连接和创建channel,同时也要声明绑定的队列,因为消费者可能先启动,所以我们需要消费者在消费消息之前指定的队列必须存在。
除此之外,消费者告知RabbitMQ服务器通过queue给消费者发送消息,由于RabbitMQ将会给消费者异步发送消息,因此我们需要提供一个回调函数,用户缓存消息,这个就是DeliverCallBack的任务了。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
完整代码如下:
package com.learn.rabbitmq.helloworld.consumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* author:liman
* createtime:2019/10/11
*/
public class Receive {
private static final String QUEUE_NAME="hello_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection=connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
System.out.println("wait for message");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
运行结果这里就不贴出来了,比较简单。
总结
之前在学习RabbitMQ的时候,一直看的比较零散,没有找到思路进行总结,直到看到了官网。这篇博客就是对官网的一个简单翻译,主要的内容在参考资料中,后续会在spring boot中集成mq进行一个简单的项目。
参考资料: