Topic类型交换机跟Direct类型交换机相比,key多了一个匹配模式,会根据路由规则把消息分配到不同的队列中,供消费者消费。
通过一个示例说明此类交换机的用法,项目结构:
代码实例:
生产者
using RabbitMQ.Client;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
/// <summary>
/// Topic类型交换机
/// </summary>
public class TopicExchange
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("生产者已准备就绪......");
//声明交换机exchange
channel.ExchangeDeclare(exchange: "TopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);
//声明两个队列
channel.QueueDeclare(queue: "ChinaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "NewsQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
//绑定exchange和queue
channel.QueueBind(queue: "ChinaQueue", exchange: "TopicExchange", routingKey: "China.#", arguments: null);
channel.QueueBind(queue: "NewsQueue", exchange: "TopicExchange", routingKey: "#.News", arguments: null);
//待发送的消息
{
string message = "来自中国的新闻消息";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.News", basicProperties: null, body: body);
Console.WriteLine($"消息 {message} 已发送到队列");
}
{
string message = "来自中国的天气消息";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "TopicExchange", routingKey: "China.Weather", basicProperties: null, body: body);
Console.WriteLine($"消息 {message} 已发送到队列");
}
{
string message = "来自美国的新闻消息";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "TopicExchange", routingKey: "SUA.News", basicProperties: null, body: body);
Console.WriteLine($"消息 {message} 已发送到队列");
}
{
string message = "来自美国的天气消息";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "TopicExchange", routingKey: "SUA.Weather", basicProperties: null, body: body);
Console.WriteLine($"消息 {message} 已发送到队列");
}
}
}
}
}
}
生产者定义了两个队列ChinaQueue和NewsQueue,ChinaQueue的路由规则是以China开头的路由,消息都会进入到此队列中。NewsQueue的路由规则是以News结尾的路由,都会进入到此队列中。
main方法:
class Program
{
static void Main(string[] args)
{
TopicExchange.Show();
Console.ReadLine();
}
}
消费者一:消费队列ChinaQueue的消息
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
/// <summary>
/// 消费者 Topic类型交换机
/// </summary>
public class TopicExchange
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Green;
try
{
//基于当前信道创建事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"接收成功 {message}");
};
//处理消息
channel.BasicConsume(queue: "ChinaQueue", autoAck: true, consumer: consumer);
Console.WriteLine("对来自中国的消息比较感兴趣的消费者");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
}
}
main方法:
class Program
{
static void Main(string[] args)
{
TopicExchange.Show();
Console.ReadLine();
}
}
消费者二:消费队列NewsQueue的消息
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageConsumer_02.MessageConsumer
{
/// <summary>
/// 消费者 Topic类型交换机
/// </summary>
public class TopicExchange
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Green;
try
{
//基于当前信道创建事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"接收成功 {message}");
};
//处理消息
channel.BasicConsume(queue: "NewsQueue", autoAck: true, consumer: consumer);
Console.WriteLine("对来自新闻的消息比较感兴趣的消费者");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
}
}
main方法:
class Program
{
static void Main(string[] args)
{
TopicExchange.Show();
Console.ReadLine();
}
}
运行结果:
总结:Topic类型交换机可以用于分组,可以根据交换机绑定队列的规则来指定不同的组。