Parallel.Invoke(Run1, Run2); // 尽可能并行执行提供的每个操作。
//执行 for(在 Visual Basic 中为 For)循环,其中可能会并行运行迭代。
Object obj = new object();
int num = 0;
Parallel.For(0, 1000, i =>
{
int sum = 0;
for (int j = 0; j < 1000; j++)
{
sum += j;
//Parallel.For由于是并行运行的,所以会同时访问全局变量num,为了得到正确的结果,要使用lock;
// 这时并行会很慢,这主要是由于并行同时访问全局变量,会出现资源争夺,大多数时间消耗在了资源等待上面。
lock (obj)
{
num++;
}
}
});
//数据小或者要访问全局变量时不推荐使用; Parallel.ForEach用法类似
中途退出并行循环
ConcurrentBag<int> bag = new ConcurrentBag<int>();
Random rdm = new Random();
Parallel.For(0, 1000, (i, state) =>
{
Thread.Sleep(rdm.Next(10, 100));
if (bag.Count == 300)
{
state.Stop();
// state.Break();
return;
}
bag.Add(i);
});
并行的异常捕获
public void Run1()
{
Thread.Sleep(1000);
Console.WriteLine("Task 1 is cost 2 sec");
throw new Exception("Exception in task 1");
}
public void Run2()
{
Thread.Sleep(3000);
Console.WriteLine("Task 2 is cost 3 sec");
throw new Exception("Exception in task 2");
}
try
{
Parallel.Invoke(Run2, Run1);
}
catch (AggregateException aex)
{
foreach (var ex in aex.InnerExceptions)
{
Console.WriteLine(ex.Message);
}
}
//如果用Exception也能捕获到,但是得转成AggregateException才能点出InnerExceptions获取异常的具体信息,不然ex.Message的信息是"一个或多个异常"
指定使用的硬件线程数
var bag = new ConcurrentBag<int>();
ParallelOptions options = new ParallelOptions();
//指定使用的硬件线程数
options.MaxDegreeOfParallelism = Environment.ProcessorCount-1;
stopWatch.Restart();
Parallel.For(0, 9000000, options, i =>
{
bag.Add(i);
});
stopWatch.Stop();
Console.WriteLine("并行计算:集合有:{0}", bag.Count + ", " + stopWatch.ElapsedMilliseconds);
Console.ReadKey();
Task
Wait
var task1 = new Task(() =>
{
Console.WriteLine("Task 1 Begin");
System.Threading.Thread.Sleep(2000);
Console.WriteLine("Task 1 Finish");
});
var task2 = new Task(() =>
{
Console.WriteLine("Task 2 Begin");
System.Threading.Thread.Sleep(5000);
Console.WriteLine("Task 2 Finish");
});
Console.WriteLine("Before start:" + task1.Status);
task1.Start();
task2.Start();
Console.WriteLine("After start:" + task1.Status);
task1.Wait(); //等待Task1完成了才会往下走
Console.WriteLine("After Finish:" + task1.Status);
Task.WaitAny(task1, task2); //等待Task1,Task2任意一个完成了才会往下走
Task.WaitAll(task1, task2); //等待Task1,Task2都完成了才会往下走
Console.WriteLine("All task finished!");
Console.ReadKey();
ContinueWith
var task1 = new Task(() =>
{
Console.WriteLine("Task 1 Begin");
Thread.Sleep(2000);
Console.WriteLine("Task 1 Finish");
});
var task2 = new Task(() =>
{
Console.WriteLine("Task 2 Begin");
Thread.Sleep(3000);
Console.WriteLine("Task 2 Finish");
});
task1.Start();
task2.Start();
var result = task1.ContinueWith<string>(task =>
{
Console.WriteLine("task1 finished!");
return "This is task result!";
});
Console.WriteLine(result.Result.ToString());
Console.ReadKey();
///在每次调用ContinueWith方法时,每次会把上次Task的引用传入进来,以便检测上次Task的状态,比如我们可以使用上次Task的Result属性来获取返回值。
var SendFeedBackTask = Task.Factory.StartNew(() => { Console.WriteLine("Get some Data!"); })
.ContinueWith<bool>(s => { return true; })
.ContinueWith<string>(r =>
{
if (r.Result)
{
return "Finished";
}
else
{
return "Error";
}
});
Console.WriteLine(SendFeedBackTask.Result);
Console.ReadKey();
取消任务
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var task = Task.Factory.StartNew(() =>
{
for (var i = 0; i < 1000; i++)
{
Thread.Sleep(100);
Console.WriteLine("Abort mission !");
if (token.IsCancellationRequested)
{
Console.WriteLine("Abort mission success!");
return;
}
}
}, token);
token.Register(() =>
{
Console.WriteLine("Canceled");
});
Console.ReadKey();
Console.WriteLine("Press enter to cancel task...");
tokenSource.Cancel();
Console.ReadKey();
屏障同步:使多个任务能够采用并行方式在多个阶段中协同工作
public static void Barrier()
{
Task[] tasks = new Task[4];
Barrier barrier = null;
//表示tasks.Lengt个Task到达屏障才可以继续执行
barrier = new Barrier(tasks.Length, (i) =>
{
Console.WriteLine("**********************************************************");
Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber);
Console.WriteLine("**********************************************************");
});
for (int j = 0; j < tasks.Length; j++)
{
tasks[j] = Task.Factory.StartNew((obj) =>
{
var single = Convert.ToInt32(obj);
LoadUser(single);
barrier.SignalAndWait(); //发出参与者已达到屏障并等待所有其他参与者也达到屏障。
LoadUser(single);
barrier.SignalAndWait();
LoadProduct(single);
barrier.SignalAndWait();
LoadOrder(single);
barrier.SignalAndWait();
}, j);
}
Console.ReadKey();
}
static void LoadUser(int num)
{
Console.WriteLine("当前任务:{0}正在加载User部分数据!", num);
}
static void LoadProduct(int num)
{
Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num);
}
static void LoadOrder(int num)
{
Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num);
}