消费者和生产者一样,都需要与Mq建立一个socket连接,建立连接完成以后,在消费者本地为此连接也维护一个与生产者一样的同名队列(没有的话就会生成),然后使用channel.basicConsume(QUEUE,true,consumer),注意consummer是个回调函数,只有消费者收到消息才会使用这一方法进行对这一消息的一些操作,需要预先定义这个回调函数
创建连接->如果mq没有队列,要创建队列(其实通过消费者在mq上创建队列),有就没必要了->然后声明接受到信息后处理方法->最后连接将对消息进行处理。
注意消费者不用关连接,要保持对mq的监听.
package com.xuecheng.test.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 测试rabbitmq 入门程序
*
* @author Administrator
* @version 1.0
* @create 2018-06-29 9:22
**/
public class Consumer01 {
private static final String QUEUE = "helloworld";
public static void main(String[] args) {
//连接
Connection connection = null;
//通道
Channel channel = null;
try {
//给MQ发送消息
//连接MQ
//通过连接工厂创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");//IP地址
connectionFactory.setPort(5672);//默认mq服务端口
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");//设置mq虚拟机
//和MQ创建连接
connection = connectionFactory.newConnection();
//建立channel通道,会话通道,在通道中向mq发送消息
channel = connection.createChannel();
//声明一个队列,根据队列名称判断,如果在mq中没有此队列就创建一个队列,如果有此队列则不作处理
/**
* 参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* 1、队列名称
* 2、是否持久化,true表示持久化,当mq重启之后此队列还在
* 3、是否独占通道,true表示此通道只能由此队列来使用
* 4、自动删除,true表示自动删除,mq重启后队列删除
* 5、队列参数列表
*/
channel.queueDeclare(QUEUE,true,false,false,null);
//创建回调方法类
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 消费者接收到消息后会调用此方法
* @param consumerTag 消费者标签,用来标识消费者,如果不指定默认一个名称,在监听队列时设置channel.basicConsume
* @param envelope 消息内容包/信封,可以拿到交换机,消息id
* @param properties 消息的属性
* @param body 消息的内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange();//交换机
long deliveryTag = envelope.getDeliveryTag();//消息id,可以用于消息已经接受
String routingKey = envelope.getRoutingKey();//路由key
//消息内容
String message = new String(body, "utf-8");
System.out.println("i am cousumer,i have received the message "+message);
}
};
//监听队列,接收消息
/**
* 参数:String queue, boolean autoAck, Consumer callback
* 1、监听队列的名称
* 2、autoAck是否自动回复,消息者接收到消息要给mq回复表示此消息已接收,此时mq去删除消息
* 如果autoAck设置true,表示消费者接收到消息后自动回复,如果设置为false,需要程序员在代码中手动回复(channel.basicAck();)
* 3、回调方法,接收到消息后调用此callback
*/
channel.basicConsume(QUEUE,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}