在本文中,我们将学习如何使用TaskCompletionSource。它是您几乎不需要使用的那些工具之一,但是当您这样做时,您会很高兴知道它。让我们深入研究它。
基本用法
本节的源代码位于Gigi Labs BitBucket存储库的TaskCompletionSource1文件夹中。
让我们创建一个新的控制台应用程序,在中Main()
,我们将具有在控制台应用程序中运行异步代码的常用解决方法:
1个
2
3
4
5
|
static void Main( string [] args)
{
Run();
Console.ReadLine();
}
|
在该Run()
方法中,我们有一个简单的示例显示TaskCompletionSource的工作方式:
1个
2
3
4
5
6
7
8
9
|
static async void Run()
{
var tcs = new TaskCompletionSource< bool >();
var fireAndForgetTask = Task.Delay(5000)
.ContinueWith(task => tcs.SetResult( true ));
await tcs.Task;
}
|
TaskCompletionSource只是a的包装Task
,可让您控制其完成情况。因此,a TaskCompletionSource<bool>
将包含一个Task<bool>
,您可以bool
根据自己的逻辑设置结果。
在这里,我们使用TaskCompletionSource作为同步机制。我们的主线程使用TaskCompletionSource中的Task产生一个操作并等待其结果。即使该操作不是基于任务的,它也可以在TaskCompletionSource中设置Task的结果,从而允许主线程恢复其执行。
让我们添加一些诊断代码,以便我们可以了解输出中发生的情况:
1个
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18岁
19
|
static async void Run()
{
var stopwatch = Stopwatch.StartNew();
var tcs = new TaskCompletionSource< bool >();
Console.WriteLine($ "Starting... (after {stopwatch.ElapsedMilliseconds}ms)" );
var fireAndForgetTask = Task.Delay(5000)
.ContinueWith(task => tcs.SetResult( true ));
Console.WriteLine($ "Waiting... (after {stopwatch.ElapsedMilliseconds}ms)" );
await tcs.Task;
Console.WriteLine($ "Done. (after {stopwatch.ElapsedMilliseconds}ms)" );
stopwatch.Stop();
}
|
这是输出:
1个
2
3
|
Starting... (after 0ms)
Waiting... (after 41ms)
Done. (after 5072ms)
|
如您所见,主线程一直等待直到tcs.SetResult(true)
被调用为止。这触发了TaskCompletionSource的基础任务(主线程正在等待)的完成,并允许主线程恢复。
除了SetResult()
,TaskCompletionSource还提供了取消任务或将其错误处理的方法。也有安全Try...()
等效项:
SDK示例
本节的源代码位于Gigi Labs BitBucket存储库中的TaskCompletionSource2文件夹中。
我发现TaskCompletionSource非常适合的一种情况是,当您获得公开事件的第三方SDK时。想象一下:您通过SDK方法提交订单,它为该订单提供了ID,但没有结果。SDK将关闭并执行其可能要做的操作,以与外部服务进行对话并处理订单。当这种情况最终发生时,SDK将触发一个事件,以通知调用应用程序订单是否成功下达。
我们将使用它作为SDK代码的示例:
1个
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public class MockSdk
{
public event EventHandler<OrderOutcome> OnOrderCompleted;
public Guid SubmitOrder( decimal price)
{
var orderId = Guid.NewGuid();
// do a REST call over the network or something
Task.Delay(3000).ContinueWith(task => OnOrderCompleted( this ,
new OrderOutcome(orderId, true )));
return orderId;
}
}
|
本OrderOutcome
类只是一个简单的DTO:
1个
2
3
4
5
6
7
8
9
10
11
|
public class OrderOutcome
{
public Guid OrderId { get ; set ; }
public bool Success { get ; set ; }
public OrderOutcome(Guid orderId, bool success)
{
this .OrderId = orderId;
this .Success = success;
}
}
|
请注意MockSdk
,的SubmitOrder
不会返回任何形式的Task
,并且我们无法等待。这并不一定意味着它正在阻塞;它可能正在使用另一种形式的异步方式,例如异步编程模型或具有请求-响应方式的消息传递框架(例如RPC over RabbitMQ)。
归根结底,这仍然是异步的,我们可以使用TaskCompletionSource在其之上构建基于任务的异步模式抽象(允许应用程序简单地await
调用)。
首先,我们开始构建包装SDK的简单代理类:
1个
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public class SdkProxy
{
private MockSdk sdk;
public SdkProxy()
{
this .sdk = new MockSdk();
this .sdk.OnOrderCompleted += Sdk_OnOrderCompleted;
}
private void Sdk_OnOrderCompleted( object sender, OrderOutcome e)
{
// TODO
}
}
|
然后,我们添加一个字典,该字典使我们能够将每个OrderId与其对应的TaskCompletionSource关联起来。使用ConcurrentDictionary而不是普通的Dictionary可帮助处理多线程方案而无需锁定:
1个
2
3
4
5
6
7
8
9
10
11
12
|
private ConcurrentDictionary<Guid,
TaskCompletionSource< bool >> pendingOrders;
private MockSdk sdk;
public SdkProxy()
{
this .pendingOrders = new ConcurrentDictionary<Guid,
TaskCompletionSource< bool >>();
this .sdk = new MockSdk();
this .sdk.OnOrderCompleted += Sdk_OnOrderCompleted;
}
|
代理类公开了一个SubmitOrderAsync()
方法:
1个
2
3
4
5
6
7
8
9
10
11
|
public Task SubmitOrderAsync( decimal price)
{
var orderId = sdk.SubmitOrder(price);
Console.WriteLine($ "OrderId {orderId} submitted with price {price}" );
var tcs = new TaskCompletionSource< bool >();
this .pendingOrders.TryAdd(orderId, tcs);
return tcs.Task;
}
|
此方法调用SubmitOrder()
SDK中的基础,并使用返回的OrderId在字典中添加新的TaskCompletionSource。Task
返回TaskCompletionSource的基础,以便应用程序可以等待它。
1个
2
3
4
5
6
7
8
|
private void Sdk_OnOrderCompleted( object sender, OrderOutcome e)
{
string successStr = e.Success ? "was successful" : "failed" ;
Console.WriteLine($ "OrderId {e.OrderId} {successStr}" );
this .pendingOrders.TryRemove(e.OrderId, out var tcs);
tcs.SetResult(e.Success);
}
|
当SDK触发完成事件时,代理将从待处理订单中删除TaskCompletionSource并设置其结果。等待基础任务的应用程序将恢复并根据逻辑做出决定。
我们可以在控制台应用程序中使用以下程序代码对此进行测试:
1个
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
static void Main( string [] args)
{
Run();
Console.ReadLine();
}
static async void Run()
{
var sdkProxy = new SdkProxy();
await sdkProxy.SubmitOrderAsync(10);
await sdkProxy.SubmitOrderAsync(20);
await sdkProxy.SubmitOrderAsync(5);
await sdkProxy.SubmitOrderAsync(15);
await sdkProxy.SubmitOrderAsync(4);
}
|
输出显示该程序确实确实在开始下一个订单之前等待每个订单完成:
1个
2
3
4
5
6
7
8
9
10
|
OrderId 3e2d4577-8bbb-46b7-a5df-2efec23bae6b submitted with price 10
OrderId 3e2d4577-8bbb-46b7-a5df-2efec23bae6b was successful
OrderId e22425b9-3aa3-48db-a40f-8b8cfbdcd3af submitted with price 20
OrderId e22425b9-3aa3-48db-a40f-8b8cfbdcd3af was successful
OrderId 3b5a2602-a5d2-4225-bbdb-10642a63f7bc submitted with price 5
OrderId 3b5a2602-a5d2-4225-bbdb-10642a63f7bc was successful
OrderId ffd61cea-343e-4a9c-a76f-889598a45993 submitted with price 15
OrderId ffd61cea-343e-4a9c-a76f-889598a45993 was successful
OrderId b443462c-f949-49b9-a6f0-08bbbb82fe7e submitted with price 4
OrderId b443462c-f949-49b9-a6f0-08bbbb82fe7e was successful
|
摘要
使用TaskCompletionSource适应于异步任务使用的任意形式,并启用优雅async
/ await
使用。
不要使用它只是为异步方法公开异步包装器。您要么根本不这样做,要么改用Task.FromResult()。
如果您担心异步调用可能永远不会恢复,请考虑添加一个timeout。