RabbitMQ的工作模式之五主题(Topic)模式
发布订阅模式下,生产者与Topic类型的交换机打交道,由交换机将消息发送给对应绑定的队列,同时可以指定路由key,作为筛选的条件,同时可以进行模糊匹配。
导入依赖
<!--RabbitMQ的依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
首先创建工具类,用以生成连接,避免重复书写此段代码
public class RabbitMQUtil {
private static ConnectionFactory factory = new ConnectionFactory();
static{
//用户名
factory.setUsername("hello");
//密码
factory.setPassword("123456");
//虚拟机
factory.setVirtualHost("/myhost");
//IP地址,运行rabbitmq组件的主机ip地址
factory.setHost("192.168.31.14");
//端口
factory.setPort(5672);
}
public static Connection getConnection(){
Connection connection = null;
try {
connection = factory.newConnection();
return connection;
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
}
生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = RabbitMQUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
/**
* 声明交换机
* 参数1:交换机名称
* 参数2:交换机类型:fanout、direct、topic、headers
*/
channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC);
Map<String,String> map = new HashMap<>();
map.put("001","重庆");
map.put("002","成都");
map.put("003","北京");
map.put("004","乌鲁木齐");
for (String key : map.keySet()) {
// 参数2:路由key就是 数据筛选的条件
channel.basicPublish("topic_exchange",key,null,map.get(key).getBytes());
}
channel.close();
connection.close();
}
}
消费者
public class Customer1{
public static void main(String[] args) throws IOException {
//获取连接
Connection connection = RabbitMQUtil.getConnection();
//创建通道
final Channel channel =connection.createChannel();
//声明队列
channel.queueDeclare("queue1",false,false,false,null);
//绑定交换机和队列,设置路由key
channel.queueBind("queue1","topic_exchange","001");
channel.queueBind("queue1","topic_exchange","002");
//签收消息:false表示手动编程的方式签收
channel.basicConsume("queue1",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("customer1接收到的消息:"+new String(body));
//签收
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
public class Customer2{
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtil.getConnection();
final Channel channel =connection.createChannel();
channel.queueDeclare("queue2",false,false,false,null);
//匹配所有
channel.queueBind("queue2","topic_exchange","*");
channel.basicConsume("queue2",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("customer2接收到的消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
注:生产者发送消息至交换机,交换机按照key分配给相应队列,这里测试时若先运行消费者端需要确保交换机被创建,故需要在消费者端声明交换机,此时生产者端不需要声明交换机,当然,如果先运行生产者端,则需要在生产者端先声明交换机,此时消费者端可以不用声明交换机,总之躯体情况具体分析,灵活运用,当然为了省事,可以一律声明。