本文总结下ActiveMQ高级特性。
一.异步投递
ActiveMQ支持消息的异步和同步发送,默认使用异步发送,以此来提高生产者的消息发送性能。
1.同步发送
ActiveMQ在以下两种情况下使用同步发送模式:
1.明确指定使用同步发送;
2.在非事务模式下,使用持久化机制发送消息;这样producer每次发送消息,都会阻塞等待broker返回确认,表示消息已经被安全的持久化到了磁盘中。
2.异步发送
很多高性能的应用,允许在失败的情况下,有少量的数据丢失。如果应用满足这个特点,则可以使用异步投递来提高生产的性能,即使发送的是持久化消息。
明确指定使用异步发送,有以下3种形式:
- ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(“tcp://locahost:61616?jms.useAsyncSend=true”);
- (ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
- ((ActiveMQConnection)connection).setUseAsyncSend(true);
3.异步发送产生的问题:
- 不确定消息生产后,是否成功发送到broker(MQ);
- 在消费很慢的情况下,会产生消息积压;
4.使用回调函数
如果生产者使用异步投递模式,生产的消息存在内存中进行异步发送,但是如果此时MQ宕机,那么生产者内存中的消息就会发送失败,产生消息丢失。那么如何解决中情况呢?解决办法就是使用回调。
//创建消息的生产者
ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
//创建消息
TextMessage textMessage = session.createTextMessage("msg--" + i);
textMessage.setJMSMessageID(UUID.randomUUID() + "--业务标识");
String jmsMessageID = textMessage.getJMSMessageID();
//使用生产者发送消息
producer.send(textMessage, new AsyncCallback() {
@Override
public void onSuccess() {
System.out.println(jmsMessageID + "has been OK");
}
@Override
public void onException(JMSException exception) {
System.out.println(jmsMessageID + "fail to send to mq");
}
});
生产者使用ActiveMQMessageProducer类型;
然后发送消息时,添加回调方法,实现成功和失败两个接口方法,分别记录下成功和失败的消息id。这样就可以知道哪些消息发送失败,从而使用其他手段进行补偿。
二.延时投递和定时投递
消息生产后,可以延迟一定时间后,在发送到MQ中,并且还可以设置重复发送,即每间隔一定时间发送一次消息。
1.开启
需要在activemq.xml的配置文件中添加 schedulerSupport="true"
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
2.代码中参数设置
生产者发送消息时,可以对消息对象设置一些控制参数,如下表格:
以下是生产者部分核心代码片段,注意注释内容:
.....
//延时3秒后发送消息
long delay = 3 * 1000;
//重复周期为4秒
long period = 4 * 1000;
//重复次数为5次
int repeat = 5;
try {
//生产3条消息到队列
for (int i = 1; i <= 3; i++) {
//创建消息
TextMessage textMessage = session.createTextMessage("msg--" + i);
//设置延时时间
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
//设置重复周期
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
//设置重复次数
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
//使用生产者发送消息
producer.send(textMessage);
}
.....
发送消息之前,设置消息的延迟时间,重复周期,重复次数。
消费者端会循环消费到重复的消息,这里就不演示了。
三.消费重试机制
1.导致消息重发的的几个场景:
- 消费者Cilent使用了事务,但是在session中调用了rollback;
- 消费者CLient使用了事务,但是在commit之前关闭了或者没有提交;
- 消费者Client在CLIENT_ACKNOWLEDGE模式下,在session调用了recover()。
以上情况会导致ActiveMQ消息重发,默认间隔1秒,重发6次。
2.demo
针对以上第2种情况,可以做简单的测试,消费者测试代码如下:
//创建连接工厂,按照给定的url地址,使用默认用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//通过连接工厂获取连接,并启动
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建会话session,开启事务!!!!!
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
。。。。
省略若干代码
。。。。
//注意这里不提交事务
//session.commit();
过程:
生产者把消息发送到MQ后,消费者使用事务进行消费,但是没有提交事务。这时,MQ中的数据会依然存在,MQ会尝试重新发送消息,当消费者重复消费超过6次后,消费者会返回给MQ一个消息,告诉MQ此消息有毒,MQ会将此消息保存到死信队列中。消费者再次消费时,将不会消费到此有毒的消息。
3.重试参数设置
其中,重试此次数可以通过参数设定:
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
...
RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setInitialRedeliveryDelay(0);
queuePolicy.setRedeliveryDelay(1000);
queuePolicy.setUseExponentialBackOff(false);
queuePolicy.setMaximumRedeliveries(2);
...
activeMQConnectionFactory.setRedeliveryPolicy(queuePolicy);
其中,参数具体含义如下,红色的两个就是默认次数6和默认时间间隔1s:
官方文档:http://activemq.apache.org/redelivery-policy
四.死信队列
死信队列(Dead Letter Queue),上一节中说到,当一条消息被重复消费超过默认的6次,就会被ActiveMQ放入到死信队列。开发人员可以查看此队列中的错误消息,进行人工干预。
理论上,一般生产环境中使用MQ,都会设计两个队列:
- 核心业务队列,比如订单系统发送的订单消息;
- 私信队列,处理异常消息;
比如下图中:
上图中,假如第三方物流系统故障,无法被仓储系统请求,那么仓储系统每消费到一条订单消息,发送给第三方物流系统时,都会出错。此时仓储系统可以把这条消息拒绝访问或者标记此消息为失败。
一旦这条消息被标识为了失败消息后,MQ就会把这条消息存储到一个提前设置好的死信队列中。
然后我们就会看到,第三方物流系统故障期间,所有的订单消息,都被标记为处理失败,转存到死信队列中。然后仓储系统后台有个线程,不停监视第三方物流系统的状态,一旦发现它恢复正常,仓储系统就会从私信队列中消费消息,重新执行发货和配送的业务逻辑。
死信队列有两种模式:
sharedDeadLetterStrategy :
共享队列,所有问题消息,都保存在共有的一个队列中;这是ActiveMQ的默认策略;
individualDeadLetterStrategy :
将问题消息,保存到各自的队列中,队列名字不同,对于Queue模式,队列的名字前缀是ActiveMQ.DLQ.Queue;对于Topic,队列名字前缀为ActiveMQ.DLQ.Topic。
比如,
配置死信队列名字:
<broker>
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- Set the following policy on all queues using the '>' wildcard
'>'表示应用于所有队列,可以改为具体的队列名字-->
<policyEntry queue=">">
<deadLetterStrategy>
<!--
Use the prefix 'DLQ.' for the destination name, and make
the DLQ a queue rather than a topic
设置队列前缀
-->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
配置是否清理过期的消息,true为清理,false为不清理:
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false" />
</deadLetterStrategy>
配置消息过期时间:
<broker>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="QueueWhereItIsOkToExpireDLQEntries">
<deadLetterStrategy>
<.... expiration="300000"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
配置是否存储非持久化消息:
因为ActiveMQ的私信队列默认不存储非持久化类型的消息,但是可以通过以下配置开启:
<broker>
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- Set the following policy on all queues using the '>' wildcard -->
<policyEntry queue=">">
<!--
Tell the dead letter strategy to also place non-persisted messages
onto the dead-letter queue if they can't be delivered.
-->
<deadLetterStrategy>
<sharedDeadLetterStrategy processNonPersistent="true" />
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
五.防止重复调用
由于网络延迟等原因,会造成MQ重试,在重试过程中可能会产生消息重复消费。
解决方案:
1.如果消费到的消息是做数据库插入操作,可以给消息做一个唯一主键,比如使用消息ID。这样即使出现消息重复消费,由于唯一索引的存在,也不会重复插入数据。
2.准备一个第三方服务来做消费记录。比如使用redis,需要给消息分配全局唯一id。消费者开始消费之前,先去redis查询有没有消费过,发现没有消费过才进行消费。只要消费过的消息,都以<id,message>键值对形式存储到redis中。这里要注意消费消息和保存消息的事务原子性。