JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。
下载Apache ActiveMQ服务
下载后将zip 压缩包解压,将这个解压缩后的文件夹拷贝到指定位置,我本人的服务都在c盘下的service文件夹 找到C:\service\apache-activemq\bin\win64下的activemq.bat文件双击即可启动activemq服务器,如下图
说明已经启动完毕,可以登录主页面查看ActiveMQ的相关内容,如果对话框一闪而过则需要配置环境变量 打开网页,在地址栏输入http://127.0.0.1:8161 用户名,密码都是admin 并将apache-activemq目录下的activemq-all.5.15.10.jar拷贝到工程中lib文件夹下,即可使用ActiveMQ的所有功能。 |
ActiveMQ点对点消息实现:使用方式首先启动消息生产者发送消息,然后启动消费者接收消息
创建JMSProducer.java(消息生产者)
|
创建JMSConsumer.java(消息消费者)
package com.kingsoft.activemq.unit01; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory connectionFactory; //连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer consumer; // 消息的消费者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory( JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try { // 通过连接工厂获取连接 connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建连接的消息队列 destination = session.createQueue("first-quque1"); // 创建消息消费者 consumer = session.createConsumer(destination); // 注册消息监听 consumer.setMessageListener(new Listener()); } catch (JMSException e) { e.printStackTrace(); } } } |
创建消息监听Listener.java
package com.kingsoft.activemq.unit01; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * * @ClassName: Listener * @Description: TODO (消息监听) */ public class Listener implements MessageListener{ @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("收到的消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } |
ActiveMQ发布/订阅模式消息实现:使用方式首先启动消费者订阅,然后执行消息生产者(类似订报纸)
消费者监听Listener1.java
package com.kingsoft.activemq.unit02; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class Listener1 implements MessageListener{ @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("订阅者一收到的消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } |
消费者监听Listener2.java
package com.kingsoft.activemq.unit02; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class Listener2 implements MessageListener{ @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("订阅者二收到的消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } |
消息发布者JMSProducer.java
package com.kingsoft.activemq.unit02; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSProducer { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的用户名 private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认连接密码 private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认接地址 private static final int SENDNUM=10; // 发送的消息数量
/** * * <b>Description</b><br> * (发送消息) * <br> * -------------------------------------------------<br> * <b>A我去 2019年10月12日 上午12:03:42</b> */ public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{ for(int i=0;i<JMSProducer.SENDNUM;i++){ TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i); System.out.println("发送消息:"+"ActiveMQ 发布的消息"+i); messageProducer.send(message); } }
public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageProducer producer; // 消息生产者
//1.实例化连接工厂 connectionFactory=new ActiveMQConnectionFactory( JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); try { //2.通过连接工厂获取连接 connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.创建Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic("First-Topic"); producer = session.createProducer(destination); sendMessage(session, producer); session.commit(); } catch (JMSException e) { e.printStackTrace(); }catch(Exception e) { e.printStackTrace(); }finally { if(connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } } |
消息订阅者一JMSConsumer1.java
package com.kingsoft.activemq.unit02; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSConsumer1 { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; //用户名 private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; //密码 private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; //连接地址
public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer consumer; // 消息的消费者
// 实例化连接工厂 connectionFactory=new ActiveMQConnectionFactory( JMSConsumer1.USERNAME, JMSConsumer1.PASSWORD, JMSConsumer1.BROKEURL); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession( Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic("First-Topic"); consumer = session.createConsumer(destination); consumer.setMessageListener(new Listener1()); } catch (JMSException e) { e.printStackTrace(); } } } |
消息订阅者二JMSConsumer2.java
package com.kingsoft.activemq.unit02; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSConsumer2 { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; //用户名 private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; //连接密码 private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; //连接地址
public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer consumer; // 消息的消费者
// 实例化连接工厂 connectionFactory=new ActiveMQConnectionFactory( JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession( Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic("First-Topic"); consumer = session.createConsumer(destination); consumer.setMessageListener(new Listener2()); } catch (JMSException e) { e.printStackTrace(); } } } |
一定要首先启动两个订阅者,然后启动发布者,如果没有订购报纸,所以没有收到报纸是一个道理。