Day174.JMS规范、JMS的可靠性【重要】、ActiveMQ的broker -ActiveMQ.md

ActiveMQ

一、JMS规范

复习一下:什么是JavaEE?面试题

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9hA4hIkQ-1611580956486)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210125160827111.png)]

各个消息队列的比较

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eZEXSmh6-1611580956489)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210125161818798.png)]

1.1 JMS是什么

什么是Java消息服务

Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。

在这里插入图片描述

1.2 消息头

JMS的消息头有哪些属性:

JMSDestination:消息目的地
JMSDeliveryMode:消息持久化模式
JMSExpiration:消息过期时间
JMSPriority:消息的优先级
JMSMessageID:消息的唯一标识符。后面我们会介绍如何解决幂等性。

说明: 消息的生产者可以set这些属性,消息的消费者可以get这些属性。

这些属性在send方法里面也可以设置。

public class JmsProduce_topic {
    
    

    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws  Exception{
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);

        for (int i = 1; i < 4 ; i++) {
    
    
            TextMessage textMessage = session.createTextMessage("topic_name--" + i);
            // 这里可以指定每个消息的目的地
            textMessage.setJMSDestination(topic);
            /*
            持久模式和非持久模式。
            一条持久性的消息:应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。
            一条非持久的消息:最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失。
             */
            textMessage.setJMSDeliveryMode(0);
            /*
            可以设置消息在一定时间后过期,默认是永不过期。
            消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。
            如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。
            如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除。
             */
            textMessage.setJMSExpiration(1000);
            /*  消息优先级,从0-9十个级别,0-4是普通消息5-9是加急消息。
            JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级。
             */
            textMessage.setJMSPriority(10);
            // 唯一标识每个消息的标识。MQ会给我们默认生成一个,我们也可以自己指定。
            textMessage.setJMSMessageID("ABCD");
            // 上面有些属性在send方法里也能设置
            messageProducer.send(textMessage);
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** TOPIC_NAME消息发送到MQ完成 ****");
    }
}


1.3 消息体

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XCjdC53X-1611580956498)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210125165724458.png)]

扫描二维码关注公众号,回复: 12647498 查看本文章

消息生产者

public class JmsProduce_topic {
    
    

    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws  Exception{
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
         javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
         connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);
=====================================================================
        for (int i = 1; i < 4 ; i++) {
    
    
// 发送TextMessage消息体
            TextMessage textMessage = session.createTextMessage("topic_name--" + i);
            messageProducer.send(textMessage);
            // 发送MapMessage  消息体。set方法: 添加,get方式:获取
            MapMessage  mapMessage = session.createMapMessage();
            mapMessage.setString("name", "张三"+i);
            mapMessage.setInt("age", 18+i);
            messageProducer.send(mapMessage);
        }
======================================================================        
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** TOPIC_NAME消息发送到MQ完成 ****");
    }
}

消息消费者

生产者发送的什么类型的消息,消费者就需要判断然后用相应的类型接收

public class JmsConsummer_topic {
    
    
    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws Exception{
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageConsumer messageConsumer = session.createConsumer(topic);
=============================================================
        messageConsumer.setMessageListener( (message) -> {
    
    
 			// 判断消息是哪种类型之后,再强转。
            if (null != message  && message instanceof TextMessage){
    
    
                   TextMessage textMessage = (TextMessage)message;
                    try {
    
    
                      System.out.println("****消费者text的消息:"+textMessage.getText());
                    }catch (JMSException e) {
    
    
                    }
                }
            if (null != message  && message instanceof MapMessage){
    
    
                MapMessage mapMessage = (MapMessage)message;
                try {
    
    
                    System.out.println("****消费者的map消息:"+mapMessage.getString("name"));
                    System.out.println("****消费者的map消息:"+mapMessage.getInt("age"));
                }catch (JMSException e) {
    
    
                }
            }

        });
        System.in.read();
==================================================================
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

1.4 消息属性

如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。

他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据。

下图是设置消息属性的API:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uZUZrdXc-1611580956500)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210125171205887.png)]

消息生产者

public class JmsProduce_topic {
    
    

    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws  Exception{
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);
============================================================
        for (int i = 1; i < 4 ; i++) {
    
    
            TextMessage textMessage = session.createTextMessage("topic_name--" + i);
            // 调用Message的set*Property()方法,就能设置消息属性。根据value的数据类型的不同,有相应的API。
            textMessage.setStringProperty("From","[email protected]");
            textMessage.setByteProperty("Spec", (byte) 1);
            textMessage.setBooleanProperty("Invalide",true);
            messageProducer.send(textMessage);
        }
==============================================================        
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** TOPIC_NAME消息发送到MQ完成 ****");
    }
}

消息消费者

public class JmsConsummer_topic {
    
    
    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws Exception{
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageConsumer messageConsumer = session.createConsumer(topic);      
=====================================================================
        messageConsumer.setMessageListener( (message) -> {
    
    
            if (null != message  && message instanceof TextMessage){
    
    
                    TextMessage textMessage = (TextMessage)message;
                    try {
    
    
                      System.out.println("消息体:"+textMessage.getText());
                      System.out.println("消息属性:"+textMessage.getStringProperty("From"));
                      System.out.println("消息属性:"+textMessage.getByteProperty("Spec"));
                      System.out.println("消息属性:"+textMessage.getBooleanProperty("Invalide"));
                    }catch (JMSException e) {
    
    
                    }
                }
        });
        System.in.read();
====================================================================        
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

二、JMS的可靠性【重要】

1、消息的持久化

什么是持久化消息?

保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。

我的理解:在消息生产者将消息成功发送给MQ消息中间件之后。无论是出现任何问题,如:MQ服务器宕机、消费者掉线等。都保证(topic要之前注册过,queue不用)消息消费者,能够成功消费消息。如果消息生产者发送消息就失败了,那么消费者也不会消费到该消息。

1)queue消息队列【非持久和持久】

queue非持久,当服务器宕机,消息不存在(消息丢失了)。即便是非持久,消费者在不在线的话,消息也不会丢失,等待消费者在线,还是能够收到消息的。

queue持久化,当服务器宕机,消息依然存在。queue消息默认是持久化的

持久化消息,保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。

可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wxDmYzqv-1611580956501)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210125173413569.png)]

非持久化的消费者,和之前的代码一样。下面演示非持久化的生产者。下面除灰色背景代码外,其他都和之前一样。

->流程:

1、建立ActiveMQConnectionFactory工厂对象

2、通过工厂对象创建连接对象

3、连接MQ

4、通过连接对象创建session会话对象

5、通过session会话对象创建Queue队列对象

6、通过session会话对象创建生产者对象,传入Queue队列对象

7、设置生产者对象为非持久化【Queue默认持久化】

8、通过session对象创建TextMessage类型对象

9、通过生产者调用send方法,传入TextMessage类型对象

10、关闭资源

运行结果证明:当生产者成功发布消息之后,MQ服务端宕机重启,消息生产者就收不到该消息了

public class JmsProduce {
    
    
    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws  Exception{
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer messageProducer = session.createProducer(queue);
====================================================================
        // 非持久化
        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
=====================================================================
        for (int i = 1; i < 4 ; i++) {
    
    
            TextMessage textMessage = session.createTextMessage("---MessageListener---" + i);
            messageProducer.send(textMessage);
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** 消息发送到MQ完成 ****");
    }
}

持久化的消费者,和之前的代码一样。

运行结果证明:当生产者成功发布消息之后,MQ服务端宕机重启,消息生产者仍然能够收到该消息

public class JmsProduce {
    
    
    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws  Exception{
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer messageProducer = session.createProducer(queue);
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 1; i < 4 ; i++) {
    
    
            TextMessage textMessage = session.createTextMessage("---MessageListener---" + i);
            messageProducer.send(textMessage);
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** 消息发送到MQ完成 ****");
    }
}

整体结论:

queue消息默认是持久化的。

—当是持久化时,MQ宕机后重新连上,之前的消息还会存在;

—当不是持久化时,MQ宕机后重新连上,之前的消息将消失


2)topic消息主题【持久化】

topic默认就是非持久化的,因为生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。

topic消息持久化,只要消费者向MQ服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不管是MQ服务器宕机还是消费者不在线。

注意

  1. 一定要先运行一次消费者,等于向MQ注册,类似我订阅了这个主题。

  2. 然后再运行生产者发送消息。

  3. 之后无论消费者是否在线,都会收到消息。如果不在线的话,下次连接的时候,会把没有收过的消息都接收过来。

消息生产者

->流程:

1、建立ActiveMQConnectionFactory工厂

2、通过工厂对象建立连接对象

3、通过连接对象创建session会话对象

4、通过session会话对象创建一个Topic主题对象,并设置名字

5、通过session对象创建一个生产者对象,并传入Topic主题对象

6、通过生产者对象设置持久化模式

7、通过连接对象与MQ连接

9、通过session会话对象创建TextMessage类型的消息

10、调用send方法,传入TextMessage对象,发送

11、关闭资源

// 持久化topic 的消息生产者
public class JmsProduce_persistence {
    
    

    public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws  Exception{
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);

        // 设置持久化topic 
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 设置持久化topic之后再,启动连接
        connection.start();
        for (int i = 1; i < 4 ; i++) {
    
    
            TextMessage textMessage = session.createTextMessage("topic_name--" + i);
            messageProducer.send(textMessage);
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** TOPIC_NAME消息发送到MQ完成 ****");
    }
}

消息消费者

->流程:

1、创建ActiveMQConnectionFactory工厂对象

2、通过工厂对象建立连接对象

3、通过连接对象,设置客户端ID,向MQ服务器注册自己的名称

4、通过连接对象创建session会话对象

5、通过session会话对象创建Topic主题对象

6、通过session会话对象设置订阅哪个Topic主题和设置名字,来创建topic订阅者对象

7、然后再连接MQ

8、通过topic订阅者对象调用接收方法,生成消息接收对象

9、强转为传来的消息类型对象

10、获取消息

11、在给消息对象赋值,让他不等于null,不断监听循环

12、关闭资源

// 持久化topic 的消息消费者
public class JmsConsummer_persistence {
    
    
    public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws Exception{
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
		// 设置客户端ID。向MQ服务器注册自己的名称
        connection.setClientID("marrry");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
		// 创建一个topic订阅者对象。一参是topic,二参是订阅者名称
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
        // 之后再开启连接
        connection.start();
        Message message = topicSubscriber.receive();
         while (null != message){
    
    
             TextMessage textMessage = (TextMessage)message;
             System.out.println(" 收到的持久化 topic :"+textMessage.getText());
             message = topicSubscriber.receive();
         }
        session.close();
        connection.close();
    }
}

控制台介绍

topic页面还是和之前的一样。另外在subscribers页面也会显示。如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7QT6LuCi-1611580956504)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210125182445953.png)]

当持久化消费者从离线状态到上线状态后,会自动消费之前在订阅后上线前的数据


2、消息的事务性

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1BKYdoVb-1611580956505)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210125193233825.png)]

(1) 生产者开启事务后,执行commit方法,这批消息才真正的被提交。不执行commit方法,这批消息不会提交。

执行rollback方法,之前的消息会回滚掉。生产者的事务机制,要高于签收机制,当生产者开启事务,签收机制不再重要。

(2) 消费者开启事务后,执行commit方法,这批消息才算真正的被消费。不执行commit方法,这些消息不会标记已消费,下次还会被消费。执行rollback方法,是不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息,下次还会被消费。消费者利用commit和rollback方法,甚至能够违反一个消费者只能消费一次消息的原理。

(3) 问:消费者和生产者需要同时操作事务才行吗?

答:消费者和生产者的事务,完全没有关联,各自是各自的事务。

生产者

public class Jms_TX_Producer {
    
    
    private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    private static final String ACTIVEMQ_QUEUE_NAME = "Queue-TX";

    public static void main(String[] args) throws JMSException {
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //1.创建会话session,两个参数transacted=事务,acknowledgeMode=确认模式(签收)
        //设置为开启事务
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        try {
    
    
            for (int i = 0; i < 3; i++) {
    
    
                TextMessage textMessage = session.createTextMessage("tx msg--" + i);
              producer.send(textMessage);
if(i == 2){
    
    
                    throw new RuntimeException("GG.....");
                }
            }
            // 2. 开启事务后,使用commit提交事务,这样这批消息才能真正的被提交。
            session.commit();
            System.out.println("消息发送完成");
        } catch (Exception e) {
    
    
            System.out.println("出现异常,消息回滚");
            // 3. 工作中一般,当代码出错,我们在catch代码块中回滚。这样这批发送的消息就能回滚。
            session.rollback();
        } finally {
    
    
            //4. 关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    }
}

消费者

public class Jms_TX_Consumer {
    
    
    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    private static final String ACTIVEMQ_QUEUE_NAME = "Queue-TX";

    public static void main(String[] args) throws JMSException, IOException {
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 创建会话session,两个参数transacted=事务,acknowledgeMode=确认模式(签收)
        // 消费者开启了事务就必须手动提交,不然会重复消费消息
        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
    
    
            int a = 0;
            @Override
            public void onMessage(Message message) {
    
    
                if (message instanceof TextMessage) {
    
    
                    try {
    
    
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("***消费者接收到的消息:   " + textMessage.getText());
                        if(a == 0){
    
    
                            System.out.println("commit");
                            session.commit();
                        }
                        if (a == 2) {
    
    
                            System.out.println("rollback");
                            session.rollback();
                        }
                        a++;
                    } catch (Exception e) {
    
    
                        System.out.println("出现异常,消费失败,放弃消费");
                        try {
    
    
                            session.rollback();
                        } catch (JMSException ex) {
    
    
                            ex.printStackTrace();
                        }
                    }
                }
            }
        });
        //关闭资源
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

消费者的控制台输出信息。可以看出commit和rollback方法的作用。

消费者开启事务后,如果不commit(),会出现重复消费的现象,要注意!!!

***消费者接收到的消息:   tx msg--0
commit
***消费者接收到的消息:   tx msg--1
***消费者接收到的消息:   tx msg--2
rollback
***消费者接收到的消息:   tx msg--1
***消费者接收到的消息:   tx msg--2

3、 消息的签收机制

一、签收的几种方式

① 自动签收(Session.AUTO_ACKNOWLEDGE):该方式是默认的。该种方式,无需我们程序做任何操作,框架会帮我们自动签收收到的消息。

② 手动签收(Session.CLIENT_ACKNOWLEDGE):手动签收。该种方式,需要我们手动调用Message.acknowledge(),来签收消息。如果不签收消息,该消息会被我们反复消费,只到被签收。

③ 允许重复消息(Session.DUPS_OK_ACKNOWLEDGE):多线程或多个消费者同时消费到一个消息,因为线程不安全,可能会重复消费。该种方式很少使用到。

④ 事务下的签收(Session.SESSION_TRANSACTED):开始事务的情况下,可以使用该方式。该种方式很少使用到。


二、事务和签收的关系

① 在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。事务优先于签收,开始事务后,签收机制不再起任何作用。

② 非事务性会话中,消息何时被确认取决于创建会话时的应答模式。

③ 生产者事务开启,只有commit后才能将全部消息变为已消费。

④ 事务偏向生产者签收偏向消费者。也就是说,生产者使用事务更好点,消费者使用签收机制更好点。


非事务下的消费者如何使用手动签收的方式

生产者不变:->非事务,自动签收

public class Jms_TX_Producer {
    
    

    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    private static final String ACTIVEMQ_QUEUE_NAME = "Queue-ACK";

    public static void main(String[] args) throws JMSException {
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        try {
    
    
            for (int i = 0; i < 3; i++) {
    
    
                TextMessage textMessage = session.createTextMessage("tx msg--" + i);
                producer.send(textMessage);
            }
            System.out.println("消息发送完成");
        } catch (Exception e) {
    
    
            e.printStackTrace();
        } finally {
    
    
            producer.close();
            session.close();
            connection.close();
        }
    }
}

消费者:->手动签收,非事务

public class Jms_TX_Consumer {
    
    
    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    private static final String ACTIVEMQ_QUEUE_NAME = "Queue-ACK";

    public static void main(String[] args) throws JMSException, IOException {
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //设置为Session.CLIENT_ACKNOWLEDGE 【手动签收】
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
    
    
            @Override
            public void onMessage(Message message) {
    
    
                if (message instanceof TextMessage) {
    
    
                    try {
    
    
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("***消费者接收到的消息:   " + textMessage.getText());
                        /* 设置为Session.CLIENT_ACKNOWLEDGE后,要调用该方法,标志着该消息已被签收(消费)。
                            如果不调用该方法,该消息的标志还是未消费,下次启动消费者或其他消费者还会收到改消息。
                         */
                        textMessage.acknowledge();//需要很显式的自己手动签收
                    } catch (Exception e) {
    
    
                        System.out.println("出现异常,消费失败,放弃消费");
                    }
                }
            }
        });
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

如果生产者不变,消费者设置为手动接受,非事务状态下;如果不显式的自己调用acknowledge()手动签收,就会出现重复消费的问题!!!


4、JMS的点对点总结

点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似。

1、如果在Session关闭时有部分消息己被收到但还没有被签收(acknowledged),那当消费者下次连接到相同的队列时,这些消息还会被再次接收

2、队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势


5、JMS的发布订阅总结

(1) JMS的发布订阅总结

JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic。

主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。

主题使得消息订阅者和消息发布者保持互相独立不需要解除即可保证消息的传送

(2) 非持久订阅

非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收发到某个主题的消息。

如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。

一句话:先订阅注册才能接受到发布,只给订阅者发布消息。

(3) 持久订阅

客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ的时候,会根据消费者的ID得到所有当自己处于离线时发送到主题的消息

当持久订阅状态下,不能恢复或重新派送一个未签收的消息。

持久订阅才能恢复或重新派送一个未签收的消息。

(4) 非持久和持久化订阅如何选择

当所有的消息必须被接收,则用持久化订阅。当消息丢失能够被容忍,则用非持久订阅。


三、 ActiveMQ的broker

1、broker是什么?

相当于一个ActiveMQ服务器实例

说白了,Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。这种方式,我们实际开发中很少采用,因为他缺少太多了东西,如:日志,数据存储等等。

2、启动broker时指定配置文件

启动broker时指定配置文件,可以帮助我们在一台服务器上启动多个broker。实际工作中一般一台服务器只启动一个broker。

xbean:file:/填写要运行的xml配置文件地址

service activemq start xbean:file:/myactiveMQ/apache-activemq-5.15.9/conf/activemq02.xml

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7TBlQhBz-1611580956507)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210125210434242.png)]

3、嵌入式的broker启动

用ActiveMQ Broker作为独立的消息服务器来构建Java应用。

ActiveMQ也支持在vm中通信基于嵌入的broker,能够无缝的集成其他java应用。

1)、pom.xml添加一个依赖

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.1</version>
</dependency>

2)、嵌入式broke的启动类

public class EmbedBroker {
    
    
    public static void main(String[] args) throws Exception {
    
    
        //ActiveMQ也支持在vm中通信基于嵌入的broker
        BrokerService brokerService = new BrokerService();
        brokerService.setUseJmx(true);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }
    
}

因为是通过java内嵌启动的mq

修改消费者与生产者连接的URL

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uWMCbkoG-1611580956508)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210125211941990.png)]

运行结果:

在这里插入图片描述

可以照样使用,可拿来做应急,但还是推荐使用linux上的

感谢尚硅谷!!!

猜你喜欢

转载自blog.csdn.net/qq_43284469/article/details/113142200