RabbitMq03——轮询分发

       相对来说生产者生产消息是比较容易的,但是消费者在处理消息的时候则需要更多的时间,甚至若干倍于生产者,所以如果只是使用简单队列的话,生产者和消费者之间的关系是一一对应的,这样当消费者处理需要耗费较长时间的时候,就会造成消息大量的积压,不能得到及时的处理。

       因此,我们需要学习新的工作队列,即一个消息队列可以对应多个消费者,它们将共同消费队列的消息,这样就会成倍的提升处理的效率。

(以下代码无注释,详细注释已在上一篇简单队列之中给出,请参考,谢谢)

3.1、定义生产者

和简单队列一样,创建生产者(具体可查看上一篇博客——简单队列)。本次循环发送50条消息到队列。

package com.mmr.rabbitmq.work;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String QUEUE_NAME = "test_queue_work";
	
    public static void main(String[] args) throws Exception {		
        Connection connection = ConnectionUtils.getConnection();		
        Channel channel = connection.createChannel();		
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
			
        for (int i = 0; i < 50; i++) {
            String msg="Hello work QUEUE " + i;
	    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
	    System.out.println("Send----->" + msg + "----" + i);
	    Thread.sleep(100);		
        }		
        channel.close();
        connection.close();
    }
}

3.2.1、定义消费者

消费者1

package com.mmr.rabbitmq.work;

import java.io.IOException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Recv1 {
	
    private final static String QUEUE_NAME = "test_queue_work";
	
    public static void main(String[] args) throws Exception {	
        Connection connection = ConnectionUtils.getConnection();	
        Channel channel = connection.createChannel();	
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		
        DefaultConsumer consumer = new DefaultConsumer(channel)
        {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
	            throws IOException {
                String msg = new String(body, "utf-8");
	        System.out.println("Recv1---->" + msg);
	        try {
	        Thread.sleep(1000);
	        } catch (InterruptedException e) {
                    e.printStackTrace();
	        }finally {
	            System.out.println("Recv1---->" + msg + "---->Done");
	        }
	    }
        };	
    channel.basicConsume(QUEUE_NAME, true, consumer);	
    }	
}
消费者 2
package com.mmr.rabbitmq.work;

import java.io.IOException;

import com.mmr.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Recv2 {
	
    private final static String QUEUE_NAME = "test_queue_work";
	
    public static void main(String[] args) throws Exception {
	Connection connection = ConnectionUtils.getConnection();
	Channel channel = connection.createChannel();	
	channel.queueDeclare(QUEUE_NAME, false, false, false, null);
	
	DefaultConsumer consumer = new DefaultConsumer(channel)
	{
	    @Override
	    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
	            throws IOException {
	        String msg = new String(body, "utf-8");
		System.out.println("Recv2---->" + msg);				
		try {
		    Thread.sleep(2000);
		} catch (InterruptedException e) {
		    e.printStackTrace();
		}finally {
		    System.out.println("Recv2---->" + msg + "---->Done");
		}
	    }
	};	
	channel.basicConsume(QUEUE_NAME, true, consumer);		
    }	
}

    分别运行两个消费者,然后运行生产者,我们可以看到发送的50条消息就会被消费者接收到。不过我们可以发现这两个消费者处理消息的速度是不相同的,一个是2秒,一个是1秒,但从控制台的输出我们可以看到它们处理消息的总量却是相同的,都是25条。这就是第一种工作模式——轮询分发,不管消费者的处理速度,总是一人一条依次分发的,每个消费者得到的消息总量都是相等的。

    在实际应用中,我们想要得到的一般会是谁处理的速度快,就会给谁多分发消息,也就是能者多劳,而不是上述这样依次轮询。也就是我们下一篇所要说的公平分发模式。

猜你喜欢

转载自blog.csdn.net/yangsheng0111/article/details/80871504