Spring-Rabbit
导入依赖
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> </dependency>
rabbmitmq配置文件
rabbitmq.host=127.0.0.1 rabbitmq.port=5672 rabbitmq.username=mytest rabbitmq.passwd=mytest rabbitmq.vhost=/mytest
生产者 spring-rabbitmq配制文件
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <!-- 连接工厂 --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.passwd}" virtual-host="${rabbitmq.vhost}" /> <!-- MQ的管理,包括队列、交换器等 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- durable:持久化 。队列或交换机是否要存储到磁盘也可以存储到内容--> <!-- 定义交换机 --> <rabbit:topic-exchange name="KAOLA-EXCHANGE-ITEM" auto-declare="true" durable="true" > </rabbit:topic-exchange> <!-- 定义模板 --> <rabbit:template id="template" connection-factory="connectionFactory" exchange="KAOLA-EXCHANGE-ITEM" > </rabbit:template> </beans>
发送消息内容到交换机
import com.fasterxml.jackson.databind.ObjectMapper; @Autowired private RabbitTemplate rabbitTemplate; private static final ObjectMapper MAPPER = new ObjectMapper(); private void sendMsg(Long itemId,String type){ try{ Map<String, Object> map = new HashMap<String, Object>(); map.put("itemId", itemId); map.put("type", type); this.rabbitTemplate.convertAndSend("item."+type,MAPPER.writeValueAsString(map)); }catch(Exception e){ e.printStackTrace(); } } 方法调用: //发送消息到交换机,通知其它系统该商品已经更新 sendMsg(item.getId(),"update"); //发送消息到交换机,通知其它系统该商品已经新增 sendMsg(item.getId(),"insert"); //发送消息到交换机,通知其它系统该商品已经删除 sendMsg(item.getId(),"delete");
消费者接收消息:
导入依赖
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> </dependency>
rabbmitmq配置文件
rabbitmq.host=127.0.0.1 rabbitmq.port=5672 rabbitmq.username=mytest rabbitmq.passwd=mytest rabbitmq.vhost=/mytest
消费者 spring-rabbitmq配制文件
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <!-- 连接工厂 --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.passwd}" virtual-host="${rabbitmq.vhost}" /> <!-- MQ的管理,包括队列、交换器等 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 定义队列 --> <rabbit:queue name="KAOLA-WEB-ITEM" auto-declare="true" durable="true" /> <bean id="itemHandler" class="com.kaola.portal.mq.handler.ItemHandler" /> <!-- 定义监听 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="itemHandler" method="execute" queue-names="KAOLA-WEB-ITEM"/> </rabbit:listener-container> </beans>
具体处理逻辑:
import org.apache.commons.lang3.StringUtils; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; public class ItemHandler { private static final ObjectMapper MAPPER = new ObjectMapper(); public void execute(String msg){ try { JsonNode jsonNode = MAPPER.readTree(msg); Long itemId = jsonNode.get("itemId").asLong(); String type = jsonNode.get("type").asText(); if(StringUtils.equals(type, "update")||StringUtils.equals("insert", type)){ //TODO 根据itemId操作redis中的数据 }else if(StringUtils.equals(type, "delete")){ //TODO 根据itemId操作redis中的数据 } } catch (Exception e) { } } }
队列和交换机的绑定关系
实现:
1、 在配置文件中将队列和交换机完成绑定
<!-- 定义队列,自动声明 --> <rabbit:queue name="KAOLA-WEB-ITEM" auto-declare="true"></rabbit:queue> <!-- 定义队列、交换机、以及完成队列和交换机的绑定 --> <rabbit:topic-exchange name="KAOLA-EXCHANGE-ITEM" auto-declare="true" durable="true"> <rabbit:bindings> <!--<rabbit:binding queue="KAOLA-WEB-ITEM" pattern="item.*" />--> <rabbit:binding queue="KAOLA-WEB-ITEM" pattern="item.update" /> <rabbit:binding queue="KAOLA-WEB-ITEM" pattern="item.delete" /> </rabbit:bindings> </rabbit:topic-exchange>
2、 可以在管理界面中完成绑定
2.1)绑定关系如果发生变化,需要修改配置文件,并且服务需要重启
2.2)管理更加灵活
2.3)更容易对绑定关系的权限管理,流程管理
在界面管理工具中完成绑定关系
从 从交换机KAOLA-EXCHANGE-ITEM中完成绑定,如下图