ActiveMQ官网对延迟投递和定时投递的描述:http://activemq.apache.org/delay-and-schedule-message-delivery.html
broker的schedulerSupport有4个属性。
Property name | type | description |
AMQ_SCHEDULED_DELAY | long | 延迟投递时间 |
AMQ_SCHEDULED_PERIOD | long | 重复投递时间间隔 |
AMQ_SCHEDULED_REPEAT | int | 重复投递次数 |
AMQ_SCHEDULED_CRON | String | cron表达式 |
延迟投递或定时投递默认是不开启的,所以需要在activemq.xml配置文件的broker结点加上schedulerSupport="true"属性来开启。修改完配置文件后,记得重启ActiveMQ。
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
修改Producer的代码,Consumer的代码只需要修改QUEUE_NAME。
package com.wsy.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsumer_DelayAndSchedule {
public static final String ACTIVEMQ_URL = "tcp://192.168.0.123:61616";
public static final String QUEUE_NAME = "DelayAndSchedule";
public static void main(String[] args) throws JMSException {
// 创建连接工厂,按照给定的url地址采用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 通过连接工厂,获取Connection并启动
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地(目的地有两个子接口,分别是Queue和Topic)
Queue queue = session.createQueue(QUEUE_NAME);
// 创建消费者,指明从queue取消息
MessageConsumer messageConsumer = session.createConsumer(queue);
while (true) {
// 因为向队列中存放的是TextMessage的实例,所以取出来的时候,也要用TextMessage的实例来接收
// 这里的receive()方法表示一直等待,如果给它传一个long类型的毫秒数,表示consumer等待超时时间
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if (textMessage != null) {
System.out.println("消费者消费:"+textMessage.getText());
} else {
break;
}
}
// 按照资源打开的相反顺序关闭资源
messageConsumer.close();
session.close();
connection.close();
}
}