PFX在Parallel类中提供了三个静态方法作为结构化并行的基本形式:
这三个方法都会阻塞线程直到所有工作完成为止。和PLINQ一样,在出现未处理异常之后,其他的工作线程将会在其当前迭代完成之后停止,并将异常包装为AggregateException抛出给调用者。
Parallel.Invoke方法并行执行一组Action委托,然后等待它们完成。该方法最简单的定义方式如下:
public static void Invoke (params Action[] actions);
和PLINQ一样,Parallel.*方法是针对计算密集型任务而不是I/O密集型任务进行优化的。但是,我们可以使用一次下载两个网页的方式来简单展示Parallel.Invoke的用法:
Parallel.Invoke ( () => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"), () => new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html"));
从表面看来,Parallel.Invoke就像是创建了两个绑定到线程的Task对象,然后等待它们执行结束的快捷操作。但是它们存在一个重要区别:如果将一百万个委托传递给Parallel.Invoke方法,它仍然能够有效工作。这是因为该方法会将大量的元素划分为若干批次,并将其分派给底层的Task,而不会单纯为每一个委托创建一个独立的Task。
所有的Parallel方法都需要自行整理结果,这意味着要时刻注意线程安全性。例如,以下代码就不是线程安全的:
var data = new List<string>(); Parallel.Invoke ( () => data.Add(new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html")), () => data.Add(new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html")));
Parallel.For和Parallel.ForEach分别等价于C#中的for和foreach循环,但是每一次迭代都是并行而非顺序执行的。以下给出了这两个方法最简单的声明:
public static ParallelLoopResult For( int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body); public static ParallelLoopResult ForEach<TSource>( OrderablePartitioner<TSource> source, Action<TSource, ParallelLoopState, long> body);
对于for循环:
// 顺序 for(int i=0; i<100; i++) Foo(i); // 并行话代码 Parallel.For(0, 100, i=> Foo(i)); // 简洁写法 Parallel.For(0, 100,Foo);
而对于foreach循环:
Parallel.ForEach("hello,world",Foo);
Parallel.For和Parallel.ForEach通常在外层循环比内层循环效果更好,因为前者通常为并行化提供了更大的工作块,而这可以降低管理开销。通常,没必要同时将内层和外层循环并行化。
有些情况下,循环迭代中的索引用处很大。对于顺序执行的foreach循环,我们很容易获得索引:
int i = 0; foreach(char c in "hello,world") Console.WriteLine(c.ToString() + i++);
但是在并行上下文中,递增一个共享变量的值不是线程安全的。因此必须使用以下版本的ForEach语句:
Parallel.ForEach ("Hello, world", (c, state, i) => { Console.WriteLine (c.ToString() + i); });
为了在实际环境中使用它,我们仍以前面使用PLINQ编写的拼写检查器为例。以下代码将加载一个字典和一个包含一百万个测试单词的数组:
string wordLookupFile = Path.Combine(Path.GetTempPath(), "WordLookup.txt"); if (!File.Exists(wordLookupFile)) new WebClient().DownloadFile( "http://www.albahari.com/ispell/allwords.txt", wordLookupFile); var wordLookup = new HashSet<string>( File.ReadAllLines(wordLookupFile), StringComparer.InvariantCultureIgnoreCase); var random = new Random(); string[] wordList = wordLookup.ToArray(); string[] wordsToTest = Enumerable.Range(0, 1000000) .Select(i => wordList[random.Next(0, wordList.Length)]) .ToArray(); wordsToTest[12345] = "woozsh"; wordsToTest[23456] = "wubsie"; var misspellings = new ConcurrentBag<Tuple<int, string>>(); Parallel.ForEach(wordsToTest, (word, state, i) => { if (!wordLookup.Contains(word)) misspellings.Add(Tuple.Create((int)i, word)); });
需要注意的是,必须将结果整理到一个线程安全的集合中,这是一个缺点(与PLINQ相比)。而这种方式胜过PLINQ的地方是索引化ForEach比索引化Select查询运算符的执行效率高。
由于并行的For或ForEach的循环体是一个委托,因此无法使用break语句提前结束循环。但是可以调用ParallelLoopState对象的Break方法或Stop方法来跳出或者结束循环:
public class ParallelLoopState { public void Break(); public void Stop(); public bool IsExceptional { get; } public bool IsStopped { get; } public long? LowestBreakIteration { get; } public bool ShouldExitCurrentIteration { get; } }
获得ParallelLoopState很容易:所有的For和ForEach都有以Action<TSource, ParallelLoopState>
为循环体的重载。因此,如果需要并行化如下代码:
foreach (char c in "hello,world") { if (c == ',') break; else Console.Write(c); } // 并行化 Parallel.ForEach("hello,world", (c, loopState) => { if (c == ',') loopState.Break(); else Console.Write(c); });
Parallel.For和Parallel.ForEach方法均提供了一组接受TLocal泛型类型参数的重载。这些重载方法可帮助优化密集迭代循环过程中的结果整理工作。其中最简单的形式如下:
public static ParallelLoopResult For<TLocal>( int fromInclusive, int toExclusive, Func <TLocal> localInit, Func <int, ParallelLoopState, TLocal, TLocal> body, Action <TLocal> localFinally);
这些重载都非常复杂,在实际中也很少用到,幸运的是其大部分应用场景都可以通过PLINQ解决。
大部分问题和以下例子类似:假设要对1~10 000 000之间的数字的平方根求和。计算一千万个平方根这种工作很容易并行化,但是对它们的值求和却有点麻烦。因为我们需要锁定并更新最终结果:
object locker = new object(); double total = 0; Parallel.For (1, 10000000, i => { lock (locker) total += Math.Sqrt (i); });
并行化的效果大部分都被一千万个锁操作和因此带来的阻塞抵消了。
实际上,并不需要一千万次的锁操作。
考虑这样一种情况,假设有一组志愿者需要收集大量的垃圾。如果所有志愿者都共用一个垃圾桶,那么移动和竞争将显著降低收集效率。显然,更加有效的方式是让每一个志愿者都有仅属于自己的(本地)垃圾桶,只是偶尔将自己的垃圾倒入主垃圾桶。
拥有TLocal的For和ForEach重载就是按照上述方式工作的。志愿者就是内部工作线程,本地值就是本地垃圾桶。为了保证Parallel类型的工作,还需要提供两个额外的委托,分别负责:
此外,循环体委托的返回值也不是void,而是本地值的聚合结果。以下是重构之后的代码:
object locker = new object(); double grandTotal = 0; Parallel.For(1, 10000000, () => 0.0, //初始化本地值 (i, state, localTotal) => // 主体委托 localTotal + Math.Sqrt(i), //返回新的本地总数 localTotal => { lock (locker) grandTotal += localTotal; // 将本地数添加到主总数中 } );
虽然我们仍需要锁定,但是这个锁定过程只会在本地结果和最终结果合并时发生,因此整个过程会更加高效。
如前所述,这种场景使用PLINQ往往会更加有效。上述例子可以直接用PLINQ进行并行化:
var total = ParallelEnumerable.Range(1, 10000000) .Sum(i => Math.Sqrt(i));