目录
JMS 基本概念
提示:可以先参考《ActiveMQ 简介 与 Maven 项目基本使用 》的编码,然后更容易理解本文的理论。
1、JMS 全称 Java Message Service ,Java 消息服务,是 Java EE 中的一个技术。
2、JMS 规范定义了 Java 中访问消息中间件的接口,并没有给予实现,实现 JMS 接口的消息中间件成为 JMS Provider(供应者),例如 ActiveMQ 。
3、JMS Provider:实现 JMS 接口和规范的消息中间件。
4、JMS Message:JMS 的消息,由以下三个部分组成:
1)消息头:每个消息头字段都有对应的 getter 和 setter 方法
2)消息属性:如果需要除消息头字段以外的值,那么可以使用消息属性
3)消息体:封装具体的消息数据
5、JMS Producer:消息生产者,创建和发送 JMS 消息的客户端应用
6、JMS Consumer:消息消费者,接收和处理 JMS 消息的客户端应用。消息的消费可以采用以下两种方式之一:
1)同步消费:通过调用消费者的 receive 方法从目的地中显示的读取消息数据,receive 方法会一直阻塞到消息到达。
2)异步消费:客户端可以为消费者注册一个消息监听器,以定义消息到达时所采取的措施/动作。
7、JMS domains:消息传递域,JMS 规范中定义了两种消息转递域:点对点(point-to-point,简写成 PTP),发布/订阅(publish/subscribe,简写为 pub/sub)
8、点对点(point-to-point)消息传递域特点如下:
1)每个生产可以对应多个消费者,但是每个消息只能有一个消费者消费
2)消息的生产者和消费者之间没有时间上的相关性,即无论消费者在生产者发送消息的时候是否已经运行,它启动后都可以提取消息
9、发布/订阅(publish/subscribe)消息传递域特点如下:
1)每个消息可以有多个消费者消费
2)生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后生产者发布的消息。即使消费者订阅了主题,如果生产者发布消息的时候,消费者宕机了,那么它再次启动的时候,默认也是不能再接收的。但 JMS 规范允许客户端创建持久订阅,允许消费者消费它在未激活状态时发送的消息。
10、在点对点消息传递域中,目的地被称为队列(queue),在发布/订阅消息传递域中,目的地被称为主题(topic)
11、Connection Factory:连接工厂,用来创建连接对象,以连接到 JMS 的 Provider(供应商,实现者)
12、JMS Connection:封装了客户与 JMS 提供者之间的一个虚拟的连接
13、JMS Session:是生产者和消费者的一个单线程上下文。会话用于创建消息生产者(Producer)、消息消费者(Consumer),和消息(Message)等。会话提供了一个事务性的上下文,一组发送和接收被组合到了一个原子操作中。
Destination | 消息发送到的目的地 |
Acknowledge | 签收 |
Transaction | 事务 |
JMS Cline | 用来收发消息的客户端应用 |
JMS 消息结构
1、JMS 消息(Message)由消息头、消息属性、消息体组成。
2、消息头包含消息的识别信息和路由信息,包含一些标准的属性如下:
标准属性 | 描述 |
JMSDestination | JMS 目的地,由 send 方法设置。主要指 queue与topic。自动分配。 |
JMSDeliveryMode | JMS 传递模式,由 send 方法设置。分为持久模式和非持久模式。前者的消息应该被传送"一次且仅仅一次",即使 JMS 提供者出现故障,该消息也不会丢失,会在服务器恢复后再次传递;后者的消息最多会传送一次,如果JMS Provider 服务器故障,该消息将永久丢失。自动分配。 |
JMSExpiration | JMS 消息过期/到期时间,由 send 方法设置。等于 Destination 的 send 方法中的 timeToLive 值加上发送时刻的 GMT 时间值。如果 timeToLive 等于0,则 JMSExpiration 被设为0,表示该消息永不过期。如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除。自动分配。 |
JMSPriority | JMS 消息优先级,由 send 方法设置。有 0-9 十个级别,0-4是普通消息,5-9是加急消息。JMS 不要求 JMS Provider 严格按着十个优先级发送消息,但必须保证加急消息要先于普通消息到达。默认是第4级。自动分配。 |
JMSMessageID | JMS 消息标识 id,由 send 方法设置。唯一识别每个消息的标识,由 JMS Provider 产生。自动分配。 |
JMSTimestamp | JMS 时间戳,由客户端设置。一个 JMS Provider 在调用 send 方法时自动设置的。它是消息被发送和消费者实际接收的时间差。自动分配。 |
JMSCorrelationID | JMS 相关性 id,由客户端设置。用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息。可以是任意值,不仅仅是 JMSMessageId。 |
JMSType | JMS 消息类型的识别符。由开发者设置。 |
JMSReplyTo | JMS 回复,由客户端设置。提供本信息回复消息的目的地址。 |
JMSRedelivered | JMS 重发,由JMS Provider(供应商)设置 |
3、消息体:JMS API 定义了 5 种消息体格式,也叫消息类型,可以使用不同的形式发送接收数据,并可以兼容现有的消息格式。包括:TextMessage、MapMessage、BytesMessage、StreamMessage 和 ObjectMessage。它们都是 Message 接口的子类。
4、消息属性,包含以下三种类型的属性:
1)应用程序设置和添加的属性,比如:Message.setStringProperty("username","zhangSan");
2)JMS 定义的属性:使用 "JMSX" 作为属性名的前缀,connection.getMetaData().getJMSXPropertyNames(); 返回连接支持的所有 JMSX 属性的名字;
3)JMS 供应商特点的属性
JMSXUserID | 发送消息的用户标识,发送时提供商设置 |
JMSXAppID | 发送消息的应用标识,发送时提供商设置 |
JMSXDeliveryCount | 转发消息重试次数,第一次是1、第二次是2、...,发送时提供商设置 |
JMSXGroupID | 消息所在消息组的标识,由客户端设置 |
JMSXGroupSeq | 组内消息的序号,第一个消息是1,第二个是2,...,由客户端设置 |
JMSXProducerTXID | 产生消息的事务的事务标识,发送时提供商设置 |
JMSXConsumerTXID | 消费消息的事务的事务标识,接收时提供商设置 |
JMSXRcvTimestamp | JMS 转发消息到消费者的时间,接收时提供商设置 |
String brokerURL = "tcp://127.0.0.1:61616";//ActiveMQ 中间件连接地址
/**
* 创建 javax.jms.ConnectionFactory 连接工厂
* org.apache.activemq.ActiveMQConnectionFactory 中默认设置了大量的参数,还有几个重载的构造器可以选择
*/
ConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
//如果 ActiveMQ 连不上,则抛异常:java.net.ConnectException: Connection refused: connect
Connection connection = mqConnectionFactory.createConnection();//通过连接工厂获取连接 javax.jms.Connection
connection.start();//启动连接,同理还有 stop、close
//连接元数据
ConnectionMetaData metaData = connection.getMetaData();
int jmsMajorVersion = metaData.getJMSMajorVersion();
int jmsMinorVersion = metaData.getJMSMinorVersion();
String jmsProviderName = metaData.getJMSProviderName();
String jmsVersion = metaData.getJMSVersion();
int providerMajorVersion = metaData.getProviderMajorVersion();
int providerMinorVersion = metaData.getProviderMinorVersion();
String providerVersion = metaData.getProviderVersion();
Enumeration jmsxPropertyNames = metaData.getJMSXPropertyNames();
System.out.println("jmsMajorVersion = " + jmsMajorVersion);//jmsMajorVersion = 1
System.out.println("jmsMinorVersion = " + jmsMinorVersion);//jmsMinorVersion = 1
System.out.println("jmsProviderName = " + jmsProviderName);//jmsProviderName = ActiveMQ
System.out.println("jmsVersion = " + jmsVersion);//jmsVersion = 1.1
System.out.println("providerMajorVersion = " + providerMajorVersion);//providerMajorVersion = 5
System.out.println("providerMinorVersion = " + providerMinorVersion);//providerMinorVersion = 15
System.out.println("providerVersion = " + providerVersion);//providerVersion = 5.15.9
while (jmsxPropertyNames.hasMoreElements()){
Object o = jmsxPropertyNames.nextElement();
System.out.print(o+"\t");//JMSXUserID JMSXGroupID JMSXGroupSeq JMSXDeliveryCount JMSXProducerTXID
}
JMS 消息确认模式
1、JMS 消息只有在被确认之后,才认为已经成功的消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息、消息被确认。消息被确认后,消费者无法再次读取,相反如果消息未被确认,则消费者可以一直读取。
2、createSession(boolean transacted, int acknowledgeMode)
transacted 为 true 时:表示事务性会话,此时当一个事务被提交时,确认自动发生:session.commit();//确认消息,告诉中间件,消息已经确认接收;与 acknowledgeMode 参数设置无关。
transacted 为 false 时:表示非事务性会话,此时消息何时被确认取决于创建会话时的应答模式(acknowledgeMode)。与 acknowledgeMode 参数有关。
3、应答/确认模式(acknowledgeMode) 在 transacted 为 false 时有效,可选值如下:
Session.AUTO_ACKNOWLEDGE | 自动确认。当客户成功的从 receive 方法,或者从 MessageListener.onMessage 方法返回的时候,会话自动确认客户收到的消息。 |
Session.CLIENT_ACKNOWLEDGE | 客户确认。客户通过调用消息(Session)的 acknowledge() 方法确认消息。client 确认模式是在会话层上进行,只要确认一个被消费的消息,将自动确认当前会话中所有已经被消费的消息。比如一个会话中消费者消费了10个消息,然后确认了第8个消息,那么这10个消息都会被确认。 |
Session.DUPS_ACKNOWLEDGE | 迟钝确认。该模式只是会话迟钝的确认消息的提交。如果 JMS Provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么 JMS provider 必须把消息头的 JMSRedelivered 字段设置为 true。不常用。 |
4、下面使用环境:Maven 3.6.1 + Java JDK 1.8 + ActiveMQ 5.15.9 + IDEA 2018 稍作演示,Mava 管理的 Java SE 应用。pom.xml 文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wmx</groupId>
<artifactId>activeMQ1</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
</dependencies>
</project>
生产者内容如下:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.UUID;
/**
* 消息发送者
*/
@SuppressWarnings("all")
public class JmsSender {
public static void main(String[] args) {
Connection connection = null;
Session session = null;
try {
String brokerURL = "tcp://127.0.0.1:61616";//ActiveMQ 中间件连接地址
/**
* 创建 javax.jms.ConnectionFactory 连接工厂
* org.apache.activemq.ActiveMQConnectionFactory 中默认设置了大量的参数,还有几个重载的构造器可以选择
*/
ConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
//如果 ActiveMQ 连不上,则抛异常:java.net.ConnectException: Connection refused: connect
connection = mqConnectionFactory.createConnection();//通过连接工厂获取连接 javax.jms.Connection
connection.start();//启动连接,同理还有 stop、close
/**
* Session createSession(boolean transacted, int acknowledgeMode) 创建会话
* transacted :表示是否开启事务
* acknowledgeMode:表示会话确认模式
*/
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
/**
* createQueue(String queueName):创建消息队列,指定队列名称,消费者可以根据队列名称获取消息
* Destination 目的地,重点,interface Queue extends Destination
*/
Destination destination = session.createQueue("queue-app");
//createProducer(Destination destination):根据目的地创建消息生产者
MessageProducer producer = session.createProducer(destination);
int massageTotal = 5;
for (int i = 0; i < massageTotal; i++) {
MapMessage mapMessage = session.createMapMessage();//创建一个 Map 消息
mapMessage.setStringProperty("token", UUID.randomUUID().toString());//设置属性。口令
mapMessage.setIntProperty("expires", 60000);////设置属性。口令失效时间
mapMessage.setString("key", "嫦娥 " + (i + 1) + " 号");//设置消息内容
producer.send(mapMessage);//生产者发送消息
session.commit();//会话提交,发送消息
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (session != null) {
try {
session.close();//关闭会话
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();//关闭连接
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
消费者内容如下:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 消息消费者
*/
@SuppressWarnings("all")
public class JmsReceiver {
public static void main(String[] args) {
Connection connection = null;
Session session = null;
try {
String brokerURL = "tcp://127.0.0.1:61616";//ActiveMQ 中间件连接地址
/**
* 创建 javax.jms.ConnectionFactory 连接工厂
* org.apache.activemq.ActiveMQConnectionFactory 中默认设置了大量的参数,还有几个重载的构造器可以选择
*/
ConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
//如果 ActiveMQ 连不上,则抛异常:java.net.ConnectException: Connection refused: connect
connection = mqConnectionFactory.createConnection();//通过连接工厂获取连接 javax.jms.Connection
connection.start();//启动连接,同理还有 stop、close
/**
* Session createSession(boolean transacted, int acknowledgeMode) 创建会话
* transacted :表示是否开启事务
* acknowledgeMode:表示会话确认模式
* 关闭事务,自动确认:表示 receive 方法、MessageListener.onMessage 方法返回后就已经确认了
*/
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
/**
* createQueue(String queueName):创建消息队列,指定队列名称,消费者可以根据队列名称获取消息
* Destination 目的地,重点,interface Queue extends Destination
*/
Destination destination = session.createQueue("queue-app");
//createProducer(Destination destination):根据目的地创建消息消费者
MessageConsumer consumer = session.createConsumer(destination);
int massageTotal = 5;
for (int i = 0; i < massageTotal; i++) {
Message message = consumer.receive();//receive方法会导致当前线程阻塞,指导接收到消息
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;//消费者接收消息。因为对方发送的 Map 消息,所以可以强转
System.out.print((i + 1) + ":" + mapMessage.getStringProperty("token"));//获取属性
System.out.print(":" + mapMessage.getIntProperty("expires"));//获取属性
System.out.println(":" + mapMessage.getString("key"));//获取消息内容
mapMessage.acknowledge();//应答/确认消息
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (session != null) {
try {
session.close();//关闭会话
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();//关闭连接
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}