- 工作队列
- 注意要点
工作线程的产生,在同一队列上声明多个Consumer:
channel.basicConsume(queue, false, consumer);
消息持久化:
//第二个参数durable设为true,实现消息持久化 channel.queueDeclare(queue, true, false, false, null); // MessageProperties.PERSISTENT_TEXT_PLAIN,消息持久化 channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN, SerializationUtils.serialize(object));
客户端最大任务量,避免工作线程任务累积,需要开启手动应答模式:
//同时可以接收的最大任务量 channel.basicQos(2);
手动应答,保证任务执行成功:
//关闭自动应答机制,默认开启;这时候需要手动进行应该 channel.basicConsume(queue, false, consumer); //每处理完一次,手动应答一次,否则服务端认为分配的任务都未完成 channel.basicAck(envelope.getDeliveryTag(), false);
- 消息发送类
package com.demo.mq.rabbitmq.example02; import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang3.SerializationUtils; import com.demo.mq.rabbitmq.MqManager; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; /** * 发送消息类 * @author sheungxin * */ public class Send{ /** * 发送消息,实现消息的持久化 * @param queue 队列名称 * @param object 消息主体 * @throws IOException */ public static void sendAToB(String queue,Serializable object) throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); //第二个参数durable设为true,实现消息持久化 channel.queueDeclare(queue, true, false, false, null); // MessageProperties.PERSISTENT_TEXT_PLAIN,消息持久化 channel.basicPublish("", queue, MessageProperties.PERSISTENT_TEXT_PLAIN, SerializationUtils.serialize(object)); System.out.println("A Send :'"+object+"'"); channel.close(); conn.close(); } public static void main(String[] args) throws Exception { String channel="task_queue"; for(int i=0;i<10;i++){ sendAToB(channel, "Hello World "+i+"!"); } } }
- 消息接收类
package com.demo.mq.rabbitmq.example02; import java.io.IOException; import java.util.Random; import org.apache.commons.lang3.SerializationUtils; import com.demo.mq.rabbitmq.MqManager; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 接收消息类 * @author sheungxin * */ public class Recv { /** * 用于接收消息,开启应答机制 * @param queue * @throws Exception */ public static void recvAToB(String queue) throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); //此处声明队列为了防止接收者先运行,队列还不存在时创建队列(同一队列只会创建一次) channel.queueDeclare(queue, true, false, false, null); //同时可以接收的最大任务量 channel.basicQos(2); Consumer consumer=new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{ String mes=SerializationUtils.deserialize(body); System.out.println("B Received :'"+mes+"'..."); try { Thread.sleep(new Random().nextInt(10)*1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("B Received :'"+mes+"' done"); //每处理完一次,手动应答一次,否则服务端认为分配的任务都未完成 channel.basicAck(envelope.getDeliveryTag(), false); } }; //关闭自动应答机制,默认开启;这时候需要手动进行应该 channel.basicConsume(queue, false, consumer); } public static void main(String[] args) throws Exception { recvAToB("task_queue"); } }