第二篇我们简单介绍下RabbitMQ与与SpringMVC整合:
由于部分项目还未切换SpringBoot,所以这里介绍下RabbitMQ与与SpringMVC的整合。
1、消息生产者编写
(1)pom文件引入RabbitMQ相关依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${rabbitmq.version}</version>
</dependency>
(2)编写RabbitMQ配置类
//连接rabbitMQ的基本配置
@Configuration
@EnableRabbit
public class RabbitConfig {
@Bean
public ConnectionFactory connectionFactory() {
// 读取rabbitmq配置,此处请自行配置,一般配置于配置文件
String hostname = PropertiesUtils.GetPropertiesByClassPath("global.properties").getProperty("rabbit.hostname");
String username = PropertiesUtils.GetPropertiesByClassPath("global.properties").getProperty("rabbit.username");
String password = PropertiesUtils.GetPropertiesByClassPath("global.properties").getProperty("rabbit.password");
int port = Integer.valueOf(PropertiesUtils.GetPropertiesByClassPath("global.properties").getProperty("rabbit.port"));;
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(hostname);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setPort(port);
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
//配置消费者监听的容器
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
@Bean
public Queue Queue() {
// System.out.println("RabbitConfig");
return new Queue("orderInfoQueue");
}
}
(3)编写生产者消费者模式配置类
//生产者消费者模式的配置,包括一个队列和两个对应的消费者
@Configuration
public class ProducerConsumerConfig {
@Bean
public Queue myQueue() {
Queue queue=new Queue("myqueue");
return queue;
}
}
(4)编写消息发送类
@Service
public class ProducerImpl implements Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendMessage(String queue, Message message) {
if (queue.equals("error")) {
throw new RuntimeException("error");
}
rabbitTemplate.setQueue(queue);
rabbitTemplate.convertAndSend(queue,message);
}
2、消息消费者编写
(1)同样编写生产者(1)引入依赖(2)编写配置(3)模式配置
(2)编写消息接收类
public class Receiver {
private final static String QUEUE_NAME = "MyQueue";
public static void main(String[] args) {
receive();
}
public static void receive()
{
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
System.out.println("11111111111");
String message = new String(body, "UTF-8");
System.out.println("收到消息....."+message);
}};
channel.basicConsume(QUEUE_NAME, true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally{
//关闭资源
if (channel!=null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection!=null){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
3、测试下消息接收
至此,RabbitMQ与SpringMVC整合也已经完成,希望童鞋们动手实践下。