任务和并行编程
- 1 任务和并行编程
- 2 Parallel 类
- 2.1 Parallel.for
- 2.2 Parallel.foreach
- 2.3 Parallel.Invoke
- 2.4 ParallelLoopResult 与 ParallelLoopState
- 2.5 实例
- 3 Task 类
- 3.1 特点
- 3.2 Tasks 的基本使用
- 3.2.1 使用 ThreadPool
- 3.2.2 同步任务(Synchronous Tasks)
- 3.2.3 使用单独的线程
- 3.2.4 获得 Task 的返回值
- 3.2.5 连续的 Task
- 3.2.6 Task 的层次结构(Hierarchies)
- 3.3 Task 的其它使用方法
- 3.4 实例
- 4 取消任务
- 5 DataFlow
- 6 Timer
- 7 线程问题
- 8 其它线程相关的类与方法
- 8.1 Monitor
- 8.2 SpinLock
- 8.3 WaitHandle
- 8.4 Mutex 和 EventWaitHandle
- 8.5 Semaphore
- 8.6 Events
- 8.7 Barrier
- 8.8 ReaderWriteLockSlim
- 8.9 Locks with await
- 9 总结
1 任务和并行编程
1.1 使用多线程的例子
- Word 中主线程用于处理用户输入,一个线程用于检测拼写错误,一个线程用于联网发送消息
- 迅雷,主线程用于 UI显示,一个线程或多个线程用于后台下载
1.2 多线程的特点
- 一个进程的多个线程可以在不同的cpu上运行
- 也可以在一个多核CPU的不同核上同时运行
1.3 使用多线程需要注意的问题
- 如果线程访问相同的数据,可能会出现竞争和死锁问题(race conditions and deadlocks)
- 为了避免这种情况,必须实现同步机制(synchronization mechanisms)
2 Parallel 类
2.1 Parallel.for
- 命名空间:System.Threading.Tasks
2.1.1 基本使用方法
循环中执行的任务并无先后顺序,ParallelLoopResult 用于判断是否完成线程
执行结果
Parallel不会等待后台进程,如
执行结果
2.1.2 结束并行任务
使用 Action<int, ParallelLoopState>
执行结果
2.1.3 任务数据初始化
- 这个方法非常适合积累大量数据收集的结果
说明:
Parallel.For <TLocal> 方法 (这里 TLocal 为 string)
- 除了 from 与 to 两个 int 外还有另外三个委托方法作为参数
- 有返回值的委托 ,对于执行迭代的每个线程,此方法只调用一次。
Func<string> localinit
- 有返回值的委托 ,参数1 为迭代的 index,参数2 为 ParallelLoopState,参数3为从初始化时(localinit)返回的值,参数4为从body 方法中返回的值
Func<int, ParallelLoopState, string ,string > body
- 无返回值的委托 ,该委托可以接受 body 返回的值作为参数传入
Aciton <string> localFinally
示例代码:
执行结果
2.2 Parallel.foreach
2.1.1 基本使用方法
- ForEach迭代实现IEnumerable的集合,其方式类似于ForEach语句,但采用异步方式
- 不能保证执行顺序
public static void ParallelForEach()
{
string[] data = { "zero", "one", "two", "three", "four", "five" };
ParallelLoopResult result = Parallel.ForEach<string>(data, (s,pls,l) => {
if (s == "three") { pls.Break(); }
Console.WriteLine($"{s} {l}");
});
}
执行结果
2.3 Parallel.Invoke
- 提供了任务并行化模式
- Invoke 允许传递操作委托数组,可以通过该数组分配运行的方法
public static void ParallelInvoke()
{
Parallel.Invoke(action1, action2);
}
public static void action1() =>
Console.WriteLine("action1");
public static void action2() =>
Console.WriteLine("action2");
2.4 ParallelLoopResult 与 ParallelLoopState
- ParallelLoopResult Struct
提供执行 Parallel 循环的完成状态。
属性
IsCompleted
获取该循环是否已运行完成(该循环的所有迭代均已执行,并且该循环没有收到提前结束的请求)
LowestBreakIteration
获取从中调用 Break() 的最低迭代的索引。 - ParallelLoopState Class
方法
Break()
告知 Parallel 循环应在系统方便的时候尽早停止执行当前迭代之外的迭代。
Stop()
告知 Parallel 循环应在系统方便的时候尽早停止执行。
属性
IsExceptional
获取循环的任何迭代是否已引发相应迭代未处理的异常。
IsStopped
获取循环的任何迭代是否已调用 Stop() 方法。
LowestBreakIteration
获取从中调用 Break() 的最低循环迭代。
ShouldExitCurrentIteration
获取循环的当前迭代是否应基于此迭代或其他迭代发出的请求退出。
2.5 实例
3 Task 类
命名空间:System.Threading.Tasks
3.1 特点
- 灵活性 flexibility
定义延续工作——任务完成后应该做什么,并可以根据任务是否成功来区分;
父任务可以创建新的子任务,取消父任务也会取消它的子任务。
3.2 Tasks 的基本使用
3.2.1 使用 ThreadPool
方法一:使用实例化的TaskFactory,其中方法TaskMethod 被传递给startNew方法,然后任务立即启动
var tf = new TaskFactory();
Task t1 = tf.StartNew(TaskMethod, "using a task factory");
方法二:使用Task类的静态属性 Factory 来访问TaskFactory,并调用StartNew方法
Task t2 = Task.Factory.StartNew(TaskMethod, "factory via a task");
方法三:使用Task类的构造函数,在实例化对象后,使用 Start 方法启动
var t3 = new Task(TaskMethod, "using a task constructor and Start");
t3.Start();
方法四:使用 Task.Run,分配类型为 Action 的 lambda 表达式作为其参数参数
Task t4 = Task.Run(() =>
{
TaskMethod("using the Run method");
});
上述 TaskMethod 定义如下
3.2.2 同步任务(Synchronous Tasks)
任务并不一定使用来自 ThreadPool 的线程,任务可以相同的线程同步运行
TaskMethod("just the main thread");
var t1 = new Task(TaskMethod, "run sync");
t1.RunSynchronously();
3.2.3 使用单独的线程
如果任务要运行较长时间,则应该使用 TaskcreationOptions,该参数指示任务调度程序创建一个新线程,而不是从线程池创建一个线程,也不由线程池管理
- 该线程为后台线程
var t2 = new Task(TaskMethod, "long running", TaskCreationOptions.LongRunning);
t2.Start();
3.2.4 获得 Task 的返回值
当任务完成时,它可以
- 将状态信息写入共享对象(这样的共享对象必须是线程安全的)
- 返回任务的结果
public static void TaskWithResultDemo()
{
var t1 = new Task<(int Result1, int Result2)>(TaskWithResult, (8, 3));
t1.Start();
Console.WriteLine($"result from task:8-3= {t1.Result.Result1},8*3={t1.Result.Result2}");
}
结果:
3.2.5 连续的 Task
- 可以指定在一个 Task 完成后,执行另一个 Task
例如,使用前一个任务的结果来开始新的任务,或者在前一个任务失败时进行清理工作 - 任务处理程序没有参数或只有一个对象参数,而延续处理程序有一个类型为task的参数
- 当一个任务完成时,启动多个任务,一个延续任务也可以继续延续
Task t1 = new Task(DoOnFirst);
Task t2 = t1.ContinueWith(DoOnSecond);
Task t3 = t1.ContinueWith(DoOnSecond);
Task t4 = t2.ContinueWith(DoOnSecond);
t1.Start();
- ContinueWith 的委托参数为 Task t,表示上一个任务
结果:
- 可以使用 TaskContinuationOptions 来指定在什么情况下延续任务
Task t5 = t1.ContinueWith(DoOnError,TaskContinuationOptions.OnlyOnFaulted);
OnlyOnRanToCompletion
OnlyOnFaulted
NotOnFaulted
OnlyOnCanceled
NotOnCanceled
3.2.6 Task 的层次结构(Hierarchies)
使用 ContinueWith,任务接连启动,形成层次结构,即形成父/子层次结构
结果:
当创建子任务时,指定 TaskCreationOptions.AttachedToParent,则有如下效果,否则无以下效果
- 所有的子任务都完成时,父任务状态才为完成
- 取消父任务也会取消子任务
- 子任务的异常会传到父任务
3.3 Task 的其它使用方法
- 从任务返回值,使用 Task<T>,如下面这个例子,
public async Task<int> DownLoadFromNet()
{
int result;
//do something
result=2;
return result;
}
- 使用 Task.FromResult
return Task.FromResult<int>(123);
3.4 实例
- 启动一个下载任务,异步等待其返回值
private async void DownLoad(object sender, MouseEventArgs e)
{
bool success;
success = await Task.Run(() =>
{
ChaoNet cn = new ChaoNet();
return cn.DownLoadFromNet();
});
if(success){
Show();
}
}
- 任务的等待
WaitAll 与 WhenAll
等待所有任务完成。WaitAll方法阻塞调用任务,直到等待的所有任务都完成。WhenAll方法返回一个任务,该任务可使用 async 关键字等待结果,不会阻塞等待的任务
WaitAny 与 WhenAny
等待任一任务完成完成。
- ValueTask
4 取消任务
CancellationToken 可以作为取消参数,这个类定义了 IscancellationRequested 属性,通过这个属性可以检查长时间操作是否应该中止
4.1 Parell.For 的取消
取消任务可以向 Parell.For 传递一个 Paralleloptions 类型的参数,并设置该参数的 CancellationToken
CancellationToken 是通过创建 CancellationTokenSource 生成的,CancellationTokensource 实现了 IcancancelableOperation 接口,因此可以在 Cancellationtoken 中 Register ,并允许使用 cancel 方法进行取消
在 Fo 循环中 Parallel 类验证 cancellationToken的结果并取消操作。在取消时,For方法抛出一个OperationCanceledException 类型的异常,使用CancellationToken,可通过调用 Register 方法在取消时启动回调函数
如:
结果
说明:
- 9个任务开始,9个任务完成,说明该处理器为八核
- 开始的任务在 取消 token 后继续完成直到结束
4.2 Task 的取消
示例:
结果:
5 DataFlow
6 Timer
- Timer
命名空间:System.Threading
参数1:要调用的方法,回调方法
参数2:可传递得任何对象,在回调方法中用对象参数接收该对象
参数3:第一次调用回调的时间跨度
参数4:指定回调的重复间隔,-1表示指触发一次
例子:
执行结果
- DispatcherTimer
命名空间:System.Windows.Threading
7 线程问题
当多个线程启动访问相同数据的时,可能会碰到竞争条件和死锁问题
7.1 Race Conditions
竞争条件:
如果两个或多个线程访问相同的对象,并且对共享状态的访问没有同步,则可能出现竞争条件。
代码示例:
一个发生竞争条件的类
public class StateObject
{
private int _state = 5;
public void ChangeState(int loop)
{
if (_state == 5)
{
_state++;
if (_state != 6)
{
Console.WriteLine($"Race condition occured after {loop} loops");
Trace.Fail("race condition");
}
}
_state = 5;
}
}
触发竞争条件的方法:
public void RaceConditions()
{
var state = new StateObject();//定义一个对象
for (int i = 0; i < 2; i++)
{
Task.Run(() => { DoSomeRCWork(state); });//2个线程对一个对象进行操作
}
}
public void DoSomeRCWork(object o)
{
Trace.Assert(o is StateObject, "o must be of type StateObject");
StateObject stateObject = o as StateObject;
int i = 0;
while (true)
{
stateObject.ChangeState(i++);
}
}
执行过程:
- 判断 state 是否为5,是则 state++
- state++ 后,state 值可能并不是6,因为有其它线程在操作这个 state
- 线程1执行到 state= =5 时,线程2也可能恰好执行到 state= =5 ,则线程2执行 state++ 使得 state=6,线程1执行 state++使得state=7,此时发生 竞争条件
- 之所以指定 state != 6 是因为如果没发生竞争条件,state就是6,但由于发生了竞争条件,state变为7了,如果3个线程,state可能就为7或者8
解决办法:
- 可以通过锁定共享对象来避免这个问题,即在线程内通过使用 lock 语句锁定线程之间共享的变量状态
如果一个线程拥有锁,那么其它线程必须等待锁解除。一旦锁被定义,线程就拥有锁,并在锁语句结束时停止锁。如果使用状态变量引用的每个改变对象的线程都在使用锁,那么竞争条件将不再发生。
方法1:将上述中 DoSomeRCWork 改为如下,竞争条件便不会发生
while (true)
{
lock(stateObject){
stateObject.ChangeState(i++);
}
}
方法2:在可能发生竞争条件的 StateObject 类中使用 lock
因为不能锁定状态变量本身(只有引用类型可以用于锁定),所以在该类中定义一个 object 对象,专门用来防止竞争条件
如果每次值状态改变时都使用相同的同步对象进行锁定,那么竞态条件将不再发生:
public class StateObject
{
private int _state = 5;
private object _sync= new object();// 防止竞争的object 对象
public void ChangeState(int loop)
{
lock(_sync){
if (_state == 5)
{
_state++;
if (_state != 6)
{
Console.WriteLine($"Race condition occured after {loop} loops");
Trace.Fail("race condition");
}
}
_state = 5;
}
}
}
7.2 Deadlocks
死锁:至少有两个线程停止并等待对方释放锁。当两个线程互相等待时,就会发生死锁,线程会无休止地等待
代码示例
定义一个执行类:
public class SampleTask
{
private StateObject _s1;
private StateObject _s2;
public SampleTask(StateObject s1,StateObject s2)
{
_s1 = s1;
_s2 = s2;
}
public void RunDeadLock1()
{
int i = 0;
while (true)
{
lock (_s1)
{
lock (_s2)
{
_s1.ChangeState(i);
_s2.ChangeState(i++);
Console.WriteLine($"still running,{i}");
}
}
}
}
public void RunDeadLock2()
{
int i = 0;
while (true)
{
lock (_s2)
{
lock (_s1)
{
_s1.ChangeState(i);
_s2.ChangeState(i++);
Console.WriteLine($"still running,{i}");
}
}
}
}
}
启用两个线程执行:
var state1 = new StateObject();
var state2 = new StateObject();
new Task(new SampleTask(state1, state2).RunDeadLock1).Start();
new Task(new SampleTask(state1, state2).RunDeadLock2).Start();
说明:
- RunDeadLock1 首先对 s1 进行锁定,然后对 s2 进行锁定;RunDeadLock2 首先锁住 s2 ,然后锁住 s1
- 接下来发生线程切换,RunDeadLock2 开始运行并获取 s2 的锁,线程2等待 s1 的锁,线程调度程序再次调度到线程1,线程1等待 s2
- 两个线程现在都在等待,只要锁块没有结束语句,就不会释放锁,这是一个典型的死锁
7.3 锁语句和线程安全
7.4 InterLocked
8 其它线程相关的类与方法
8.1 Monitor
8.2 SpinLock
8.3 WaitHandle
- WaitHandle 是一个抽象基类,用于等待一个信号被设置,WaitHandle 可以等待,方法有 WaitOne()、WaitAny()、WaitAll()
- Mutex、EventWaitHandle、Semaphore 从 WaitHandle 中派生
8.4 Mutex 和 EventWaitHandle
在 Mutex 类中,只有一个线程可以获得锁并访问锁内的资源
- 通过 Mutex 类的构造函数,下列语句新建一个 Mutex 实例,通过 createNew 来确定该锁是否已经存在
bool createdNew;
var mutex = new Mutex(false, "ProCSharpMutex", out createdNew);
- Mutex 可以在不同的进程中定义,具有名称的 Mutex 会被系统区分开来,可以在不同的进程之间共享;如果没有给 Mutex 分配名称,则不能在不同进程之间共享
例子:具有名称的 EventWaitHandle 会被系统区分开来,可以使用它防止应用程序被启动两次
protected override void OnStartup(StartupEventArgs e)
{
bool createNew;
ProgramStarted = new EventWaitHandle(false, EventResetMode.AutoReset, "AppName", out createNew);
if (!createNew)
{
MessageBox.Show("程序已经在运行!");
App.Current.Shutdown();
Environment.Exit(0);
}
}
8.5 Semaphore
- Semaphore 和 Mutex 都派生于 WaitEventHandle,与 Mutex 不同的是,Semaphore 可以被多个线程同时使用
- 使用 Semaphore 可以设定允许同时访问资源的线程数,比如设定最多三个线程下载,那么四个线程就需要等待,直到其他线程释放资源