版权声明:write by DGHxj https://blog.csdn.net/DGHxj_/article/details/84067826
首先是一个SpringBoot项目,在项目中添加如下依赖:
1、简单模式
package com.jt.test.rabbitmq;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* 本类生成2个test方法,其中一个是生产者
* 另一个是消费者
* @author DGHxj
*
*/
public class SimpleTest {
private Connection conn;
@Before//Test运行之前都要执行这个方法
public void getResource() throws Exception{
/*
* 创建连接工程
* 获取场长链接
*/
ConnectionFactory factory=new ConnectionFactory();
//com.rabbitmq.client
//创建连接,获取登录信息,ip,端口
factory.setHost("10.42.39.48");
factory.setPort(5672);
factory.setUsername("easymall");
factory.setPassword("123456");
factory.setVirtualHost("/easymall");
//从工厂中获取连接对象
conn=factory.newConnection();
}
@Test
public void productor() throws Exception{
/*
*长链接创建短连接
*声明绑定队列queue
*发送消息
*/
Channel channel=conn.createChannel();//到此为止可以连接了
//rabbitmq
//准备一个队列名称
String queue="simple";
//声明队列,消费端和生产端调用声明队列方法,无则创建有则直接连接使用
/*
* queue String 队列名称
* durable boolean 是否持久化
* exclusive boolean
是否专属,一个连接创建的所有channel声明的queue
是否只有当前链接可以使用 false
*autoDelete boolean 是否自动删除 false 没有channel连接queue时
queue自动消失
*arguments map类型,其他参数,例如队列多长(数据量)
*/
channel.queueDeclare(queue, false, false, false, null);
//发送消息
String msg="hello simple mode rabbitmq";
/*
* exchange string 交换机名称 "" 默认创建的AMQP default(direct)
* routingKey string 绑定交换机的路由key 简单模式使用queue名称
* props BasicProperties 属性类,消息也有各种属性
* 例如 deliveryMode 0持久化 1不持久化
* body byte[] 消息的二进制
*/
channel.basicPublish("", queue, null, msg.getBytes());
}
//消费端代码
@Test
public void consumer() throws Exception{
//获取连接
Channel channel=conn.createChannel();
//生产端已经声明了队列,就无需再次声明
String queue="simple";
channel.queueDeclare(queue,false,false,false,null);
//消费者对象,创建
QueueingConsumer consumer=new QueueingConsumer(channel);
//利用绑定channel的消费者对象,绑定消费队列
/*
* queue string 绑定的队列名
* autoAck boolean 是否自动确认
* 确认逻辑在队列脏哦能举足轻重 true表示自动确认,false表示手动确认
* callback绑定的消费对象
*/
channel.basicConsume(queue, true,consumer);
//编写监听逻辑,NIO非阻塞线程的代码
while(true){
//一旦有消息生成,创建接受对象Delivery
Delivery delivery=consumer.nextDelivery();
//从对向获取消息
String msg=new String(delivery.getBody());
System.out.println("消费者获取消息:"+msg);
}
}
}
二、工作模式(资源争抢)
package com.jt.test.rabbitmq;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* 测试利用消费逻辑,完成争抢的效果
* 一个模拟忙,抢的少
* 一个模拟空闲,抢得多
* @author DGHxj
*
*/
public class WorkTest {
private Connection conn;
@Before
public void getResource() throws Exception{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("10.42.39.48");
factory.setPort(5672);
factory.setUsername("easymall");
factory.setPassword("123456");
factory.setVirtualHost("/easymall");
conn=factory.newConnection();
}
//生产者
@Test
public void productor() throws Exception{
Channel channel=conn.createChannel();
String queue="work";
channel.queueDeclare(queue, false, false, false, null);
for(int i=0;i<100;i++){
String msg="hello work "+i;
channel.basicPublish("",queue, null,msg.getBytes());
System.out.println("生产者发送成功第"+i+"条");
}
}
//消费者1
@Test
public void consumer01() throws Exception{
Channel channel=conn.createChannel();
String queue="work";
channel.queueDeclare(queue, false, false, false, null);
QueueingConsumer consumer=new QueueingConsumer(channel);
channel.basicQos(1);//消费者不执行回执确认,queue只最多发送一条消息
channel.basicConsume(queue, false,consumer);
while(true){
Delivery delivery=consumer.nextDelivery();
System.out.println("消费者1接收到消息:"+new String(delivery.getBody()));
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
//消费者2
@Test
public void consumer02() throws Exception{
Channel channel=conn.createChannel();
String queue="work";
channel.queueDeclare(queue, false, false, false, null);
QueueingConsumer consumer=new QueueingConsumer(channel);
channel.basicQos(1);//消费者不执行回执确认,queue只最多发送一条消息
channel.basicConsume(queue, false,consumer);
while(true){
Delivery delivery=consumer.nextDelivery();
System.out.println("消费者2接收到消息:"+new String(delivery.getBody()));
Thread.sleep(100);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
三、发布订阅(publish/fanout)
package com.jt.test.rabbitmq;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* 一个生产者,发送到发布订阅类型的交换机消息
* 同步复制到所有与该交换机连接绑定的队列+
* @author DGHxj
*
*/
public class PublishTest {
private Connection conn;
@Before//Test运行之前,@Before的方法自动换行一次
public void getResource() throws Exception{
/*
* 创建连接工程
* 获取长链接
*/
ConnectionFactory factory=new ConnectionFactory();
//com.rabbitmq.client
//创建连接,获取登录信息
factory.setHost("10.42.39.48");
factory.setPort(5672);
factory.setUsername("easymall");
factory.setPassword("123456");
factory.setVirtualHost("/easymall");
//从工厂中获取连接对象
conn=factory.newConnection();
}
private static final String exchange="fanout1807";
private static final String queue01="fanoutQ01";
private static final String queue02="fanoutQ02";
//生产端代码
@Test
public void productor() throws Exception{
//自定义完成一个发布订阅类型的交换机
Channel channel=conn.createChannel();
//声明队列可以在生产端也可以在消费端,交换机也是如此
channel.exchangeDeclare(exchange, "fanout");//名称,type
//topic direct fanout headers
//发送消息
for(int i=0;i<100;i++){
String msg="hello fanout:"+i;
channel.basicPublish(exchange, "", null, msg.getBytes());
}
}
//消费端01
@Test
public void consumer01() throws Exception{
Channel channel=conn.createChannel();
//声明队列
channel.queueDeclare(queue01, false, false, false, null);
//声明交换机
channel.exchangeDeclare(exchange, "fanout");
//绑定队列到交换机
channel.queueBind(queue01, exchange, "");
//消费者对象
QueueingConsumer consumer=new QueueingConsumer(channel);
//绑定消费者队列
channel.basicConsume(queue01, true, consumer);
while(true){
Delivery delivery=consumer.nextDelivery();
System.out.println("消费者01接收到:"+new String(delivery.getBody()));
}
}
//消费端02
@Test
public void consumer02() throws Exception{
Channel channel=conn.createChannel();
//声明队列
channel.queueDeclare(queue02, false, false, false, null);
//声明交换机
channel.exchangeDeclare(exchange, "fanout");
//绑定队列到交换机
channel.queueBind(queue02, exchange, "");
//消费者对象
QueueingConsumer consumer=new QueueingConsumer(channel);
//绑定消费者队列
channel.basicConsume(queue02, true, consumer);
while(true){
Delivery delivery=consumer.nextDelivery();
System.out.println("消费者02接收到:"+new String(delivery.getBody()));
}
}
}
四、路由模式(routing/direct)
package com.jt.test.rabbitmq;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import ch.qos.logback.core.net.SyslogOutputStream;
/**
* 测试路由模式
* 消息携带具体的路由key
* 绑定到交换机的不同队列使用不同的路由key
* 根据匹配的消息会发送到目的queue
* @author DGHxj
*
*/
public class RoutingTest {
private Connection conn;
@Before
public void getResource() throws Exception{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("10.42.39.48");
factory.setPort(5672);
factory.setUsername("easymall");
factory.setPassword("123456");
factory.setVirtualHost("/easymall");
conn=factory.newConnection();
}
private static final String exchange="direct1807";
private static final String queue01="directQ01";
private static final String queue02="directQ02";
//生产者代码
@Test
public void productor() throws Exception{
Channel channel=conn.createChannel();
channel.exchangeDeclare(exchange, "direct");
for(int i=0;i<50;i++){
String msg="hello direct update:"+i;
channel.basicPublish(exchange, "item.update", null, msg.getBytes());
String msg2="hello direct add:"+i;
channel.basicPublish(exchange, "item.add", null, msg2.getBytes());
}
}
//消费者代码
@Test
public void consumer01() throws Exception{
//通过长链接创建短连接
Channel channel=conn.createChannel();
//声明队列
channel.queueDeclare(queue01, false, false, false, null);
//声明交换机
channel.exchangeDeclare(exchange, "direct");
//绑定队列和交换机
channel.queueBind(queue01, exchange, "item.update");
//消费者对象
QueueingConsumer consumer=new QueueingConsumer(channel);
//绑定消费者与队列
channel.basicConsume(queue01, true, consumer);
//实时监控生产者
while(true){
//创建接受对象delivery
Delivery delivery=consumer.nextDelivery();
System.out.println("消费者01接收到:"+new String(delivery.getBody()));
}
}
//消费者代码
@Test
public void consumer02() throws Exception{
//通过长链接创建短连接
Channel channel=conn.createChannel();
//声明队列
channel.queueDeclare(queue02, false, false, false, null);
//声明交换机
channel.exchangeDeclare(exchange, "direct");
//绑定交换机和队列
channel.queueBind(queue02, exchange, "item.add");
//消费者对象
QueueingConsumer consumer=new QueueingConsumer(channel);
//绑定队列和消费者对象
channel.basicConsume(queue02, true, consumer);
//实时监听
while(true){
//创建接收对象
Delivery delivery=consumer.nextDelivery();
System.out.println("消费者02接收到:"+new String(delivery.getBody()));
}
}
}
五、topic主题模式
package com.jt.test.rabbitmq;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
/**
* 测试topic主题模式 范围的转发
*
* @author DGHxj
*
*/
public class TopicTest {
private Connection conn;
@Before
public void getResource() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.42.39.48");
factory.setPort(5672);
factory.setUsername("easymall");
factory.setPassword("123456");
factory.setVirtualHost("/easymall");
conn = factory.newConnection();
}
private static final String exchange = "topics1807";
private static final String queue01 = "topicQ01";
private static final String queue02 = "topicQ02";
//#表示任意,*表示一个字符串
//生产者代码
@Test
public void productor() throws Exception{
Channel channel=conn.createChannel();
channel.exchangeDeclare(exchange, "topic");
for(int i=0;i<50;i++){
String msg="hello topic item.#:"+i;
channel.basicPublish(exchange, "12.item.add", null, msg.getBytes());
String msg2="hello topic *.item:"+i;
channel.basicPublish(exchange, "tedu.item", null, msg2.getBytes());
}
}
// 消费者代码
@Test
public void consumer01() throws Exception {
Channel channel = conn.createChannel();
channel.queueDeclare(queue01, false, false, false, null);
//channel.exchangeDeclare(exchange, "topic");
channel.queueBind(queue01, exchange, "#.item.#");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue01, true, consumer);
while (true) {
Delivery delivery = consumer.nextDelivery();
System.out.println("消费者01获得:" + new String(delivery.getBody()));
}
}
// 消费者代码
@Test
public void consumer02() throws Exception {
Channel channel = conn.createChannel();
channel.queueDeclare(queue02, false, false, false, null);
channel.exchangeDeclare(exchange, "topic");
channel.queueBind(queue02, exchange, "*.item");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queue02, true, consumer);
while (true) {
Delivery delivery = consumer.nextDelivery();
System.out.println("消费者02获得:" + new String(delivery.getBody()));
}
}
}