面向消息的中间件
随着系统变大变复杂,一个大的系统,开始向着领域模型和微服务架构演进。而各个子系统之间的通信开始变得复杂、重要。不过总的来说还是分两类:同步通信和异步通信。对于同步通信,现在通俗的做法有REST、RPC、SOAP等。对于异步,现在用的最多就是面向消息的中间件(Message Oriented Middleware,MOM)。
我们知道异步通信一般有两个问题,一是发送方进程与消息服务端进程要都正常运行,二是只能点对点通信。MOM就很好的解决了这两个问题。
首先,发送方将消息发送给消息服务器,消息服务器将消息存放在若干队列中,当消息接收方接收消息时,再将消息转发给接收方。这种情况下,消息的发送和接收都是异步的,发送方无需等待即可发送,接收方接收消息的时候发送方可以不运行,即两方的运行状况不受彼此影响。另外,可以一对多通信,一个消息可以有多个接收者。
JMS概述
JMS全称Java消息服务(Java Message Service)。 官方文档给的解释是:Java消息服务(JMS) API是一个消息传递标准,允许基于Java平台Enterprise Edition (Java EE)的应用程序组件创建、发送、接收和读取消息。它支持松散耦合、可靠和异步的分布式通信。说白了,JMS就是定义一套规范,构建一个生产者-消费者的消息队列模型,只是没有实现。所以有Java消息服务是一个与具体平台无关的API的说法。目前绝大多数MOM提供商都对JMS提供了支持,实现JMS 接口的消息中间件称为JMS Provider,比如ActiveMQ、Tibco EMS。我用的是后者。
JMS功能
JMS规范包含点对点(Point to Point,PTP)和发布/订阅(Publish/Subscribe,pub/sub)两种消息模型,提供可靠消息传输、事务和消息过滤等机制。
JMS术语
JMS Provider:实现JMS 接口的消息中间件;PTP:Point to Point,即点对点的消息模型;
Pub/Sub:Publish/Subscribe,即发布/订阅的消息模型;
Queue:队列目标;
Topic:主题目标;
ConnectionFactory:连接工厂,JMS 用它创建连接;
Connection:JMS 客户端到JMS Provider 的连接;
Destination:消息的目的地;
Session:会话,一个发送或接收消息的线程;
MessageProducer:由Session 对象创建的用来发送消息的对象;
MessageConsumer:由Session 对象创建的用来接收消息的对象;
Acknowledge:签收,可靠传输的保证。
Transaction:事务。
JMS的收发
看了这么多JMS概念,先简单看下JMS的收发message的步骤。下面两张图,一张是Producer 向EMS发送message,另一个张是Consumer从EMS接收message的步骤。
发送消息:
接收消息:
这里要说一下,Session 是一个继承 Runnable 的接口。也就是说,Connection 通过创建新的线程来跟中间件进行交互。源码如下:
public interface Session extends Runnable { int AUTO_ACKNOWLEDGE = 1; int CLIENT_ACKNOWLEDGE = 2; int DUPS_OK_ACKNOWLEDGE = 3; int SESSION_TRANSACTED = 0; // 其他代码 void run(); // 其他代码 }
JMS消息模型
说完JMS 编程步骤,说下JMS的消息模型。JMS 支持两种截然不同的消息传送模型:PTP(即点对点模型)和Pub/Sub(即发布/订阅模型),分别称作:PTP Domain 和Pub/Sub Domain。
PTP(使用Queue即队列目标)
1.消息从一个生产者传送至一个消费者。
2.生产者将消息发送至目标队列,然后根据队列传送策略,从该队列将消息传送至与该队列连接的一个消费者,且一次只传送一条消息。
3.可以多个生产者向目标队列发送消息,也可以多个消费者连接并消费目标队列。但每条消息只能发送至、并由一个消费者成功消费。
4.如果没有消费者连接目标队列进行消费,队列将保留它收到的消息。
这里有个特点,即 一条消息只能被一个消费者消费。
Pub/Sub(使用Topic即主题目标)
1.消息从一个生产者传送至任意数量的消费者。
2.在此传送模型中,目标是一个主题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的消费者。
3.向主题目标发送消息的生产者的数量没有限制,并且每个消息可以发送至任意数量的订阅消费者。
4.主题目标支持持久订阅的概念。意思就是说消费者已向主题目标进行订阅注册,但当消息被发送到主题时,此消费者可以是不工作。当此消费者开始恢复工作时,它依旧能接收此消息。因此,当没有已经向主题目标注册的消费者时,主题才不保留其接收到的消息。
它的特点是, 一对多进行消息广播,而且是可靠的。
JMS消息
上面说到,生产者-消费者之间传递的是消息,但这个消息是什么呢?JMS定义了消息的结构,它由以下几部分组成:消息头、属性和消息体。消息头(Header)
消息头主要包含一些识别消息和路由信息,消息头包含一些标准的属性如:消息发送哪JMSDestination,消息唯一的Id JMSMessageID等。具体可以看 Message 接口。
public interface Message { // ... }
属性
消息头的补充。其中一部分是JMS定义的标准属性(JMS开头)。当然我们也可以为自定义JMS属性。
这里注意:Message 只定义了消息头和属性,没定义消息体。
消息体(Body)
消息体即我们要在生产者和消费者之间要传递的信息。JMS 帮我们定义了五种消息体格式,基本兼容我们要传递的消息格式。这五种消息体以interface的方式定义出。
1. 用的最多文本消息,传递java.lang.String对象,常见的是把xml文件内容当成文本传输。
public interface TextMessage extends Message { void setText(String var1) throws JMSException; String getText() throws JMSException; }
2. 传递键值对,传递 key是String的键值对的集合,值类型可以是Java任何基本类型
public interface MapMessage extends Message { // ... }
3. 直接传递字节流,这个适合网络编程。
public interface BytesMessage extends Message { // ... }4. Java中的输入输出流
public interface StreamMessage extends Message { // ... }5. 传递Java可序列化对象(Serializable)
public interface ObjectMessage extends Message { void setObject(Serializable var1) throws JMSException; Serializable getObject() throws JMSException; }
下面来分别说明下两种消息模型的使用。
PTP模型
PTP(Point-to-Point)模型是基于队列实现的。首先,生产者发消息到队列,然后消费者从队列接收消息。这样队列的存在就让消息变成异步传输,另外队列相当于缓存了消息,这在一定程度上缓解了生产者和消费者之间的吞吐量的差距。再者,JMS Provider 也提供了队列的管理API,如创建、删除、重新发送等。如下:
这里,如果有多个Consumer,则一条消息被哪个Consumer消费是随机的。
Pub/Sub模型
Pub/Sub模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作主题(topic)。这点跟设计模式中的发布/订阅模式类似。
消息发布者(publisher)发布消息到主题,主题是消息的传输中介,消息订阅者(subscribe) 从主题订阅消息。主题的存在是对消息订阅者和消息发布者的一种解耦,这点与发布/订阅模式不同。
消息订阅
这里先说下说下订阅(subscription)。消息订阅分两种:非持久订阅(non-durable subscription)和持久订阅(durable subscrip-tion)。一般可靠传输中都选择后者。非持久订阅只有当订阅者处于活跃状态并与主题保持连接时才能收到主题的消息。相反如果订阅者宕机处于离线状态时,那么改订阅者永远不会收到离线状态期间的消息。
持久订阅不同,订阅者会向JMS Provider 注册一个识别自己的ID,在订阅者处于离线期间,JMS Provider 会为这个ID 保存所有离线期间主题收到的消息。然后当订阅者下次连接到JMS Provider时,就可以根据自己注册的ID 得到离线期间未收到的消息。s实际使用时,用哪种订阅方式取决于对可靠性(Reliability)的要求。如果所有的消息必须被接收,用持久订阅模式。如果能容忍丢失消息,则用非持久订阅模式。毕竟JMS Provider保持订阅者离线未收的消息也是需要一定资源消耗的。订阅者越多,离线时间越长,消耗的越多。
Tibco EMS
Tibco EMS全称TIBCO Enterprise Message Service,是采用C语言编写的一个消息服务器产品。完全支持JMS的通讯协议,而且在运行速度和消息吞吐量上非常出色。现在我用的是8.3,有需要的可以在官方下载安装。安装完如下图:
Start EMS Administration Tool:EMS管理工具,可以连接远程TCP连接。
Start EMS Multicast Daemon:一个用于多播的进程。熟悉网络的知道,多播是网络节点通讯中的概念。官方说法如下:
87. One multicast daemon (tibemsmcd) can receive messages from multiple servers, and can deliver messages to multiple clients.
解释:一个多播守护进程(tibemsmcd)可以接收来自多个服务器的消息,并且可以将消息传递给多个客户端。
也就是说在本地配置好后,该进程负责这些节点的组播。
Start EMS Server:这个是启动一个本地自带的EMS Server,默认地址是:tcp://localhost:7222。
启动EMS Administration Tool后,即可使用 admin 进行登录,密码默认为空。自己可以根据需要进行更改,命令如下:
>set server authorization=enabled >set password admin admin然后就可以根据需要创建我们需要的Topic和Queue了。
// 创建用户分组 >create group selfgroup // 创建用户 >create user selfuser password=selfuser // 为用户分组添加成员 >add member selfgroup selfuser // 创建一个queue >create queue XX.QQ // 将这个queue的各种权限赋给某个用户 >grant queue XX.QQ selfuser receive,send,browse
启动本地EMS Server后,就可以用客户端去连接上边的queue了。也可以自己查看一些API文档。
如果你是本地的EMS Server,创建完Queue后,就会发现新建的Queue已经在本地的配置文件里了。(Topic 是topics.conf)
C:\ProgramData\TIBCO_HOME\tibco\cfgmgmt\ems\data\queues.conf
EMS使用
随着系统变得复杂,子系统间的消息传递、转发、重新构建等也变得更复杂了。现在使用时都是将Topic、Queue和消息服务中间件一起搭配使用,比如Tibco RV(TIBCO Rendezvous)。一个简单的消息服务就类似下图,可能实际使用中会更加复杂。
Route
这里注意两个概念,Route即路由。Tibco EMS允许我们将消息从一个Server路由到其他服务器。
路由的时候有两点需要注意:
1. 一个路由是发生在两个Server之间的,且两个Server上必须具有相同的Topic、Queue名字。当然也要是全局的。
2. 路由是双向的。
如下图,两个Server上,两个Queue的名字必须一样才能进行路由。Topic也是如此。
如下命令即可创建一个Route
create route B url=tcp://B:7222 zone_name=Z1 zone_type=1hop
B:路由名字
url:在当前server与指定的server之间创建路由。
zone_name:所属路由区域。
zone_type:取值1hop或mhop。即单跳或者多跳。
配置:C:\ProgramData\TIBCO_HOME\tibco\cfgmgmt\ems\data\tibemsd.conf 中,要将routing enabled且server名字要跟Route名字一致。
server = kaka password = $man$BwhuGWZs7m2vm0tkJDYEW47sA8V routing = enabled
Selector
Route是用于两个Server之间,与Route不同,官方说法:Selector应用于同一个Server上的Bridge上。不过我把Selector加在Route上也是起作用的。比如一个Queue作为一个Topic的Consumer即时一个Bridge。Selector就是在这个Bridge上,帮助Queue过滤掉一些不需要的消息。
selector可以作用在消息的头或者属性上。
加在Bridge
首先创建一个Topic:
create topic XX.TPC
加上上边创建的Queue XX.QQ,现在我们在它们俩之间创建一个bridge,同时进行selector。
create bridge source=topic:XX.TPC target=queue:XX.QQ selector="prop='1'"
个人觉得selector的语法有点类似 SQL的 where 字句。这里prop是消息的一个属性,因此,这条bridge只会传递prop='1' 的消息给Queue。
加在Consumer
MessageConsumer consumer = session.createConsumer(queue, "prop = '1'");这里创建一个带有selector的consumer,它只消费 prop='1' 的消息。
加在Tibco RV
其实Tibco也提供了selector功能:
加在Route上
setprop route LAB07 outgoing_topic="XX.TPC" selector="CARRIER_ID='1'"此时,会过滤掉该路由上不满足条件的消息。
Tibco JMS
Tibco实现了JMS Provider即Tibco EMS,然后也为JMS客户端的编程提供实现,即tibjms.jar。目前我用到的是7.0 版本,是基于JMS 1.0实现的。因此,想使用JMS 2.0 的就需要更高的版本了。
Tibco JMS的编程跟上边提到的编程步骤一样。如下给个例子:
public static void sendToQueue(List<String> messageList) throws Exception { QueueConnection connection = null; QueueConnectionFactory factory = new TibjmsQueueConnectionFactory(MainProcess.EMS_URL); connection = factory.createQueueConnection("xxxx", "xxxx"); QueueSession session = connection.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); Queue receiverQueue = session.createQueue(MainProcess.QUEUE_NAME); QueueSender sender = session.createSender(receiverQueue); for(String messageBody : messageList) { Message message = session.createTextMessage(messageBody); message.setStringProperty("MY_SELF_HEADER", "MY_SELF_HEADER"); sender.send(message); } session.close(); connection.close(); // System.out.println("Send successfully"); }
打开上边一些类的源码就会发现,它们都是继承或者实现 javax.jms的。
多Session
我们知道App连接queue,之后就能获取消息了。不过有时候app资源足够,我们可以更快的消耗这些消息。怎么做呢?还记得上边提到说Session 是一个继承 Runnable的的 interface。也就是说JMS规范本身就是支持多线程的。因此我们在一个Connection上的基础上创建多个session即可提高queue里面消息的处理速度。
public static void main(String[] args) throws Exception { QueueConnection connection = null; QueueConnectionFactory factory = new TibjmsQueueConnectionFactory("tcp://localhost:7222"); connection = factory.createQueueConnection("admin", ""); QueueSession session1 = connection.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); Queue receiverQueue1 = session1.createQueue("XX.QQ"); QueueReceiver receiver1 = session1.createReceiver(receiverQueue1); QueueSession session2 = connection.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); Queue receiverQueue2 = session2.createQueue("XX.QQ"); QueueReceiver receiver2 = session2.createReceiver(receiverQueue2); TextMessage message1 = (TextMessage) receiver1.receive(); TextMessage message2 = (TextMessage) receiver2.receive(); }