JMS:即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。绝大多数MOM提供商都对JMS提供支持。它类似于JDBC(Java Database Connectivity)。JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商都支持 JMS,JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
ConnectionFactory | 连接工厂 |
Connection | l连接 |
Session | 会话 |
Destination | 目标 |
MessageProducer | 生产者 |
MessageConsumer | 消费者 |
Message | 消息 |
Broker | 消息中间件的实例 |
经典API示意图(版本1.1):
JMS Message Mode:消息由三部分构成,Header(消息头)、Body(消息体) 必需,Properties (消息属性)非必需
头字段名 | 说明 | Set By | Setter method |
JMSDestination | 消息的目的地,Topic或者是Queue | JMS provider send method | setJMSDestination (not for client use) |
JMSDeliveryMode | 消息的发送模式 | JMS provider send method | setJMSDeliveryMode(not for client use) |
JMSTimestamp | 消息传递给Broker的时间戳,它不是实 际发送的时间 |
JMS provider send method | setJMSTimestamp (not for client use) |
JMSExpiration | 消息的有效期,在有效期内,消息消费者才可以消费这个消息 | JMS provider send method | setJMSExpiration (not for client use) |
JMSPriority | 消息的优先级。0-4为正常的优先级,5-9为高优先级 | JMS provider send method |
setJMSPriority (not for client use) |
JMSMessageID | 一个字符串用来唯一标示一个消息 | JMS provider send method |
setJMSMessageID (not for client use) |
JMSReplyTo | 有时消息生产者希望消费者回复一个消息,JMSReplyTo为一个Destination,表示需要回复的目的地 | Client application |
setJMSReplyTo |
JMSCorrelationID | 通常用来关联多个Message | Client application | setJMSCorrelationID, setJMSCorrelationIDAsByte |
JMSType | 表示消息体的结构,和JMS提供者有关 | Client application | setJMSType |
JMSRedelivered | 如果这个值为true,表示消息是被重新发 送了 |
JMS provider prior to delivery |
setJMSRedelivered (not for client use) |
消息体实现说明 | 说明 |
BytesMessage | 用来传递字节消息 |
MapMessage | 用来传递键值对消息 |
ObjectMessage | 用来传递序列化对象 |
StreamMessage | 用来传递文件等 |
TextMessage | 用来传递字符串 |
1、JMS支持两种消息传递方式:
- 点对点(Point-to-Point),也可以称为queue 2. 发布订阅(Publish and subscribe),也可以称为topic
2、ActiveMq异步消息支持:https://activemq.apache.org/async-sends
3、发送有过期时间消息(自动出队):
//通过生产者producer 发送消息 消息级别设置过期时间,默认是0毫秒永不过期
producer.send(message, DeliveryMode.PERSISTENT, 4, 30000L);
// producer级别,设置producer的默认过期时间
producer.setTimeToLive(30000L);
producer.send(message);
消息过期时间和消息重发有什么区别?
消息过期是生产者发送消息到MQ服务器,多长时间没人消费,最终会被出队。
消息重发是消息发送到了MQ,但是分发给消费者的时候,没给MQ服务器回复,会再次进行分发。
并且过期时间是生产者发送消息的那一刻就开始算,而不是发送到MQ服务器上在开始算。
普通web项目生产者设置过期时间(我这里默认设置30S)
package com.study.activemq.le2_example.activemq.expiration;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 简单生产者
*/
public class Producer {
public static void main(String[] args) {
new ProducerThread("tcp://mq.study.com:61616", "ExpirationTestQueue").start();
}
static class ProducerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ProducerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
try {
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
connectionFactory.setUseAsyncSend(true);
// 2、创建连接
conn = connectionFactory.createConnection();
conn.start(); // 一定要start
// 3、创建会话(可以创建一个或者多个session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建消息发送目标 (Topic or Queue)
Destination destination = session.createQueue(destinationUrl);
// 5、用目的地创建消息生产者
MessageProducer producer = session.createProducer(destination);
// 设置递送模式(持久化 / 不持久化)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 6、创建一条文本消息
String text = "Hello world! From: " + Thread.currentThread().getName() + " : "
+ System.currentTimeMillis();
TextMessage message = session.createTextMessage(text);
// 7、通过producer 发送消息
// 消息级别设置过期时间
// producer.send(message, DeliveryMode.PERSISTENT, 4, 30000L);
// producer级别,设置producer的默认过期时间
producer.setTimeToLive(30000L);
producer.send(message);
System.out.println("Sent message: " + text);
// 8、 清理、关闭连接
session.close();
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
30S后刷新页面,消息出列
结合spring生产者设置过期时间(我这里默认设置30S)
package com.example.customers.activemq.day2.expire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.core.JmsTemplate;
import javax.annotation.PostConstruct;
import javax.jms.DeliveryMode;
/**
* @author Heian
* @time 19/08/23 20:59
* @description: spring 设置过期消息
*/
@SpringBootApplication
public class ProducerExpire {
@Autowired
private JmsTemplate jmsTemplate;
@PostConstruct
public void sendMessage() {
System.out.println("Sending an email message with 30S");
// 单独设置某条消息的过期时间
jmsTemplate.execute("ExpirationTestQueue", (session, producer) -> {
producer.send(session.createTextMessage("Expiration set Type 1"), DeliveryMode.PERSISTENT, 4, 30000L);
return null;
});
//二选一
// jmsTemplate级别消息设置过期时间
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setTimeToLive(30000L);
jmsTemplate.convertAndSend("ExpirationTestQueue", "message with Time to Live");
}
public static void main(String[] args) {
SpringApplication.run (ProducerExpire.class);
}
}
4. 发送延迟消息、定时任务(我这个项目就经常用到定时任务去异步执行我们需要的操作)
a:首先activemq支持删除两种方式,但是需要在配置文件中开启 目录在conf/activemq.xml
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >
b:普通web项目中实现延迟消息/定时任务 生产者
package activemq.delay;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
import javax.jms.*;
/**
* 定时任务 生产者
*/
public class Producer {
public static void main(String[] args) {
new ProducerThread("tcp://192.168.32.130:61616", "delayQueue").start();
}
static class ProducerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ProducerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
try {
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
connectionFactory.setUseAsyncSend(true);
// 2、创建连接
conn = connectionFactory.createConnection();
conn.start(); // 一定要start
// 3、创建会话(可以创建一个或者多个session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建消息发送目标 (Topic or Queue)
Destination destination = session.createQueue(destinationUrl);
// 5、用目的地创建消息生产者
MessageProducer producer = session.createProducer(destination);
// 设置递送模式(持久化 / 不持久化)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 6、创建文本消息
// 延时、调度消息
// 【不可用,这是JMS2.0中的方法】设置producer发送的消息的延迟递送时间
// producer.setDeliveryDelay(60000L);
// ActiveMQ 中的方案
// http://activemq.apache.org/delay-and-schedule-message-delivery.html
// 延时 5秒
TextMessage message = session.createTextMessage("Delay message - 1!");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5 * 1000L);
// 延时 5秒,投递3次,间隔10秒 (投递次数=重复次数+默认的一次)所以这里会投递三次
TextMessage message2 = session.createTextMessage("Delay message - 2!");
message2.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5 * 1000L); // 延时
message2.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000L); // 投递间隔
message2.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 2); // 重复次数
/**
* CRON 表达式的方式
* 第一个参数:分钟(0-59)
* 第二个参数:小时(0-23)
* 第三个参数:天day of month(1-31)
* 第四个参数:月份(1-12)
* 第五个参数:日期day of week(0-7 0代表周天)
*/
TextMessage message3 = session.createTextMessage("Delay message - 3!");
message3.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "30 * * * *");
// CRON 表达式的方式 以及 和上面参数的组合,CRON表达式指定开始时间 (10次)
TextMessage message4 = session.createTextMessage("Delay message - 4!");
message4.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "30 * * * *");
message4.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);//延迟1S发送
message4.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);//间隔时间1S发送
message4.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);//重复9次
// 7、发送消息
producer.send(message);
producer.send(message2);
producer.send(message3);
producer.send(message4);
System.out.println("Sent delay message: ok");
// 8、 清理、关闭连接
session.close();
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
需要一个消费者来消费,便于在控制台输出信息
package activemq.delay;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 简单消费者
*/
// http://activemq.apache.org/consumer-features.html
public class Consumer {
public static void main(String[] args) {
new ConsumerThread("tcp://192.168.32.130:61616", "delayQueue").start();
}
}
class ConsumerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ConsumerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
MessageConsumer consumer;
try {
// brokerURL
// http://activemq.apache.org/connection-configuration-uri.html
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
// 2、创建连接对象
conn = connectionFactory.createConnection();
conn.start(); // 一定要启动
// 3、创建会话(可以创建一个或者多个session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建消息消费目标(Topic or Queue)
Destination destination = session.createQueue(destinationUrl);
// 5、创建消息消费者 http://activemq.apache.org/destination-options.html
consumer = session.createConsumer(destination);
// 6、异步接收消息
consumer.setMessageListener(message -> {
if (message instanceof TextMessage) {
try {
System.out.println("Time: " + System.currentTimeMillis() + " 收到文本消息:"
+ ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
} else {
System.out.println(message);
}
});
// consumer.close();
// session.close();
// conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
springboot项目结合spring注解实现
package com.example.customers.activemq.day2.quartz;
import com.example.customers.activemq.day1.Emails;
import org.apache.activemq.ScheduledMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import javax.annotation.PostConstruct;
@SpringBootApplication
public class DelayMesssageProducer {
@Autowired
private JmsTemplate jmsTemplate;
@PostConstruct
public void sendMessage() {
// jms 2.0 的实现才能实现该方式
// jmsTemplate.setDeliveryDelay(20000L);
// jmsTemplate.convertAndSend("delayQueue", "message with delay -1 ");
// 发送延时消息 加了个回调函数,让我们有机会去改消息
jmsTemplate.convertAndSend("quzrtz", "结合spring发送定时任务消息", message -> {
// 延时 5秒,投递3次,间隔10秒 (投递次数=重复次数+默认的一次)
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 30 * 1000L); // 延时
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 5 * 1000L); // 投递间隔
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 5); // 重复次数 会有6次
//message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "30 * * * *");
return message;
});
System.out.println("发送了一个定时任务");
}
@JmsListener(destination = "quzrtz")
public void receive(Object message) {
System.out.println(Thread.currentThread().getName() + "接受到的消息" + message.toString ());
}
public static void main(String[] args) {
SpringApplication.run(DelayMesssageProducer.class, args);//会扫表此路径下的文件
}
}
备注:消费者生产者写在了同一个类中,我这里设置了30s的延时,即出队入队一开始都为0,只是消息存在broker中,后面入队消息逐渐+1一直到加至6。(定时任务在业务中很常用:比如轮询数据库发现异常数据,会起生产者去做XX处理,然后起一个监听去监听队列)
4. 设置消息优先级
//数字越大,优先级越高,越容易出列
producer.setPriority (8);//普通web 生产者
message.setJMSPriority (5);//spring 生产者 回调中设置
5. 设置消息传递模式
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//一般都是设置持久化机制,也是为了高可用,挂了重启消息会被重新加载。另一方面因为消息入队都是存于内存中,不可能一直让你无限添加队列,是有数量限制的,可在配置中更改,所以需要持久化到硬盘
6.消息顺序性
- 设置优先级
- 延时消息,也会改变消息顺序
- 发送了非持久化消息,jsm挂了,该条消息也就发不出去了,也会影响消息顺序性
- 多个生产者往同一个队列里添加消息,并且生产者有的是持久化消息机制有的是非持久化消息机制,非持久化的消息排在持久化消息的前面
- 消息开启事务支持,一个事务存在多条消息,可能不再是遵从发送消息的顺序而是遵从jms事务的顺序
7. 持久化订阅和非持久化订阅
对于topic的消息,有两种订阅类型:Durable Subscribers 和 NonDurable Subscribers。
- 当Broker发送消息给订阅者时,如果订阅者处于 inactive 状态:持久订阅者可以收到消息,而非持久订阅者则收不到消息。因为topic只有在持久订阅下才会持久化,才会将信息存储在硬盘内存中。
- 当持久订阅者处于 inactive 状态时,Broker需要为持久订阅者保存消息;
举例:例如,设想一个持久订户C在时间t开始订阅主题T,某些发布者向主题发送消息M1,M2,M3并且S将接收这些消息中的每一个。
然后C停止,发布者继续发送M4,M5。当C在D2重新启动时,发布者此时还在发送M6和M7,则现在C会收到M4,M5,然后是M6和M7以及所有未来的消息。即S将接收来自M1..M7的所有消息。而非持久化订阅只会接受自己active期间的消息,也就是会丢失M4,M5。
(这段话出处来自:https://blog.csdn.net/liuxiao723846/article/details/78934382)
备注:
- 当发布者已经生产了一批数据,消费者随后才进行的持久化订阅,但并不会接收发布者之前发布的消息,但之后的消息都会接受到,即使订阅者宕机,重启后依然能接收,所以一般是先订阅者订阅消息在启动发布者发布消息,才能接受到全面的消息。
持久化订阅的代码实现
1、持久订阅时,客户端向JMS 注册一个识别自己身份的ID(clientId必须有,而且必须唯一,重复会提示id已被占用,且无法启动。)
2、当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,会造成的影响是:如果持久订阅者订阅的消息太多则会溢出,当 消息投递成功之后,Broker就可以把消息删除了。
3、当客户再次连接到JMS Provider时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息
package com.example.customers.activemq.day3.durable;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author Heian
* @time 19/08/28 20:15
* @description: 生产者不停的向topic写入消息,供订阅者消费
*/
public class Producer {
public static void main(String[] args) {
new ProducerThread("tcp://192.168.32.130:61616", "durableTopic").start();
}
static class ProducerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ProducerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
try {
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
// 2、创建连接
conn = connectionFactory.createConnection();
conn.start(); // 一定要start
// 3、创建会话(可以创建一个或者多个session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建消息发送目标 (Topic or Queue)
Destination destination = session.createTopic(destinationUrl);
// 5、用目的地创建消息生产者
MessageProducer producer = session.createProducer(destination);
// 设置递送模式(持久化 / 不持久化)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 6、创建文本消息
for (int i = 0; i < 100; i++) {
String text = "发送消息: " + Thread.currentThread().getName() + " : "
+ System.currentTimeMillis();
TextMessage message = session.createTextMessage(text);
// 7、通过producer 发送消息
System.out.println("Sent message: " + text);
producer.send(message);
Thread.sleep(2000L);
}
// 8、 清理、关闭连接
session.close();
conn.close();
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.customers.activemq.day3.durable;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author Heian
* @time 19/08/28 19:51
* @description: 创建持久化订阅
* 1、持久订阅时,客户端向JMS 注册一个识别自己身份的ID(clientId必须有,而且必须唯一,重复会提示id已被占用)
* 2、当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息
* 3、当客户再次连接到JMS Provider时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。
*/
public class DurableSubscriber {
public static void main(String[] args) {
// 通过brokerurl上指定clientid 或者在地址上后缀带入tcp://mq.study.com:61616?jms.clientID=x
new ConsumerThread("tcp://192.168.32.130:61616", "durableTopic").start();
}
}
class ConsumerThread extends Thread {
String brokerUrl;//地址
String destinationUrl;//topic名称或者queue名称
public ConsumerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
try {
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
// 2、创建连接对象
conn = connectionFactory.createConnection();
// 通过conn对象,持久订阅需指定ClientId,唯一ID
conn.setClientID("client-1");
conn.start(); // 一定要启动
// 3、创建会话(可以创建一个或者多个session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、往(Topic or Queue)写入数据 Destination destination1 = session.createTopic (destinationUrl);
Topic destination = session.createTopic(destinationUrl);
// 5、创建消息消费者 订阅名 就是说同一个Topic下还细分了 比如都属于体育这个Topic,体育下又有篮球这个订阅名,然后生产者往里面写消息
TopicSubscriber consumer = session.createDurableSubscriber(destination, "myhabit-basketball");
// 6、异步接收消息
consumer.setMessageListener(message -> {
if (message instanceof TextMessage) {
try {
System.out.println("Time: " + System.currentTimeMillis() + " 收到文本消息:"
+ ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
} else {
System.out.println(message);
}
});
// consumer.close();
// session.close();
// conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
7. 共享订阅
假若生若干产者向某个Topic发送了大量的消息,然而订阅者消费能力有限,会导致Broke囤积大量的未发送给对应消费者的消息,然后这些消息又不是特别重要的,比如实时股票价格,需要快速地消费掉,对消息进行持久化就没有太大的意义,而且会因为存储消息而造成一定的开销。所以虚拟主题就应运而生,Virtual Topics就可以类比share subScription,那为什么需要虚拟主题呢?
- 同一个应用内的consumer端负载均衡的问题,也就是同一个应用上的一个持久订阅者不能使用多个consumer(也可以认为是消费组)来共同承担消息处理功能,因为每个consumer都会获取所有消息,在某些业务场景下明显是不合适的,比如某个消息下发到所有的订阅者,但只需要一个订阅者处理了此消息,就已经达到了这个消息的价值,就没必要让每个人都去处理了。
-
同一应用内的consumer端的failover问题:假设只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。而通过虚拟主题,建立多个消息群组,达到消息的高可用,让消息能够得到处理
JMS中共享是2.0规范定义的。但ActiveMQ只实现了1.1规范,它早就提出它的共享解决办法,如下
其目的就是为了达到负载均衡、容错和高可用,由之前的一条消息向N个订阅者推送,变成了一条消息向(自定义)多条queue推送,当然向现在这些队列的消息都是一样的,在由每个队列负载均衡的向消费者退si推送,此时就是点对点了,只会推送一条。
<!-- 需要在/conf中activemq.xml中新增配置,在broker节点中添加如下配置 -->
<!-- name:主题名,可以是通配符 prefix:队列的前缀 selectorAware:表示从Topic中将消息转发给Queue时,是否关注Consumer的 selector情况。如果为false,那么Topic中的消息全部转发给Queue,否则只会转发匹配Queue Consumer的selector的消息-->
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name="VirtualTopic.>" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
<virtualTopic name="aa>" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
配置时请记得手打,莫要粘贴复制,否则会出现莫名其妙的报错
备注:
. 用于分隔path名称
* 用于匹配任何path名称 > 用于递归匹配以xx名称开头的任何目标
示例:
com.study.*.mq 匹配com.study.a.mq,不匹配com.study.a.b.mq com.> com.study.a.mq和com.study.a.b.mq都匹配
通配符可用于生产和消费者:
在生产者中使用通配符时,消息将发送到所有匹配的目标上。
在消费者中使用通配符时,将接收所有匹配的目标的消息。
name 代表的就是生产者往哪个topic写入消息,它可以是多个,因为它有通配符.>
prefix代表消费者消费哪个topic的前缀。
比如name配的是name="topicName.>" frefix="consumer.*."
那么生产者需要 new ProducerThread("tcp://192.168.32.130:61616", "topicName.AnyName").start()
消费者需要 new ConsumerThread ("tcp://192.168.32.130:61616", "consumer.A.VirtualTopic.AnyName").start();
package com.example.customers.activemq.day3.虚拟主题;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author Heian
* @time 19/08/28 20:15
* @description: 生产消息
*/
public class Producer {
public static void main(String[] args) {
//配置name:主题名,此时我配的是VirtualTopic.>
new ProducerThread("tcp://192.168.32.130:61616", "VirtualTopic.AnyName").start();
}
static class ProducerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ProducerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
try {
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
// 2、创建连接
conn = connectionFactory.createConnection();
conn.start(); // 一定要start
// 3、创建会话(可以创建一个或者多个session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建消息发送目标 (Topic or Queue)
Destination destination = session.createTopic(destinationUrl);
// 5、用目的地创建消息生产者
MessageProducer producer = session.createProducer(destination);
// 设置递送模式(持久化 / 不持久化)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 6、创建文本消息
for (int i = 0; i < 10; i++) {
String text = "发送消息: " + Thread.currentThread().getName() + " : "
+ System.currentTimeMillis();
TextMessage message = session.createTextMessage(text);
// 7、通过producer 发送消息
System.out.println("Sent message: " + text);
producer.send(message);
Thread.sleep(20000L);
}
// 8、 清理、关闭连接
session.close();
conn.close();
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.example.customers.activemq.day3.虚拟主题;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
public static void main(String[] args) {
//prefix 队列前缀 此时我配置的是VirtualTopicConsumers.*
new ConsumerThread ("tcp://192.168.32.130:61616", "VirtualTopicConsumers.A.VirtualTopic.AnyName").start();
new ConsumerThread ("tcp://192.168.32.130:61616", "VirtualTopicConsumers.A.VirtualTopic.AnyName").start();
new ConsumerThread ("tcp://192.168.32.130:61616", "VirtualTopicConsumers.B.VirtualTopic.AnyName").start();
new ConsumerThread ("tcp://192.168.32.130:61616", "VirtualTopicConsumers.B.VirtualTopic.AnyName").start();
}
}
class ConsumerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ConsumerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
try {
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
// 2、创建连接对象
conn = connectionFactory.createConnection();
conn.start(); // 一定要启动
// 3、创建会话(可以创建一个或者多个session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建消息消费目标(Topic or Queue)
Destination destination = session.createQueue(destinationUrl);
// 5、创建消息消费者 http://activemq.apache.org/destination-options.html
MessageConsumer consumer = session.createConsumer(destination);
// 6、异步接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
System.out.println(Thread.currentThread().getName() + " Time: " + System.currentTimeMillis()
+ " 收到文本消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
} else {
System.out.println(message);
}
}
});
// consumer.close();
// session.close();
// conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
此时消费者启动四个线程,但是A线程有两个,B线程组有两个,在启动生产者往对应topic写入数据,此时数据会被AB两个线程组中的各一个线程消费。并且这也是属于持久化订阅
ActiveMQ Session Task-5 Time: 1567088532112 收到文本消息:发送消息: Thread-0 : 1567088532107
ActiveMQ Session Task-5 Time: 1567088532125 收到文本消息:发送消息: Thread-0 : 1567088532107
注意:此时控制台显示的线程名称是监听线程的名称,并非是AB线程组中的某个线程名称。