环境
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.简单模式
测试代码启动执行生产端一个,启动消费端监听队列获取消息消费
如何使用java创建链接操作rabbitmq的连接对象;
package com.jt.test.rabbitmq;
/**
* 创建链接,操作rabbitmq
* 实现一发,一接的简单模式逻辑
**/
public class SimpleMode {
//测试方法编写生产端逻辑.准备一些静态属性值
private static final String queue="simple01";
//每一次运行测试程序,都需要链接对象的支持
private Channel channel;
//给私有属性信道对象赋值
@Before
public void init() throws IOException, TimeoutException{
//对channel赋值
//获取连接工厂,在连接工厂中,配置rabbitmq
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("10.9.104.184");
factory.setPort(5672);
factory.setUsername("mall");
factory.setPassword("123456");
factory.setVirtualHost("emvh");
//从连接工厂创建长连接对象
Connection conn = factory.newConnection();
//从场链接获取短连接
channel=conn.createChannel();
//无论生产端或者消费端都是使用channel操作rabbitmq
}
@Test
public void sender() throws IOException{
//客户端,需要声明使用的队列名称
* queue 队列名称
* durable 是否持久化
* exclusive 是否专属连接
* autoDelete 是否自动删除
* arguments 当前队列的属性配置
*/
//当前信道连接rabbitmq,声明队列,如果已经存在
//可以直接使用;
channel.queueDeclare(queue, false, false, false, null);
//生成消息
String msg="hello rabbitmq 1812";
//生产端代码,发送消息,发送给交换机,此处使用默认交换机,名字是
//"" 它的类型是路由模式,所有的队列都会用自己的
//队列名称作为路由key绑定它
/*
* exchange 交换机名称
* routingKey 消息携带的路由key
* props:消息的一些属性,例如是否持久化消息
* body: 消息的byte数组
*/
channel.basicPublish("", queue, null, msg.getBytes());
while(true);
}
}
消息发送完毕后观察控制台
ready:表示准备好被消费的消息数量
unacked:表示消费者未反馈的消息个数
total:前两者的总和
API详解
queueDeclare(queue,durable,exclusive,autoDelete,arguments)
queue:String类型,表示声明的队列名称,有则直接使用,无则创建队列
durable:Boolean,true表示持久化,在rabbitmq关闭/宕机时,queue(仅只队列),
运行时是内存组件数据,持久化到磁盘,重启恢复后,
队列不消失,false不持久化,宕机关闭队列消失.
exclusive:Boolean,是否专属化,true表示专属化,只专属于当前连接connection,
其他进程创建的connection无法操作这个队列.false不专属,其他连接可以操作.
autoDelete:Boolean类型,是否自动删除,当连接队列的最后一个信道消失时
true表示自动删除,false不删
arguments:Map<Object,Object>
给当前的队列指定一些生成创建时的属性,比如消息的最大长度,存货时间等等;
basicPublish(exchange, routingKey, props, body)
props:BasicProperties,发送的消息携带的属性,比如如果队列可持久化
,一般消息也应该随之有持久化能力,deliveryMode,
0表示可持久化,1表示不持久化,一旦要求消息持久化,队列必须持久化
消息的消费逻辑
1 消费者监听的队列发送消息,等待返回确认
2 消费者拿到消息,执行消费逻辑
3 消费逻辑执行完毕,返回确认
4 队列删除消息
消费者空闲/非空闲:
空闲表示消费者随时准别可以接收队列发送的下一条消息;
非空闲表示在消费者接收消息后还没有返回ack确认这段时间非空闲;
basicQos(1):在消费代码的ack确认返回队列之前,队列最多只给消费者发送1条;
自动确认和手动返回确认区别:
自动确认:就是不会根据消费逻辑完成最终确认的顺序,
直接在刚接到消息时无论消费逻辑成功还是失败都返回确认
(不关心消息正确消费,关心消费者吞吐量)
手动确认:在执行错误的代码中,执行手动确认,强调消息消费逻辑成功率,
如果消费成功,返回手动回执,消息在队列删除了,如果没有消费成功,
手动回执,消息一值消息队列保存,当前消费端一直可以从队列获取未消费成功的消息.
工作模式
package com.jt.test.rabbitmq;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* 完成多个消费者绑定同一个队列
* 实现消息争抢,模拟不同处理速度的
* 消息消费个数的不同(手动回执)
* @author admin
*
*/
public class WorkMode {
//测试方法编写生产端逻辑.准备一些静态属性值
private static final String queue="work01";
//每一次运行测试程序,都需要链接对象的支持
private Channel channel;
//给私有属性信道对象赋值
@Before
public void init() throws IOException, TimeoutException{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("10.9.104.184");
factory.setPort(5672);
factory.setUsername("mall");
factory.setPassword("123456");
factory.setVirtualHost("emvh");
Connection conn = factory.newConnection();
channel=conn.createChannel();}
@Test
public void sender() throws IOException{
channel.queueDeclare(queue, false, false, false, null);
//for循环模拟一次发送多条消息,观察争抢者
//每个抢到多少条
for(int i=0;i<100;i++){
String msg="hello work_"+i;
channel.basicPublish("", queue, null, msg.getBytes());
//打桩,提示发送内容
System.out.println("生产者发送消息:"+msg);
}}
//消费者逻辑
@Test
public void comsu01() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{
//声明队列
channel.queueDeclare(queue, false, false, false, null);
//创建一个消费者对象
QueueingConsumer consumer=
new QueueingConsumer(channel);
//定义每个消费者,在空闲时,最多接收到的消息数量
channel.basicQos(1);
//消费者和队列绑定,手动回执,利用消费时间的长短
//演示消费的逻辑不同,消费时间不同
channel.basicConsume(queue, false, consumer);
//编写监听获取消息打印的代码
while(true){//死循环监听队列
//从队列中获取的消息会封装到消费者的Delivery
Delivery delivery = consumer.nextDelivery();
System.out.println("消费者01获取消息:"
+new String(delivery.getBody()));
Thread.sleep(10);
//TODO手动确认
//每消费一条消息,调用代码ack确认
//deliveryTag:回执的消息戳
//multiple:是否批量false
channel.basicAck(delivery
.getEnvelope().getDeliveryTag(), false);
}}
@Test
public void comsu02() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{
//声明队列
channel.queueDeclare(queue, false, false, false, null);
//创建一个消费者对象
QueueingConsumer consumer=
new QueueingConsumer(channel);
//定义每个消费者,在空闲时,最多接收到的消息数量
channel.basicQos(1);
//消费者和队列绑定
channel.basicConsume(queue, false, consumer);
//编写监听获取消息打印的代码
while(true){//死循环监听队列
//从队列中获取的消息会封装到消费者的Delivery
Delivery delivery = consumer.nextDelivery();
System.out.println("消费者02获取消息:"
+new String(delivery.getBody()));
//TODO
Thread.sleep(50);
channel.basicAck(delivery
.getEnvelope().getDeliveryTag(), false);}}}
发布订阅
发布订阅的结构是自定义一个fanout类型的交换机,执行发布订阅,多个队列绑定这个交换机,队列绑定一个消费者,实现消息消费,生产者发送到交换机的所有消息,都会被队列的消费者接收消费;
package com.jt.test.rabbitmq;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* 模拟发布订阅群发逻辑
* @author admin
*
*/
public class FanoutMode {
//每一次运行测试程序,都需要链接对象的支持
private Channel channel;
//给私有属性信道对象赋值
@Before
public void init() throws IOException, TimeoutException{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("10.9.104.184");
factory.setPort(5672);
factory.setUsername("mall");
factory.setPassword("123456");
factory.setVirtualHost("emvh");
Connection conn = factory.newConnection();
channel=conn.createChannel();
}
private static final String EXT="fanout";
private static final String EX=EXT+"01";
private static final String queue01=EXT+"Q1";
private static final String queue02=EXT+"Q2";
@Test
public void sender() throws IOException{
//生产者只关心,消息发送到的交换机是谁
//声明交换机,用自定义的交换机工作,有则直接用
//无则创建
channel.exchangeDeclare(EX, EXT);
//发送消息到交换机
String msg="hello "+EXT;
channel.basicPublish(EX, "", null, msg.getBytes());
}
//消费者1
@Test
public void cons01() throws Exception{
//声明使用的交换机,声明队列
channel.queueDeclare(queue01, false, false, false, null);
//创建消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//完成绑定关系,队列与交换机的绑定fanout,
//消费者与队列的绑定关系
channel.queueBind(queue01, EX, "");
channel.basicConsume(queue01, true,consumer);
//监听获取消息
while(true){
Delivery delivery = consumer.nextDelivery();
System.out.println("消费者01获取:"
+new String(delivery.getBody()));
}}
//消费者2
@Test
public void cons02() throws Exception{
//声明使用的交换机,声明队列
channel.queueDeclare(queue02, false, false, false, null);
//创建消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//完成绑定关系,队列与交换机的绑定fanout,
//消费者与队列的绑定关系
channel.queueBind(queue02, EX, "");
channel.basicConsume(queue02, true,consumer);
//监听获取消息
while(true){
Delivery delivery = consumer.nextDelivery();
System.out.println("消费者02获取:"
+new String(delivery.getBody()));
}}}
路由模式
除了交换机类型,绑定routingKey不同,其他结构和发布订阅一致
package com.jt.test.rabbitmq;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* 模拟路由指定消息目的地逻辑
* @author admin
*
*/
public class RoutingMode {
//每一次运行测试程序,都需要链接对象的支持
private Channel channel;
//给私有属性信道对象赋值
@Before
public void init() throws IOException, TimeoutException{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("10.9.104.184");
factory.setPort(5672);
factory.setUsername("mall");
factory.setPassword("123456");
factory.setVirtualHost("emvh");
Connection conn = factory.newConnection();
channel=conn.createChannel();
}
private static final String EXT="direct";
private static final String EX=EXT+"01";
private static final String queue01=EXT+"Q1";
private static final String queue02=EXT+"Q2";
@Test
public void sender() throws IOException{
channel.exchangeDeclare(EX, EXT);
//发送消息到交换机
String msg="hello "+EXT;
channel.basicPublish(EX, "product.haha", null, msg.getBytes());
}
//消费者1
@Test
public void cons01() throws Exception{
channel.exchangeDeclare(EX, EXT);
//声明使用的交换机,声明队列
channel.queueDeclare(queue01, false, false, false, null);
//创建消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//完成绑定关系,队列与交换机的绑定fanout,
//消费者与队列的绑定关系
channel.queueBind(queue01, EX, "product.haha");
channel.basicConsume(queue01, true,consumer);
//监听获取消息
while(true){
Delivery delivery = consumer.nextDelivery();
System.out.println("消费者01获取:"
+new String(delivery.getBody()));
}}
//消费者2
@Test
public void cons02() throws Exception{
channel.exchangeDeclare(EX, EXT);
//声明使用的交换机,声明队列
channel.queueDeclare(queue02, false, false, false, null);
//创建消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//完成绑定关系,队列与交换机的绑定fanout,
//消费者与队列的绑定关系
channel.queueBind(queue02, EX, "product.update");
channel.basicConsume(queue02, true,consumer);
//监听获取消息
while(true){
Delivery delivery = consumer.nextDelivery();
System.out.println("消费者02获取:"
+new String(delivery.getBody()));}}}
主题模式
package com.jt.test.rabbitmq;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* 主题模式,范围匹配发送
* @author admin
*
*/
public class TopicMode {
//每一次运行测试程序,都需要链接对象的支持
private Channel channel;
//给私有属性信道对象赋值
@Before
public void init() throws IOException, TimeoutException{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("10.9.104.184");
factory.setPort(5672);
factory.setUsername("mall");
factory.setPassword("123456");
factory.setVirtualHost("emvh");
Connection conn = factory.newConnection();
channel=conn.createChannel();
}
private static final String EXT="topic";
private static final String EX=EXT+"01";
private static final String queue01=EXT+"Q1";
private static final String queue02=EXT+"Q2";
@Test
public void sender() throws IOException{
channel.exchangeDeclare(EX, EXT);
//发送消息到交换机
String msg="hello "+EXT;
channel.basicPublish(EX, "mall.product.update", null, msg.getBytes());
}
//消费者1
@Test
public void cons01() throws Exception{
channel.exchangeDeclare(EX, EXT);
//声明使用的交换机,声明队列
channel.queueDeclare(queue01, false, false, false, null);
//创建消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//完成绑定关系,队列与交换机的绑定fanout,
//消费者与队列的绑定关系
channel.queueBind(queue01, EX, "mall.#");
channel.basicConsume(queue01, true,consumer);
//监听获取消息
while(true){
Delivery delivery = consumer.nextDelivery();
System.out.println("消费者01获取:"
+new String(delivery.getBody()));
}
}
//消费者2
@Test
public void cons02() throws Exception{
channel.exchangeDeclare(EX, EXT);
//声明使用的交换机,声明队列
channel.queueDeclare(queue02, false, false, false, null);
//创建消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//完成绑定关系,队列与交换机的绑定fanout,
//消费者与队列的绑定关系
channel.queueBind(queue02, EX, "*.product.*");
channel.basicConsume(queue02, true,consumer);
//监听获取消息
while(true){
Delivery delivery = consumer.nextDelivery();
System.out.println("消费者02获取:"
+new String(delivery.getBody()));
}}}