JMS即Java消息服务(Java MessageService)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。绝大多数MOM提供商都对JMS提供支持。
ActiveMQ是Apache出品的开源项目,它是JMS规范的一个实现。
消息中间件是利用高效可靠的消息传递机制在不同应用之间进行通信或者从一个系统传输数据到另外一个系统。并基于数据通信来进行分布式系统的集成。
Java消息服务应用程序结构支持两种模型:
一种是点对点的,即一个生产者和一个消费者一一对应;默认缓存到mq中
另一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。默认不缓存到mq中
JMS公共接口
JMS的基本构件:
生产者:MessageProducer 由Session 对象创建的用来发送消息的对象
消费者:MessageConsumer 由Session 对象创建的用来接收消息的对象
消息:Message jms消息包括消息头和消息体以及其它的扩展属性。
JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。
目的地:Destination,消息的目的地,是用来指定生产的消息的目标和它消费的消息的来源的对象。
消息队列:Queue 点对点的消息队列
消息主题:Topic 发布订阅的消息队列
JMS开发流程
1、生产者(producer)开发流程(ProducerTool.java):
1.1 创建Connection:
根据url,user和password创建一个jms Connection。
例如:
//1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); |
1.2 创建Session:
在connection的基础上创建一个session,同时设置是否支持事务和ACKNOWLEDGE标识。
//4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
1.3 创建Destination对象:
需指定其对应的主题(subject)名称,producer和consumer将根据subject来发送/接收对应的消息。
//5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息生产者 MessageProducer producer = session.createProducer(queue); |
1.4 创建MessageProducer:
根据Destination创建MessageProducer对象,同时设置其持久模式。
1.5 发送消息到队列(Queue):
封装TextMessage消息,使用MessageProducer的send方法将消息发送出去。
//7.创建消息 TextMessage textMessage = session.createTextMessage("Good ActiveMQ"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close(); |
2、消费者(consumer)开发流程(ConsumerTool.java):
2.1 实现MessageListener接口:
消费者类必须实现MessageListener接口,然后在onMessage()方法中监听消息的到达并处理。
2.2 创建Connection:
根据url,user和password创建一个jms Connection,如果是durable模式,还需要给connection设置一个clientId。
//1.创建连接工厂 ConnectionFactory connectionFactory=newActiveMQConnectionFactory("tcp://192.168.25.135:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); |
//4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); |
2.4创建replyProducer【可选】:
可以用来将消息处理结果发送给producer。
2.5 创建MessageConsumer:
根据Destination创建MessageConsumer对象。
2.6 消费message:
在onMessage()方法中接收producer发送过来的消息进行处理,并可以通过replyProducer反馈信息给producer
//6.创建消息消费 MessageConsumer consumer =session.createConsumer(queue);
//7.监听消息 consumer.setMessageListener(new MessageListener() { public voidonMessage(Message message) { TextMessagetextMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); |
ActiveMQ与spring的整合
生产者配置文件
<context:component-scan base-package="cn.rongyue.demo"></context:component-scan> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.135:61616"/> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!--这个是队列目的地,点对点的 文本信息--> <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="queue_text"/> </bean> |
@Component publicclass QueueProducer { @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination queueTextDestination; /** * 发送文本消息 * @param text */ publicvoid sendTextMessage(final String text){ jmsTemplate.send(queueTextDestination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { returnsession.createTextMessage(text); } }); } } |
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.135:61616"/> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!--这个是队列目的地,点对点的 文本信息--> <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="queue_text"/> </bean> <!-- 我的监听类 --> <bean id="myMessageListener" class="cn.itcast.demo.MyMessageListener"></bean> <!-- 消息监听容器 --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueTextDestination" /> <property name="messageListener" ref="myMessageListener" /> </bean> |
publicclass MyMessageListener implements MessageListener { publicvoid onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } |
需要注意的问题
安装ActiveMQ前,要先安装JDK