RabbitMQ相对来说,用的不是很广泛,存在以下缺陷: 基于AMQP协议导致它的实现 比较重量级, 与kafka等对比 在吞吐量上 处于下风。
RabbitMQ 一个基于Erlang语言开发的框架。
看图上的基本概念:
publisher: 消息发布者
virtual host: 虚拟主机,相当于一台 rabbitMq的发布服务器
broker: 消息队列 服务器实体
exchange:交换器 ; queue:消息队列, 这两个通过 binding来绑定。 具体是通过一个叫 路由键(Route-Key)的东西来绑定(
绑定策略分为 direct , fanOut 广播,Topic)
Consumer: 消费者
channel: 是一个个的信道,基于AMQP的 rabbitMq 由于 创建和销毁TCP连接 对系统的开销很大,所以通过channel信道复用 这些TCP连接。
最后 读一下代码:
用Producer发布消息时,先建立登录连接, 再创建信道和声明交换器,
然后设置binding模式 以及 路由值,
最后
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
将消息 通过指定的交换器和路由值 发送到 队列中。
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
//设置 RabbitMQ 地址
factory.setHost("localhost");
factory.setVirtualHost("/");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//创建信道
Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "testRoutingKey";
//发布消息
byte[] messageBodyBytes = "quit".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close();
conn.close();
用Consumer消费消息时, 前面的代码类似,
建立到代理服务器到连接,创建信道,声明交换器,声明队列,
然后绑定队列,通过键 testRoutingKey 将队列和交换器绑定起来;
最后 消费端需要不断轮询 来等待消息队列的消息随时出来。
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
// factory.setUsername("tuyf");
// factory.setPassword("1234");
factory.setHost("localhost");
factory.setVirtualHost("/");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//创建信道
final Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
//声明队列
String queueName = channel.queueDeclare().getQueue();
String routingKey = "testRoutingKey";
//绑定队列,通过键 testRoutingKey 将队列和交换器绑定起来
channel.queueBind(queueName, exchangeName, routingKey);
while (true) {
//消费消息
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
System.out.println("消费的消息体内容:");
String bodyStr = new String(body, "UTF-8");
System.out.println(bodyStr);
}
});
}
最后 实际使用时,多看API, 一个 com.rabbitmq.client.Channel 接口 就有上百个方法。
https://blog.csdn.net/w1083750893/article/details/78824843
代码来源:[email protected]:tuyf/rabbitMQ.git
目录结构问题,麻烦自行处理