- 简介
- 使用ConcurrentDictionary
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using static System.Console;
namespace Chapter6.Recipe1
{
class Program
{
static void Main(string[] args)
{
var concurrentDictionary = new ConcurrentDictionary<int, string>();
var dictionary = new Dictionary<int, string>();
var sw = new Stopwatch();
sw.Start();
for (int i = 0; i < Iterations; i++)
{
lock (dictionary)
{
dictionary[i] = Item;
}
}
sw.Stop();
WriteLine($"Writing to dictionary with a lock: {sw.Elapsed}");
sw.Restart();
for (int i = 0; i < Iterations; i++)
{
concurrentDictionary[i] = Item;
}
sw.Stop();
WriteLine($"Writing to a concurrent dictionary: {sw.Elapsed}");
sw.Restart();
for (int i = 0; i < Iterations; i++)
{
lock (dictionary)
{
CurrentItem = dictionary[i];
}
}
sw.Stop();
WriteLine($"Reading from dictionary with a lock: {sw.Elapsed}");
sw.Restart();
for (int i = 0; i < Iterations; i++)
{
CurrentItem = concurrentDictionary[i];
}
sw.Stop();
WriteLine($"Reading from a concurrent dictionary: {sw.Elapsed}");
}
const string Item = "Dictionary item";
const int Iterations = 1000000;
public static string CurrentItem;
}
}
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
namespace Chapter6.Recipe2
{
class Program
{
static void Main(string[] args)
{
Task t = RunProgram();
t.Wait();
}
static async Task RunProgram()
{
var taskQueue = new ConcurrentQueue<CustomTask>();
var cts = new CancellationTokenSource();
var taskSource = Task.Run(() => TaskProducer(taskQueue));
Task[] processors = new Task[4];
for (int i = 1; i <= 4; i++)
{
string processorId = i.ToString();
processors[i-1] = Task.Run(
() => TaskProcessor(taskQueue, $"Processor {processorId}", cts.Token));
}
await taskSource;
cts.CancelAfter(TimeSpan.FromSeconds(2));
await Task.WhenAll(processors);
}
static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
{
for (int i = 1; i <= 20; i++)
{
await Task.Delay(50);
var workItem = new CustomTask {Id = i};
queue.Enqueue(workItem);
WriteLine($"Task {workItem.Id} has been posted");
}
}
static async Task TaskProcessor(
ConcurrentQueue<CustomTask> queue, string name, CancellationToken token)
{
CustomTask workItem;
bool dequeueSuccesful = false;
await GetRandomDelay();
do
{
dequeueSuccesful = queue.TryDequeue(out workItem);
if (dequeueSuccesful)
{
WriteLine($"Task {workItem.Id} has been processed by {name}");
}
await GetRandomDelay();
}
while (!token.IsCancellationRequested);
}
static Task GetRandomDelay()
{
int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
return Task.Delay(delay);
}
class CustomTask
{
public int Id { get; set; }
}
}
}
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
namespace Chapter6.Recipe3
{
class Program
{
static void Main(string[] args)
{
Task t = RunProgram();
t.Wait();
}
static async Task RunProgram()
{
var taskStack = new ConcurrentStack<CustomTask>();
var cts = new CancellationTokenSource();
var taskSource = Task.Run(() => TaskProducer(taskStack));
Task[] processors = new Task[4];
for (int i = 1; i <= 4; i++)
{
string processorId = i.ToString();
processors[i - 1] = Task.Run(
() => TaskProcessor(taskStack, $"Processor {processorId}", cts.Token));
}
await taskSource;
cts.CancelAfter(TimeSpan.FromSeconds(2));
await Task.WhenAll(processors);
}
static async Task TaskProducer(ConcurrentStack<CustomTask> stack)
{
for (int i = 1; i <= 20; i++)
{
await Task.Delay(50);
var workItem = new CustomTask { Id = i };
stack.Push(workItem);
WriteLine($"Task {workItem.Id} has been posted");
}
}
static async Task TaskProcessor(
ConcurrentStack<CustomTask> stack, string name, CancellationToken token)
{
await GetRandomDelay();
do
{
CustomTask workItem;
bool popSuccesful = stack.TryPop(out workItem);
if (popSuccesful)
{
WriteLine($"Task {workItem.Id} has been processed by {name}");
}
await GetRandomDelay();
}
while (!token.IsCancellationRequested);
}
static Task GetRandomDelay()
{
int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
return Task.Delay(delay);
}
class CustomTask
{
public int Id { get; set; }
}
}
}
- 使用ConcurrentBag创建一个可扩展的爬虫
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using static System.Console;
namespace Chapter6.Recipe4
{
class Program
{
static void Main(string[] args)
{
CreateLinks();
Task t = RunProgram();
t.Wait();
}
static Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>();
static async Task RunProgram()
{
var bag = new ConcurrentBag<CrawlingTask>();
string[] urls = {"http://microsoft.com/", "http://google.com/", "http://facebook.com/", "http://twitter.com/"};
var crawlers = new Task[4];
for (int i = 1; i <= 4; i++)
{
string crawlerName = $"Crawler {i}";
bag.Add(new CrawlingTask { UrlToCrawl = urls[i-1], ProducerName = "root"});
crawlers[i - 1] = Task.Run(() => Crawl(bag, crawlerName));
}
await Task.WhenAll(crawlers);
}
static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName)
{
CrawlingTask task;
while (bag.TryTake(out task))
{
IEnumerable<string> urls = await GetLinksFromContent(task);
if (urls != null)
{
foreach (var url in urls)
{
var t = new CrawlingTask
{
UrlToCrawl = url,
ProducerName = crawlerName
};
bag.Add(t);
}
}
WriteLine($"Indexing url {task.UrlToCrawl} posted by " +
$"{task.ProducerName} is completed by {crawlerName}!");
}
}
static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task)
{
await GetRandomDelay();
if (_contentEmulation.ContainsKey(task.UrlToCrawl)) return _contentEmulation[task.UrlToCrawl];
return null;
}
static void CreateLinks()
{
_contentEmulation["http://microsoft.com/"] = new [] { "http://microsoft.com/a.html", "http://microsoft.com/b.html" };
_contentEmulation["http://microsoft.com/a.html"] = new[] { "http://microsoft.com/c.html", "http://microsoft.com/d.html" };
_contentEmulation["http://microsoft.com/b.html"] = new[] { "http://microsoft.com/e.html" };
_contentEmulation["http://google.com/"] = new[] { "http://google.com/a.html", "http://google.com/b.html" };
_contentEmulation["http://google.com/a.html"] = new[] { "http://google.com/c.html", "http://google.com/d.html" };
_contentEmulation["http://google.com/b.html"] = new[] { "http://google.com/e.html", "http://google.com/f.html" };
_contentEmulation["http://google.com/c.html"] = new[] { "http://google.com/h.html", "http://google.com/i.html" };
_contentEmulation["http://facebook.com/"] = new [] { "http://facebook.com/a.html", "http://facebook.com/b.html" };
_contentEmulation["http://facebook.com/a.html"] = new[] { "http://facebook.com/c.html", "http://facebook.com/d.html" };
_contentEmulation["http://facebook.com/b.html"] = new[] { "http://facebook.com/e.html" };
_contentEmulation["http://twitter.com/"] = new[] { "http://twitter.com/a.html", "http://twitter.com/b.html" };
_contentEmulation["http://twitter.com/a.html"] = new[] { "http://twitter.com/c.html", "http://twitter.com/d.html" };
_contentEmulation["http://twitter.com/b.html"] = new[] { "http://twitter.com/e.html" };
_contentEmulation["http://twitter.com/c.html"] = new[] { "http://twitter.com/f.html", "http://twitter.com/g.html" };
_contentEmulation["http://twitter.com/d.html"] = new[] { "http://twitter.com/h.html" };
_contentEmulation["http://twitter.com/e.html"] = new[] { "http://twitter.com/i.html" };
}
static Task GetRandomDelay()
{
int delay = new Random(DateTime.Now.Millisecond).Next(150, 200);
return Task.Delay(delay);
}
class CrawlingTask
{
public string UrlToCrawl { get; set; }
public string ProducerName { get; set; }
}
}
}
- 使用BlockingCollection进行异步处理
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using static System.Console;
namespace Chapter6.Recipe5
{
class Program
{
static void Main(string[] args)
{
WriteLine("Using a Queue inside of BlockingCollection");
WriteLine();
Task t = RunProgram();
t.Wait();
WriteLine();
WriteLine("Using a Stack inside of BlockingCollection");
WriteLine();
t = RunProgram(new ConcurrentStack<CustomTask>());
t.Wait();
}
static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null)
{
var taskCollection = new BlockingCollection<CustomTask>();
if(collection != null)
taskCollection= new BlockingCollection<CustomTask>(collection);
var taskSource = Task.Run(() => TaskProducer(taskCollection));
Task[] processors = new Task[4];
for (int i = 1; i <= 4; i++)
{
string processorId = $"Processor {i}";
processors[i - 1] = Task.Run(
() => TaskProcessor(taskCollection, processorId));
}
await taskSource;
await Task.WhenAll(processors);
}
static async Task TaskProducer(BlockingCollection<CustomTask> collection)
{
for (int i = 1; i <= 20; i++)
{
await Task.Delay(20);
var workItem = new CustomTask { Id = i };
collection.Add(workItem);
WriteLine($"Task {workItem.Id} has been posted");
}
collection.CompleteAdding();
}
static async Task TaskProcessor(
BlockingCollection<CustomTask> collection, string name)
{
await GetRandomDelay();
foreach (CustomTask item in collection.GetConsumingEnumerable())
{
WriteLine($"Task {item.Id} has been processed by {name}");
await GetRandomDelay();
}
}
static Task GetRandomDelay()
{
int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
return Task.Delay(delay);
}
class CustomTask
{
public int Id { get; set; }
}
}
}