文章目录
一.应用场景和术语
1.应用
应用解耦(库存模块和订单模块解耦)
异步处理(注册成功后发送短信和发送邮件)
流量削峰(秒杀系统)
2.术语
queue 队列,拥有自己的 erlang 进程;
exchange 交换机,内部实现为保存 binding 关系的查找表;
channel 信道,实际进行路由工作的实体,按照 routing_key 将 message 投递给 queue 。由 AMQP 协议描述可知,channel 是真实 TCP 连接之上的虚拟连接,所有 AMQP 命令都是通过 channel 发送的,且每一个 channel 有唯一的 ID。一个 channel 只能被单独一个操作系统线程使用,故投递到特定 channel 上的 message 是有顺序的。但一个操作系统线程上允许使用多个 channel 。由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制。
vhost 可理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段。
二.docker安装以及前期准备
docker search rabbitmq:management
docker pull rabbitmq:management
docker images
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
docker ps -a
如果docker pull rabbitmq 后面不带management,启动rabbitmq后是无法打开管理界面的,所以我们要下载带management插件的rabbitmq
用户名和密码 guest
添加用户
virtual hosts管理(相当于mysql中的db,一般以 / 开头)
对用户进行授权
java开发,引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
连接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtils {
public static Connection getConnection(){
/*定义一个连接工厂*/
ConnectionFactory connectionFactory = new ConnectionFactory();
/*设置连接信息*/
connectionFactory.setHost("IP地址");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/vhost_mmr");
connectionFactory.setUsername("user_mmr");
connectionFactory.setPassword("123456");
Connection connection=null;
try {
connection = connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return connection;
}
}
消息路由
涵盖三部分:交换器、路由、绑定。生产者把消息发布到交换器上;绑定决定了消息如何从路由器路由到特定的队列;消息最终到达队列,并被消费者接收。
a.消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。
通过队列路由键,可以把队列绑定到交换器上。
b.消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则)。如果能够匹配到队列,则消息会投递到相应队列中;如果不能匹配到任何队列,消息将进入 “黑洞”。
三.simple简单队列
1.模型
p:消息的生产者
红色:消息队列
c:消费者
只包含一个生产者以及一个消费者,生产者Producer将消息发送到队列中,消费者Consumer从该队列接收消息。
2.代码实现
生产者
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private static final String QUEUE_NAME="test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
/*从连接中获取通道*/
Channel channel = connection.createChannel();
/*创建队列声明*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg="hello simple queue";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
channel.close();
connection.close();
}
}
消费者
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv {
private static final String QUEUE_NAME="test_simple_queue";
public static void main(String[] args) throws IOException, InterruptedException {
/*获取连接*/
Connection connection = ConnectionUtils.getConnection();
/*创建频道*/
Channel channel = connection.createChannel();
/*创建队列声明*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println(msg);
}
};
/*监听队列*/
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
3.不足
耦合性高,生产者和消费者一一对应;
四.work quenes 工作队列
1.模型
多个消费者绑定到同一个队列上,一条消息只能被一个消费者进行消费。
在实际开发中,生产者发送消息毫不费力,而消费者一般要和业务结合。消费者接受到消息后需要处理,队列会积压很多消息。
2.代码实现
生产者
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private static final String QUEUE_NAME="test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
/*从连接中获取通道*/
Channel channel = connection.createChannel();
/*创建队列声明*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for(int i=0;i<50;i++){
String msg="hello work queue"+i;
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
try {
Thread.sleep(i*20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
channel.close();
connection.close();
}
}
消费者1
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv1 {
private static final String QUEUE_NAME="test_work_queue";
public static void main(String[] args) throws IOException, InterruptedException {
/*获取连接*/
Connection connection = ConnectionUtils.getConnection();
/*创建频道*/
Channel channel = connection.createChannel();
/*创建队列声明*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*消息到达触发*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("recv1 "+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
/*监听队列*/
boolean autoAck=true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
消费者2
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv2 {
private static final String QUEUE_NAME="test_work_queue";
public static void main(String[] args) throws IOException, InterruptedException {
/*获取连接*/
Connection connection = ConnectionUtils.getConnection();
/*创建频道*/
Channel channel = connection.createChannel();
/*创建队列声明*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*消息到达触发*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("recv2 "+msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
/*监听队列*/
boolean autoAck=true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
现象:消费者1和消费者2处理的消息数量一致
这种现象称为轮询分发(round-robin)
公平分发 fair dispatch:
生产者
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private static final String QUEUE_NAME="test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
/*从连接中获取通道*/
Channel channel = connection.createChannel();
/*创建队列声明*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/*每个消费者发送确认消息之前,消息队列不发送下一个消息,一次仅处理一个*/
int fetch=1;
channel.basicQos(fetch);
for(int i=0;i<50;i++){
String msg="hello work queue"+i;
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
try {
Thread.sleep(i*20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
channel.close();
connection.close();
}
}
消费者1
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv1 {
private static final String QUEUE_NAME="test_work_queue";
public static void main(String[] args) throws IOException, InterruptedException {
/*获取连接*/
Connection connection = ConnectionUtils.getConnection();
/*创建频道*/
Channel channel = connection.createChannel();
/*创建队列声明*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/*保证一次仅分发一个*/
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*消息到达触发*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("recv1 "+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
/*监听队列*/
boolean autoAck=false; /*手动应答改为false*/
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
消费者2
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv2 {
private static final String QUEUE_NAME="test_work_queue";
public static void main(String[] args) throws IOException, InterruptedException {
/*获取连接*/
Connection connection = ConnectionUtils.getConnection();
/*创建频道*/
Channel channel = connection.createChannel();
/*创建队列声明*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*消息到达触发*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("recv2 "+msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
/*监听队列*/
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
现象:能者多劳
3.消息应答和持久化
/*监听队列*/
boolean autoAck=false; /*手动应答改为false*/
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
自动确认模式:一旦rabbitmq将消息分发给消费者,就会从内存中删除;在这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息;
手动确认模式:如果有一个消费者挂掉,则交付给其他消费者。rabbitmq支持消息应答,消费者发送一个消息应答,告知rabbitmq可将数据从内存中删除;
/*创建队列声明*/
boolean durable=false;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
若修改持久化为true时,运行报错,因为已经定义了test_work_queue,未被持久化,rabbitmq不允许重新定义一个(不同参数)已经存在的队列
五.publish_subscribe订阅模式
1.模型
x:交换机
一个生产者,多个消费者;每个消费者都有自己的队列;生产者没有直接把消息发送到队列,而是发送到交换机(转发器 exchange);每个队列都要绑定到交换机上;生产者发送的消息,经过交换机,到达队列就能实现,一个消息被多个消费者消费;
2.实现
生产者
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
/*声明交换机*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/*发送消息*/
String msg = "hello ps";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
channel.close();
connection.close();
}
}
消费者1
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv1 {
private static final String QUEUE_NAME="test_exchange_fanout_email";
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
/*队列声明*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/*绑定队列到交换机转发器*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
/*保证一次仅分发一个*/
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*消息到达触发*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("recv1 "+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
/*监听队列*/
boolean autoAck=false; /*自动应答改为false*/
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
消费者2
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv2 {
private static final String QUEUE_NAME="test_exchange_fanout_sms";
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
/*队列声明*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/*绑定队列到交换机转发器*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
/*保证一次仅分发一个*/
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*消息到达触发*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("recv2 "+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
/*监听队列*/
boolean autoAck=false; /*自动应答改为false*/
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
3.Exchange(交换机)
一方面接受生产者的消息,另一方面向队列推送消息;
匿名转发 “”
Fanout 不处理路由键
Direct 处理路由键
六.路由模式
1.模型
2.代码实现
生产者
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
/*声明交换机*/
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
/*发送消息*/
String msg = "hello direct";
String routingKey="info";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
channel.close();
connection.close();
}
}
消费者1
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv1 {
private static final String QUEUE_NAME="test_queue_direct_1";
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
/*队列声明*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/*绑定队列到交换机转发器*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
/*保证一次仅分发一个*/
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*消息到达触发*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("recv1 "+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
/*监听队列*/
boolean autoAck=false; /*自动应答改为false*/
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
消费者2
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv2 {
private static final String QUEUE_NAME="test_queue_direct_2";
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
/*队列声明*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/*绑定队列到交换机转发器*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
/*保证一次仅分发一个*/
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*消息到达触发*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("recv2 "+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
/*监听队列*/
boolean autoAck=false; /*自动应答改为false*/
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
七.主题模式
1.模型
将路由键和某模式匹配 # 匹配一个或多个 * 匹配多个
Goods.#
商品 的发布 删除 修改 查询
2.代码实现
生产者
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
/*声明交换机*/
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
/*发送消息*/
String msg = "商品";
channel.basicPublish(EXCHANGE_NAME, "goods.del", null, msg.getBytes());
channel.close();
connection.close();
}
}
消费者1
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv1 {
private static final String QUEUE_NAME="test_exchange_topic_1";
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
/*队列声明*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/*绑定队列到交换机转发器*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");
/*保证一次仅分发一个*/
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*消息到达触发*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("recv1 "+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
/*监听队列*/
boolean autoAck=false; /*自动应答改为false*/
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
消费者2
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv2 {
private static final String QUEUE_NAME="test_exchange_topic_2";
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
/*队列声明*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/*绑定队列到交换机转发器*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.*");
/*保证一次仅分发一个*/
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*消息到达触发*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("recv2 "+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
/*监听队列*/
boolean autoAck=false; /*自动应答改为false*/
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
八.rabbitmq消息确认机制(生产者)之事务机制
1.消息确认
在rabbitmq中,可通过持久化数据解决rabbitmq服务器异常的数据丢失问题;
而在生产者将消息发送出去以后,消息是否到达rabbitmq服务器
两种方式: AMQP实现事务机制 Confirm模式
2.事务机制
txSelect:用户将当前channel设置为transaction模式
txCommit:用于提交事务
txRollback:用于回滚
3.代码实现
生产者
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TxSend {
private static final String QUEUE_NAME="test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
/*从连接中获取通道*/
Channel channel = connection.createChannel();
/*创建队列声明*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg="hello tx";
try {
channel.txSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
channel.txCommit();
System.out.println("事务提交");
} catch (IOException e) {
channel.txRollback();
System.out.println("事务回滚");
}
channel.close();
connection.close();
}
}
消费者
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class TxRecv {
private static final String QUEUE_NAME="test_queue_tx";
public static void main(String[] args) throws IOException, InterruptedException {
/*获取连接*/
Connection connection = ConnectionUtils.getConnection();
/*创建频道*/
Channel channel = connection.createChannel();
/*创建队列声明*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println(msg);
}
};
/*监听队列*/
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
4.优劣势
事务机制降低吞吐量
九.rabbitmq消息确认机制之Confirm
1.原理
生产者将信道设置成Confirm模式,一旦信道进入Confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(以confirm.select为基础从1开始计数),一旦消息被投递到所有匹配的队列之后,Broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,Broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外Broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
Confirm模式优势在于其异步;
2.原理
channel.confirmSelect() 开启信道的Confirm模式
编程模式:
串行
a.普通 waitForConfirms()
b.批量 waitForConfirms()
异步
c.异步 提供一个回调方法
3.代码实现
注意: channel设置为事务模式,不能再设置为Confirm模式
消费者
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv {
private static final String QUEUE_NAME="test_simple_confirm1";
public static void main(String[] args) throws IOException, InterruptedException {
/*获取连接*/
Connection connection = ConnectionUtils.getConnection();
/*创建频道*/
Channel channel = connection.createChannel();
/*创建队列声明*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println(msg);
}
};
/*监听队列*/
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
普通模式生产者
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private static final String QUEUE_NAME="test_simple_confirm1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/*生产者设置Confirm模式*/
channel.confirmSelect();
String msg="hello confirm message";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
if(!channel.waitForConfirms()){
System.out.println("消息发送失败");
}else {
System.out.println("消息发送成功");
}
channel.close();
connection.close();
}
}
批量模式生产者
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private static final String QUEUE_NAME="test_simple_confirm1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/*生产者设置Confirm模式*/
channel.confirmSelect();
String msg="hello confirm message";
/*批量发送后确认*/
for(int i=0;i<10;i++){
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
if(!channel.waitForConfirms()){
System.out.println("消息发送失败");
}else {
System.out.println("消息发送成功");
}
channel.close();
connection.close();
}
}
异步模式
Channel 对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删除相应的一条(multiple=false)或多条(multiple=true)记录,从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构
import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
public class SendSync {
private static final String QUEUE_NAME = "test_simple_confirm1";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/*生产者设置Confirm模式*/
channel.confirmSelect();
/*采用有序集合存储结构*/
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
/*Channel对象提供ConfirmListener()回调方法*/
channel.addConfirmListener(new ConfirmListener() {
/*b代表multiple
* l代表deliveryTag*/
/*没有问题*/
@Override
public void handleAck(long l, boolean b) throws IOException {
if (b) {
System.out.println("handleAck multiple");
confirmSet.headSet(l + 1).clear();
} else {
System.out.println("handleAck multiple false");
confirmSet.remove(l);
}
}
/*处理有问题,这里简单地移除*/
@Override
public void handleNack(long l, boolean b) throws IOException {
if (b) {
System.out.println("handleAck multiple");
confirmSet.headSet(l + 1).clear();
} else {
System.out.println("handleAck multiple false");
confirmSet.remove(l);
}
}
});
String msg="sssss";
/*序列号存储到有序集合中*/
while (true){
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
confirmSet.add(seqNo);
}
}
}
十.Spring集成
1.添加依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.5.RELEASE</version>
</dependency>
2.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">
<!-- 自动扫描 -->
<context:component-scan base-package="com.cz" />
<!-- rabbitMQ连接工厂 -->
<rabbit:connection-factory id="connectionFactory" host="47.98.32.134" port="5672" username="user_mmr" password="123456" virtual-host="/vhost_mmr"></rabbit:connection-factory>
<!-- 创建rabbitTemplate 消息模板类 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange"></rabbit:template>
<!--MQ管理,包含队列 交换机声明-->
<rabbit:admin connection-factory="connectionFactory"></rabbit:admin>
<!--队列声明-->
<rabbit:queue name="myQueue" auto-declare="true" durable="true"/>
<!--交换机声明-->
<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="myQueue"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!--队列的监听-->
<rabbit:listener-container connection-factory="connectionFactory" >
<rabbit:listener ref="foo" method="listen" queue-names="myQueue"></rabbit:listener>
</rabbit:listener-container>
<!--消费者-->
<bean id="foo" class="com.cz.Spring.MyConsumer"></bean>
</beans>
3.生产者 消费者
public class MyConsumer {
public void listen(String foo){
System.out.println(foo);
}
}
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class SpringMain {
public static void main(String[] args) throws InterruptedException {
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:context.xml");
RabbitTemplate rabbitTemplate= applicationContext.getBean(RabbitTemplate.class);
/*发送消息*/
rabbitTemplate.convertAndSend("hello spring rabbit");
Thread.sleep(1000);
applicationContext.destroy();
}
}
十一.死信队列
1.简介
当队列中的消息成为死信以后,如果队列设置了DLX那么消息会被发送到DLX。通过x-dead-letter-exchange设置DLX,通过这个x-dead-letter-routing-key设置消息发送到DLX所用的routing-key,如果不设置默认使用消息本身的routing-key.
2.三种情况下会变成死信
(1)消息被拒绝(basic.reject 或者 basic.nack),并且requeue=false;
(2)消息的过期时间到期了;
(3)队列长度限制超过了。
十二.避免消息重复投递或重复消费
在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一)作为去重和幂等的依据,避免同一条消息被重复消费。
业务场景下:
1.若消息做数据库的insert操作。给这个消息一个唯一主键,若出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
2.若消息做redis的set的操作,不用解决,因为set操作本来就是幂等操作。
3.准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。