Spring Boot项目中使用RabbitMQ

版权声明:希望大家多多指教 https://blog.csdn.net/muriyue6/article/details/82778430

Spring Boot整合RabbitMQ

Spring Boot与RabbitMQ集成非常简单, 不需要做任何的额外设置只需要两步即可:

* 引入相关依赖

* 对application.properties进行配置

(1) 发送消息端
* 引入相关依赖

<!--rabbitmq依赖-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

* 对application.properties进行配置

spring.rabbitmq.addresses=106.12.83.122:5672
spring.rabbitmq.username=abc
spring.rabbitmq.password=abc
spring.rabbitmq.virtual-host=/
# 连接超时时间
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

server.servlet.context-path=/
server.port=8001

代码实现RabbitMQ消息发送

* 实体类

package rabbitmq.entity;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by gailun on 2018/9/12.
 */
public class Order implements Serializable{

    private Long id;
    private String name;
    private Long messageId;   //存储消息发送的唯一标识

    public Order() {
    }

    public Order(Long id, String name, Long messageId) {
        this.id = id;
        this.name = name;
        this.messageId = messageId;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Long getMessageId() {
        return messageId;
    }

    public void setMessageId(Long messageId) {
        this.messageId = messageId;
    }


    @Override
    public String toString() {
        return "Order{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", messageId=" + messageId +
                '}';
    }
}

* 发送消息的方法

package rabbitmq.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import rabbitmq.entity.Order;

/**
 * Created by gailun on 2018/9/12.
 */
@Component
public class RabbitOrderSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //回调函数: confirm确认
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String s) {
            System.out.println("CorrelationData: "+correlationData);
            if (ack){
                //如果confirm返回成功 则进行更新
                System.out.println("更新");
            }else{
                //失败则进行具体的后续操作; 重试或者补偿等手段
                System.out.println("异常处理....");
            }
        }
    };

    public void sendOrder(Order order) throws Exception{
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(order.getMessageId()+"");
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.convertAndSend("order-exchange",   //exchang 交换机
                "order.demo",   //routingKey  路由键
                order,      //消息体内容
                correlationData);       //correlationData 消息唯一ID

    }

}

* 测试

package rabbitmq;
import org.json.JSONArray;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import rabbitmq.entity.Order;
import rabbitmq.producer.RabbitOrderSender;

import java.util.ArrayList;
import java.util.List;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

   @Test
   public void contextLoads() {
   }

   @Autowired
   private RabbitOrderSender rabbitOrderSender;

   @Test
   public void testSend1() throws Exception{
      Order order = new Order();
      order.setId(1L);
      order.setName("测试订单1");
      order.setMessageId(1l);
      rabbitOrderSender.sendOrder(order);
   }
}

(2) 接收消息端
<!--rabbitmq依赖-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

* 对application.properties进行配置

#springboot整合rabbitmq基本配置
spring.rabbitmq.addresses=106.12.83.102:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 连接超时时间
spring.rabbitmq.connection-timeout=15000

#springboot整合rabbitmq消费端配置
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=15
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=1

server.servlet.context-path=/
server.port=8002

代码实现接收消息

* 实体类

package rabbitmq.entity;

import java.io.Serializable;

/**
 * Created by gailun on 2018/9/12.
 */
public class Order implements Serializable{

    private Long id;
    private String name;
    private Long messageId;   //存储消息发送的唯一标识

    public Order() {
    }

    public Order(Long id, String name, Long messageId) {
        this.id = id;
        this.name = name;
        this.messageId = messageId;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Long getMessageId() {
        return messageId;
    }

    public void setMessageId(Long messageId) {
        this.messageId = messageId;
    }


    @Override
    public String toString() {
        return "Order{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", messageId=" + messageId +
                '}';
    }
}

* 接收消息的方法

package rabbitmq.consumer;
import com.rabbitmq.client.Channel;
import rabbitmq.entity.Order;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;

/**
 * Created by gailun on 2018/9/15.
 */
@Component
public class OrderReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order-queue",durable = "true"),
            exchange = @Exchange(name = "order-exchange", durable = "true", type = "topic"),
            key = "order.*"
    ))
    @RabbitHandler
    public void onOrderMessage(@Payload Order order,
                               @Headers Map<String,Object> headers,
                               Channel channel) throws Exception{
        //消费者操作
        System.out.println("---- 收到消息,开始消费 ----");
        System.out.println("订单ID: "+ order.getId());
        System.out.println("订单名称: "+ order.getName());

        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //手动必须
        channel.basicAck(deliveryTag,false);

    }
}

执行结果:

(3) 问题总结

原因:
1.可能是没有实现序列化接口
2.可能是你的包名称不一致, 两个工程的包名称必须一致.

消息投递成功设计方案(一)

 

总结: 

(1) RabbitMQ发送字符串的时候, 生产者和消费者两个项目的包名称可以一致, 也可以不一致; 是可以正常发送和接收消息的.

(2) RabbitMQ发送HashMap的时候, 生产者和消费者两个项目的包名称可以一致, 也可以不一致; 是可以正常发送和接收消息的.

(3) RabbitMQ发送json对象的时候, 生产者和消费者两个项目的包名称可以一致, 也可以不一致; 是可以正常发送和接收消息的.

(4) RabbitMQ发送自定义实体类的时候, 生产者和消费者两个项目的包名称必须保持一致 ; 才可以正常发送和接收消息的.

 

猜你喜欢

转载自blog.csdn.net/muriyue6/article/details/82778430