消息生产者和消费者
import com.rabbitmq.client.Channel;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.io.IOException; import java.util.Date; import java.util.UUID; /** * Created by yangliu on 2018/4/8. */ @Controller @RequestMapping("/rabbitMq") public class TestController { private Logger logger= LoggerFactory.getLogger(TestController.class); @Autowired RabbitAdmin rabbitAdmin; @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.getRabbitTemplate().setConfirmCallback(new MsgSendConfirmCallBack()); rabbitAdmin.getRabbitTemplate().setReturnCallback(new MsgSendReturnCallback()); return rabbitAdmin; } @RequestMapping("/sendMq") @ResponseBody public String send(String name) throws Exception { String context = "hello "+name+" --" + new Date(); String sendStr; for(int i=1;i<=100;i++){ sendStr="第["+i+"]个 hello --" + new Date(); logger.debug("HelloSender: " + sendStr); sendMessage("myqueue",sendStr); //Thread.sleep(1000); } return context; } /** * 方式一:动态声明exchange和queue它们的绑定关系 rabbitAdmin * @param exchangeName * @param queueName */ protected void declareBinding(String exchangeName, String queueName) { if (rabbitAdmin.getQueueProperties(queueName) == null) { /* queue 队列声明 durable=true,交换机持久化,rabbitmq服务重启交换机依然存在,保证不丢失; durable=false,相反 
auto-delete=true:无消费者时,队列自动删除; auto-delete=false:无消费者时,队列不会自动删除 排他性,exclusive=true:首次申明的connection连接下可见; exclusive=false:所有connection连接下*/ Queue queue = new Queue(queueName, true, false, false, null); rabbitAdmin.declareQueue(queue); TopicExchange directExchange = new TopicExchange(exchangeName); rabbitAdmin.declareExchange(directExchange);//声明exchange Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName); //将queue绑定到exchange rabbitAdmin.declareBinding(binding); //声明绑定关系 rabbitAdmin.getRabbitTemplate().setMandatory(true); rabbitAdmin.getRabbitTemplate().setConfirmCallback(new MsgSendConfirmCallBack());//消息确认 rabbitAdmin.getRabbitTemplate().setReturnCallback(new MsgSendReturnCallback());//确认后回调 } else { rabbitAdmin.getRabbitTemplate().setQueue(queueName); rabbitAdmin.getRabbitTemplate().setExchange(queueName); rabbitAdmin.getRabbitTemplate().setRoutingKey(queueName); } } /** * 发送消息 * @param queueName * @param message * @throws Exception */ public void sendMessage(String queueName, String message) throws Exception { declareBinding(queueName, queueName); CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitAdmin.getRabbitTemplate().convertAndSend(queueName, queueName, message,correlationId); logger.debug("[rabbitmq-sendMessage]queueName:{} ,uuid:{},msg:{}",queueName,correlationId.getId(),message); } /** * 消费者 * @param connectionFactory * @return */ @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); Queue queue = new Queue("myqueue", true, false, false, null); container.setQueues(queue); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void 
onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); try { //业务逻辑 logger.info("消费 receive msg : " + new String(body)); // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //手动确认确认消息成功消费 } catch (Exception e) { logger.info("消费失败: " + new String(body)); // ack返回false,并重新回到队列,api里面解释得很清楚 try { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } catch (IOException e1) { e1.printStackTrace(); } } } }); return container; } /* //消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //ack返回false,并重新回到队列,api里面解释得很清楚 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //拒绝消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); 如果消息没有到exchange,则confirm回调,ack=false 如果消息到达exchange,则confirm回调,ack=true exchange到queue成功,则不回调return exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了) */ }
失败后return回调:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * Return callback: prints the details of a message handed back to
 * {@link #returnedMessage(Message, int, String, String, String)}.
 */
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {

    /**
     * Prints the returned message together with the reply code/text and routing details.
     *
     * @param message    the returned message
     * @param replyCode  reply code reported for the return
     * @param replyText  reply text reported for the return
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Decode explicitly as UTF-8 rather than the platform default charset, so non-ASCII
        // payloads print correctly on any JVM.
        // NOTE(review): assumes the producer encodes text as UTF-8 (SimpleMessageConverter's default).
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("确认后回调return--message:" + body
                + ",replyCode:" + replyCode
                + ",replyText:" + replyText
                + ",exchange:" + exchange
                + ",routingKey:" + routingKey);
    }
}
确认后回调:
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息确认成功cause"+cause); } else { //处理丢失的消息 System.out.println("消息确认失败:"+correlationData.getId()+"#cause"+cause); } } }