RabbitMQ(五)用Spring玩一下RabbitMQ
- 五种消息模型中,在企业中应用最广泛的是最后一种:通配符匹配(topic)模式
- Spring AMQP 是基于 Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等,简化了我们对于RabbitMQ相关程序的开发。
生产端工程
<dependencies>
    <!-- Spring AMQP support for RabbitMQ: RabbitTemplate, listener containers and the rabbit: XML namespace. -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>
    <!--
        Jackson is used by Jackson2JsonMessageConverter on the producer side and by
        ObjectMapper/JsonNode in the consumer listener. It is declared explicitly because
        spring-amqp is understood to mark it as an optional dependency, so it would not be
        pulled in transitively. NOTE(review): confirm the version against the Jackson line
        your Spring AMQP release was built with.
    -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.10</version>
    </dependency>
    <!-- SLF4J binding that routes logging to log4j 1.2 (compile is Maven's default scope). -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
    <!-- General-purpose helpers from Apache Commons Lang. -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.9</version>
    </dependency>
</dependencies>
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Producer-side Spring configuration: declares the RabbitMQ connection, one queue,
    a topic exchange bound to that queue, and a RabbitTemplate that publishes JSON.

    All schemaLocation URLs use the http:// form. NOTE(review): the https:// variants
    may not be mapped in META-INF/spring.schemas for the Spring versions that
    spring-rabbit 2.0.1.RELEASE brings in, which would force the parser to download
    the XSDs at startup; confirm before switching back.
    Only the namespaces actually used in this file are declared.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- RabbitMQ connection; replace the "IP" placeholder with the broker host. -->
    <rabbit:connection-factory id="connectionFactory" host="IP" port="5672" username="admin" password="admin" virtual-host="/szx"/>

    <!-- Queue that receives the messages. -->
    <rabbit:queue name="test_spring_queue_1"/>

    <!-- RabbitAdmin: manages (creates, binds, deletes) queues and exchanges on the broker. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Topic exchange. -->
    <rabbit:topic-exchange name="spring_topic_exchange">
        <!-- Bind the queue: routing keys matching "msg.#" are delivered to test_spring_queue_1. -->
        <rabbit:bindings>
            <rabbit:binding pattern="msg.#" queue="test_spring_queue_1"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- Message converter that serialises message payloads to JSON. -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- RabbitTemplate publishing to spring_topic_exchange using the JSON converter. -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spring_topic_exchange" message-converter="jsonMessageConverter"/>
</beans>
发消息
/**
 * Producer entry point: boots the Spring context from
 * {@code spring/spring-rabbitmq-producer.xml}, sends one map payload through the
 * {@link RabbitTemplate} with routing key {@code msg.user}, then shuts the context down.
 */
public class Sender {
    public static void main(String[] args) {
        // try-with-resources guarantees the context is closed even if the bean
        // lookup or the send throws; the original only closed it on the success path.
        try (ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml")) {
            // Fetch the RabbitMQ template from the container.
            RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

            // Payload to send.
            Map<String, String> map = new HashMap<>();
            map.put("name", "小星");
            map.put("email", "[email protected]");

            /*
             * convertAndSend(String routingKey, Object object):
             *   routingKey - the routing key for the message
             *   object     - the data to send
             */
            rabbitTemplate.convertAndSend("msg.user", map);
        }
    }
}
消费端工程
依赖与生产者一致
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Consumer-side Spring configuration: declares the RabbitMQ connection and queue,
    scans com.szx.listener for the listener bean, and registers it on a listener
    container for test_spring_queue_1.

    All schemaLocation URLs use the http:// form. NOTE(review): the https:// variants
    may not be mapped in META-INF/spring.schemas for the Spring versions that
    spring-rabbit 2.0.1.RELEASE brings in, which would force the parser to download
    the XSDs at startup; confirm before switching back.
    The duplicate "rab" prefix (same namespace as "rabbit") and the unused
    p/aop/tx prefixes have been removed.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!--
        RabbitMQ connection.
        NOTE(review): the host is a hard-coded public address and the credentials are
        admin/admin; consider externalising them to a properties file.
    -->
    <rabbit:connection-factory id="connectionFactory" host="106.75.245.83" port="5672" username="admin" password="admin" virtual-host="/szx"/>

    <!-- Queue to consume from. -->
    <rabbit:queue name="test_spring_queue_1"/>

    <!-- RabbitAdmin: manages (creates, binds, deletes) queues and exchanges on the broker. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Component scan so the listener class in com.szx.listener is registered as a bean. -->
    <context:component-scan base-package="com.szx.listener"/>

    <!-- Listener container. -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <!-- Reference the listener bean registered in the container and attach it to the queue. -->
        <rabbit:listener ref="consumerListener" queues="test_spring_queue_1"/>
    </rabbit:listener-container>
</beans>
消费者
- MessageListener接口用于spring容器接收到消息后处理消息
- 如果需要用自定义的类来处理消息,该类必须实现此接口,并重写onMessage()方法
- 当spring容器接收消息后,会自动交由onMessage进行处理
/**
 * Message listener that parses each message body as JSON and prints the
 * {@code name} and {@code email} fields to standard output.
 */
@Component
public class ConsumerListener implements MessageListener {
    // Jackson's main entry point for (de)serialisation; used here to parse JSON.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Handles one incoming message.
     *
     * <p>An {@link IOException} raised while parsing is printed and not rethrown.
     *
     * @param message the received message; its body is expected to be a JSON object
     */
    @Override
    public void onMessage(Message message) {
        try {
            // Parse the raw body into a JSON tree.
            JsonNode jsonNode = MAPPER.readTree(message.getBody());
            String name = textOf(jsonNode, "name");
            String email = textOf(jsonNode, "email");
            System.out.println("从对类中获取:[" + name + "]的邮箱是: " + email);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads a field as text without risking a NullPointerException: returns an empty
     * string when the tree itself is null, and relies on {@code path()} instead of
     * {@code get()} so that an absent field does not yield a null node.
     */
    private static String textOf(JsonNode root, String field) {
        if (root == null) {
            return "";
        }
        return root.path(field).asText();
    }
}
启动项目
/**
 * Consumer entry point: boots the Spring context from
 * {@code spring/spring-rabbitmq-consumer.xml} and then blocks on standard input.
 */
public class TestRunner {
    public static void main(String[] args) {
        // Load the Spring container.
        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("spring/spring-rabbitmq-consumer.xml");
        // The context was never closed in the original; registering a shutdown hook
        // lets it be closed when the JVM exits, without changing how main blocks below.
        applicationContext.registerShutdownHook();
        try {
            // Block here so the program keeps running.
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}