- stream-provider8801 : 作为生产进行发消息的模块
- stream-consumer8802 : 作为消息接收的模块
- stream-consumer8803 : 作为消息接收的模块
消息驱动-生产者
1、模块名 stream-provider8801
2、pml.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3、yml 配置文件
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的 rabbitmq 的服务信息。
defaultRabbit: # 表示定义的名称,用于 binding 的整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq 相关的环境信息
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
bindings: # 服务整合处理
output: # 这个名字就是一个通道名称
destination: studyExchange # 表示要使用 Exchange 名称定义
cnntext-type: application/json # 设置消息类型,本次为 JSON,文本则设置 "test/plan"
binder: defaultRabbit # 设置要绑定的消息服务的具体位置
eureka:
client:
fetch-registry: true
register-with-eureka: true
service-url:
defaultZone: http://eureka7001.com:7001/eureka/
instance:
lease-renewal-interval-in-seconds: 2 # 心跳间隔时间 默认30秒
lease-expiration-duration-in-seconds: 5 # 超时时间 默认是90秒
instance-id: send-8801.com # 在服务列表显示主机名称
prefer-ip-address: true # 访问路径变成ip地址
4、主启动
@SpringBootApplication
public class StreamProviderMain8801 {
public static void main(String[] args){
SpringApplication.run(StreamProviderMain8801.class,args) ;
}
}
public interface IMessageProvider {
public String send();
}
@EnableBinding(Source.class) // 定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output;
@Override
public String send() {
String serial = UUID.randomUUID().toString();
boolean send = output.send(MessageBuilder.withPayload(serial).build());
if (send) {
System.out.println("ok");
return "发送成功";
}
System.out.println("fail");
return "发送失败";
}
}
@RestController
public class SendMessageController {
@Autowired
private IMessageProvider messageProvider ;
@RequestMapping("/send")
public String send(){
return messageProvider.send() ; // 触发方法
}
}
5、测试
消息驱动-消费者
1、模块名 cloud-stream-consumer8802
2、pml.xml(同上)
3、yml 配置文件
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此绑定rabbitMQ的服务信息
defaultRabbit: # 表示定义的名称 , 用于binding整合
type: rabbit #消息组件类型
environment: # rabbit相关环境配置
spring:
rabbitmq:
host: localhost
prot: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示用Exchange名称定义
content-type: application/json
binder: defaultRabbit # 绑定服务具体设置
4、主启动
@SpringBootApplication
public class StreamCon8802 {
public static void main(String[] args){
SpringApplication.run(StreamCon8802.class,args) ;
}
}
@Component
@EnableBinding(Sink.class)
public class ReceiverMessageController {
@Value("${server.port}")
public String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println(serverPort+"====:===消费==="+ message.getPayload() );
}
}
5、测试
分组消费与持久化
准备
创建多个消费者服务(复制 一个8802 为 8803)
启动: RabbtMQ、Eureka 服务注册 、 消息生产者、消息消费者-1、消息消费者-2
运行后的2个问题 : 有重复消费
问题 、消息持久化
问题
目前消费者-1、消费者-2都同时收
到了,存在
重复消费的问题
如何解决
- 分组和持久化属性 group : * * *
重点
* * *
生产中的实际问题
比如在如下场景汇总,订单系统我们需要做集群部署,都会从 RabbitMQ 中获取顶大信息,如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。
这个时候我们就可以使用 Stream 中的消息分组来解决
注意,在Stream 中,处于一个 group 中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
不同组
是可以完全消费的(重复消费)
同一个组内
会发生竞争
关系,只有其中一个可以消费。
分组
现状描述
- 故障现象是重复消费
- 导致现象是
默认分组
是不同的,组流水号
不同,可以消费 - 自定义配置分组, 自定义配置分为同一个组,解决重复消费问题
原理
微服务
应用放置于同一个group
中, 就能保证消息只会被其中一个应用消费一次,不同的小组是可以消费的
,同一个组内
会发生竞争关系
,只有其中一个可以消费
自定义分组
消费侧
spring:
cloud:
stream:
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
group: aqiang9-1 # 组名
结论
消费者-1,消费者-2 都变成了不同组,group 两个不同 : 都可以收到
group 相同:消费者-1,消费者-2 实现了轮询分组
,每次 发布者 模块发出
的消息只能被 消费者-1,消费者-2 其中一个接收
到,这样避免了重复消费
的问题
持久化
当消息没有被消费时 , 新启动的会 自动
去对应 group 去查看是否有消息 , 如果有 ,自动消费