RabbitMQ+Springboot

RabbitMQ+Springboot

官方API

运行demo(https://github.com/HOwen123/rabbitmq)

springboot与rabbitmq整合参考https://www.cnblogs.com/xmzJava/p/8036591.html

rabbitMQ依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

一、 简单队列

模型

image

创建连接

public class ConnectionUtil {

    public static Connection getConnection() throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        //主机
        factory.setHost(your host);
        //协议端口号
        factory.setPort(port);
        //用户名
        factory.setUsername(your rabbitmqUserName);
        //密码
        factory.setPassword(your rabbitmqPassword);
        //虚拟主机路径(相当于数据库名)
        factory.setVirtualHost("***");
        //返回连接
        return factory.newConnection();
    }
}

发送信息队列

public class Send {

    private final static String Queue_Name="hello";

    public static void main(String [] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //申明队列
        channel.queueDeclare(Queue_Name,false,false,false,null);
        String message = "我觉得也是";
        //发布信息
        channel.basicPublish("",Queue_Name,null,message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

获取信息队列

public class Recv {
    private static final String QUEUE_NAME = "hello";

    public static void main(String [] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道(并发的时候你可以确定哪条管道发送)
        Channel channel = connection.createChannel();
        //申明一个队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        System.out.println("请等待......");

        //消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("[x] Received '"+message+"'");
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME,true,consumer);
        channel.close();
        connection.close();
    }
}

简单队列的不足

耦合性高,生产者一一对应消费者(如果有多个消费者想消费队列中的消息,就可能出现问题)。如果队列名改变,那么消费者队列名跟着改变。

二、Work queues 工作队列

模型

image

为什么要使用工作队列?

simple 队列中生产者和消费者是一一对应的。在我们实际的开发中,生产者发送消息是很快的,而消费者对消息进行消费是需要进行业务处理的,这是需要花费比较多的时间的。这时候队列就会挤压很多的消息。

常量类

package com.howen.rabbitmq.common;

public class Consts {
    public static final String SM_QUEUE_NAME = "simple_test";

    public static final String WQ_QUEUE_NAME = "work_queue";
}

连接工具类

package com.howen.rabbitmq.Utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionUtil {

    public static Connection getConnection() throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        //主机
        factory.setHost(***);
        //协议端口号
        factory.setPort(***);
        //用户名
        factory.setUsername(****);
        //密码
        factory.setPassword(****);
        //虚拟主机路径(相当于数据库名)
        factory.setVirtualHost(****);
        //返回连接
        return factory.newConnection();
    }
}

轮询分发

生产者生产消息

package com.howen.rabbitmq.workqueue;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    public static void main(String [] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(Consts.WQ_QUEUE_NAME,false,false,false,null);

        for (int i = 0; i <50 ; i++) {
            String sendMsg = "信息"+i;

            System.out.println(sendMsg);

            channel.basicPublish("",Consts.WQ_QUEUE_NAME,null,sendMsg.getBytes());

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                System.out.println("done");
            }
        }

        channel.close();
        connection.close();

    }

}

消费者 1 消费消息

package com.howen.rabbitmq.workqueue;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {

    public static void main(String [] args) throws IOException, TimeoutException {
       Connection connection = ConnectionUtil.getConnection();

       Channel channel = connection.createChannel();

       channel.queueDeclare(Consts.WQ_QUEUE_NAME,false,false,false,null);

       Consumer consumer = new DefaultConsumer(channel){
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

               String recvMsg = new String(body,"UTF-8");

               System.out.println("[1] Recv msg "+recvMsg);

               try {
                   Thread.sleep(1000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       };

       boolean autoAck = true;

       channel.basicConsume(Consts.WQ_QUEUE_NAME,autoAck,consumer);
    }
}

消费者 2 消费消息

package com.howen.rabbitmq.workqueue;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {

    public static void main(String [] args) throws IOException, TimeoutException {
       Connection connection = ConnectionUtil.getConnection();

       Channel channel = connection.createChannel();

       channel.queueDeclare(Consts.WQ_QUEUE_NAME,false,false,false,null);

       Consumer consumer = new DefaultConsumer(channel){
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

               String recvMsg = new String(body,"UTF-8");

               System.out.println("[2] Recv msg "+recvMsg);

               try {
                   Thread.sleep(2000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       };

       boolean autoAck = true;

       channel.basicConsume(Consts.WQ_QUEUE_NAME,autoAck,consumer);
    }
}

WQ 总结

  1. 消费者1 和消费者2 处理的消息是一样的
  2. 消费者之间的消息是你一个我一个的

这种方式叫做轮询分发(round-robbin)结果就是不过谁比较清闲都好,任务消息的发送总是公平的。

公平分发(fair dispatch)

发送者发送消息

package com.howen.rabbitmq.workqueue;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    public static void main(String [] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        //声明队列
        channel.queueDeclare(Consts.WQ_QUEUE_NAME,false,false,false,null);

        /**
         * 每个消费者发送确认消息之前,消息队列只给消费者发送一条消息
         *
         * 限制发送给消费者不超过一条信息
         */
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);

        for (int i = 0; i <50 ; i++) {
            String sendMsg = "信息"+i;

            System.out.println(sendMsg);

            channel.basicPublish("",Consts.WQ_QUEUE_NAME,null,sendMsg.getBytes());

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                System.out.println("done");
            }
        }

        channel.close();
        connection.close();

    }

}

要设置确认消息收到之前发多少消息。

消费者 3 消费消息

package com.howen.rabbitmq.workfair;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv3 {

    public static void main(String [] args) throws IOException, TimeoutException {
       Connection connection = ConnectionUtil.getConnection();

       Channel channel = connection.createChannel();
       //声明队列
       channel.queueDeclare(Consts.WQ_QUEUE_NAME,false,false,false,null);
       //保证一次只分发一次
       channel.basicQos(1);

       Consumer consumer = new DefaultConsumer(channel){
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

               String recvMsg = new String(body,"UTF-8");

               System.out.println("[3] Recv msg "+recvMsg);

               channel.basicAck(envelope.getDeliveryTag(),false);

               try {
                   Thread.sleep(1000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       };

       boolean autoAck = false;

       channel.basicConsume(Consts.WQ_QUEUE_NAME,autoAck,consumer);
    }
}

消费者 4 消费消息

package com.howen.rabbitmq.workfair;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv4 {

    public static void main(String [] args) throws IOException, TimeoutException {
       Connection connection = ConnectionUtil.getConnection();

       Channel channel = connection.createChannel();

       channel.queueDeclare(Consts.WQ_QUEUE_NAME,false,false,false,null);

        //保证一次只分发一次
        channel.basicQos(1);

       Consumer consumer = new DefaultConsumer(channel){
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

               String recvMsg = new String(body,"UTF-8");

               System.out.println("[4] Recv msg "+recvMsg);

               channel.basicAck(envelope.getDeliveryTag(),false);

               try {
                   Thread.sleep(3000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       };

       boolean autoAck = false;

       channel.basicConsume(Consts.WQ_QUEUE_NAME,autoAck,consumer);
    }
}

WF 总结

  1. 需要关闭自动应答
  2. 消息处理快的消费者能更快的获取消息

消息应答与消息持久化

boolean autoAck=false;``channel.basicConsume(Consts.WQ_QUEUE_NAME,autoAck,consumer);

boolean autoAck=true;(自动确认模式)一旦rabbitmq将消息分发给消费者,就会从内存中删除;

这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息。

boolean autoAck=false(手动模式),如果有一个消费者挂掉了,就会交付给其他消费者,rabbitmq支持消息应答,消费者发送一个消息应答,告诉rabbitmq这个消息我已经处理完成,你可以删了,然后rabbitmq就删除内存中的消息

消息应答默认是打开的

消息持久化

 channel.queueDeclare(Consts.WQ_QUEUE_NAME,false,false,false,null);

第二个参数 durableboolean类型持久化参数

  • 在服务器端已经存在队列名,则不能直接修改durabletrue,否则会报错。需要新建一个队列名或者在rabbitmq服务器端进行删除。

三、发布订阅模式(publish_subscribe)

模型

image

说明
1. 一个生产者,多个消费者。
2. 每个消费者都对应有自己的队列
3. 生产者不是把消息直接发送到队列中,而是发送到交换机上(exchange)
4. 每个队列都要绑定到交换机上
5. 生产者发送的消息净多交换机到达队列,就能实现一个消息被多个消费者消费

生产者

package com.howen.rabbitmq.publishSubscribe;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    public static void main(String [] args) throws IOException, TimeoutException {
       Connection connection = ConnectionUtil.getConnection();

       Channel channel = connection.createChannel();

       //声明分发
       channel.exchangeDeclare(Consts.PS_EXCHANGE_NAME,"fanout");

       String msg = "hello rabbit";

       //发布到交换机
       channel.basicPublish(Consts.PS_EXCHANGE_NAME,"",null,msg.getBytes());

       System.out.println("send :"+msg);

       channel.close();
       connection.close();

    }

}

消费者 1

package com.howen.rabbitmq.publishSubscribe;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv1 {

    public static void main(String [] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();
        //队列声明
        channel.queueDeclare(Consts.PS_QUEUE_NAME_EMAIL,true,false,false,null);
        //队列与交换机绑定
        channel.queueBind(Consts.PS_QUEUE_NAME_EMAIL,Consts.PS_EXCHANGE_NAME,"");

        //保证一次只分发一次
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String recvMsg = new String(body,"UTF-8");

                System.out.println("[1] Recv msg "+recvMsg);

                channel.basicAck(envelope.getDeliveryTag(),false);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        boolean autoAck = false;

        channel.basicConsume(Consts.PS_QUEUE_NAME_EMAIL,autoAck,consumer);
    }

}

消费者 2

package com.howen.rabbitmq.publishSubscribe;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv2 {

    public static void main(String [] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(Consts.PS_QUEUE_NAME_MSG,true,false,false,null);
        //队列与交换机绑定
        channel.queueBind(Consts.PS_QUEUE_NAME_MSG,Consts.PS_EXCHANGE_NAME,"");

        //保证一次只分发一次
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String recvMsg = new String(body,"UTF-8");

                System.out.println("[2] Recv msg "+recvMsg);

                channel.basicAck(envelope.getDeliveryTag(),false);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        boolean autoAck = false;

        channel.basicConsume(Consts.PS_QUEUE_NAME_MSG,autoAck,consumer);
    }

}

Exchange(交换机 转发器)

一方面是接收生产者的消息,另一方面是向队列推送消息

匿名装发 “”

Fanout(不处理路由键(routing key))

image

Direct(处理(routing key))

image

四、路由模式

模型

image

生产者

package com.howen.rabbitmq.routing;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    public static void main(String [] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(Consts.RT_EXCHANGE_NAME,"direct");

        String msg = " message: hello routing to info";

        System.out.println(msg);

        String routingKey = Consts.RT_ROUTING_INFO;

        channel.basicPublish(Consts.RT_EXCHANGE_NAME,routingKey,null,msg.getBytes());

        channel.close();

        connection.close();
    }

}

消费者1

package com.howen.rabbitmq.routing;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv5 {

    public static void main(String [] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(Consts.RT_QUEUE_NAME_1,false,false,false,null);

        //保证一次只分发一次
        channel.basicQos(1);

        channel.queueBind(Consts.RT_QUEUE_NAME_1,Consts.RT_EXCHANGE_NAME,Consts.RT_ROUTING_ERROR);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String recvMsg = new String(body,"UTF-8");

                System.out.println("[5] Recv msg "+recvMsg);

                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        boolean autoAck = true;

        channel.basicConsume(Consts.RT_QUEUE_NAME_1,autoAck,consumer);
    }
}

消费者2

package com.howen.rabbitmq.routing;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv6 {

    public static void main(String [] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(Consts.RT_QUEUE_NAME_2,false,false,false,null);

        //保证一次只分发一次
        channel.basicQos(1);

        channel.queueBind(Consts.RT_QUEUE_NAME_2,Consts.RT_EXCHANGE_NAME,Consts.RT_ROUTING_ERROR);
        channel.queueBind(Consts.RT_QUEUE_NAME_2,Consts.RT_EXCHANGE_NAME,Consts.RT_ROUTING_INFO);
        channel.queueBind(Consts.RT_QUEUE_NAME_2,Consts.RT_EXCHANGE_NAME,Consts.RT_ROUTING_WARNING);


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String recvMsg = new String(body,"UTF-8");

                System.out.println("[6] Recv msg "+recvMsg);

                channel.basicAck(envelope.getDeliveryTag(),false);

            }
        };

        boolean autoAck = true;

        channel.basicConsume(Consts.RT_QUEUE_NAME_2,autoAck,consumer);
    }
}

五、topic 主题模式

模型

image

实例图

image

发送者

package com.howen.rabbitmq.topic;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    public static void main(String [] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(Consts.TP_EXCHANGE_NAME,"topic");

        String msg = " 帅哥一枚。。。。";

        channel.basicPublish(Consts.TP_EXCHANGE_NAME,"boys.add",null,msg.getBytes());

        System.out.println("发送: "+msg);

        channel.close();

        connection.close();
    }

}

消费者1

package com.howen.rabbitmq.topic;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv7 {

    public static void main(String [] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(Consts.TP_QUEUE_NAME_1,false,false,false,null);

        channel.queueBind(Consts.TP_QUEUE_NAME_1,Consts.TP_EXCHANGE_NAME,"boys.#");

        //保证一次只分发一次
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String recvMsg = new String(body,"UTF-8");

                System.out.println("[7] 接收: "+recvMsg);

                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        boolean autoAck = true;

        channel.basicConsume(Consts.TP_QUEUE_NAME_1,autoAck,consumer);
    }
}

消费2

package com.howen.rabbitmq.topic;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv8 {

    public static void main(String [] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(Consts.TP_QUEUE_NAME_2,false,false,false,null);

        //保证一次只分发一次
        channel.basicQos(1);

        channel.queueBind(Consts.TP_QUEUE_NAME_2,Consts.TP_EXCHANGE_NAME,"*.add.*");


        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String recvMsg = new String(body,"UTF-8");

                System.out.println("[8] 接收: "+recvMsg);

                channel.basicAck(envelope.getDeliveryTag(),false);

            }
        };

        boolean autoAck = true;

        channel.basicConsume(Consts.TP_QUEUE_NAME_2,autoAck,consumer);
    }
}

Rabbitmq的消息确认机制(事物+confirm)

在rabbitmq中,我们可以通过持久化数据,解决rabbitmq服务器异常的数据丢失。

两种方式:

AMQP实现了事务机制

Confirm模式

事务机制

txSelect:开启事务;txCommit:提交事务;txRollback:事务回滚;

缺点: 耗时,降低了吞吐量

package com.howen.rabbitmq.tx;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    public static void main(String [] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(Consts.TX_QUEUE_NAME,false,false,false,null);

        String msg="事务";
        //开启事务
        channel.txSelect();

        try{
            channel.basicPublish("",Consts.TX_QUEUE_NAME,null,msg.getBytes());
            System.out.println(msg);
            //提交事务
            channel.txCommit();
        }catch (Exception e){
            System.out.println("msg rollback");
            //事务回滚
            channel.txRollback();
        }finally {
            channel.close();
            connection.close();
        }

    }
}

Confirm 模式

Confirm模式最大的好处在于他是异步的

开始confirm模式

channel.select 修改为channel.confirmSelect()

编程模式:

  1. 普通: 发一条 waitForConfirms()
  2. 批量的:waitForConfirms();所有发完后确认
  3. 异步confirm模式:提供一个回调方法

批量调用

package com.howen.rabbitmq.confirm;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send1 {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(Consts.CF_QUEUE_NAME, false, false, false, null);

        String msg = "confirm模式";
        //开启事务
        channel.confirmSelect();
        //批量
        for (int i = 0; i <10 ; i++) {
            channel.basicPublish("", Consts.CF_QUEUE_NAME, null, msg.getBytes());
        }

        if (!channel.waitForConfirms()){
            System.out.println("消息发送失败!");
        }else{
            System.out.println("消息发送成功!");
        }
        System.out.println(msg);
        channel.close();
        connection.close();


    }
}

异步调用

package com.howen.rabbitmq.confirm;

import com.howen.rabbitmq.Utils.ConnectionUtil;
import com.howen.rabbitmq.common.Consts;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;


public class Send2 {

    /**
     * @author 
     * @TODO (注:异步调用)
     * @param
     * @DATE: 2018/7/12 16:51
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(Consts.CF_QUEUE_NAME, false, false, false, null);

        String msg = "confirm模式";
        //开启confirm模式
        channel.confirmSelect();
        //未确认的消息标识
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

        //添加通道监听
        channel.addConfirmListener(new ConfirmListener() {
            //没有问题的handleAck
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if (multiple){
                    System.out.println("---handleAck-----multiple");
                    confirmSet.headSet(deliveryTag+1).clear();
                }else {
                    System.out.println("---handleAck-----multiple false");
                    confirmSet.remove(deliveryTag);
                }
            }
            //handleNack 回执有问题的
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if (multiple){
                    System.out.println("---handleNack-----multiple");
                    confirmSet.headSet(deliveryTag+1).clear();
                }else {
                    System.out.println("---handleNack-----multiple false");
                    confirmSet.remove(deliveryTag);
                }
            }
        });

        while(true){
            long seqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("",Consts.CF_QUEUE_NAME,null,msg.getBytes());
            confirmSet.add(seqNo);
        }


    }
}

猜你喜欢

转载自blog.csdn.net/lianowen/article/details/81202069