JMS
什么是JMS
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持 。
JMS的接口在import javax.jms
中。而我们常用的ActiveMQ、RabbitMQ都是JMS接口的实现。
JMS数据交互的两种方式
方式一(点对点)
点对点是被动
- 每个消息只有一个接受者(自己测试了一下,可以有多个接受者,但是当有多个接收者时,每个接收者只能获取随机的几条信息)
- 消息发送者和消息接受者并没有时间依赖性。
- 当消息发送者发送消息的时候,无论接收者程序在不在运行,都能获取到消息;
- 当接收者收到消息的时候,会发送确认收到通知(acknowledgement)
- 点对点消息模型图:
方式二(发布/订阅)
- 一个消息可以传递给多个订阅者
- 发布者和订阅者有时间依赖性,只有当客户端创建订阅后才能接受消息,且订阅者需一直保持活动状态以接收消息。
- 为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
- 发布/订阅消息模型图:
ActiveMQ
什么是ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
下载及启动
下载主页:http://activemq.apache.org/
ActiveMQ 服务启动地址:
http://127.0.0.1:8161/admin/
用户名/密码 admin/admin
ActiveMQ的两种方式
ActiveMQ的点对点(一对一)
以下是创建会话的一些参数说明
Session.AUTO_ACKNOWLEDGE。当客户成功的从receive 方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
Session.CLIENT_ACKNOWLEDGE。 客户通过消息的 acknowledge 方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了 10 个消息,然后确认第 5个消息,那么所有 10 个消息都被确认。
Session.DUPS_ACKNOWLEDGE。 该选择只是会话迟钝的确认消息的提交。如果 JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么 JMS provider 必须把消息头的 JMSRedelivered 字段设置为 true。
生产者模型
public class JSMProduct {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
Destination destination = null;
MessageProducer messageProducer = null;
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,URL);
try{
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("QQ消息");
messageProducer = session.createProducer(destination);
String text = "1870000000";
for(int i = 0;i<10;i++){
TextMessage msg = session.createTextMessage(text+i);
System.out.println(text+i);
messageProducer.send(destination,msg);
}
session.commit();
}catch (Exception e){
e.printStackTrace();
}finally {
try {
if(connection != null){
session.close();
messageProducer.close();
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
消费者模型
public class JSMConsume {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
Destination destination = null;
MessageConsumer messageConsumer= null;
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
try{
connection = connectionFactory.createConnection();
connection.start();
session =connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("QQ消息");
messageConsumer = session.createConsumer(destination);
for (int i = 0; i < 5; i++) {
TextMessage message = (TextMessage) messageConsumer.receive();
if (message != null){
System.out.println(message.getText());
}
}
session.commit();
}catch (Exception e){
e.printStackTrace();
}
}
}
以上生产者消费者采用一对一的方式,即消费者开启时才进行消费,而这样就没有达到该模型的最终效果,因此消费者采用另外一种方式进行,我们创建一个监听类实现消息的监听,以下是实现代码:
public class MyMessageListen implements MessageListener {
@Override
public void onMessage(Message message) {
for (int i = 0; i < 5; i++) {
if (message != null){
try {
TextMessage msg = (TextMessage) message;
System.out.println(msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
注意我们监听消息的时候不能关闭,也就是说不能在finally中关闭连接会话等,因为监听也是一个线程。
public class JSMConsumeListen {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
Destination destination = null;
MessageConsumer messageConsumer= null;
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
try{
connection = connectionFactory.createConnection();
connection.start();
session =connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("QQ消息");
messageConsumer = session.createConsumer(destination);
//设置监听
messageConsumer.setMessageListener(new MyMessageListen());
session.commit();
}catch (Exception e){
e.printStackTrace();
}
}
}
ActiveMQ发布和订阅
订阅和发布的方式上面已经解释了,大家应该关注过一些事物,例如大家熟悉的微博,当关注某个事物后,你才会收到该事物的一些消息,也就是说,我们的消费者需要先监听消息,当有消息产生时,消费者才能消费,下面就直接上代码
public class JSMConsume1 {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
Destination destination = null;
MessageConsumer messageConsumer= null;
connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
try{
connection = connectionFactory.createConnection();
connection.start();
session =connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//注意:这里修改为createTopic
destination = session.createTopic("QQ消息");
messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MyMessageListen1());
session.commit();
}catch (Exception e){
e.printStackTrace();
}
}
}
class MyMessageListen1 implements MessageListener {
@Override
public void onMessage(Message message) {
for (int i = 0; i < 5; i++) {
if (message != null){
try {
TextMessage msg = (TextMessage) message;
System.out.println(msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
以上代码为消费者的实现代码,生产者的代码和点对点基本一样,我们只需要将对应的方式修改为destination = session.createTopic("QQ消息");
即可。
以上是我初学记录下来的学习过程,如有错误请及时提出