using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
using static System.Threading.Thread;
namespace Chapter7.Recipe1
{
/// <summary>
/// Recipe 1: runs three emulated tasks through <c>Parallel.Invoke</c>, then walks the
/// numbers 1..30 with <c>Parallel.ForEach</c> and breaks out of the loop at item 20.
/// </summary>
class Program
{
    // One generator shared by every EmulateProcessing call. Creating a new
    // Random seeded with DateTime.Now.Millisecond per call gave identical seeds
    // (and therefore identical delays) to workers that started in the same
    // millisecond. System.Random is not thread-safe, so access goes through the gate.
    private static readonly Random s_delayRandom = new Random();
    private static readonly object s_delayRandomGate = new object();

    /// <summary>
    /// Entry point: executes the Parallel.Invoke demo followed by the
    /// Parallel.ForEach demo and prints the loop result.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        Parallel.Invoke(
            () => EmulateProcessing("Task1"),
            () => EmulateProcessing("Task2"),
            () => EmulateProcessing("Task3")
        );

        // CancellationTokenSource is IDisposable; release it deterministically.
        using (var cts = new CancellationTokenSource())
        {
            var options = new ParallelOptions
            {
                CancellationToken = cts.Token,
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                TaskScheduler = TaskScheduler.Default
            };

            var result = Parallel.ForEach(
                Enumerable.Range(1, 30),
                options,
                (i, state) =>
                {
                    WriteLine(i);
                    if (i == 20)
                    {
                        state.Break();
                        // IsStopped is documented as tracking Stop() calls, so this
                        // line is expected to report False after Break().
                        WriteLine($"Loop is stopped: {state.IsStopped}");
                    }
                });

            WriteLine("---");
            WriteLine($"IsCompleted: {result.IsCompleted}");
            WriteLine($"Lowest break iteration: {result.LowestBreakIteration}");
        }
    }

    /// <summary>
    /// Blocks the calling thread for a random 250-349 ms, then reports which
    /// managed thread handled the task.
    /// </summary>
    /// <param name="taskName">Label printed in the report line.</param>
    /// <returns>The same <paramref name="taskName"/> that was passed in.</returns>
    static string EmulateProcessing(string taskName)
    {
        int delayMilliseconds;
        lock (s_delayRandomGate)
        {
            delayMilliseconds = s_delayRandom.Next(250, 350);
        }

        Sleep(TimeSpan.FromMilliseconds(delayMilliseconds));
        WriteLine($"{taskName} task was processed on a " +
                  $"thread id {CurrentThread.ManagedThreadId}");
        return taskName;
    }
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using static System.Console;
using static System.Threading.Thread;
namespace Chapter7.Recipe2
{
/// <summary>
/// Recipe 2: times the same "process then print" pipeline in four variants —
/// sequential LINQ, PLINQ merged on one thread, PLINQ consumed with ForAll,
/// and a PLINQ query turned back into a sequential one.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point: runs the four scenarios one after another, waiting for
    /// ENTER and clearing the console between them.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        RunMeasured("Sequential LINQ query.", () =>
        {
            var query = from t in GetTypes()
                        select EmulateProcessing(t);
            foreach (string typeName in query)
            {
                PrintInfo(typeName);
            }
        });

        RunMeasured("Parallel LINQ query. The results are being merged on a single thread", () =>
        {
            var parallelQuery = from t in GetTypes().AsParallel()
                                select EmulateProcessing(t);
            foreach (var typeName in parallelQuery)
            {
                PrintInfo(typeName);
            }
        });

        RunMeasured("Parallel LINQ query. The results are being processed in parallel", () =>
        {
            var parallelQuery = from t in GetTypes().AsParallel()
                                select EmulateProcessing(t);
            parallelQuery.ForAll(PrintInfo);
        });

        RunMeasured("Parallel LINQ query, transformed into sequential.", () =>
        {
            var query = from t in GetTypes().AsParallel().AsSequential()
                        select EmulateProcessing(t);
            foreach (string typeName in query)
            {
                PrintInfo(typeName);
            }
        });
    }

    /// <summary>
    /// Runs one scenario under a stopwatch, prints its description and elapsed
    /// time, then waits for ENTER and clears the console.
    /// </summary>
    /// <param name="description">Line printed above the elapsed time.</param>
    /// <param name="scenario">The work to time; query creation and consumption both count.</param>
    private static void RunMeasured(string description, Action scenario)
    {
        var sw = Stopwatch.StartNew();
        scenario();
        sw.Stop();

        WriteLine("---");
        WriteLine(description);
        WriteLine($"Time elapsed: {sw.Elapsed}");
        WriteLine("Press ENTER to continue....");
        ReadLine();
        Clear();
    }

    /// <summary>
    /// Sleeps 150 ms to emulate output latency, then prints the type name and
    /// the id of the thread doing the printing.
    /// </summary>
    /// <param name="typeName">Name to print.</param>
    static void PrintInfo(string typeName)
    {
        Sleep(TimeSpan.FromMilliseconds(150));
        WriteLine($"{typeName} type was printed on a thread " +
                  $"id {CurrentThread.ManagedThreadId}");
    }

    /// <summary>
    /// Sleeps 150 ms to emulate work, then prints the type name and the id of
    /// the thread that processed it.
    /// </summary>
    /// <param name="typeName">Name to process.</param>
    /// <returns>The same <paramref name="typeName"/>.</returns>
    static string EmulateProcessing(string typeName)
    {
        Sleep(TimeSpan.FromMilliseconds(150));
        WriteLine($"{typeName} type was processed on a thread " +
                  $"id {CurrentThread.ManagedThreadId}");
        return typeName;
    }

    /// <summary>
    /// Lazily yields the names of exported types, from assemblies loaded in the
    /// current AppDomain, whose name starts with "Web".
    /// </summary>
    /// <returns>A deferred sequence; each enumeration re-scans the loaded assemblies.</returns>
    static IEnumerable<string> GetTypes()
    {
        return from assembly in AppDomain.CurrentDomain.GetAssemblies()
               // GetExportedTypes is not supported on dynamic assemblies and throws.
               where !assembly.IsDynamic
               from type in assembly.GetExportedTypes()
               // Type names are identifiers: compare ordinally, not by current culture.
               where type.Name.StartsWith("Web", StringComparison.Ordinal)
               select type.Name;
    }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using static System.Console;
using static System.Threading.Thread;
namespace Chapter7.Recipe3
{
/// <summary>
/// Recipe 3: shows PLINQ tuning options (degree of parallelism, execution mode,
/// merge options, cancellation) and compares unordered with ordered result output.
/// </summary>
class Program
{
    // One generator shared by every EmulateProcessing call. A new Random seeded
    // with DateTime.Now.Millisecond per call hands identical seeds (and delays)
    // to PLINQ workers that start in the same millisecond. System.Random is not
    // thread-safe, so access goes through the gate.
    private static readonly Random s_delayRandom = new Random();
    private static readonly object s_delayRandomGate = new object();

    /// <summary>
    /// Entry point: runs a cancellable PLINQ query with a 3-second timeout,
    /// then prints 1..30 from an unordered and from an ordered parallel range.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        var parallelQuery = from t in GetTypes().AsParallel()
                            select EmulateProcessing(t);

        // CancellationTokenSource is IDisposable; ForAll is synchronous, so
        // the source can be released as soon as the try block is done.
        using (var cts = new CancellationTokenSource())
        {
            cts.CancelAfter(TimeSpan.FromSeconds(3));
            try
            {
                parallelQuery
                    .WithDegreeOfParallelism(Environment.ProcessorCount)
                    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
                    .WithMergeOptions(ParallelMergeOptions.Default)
                    .WithCancellation(cts.Token)
                    .ForAll(WriteLine);
            }
            catch (OperationCanceledException)
            {
                WriteLine("---");
                WriteLine("Operation has been canceled!");
            }
        }

        WriteLine("---");
        WriteLine("Unordered PLINQ query execution");
        var unorderedQuery = from i in ParallelEnumerable.Range(1, 30)
                             select i;
        foreach (var i in unorderedQuery)
        {
            WriteLine(i);
        }

        WriteLine("---");
        WriteLine("Ordered PLINQ query execution");
        var orderedQuery = from i in ParallelEnumerable.Range(1, 30).AsOrdered()
                           select i;
        foreach (var i in orderedQuery)
        {
            WriteLine(i);
        }
    }

    /// <summary>
    /// Blocks the calling thread for a random 250-349 ms, then reports which
    /// managed thread processed the type name.
    /// </summary>
    /// <param name="typeName">Name to process.</param>
    /// <returns>The same <paramref name="typeName"/>.</returns>
    static string EmulateProcessing(string typeName)
    {
        int delayMilliseconds;
        lock (s_delayRandomGate)
        {
            delayMilliseconds = s_delayRandom.Next(250, 350);
        }

        Sleep(TimeSpan.FromMilliseconds(delayMilliseconds));
        WriteLine($"{typeName} type was processed on a thread " +
                  $"id {CurrentThread.ManagedThreadId}");
        return typeName;
    }

    /// <summary>
    /// Lazily yields the names of exported types, from assemblies loaded in the
    /// current AppDomain, whose name starts with "Web", ordered by name length.
    /// </summary>
    /// <returns>A deferred sequence; each enumeration re-scans the loaded assemblies.</returns>
    static IEnumerable<string> GetTypes()
    {
        return from assembly in AppDomain.CurrentDomain.GetAssemblies()
               // GetExportedTypes is not supported on dynamic assemblies and throws.
               where !assembly.IsDynamic
               from type in assembly.GetExportedTypes()
               // Type names are identifiers: compare ordinally, not by current culture.
               where type.Name.StartsWith("Web", StringComparison.Ordinal)
               orderby type.Name.Length
               select type.Name;
    }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using static System.Console;
namespace Chapter7.Recipe4
{
/// <summary>
/// Recipe 4: divides 100 by each number in -5..4 first with a sequential LINQ
/// query and then with a PLINQ query, printing which exception handler fires
/// when the division by zero is reached.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point: runs the sequential pass, then the parallel pass, each
    /// followed by its caption lines.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        IEnumerable<int> numbers = Enumerable.Range(-5, 10);

        DivideSequentially(numbers);
        WriteLine("---");
        WriteLine("Sequential LINQ query processing");
        WriteLine();

        DivideInParallel(numbers);
        WriteLine("---");
        WriteLine("Parallel LINQ query processing and results merging");
    }

    /// <summary>
    /// Prints 100 / n for each input in order until a division by zero is
    /// thrown, which is reported with a single message.
    /// </summary>
    private static void DivideSequentially(IEnumerable<int> numbers)
    {
        IEnumerable<int> quotients = numbers.Select(number => 100 / number);
        try
        {
            foreach (int quotient in quotients)
            {
                WriteLine(quotient);
            }
        }
        catch (DivideByZeroException)
        {
            WriteLine("Divided by zero!");
        }
    }

    /// <summary>
    /// Prints 100 / n through a PLINQ ForAll. Both a plain and an aggregate
    /// handler are present so the output shows which one catches the failure.
    /// </summary>
    private static void DivideInParallel(IEnumerable<int> numbers)
    {
        ParallelQuery<int> quotients = numbers
            .AsParallel()
            .Select(number => 100 / number);
        try
        {
            quotients.ForAll(WriteLine);
        }
        catch (DivideByZeroException)
        {
            WriteLine("Divided by zero - usual exception handler!");
        }
        catch (AggregateException aggregate)
        {
            // Handle() rethrows any inner exception the predicate rejects.
            aggregate.Flatten().Handle(inner =>
            {
                bool isDivideByZero = inner is DivideByZeroException;
                if (isDivideByZero)
                {
                    WriteLine("Divided by zero - aggregate exception handler!");
                }
                return isDivideByZero;
            });
        }
    }
}
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using static System.Console;
using static System.Threading.Thread;
namespace Chapter7.Recipe5
{
/// <summary>
/// Recipe 5: feeds a PLINQ query from a custom static partitioner that routes
/// odd-length and even-length strings to different partitions.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point: processes and prints every type name once through the
    /// partitioned PLINQ query, then reports the item count and elapsed time.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        var timer = Stopwatch.StartNew();
        var partitioner = new StringPartitioner(GetTypes());
        var parallelQuery = from t in partitioner.AsParallel()
                            // Uncomment to force a single partition:
                            // .WithDegreeOfParallelism(1)
                            select EmulateProcessing(t);

        // Count inside the single ForAll pass. Calling Count() on the deferred
        // query afterwards would execute it a second time, processing every
        // item twice and doubling the measured time.
        int count = 0;
        parallelQuery.ForAll(typeName =>
        {
            PrintInfo(typeName);
            System.Threading.Interlocked.Increment(ref count);
        });
        timer.Stop();

        WriteLine(" ----------------------- ");
        WriteLine($"Total items processed: {count}");
        WriteLine($"Time elapsed: {timer.Elapsed}");
    }

    /// <summary>
    /// Sleeps 150 ms to emulate output latency, then prints the type name and
    /// the id of the thread doing the printing.
    /// </summary>
    /// <param name="typeName">Name to print.</param>
    static void PrintInfo(string typeName)
    {
        Sleep(TimeSpan.FromMilliseconds(150));
        WriteLine($"{typeName} type was printed on a thread " +
                  $"id {CurrentThread.ManagedThreadId}");
    }

    /// <summary>
    /// Sleeps 150 ms to emulate work, then prints the type name, the processing
    /// thread id and whether the name has even or odd length.
    /// </summary>
    /// <param name="typeName">Name to process.</param>
    /// <returns>The same <paramref name="typeName"/>.</returns>
    static string EmulateProcessing(string typeName)
    {
        Sleep(TimeSpan.FromMilliseconds(150));
        WriteLine($"{typeName} type was processed on a thread " +
                  $"id { CurrentThread.ManagedThreadId}. Has " +
                  $"{(typeName.Length % 2 == 0 ? "even" : "odd")} length.");
        return typeName;
    }

    /// <summary>
    /// Lazily yields the names of exported types, from assemblies loaded in the
    /// current AppDomain, whose name starts with "Web".
    /// </summary>
    /// <returns>A deferred sequence; each enumeration re-scans the loaded assemblies.</returns>
    static IEnumerable<string> GetTypes()
    {
        var types = AppDomain.CurrentDomain
            .GetAssemblies()
            // GetExportedTypes is not supported on dynamic assemblies and throws.
            .Where(a => !a.IsDynamic)
            .SelectMany(a => a.GetExportedTypes());
        return from type in types
               // Type names are identifiers: compare ordinally, not by current culture.
               where type.Name.StartsWith("Web", StringComparison.Ordinal)
               select type.Name;
    }

    /// <summary>
    /// Static partitioner that sends odd-length strings to odd-numbered
    /// partitions and even-length strings to even-numbered partitions
    /// (partitions are numbered from 1). With a single partition, every
    /// string goes to it.
    /// </summary>
    public class StringPartitioner : Partitioner<string>
    {
        // Snapshot of the input. Every partition filters the full data set, so
        // a lazy source would be re-enumerated once per partition and could
        // yield different items to different partitions if it changed in between.
        private readonly string[] _data;

        /// <summary>Creates a partitioner over a snapshot of <paramref name="data"/>.</summary>
        /// <param name="data">Strings to partition; enumerated once, here.</param>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
        public StringPartitioner(IEnumerable<string> data)
        {
            if (data == null)
            {
                throw new ArgumentNullException(nameof(data));
            }

            _data = data.ToArray();
        }

        /// <summary>Always false: only a fixed number of partitions is supported.</summary>
        public override bool SupportsDynamicPartitions => false;

        /// <summary>
        /// Splits the data into exactly <paramref name="partitionCount"/> enumerators.
        /// </summary>
        /// <param name="partitionCount">Number of partitions to create; must be positive.</param>
        /// <returns>One enumerator per partition, for partitions 1..partitionCount.</returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="partitionCount"/> is zero or negative.
        /// </exception>
        public override IList<IEnumerator<string>> GetPartitions(int partitionCount)
        {
            if (partitionCount <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(partitionCount));
            }

            var result = new List<IEnumerator<string>>(partitionCount);
            for (int i = 1; i <= partitionCount; i++)
            {
                result.Add(CreateEnumerator(i, partitionCount));
            }
            return result;
        }

        /// <summary>
        /// Builds the enumerator for one 1-based partition: keeps strings whose
        /// length parity matches the partition's parity, then takes every
        /// <c>step</c>-th of them starting at this partition's offset.
        /// </summary>
        IEnumerator<string> CreateEnumerator(int partitionNumber, int partitionCount)
        {
            int evenPartitions = partitionCount / 2;
            bool isEven = partitionNumber % 2 == 0;
            // Number of partitions sharing this parity; they interleave the items.
            int step = isEven ? evenPartitions : partitionCount - evenPartitions;
            // 1-based rank of this partition among partitions of the same parity.
            int startIndex = partitionNumber / 2 + partitionNumber % 2;

            return _data
                .Where(v => (v.Length % 2 == 0) == isEven || partitionCount == 1)
                .Skip(startIndex - 1)
                .Where((x, i) => i % step == 0)
                .GetEnumerator();
        }
    }
}
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using static System.Console;
using static System.Threading.Thread;
namespace Chapter7.Recipe6
{
/// <summary>
/// Recipe 6: counts how often each character occurs across a set of type names
/// using the PLINQ <c>Aggregate</c> overload with per-task accumulators.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point: aggregates the letter counts in parallel and prints them
    /// from most to least frequent.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        var parallelQuery = from t in GetTypes().AsParallel()
                            select t;

        var parallelAggregator = parallelQuery.Aggregate(
            () => new ConcurrentDictionary<char, int>(),
            (taskTotal, item) => AccumulateLettersInformation(taskTotal, item),
            (total, taskTotal) => MergeAccumulators(total, taskTotal),
            total => total);

        WriteLine();
        WriteLine("There were the following letters in type names:");

        // Order the key/value pairs directly instead of re-indexing the
        // dictionary once per key for sorting and once more for printing.
        foreach (var entry in parallelAggregator.OrderByDescending(pair => pair.Value))
        {
            WriteLine($"Letter '{entry.Key}' ---- {entry.Value} times");
        }
    }

    /// <summary>
    /// Adds every character of <paramref name="item"/> to the running counts in
    /// <paramref name="taskTotal"/> and prints which thread did the work.
    /// </summary>
    /// <param name="taskTotal">Accumulator to update in place.</param>
    /// <param name="item">String whose characters are counted.</param>
    /// <returns>The same <paramref name="taskTotal"/> instance.</returns>
    static ConcurrentDictionary<char, int> AccumulateLettersInformation(
        ConcurrentDictionary<char, int> taskTotal, string item)
    {
        foreach (var c in item)
        {
            // Single insert-or-increment instead of ContainsKey + indexer.
            taskTotal.AddOrUpdate(c, 1, (key, count) => count + 1);
        }

        WriteLine($"{item} type was aggregated on a thread " +
                  $"id {CurrentThread.ManagedThreadId}");
        return taskTotal;
    }

    /// <summary>
    /// Adds the counts from <paramref name="taskTotal"/> into
    /// <paramref name="total"/> and prints which thread performed the merge.
    /// </summary>
    /// <param name="total">Accumulator that receives the merged counts; updated in place.</param>
    /// <param name="taskTotal">Counts to add; not modified.</param>
    /// <returns>The same <paramref name="total"/> instance.</returns>
    static ConcurrentDictionary<char, int> MergeAccumulators(
        ConcurrentDictionary<char, int> total, ConcurrentDictionary<char, int> taskTotal)
    {
        foreach (var entry in taskTotal)
        {
            int increment = entry.Value;
            total.AddOrUpdate(entry.Key, increment, (key, existing) => existing + increment);
        }

        WriteLine("---");
        WriteLine($"Total aggregate value was calculated on a thread " +
                  $"id {CurrentThread.ManagedThreadId}");
        return total;
    }

    /// <summary>
    /// Lazily yields the names of exported types, from assemblies loaded in the
    /// current AppDomain, whose name starts with "Web".
    /// </summary>
    /// <returns>A deferred sequence; each enumeration re-scans the loaded assemblies.</returns>
    static IEnumerable<string> GetTypes()
    {
        var types = AppDomain.CurrentDomain
            .GetAssemblies()
            // GetExportedTypes is not supported on dynamic assemblies and throws.
            .Where(a => !a.IsDynamic)
            .SelectMany(a => a.GetExportedTypes());
        return from type in types
               // Type names are identifiers: compare ordinally, not by current culture.
               where type.Name.StartsWith("Web", StringComparison.Ordinal)
               select type.Name;
    }
}
}