发送确认

package com.xxx.common.rabbitmq.send;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

/**
 * {@link RabbitTemplate.ConfirmCallback} implementation that reports every publisher
 * confirmation on standard output.
 *
 * @author hejunfei
 * @date 2019/9/6
 */
@Configuration
public class CouponsConfirmCallback implements RabbitTemplate.ConfirmCallback {

    /**
     * Prints the three callback arguments, one per line, in the order
     * correlation data, ack flag, cause.
     *
     * @param correlationData correlation data handed to the callback; printed via string
     *                        concatenation, so a {@code null} value prints as "null"
     * @param ack             confirmation flag handed to the callback
     * @param cause           cause text handed to the callback; may be {@code null}
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        final String[] report = {
            "消息唯一标识:" + correlationData,
            "确认结果:" + ack,
            "失败原因:" + cause
        };
        for (String line : report) {
            System.out.println(line);
        }
    }
}
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

/**
 * Publishes coupon messages through the injected {@link RabbitTemplate} and registers the
 * confirm and return callbacks on that template once the bean has been constructed.
 *
 * @author hejunfei
 * @date 2019/9/4
 */
@Component
public class CouponsNotifySend {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private CouponsConfirmCallback couponsConfirmCallback;
    @Autowired
    private CouponsReturnCallback couponsReturnCallback;

    /**
     * Wraps {@code msg} in a {@code CouponsEntity} (stored under the map key {@code "body"}),
     * Java-serializes the entity and publishes it to exchange {@code couponsExchange} with
     * routing key {@code api.coupons.coupons.query}.
     *
     * @param msg text to carry as the entity's {@code "body"} entry
     * @throws IllegalStateException if the entity cannot be serialized; nothing is published
     *                               in that case
     */
    public void couponsQuery(String msg) {
        CouponsEntity couponsEntity = new CouponsEntity();
        couponsEntity.setId(DateUtils.sysDataLong().toString());
        Map<String, Object> map = new HashMap<>();
        map.put("body", msg);
        couponsEntity.setMap(map);
        System.out.println("coupons");

        // Serialize before building the message so a failure aborts the publish instead of
        // sending an empty or truncated body.
        byte[] body = serialize(couponsEntity);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(DateUtils.sysDataLong().toString());
        // NOTE(review): the body is a Java-serialized object, yet it is labelled text/plain.
        // Kept as-is because consumers are not visible here; confirm whether
        // MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT is the intended value.
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        messageProperties.setContentEncoding("utf-8");
        Message message = new Message(body, messageProperties);
        // A ready-made Message is passed, so its body and properties are used as built above.
        rabbitTemplate.convertAndSend("couponsExchange", "api.coupons.coupons.query", message);
    }

    /**
     * Java-serializes {@code payload} into a byte array.
     *
     * @param payload object to serialize
     * @return the serialized bytes
     * @throws IllegalStateException wrapping the {@link IOException} raised during serialization
     */
    private static byte[] serialize(Object payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // try-with-resources closes (and therefore flushes) the stream before the bytes are read.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to serialize coupons message payload", e);
        }
        return buffer.toByteArray();
    }

    /** Registers the confirm callback on the template after dependency injection. */
    @PostConstruct
    public void initConfirm() {
        rabbitTemplate.setConfirmCallback(couponsConfirmCallback);
    }

    /** Registers the return callback on the template after dependency injection. */
    @PostConstruct
    public void initReturn() {
        rabbitTemplate.setReturnCallback(couponsReturnCallback);
    }
}
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * RabbitMQ wiring: the application's {@link RabbitTemplate}, the listener container factory,
 * and the queue, topic exchange and binding used for coupon messages.
 *
 * @author hejunfei
 * @date 2019/9/4
 */
@Configuration
public class RabbitConfig  {

    /**
     * Builds the template used for publishing, with a JSON message converter.
     *
     * @param connectionFactory connection factory the template publishes through
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        // Unroutable messages are only handed back to a ReturnCallback when the mandatory flag
        // is set. Spring Boot derives that flag from spring.rabbitmq.publisher-returns only for
        // its auto-configured template, which this hand-built bean replaces.
        template.setMandatory(true);
        return template;
    }

    /**
     * Builds the listener container factory with a JSON message converter and manual
     * acknowledgement mode.
     *
     * @param connectionFactory connection factory the listener containers consume through
     * @return the configured factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    /** @return the queue named {@code notify.coupons} */
    @Bean
    public Queue couponsNotifyQueue() {
        return new Queue("notify.coupons");
    }

    /** @return the topic exchange named {@code couponsExchange} */
    @Bean
    public TopicExchange couponsExchange() {
        return new TopicExchange("couponsExchange");
    }

    /**
     * Binds {@code notify.coupons} to {@code couponsExchange} with the routing pattern
     * {@code api.coupons.#}.
     *
     * @return the binding
     */
    @Bean
    public Binding bindingPaymentExchange() {
        return BindingBuilder.bind(this.couponsNotifyQueue()).to(this.couponsExchange()).with("api.coupons.#");
    }

}
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
# 消费端开启手动ACK(监听方法中需显式调用 basicAck/basicNack 确认消息)
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#指定最小的消费者数量
spring.rabbitmq.listener.simple.concurrency=1
#指定最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency=1

猜你喜欢

转载自blog.csdn.net/hejunfei/article/details/101050270