实现功能
主要实现:一个客户端系统发送消息到 RocketMQ 服务器,另一个客户端系统从 RocketMQ 服务器读取这些消息。
发送端
添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
application.yml
server:
port: 8021
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: my-group
程序代码
package com.mqdemo;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint that publishes a fixed greeting to a RocketMQ topic.
 */
@RestController
public class DemoController {

    /** Topic the greeting is published to. */
    private static final String TOPIC = "test_topic_3";

    /** Payload sent on every request. */
    private static final String PAYLOAD = "hello world";

    /** Messaging template used to publish; populated through the {@code @Autowired} field. */
    @Autowired
    protected RocketMQTemplate rocketMQTemplate;

    /**
     * Handles {@code GET /send} by publishing {@link #PAYLOAD} to {@link #TOPIC}.
     * Nothing is returned to the HTTP caller.
     */
    @GetMapping("/send")
    public void send() {
        rocketMQTemplate.convertAndSend(TOPIC, PAYLOAD);
    }
}
接收端
添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
application.yml
server:
port: 8022
rocketmq:
name-server: 127.0.0.1:9876
# 定义消费者组名
consumer:
group: my-customer-group1
# 定义要订阅的topic
topic: test_topic_3
程序代码
package com.receivemqdemo;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * Consumes plain-text messages from RocketMQ and prints each payload to standard output.
 *
 * <p>The topic and consumer group are written as property placeholders so that the
 * {@code rocketmq.consumer.topic} and {@code rocketmq.consumer.group} entries declared in the
 * receiver's application.yml are the values actually used, instead of being shadowed by
 * literals hard-coded in the annotation. The text after each colon is the fallback applied
 * when the property is missing; the fallbacks equal the previously hard-coded values.
 *
 * <p>NOTE(review): this relies on rocketmq-spring resolving {@code ${...}} placeholders in
 * the annotation attributes — confirm for the library version in use.
 */
@Component
@RocketMQMessageListener(
    topic = "${rocketmq.consumer.topic:test_topic_3}",
    consumerGroup = "${rocketmq.consumer.group:my-consumer_test-topic-1}")
public class ReceiveListener implements RocketMQListener<String> {

    /**
     * Prints the received message body to standard output.
     *
     * @param string the message payload handed to this listener
     */
    @Override
    public void onMessage(String string) {
        System.out.println("接受到的消息:" + string);
    }
}
服务端
下载rocketmq
启动
1.进入bin目录,执行“start mqnamesrv.cmd”
2.bin目录下执行"start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true"