Day177.高级特性 -ActiveMQ

ActiveMQ

高级特性

1 异步投递

(1) 异步投递是什么

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PGrxiS2N-1611823661224)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128120034951.png)]

自我理解:异步投递,是指生产者和broker之间发送消息的异步。不是指生产者和消费者之间异步。

特征

① 不能有效保证消息的发送成功

② 如果出现slowConsumer,可能会给Broker带来消息积压的可能

官网介绍:http://activemq.apache.org/async-sends


说明:对于一个Slow Consumer,使用同步发送消息可能出成Producer堵塞等情况,慢消费者适合使用异步发送。

(这句话我认为有误)

总结

① 异步发送可以让生产者发的更快。

扫描二维码关注公众号,回复: 12647488 查看本文章

② 如果异步投递不需要保证消息是否发送成功,发送者的效率会有所提高。如果异步投递还需要保证消息是否成功发送,并采用了回调的方式,发送者的效率提高不多,这种就有些鸡肋。


(2) 代码实现

官网上3中代码实现:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uW0NhZ0q-1611823661229)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128123647416.png)]

public class Jms_TX_Producer {
    
    
    // 方式1。3种方式任选一种
    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626?jms.useAsyncSend=true";//设置异步投递
    private static final String ACTIVEMQ_QUEUE_NAME = "Async";

    public static void main(String[] args) throws JMSException {
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 方式2
        activeMQConnectionFactory.setUseAsyncSend(true);//设置异步投递
        Connection connection = activeMQConnectionFactory.createConnection();
        // 方式3
        ((ActiveMQConnection)connection).setUseAsyncSend(true);//设置异步投递
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        try {
    
    
            for (int i = 0; i < 3; i++) {
    
    
                TextMessage textMessage = session.createTextMessage("tx msg--" + i);
                producer.send(textMessage);
            }
            System.out.println("消息发送完成");
        } catch (Exception e) {
    
    
            e.printStackTrace();
        } finally {
    
    
            producer.close();
            session.close();
            connection.close();
        }
    }
}

(3) 异步发送如何确认发送成功

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-laWe2cvb-1611823661232)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128130608981.png)]

public class JmsProduce_asynsc {
    
    

    public static final String ACTIVEMQ_URL = "tcp://192.168.109.101:61616";
    public static final String QUEUE_NAME = "asynsc01";

    public static void main(String[] args) throws JMSException {
    
    
        ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        mqConnectionFactory.setUseAsyncSend(true);//设置异步投递
        Connection conn = mqConnectionFactory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        ActiveMQMessageProducer mqMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
        mqMessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        TextMessage textMessage = null;
        for (int i = 1; i <= 3; i++) {
    
    
            textMessage = session.createTextMessage("msg--asynsc01-----" + i);
            textMessage.setJMSMessageID(UUID.randomUUID().toString().substring(0,3)+"---orderAchang");
            String msgID = textMessage.getJMSMessageID();
            //new AsyncCallback()设置异步投送的回调函数
            mqMessageProducer.send(textMessage, new AsyncCallback() {
    
    
                @Override
                public void onSuccess() {
    
    
                    //发送成功的情况
                    System.out.println(msgID+"has benn ok send");
                }

                @Override
                public void onException(JMSException exception) {
    
    
                    //发送失败的情况
                    System.out.println(msgID+"fail to send to mq");
                }
            });

        }
        mqMessageProducer.close();
        session.close();
        conn.close();

        System.out.println("========消息发布到MQ完成===========");
    }

}

控制台观察发送消息的信息:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-erwdgyTo-1611823661237)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128130728377.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bLg7keUm-1611823661240)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128130748684.png)]


2 延迟投递和定时投递

(1) 介绍

官网文档:http://activemq.apache.org/delay-and-schedule-message-delivery.html

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2DhhtSLd-1611823661242)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128130810198.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gZrk5nBo-1611823661244)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128130818179.png)]

(2) 修改配置文件并重启

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zObSp0kl-1611823661245)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128130838486.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Xw9r9B10-1611823661246)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128131710357.png)]

 </bean>
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"  schedulerSupport="true" >
        <destinationPolicy>

之后重启activemq


(3) 代码实现

生产者代码:

public class JmsProduce_delayAndschedule {
    
    

    public static final String ACTIVEMQ_URL = "tcp://192.168.109.101:61616";
    public static final String QUEUE_NAME = "asynsc01";

    public static void main(String[] args) throws JMSException {
    
    
        ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        mqConnectionFactory.setUseAsyncSend(true);//设置异步投递
        Connection conn = mqConnectionFactory.createConnection();
        conn.start();
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        ActiveMQMessageProducer mqMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
==================================================================================      
        long delay = 3 * 1000; //延迟3秒
        long period = 4 * 1000; //四秒钟投递一次
        int repeat = 5; //投递次数

        TextMessage textMessage = null;
        for (int i = 1; i <= 3; i++) {
    
    
            textMessage = session.createTextMessage("msg--delay--正文内容-----" + i);

            //消息属性设置
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,period);
            textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);
            // 此处的意思:该条消息,等待3秒,之后每4秒发送一次,重复发送5次。
=====================================================================================
            mqMessageProducer.send(textMessage);

        }
        mqMessageProducer.close();
        session.close();
        conn.close();

        System.out.println("========消息发布到MQ完成===========");
    }

}

消费者代码:与之前一样


3 消息消费的重试机制

(1) 是什么

官网文档:http://activemq.apache.org/redelivery-policy

是什么: 消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。

(2) 具体哪些情况会引发消息重发

① Client用了transactions且再session中调用了rollback

② Client用了transactions且再调用commit之前关闭或者没有commit

③ Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover

(3) 请说说消息重发时间间隔和重发次数

间隔:1

次数:6

每秒发6次

(4) 有毒消息Poison ACK

一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。

(5) 属性说明

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zK66uNep-1611823661247)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128134219876.png)]

(6) 代码验证

生产者。发送3条数据。代码省略…

消费者。开启事务,却没有commit。重启消费者,前6次都能收到消息,到第7次,不会再收到消息。代码:

public class Jms_TX_Consumer {
    
    
    private static final String ACTIVEMQ_URL = "tcp://192.168.109.101:61616";
    private static final String ACTIVEMQ_QUEUE_NAME = "dead01";

    public static void main(String[] args) throws JMSException, IOException {
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
    
    
            public void onMessage(Message message) {
    
    
                if (message instanceof TextMessage) {
    
    
                    TextMessage textMessage = (TextMessage) message;
                    try {
    
    
                        System.out.println("***消费者接收到的消息:   " + textMessage.getText());
                        //session.commit();
                    }catch (Exception e){
    
    
                        e.printStackTrace();
                    }
                }
            }
        });
        //关闭资源
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

activemq管理后台。多了一个名为ActiveMQ.DLQ队列,里面多了3条消息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Lq7ABpSw-1611823661249)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141557656.png)]


(7) 代码修改默认参数

修改重试次数为3。更多的设置请参考官网文档。

消费者代码

public class Jms_TX_Consumer {
    
    
    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    private static final String ACTIVEMQ_QUEUE_NAME = "dead01";

    public static void main(String[] args) throws JMSException, IOException {
    
    
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
=====================================================================
        // 修改默认参数,设置消息消费重试3次
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(3);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
=====================================================================
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
    
    
            public void onMessage(Message message) {
    
    
                if (message instanceof TextMessage) {
    
    
                    TextMessage textMessage = (TextMessage) message;
                    try {
    
    
                        System.out.println("***消费者接收到的消息:   " + textMessage.getText());
                        //session.commit();
                    }catch (Exception e){
    
    
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

(8)整合spring

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oq9I26E9-1611823661250)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141729105.png)]


4 死信队列

承接上个标题的内容。

(1) 是什么

官网文档: http://activemq.apache.org/redelivery-policy

死信队列:异常消息规避处理的集合,主要处理失败的消息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V0OEQ7G8-1611823661252)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141753364.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aQrLSo6a-1611823661253)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141812924.png)]


(1) 死信队列的配置(一般采用默认)

1. sharedDeadLetterStrategy

不管是queue还是topic,失败的消息都放到这个队列中。下面修改activemq.xml的配置,可以达到修改队列的名字。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IJAP3gFZ-1611823661254)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141825368.png)]

2. individualDeadLetterStrategy

可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。

在这里插入图片描述

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6WJemec9-1611823661257)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141848717.png)]

3. 自动删除过期消息

过期消息是值生产者指定的过期时间,超过这个时间的消息。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mUzIMXbj-1611823661258)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141900244.png)]

4. 存放非持久消息到死信队列中

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vrZ7pkCQ-1611823661259)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128141912124.png)]


5 消息不被重复消费,幂等性

如何保证消息不被重复消费呢?幕等性问题你谈谈

也就是变向问:如何解决消息的重复消费问题?

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8dHYFAu7-1611823661260)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128144728668.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vj6Ha33S-1611823661261)(C:\Users\PePe\AppData\Roaming\Typora\typora-user-images\image-20210128144749293.png)]

幂等性如何解决,根据messageid去查这个消息是否被消费了

感谢尚硅谷

猜你喜欢

转载自blog.csdn.net/qq_43284469/article/details/113349647