最近在使用activeMQ做一些东西,虽然不是我直接承担该模块,但是既然参与到了这个工程中,学习一点只是还是不错的。自己搭建了一个环境,整体上不是比较难,但是中间的一些坑,拿出来说说。(不是太深入,欢迎大神指正)
首先到官网上现在相关的包,http://activemq.apache.org/download.html 剩下的就是需要你的电脑有JAVA环境,在bin目录下(一下提到目录,默认在activeMQ的包目录下),按照自己电脑的环境启动32位或者是64位。activeMQ主要的特性就是支持消息的延时投递和消息的持久化,从而保证消息的不丢失,项目中主要用到这连个特性,但是延时需要在activeMQ中配置,默认不开启,有点微坑。打开conf下的activemq.xml,找到broker标签,在该行尖括号内加上schedulerSupport="true" ,如果正在运行,需要重新启动才生效,此时就支持延时功能,其他的暂时不用配置。
建立基本的spring项目,以maven工程为例,在pom中添加:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.12.0</version>
</dependency>
非pom工程添加相关的jar包即可,applicationContext.xml文件配置:
<bean id="targetConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
<property name="useAsyncSend" value="true" />
</bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!--这个是队列目的地,点对点的-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>myqueue</value>
</constructor-arg>
</bean>
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestination" ref="queueDestination" />
<property name="pubSubDomain" value="false" />
<property name="explicitQosEnabled" value="true" /> <!-- deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false-->
<property name="deliveryMode" value="2" /> <!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
<property name="sessionAcknowledgeMode" value="1" />
<!-- 消息应答方式
Session.AUTO_ACKNOWLEDGE 1 消息自动签收
Session.CLIENT_ACKNOWLEDGE 2 客户端调用acknowledge方法手动签收
Session.DUPS_OK_ACKNOWLEDGE 3不必必须签收,消息可能会重复发送
-->
</bean>
<!-- 消息监听器 -->
<bean id="consumerMessageListener" class="com.wei.activemq.MsgListener" />
<!-- 消息监听容器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="queueDestination"/>
<property name="messageListener" ref="consumerMessageListener"/>
</bean>
然后添加相应的支持
定义我们要发送的消息任务:
public class MsgTask implements Serializable {
private long id;
private String name;
private long delayTime;
//getter setter...
}
定义消息生产者
@Service
public class MsgProducer {
@Autowired
public JmsTemplate jmsTemplate;
/**
* send message
*/
public void sendMessage(final MsgTask msgTask) {
jmsTemplate.send(jmsTemplate.getDefaultDestination(), new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
ObjectMessage objectMessage = session.createObjectMessage(msgTask);
// objectMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, msgTask.getDelayTime());打开注解,即可发送延时的消息
objectMessage.setJMSExpiration(msgTask.getDelayTime());
objectMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
objectMessage.setJMSType("延迟发送的消息");
Constants.map.put(msgTask.hashCode(),msgTask);
return objectMessage;
}
});
}
}
定义消息接收者:
public class MsgListener implements MessageListener {
public void onMessage(Message m) {
System.out.println("[receive message]");
ObjectMessage om = (ObjectMessage) m;
try {
MsgTask msgTask = (MsgTask) om.getObject();
Constants.receivemap.put(msgTask.hashCode(), msgTask);
System.out.println("延迟发送时间:" + msgTask.getDelayTime() / 1000);
System.out.println("model:" + om.getJMSDeliveryMode());
System.out.println("destination:" + om.getJMSDestination());
System.out.println("type:" + om.getJMSType());
System.out.println("messageId:" + om.getJMSMessageID());
System.out.println("time:" + om.getJMSTimestamp());
System.out.println("expiredTime:" + om.getJMSExpiration());
System.out.println("priority:" + om.getJMSPriority());
System.out.println("到达率:" + Constants.receivemap.size() * 1.0 / Constants.map.size());
// System.out.println("long Propertity:" + om.getLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY));
Random random = new Random();
if (msgTask.getId() % 2 == 0) {
om.acknowledge();
System.out.println("耗时为:" + (System.currentTimeMillis() - Constants.beginTime) + " s");
msgTask.setId(msgTask.getId() + 1);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
添加一些测试参数
public class Constants {
public static ConcurrentHashMap<Integer, MsgTask> map = new ConcurrentHashMap<>();
public static ConcurrentHashMap<Integer, MsgTask> receivemap = new ConcurrentHashMap<>();
public static int size = 1000;
public static long beginTime = 1000;
}
好了,可以写测试一下
public class MainTest {
public static void main(String[] args) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
MsgProducer msgProducer = (MsgProducer) applicationContext.getBean("msgProducer");
Constants.beginTime = System.currentTimeMillis();
for (int i = 0; i < 2; i++) {
MsgTask msgTask = new MsgTask();
msgTask.setName("test" + i);
msgTask.setId(i);
msgTask.setDelayTime((long) Math.random() * 1000);
msgProducer.sendMessage(msgTask);
}
}
}
基本的功能已经可用,可以在大部分场合中使用。