docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
或者不设置密码
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 rabbitmq:management
使用 docker inspect rabbit查看容器的ip地址,
这个留着待会儿用spring boot连接的时候进行配置。
打开浏览器 localhost:15672 来到rabbitmq的web管理界面,刚刚我们的rabbit容器是以默认的方式打开的,所以账号密码都是 guest。
登录进去之后就设计队列和交换机(类比计算机网络)
我设计了两个队列,分别是C.Queue 用来接收spider服务传过来给web端的,spider_queue spider组件来接收web传过来的项目。
编号 | Exchange | RoutingKey | Queue | 描述 |
---|---|---|---|---|
1 | C.Exchange | C.* | C.Queue | web服务端监听,消息来自于spider服务 |
2 | topicExchange | spider.* | spider_queue | spider服务监听,消息来自于web服务端 |
前缀最好设置成一样的,这样一旦队列多了,方便维护,以免眼花
建好之后:
然后用spring boot来调用
在pom.xml加入:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
并在配置文件中加入配置:
这是web服务的配置
spring.rabbitmq.host=172.17.0.3
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
com.shengqian.demo.rabbitmq.ToSpiderQueueName=C.Queue
com.shengqian.demo.rabbitmq.exchangename=C.Exchange
com.shengqian.demo.rabbitmq.routekey=C.*
com.shengqian.demo.rabbitmq.recv=recvMessage
下面来看看spider服务的配置
spring.rabbitmq.host=172.17.0.3
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
com.shengqian.demo.rabbitmq.ToSpiderQueueName=spider_queue
com.shengqian.demo.rabbitmq.exchangename=topicExchange
com.shengqian.demo.rabbitmq.routekey=spider.*
com.shengqian.demo.rabbitmq.recv=recvMessage
实际上没什么区别,唯一的区别就是监听的队列参数不一样,详细的都在那个表里面了。
这里的host就是刚刚我们查看的docker 容器的ip。之所以能这么做是因为我在我的路由表中加了一条路由,使得本机发出的172.17.0.0/16的ip能丢到docker的默认network中,有关详情请参见我的 用springboot连接redis集群一文,里面有较为详细的操作方法。
然后加入rabbitmq配置:
@Configuration
public class RabbitMQConfig {
@Autowired
private QueueService queueService;
@Value("${com.shengqian.demo.rabbitmq.ToSpiderQueueName}")
private String queueName;
@Value("${com.shengqian.demo.rabbitmq.exchangename}")
private String exchangeName;
@Value("${com.shengqian.demo.rabbitmq.routekey}")
private String routeKey;
@Value("${com.shengqian.demo.rabbitmq.recv}")
private String messageHandler;
@Bean
Queue getQueue() {
return new Queue(queueName);
}
@Bean
TopicExchange getTopicExchange() {
return new TopicExchange(exchangeName);
}
@Bean
Binding getBinding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routeKey);
}
@Bean
MessageListenerAdapter listenerAdapter(AmqpMessageService receiver) {
return new MessageListenerAdapter(receiver, messageHandler);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setPrefetchCount(1);
container.setExposeListenerChannel(true);
return container;
}
}
现在就可以使用了,实现一个简单的两个服务之间用消息队列进行对话的demo:
在web端
public interface AmqpMessageService {
/**
* 接收消息队列消息
*
* @param message 消息内容
*/
void recvMessage(String message);
/**
* 发送消息到消息队列
*
* @param message 消息内容
*/
void sendMessage(String routingKey, String message);
}
public interface QueueService extends AmqpMessageService {
void receive(String message);
}
@Service
public class QueueServiceImpl implements QueueService, InitializingBean {
@Autowired
public RabbitTemplate rabbitTemplate;
private final static Logger LOGGER = LoggerFactory.getLogger(QueueServiceImpl.class);
public void receive(String message) {
LOGGER.info("接收到来自C.Queue队列的消息:" + message);
}
@Override
public void afterPropertiesSet() throws Exception {
//执行一些初始化操作,如程序重新启动,需要加载扫描参数
}
/**
* 接收消息队列消息
*
* @param message 消息内容
*/
@Override
public void recvMessage(String message) {
receive(message);
}
/**
* 发送消息到消息队列
*
* @param message 消息内容
*/
@Override
public void sendMessage(String routingKey,String message) {
rabbitTemplate.convertAndSend("topicExchange", routingKey, JSON.toString(message));
}
}
//最后定义一个controller
@Controller
public class TestController {
@Autowired
private QueueService queueService;
@ApiOperation(value = "测试rabbit", notes = "测试rabbitmq")
@RequestMapping(value = Path.TEST_RABBITMQ, method = RequestMethod.GET)
@ResponseBody
public Integer send(){
Map<String, Object> ans = new HashMap<>();
ans.put("hah", "hello spider");
queueService.sendMessage("spider.*", JSON.toString(ans));
return 1;
}
}
然后在spider服务端:
public interface AmqpMessageService {
/**
* 接收消息队列消息
*
* @param message 消息内容
*/
void recvMessage(String message);
/**
* 发送消息到消息队列
*
* @param message 消息内容
*/
void sendMessage(String routingKey, String message);
}
public interface QueueService extends AmqpMessageService {
void receive(String message);
}
@Service
public class QueueServiceImpl implements QueueService, InitializingBean {
@Autowired
public RabbitTemplate rabbitTemplate;
private final static Logger LOGGER = LoggerFactory.getLogger(QueueServiceImpl.class);
public void receive(String message) {
LOGGER.info("接收到来自spider_queue队列的消息:" + message);
//处理一段时间然后,返回
Map<String, Object> ans = new HashMap<String, Object>();
ans.put("hello",message);
sendMessage("C.*",JSON.toString(ans));
}
@Override
public void afterPropertiesSet() throws Exception {
//执行一些初始化操作,如程序重新启动,需要加载扫描参数
}
/**
* 接收消息队列消息
*
* @param message 消息内容
*/
@Override
public void recvMessage(String message) {
receive(message);
}
/**
* 发送消息到消息队列
*
* @param message 消息内容
*/
@Override
public void sendMessage(String routingKey, String message) {
rabbitTemplate.convertAndSend("C.Exchange", routingKey, JSON.toString(message));
}
}
最后分别打开两个spring boot服务即可,我的spider项目也是基于spring boot构建的服务,当订阅了消息队列,有消息来的时候会驱动整个服务继续运行,所以说该服务是消息驱动的服务。
打开两个服务后,我在web服务中构建了swagger2,
直接点开第三个接口运行就可以了,有关如何在springboot环境下集成swagger请看我前面的博客。运行结果如下:
可以正常使用。接下来计划完成spider项目与docker环境下的cockroachDB集群的交互问题,cockroachDB是新一代newSql。