初识ActiveMQ
要了解ActiveMQ,先得了解JMS的概念,JMS(JavaMessageService)是Java提供的消息中间件的一组规范接口,ActiveMQ则是Apache遵循JMS规范实现的消息中间件。
一、JMS的优点
早在没有JMS的时候,应用系统通信存在如下缺陷:
- 应用系统通信具有同步性,发送方需要等待接收方处理才能继续进行;
- 发送方接收方的生命周期相互依赖,如果接收方发生故障,发送方也会异常报错;
- 只能点对点通信,无法实现一对多的通信;
JMS很好地解决了以上的问题:
- JMS提供了一个消息服务器,发送方将消息发送给消息服务器,消息服务器将其存放于队列/主题中,等到接收方合适的时候,消息服务器将消息转发给接收方,于是,发送与接收就是异步的了。
- 消息发送方和消息接受方都面向于消息服务器,根本不需要理会对方的存在,也就不存在生命周期的耦合。
- 同时,JMS提供发布订阅模式,能够实现一对多的消息发送。
二、ActiveMQ的实现
ActiveMQ是JMS的实现,所以自然需要遵循JMS的规范,我们来简单了解下JMS定义的内容:
Provider/MessageProvider | 生产者/消息生产者 |
Consumer/MessageConsumer | 消费者/消息消费者 |
PTP/PointToPoint | 点对点通信模型 |
Pub/Sub | 发布订阅通信模型 |
Queue | 队列,目标类型之一,与PTP相对应 |
Topic | 主题,另一目标类型,与Pub/Sub相对应 |
Connection | JMS客户端与JMS消息中间件的连接 |
ConnectionFactory | 连接工厂 |
Session | Connection创建会话,实际上就是发送接收消息的一个线程 |
Destination | 消息目的地,指向某一个队列或者某一个主题,由Session创建 |
三、ActiveMQ的HelloWorld实例
接下来我们按部就班地写一个HelloWorld的实例。
ActiveMQ的官网地址是http://activemq.apache.org/,此处我们先使用其中的windows版本,目录结构如下:
bin中包含了启动文件,activemq.bat,只要配置了JAVA_HOME就可以直接启动。
config存放了jetty.xml和activemq.xml,可以配置有关ActiveMQ的配置信息,如ActiveMQ控制台登录的用户名和密码,和ActiveMQ的持久化方式。
再如图启动bat文件:
如此,我们就可以开始敲Java代码了(为了保证代码的完整性,笔者将在注释中解释代码):
package lfq.ima; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Sender { public static void main(String args[]) throws Exception { //第一步:创建连接工厂,其中的构造方法传入用户名密码,以及消息中间件的URL和通信端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin","admin","tcp://localhost:61616" ); //第二步:创建并打开连接 Connection connection = connectionFactory.createConnection(); connection.start(); //第三步:创建会话,其中第一个参数为是否开启事务 Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); //第四步:指定一个“目的地”,此处目的地为队列,传入一个String类型的参数作为其名字 Destination destination = session.createQueue("quque1"); //第五步:会话根据目的地创建消息提供者 MessageProducer messageProducer = session.createProducer(destination); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //第六步:定义消息类型 TextMessage textMessage = session.createTextMessage(); for (int i = 0;i<1000;i++) { textMessage.setText("helloworld!"+i); messageProducer.send(textMessage); } //最后:关闭连接 if(connection!=null) { connection.close(); } }}
上文在创建Session时,第一个参数为是否开启事务,第二个参数为签收模式,此处写的是自动签收,但是一共定义有三种签收模式。
何为签收,就是消费者在接到消息后,告诉消息中间件已经收到消息,消息则失效。
自动签收意味着消费者在收到消息后,即调用receive(),便自动告诉中间件,已签收。
第二个为客户端显示的调用acknowledge()方法确定签收。
第三个为签不签收无所谓,只要客户端能忍耐接收到重复消息,或者已经消费的消息被其他消费者消费就可以。
通常我们会选择第二个模式,因为有些时候收到消息不代表着结束,只有当一段业务逻辑真的结束以后,再签收,才是最可靠的。
生产者的send方法有多个重载,用于设置是否持久化消息,以及消息的失效时间,还有消息的优先级(消息的优先级只是理论的概念,并不能保证优先级高的消息优先被消费者消费)