RocketMQ:
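RocketMQ is a message middleware. To push a message, build it and call send on the message producer, defaultMQProducer.send(msg): Message msg = new Message(topicName,null,key,(json).getBytes());
After the consumer receives a message from MQ, it converts the message body back to obtain the task data.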
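The body round trip described above can be sketched as follows. This is a minimal sketch using only the JDK, not the original project's conversion code: the class name PayloadCodec and the sample JSON fields are hypothetical. It differs from the snippet above in one respect: getBytes() without an argument uses the platform default charset, so the sketch names UTF-8 explicitly on both sides so that the producer and consumer hosts agree.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: not part of RocketMQ or of the original project.
final class PayloadCodec {
    // Producer side: turn the JSON text into the byte[] used as the message body.
    static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: turn the received message body back into JSON text.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Sample payload with a non-ASCII value (field names are made up).
        String json = "{\"taskId\":1,\"name\":\"\u4efb\u52a1\"}";
        byte[] body = encode(json);
        System.out.println(decode(body).equals(json)); // prints "true"
    }
}
```

On the producer side the result of encode(json) would take the place of (json).getBytes(); on the consumer side the decoded string would then be handed to whatever JSON parser the project uses.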
RocketMQ dependency:
<!-- https://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client -->
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>
The MQ producer class:
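import java.util.UUID;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import com.alibaba.rocketmq.common.message.Message;

public class RocketMQProducer {
    private DefaultMQProducer sender;
    protected String nameServer;
    protected String groupName;
    protected String topics;

    // Creates and starts the underlying producer; call once before send().
    public void init() {
        sender = new DefaultMQProducer(groupName);
        sender.setNamesrvAddr(nameServer);
        // A unique instance name lets several clients run in the same JVM.
        sender.setInstanceName(UUID.randomUUID().toString());
        try {
            sender.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public RocketMQProducer(String nameServer, String groupName, String topics) {
        this.nameServer = nameServer;
        this.groupName = groupName;
        this.topics = topics;
    }

    // Sends the message to the configured topic and prints the send result.
    public void send(Message message) {
        message.setTopic(topics);
        try {
            SendResult result = sender.send(message);
            SendStatus status = result.getSendStatus();
            System.out.println("messageId=" + result.getMsgId() + ", status=" + status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}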
The MQ consumer class:
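import java.util.UUID;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

public class RocketMQConsumer {
    private DefaultMQPushConsumer consumer;
    private MessageListener listener;
    protected String nameServer;
    protected String groupName;
    protected String topics;

    public RocketMQConsumer(MessageListener listener, String nameServer, String groupName, String topics) {
        this.listener = listener;
        this.nameServer = nameServer;
        this.groupName = groupName;
        this.topics = topics;
    }

    // Subscribes to the topic, registers the listener and starts the push consumer.
    public void init() {
        consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(nameServer);
        try {
            // "*" subscribes to every tag of the topic.
            consumer.subscribe(topics, "*");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        consumer.setInstanceName(UUID.randomUUID().toString());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // The cast fails with ClassCastException unless the listener is a MessageListenerConcurrently.
        consumer.registerMessageListener((MessageListenerConcurrently) this.listener);
        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        System.out.println("RocketMQConsumer Started! group=" + consumer.getConsumerGroup()
                + " instance=" + consumer.getInstanceName());
    }
}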
MQ configuration:
mq:
  namesrvAddr: 10.10.6.71:9876;10.10.6.72:9876
  consumerGroup: TimeslicePG
  topicName: GenTimesliceTopic
  tags: tag1
  instanceName: instance1
  consumeThreadMax: 1
  consumeThreadMin: 1
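The namesrvAddr value above lists several name servers as host:port pairs separated by semicolons. The client accepts the string as is through setNamesrvAddr, so no parsing is required. The following is only an illustrative sketch of that format, for example to validate the setting at startup. The class NameServerAddresses and its parse method are hypothetical and use only the JDK.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: not part of RocketMQ or of the original project.
final class NameServerAddresses {
    // Splits "host1:port1;host2:port2" into unresolved socket addresses.
    static List<InetSocketAddress> parse(String namesrvAddr) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : namesrvAddr.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing or doubled semicolon
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup; this only checks the format.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress addr : parse("10.10.6.71:9876;10.10.6.72:9876")) {
            System.out.println(addr.getHostString() + " -> " + addr.getPort());
        }
    }
}
```

Failing fast on a malformed address at startup tends to be easier to diagnose than a connection error that only surfaces when the first message is sent.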