1、引入依赖
1 <dependency> 2 <groupId>org.springframework.boot</groupId> 3 <artifactId>spring-boot-starter-amqp</artifactId> 4 </dependency>
2、配置文件配置
spring: rabbitmq: host: 10.3.98.152 port: 5672 username: rabbitmq password: rabbitmq publisher-confirms: true publisher-returns: true virtual-host: / listener: simple: concurrency: 1 max-concurrency: 1 retry: enabled: true prefetchCount: 10 concurrentConsumers: 10
3、配置队列来接收消息
@Configuration public class RabbitConfig { @Bean public Queue Queue() { return new Queue("hello"); } }
4、定义消息发送者
public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hello " + new Date(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("hello", context); } }
5、定义消息消费者
@Component @RabbitListener(queues = "hello") public class HelloReceiver { @RabbitHandler public void process(String hello) { System.out.println("Receiver : " + hello); } }
二、动态绑定队列,并且实现消息的发送
在SpringCloud的环境下,每个客户端的队列都是不同的,此时需要动态生成队列,并且向rabbitMQ服务器注册和绑定,后台同时也需要监听该队列的接收
1、动态生成队列并且向rabbitMQ服务器注册和绑定
/** *由于使用到rabbitMQ的连接来操作channel绑定队列,所有这里需要手动获取ConnectionFactory */ @Bean public ConnectionFactory connectionFactory() { connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(host + ":" + port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setConnectionTimeout(10000); /* 如果要进行消息回调,则这里必须要设置为true */ connectionFactory.setPublisherConfirms(publisherConfirms); connectionFactory.setPublisherReturns(true); return connectionFactory; } /** * 动态创建queue,命名为:企业SCEOID.queue1【xx.queue1】,并返回数组queue名称(为了以后需要注册多个队列进行扩展) * * @return 对列名的数组 * @throws AmqpException * @throws IOException */ @Bean public String mqMsgQueues() throws AmqpException, IOException { //获取我方的SCEOID Result enterpriseResult = idService.getEnterprise(); String intcEnte = ""; if (Result.RECODE_SUCCESS.equals(enterpriseResult.getRetCode())) { Map<String, Object> map = (Map<String, Object>) enterpriseResult.getData(); intcEnte = (String) map.get("ente_idcode"); } logger.info("从ID系统查询得到的SCEOID为:" + intcEnte); logger.info("注册的intcEnte:" + intcEnte); String queueName = String.format("%s.queue%d", intcEnte, 1); connectionFactory.createConnection().createChannel(false).queueDeclare(queueName, true, false, false, null); connectionFactory.createConnection().createChannel(false).queueBind(queueName, exchangeName, queueName); return queueName; }
2、监听队列
/** * 监听队列 * * @return * @throws AmqpException * @throws IOException */ @Bean public SimpleMessageListenerContainer messageContainer() throws AmqpException, IOException { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames(mqMsgQueues()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); //设置确认模式手工确认 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //监听处理类 container.setMessageListener(handleService()); return container; }
3、初始化处理类
/** * 获取hanleService实例 * * @return */ @Bean @Scope("prototype") public HandleService handleService() { return new HandleService(); }
4、监听处理
@Service public class HandleService implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); //消息消費成功(这里必须消费成功,如果因为业务的原因没有将消息消费掉,很容易造成消息阻塞) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // do something } }