消息的可靠性
可以从持久化、事务、签收、集群,四个方面保证消息中间件的高可用
消息的持久化
有这样一种情况,生产者发送了很多消息到MQ服务器,突然某一时刻MQ服务器宕机了,此时有一些消费被消费者消费了,有一些还没有。那么没消费的消息还在吗?还能收到吗?答案是yes。生产者在发送消息的时候,默认就是以持久化的模式发送,MQ服务器宕机恢复后消息仍在,还可以继续传递消息(即消费者还能收到消息)。
//创建消息生产者,参数:destination 目的地(队列或主题)
MessageProducer messageProducer = session.createProducer(Destination destination);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); //持久,默认值
//messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //非持久
//创建消息
TextMessage textMessage = session.createTextMessage("this is a msg");
//也可以针对某一条消息设置持久或非持久
textMessage setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
现在我们以队列演示一下消息的持久化
重启MQ服务
现在我们来启动消费者
主题我就不演示了,需要注意的是:仅对持久的主题订阅作用才有意义。非持久的主题订阅只要离线就收不到消息,生产者设不设置持久都一样。
消息的事务
事务用在生产者上,消息需要手动提交,要么全部发送成功,要么全部发送失败。
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.1.3:61616");
Connection connection = factory.createConnection();
connection.start();
//创建回话,两个参数:transacted 事务,acknowledgeMode 签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //开启事务
Queue queue = session.createQueue("queue01"); //创建队列
MessageProducer messageProducer = session.createProducer(queue);
try {
for (int i = 1; i <= 3; i++) {
TextMessage textMessage = session.createTextMessage("这是一个事务消息" + i);
messageProducer.send(textMessage);
}
session.commit(); //提交事务,如果不手动提交,消息是发不出去的
} catch (Exception e) {
session.rollback(); //回滚事务
}finally{
messageProducer.close();
session.close();
connection.close();
}
System.out.println("消息发送完毕");
}
事务用在消费者身上,如果不手动提交,消息会被重复接收
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.1.3:61616");
Connection connection = factory.createConnection();
connection.start();
//创建回话,两个参数:transacted 事务,acknowledgeMode 签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //true,开启事务
Queue queue = session.createQueue("queue01"); //创建队列
MessageConsumer messageConsumer = session.createConsumer(queue);
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000); //接收消息
if(textMessage != null)
System.out.println(textMessage.getText());
session.commit(); 如果不手动提交,消息不会出队;等下次再调用receive方法时会把队列中的消息再次取出来
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
综上,事务一般用在生产者上
消息的签收
签收机制:AUTO_ACKNOWLEDGE(自动签收)、CLIENT_ACKNOWLEDGE(手动签收)、DUPS_OK_ACKNOWLEDGE(该模式签收时机未知,所有有可能收到重复消息,但是可以减少session开销)、SESSION_TRANSACTED(以事务为提交即签收),实际开发中用前两种。
签收主要针对消费者的,生产者选择任何一种签收方式都可以(因为没有实际效果,而参数是必须的)。下面我们来说说消费者的签收
非事务手动签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //非事务手动签收
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000); //接收消息
if(textMessage != null)
System.out.println(textMessage.getText());
textMessage.acknowledge(); //手动签收消息,如果不写这句话,消息永远不会出队
事务手动签收
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); //事务手动签收
//第一种情况,签收不提交事务
textMessage.acknowledge();
//结论:消息会不出队,可继续收到消息
第二中情况:提交事务,不签收
session.commit();
//结论:消息被接收
//综上,只要开启并提交了事务,即使不手动签收,也会自动签收
自动签收,顾名思义,不需要任何操作。如果有事务,签收还是以事务为准。