一、前言
这几天对Springcloud 2.x的配置文章做了大致的收尾工作,在看springcloud官方文档意外发现了一项遗漏的技能点,自己测试试验了下,感觉还是不错的技术,特此做一项学习笔记总结。
二、现状
企业中使用到消息中间件有很多种类,目前流行的消息中间件有 Rabbitmq、ActiveMq、RocketMq、Kafka(分布式)等。
但是作为一只猿,不可能说把每种流行出来的MQ都学精通,这个不实际,一般而言都会精通一两种。这样会出现一个问题。
学的是Rabbitmq,但是工作了却要使用Activemq,然道要重学?
能否出现一种类似Hiberate的技术,实现一套代码自动适应各种不同数据库?
能否出现一种新的技术,让开发者不必过于注重每种MQ的细节,只需要使用一种配置绑定方式,达到在各种MQ中自由切换。
三、了解 Spring Cloud Stream
参考文档:
《spring cloud Stream 中文文档》
《spring cloud Stream 英文文档》
总结下SpringCloud Stream的大致功能:
1、Spring Cloud Stream 是一项构建消息驱动微服务的框架。
2、开发者只需要通过 inputs 和 outputs 来与Spring Cloud Stream 中的Binder 对象进行操作即可。
3、目前SpringCloud Stream 暂只支持 Rabbitmq 和 kafka。
4、选择什么mq,进行什么操作,只需要在配置中书写即可。
本次文章以Rabbitmq Binder 为例做配置、使用说明。
官方文档地址:《springcloud Stream rabbitmq binder 文档》
Springcloud Stream 的消息模型:
组件名 | 说明 |
---|---|
Middleware | 中间件,目前暂只支持rabbitmq、kafka。 |
Binder | 应用与消息中间件之间的封装,通过Binder可以很方便的连接中间件,可以通过配置文件动态的变更消息类型(rabbitmq的exchange,kafka的topic) |
@Input | 注解标识输入通道 |
@Output | 注解标识输出通道 |
@StreamListener | 监听队列,用于消费者对消息的接收 |
@EnableBinding | 信道和exchange绑定 |
四、Demo
4.1、demo前的准备
- rabbitmq的安装
没有安装rabbitmq的可以参考我的专栏:rabbitmq 从入门到放弃 - springcloud环境的搭建(这里暂时不做高可用)
4.2、Stream Product的搭建
Stream 对Springboot和SpringCloud的兼容性版本要求很高,需要匹配兼容性规则。
依赖引入:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<!-- 引入web依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- SpringBoot整合eureka客户端 -->
<!-- <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency> -->
<!-- 引入springcloud stream rabbitmq binder 相关依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- 监控相关 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<!-- 管理依赖 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- 注意: 这里必须要添加, 否者各种依赖有问题 -->
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
相关配置 application.yml
server:
port: 8000
spring:
application:
name: spring-cloud-stream-product
cloud:
stream:
binders: # 配置绑定的rabbitmq的服务信息(是一个Map,可以有多个)
defaultRabbit: # 标识定义名称,只是一个别名(名字可以另外起名,只需要和下列的output绑定好关系即可)
type: rabbit # 组件类型
environment: # 配置rabbitmq的环境参数
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 整合处理(是一个Map,可以有多个)
output: # 通道的名称
destination: studyExchange # exchange的名称
content-type: application/json #消息类型参数
binder: defaultRabbit # 和binders中定义的名称保持一致
编写一个业务接口:
public interface IMessageService {
public String send(String msg);
}
业务接口实现类:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import cn.linkpower.service.IMessageService;
@EnableBinding(Source.class) //定义消息的推送管道
public class MessageServiceImpl implements IMessageService {
private static Logger log = LoggerFactory.getLogger(MessageServiceImpl.class);
//消息发送管道
@Autowired
private MessageChannel output;
@Override
public String send(String msg) {
output.send(MessageBuilder.withPayload(msg).build());
log.info("**** send msg:{}",msg);
return null;
}
}
写一个测试请求接口:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import cn.linkpower.service.IMessageService;
@RestController
public class TestController {
private static Logger log = LoggerFactory.getLogger(TestController.class);
@Autowired
private IMessageService messageServiceImpl;
@RequestMapping("/test/{msg}")
public String test(@PathVariable("msg") String msg){
log.info("接收到的url数据:{}",String.valueOf(msg));
messageServiceImpl.send(msg);
return msg;
}
}
SpringCloud-Stream-Product 项目结构:
4.3、Stream Consumer的搭建
依赖引入和Product一致。
配置文件中需要注意一个小细节:
spring.cloud.stream.bindings.input
完整配置文件:
server:
port: 8001
spring:
application:
name: spring-cloud-stream-consumer
cloud:
stream:
binders: # 配置绑定的rabbitmq的服务信息(是一个Map,可以有多个)
defaultRabbit: # 标识定义名称,只是一个别名
type: rabbit # 组件类型
environment: # 配置rabbitmq的环境参数
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 整合处理(是一个Map,可以有多个)
input: # 通道的名称(此时为消费端,所以需要变更为 input)
destination: studyExchange # exchange的名称
content-type: application/json #消息类型参数
binder: defaultRabbit # 和binders中定义的名称保持一致
创建一个消费消息的类,用于消费rabbitmq队列中的消息:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* 监听消息队列中的消息,并进行消息的打印操作
* @author 765199214
*
*/
@Component
@EnableBinding(Sink.class)
public class ReceviverMessageListener {
private static Logger log = LoggerFactory.getLogger(ReceviverMessageListener.class);
@Value("${server.port}")
private String port;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
log.info("接受消息:{},该服务端口信息为:{}",message.getPayload(),port);
}
}
五、测试
分别启动 Product 和 Consumer 项目,查看 rabbitmq的web界面:
请求接口:
http://localhost:8000/test/7777777
Product端:
Consumer端:
六、消息重复消费
修改Consumer的端口号,更改为 8002,启动Consumer-8002 项目,发送消息,查看打印日志:
Porodut-8000:
Consumer:
发现:
生产一条消息,两个消费者都拿到了消息;
正常的分布式架构,是需要保证消息不重复消费的!
[问:]为什么会出现这种原因?
问题根源在于,默认分组是不同的,每个微服务的组名称不一样,被认为不同组可以消费,导致相同的消息分别全部发送给不同组中进行消费-----重复消费!
[问:]如何解决这种问题?
再配置文件中,新增分组选项,保证相同操作的微服务为一个组就行,让这个组中的任何一个微服务处理数据都是竞争关系。
spring.cloud.stream.bindings.input.group
再次启动 8001和8002端口的服务,测试已知问题。
访问地址:http://localhost:8000/test/66666
Product:
Consumer-8001:
Consumer-8002:
完美解决!
六、消息持久化
当只启动Product项目时,访问接口:
http://localhost:8000/test/7777777
查看rabbitmq管理界面,可以得知消息发送至rabbitmq中,但并未做持久化处理!
在只有Product服务生产消息,
消息发送至rabbitmq中后,就会消失;
即使重新启动消费者服务,依旧不会拿到rabbitmq中的消息,导致消息的丢失!
[问:]如何解决这种问题?
依旧是再配置文件中新增配置项:
spring.cloud.stream.bindings.input.group
再次只启动 Product 微服务,生产消息至消息队列中,访问:
http://localhost:8000/test/66666
查看rabbitmq管理界面:
然后再启动消费者服务,此时的消费者服务具有 Group 分组:
添加了 spring.cloud.stream.bindings.input.group 属性,能够防止消息的重复消费和保证消息的不丢失!