Topic生产者
新建Maven工程
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Boot + ActiveMQ demo project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>sprinboot_activemq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- The Spring Boot parent supplies dependency and plugin version management. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <!-- Fixed: the key was misspelled "projet.build.sourceEncoding", which Maven silently ignores. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Version omitted on purpose: it is managed by spring-boot-starter-parent (2.1.5.RELEASE). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <!-- Test libraries are only needed on the test classpath. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
springboot配置:
application.yml
# Producer configuration. Nesting is significant: the keys below must sit under
# their parent (server / spring.activemq / spring.jms) to be bound by Spring Boot.
server:
  port: 6666
spring:
  activemq:
    broker-url: tcp://localhost:61616 # address of your own ActiveMQ broker
    user: admin
    password: admin
  jms:
    pub-sub-domain: true # false: Queue, true: Topic
# Custom topic name; read by the application through the ${mytopic} placeholder
mytopic: boot-activemq-topic
配置Bean:
package pers.zhang.config;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import javax.jms.Topic;
/**
 * Exposes the JMS {@link Topic} used by this application as a Spring bean.
 *
 * <p>The topic name is not hard-coded; it is resolved from the {@code mytopic}
 * property placeholder.
 */
@Component
public class TopicConfigBean {

    /** Destination name, injected from the {@code ${mytopic}} placeholder. */
    @Value("${mytopic}")
    private String destinationName;

    /**
     * Creates the topic bean from the configured destination name.
     *
     * @return an {@link ActiveMQTopic} carrying the configured name
     */
    @Bean
    public Topic topic() {
        final Topic destination = new ActiveMQTopic(this.destinationName);
        return destination;
    }
}
生产者:
/**
 * Periodic publisher: each run sends one short text message to the injected
 * {@code Topic} and prints a progress line to standard output.
 */
@Component
public class Topic_Produce {

    /** Destination the messages are published to. */
    @Autowired
    private Topic topic;

    /** Template used to convert and send the payload. */
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * Sends a message whose body is a fixed prefix followed by the first six
     * characters of a random UUID string.
     *
     * <p>Declared with {@code @Scheduled(fixedDelay = 2000)}; it only fires when
     * scheduling is enabled in the application.
     */
    @Scheduled(fixedDelay = 2000)
    public void produceTopic() {
        final String randomId = UUID.randomUUID().toString();
        final String payload = "主题消息:" + randomId.substring(0, 6);
        jmsMessagingTemplate.convertAndSend(topic, payload);
        System.out.println("2秒一次...");
    }
}
主启动类:
/**
 * Entry point of the producer application.
 */
@SpringBootApplication
@EnableScheduling // enable processing of @Scheduled methods
public class MainApp_Produce {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(final String[] args) {
        SpringApplication.run(MainApp_Produce.class, args);
    }
}
消费者
创建Maven工程,pom.xml与上面生产者一致
springboot配置,application.yml:
# Consumer configuration. Nesting is significant: the keys below must sit under
# their parent (server / spring.activemq / spring.jms) to be bound by Spring Boot.
server:
  port: 5566
spring:
  activemq:
    broker-url: tcp://localhost:61616 # address of your own ActiveMQ broker
    user: admin
    password: admin
  jms:
    pub-sub-domain: true # false: Queue, true: Topic
# Custom topic name (not a queue name); read through the ${mytopic} placeholder
mytopic: boot-activemq-topic
消费者:
package pers.zhang.activemq;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
/**
 * JMS listener that prints the text of every message received on the destination
 * named by the {@code ${mytopic}} placeholder.
 *
 * <p>NOTE(review): despite the "Queue" in the class name, the destination property is
 * called {@code mytopic} and the printed text refers to a topic.
 */
@Component
public class Queue_Consumer {

    /**
     * Handles one incoming text message by writing its body to standard output.
     *
     * @param textMessage the received JMS text message
     * @throws JMSException if the message body cannot be read
     */
    @JmsListener(destination = "${mytopic}")
    public void receive(TextMessage textMessage) throws JMSException {
        final String body = textMessage.getText();
        System.out.println("消费者收到主题的消息:" + body);
    }
}
主启动类:
/**
 * Entry point of the consumer application.
 */
@SpringBootApplication
public class MainApp_Consumer {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(final String[] args) {
        SpringApplication.run(MainApp_Consumer.class, args);
    }
}
测试
先启动消费者(主题的非持久订阅者只能收到订阅之后发布的消息,因此消费者必须先于生产者启动):
再启动生产者,每2秒发送一条消息:
消费者控制台打印:
消费者收到主题的消息:主题消息:dfccdd
消费者收到主题的消息:主题消息:70b401
消费者收到主题的消息:主题消息:ce22bd
消费者收到主题的消息:主题消息:997932