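In day-to-day development, services often communicate with each other through message middleware. If application code is written directly against one product, such as RabbitMQ, the system becomes tightly coupled to it, and moving to Kafka later requires substantial changes. Spring Cloud Stream can be used to integrate the message middleware and reduce this coupling between the system and the middleware.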
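I. What is Spring Cloud Stream
The official documentation defines Spring Cloud Stream as a framework for building message-driven microservices.
An application interacts with the binder in Spring Cloud Stream through inputs and outputs, which are connected by the bindings we configure, while the binder is responsible for talking to the message middleware. As a result, we only need to understand how to interact with Spring Cloud Stream in order to work in a message-driven way.
It uses Spring Integration to connect to message brokers and implement message- and event-driven communication. Spring Cloud Stream provides opinionated auto-configuration for several vendors' middleware products and introduces three core concepts: publish-subscribe, consumer groups, and partitions. At the time of writing, the officially provided binders cover RabbitMQ and Kafka.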
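The publish-subscribe and consumer-group concepts can be illustrated with a toy in-memory model. This is only a sketch in plain Java, not Spring Cloud Stream code: the class `GroupedTopic` and the group names used below are hypothetical, and the round-robin choice inside a group is an assumption made for the illustration. It shows that every group receives each message, while inside a group only one member does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory topic illustrating publish-subscribe plus consumer groups. */
final class GroupedTopic {
    // group name -> inboxes of the consumers that joined this group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // group name -> index of the member that receives the next message
    private final Map<String, Integer> cursors = new LinkedHashMap<>();

    /** Registers a consumer in the given group and returns its inbox. */
    List<String> subscribe(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        cursors.putIfAbsent(group, 0);
        return inbox;
    }

    /** Delivers the message once per group, rotating among that group's members. */
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> members = entry.getValue();
            int next = cursors.get(entry.getKey());
            members.get(next % members.size()).add(message);
            cursors.put(entry.getKey(), (next + 1) % members.size());
        }
    }
}
```

With two consumers in one group and a single consumer in another, publishing two messages gives the single consumer both of them, while the two group members receive one each.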
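II. What problem does Stream solve?
Stream lets developers use message middleware without dealing with its specifics. Because Stream adds a further layer of abstraction over the middleware, application code does not reference the broker at all, and the middleware can even be switched (for example from RabbitMQ to Kafka) by changing the binder dependency and configuration rather than the business code. This keeps microservices loosely coupled and lets each service concentrate on its own business logic.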
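The decoupling idea can be sketched in plain Java. None of the types below belong to Spring Cloud Stream: `OutputChannel`, `ToyBinder`, the two toy binder classes and `OrderNotifier` are hypothetical names invented for this illustration. The point is only that the business class depends on a channel abstraction, so swapping the binder does not touch it.

```java
import java.util.ArrayList;
import java.util.List;

/** What the business code sees: a channel to send to, nothing broker-specific. */
interface OutputChannel {
    void send(String payload);
}

/** Hypothetical binder contract: turns a destination name into a usable channel. */
interface ToyBinder {
    OutputChannel bindProducer(String destination);
}

/** Stand-in for a RabbitMQ-style binder; records what would go to an exchange. */
final class ToyRabbitBinder implements ToyBinder {
    final List<String> wire = new ArrayList<>();

    @Override
    public OutputChannel bindProducer(String destination) {
        return payload -> wire.add("exchange=" + destination + " body=" + payload);
    }
}

/** Stand-in for a Kafka-style binder; records what would go to a topic. */
final class ToyKafkaBinder implements ToyBinder {
    final List<String> wire = new ArrayList<>();

    @Override
    public OutputChannel bindProducer(String destination) {
        return payload -> wire.add("topic=" + destination + " value=" + payload);
    }
}

/** Business code: unchanged no matter which binder produced the channel. */
final class OrderNotifier {
    private final OutputChannel channel;

    OrderNotifier(OutputChannel channel) {
        this.channel = channel;
    }

    void orderCreated(String orderId) {
        channel.send("order-created:" + orderId);
    }
}
```

Constructing `OrderNotifier` with a channel from either toy binder exercises the same business method; only the recorded wire format differs.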
Components shown in the architecture diagram on the official site:
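Component | Description |
---|---|
Middleware | The message middleware; currently only RabbitMQ and Kafka are supported |
Binder | The layer between the application and the message middleware. Binders are currently implemented for Kafka and RabbitMQ. A binder makes it easy to connect to the middleware, and the message destination (a Kafka topic or a RabbitMQ exchange) can be changed through the configuration file |
@Input | Marks an input channel; messages received through this channel enter the application |
@Output | Marks an output channel; published messages leave the application through this channel |
@StreamListener | Marks a method that listens on an input channel and receives messages on the consumer side |
@EnableBinding | Binds the channels declared in the given interfaces to the broker destinations (exchanges, in the RabbitMQ case) |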
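III. A message-driven starter example
This starter example uses Stream together with RabbitMQ to demonstrate asynchronous messaging, so a RabbitMQ server must be running first. If you are not familiar with RabbitMQ, see this article: https://dpb-bobokaoya-sm.blog.csdn.net/article/details/90409404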
1. Create the message sender service
1.1 Create the project
Create a Spring Cloud project.
1.2 pom file
The key point in the pom file is to add the spring-cloud-starter-stream-rabbit dependency.
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.13.RELEASE</version>
    </parent>
    <groupId>com.bobo</groupId>
    <artifactId>stream-sender</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Dalston.SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```
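1.3 Configuration file
Besides the required service name, port, and Eureka information, the configuration file also needs the RabbitMQ connection information.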
```properties
spring.application.name=stream-sender
server.port=9060
# Service registry (Eureka) addresses
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
# RabbitMQ connection information
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
```
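In this example the channel name used in the annotation doubles as the exchange name. As a minimal sketch, assuming you would rather keep the channel name fixed in code and choose the exchange in configuration, the binding's `destination` property can be set. The exchange name `orders-exchange` is a hypothetical placeholder, not something from this project.

```properties
# Map the channel named "dpb-exchange" to a differently named exchange (hypothetical name)
spring.cloud.stream.bindings.dpb-exchange.destination=orders-exchange
```

If this is used, the sender and the receiver both need to point at the same destination, otherwise they publish to and consume from different exchanges.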
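1.4 Create the message sender interface
Create an interface for sending messages, as follows. The method name is up to you. The return type is a message channel: SubscribableChannel is used here, while the built-in Source interface declares the more general MessageChannel. The @Output annotation specifies the exchange name.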
```java
package com.bobo.stream.sender;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

/**
 * Interface for sending messages.
 * @author dengp
 */
public interface ISendeService {

    /**
     * Declares the output channel; the value is the name of the target exchange.
     * @return the channel used to send messages
     */
    @Output("dpb-exchange")
    SubscribableChannel send();
}
```
1.5 Startup class
In the startup class, use the @EnableBinding annotation to bind the interface we just created.
```java
package com.bobo.stream;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import com.bobo.stream.sender.ISendeService;

@SpringBootApplication
@EnableEurekaClient
// Bind the message-sending interface created above
@EnableBinding(value = {ISendeService.class})
public class StreamSenderStart {
    public static void main(String[] args) {
        SpringApplication.run(StreamSenderStart.class, args);
    }
}
```
2. Create the message consumer service
2.1 Create the project
2.2 pom file
The dependencies are the same as those of the sender service.
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.13.RELEASE</version>
    </parent>
    <groupId>com.bobo</groupId>
    <artifactId>stream-receiver</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Dalston.SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```
2.3 Configuration file
Remember to change the service name and the port.
```properties
spring.application.name=stream-receiver
server.port=9061
# Service registry (Eureka) addresses
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
# RabbitMQ connection information
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
```
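The consumer-group concept is applied on the receiving side through a binding property. The fragment below is a sketch that assumes several instances of stream-receiver should share the work instead of each receiving every message; the group name `receiver-group` is a hypothetical placeholder.

```properties
# Instances that share this group compete for messages instead of each getting a copy.
# With the RabbitMQ binder this typically results in a durable queue named <destination>.<group>.
spring.cloud.stream.bindings.dpb-exchange.group=receiver-group
```

Without a group, each instance usually gets its own anonymous queue, so every running instance sees every message.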
2.4 Create the message receiver interface
This interface is similar to the sending interface; note that it uses the @Input annotation.
```java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Interface for receiving messages.
 * @author dengp
 */
public interface IReceiverService {

    /**
     * Declares the input channel; the value is the name of the source exchange.
     * @return the channel from which messages are received
     */
    @Input("dpb-exchange")
    SubscribableChannel receiver();
}
```
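2.5 Create the message handler class
Note that this class does not implement the interface created above. Instead, it binds that interface through @EnableBinding and uses the @StreamListener annotation to listen for messages arriving on the dpb-exchange channel.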
```java
import java.nio.charset.StandardCharsets;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

/**
 * Handler class that actually receives the messages.
 * @author dengp
 */
@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {

    // Invoked for every message that arrives on the "dpb-exchange" input channel
    @StreamListener("dpb-exchange")
    public void onReceiver(byte[] msg) {
        // Decode with an explicit charset rather than the platform default
        System.out.println("Consumer: " + new String(msg, StandardCharsets.UTF_8));
    }
}
```
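Because the handler receives the payload as raw bytes, the producer and the consumer have to agree on how text is encoded. The following plain-Java sketch, with a hypothetical helper class `PayloadCodec` that is not part of the project, shows the round trip with an explicit charset on both sides, which avoids depending on each machine's default encoding.

```java
import java.nio.charset.StandardCharsets;

/** Encodes and decodes text payloads with the same explicit charset on both sides. */
final class PayloadCodec {
    private PayloadCodec() {
    }

    /** Producer side: text to bytes, always UTF-8. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: bytes back to text, using the same charset. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

For a pure ASCII message such as the one in this example the default charset rarely causes trouble, but non-ASCII text can be corrupted when the two services run with different defaults.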
2.6 Startup class
The @EnableBinding annotation is added here as well.
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(value = {IReceiverService.class})
public class StreamReceiverStart {
    public static void main(String[] args) {
        SpringApplication.run(StreamReceiverStart.class, args);
    }
}
```
3. Write the test code
Test the service with a unit test.
```java
import java.nio.charset.StandardCharsets;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import com.bobo.stream.StreamSenderStart;
import com.bobo.stream.sender.ISendeService;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = StreamSenderStart.class)
public class StreamTest {

    @Autowired
    private ISendeService sendService;

    @Test
    public void testStream() {
        String msg = "hello stream ...";
        // Wrap the payload in a Message object before sending
        Message<byte[]> message = MessageBuilder
                .withPayload(msg.getBytes(StandardCharsets.UTF_8))
                .build();
        sendService.send().send(message);
    }
}
```
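Start the message consumer first, then run the test code. The receiver obtains the message sent by the sender, and the related information is also visible in the RabbitMQ web UI.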
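Summary
Using Stream, we have put message middleware to work, and RabbitMQ-specific coupling appears in only two places: the dependency in the pom file and the RabbitMQ settings in application.properties. No RabbitMQ-specific code appears in the business logic, so switching to Kafka only requires replacing these two pieces. This gives a high degree of decoupling between the middleware and the service.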
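As a rough sketch of that switch, assuming a Kafka broker is available: the spring-cloud-starter-stream-rabbit dependency would be replaced by spring-cloud-starter-stream-kafka, and the spring.rabbitmq.* settings by Kafka binder settings along these lines. The host names and ports are placeholders, not values from this project.

```properties
# Replaces the spring.rabbitmq.* block; host names and ports are placeholders
spring.cloud.stream.kafka.binder.brokers=kafka-host:9092
# Binder versions from the Dalston era also expect a ZooKeeper address
spring.cloud.stream.kafka.binder.zkNodes=zookeeper-host:2181
```

The interfaces, the handler class, and the test would stay as they are, with dpb-exchange then naming a Kafka topic instead of a RabbitMQ exchange.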