RabbitMQ持久化,负载均衡 学习

消息确认(RabbitMQ支持消息确认–ACK)

如果处理一条消息需要几秒钟的时间,你可能会想,如果在处理消息的过程中,消费者服务器、网络、网卡出现故障挂了,那可能这条正在处理的消息或者任务就没有完成,就会失去这个消息和任务。
为了确保消息或者任务不会丢失,RabbitMQ支持消息确认–ACK。ACK机制是消费者端从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。如果一个消费者在处理消息时挂掉(网络不稳定、服务器异常、网站故障等原因导致频道、连接关闭或者TCP连接丢失等),那么他就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将此消息重新放入队列中。如果有其他消费者同时在线,RabbitMQ会立即将这个消息推送给这个在线的消费者。这种机制保证了在消费者服务器故障的时候,能不丢失任何消息和任务。

消息确认的两种方法:

1.

boolean autoAck = true ;                  

channel.basicConsume(channelName, autoAck, consumer) ; 

2.

boolean autoAck = false ;                  

channel.basicConsume(channelName, autoAck, consumer) ; 
while(true){  
              
            //获取消息,如果没有消息,这一步将会一直阻塞  
            Delivery delivery = consumer.nextDelivery() ;  
              
            String msg = new String(delivery.getBody()) ;    
              
            //确认消息,已经收到  
            channel.basicAck(delivery.getEnvelope().getDeliveryTag()  
                    , false);  
              
            System.out.println("received message[" + msg + "] from " + channelName);  
        }  

队列持久化

如何确保消费者挂掉的情况下,任务不会消失。但是如果RabbitMQ服务器挂了呢?
如果不告诉RabbitMQ,当RabbitMQ服务器挂了,可能就丢失所有队列中的消息和任务。如果想让RabbitMQ记住当前的状态和内容,就需要通过2件事来确保消息和任务不会丢失。

1.队列的持久化是通过durable=true来实现的。 
一般程序中这么使用:

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue.persistent.name", true, false, false, null);

 
 

消息的持久化

如过将queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,也会存在,因为服务会把持久化的queue存放在硬盘上,当服务重启的时候,会重新什么之前被持久化的queue。队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置。也就是说,重启之前那个queue里面还没有发出去的消息的话,重启之后那队列里面是不是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置了。 

如果要在重启后保持消息的持久化必须设置消息是持久化的标识。

设置消息的持久化:

channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());

负载均衡

默认情况下,RabbitMQ将队列消息随机分配给每个消费者,这时可能出现消息调度不均衡的问题。例如有两台消费者服务器,一个服务器可能非常繁忙,消息不断,另外一个却很悠闲,没有什么负载。RabbitMQ不会主动介入这些情况,还是会随机调度消息到每台服务器。
这是因为RabbitMQ此时只负责调度消息,不会根据ACK的反馈机制来分析那台服务器返回反馈慢,是不是处理不过来啊。
为了解决这个问题,我们可以使用【prefetchcount = 1】这个设置。这个设置告诉RabbitMQ,不要一次将多个消息发送给一个消费者。这样做的好处是只有当消费者处理完成当前消息并反馈后,才会收到另外一条消息或任务。这样就避免了负载不均衡的事情了。

int prefetchCount = 1;
channel.basicQos(prefetchCount);




猜你喜欢

转载自blog.csdn.net/wanderlustLee/article/details/80332551