相对来说生产者生产消息是比较容易的,但是消费者在处理消息的时候则需要更多的时间,甚至若干倍于生产者,所以如果只是使用简单队列的话,生产者和消费者之间的关系是一一对应的,这样当消费者处理需要耗费较长时间的时候,就会造成消息大量的积压,不能得到及时的处理。
因此,我们需要学习新的工作队列,即一个消息队列可以对应多个消费者,它们将共同消费队列的消息,这样就会成倍的提升处理的效率。
(以下代码无注释,详细注释已在上一篇简单队列之中给出,请参考,谢谢)
3.1、定义生产者
和简单队列一样,创建生产者(具体可查看上一篇博客——简单队列)。本次循环发送50条消息到队列。
package com.mmr.rabbitmq.work;
import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
String msg="Hello work QUEUE " + i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("Send----->" + msg + "----" + i);
Thread.sleep(100);
}
channel.close();
connection.close();
}
}
3.2.1、定义消费者
消费者1:
package com.mmr.rabbitmq.work;
import java.io.IOException;
import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "utf-8");
System.out.println("Recv1---->" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("Recv1---->" + msg + "---->Done");
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消费者
2
:
package com.mmr.rabbitmq.work;
import java.io.IOException;
import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "utf-8");
System.out.println("Recv2---->" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("Recv2---->" + msg + "---->Done");
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
分别运行两个消费者,然后运行生产者,我们可以看到发送的50条消息就会被消费者接收到。不过我们可以发现这两个消费者处理消息的速度是不相同的,一个是2秒,一个是1秒,但从控制台的输出我们可以看到它们处理消息的总量却是相同的,都是25条。这就是第一种工作模式——轮询分发,不管消费者的处理速度,总是一人一条依次分发的,每个消费者得到的消息总量都是相等的。
在实际应用中,我们想要得到的一般会是谁处理的速度快,就会给谁多分发消息,也就是能者多劳,而不是上述这样依次轮询。也就是我们下一篇所要说的公平分发模式。