生产者:
package com.xuecheng.rabbitmq.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: 星仔
* @Date: 2018/12/24 21:31
* @Description:
*/
public class ProducerTest03 {
//声明队列名称
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
//声明交换机的名称
private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = null;
Channel channel = null;
try {
//创建连接工厂,建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//创建虚拟主机,rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
factory.setVirtualHost("/");
//创建于MQ服务的TCP连接
connection = factory.newConnection();
//创建于EXchange的通道,每一个通道都相当于一个会话事务
channel = connection.createChannel();
//声明交换机 String exchange,BuiltinExchangeType type
/**
* 参数明细:
* exchange:交换机名称
* type:交换机的类型
* fanout,topic,direct,headers
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//声明队列,如果RabbitMQ中没有该队列,则会创建
/**
*参数:String queue,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> params
*参数明细:
* 1. queue:队列名称
* 2. durable:是否持久化,如果持久化,将MQ重启之后队列还在
* 3. exclusive: 是否独占连接,队列只允许在该队列中访问,一旦连接关闭,该队列将自动删除,如果将此参数设置为true,那么可用于临时队列的创建
* 4. autoDelete:自动删除,队列不再使用时是否关闭,如果将此参数设置为true将exclusive设置为true,可用于创建临时队列
* 5. params: 可以设置队列的一些扩展参数,比如设置存活时间等等
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//交换机和队列进行绑定String queue, String exchange, String routingKey
/**
* 参数明细:
* queue:队列的名称
* exchange:交换机的名称
* routinfkey:路由key
*/
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,"inform02");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,"inform01");
//发送消息
/**
* 参数:String exchange,String routingKey,String props,Byte[] body
* 参数明细:
* 1、exchange: 交换机,如果不使用,将使用MQ的默认交换机
* 2、routingKey: 路由key,交换机根据路由key将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、props:消息的属性
* 4、body: 消息内容
*/
for (int i=0; i<5;i++){
String msg = "helllo world"+i;
channel.basicPublish(EXCHANGE_ROUTING_INFORM,"inform02",null,msg.getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}finally{
//关闭连接,先关闭通道,在关闭连接
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
}
}
}
消费者:e-mail
package com.xuecheng.rabbitmq.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: 星仔
* @Date: 2018/12/24 22:43
* @Description:
*/
public class ConsumeTestEmail {
//声明队列名称
private static final String QUEUE_INFORM_SMS = "queue_inform_email";
//声明交换机的名称
private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//创建虚拟主机,rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
factory.setVirtualHost("/");
//创建于MQ服务的TCP连接
Connection connection = factory.newConnection();
//创建于EXchange的通道,每一个通道都相当于一个会话事务
Channel channel = connection.createChannel();
/**
* 参数明细:
* exchange:交换机名称
* type:交换机的类型
* fanout,topic,direct,headers
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//声明消费消息的方法
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 消费者接收消息调用此方法
* @param consumerTag 消费者的标签,在channel.basicConsume()去指定
* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException{
//交换机
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//消息id,mq在channel中用来标识消息的id,用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
//消息内容
String msg = new String(body,"utf-8"); System.out.println("receive message.." + msg);
}
};
/**
* 监听队列
* 参数:String queue, boolean autoAck,Consumer callback
* 参数明细:
* 1.queue: 队列名称
* 2.autoAck: 自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复
* 3.callback: 消费消息的方法,消费者接收到消息后调用此方法
*/
channel.basicConsume(QUEUE_INFORM_SMS,true,consumer);
}
}
消费者-sms:
package com.xuecheng.rabbitmq.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: 星仔
* @Date: 2018/12/24 22:43
* @Description:
*/
public class ConsumeTestSms {
//声明队列名称
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
//声明交换机的名称
private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//创建虚拟主机,rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
factory.setVirtualHost("/");
//创建于MQ服务的TCP连接
Connection connection = factory.newConnection();
//创建于EXchange的通道,每一个通道都相当于一个会话事务
Channel channel = connection.createChannel();
/**
* 参数明细:
* exchange:交换机名称
* type:交换机的类型
* fanout,topic,direct,headers
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
//声明消费消息的方法
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 消费者接收消息调用此方法
* @param consumerTag 消费者的标签,在channel.basicConsume()去指定
* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException{
//交换机
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//消息id,mq在channel中用来标识消息的id,用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
//消息内容
String msg = new String(body,"utf-8"); System.out.println("receive message.." + msg);
}
};
/**
* 监听队列
* 参数:String queue, boolean autoAck,Consumer callback
* 参数明细:
* 1.queue: 队列名称
* 2.autoAck: 自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复
* 3.callback: 消费消息的方法,消费者接收到消息后调用此方法
*/
channel.basicConsume(QUEUE_INFORM_SMS,true,consumer);
}
}