顺序消息
在某些业务中,consumer在消费消息时,是需要按照生产者发送消息的顺序进行消费的,比如在电商系统中,订单的消息,会有创建订单、订单支付、订单完成,如果消息的顺序发生改变,那么这样的消息就没有意义了。
OrderConsumer.java
package com.woodie.rocketmq.order;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class OrderConsumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("HAOKE_ORDER_CONSUMER");
consumer.setNamesrvAddr("192.168.142.128:9876");
consumer.subscribe("haoke_order_topic", "*");
// 通过MessageListenerOrderly设置有序的监听
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
try {
for (MessageExt msg: msgs) {
System.out.println(Thread.currentThread().getName() + " Receive " + msg.getQueueId() + " " + new String(msg.getBody(), "UTF-8"));
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
// System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
OrderProducer.java
package com.woodie.rocketmq.order;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OrderProducer {
public static void main(String[] args) throws Exception{
// 设置一个分组
DefaultMQProducer producer = new DefaultMQProducer("HAOKE_ORDER_PRODUCER");
producer.setNamesrvAddr("192.168.142.128:9876");
producer.start();
// 发送100条消息
for (int i = 0; i < 100; i++) {
int orderId = i % 10; // 模拟生成订单id, 只有10个, 为0~9
String msgStr = "order --> " + i + ", id = " + orderId;
Message message = new Message("haoke_order_topic","ORDER_MSG", msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message, (mqs, msg, arg) -> {
Integer id = (Integer) arg;
// mqs.size()指的是队列的个数,默认为4
// id%size 产生的index为0~3
int index = id % mqs.size();
// 通过mqs.get(index)就获取到了消息队列, 通过这里每次都会将相同的消息落在同一个消息队列中
return mqs.get(index);
}, orderId);
System.out.println(sendResult);
}
producer.shutdown();
}
}
测试
从下面可以看到同一个id消息都是由, id 相同的数据都是存在同一个队列中,receive X 表示 在具体的队列