一 项目架构
二 编写生产者
1 添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
2 配置文件
server:
port: 8080
spring:
application:
name: spring-msg-producer
#默认配置,可不配置
#rabbitmq:
# host: localhost
# port: 5672
# username: guest
# password: guest
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka/
3 新建接口
package org.crazyit.cloud;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;
public interface SendService {
//绑定通道名
@Output("myInput")
SubscribableChannel sendOrder();
}
4 新建启动类
package org.crazyit.cloud;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(SendService.class)
public class ProducerApp {
public static void main(String[] args) {
new SpringApplicationBuilder(ProducerApp.class).run(args);
}
}
5 编写控制器,发送消息
package org.crazyit.cloud;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
private SendService sendService;
@RequestMapping(value = "/send", method = RequestMethod.GET)
public String send() {
Message msg = MessageBuilder.withPayload("Hello World".getBytes()).build();
sendService.sendOrder().send(msg);
return "success";
}
}
6 启动项目
7 测试
三 编写消费者
1 添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
2 编写接口
package org.crazyit.cloud;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface ReceiveService {
//绑定通道名
@Input("myInput")
SubscribableChannel myInput();
}
3 新建启动类
package org.crazyit.cloud;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(ReceiveService.class)
public class ConsumerApp {
public static void main(String[] args) {
new SpringApplicationBuilder(ConsumerApp.class).web(true).run(args);
}
//监听消息
@StreamListener("myInput")
public void onReceive(byte[] msg) {
System.out.println("消息者1,接收到的消息:" + new String(msg));
}
}
4 启动消费者
四 测试
浏览器输入:http://localhost:8080/send
消费者控制台输出:
消息者1,接收到的消息:Hello World