消息队列的使用

3.1 RabbitMQ的核心:

核心官网有介绍,说的connecnton,channel之类的,到底怎么样,who care?

总体来看,我们关注业务实现是:1)消息怎么投递的。2)消费者怎么消费消息。3)消息是否是可靠投递。4)消息投递方式。5)消息的生命周期。6)消息队列生命周期

3.2  消息是怎么投递的?(记住一点,生产者消息投递都是面向交换机的)

RabbitMQ 是面向交换机投递消息的。交换机可能绑定有许多队列,交换机如何将消息投递给这些队列呢?

首先说一下面向交换机的设计的优势:1)这明显借助了数据链路层那个交换机的设计思想。除了层级分明以外,还能从分提高链路利用率(可能有点抽像)。

2)从代码层面来看:如果没有交换机,你至少得维护一个十分庞大的路由表,然后从路由表正确投递消息,有了交互机,这里路

由表就会被拆分到多个交换机里面,效果不必多说。

   3)然后就是高度的解耦,不同的交换机可有不同的路由规则,要是没有交换机。。。。。。

在RabbitMQ,交换机有4种投递方式,就是枚举类BuiltinExchangeType的4个枚举变量:

    DIRECT:会将所有消息先取消息的ROUTE_KEY,然后投递到与ROUTE_KEY绑定的队列里面(if(msg.routekey.equals(queue.routekey)))。

FANOUT:此种模式下,根本不检查消息的ROUTE_KEY,直接投送到交换机所拥有的所有队列里面。

TOPIC,HEADERS自行看一下官网怎么说的,不想码字了^_^||

总结起来就一个函数就把消息发出去了:channel.basicPublish(excange_name,route_key,false,bs,"test".getBytes());可以去官网查一下这个API

3.3 消费者怎么消费消息(记住一点,消费者消费消息是面向消息队列的,这与生成者有点不一样)

还不是就是TCP长连接心跳的那些事,就是这么一个API:channel.basicConsume(QUEUE_AUTODELETE, true, consumer);consumer是Consumer类的一个实例,

你直接去处理回调接口就ok了

3.4 消息传递是否可靠

很明显是可靠的,除非你将消息队列,声明成非持久模式,这事你又重启了机器。这会丢失消息的。还有就是他有应答机制,你可以通过设置消费者消费消息的模式,

去手动应答。channel.basicConsume(?,autoACk,?)的autoAck参数设置

3.5 消息的生命周期

一旦受到消费者应答,标识消息已被消费,则消息被回收掉。

3.6 队列生命周期

channel.queueDeclare(QUEUE_NAME,false,false,true,null);

第二个参数设置为true,会将消息持久化到磁盘,第四个参数设置为true表示没有消息并且没有连接则删除改队列,详情可以查一下API

四、一个示例
4.1 生产者代码:

自行导入相关依赖包或相关依赖

复制代码
  ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("username");
        factory.setPort(5672);//注意这里的端口与管理插件的端口不一样
        factory.setPassword("pwd");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明一个dirent模式的交换机
        channel.exchangeDeclare("exchange_name",BuiltinExchangeType.DIRECT,true);
        //声明一个非持久化自动删除的队列
        channel.queueDeclare("queue_name",false,false,true,null);//如果该队列不在被使用就删除他 zhe
        //将绑定到改交换机
        channel.queueBind("queue_name","exchange_name","route_key");
        //声明一个消息头部
        Map<String,Object> header=new HashMap<>();
        AMQP.BasicProperties.Builder b= new AMQP.BasicProperties.Builder();
        header.put("charset","utf-8");
        b.headers(header);
        AMQP.BasicProperties bp=b.build();
        //将消息发出去
       channel.basicPublish("exchange_name","route_key",false,bp,"test3".getBytes());
复制代码
4.2 消费者代码

ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  factory.setUsername("username");
  factory.setPort(5672);//注意这里的端口与管理插件的端口不一样
  factory.setPassword("pwd");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  //声明一个dirent模式的交换机
  channel.exchangeDeclare("exchange_name",BuiltinExchangeType.DIRECT,true);
  //声明一个非持久化自动删除的队列
  channel.queueDeclare("queue_name",false,false,true,null);//如果该队列不在被使用就删除他 zhe
  //将绑定到改交换机
  channel.queueBind("queue_name","exchange_name","route_key");
  Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                 byte[] body) throws IOException {
          String message = new String(body, "UTF-8");
          System.out.println(" [x] Received '" + message + "'");
      }
  };
  channel.basicConsume("queue_name", true, consumer);


    
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
        
       

猜你喜欢

转载自blog.csdn.net/qq_39879632/article/details/81669340