整合生产者
创建Maven工程
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Boot + ActiveMQ producer example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>sprinboot_activemq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- The Spring Boot parent manages the versions of all starters below. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <!-- Was misspelled "projet.build.sourceEncoding", which Maven silently ignores. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Version omitted on purpose: it is inherited from spring-boot-starter-parent. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <!-- Required by the JUnit / @SpringBootTest based test class. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
配置SpringBoot
application.yml
# Producer application settings.
server:
  port: 7777

spring:
  activemq:
    broker-url: tcp://localhost:61616 # address of your own MQ broker
    user: admin
    password: admin
  jms:
    pub-sub-domain: false # false: Queue, true: Topic

# Custom property holding the queue name (referenced as ${myqueue})
myqueue: boot-activemq-queue
配置Bean
package pers.zhang.config;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
/**
 * JMS configuration for the producer application.
 *
 * <p>Declared as {@code @Configuration} (rather than a plain {@code @Component})
 * because it exists to contribute {@code @Bean} definitions to the context.
 */
@Configuration
@EnableJms // enable annotation-driven JMS support
public class ConfigBean {

    /** Queue name, resolved from the {@code myqueue} property. */
    @Value("${myqueue}")
    private String myqueue;

    /**
     * Exposes the destination queue as a bean.
     *
     * @return an {@link ActiveMQQueue} named after the {@code myqueue} property
     */
    @Bean
    public Queue queue() {
        return new ActiveMQQueue(myqueue);
    }
}
发送者
package pers.zhang.test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
import java.util.UUID;
/**
 * Producer component that pushes text messages onto the injected queue.
 */
@Component
public class Queue_Produce {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    /**
     * Sends a single message consisting of {@code "*****"} followed by the
     * first six characters of a freshly generated random UUID.
     */
    public void produceMsg() {
        final String uuidPrefix = UUID.randomUUID().toString().substring(0, 6);
        final String payload = "*****" + uuidPrefix;
        jmsMessagingTemplate.convertAndSend(queue, payload);
    }
}
主启动类
/**
 * Entry point of the producer application.
 */
@SpringBootApplication
public class MainApp_Produce {

    /**
     * Boots the Spring application with this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(MainApp_Produce.class);
        application.run(args);
    }
}
编写测试类
package pers.zhang;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import pers.zhang.test.Queue_Produce;
import javax.annotation.Resource;
/**
 * Spring-backed test that triggers a single send through {@code Queue_Produce}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = MainApp_Produce.class)
@WebAppConfiguration
public class TestActive {

    /** Producer under test, injected from the application context. */
    @Resource
    private Queue_Produce producer;

    /**
     * Invokes {@code produceMsg()} once; the test passes if no exception is thrown.
     */
    @Test
    public void testSend() throws Exception {
        producer.produceMsg();
    }
}
运行测试方法:
每隔3秒向MQ推送消息:
在Queue_Produce中增加定时投递方法(需导入org.springframework.scheduling.annotation.Scheduled):
/**
 * Sends one message to the queue and prints a marker line to stdout.
 * Scheduled with a fixed delay of 3000 ms between runs.
 */
@Scheduled(fixedDelay = 3000)
public void produceMdgScheduled() {
    String uuidPrefix = UUID.randomUUID().toString().substring(0, 6);
    jmsMessagingTemplate.convertAndSend(queue, "*****" + uuidPrefix);
    System.out.println("3秒一次...");
}
修改主启动类MainApp_Produce,添加@EnableScheduling注解以开启定时任务:
/**
 * Entry point of the producer application, with scheduling switched on.
 */
@SpringBootApplication
@EnableScheduling // turn on processing of scheduled-task annotations
public class MainApp_Produce {

    /**
     * Boots the Spring application with this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(MainApp_Produce.class);
        application.run(args);
    }
}
运行主启动类,控制台打印:
3秒一次...
3秒一次...
3秒一次...
整合消费者
创建Maven工程,pom.xml与生产者一致。
配置application.yml
# Consumer application settings.
server:
  port: 8888

spring:
  activemq:
    broker-url: tcp://localhost:61616 # address of your own MQ broker
    user: admin
    password: admin
  jms:
    pub-sub-domain: false # false: Queue, true: Topic

# Custom property holding the queue name (referenced as ${myqueue})
myqueue: boot-activemq-queue
消费者
监听消息:
package pers.zhang.activemq;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
/**
 * Consumer component that listens on the destination named by the
 * {@code myqueue} property.
 */
@Component
public class Queue_Consumer {

    /**
     * Prints the text of each received message to stdout.
     *
     * @param textMessage the incoming JMS text message
     * @throws JMSException if the message text cannot be read
     */
    @JmsListener(destination = "${myqueue}")
    public void receive(TextMessage textMessage) throws JMSException {
        final String body = textMessage.getText();
        System.out.println("消费者收到消息:" + body);
    }
}
主启动类
/**
 * Entry point of the consumer application.
 */
@SpringBootApplication
public class MainApp_Consumer {

    /**
     * Boots the Spring application with this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(MainApp_Consumer.class);
        application.run(args);
    }
}
运行,控制台打印:
消费者收到消息:*****88d60d
消费者收到消息:*****b9c57e
消费者收到消息:*****5116ae
消费者收到消息:*****b59d0b
消费者收到消息:*****0806c6
消费者收到消息:*****af7056
消费者收到消息:*****9895bd