0、引言
先决条件
本教程假设 RabbitMQ 已安装并且正在
本地主机
的标准端口(5672
)上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。获取帮助
如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。
在第二篇教程中我们学习了如何在多个工作者之间使用 工作队列 分发费时的任务。
但是如果我们需要运行远端计算机上的一个函数并等待其返回值呢?这是一种常见的工作模式,称之为 远程过程调用(Remote Procedure Call)。
本文我们将使用 RabbitMQ 在 .NET 下去构建一个(请求/应答 模式示例)RPC 系统:一个客户端和一个可扩展的 RPC 服务端。因为我们没有任何耗时的任务需要去分发,所以现在我们将创建一个返回斐波那契(Fibonacci)数的虚拟 RPC 服务。
原文链接:https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html
1、客户端接口
为了说明如何使用 RPC 服务,我们现在去创建一个简单的客户端类。它将公开一个名为 CallAsync
的方法,该方法发送 RPC 请求并在收到结果前保持阻塞。
using var rpcClient = new RpcClient();
Console.WriteLine(" [x] Requesting fib({0})", n);
var response = await rpcClient.CallAsync(n);
Console.WriteLine(" [.] Got '{0}'", response);
RPC 笔记
尽管 RPC 在编程领域中是一种非常常见的模式,但它饱受诟病。当程序员不知道函数调用是本地的还是缓慢的 RPC 时,问题就出现了。这样的混乱会导致不可预测的系统,并为调试增加不必要的复杂性。滥用 RPC 不仅不能简化软件,还会导致不可维护的意大利面条式代码。
铭记这一点,并考虑如下这些建议:
- 确保明确表示哪个函数调用是本地的,哪个是远程的;
- 记录你的系统,明确组件之间的依赖关系;
- 处理错误情况。比如当 RPC 服务器长时间关闭时,客户端应该如何反应?
如有疑问,请避免使用 RPC。如果可以的话,您应该使用(结果被异步推送到下一个计算阶段的)异步管道 —— 而不是类似 RPC 的阻塞。
2、回调队列
一般来说,在 RabbitMQ 上做 RPC 是很容易的。客户端发送一个请求消息,服务端使用响应消息进行应答。为了接收响应,我们需要在请求中发送一个“回调”队列地址:
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: string.Empty,
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
// ... 随后编写从回调队列读取响应消息的代码 ...
Message properties
AMQP 0-9-1 协议预定义了 14 个与消息一起使用的属性。大多数属性都很少使用,除了下面这几个:
3、Correlation Id
在上面给出的方法中,我们建议为每个 RPC 请求创建一个回调队列。这是相当低效的,但幸运的是有一个更好的方法 —— 让我们为每个客户端创建一个回调队列。
这会导致一个新的问题:在该队列中接收到响应后,不清楚该响应属于哪个请求。这个时候通常就会用到 CorrelationId
属性了。对于每个请求,我们将该属性的值设置为一个独一无二的值。随后,当我们在回调队列中收到一条消息时我们会查看该属性的值,并基于此我们得以将响应与对应的请求匹配起来。如果我们遇到了未知的 CorrelationId
值,我们可以安全地丢弃此消息 —— 因为它不属于我们的请求。
你也许会问:为什么我们应当忽视回调队列中未知的消息,而不是失败并出现错误?这是因为服务端存在竞争条件的可能性。虽然不大可能,但 RPC 服务端可能在向我们发送答复之后却在为请求发送确认消息前就会死亡。如果发生这种情况,重新启动的RPC服务器将再次处理请求。这就是为什么在客户端上我们必须优雅地处理重复响应的原因,理想情况下RPC应该是幂等的。
4、总结
我们的 RPC 将这样工作:
- 当客户端启动时,它创建一个匿名独占(exclusive)回调队列。
- 对于 RPC 请求,客户端发送消息时包含两个属性:
ReplyTo
:用于设置回调队列;CorrelationId
:为每个请求设置唯一值。
- 请求被发送到
rpc_queue
队列中。 - RPC 工作器(又名:服务端)正在该队列上等待请求。当出现请求时,它完成任务,并使用
ReplyTo
属性中的队列将带有结果的消息发送回客户端。 - 客户端等待回调队列上的数据。当消息出现时,它检查
CorrelationId
属性。如果它与请求中的值匹配,则将响应返回给应用程序。
将所有的东西放到一起:
斐波那契任务:
static int Fib(int n)
{
if (n is 0 or 1)
{
return n;
}
return Fib(n - 1) + Fib(n - 2);
}
我们声明了我们的斐波那契函数。它只假设有效的正整数输入。(不要指望这个方法适用于大的数字,它可能是最慢的递归实现)。
我们 RPC 服务端的代码 RPCServer.cs 看起来像是这样:
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
var factory = new ConnectionFactory {
HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "rpc_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: "rpc_queue",
autoAck: false,
consumer: consumer);
Console.WriteLine(" [x] Awaiting RPC requests");
consumer.Received += (model, ea) =>
{
string response = string.Empty;
var body = ea.Body.ToArray();
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;
try
{
var message = Encoding.UTF8.GetString(body);
int n = int.Parse(message);
Console.WriteLine($" [.] Fib({message})");
response = Fib(n).ToString();
}
catch (Exception e)
{
Console.WriteLine($" [.] {e.Message}");
response = string.Empty;
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish(exchange: string.Empty,
routingKey: props.ReplyTo,
basicProperties: replyProps,
body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
};
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
// 只假设有效的正整数输入。
// 不要指望这个方法适用于大的数字,它可能是最慢的递归实现
static int Fib(int n)
{
if (n is 0 or 1)
{
return n;
}
return Fib(n - 1) + Fib(n - 2);
}
服务端的代码相当简单:
- 像往常一样,我们首先建立连接、通道并声明队列。
- 我们可能想要运行多个服务端进程。为了在多个服务端上平均分配负载,我们需要在
channel.BasicQos
中设置prefetchCount
设置。 - 我们使用
BasicConsume
去访问队列。然后,我们注册一个交付处理程序,在其中完成工作并发送响应。
RPC 客户端的代码 RPCClient.cs:
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
public class RpcClient : IDisposable
{
private const string QUEUE_NAME = "rpc_queue";
private readonly IConnection connection;
private readonly IModel channel;
private readonly string replyQueueName;
private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> callbackMapper = new();
public RpcClient()
{
var factory = new ConnectionFactory {
HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
//声明一个 RabbitMQ 服务器自动命名的“回调”队列
replyQueueName = channel.QueueDeclare().QueueName;
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
if (!callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs))
return;
var body = ea.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
tcs.TrySetResult(response);
};
channel.BasicConsume(consumer: consumer,
queue: replyQueueName,
autoAck: true);
}
public Task<string> CallAsync(string message, CancellationToken cancellationToken = default)
{
IBasicProperties props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName;
var messageBytes = Encoding.UTF8.GetBytes(message);
var tcs = new TaskCompletionSource<string>();
callbackMapper.TryAdd(correlationId, tcs);
channel.BasicPublish(exchange: string.Empty,
routingKey: QUEUE_NAME,
basicProperties: props,
body: messageBytes);
cancellationToken.Register(() => callbackMapper.TryRemove(correlationId, out _));
return tcs.Task;
}
public void Dispose()
{
connection.Close();
}
}
public class Rpc
{
public static async Task Main(string[] args)
{
Console.WriteLine("RPC Client");
string n = args.Length > 0 ? args[0] : "30";
await InvokeAsync(n);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static async Task InvokeAsync(string n)
{
using var rpcClient = new RpcClient();
Console.WriteLine(" [x] Requesting fib({0})", n);
var response = await rpcClient.CallAsync(n);
Console.WriteLine(" [.] Got '{0}'", response);
}
}
客户端代码稍微复杂一些:
- 我们建立了一个连接和通道,并声明一个独占(exclusive)的“回调”队列用于应答。
- 我们订阅了“回调”队列,这样我们就可以接收 RPC 响应。
- 我们的
Call
方法生成实际的 RPC 请求。 - 在这里我们首先生成一个唯一的
CorrelationId
编号并保存它,以便响应到达时识别相应的响应。 - 下一步,我们发布请求消息,包含两个属性:
ReplyTo
和CorrelationId
。 - 这个时候我们就可以坐等响应到来了。
- 对于每条响应消息客户端检查
CorrelationId
是否是我们寻找的那个。如果是,保存响应。 - 最终我们将响应返回给用户。
生成客户端请求:
using var rpcClient = new RpcClient();
Console.WriteLine(" [x] Requesting fib({0})", n);
var response = await rpcClient.CallAsync(n);
Console.WriteLine(" [.] Got '{0}'", response);
那么现在,是时候去看看我们 RPCClient.cs 和 RPCServer.cs 的完整例程源码了(包含基本的异常处理)。
像往常一样配置即可(参考教程一)
现在我们的 RPC 服务端已经准备好了。我们可以启动服务端:
cd RPCServer
dotnet run
# => [x] Awaiting RPC requests
运行客户端以请求一个斐波那契数:
cd RPCClient
dotnet run
# => [x] Requesting fib(30)
运行效果:
这里介绍的设计并不是 RPC 服务的唯一可能实现,但它有一些重要的优点:
- 如果 RPC 服务端太慢,您可以通过运行另一个服务端来进行扩展。尝试在新控制台中运行第二个
RPCServer
。 - 在客户端,RPC 只需要发送和接收一条消息。不需要像
QueueDeclare
这样的同步调用。因此,RPC 客户端对于单个 RPC 请求只需要一次网络往返。
我们的代码仍然非常简单,并没有试图解决更复杂(但重要)的问题,比如:
- 如果没有服务端在运行,客户端应该如何反应?
- 客户端应该为 RPC 设置某种超时吗?
- 如果服务端出现故障并引发异常,是否应该将其转发给客户端?
- 在处理之前防止无效的传入消息(例如检查边界、类型)。
如果您想尝试一下,您可能会发现管理 UI 对于查看队列很有用。(可以参考这篇博客的 5.3、启用服务管理 小节了解如何启用管理 UI)
5、生产[非]适用性免责声明
请记住,本教程和其他教程都是教程。他们一次展示一个新概念,可能会有意地过度简化一些东西,而忽略其他东西。例如,为了简洁起见,连接管理、错误处理、连接恢复、并发性和指标收集等主题在很大程度上被省略了。这种简化的代码不应该被认为可以用于生产。
在发布您的应用之前,请先查看其他文档。我们特别推荐以下指南:发布者确认和消费者确认,生产清单和监控。