其实它和路由模式差不多的,更加通配而已
代码:
生产者:
package com.itheima.producer;
/**
* @author QLBF
* @version 1.0
* @date 2021/2/26 10:55
*/
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发送消息
*/
public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//2. 设置参数
factory.setHost("127.0.0.1"); //ip,写RabbitMQ启动的ip
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/itcast"); //虚拟机 默认值/
factory.setUsername("heima"); //用户名 默认 guest
factory.setPassword("heima"); //密码 默认值 guest
//3. 创建连接 Connection
Connection connection=factory.newConnection();
//4. 创建Channel
Channel channel=connection.createChannel();
/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
参数:
1. exchange:交换机名称
2. type:交换机类型
DIRECT("direct"),:定向
FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
TOPIC("topic"),通配符的方式
HEADERS("headers");参数匹配
3. durable:是否持久化
4. autoDelete:自动删除
5. internal:内部使用。 一般false
6. arguments:参数
*/
String exchangeName="test_topic";
//5. 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,
false,false,null);
//6. 创建队列
String queue1Name="test_topic_queue1";
String queue2Name="test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
//7. 绑定队列和交换机,重要
/*
queueBind(String queue, String exchange, String routingKey)
参数:
1. queue:队列名称
2. exchange:交换机名称
3. routingKey:路由键,绑定规则
如果交换机的类型为fanout ,routingKey设置为""
*/
// routing key 系统的名称.日志的级别。
//=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");//*.*代表啥都行
String boby="日志信息:张三调用了delete方法...出错误了。。。日志级别:(info)...";
//8. 发送消息,下面error可以改为别的来测试哦
channel.basicPublish(exchangeName,"goods.error",null,boby.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}
消费者:
package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author QLBF
* @version 1.0
* @date 2021/2/26 11:39
*/
public class Consumer_Topic1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//2. 设置参数
factory.setHost("127.0.0.1"); //ip,写RabbitMQ启动的ip
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/itcast"); //虚拟机 默认值/,这个是我在启动Rabbit网站设置的
factory.setUsername("heima"); //用户名 默认 guest
factory.setPassword("heima"); //密码 默认值 guest
//3. 创建连接 Connection
Connection connection=factory.newConnection();
//4. 创建Channel
Channel channel=connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称,从生产者的work_queues拿出来
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 6.接收消息
Consumer consumer=new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body)); //获取生产者发给MQ的消息,转为字符串
System.out.println("将日志信息存到到控制台.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
//7关闭资源?不要
}
}
package com.itheima.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author QLBF
* @version 1.0
* @date 2021/2/26 11:39
*/
public class Consumer_Topic2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory=new ConnectionFactory();
//2. 设置参数
factory.setHost("127.0.0.1"); //ip,写RabbitMQ启动的ip
factory.setPort(5672);//端口 默认值 5672
factory.setVirtualHost("/itcast"); //虚拟机 默认值/,这个是我在启动Rabbit网站设置的
factory.setUsername("heima"); //用户名 默认 guest
factory.setPassword("heima"); //密码 默认值 guest
//3. 创建连接 Connection
Connection connection=factory.newConnection();
//4. 创建Channel
Channel channel=connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
/*
basicConsume(String queue, boolean autoAck, Consumer callback)
参数:
1. queue:队列名称,从生产者的work_queues拿出来
2. autoAck:是否自动确认
3. callback:回调对象
*/
// 6.接收消息
Consumer consumer=new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
/* System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);*/
System.out.println("body:"+new String(body)); //获取生产者发给MQ的消息,转为字符串
System.out.println("将日志信息打印到到控制台.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
//7关闭资源?不要
}
}
测试:
先直接运行生产者,再运行消费者,不用tomcat的:
这是由于生产者设置的通配符,所以两个都输出了,可以改的
测试成功!
小结