rabbitmq全解

一.应用场景和术语

1.应用
应用解耦(库存模块和订单模块解耦)
异步处理(注册成功后发送短信和发送邮件)
流量削峰(秒杀系统)
2.术语
queue 队列,拥有自己的 erlang 进程;

exchange 交换机,内部实现为保存 binding 关系的查找表;

channel 信道,实际进行路由工作的实体,按照 routing_key 将 message 投递给 queue 。由 AMQP 协议描述可知,channel 是真实 TCP 连接之上的虚拟连接,所有 AMQP 命令都是通过 channel 发送的,且每一个 channel 有唯一的 ID。一个 channel 只能被单独一个操作系统线程使用,故投递到特定 channel 上的 message 是有顺序的。但一个操作系统线程上允许使用多个 channel 。由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制。

vhost 可理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段。

二.docker安装以及前期准备

docker search rabbitmq:management
docker pull rabbitmq:management 
docker images
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
docker ps -a

如果docker pull rabbitmq 后面不带management,启动rabbitmq后是无法打开管理界面的,所以我们要下载带management插件的rabbitmq
在这里插入图片描述
在这里插入图片描述
用户名和密码 guest

添加用户
在这里插入图片描述

virtual hosts管理(相当于mysql中的db,一般以 / 开头)
在这里插入图片描述

对用户进行授权
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
java开发,引入依赖

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>

在这里插入图片描述
连接工具类

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionUtils {
    
    
  
    public static  Connection getConnection(){
    
    
        /*定义一个连接工厂*/
         ConnectionFactory connectionFactory = new ConnectionFactory();
       /*设置连接信息*/
         connectionFactory.setHost("IP地址");
         connectionFactory.setPort(5672);
         connectionFactory.setVirtualHost("/vhost_mmr");
         connectionFactory.setUsername("user_mmr");
         connectionFactory.setPassword("123456");
        Connection connection=null;
        try {
    
    
            connection = connectionFactory.newConnection();
        } catch (IOException e) {
    
    
            e.printStackTrace();
        } catch (TimeoutException e) {
    
    
            e.printStackTrace();
        }
        return  connection;
    }
}

消息路由
涵盖三部分:交换器、路由、绑定。生产者把消息发布到交换器上;绑定决定了消息如何从路由器路由到特定的队列;消息最终到达队列,并被消费者接收。

a.消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。
通过队列路由键,可以把队列绑定到交换器上。
b.消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则)。如果能够匹配到队列,则消息会投递到相应队列中;如果不能匹配到任何队列,消息将进入 “黑洞”。

三.simple简单队列

1.模型
在这里插入图片描述
p:消息的生产者
红色:消息队列
c:消费者
只包含一个生产者以及一个消费者,生产者Producer将消息发送到队列中,消费者Consumer从该队列接收消息。
2.代码实现
生产者

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    
    
    private static  final String QUEUE_NAME="test_simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
    
    
         Connection connection = ConnectionUtils.getConnection();
         /*从连接中获取通道*/
         Channel channel = connection.createChannel();
         /*创建队列声明*/
         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
         String msg="hello simple queue";
         channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
         channel.close();
         connection.close();
    }
}

消费者

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv {
    
    
    private static  final String QUEUE_NAME="test_simple_queue";
    public static void main(String[] args) throws IOException, InterruptedException {
    
    
        /*获取连接*/
        Connection connection = ConnectionUtils.getConnection();
        /*创建频道*/
        Channel channel = connection.createChannel();
        /*创建队列声明*/
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
    
    
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    
                String msg = new String(body, "utf-8");
                System.out.println(msg);
            }
        };
        /*监听队列*/
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

3.不足
耦合性高,生产者和消费者一一对应;

四.work quenes 工作队列

1.模型
在这里插入图片描述
多个消费者绑定到同一个队列上,一条消息只能被一个消费者进行消费。
在实际开发中,生产者发送消息毫不费力,而消费者一般要和业务结合。消费者接受到消息后需要处理,队列会积压很多消息。

2.代码实现
生产者

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    
    
    private static  final String QUEUE_NAME="test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
    
    
         Connection connection = ConnectionUtils.getConnection();
         /*从连接中获取通道*/
         Channel channel = connection.createChannel();
         /*创建队列声明*/
         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
         for(int i=0;i<50;i++){
    
    
             String msg="hello work queue"+i;
             channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
             try {
    
    
                 Thread.sleep(i*20);
             } catch (InterruptedException e) {
    
    
                 e.printStackTrace();
             }
         }

         channel.close();
         connection.close();
    }
}

消费者1

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv1 {
    
    
    private static  final String QUEUE_NAME="test_work_queue";
    public static void main(String[] args) throws IOException, InterruptedException {
    
    
        /*获取连接*/
        Connection connection = ConnectionUtils.getConnection();
        /*创建频道*/
        Channel channel = connection.createChannel();
        /*创建队列声明*/
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
    
    
            /*消息到达触发*/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    
                String msg = new String(body, "utf-8");
                System.out.println("recv1  "+msg);
                try {
    
    
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
    
    
                    e.printStackTrace();
                }
            }
        };
        /*监听队列*/
        boolean autoAck=true;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

在这里插入图片描述

消费者2

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv2 {
    
    
    private static  final String QUEUE_NAME="test_work_queue";
    public static void main(String[] args) throws IOException, InterruptedException {
    
    
        /*获取连接*/
        Connection connection = ConnectionUtils.getConnection();
        /*创建频道*/
        Channel channel = connection.createChannel();
        /*创建队列声明*/
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
    
    
            /*消息到达触发*/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    
                String msg = new String(body, "utf-8");
                System.out.println("recv2   "+msg);
                try {
    
    
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
    
    
                    e.printStackTrace();
                }
            }
        };
        /*监听队列*/
        boolean autoAck=true;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

在这里插入图片描述
现象:消费者1和消费者2处理的消息数量一致
这种现象称为轮询分发(round-robin)

公平分发 fair dispatch:

在这里插入图片描述
生产者

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    
    
    private static  final String QUEUE_NAME="test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
    
    
         Connection connection = ConnectionUtils.getConnection();
         /*从连接中获取通道*/
         Channel channel = connection.createChannel();
         /*创建队列声明*/
         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
         /*每个消费者发送确认消息之前,消息队列不发送下一个消息,一次仅处理一个*/
         int fetch=1;
         channel.basicQos(fetch);
         for(int i=0;i<50;i++){
    
    
             String msg="hello work queue"+i;
             channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
             try {
    
    
                 Thread.sleep(i*20);
             } catch (InterruptedException e) {
    
    
                 e.printStackTrace();
             }
         }

         channel.close();
         connection.close();
    }
}

消费者1

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv1 {
    
    
    private static  final String QUEUE_NAME="test_work_queue";
    public static void main(String[] args) throws IOException, InterruptedException {
    
    
        /*获取连接*/
        Connection connection = ConnectionUtils.getConnection();
        /*创建频道*/
        Channel channel = connection.createChannel();
        /*创建队列声明*/
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /*保证一次仅分发一个*/
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
    
    
            /*消息到达触发*/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    
                String msg = new String(body, "utf-8");
                System.out.println("recv1  "+msg);
                try {
    
    
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
    
    
                    e.printStackTrace();
                }finally {
    
    
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        /*监听队列*/
        boolean autoAck=false; /*手动应答改为false*/
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

消费者2

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv2 {
    
    
    private static  final String QUEUE_NAME="test_work_queue";
    public static void main(String[] args) throws IOException, InterruptedException {
    
    
        /*获取连接*/
        Connection connection = ConnectionUtils.getConnection();
        /*创建频道*/
        Channel channel = connection.createChannel();
        /*创建队列声明*/
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
    
    
            /*消息到达触发*/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    
                String msg = new String(body, "utf-8");
                System.out.println("recv2   "+msg);
                try {
    
    
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
    
    
                    e.printStackTrace();
                }finally {
    
    
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        /*监听队列*/
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

现象:能者多劳

3.消息应答和持久化

   /*监听队列*/
        boolean autoAck=false; /*手动应答改为false*/
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);

自动确认模式:一旦rabbitmq将消息分发给消费者,就会从内存中删除;在这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息;

手动确认模式:如果有一个消费者挂掉,则交付给其他消费者。rabbitmq支持消息应答,消费者发送一个消息应答,告知rabbitmq可将数据从内存中删除;

 /*创建队列声明*/
        boolean durable=false;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

若修改持久化为true时,运行报错,因为已经定义了test_work_queue,未被持久化,rabbitmq不允许重新定义一个(不同参数)已经存在的队列

五.publish_subscribe订阅模式

1.模型
在这里插入图片描述
x:交换机
一个生产者,多个消费者;每个消费者都有自己的队列;生产者没有直接把消息发送到队列,而是发送到交换机(转发器 exchange);每个队列都要绑定到交换机上;生产者发送的消息,经过交换机,到达队列就能实现,一个消息被多个消费者消费;

2.实现
生产者

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    
    
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
    
    
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        /*声明交换机*/
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        /*发送消息*/
        String msg = "hello ps";
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        channel.close();
        connection.close();
    }
}

消费者1

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv1 {
    
    
    private static  final  String QUEUE_NAME="test_exchange_fanout_email";
    private static final String EXCHANGE_NAME = "test_exchange_fanout";
    public static void main(String[] args) throws IOException {
    
    
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        /*队列声明*/
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        /*绑定队列到交换机转发器*/
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        /*保证一次仅分发一个*/
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
    
    
            /*消息到达触发*/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    
                String msg = new String(body, "utf-8");
                System.out.println("recv1  "+msg);
                try {
    
    
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
    
    
                    e.printStackTrace();
                }finally {
    
    
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        /*监听队列*/
        boolean autoAck=false; /*自动应答改为false*/
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

消费者2

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv2 {
    
    
    private static  final  String QUEUE_NAME="test_exchange_fanout_sms";
    private static final String EXCHANGE_NAME = "test_exchange_fanout";
    public static void main(String[] args) throws IOException {
    
    
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        /*队列声明*/
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        /*绑定队列到交换机转发器*/
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        /*保证一次仅分发一个*/
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
    
    
            /*消息到达触发*/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    
                String msg = new String(body, "utf-8");
                System.out.println("recv2  "+msg);
                try {
    
    
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
    
    
                    e.printStackTrace();
                }finally {
    
    
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        /*监听队列*/
        boolean autoAck=false; /*自动应答改为false*/
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

在这里插入图片描述

3.Exchange(交换机)
一方面接受生产者的消息,另一方面向队列推送消息;
匿名转发 “”
Fanout 不处理路由键
Direct 处理路由键

六.路由模式

1.模型

在这里插入图片描述
2.代码实现
生产者

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    
    
    private static final String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
    
    
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        /*声明交换机*/
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        /*发送消息*/
        String msg = "hello direct";

        String routingKey="info";
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
        channel.close();
        connection.close();
    }
}

消费者1

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv1 {
    
    
    private static  final  String QUEUE_NAME="test_queue_direct_1";
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] args) throws IOException {
    
    
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        /*队列声明*/
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        /*绑定队列到交换机转发器*/
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        /*保证一次仅分发一个*/
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
    
    
            /*消息到达触发*/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    
                String msg = new String(body, "utf-8");
                System.out.println("recv1  "+msg);
                try {
    
    
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
    
    
                    e.printStackTrace();
                }finally {
    
    
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        /*监听队列*/
        boolean autoAck=false; /*自动应答改为false*/
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

消费者2

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv2 {
    
    
    private static  final  String QUEUE_NAME="test_queue_direct_2";
    private static final String EXCHANGE_NAME = "test_exchange_direct";
    public static void main(String[] args) throws IOException {
    
    
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        /*队列声明*/
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        /*绑定队列到交换机转发器*/
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
        /*保证一次仅分发一个*/
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
    
    
            /*消息到达触发*/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    
                String msg = new String(body, "utf-8");
                System.out.println("recv2  "+msg);
                try {
    
    
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
    
    
                    e.printStackTrace();
                }finally {
    
    
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        /*监听队列*/
        boolean autoAck=false; /*自动应答改为false*/
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

七.主题模式

1.模型
在这里插入图片描述
将路由键和某模式匹配 # 匹配一个或多个 * 匹配多个
Goods.#
商品 的发布 删除 修改 查询

2.代码实现
生产者

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    
    
    private static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
    
    
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        /*声明交换机*/
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        /*发送消息*/
        String msg = "商品";
        channel.basicPublish(EXCHANGE_NAME, "goods.del", null, msg.getBytes());
        channel.close();
        connection.close();
    }
}

消费者1

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv1 {
    
    
    private static  final  String QUEUE_NAME="test_exchange_topic_1";
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    public static void main(String[] args) throws IOException {
    
    
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        /*队列声明*/
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        /*绑定队列到交换机转发器*/
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");
        /*保证一次仅分发一个*/
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
    
    
            /*消息到达触发*/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    
                String msg = new String(body, "utf-8");
                System.out.println("recv1  "+msg);
                try {
    
    
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
    
    
                    e.printStackTrace();
                }finally {
    
    
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        /*监听队列*/
        boolean autoAck=false; /*自动应答改为false*/
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

消费者2

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv2 {
    
    
    private static  final  String QUEUE_NAME="test_exchange_topic_2";
    private static final String EXCHANGE_NAME = "test_exchange_topic";
    public static void main(String[] args) throws IOException {
    
    
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        /*队列声明*/
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        /*绑定队列到交换机转发器*/
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.*");
        /*保证一次仅分发一个*/
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
    
    
            /*消息到达触发*/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    
                String msg = new String(body, "utf-8");
                System.out.println("recv2  "+msg);
                try {
    
    
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
    
    
                    e.printStackTrace();
                }finally {
    
    
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            }
        };
        /*监听队列*/
        boolean autoAck=false; /*自动应答改为false*/
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

八.rabbitmq消息确认机制(生产者)之事务机制

1.消息确认
在rabbitmq中,可通过持久化数据解决rabbitmq服务器异常的数据丢失问题;
而在生产者将消息发送出去以后,消息是否到达rabbitmq服务器
两种方式: AMQP实现事务机制 Confirm模式
2.事务机制
txSelect:用户将当前channel设置为transaction模式
txCommit:用于提交事务
txRollback:用于回滚
3.代码实现
生产者

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TxSend {
    
    
    private static  final String QUEUE_NAME="test_queue_tx";

    public static void main(String[] args) throws IOException, TimeoutException {
    
    
         Connection connection = ConnectionUtils.getConnection();
         /*从连接中获取通道*/
         Channel channel = connection.createChannel();
         /*创建队列声明*/
         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
         String msg="hello tx";

        try {
    
    
            channel.txSelect();
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

            channel.txCommit();
            System.out.println("事务提交");
        } catch (IOException e) {
    
    
            channel.txRollback();
            System.out.println("事务回滚");
        }
        channel.close();
         connection.close();
    }
}

消费者

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class TxRecv {
    
    
    private static  final String QUEUE_NAME="test_queue_tx";
    public static void main(String[] args) throws IOException, InterruptedException {
    
    
        /*获取连接*/
        Connection connection = ConnectionUtils.getConnection();
        /*创建频道*/
        Channel channel = connection.createChannel();
        /*创建队列声明*/
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
    
    
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    
                String msg = new String(body, "utf-8");
                System.out.println(msg);
            }
        };
        /*监听队列*/
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

4.优劣势
事务机制降低吞吐量

九.rabbitmq消息确认机制之Confirm

1.原理
生产者将信道设置成Confirm模式,一旦信道进入Confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(以confirm.select为基础从1开始计数),一旦消息被投递到所有匹配的队列之后,Broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,Broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外Broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
Confirm模式优势在于其异步;
2.原理
channel.confirmSelect() 开启信道的Confirm模式
编程模式:
串行
a.普通 waitForConfirms()
b.批量 waitForConfirms()
异步
c.异步 提供一个回调方法
3.代码实现
注意: channel设置为事务模式,不能再设置为Confirm模式
消费者

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv {
    
    
    private static  final String QUEUE_NAME="test_simple_confirm1";
    public static void main(String[] args) throws IOException, InterruptedException {
    
    
        /*获取连接*/
        Connection connection = ConnectionUtils.getConnection();
        /*创建频道*/
        Channel channel = connection.createChannel();
        /*创建队列声明*/
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
    
    
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
    
                String msg = new String(body, "utf-8");
                System.out.println(msg);
            }
        };
        /*监听队列*/
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

普通模式生产者

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    
    
    private static  final String QUEUE_NAME="test_simple_confirm1";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    
    
         Connection connection = ConnectionUtils.getConnection();
         Channel channel = connection.createChannel();
         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
         /*生产者设置Confirm模式*/
        channel.confirmSelect();
         String msg="hello confirm message";
         channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
         if(!channel.waitForConfirms()){
    
    
             System.out.println("消息发送失败");
         }else {
    
    
             System.out.println("消息发送成功");
         }
         channel.close();
         connection.close();
    }
}

批量模式生产者

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    
    
    private static  final String QUEUE_NAME="test_simple_confirm1";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    
    
         Connection connection = ConnectionUtils.getConnection();
         Channel channel = connection.createChannel();
         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
         /*生产者设置Confirm模式*/
        channel.confirmSelect();
         String msg="hello confirm message";
         /*批量发送后确认*/
         for(int i=0;i<10;i++){
    
    
             channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

         }
         if(!channel.waitForConfirms()){
    
    
             System.out.println("消息发送失败");
         }else {
    
    
             System.out.println("消息发送成功");
         }
         channel.close();
         connection.close();
    }
}

异步模式
Channel 对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删除相应的一条(multiple=false)或多条(multiple=true)记录,从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构

import com.cz.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

public class SendSync {
    
    
    private static final String QUEUE_NAME = "test_simple_confirm1";

    public static void main(String[] args) throws IOException {
    
    
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /*生产者设置Confirm模式*/
        channel.confirmSelect();
        /*采用有序集合存储结构*/
        SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
        /*Channel对象提供ConfirmListener()回调方法*/
        channel.addConfirmListener(new ConfirmListener() {
    
    
            /*b代表multiple
             * l代表deliveryTag*/

            /*没有问题*/
            @Override
            public void handleAck(long l, boolean b) throws IOException {
    
    
                if (b) {
    
    
                    System.out.println("handleAck    multiple");
                    confirmSet.headSet(l + 1).clear();
                } else {
    
    
                    System.out.println("handleAck    multiple false");
                    confirmSet.remove(l);
                }
            }

            /*处理有问题,这里简单地移除*/
            @Override
            public void handleNack(long l, boolean b) throws IOException {
    
    
                if (b) {
    
    
                    System.out.println("handleAck    multiple");
                    confirmSet.headSet(l + 1).clear();
                } else {
    
    
                    System.out.println("handleAck    multiple false");
                    confirmSet.remove(l);
                }
            }


        });
        String msg="sssss";
        /*序列号存储到有序集合中*/
        while (true){
    
    
            long seqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            confirmSet.add(seqNo);
        }
    }
}

十.Spring集成

1.添加依赖

    <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.7.5.RELEASE</version>
        </dependency>

2.xml配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/context
                        http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/rabbit
                        http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">
    <!-- 自动扫描 -->
    <context:component-scan base-package="com.cz" />

    <!-- rabbitMQ连接工厂 -->
<rabbit:connection-factory id="connectionFactory" host="47.98.32.134" port="5672" username="user_mmr" password="123456" virtual-host="/vhost_mmr"></rabbit:connection-factory>

    <!-- 创建rabbitTemplate 消息模板类 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange"></rabbit:template>
    <!--MQ管理,包含队列 交换机声明-->
    <rabbit:admin connection-factory="connectionFactory"></rabbit:admin>
    <!--队列声明-->
    <rabbit:queue name="myQueue" auto-declare="true" durable="true"/>
    <!--交换机声明-->
    <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
        <rabbit:bindings>
            <rabbit:binding queue="myQueue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>
  <!--队列的监听-->
    <rabbit:listener-container connection-factory="connectionFactory" >
        <rabbit:listener ref="foo" method="listen" queue-names="myQueue"></rabbit:listener>
    </rabbit:listener-container>
<!--消费者-->
<bean id="foo" class="com.cz.Spring.MyConsumer"></bean>


</beans>

3.生产者 消费者

public class MyConsumer {
    
    
    public void  listen(String foo){
    
    
        System.out.println(foo);
    }
}
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringMain {
    
    
    public static void main(String[] args) throws InterruptedException {
    
    
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:context.xml");
        RabbitTemplate rabbitTemplate= applicationContext.getBean(RabbitTemplate.class);
        /*发送消息*/
        rabbitTemplate.convertAndSend("hello spring rabbit");
        Thread.sleep(1000);
        applicationContext.destroy();

    }
}

十一.死信队列

1.简介
当队列中的消息成为死信以后,如果队列设置了DLX那么消息会被发送到DLX。通过x-dead-letter-exchange设置DLX,通过这个x-dead-letter-routing-key设置消息发送到DLX所用的routing-key,如果不设置默认使用消息本身的routing-key.
在这里插入图片描述

2.三种情况下会变成死信
(1)消息被拒绝(basic.reject 或者 basic.nack),并且requeue=false;
(2)消息的过期时间到期了;
(3)队列长度限制超过了。

十二.避免消息重复投递或重复消费

在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;在消息消费时,要求消息体中必须要有一个bizId(对于同一业务全局唯一)作为去重和幂等的依据,避免同一条消息被重复消费。
业务场景下:
1.若消息做数据库的insert操作。给这个消息一个唯一主键,若出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
2.若消息做redis的set的操作,不用解决,因为set操作本来就是幂等操作。
3.准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

猜你喜欢

转载自blog.csdn.net/javahelpyou/article/details/104833677