安装ActiveMq 下载地址:http://activemq.apache.org/activemq-5140-release.html
http://127.0.0.1:8161/admin/ 为访问地址:账户默认为:admin 密码:admin
yml配置
spring:
activemq:
broker-url: tcp://localhost:61616
in-memory: true
pool:
enabled: false
password: admin
user: admin
maven依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
生产者:
package com.ocean.service;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.jms.*;
/**
* Created with IDEA
* author:QinWei
* Date:2019/1/7
* Time:11:16
*/
@Service("producer")
public class Producer {
@Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装
private JmsMessagingTemplate jmsTemplate;
// 发送消息,destination是发送到的队列,message是待发送的消息
public void sendMessage( final String message){
Destination destination = new ActiveMQQueue("mytest.queue");
jmsTemplate.convertAndSend(destination, message);
}
/**
* @desc 延时发送
*/
public void delaySend(String text, String queueName, Long time) {
//获取连接工厂
ConnectionFactory connectionFactory = this.jmsTemplate.getConnectionFactory();
try {
//获取连接
Connection connection = connectionFactory.createConnection();
connection.start();
//获取session
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 创建一个消息队列
Destination destination = session.createQueue(queueName);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage(text);
//设置延迟时间
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
//发送
producer.send(message);
session.commit();
producer.close();
session.close();
connection.close();
} catch (Exception e) {
e.getMessage();
}
}
}
消费者:
package com.ocean.config;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* Created with IDEA
* author:QinWei
* Date:2019/1/7
* Time:11:17
*/
@Component
public class Consumer {
// 使用JmsListener配置消费者监听的队列,其中text是接收到的消息
@JmsListener(destination = "mytest.queue")
public void receiveQueue(String text) throws InterruptedException {
Thread.sleep(5000);
System.out.println("consumer接收到"+text+"的请求并处理完毕,时间是"+new Date());
}
}
测试:
@Controller
public class UserController {
@Autowired
private Producer producer;
/**
* 消息队列实现方式
*/
@RequestMapping("/queue")
@ResponseBody
public String queue(){
for (int i = 0; i < 10 ; i++){
producer.sendMessage( "queue"+i);
}
return "queue 发送成功";
}
}
延迟5秒是为了模仿业务 处理以下为消费后的信息