用锹的人比手厉害,会做锹的人比用锹的厉害
记录几种rocketmq的几种形式,以及基于springboot的常见的配置方式
这是几个mq的相对对比
先贴一下配置
@Slf4j
@SpringBootConfiguration
public class MQProducerConfiguration {
/**
* 发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
*/
@Value("${rocketmq.producer.groupName}")
private String normalGroupName;
@Value("${rocketmq.producer.transation.groupName}")
private String transationGroupName;
@Value("${rocketmq.producer.namesrvAddr}")
private String namesrvAddr;
/**
* 消息最大大小,默认4M
*/
@Value("${rocketmq.producer.maxMessageSize}")
private Integer maxMessageSize;
/**
* 消息发送超时时间,默认3秒
*/
@Value("${rocketmq.producer.sendMsgTimeout}")
private Integer sendMsgTimeout;
/**
* 消息发送失败重试次数,默认2次
*/
@Value("${rocketmq.producer.retryTimesWhenSendFailed}")
private Integer retryTimesWhenSendFailed;
/**
* 默认消息配置
*/
@Bean
public DefaultMQProducer getRocketMQProducer() throws Exception {
if (StringUtils.isEmpty(this.normalGroupName)) {
throw new Exception("no normalGroupName");
}
if (StringUtils.isEmpty(this.namesrvAddr)) {
throw new Exception("nameServerAddr is blank");
}
DefaultMQProducer producer;
producer = new DefaultMQProducer(this.normalGroupName);
producer.setNamesrvAddr(this.namesrvAddr);
//如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
//producer.setInstanceName(instanceName);
if (this.maxMessageSize != null) {
producer.setMaxMessageSize(this.maxMessageSize);
}
if (this.sendMsgTimeout != null) {
producer.setSendMsgTimeout(this.sendMsgTimeout);
}
//如果发送消息失败,设置重试次数,默认为2次
if (this.retryTimesWhenSendFailed != null) {
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
}
producer.start();
log.info(String.format("producer is start ! normalGroupName:[%s],namesrvAddr:[%s]", this.normalGroupName, this.namesrvAddr));
return producer;
}
/**
* 事务消息配置
*/
@Bean
public TransactionMQProducer getTransationRocketMQProducer() throws Exception {
if (StringUtils.isEmpty(this.transationGroupName)) {
throw new Exception("no transationGroupName");
}
if (StringUtils.isEmpty(this.namesrvAddr)) {
throw new Exception("nameServerAddr is blank");
}
TransactionMQProducer producer = new TransactionMQProducer(this.transationGroupName);
producer.setNamesrvAddr(this.namesrvAddr);
MQTransactionListenerProcessor transactionListenerProcessor = new MQTransactionListenerProcessor();
producer.setTransactionListener(transactionListenerProcessor);
//如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
//producer.setInstanceName(instanceName);
if (this.maxMessageSize != null) {
producer.setMaxMessageSize(this.maxMessageSize);
}
if (this.sendMsgTimeout != null) {
producer.setSendMsgTimeout(this.sendMsgTimeout);
}
//如果发送消息失败,设置重试次数,默认为2次
if (this.retryTimesWhenSendFailed != null) {
producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
}
producer.start();
log.info(String.format("Transaction producer is start ! transationGroupName:[%s],namesrvAddr:[%s]", this.transationGroupName, this.namesrvAddr));
return producer;
}
}
消费者配置
@Slf4j
@SpringBootConfiguration
public class MQConsumerConfiguration {
@Value("${rocketmq.consumer.namesrvAddr}")
private String namesrvAddr;
@Value("${rocketmq.consumer.groupName}")
private String groupName;
@Value("${rocketmq.consumer.consumeThreadMin}")
private int consumeThreadMin;
@Value("${rocketmq.consumer.consumeThreadMax}")
private int consumeThreadMax;
@Value("${rocketmq.consumer.topics}")
private String topics;
@Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
private int consumeMessageBatchMaxSize;
@Autowired
private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
@Bean
public DefaultMQPushConsumer getRocketMQConsumer() throws Exception {
if (StringUtils.isEmpty(groupName)) {
throw new Exception("groupName is null");
}
if (StringUtils.isEmpty(namesrvAddr)) {
throw new Exception("namesrvAddr is null");
}
if (StringUtils.isEmpty(topics)) {
throw new Exception("topics is null");
}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.registerMessageListener(mqMessageListenerProcessor);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
/**
* 设置消费模型,集群还是广播,默认为集群
*/
//consumer.setMessageModel(MessageModel.CLUSTERING);
/**
* 设置一次消费消息的条数,默认为1条
*/
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
try {
/**
* 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3
*/
String[] topicTagsArr = topics.split(";");
for (String topicTags : topicTagsArr) {
String[] topicTag = topicTags.split("~");
consumer.subscribe(topicTag[0], topicTag[1]);
}
consumer.start();
log.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}", groupName, topics, namesrvAddr);
} catch (MQClientException e) {
log.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}", groupName, topics, namesrvAddr, e);
throw new Exception(e);
}
return consumer;
}
}
注入
//注意名字不要变,正好是注入到刚刚的配置中的名字的
@Autowired
private DefaultMQProducer getRocketMQProducer;
@Autowired
private TransactionMQProducer getTransationRocketMQProducer;
可靠同步消息
/**
* 可靠同步消息
*/
public void syn(String tag, String topic, String content) {
try {
Message msg = new Message(topic, tag, content.getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = defaultMQProducer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (MQClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}
}
可靠异步消息
此处有一点问题,后续研究一下再改一下,哪位朋友知道问题的留个言
/**
* 可靠异步消息
*/
public void async(String tag, String topic, String content) {
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);
try {
Message msg = new Message(topic, tag, content.getBytes(RemotingHelper.DEFAULT_CHARSET));
//重点在这里 异步发送回调
defaultMQProducer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", e);
e.printStackTrace();
}
});
} catch (MQClientException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
耗时短消息
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
只发送消息,不等待服务器响应,只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别
/**
* @Author: lǎo xiāng
* @Date: 2021/2/7 11:17
* @Describe: 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
* 只发送消息,不等待服务器响应,只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别
*/
public void shortSend(String content) {
try {
//固定topic和tag
Message msg = new Message("topic", "TagA", content.getBytes(RemotingHelper.DEFAULT_CHARSET));
defaultMQProducer.sendOneway(msg);
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
延时消息
注意:RocketMQ不支持任意时间的延时,只支持以下几个固定的延时等级 private String messageDelayLevel
= “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;messageDelayLevel可以自己设置级别,在conf/broker.conf里面加入 messageDelayLevel=10s 15s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 3 0m 1h 2h
扫描二维码关注公众号,回复: 12628290 查看本文章重启broker: nohup sh mqbroker -n localhost:9876 -c …/conf/broker.conf
&后测试当级别设为1为10s生效
/**
* 延时消息
* delayTimeLevel为不同级别的时间延时,具体参考配置,可添加自定义
*/
public void sendDelay(String topic, int delayTimeLevel, String content) {
Message message = new Message(topic, content.getBytes());
message.setDelayTimeLevel(delayTimeLevel);
try {
SendResult send = defaultMQProducer.send(message);
log.info("delay message send result:{}", send.toString());
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
顺序消息
其中rule为取模根据,比如订单号
/**
* 顺序消息
* rule:规则,比如订单号
*/
public void sendSX(String rule, String topic, String content) {
Message msg = new Message(topic, content.getBytes());
SendResult sendResult = null;//订单id
try {
sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//防止订单号规则问题,取hashCode取正
int hashCode = arg.hashCode();
int index = Math.abs(hashCode) % mqs.size();
log.info("orderno:{}, index:{}", arg, index);
return mqs.get(index);
}
}, rule);
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
content));
}
事务消息
这个不多说了,一般用于分布式事务解决方案
/**
* 事务消息
* 这里最好不要单独创建事务管理然后shutdown,否则不会有回查操作,producer在开着的时候才会触发回查操作
*/
public void transationMsg(String topic, String tag, String content) {
try {
//此处的topic可以和其他区分开
Message msg = new Message(topic, tag, content.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = getTransationRocketMQProducer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
public class MQTransactionListenerProcessor implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("执行本地事务:" + message.getTags());
if (StringUtils.equals("x", message.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("y", message.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else{
return LocalTransactionState.UNKNOW;
}
}
/**
* TODO 注意回查版本,原来好像是3.2.0没有开源,后来才有的
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("MQ检查消息Tag【"+messageExt.getTags()+"】的本地事务执行结果:" + DateUtil.now());
return LocalTransactionState.UNKNOW;
}
}
都是测试过的,
有什么问题可以私我