RabitMQ二:Work Queues
概述
本文示例重点为:
- RabbitMQ的的消息及队列持久化(生产者服务挂了之后保证数据不丢失)
- RabbitMQ的消息应答(消费时服务挂了之后保证消息数据不丢失)
示例
NewTask
说明:
- QueueDeclare
的参数durable
表示设置队列是否持久化
- IBasicProperties.Persistent
设置消息是否持久化(设置消息持久化需先设置队列为可持久化,不然服务重启后,队列丢失,消息持久化也就没作用了。)
class NewTask
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "127.0.0.1" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);
Console.WriteLine($"Sent: {message}");
}
}
Console.WriteLine("Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
}
Worker
说明:
- 使用
BasicQos
方法和prefetchCount = 1
设置告诉RabbitMQ不要一次向一个worker发生一条消息,即:在处理并确认前一个消息之前,不要向工作人员发送新消息。相反,它会将它发送给下一个仍然不忙的工人。 - 消息应答:当自动应答-autoAck(
automatic acknowledgement mode
)等于true的时候,表示当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。为了保证数据不丢失,设置autoAck = false
,
class Worker
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "127.0.0.1" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine($" [*] Waiting for message.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 10000);
Console.WriteLine(" [x] Done.");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);
Console.WriteLine("Press [enter] to exit.");
Console.ReadLine();
}
}
}