在订阅模式中多了一个Exchange角色。
Exchange通常有三种类型:
Fanout(广播),使用该模式,Exchange会将消息发布到所绑定的所有队列中。
Direct(定向),把消息交给符合指定routing key 的队列。
Topic(通配符),把消息交给符合routing pattern(路由模式) 的队列。
Producer代码:
/**
 * Publish/subscribe (fanout) producer demo.
 *
 * <p>Declares two durable queues and a durable fanout exchange, binds both queues to the
 * exchange, publishes one message to that exchange and then closes the channel and the
 * connection.
 */
public class Producer {
    /** Name of the fanout exchange that both queues are bound to and that the message is sent to. */
    private static final String EXCHANGE_NAME = "test_fanout";

    /**
     * Connects to the broker, sets up the topology and publishes a single message.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or publishing fails
     */
    public static void main(String[] args) throws Exception {
        // NOTE(review): host and credentials are hard-coded demo values; externalize them
        // before using this anywhere else.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("139.196.37.163");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("my_vhost");

        // try-with-resources closes the channel first and then the connection, even when
        // one of the calls below throws.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {

            // Declare the queues.
            // queueDeclare(String queue,
            //              boolean durable,
            //              boolean exclusive,
            //              boolean autoDelete,
            //              Map<String, Object> arguments)
            channel.queueDeclare("ps_queues1", true, false, false, null);
            channel.queueDeclare("ps_queues2", true, false, false, null);

            // Declare the exchange.
            // exchangeDeclare(String exchange,            exchange name
            //                 BuiltinExchangeType type,   exchange type
            //                 boolean durable,            whether the exchange is persisted
            //                 boolean autoDelete,         whether it is deleted automatically
            //                 boolean internal,           internal use only; normally false
            //                 Map<String, Object> arguments)
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, false, false, null);

            // Bind the queues to the exchange.
            // queueBind(String queue, String exchange, String routingKey)
            // A fanout exchange ignores the routing key, but the Java client rejects null,
            // so an empty string is passed instead.
            channel.queueBind("ps_queues1", EXCHANGE_NAME, "");
            channel.queueBind("ps_queues2", EXCHANGE_NAME, "");

            // Publish to the exchange the queues were bound to above (not "amq.fanout",
            // which this program never binds anything to). The payload is encoded as
            // UTF-8 explicitly so it does not depend on the platform default charset.
            byte[] payload = "小逼崽子".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            channel.basicPublish(EXCHANGE_NAME, "", null, payload);
        }
    }
}
Consumer代码:
public class Consumer1 {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("139.196.37.163");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("my_vhost");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建频道
Channel channel = connection.createChannel();
//创建队列,如果有就不会创建
/**
* String queue,
* boolean durable,
* boolean exclusive,
* boolean autoDelete,
* Map<String, Object> arguments
*/
channel.queueDeclare("ps_queues1", true, false, false, null);
/**
* String queue,
* boolean autoAck,
* Map<String, Object> arguments,
* Consumer callback
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
};
channel.basicConsume("ps_queues1", true, defaultConsumer);
}
}