沿用微軟的寫法,System.Threading.Tasks.::.Parallel類,提供對(duì)并行循環(huán)和區(qū)域的支持。 我們會(huì)用到的方法有For,F(xiàn)orEach,Invoke。
一、簡(jiǎn)單使用
首先我們初始化一個(gè)List用于循環(huán),這里我們循環(huán)10次。(后面的代碼都會(huì)按這個(gè)標(biāo)準(zhǔn)進(jìn)行循環(huán))
Code
- Program.Data = new List<int>();
- for (int i = 0; i < 10; i++)
- {
- Data.Add(i);
- }
下面我們定義4個(gè)方法,分別為for,foreach,并行For,并行ForEach。并測(cè)試他們的運(yùn)行時(shí)長(zhǎng)。
Code
- /// <summary>
- /// 是否顯示執(zhí)行過(guò)程
- /// </summary>
- public bool ShowProcessExecution = false;
-
- /// <summary>
- /// 這是普通循環(huán)for
- /// </summary>
- private void Demo1()
- {
- List<int> data = Program.Data;
- DateTime dt1 = DateTime.Now;
- for (int i = 0; i < data.Count; i++)
- {
- Thread.Sleep(500);
- if (ShowProcessExecution)
- Console.WriteLine(data[i]);
- }
- DateTime dt2 = DateTime.Now;
- Console.WriteLine("普通循環(huán)For運(yùn)行時(shí)長(zhǎng):{0}毫秒。", (dt2 - dt1).TotalMilliseconds);
- }
-
- /// <summary>
- /// 這是普通循環(huán)foreach
- /// </summary>
- private void Demo2()
- {
- List<int> data = Program.Data;
- DateTime dt1 = DateTime.Now;
- foreach (var i in data)
- {
- Thread.Sleep(500);
- if (ShowProcessExecution)
- Console.WriteLine(i);
- }
- DateTime dt2 = DateTime.Now;
- Console.WriteLine("普通循環(huán)For運(yùn)行時(shí)長(zhǎng):{0}毫秒。", (dt2 - dt1).TotalMilliseconds);
- }
-
- /// <summary>
- /// 這是并行計(jì)算For
- /// </summary>
- private void Demo3()
- {
- List<int> data = Program.Data;
- DateTime dt1 = DateTime.Now;
- Parallel.For(0, data.Count, (i) =>
- {
- Thread.Sleep(500);
- if (ShowProcessExecution)
- Console.WriteLine(data[i]);
- });
- DateTime dt2 = DateTime.Now;
- Console.WriteLine("并行運(yùn)算For運(yùn)行時(shí)長(zhǎng):{0}毫秒。", (dt2 - dt1).TotalMilliseconds);
- }
-
- /// <summary>
- /// 這是并行計(jì)算ForEach
- /// </summary>
- private void Demo4()
- {
- List<int> data = Program.Data;
- DateTime dt1 = DateTime.Now;
- Parallel.ForEach(data, (i) =>
- {
- Thread.Sleep(500);
- if (ShowProcessExecution)
- Console.WriteLine(i);
- });
- DateTime dt2 = DateTime.Now;
- Console.WriteLine("并行運(yùn)算ForEach運(yùn)行時(shí)長(zhǎng):{0}毫秒。", (dt2 - dt1).TotalMilliseconds);
- }
下面是運(yùn)行結(jié)果:
這里我們可以看出并行循環(huán)在執(zhí)行效率上的優(yōu)勢(shì)了。
結(jié)論1:在對(duì)一個(gè)數(shù)組內(nèi)的每一個(gè)項(xiàng)做單獨(dú)處理時(shí),完全可以選擇并行循環(huán)的方式來(lái)提升執(zhí)行效率。
原理1:并行計(jì)算的線程開啟是緩步開啟的,線程數(shù)量1,2,4,8緩步提升。(不詳,PLinq最多64個(gè)線程,可能這也是64)
二、 并行循環(huán)的中斷和跳出
當(dāng)在進(jìn)行循環(huán)時(shí),偶爾會(huì)需要中斷循環(huán)或跳出循環(huán)。下面是兩種跳出循環(huán)的方法Stop和Break,LoopState是循環(huán)狀態(tài)的參數(shù)。
Code
- /// <summary>
- /// 中斷Stop
- /// </summary>
- private void Demo5()
- {
- List<int> data = Program.Data;
- Parallel.For(0, data.Count, (i, LoopState) =>
- {
- if (data[i] > 5)
- LoopState.Stop();
- Thread.Sleep(500);
- Console.WriteLine(data[i]);
- });
- Console.WriteLine("Stop執(zhí)行結(jié)束。");
- }
-
- /// <summary>
- /// 中斷Break
- /// </summary>
- private void Demo6()
- {
- List<int> data = Program.Data;
- Parallel.ForEach(data, (i, LoopState) =>
- {
- if (i > 5)
- LoopState.Break();
- Thread.Sleep(500);
- Console.WriteLine(i);
- });
- Console.WriteLine("Break執(zhí)行結(jié)束。");
- }
執(zhí)行結(jié)果如下:
結(jié)論2:使用Stop會(huì)立即停止循環(huán),使用Break會(huì)執(zhí)行完畢所有符合條件的項(xiàng)。
三、并行循環(huán)中為數(shù)組/集合添加項(xiàng)
上面的應(yīng)用場(chǎng)景其實(shí)并不是非常多見,畢竟只是為了遍歷一個(gè)數(shù)組內(nèi)的資源,我們更多的時(shí)候是為了遍歷資源,找到我們所需要的。那么請(qǐng)繼續(xù)看。
下面是我們一般會(huì)想到的寫法:
Code
- private void Demo7()
- {
- List<int> data = new List<int>();
- Parallel.For(0, Program.Data.Count, (i) =>
- {
- if (Program.Data[i] % 2 == 0)
- data.Add(Program.Data[i]);
- });
- Console.WriteLine("執(zhí)行完成For.");
- }
-
- private void Demo8()
- {
- List<int> data = new List<int>();
- Parallel.ForEach(Program.Data, (i) =>
- {
- if (Program.Data[i] % 2 == 0)
- data.Add(Program.Data[i]);
- });
- Console.WriteLine("執(zhí)行完成ForEach.");
- }
看起來(lái)應(yīng)該是沒有問(wèn)題的,但是我們多次運(yùn)行后會(huì)發(fā)現(xiàn),偶爾會(huì)出現(xiàn)錯(cuò)誤如下:
這是因?yàn)長(zhǎng)ist是非線程安全的類,我們需要使用System.Collections.Concurrent命名空間下的類型來(lái)用于并行循環(huán)體內(nèi)。
類 |
說(shuō)明 |
BlockingCollection<T> |
為實(shí)現(xiàn) IProducerConsumerCollection<T> 的線程安全集合提供阻止和限制功能。 |
ConcurrentBag<T> |
表示對(duì)象的線程安全的無(wú)序集合。 |
ConcurrentDictionary<TKey, TValue> |
表示可由多個(gè)線程同時(shí)訪問(wèn)的鍵值對(duì)的線程安全集合。 |
ConcurrentQueue<T> |
表示線程安全的先進(jìn)先出 (FIFO) 集合。 |
ConcurrentStack<T> |
表示線程安全的后進(jìn)先出 (LIFO) 集合。 |
OrderablePartitioner<TSource> |
表示將一個(gè)可排序數(shù)據(jù)源拆分成多個(gè)分區(qū)的特定方式。 |
Partitioner |
提供針對(duì)數(shù)組、列表和可枚舉項(xiàng)的常見分區(qū)策略。 |
Partitioner<TSource> |
表示將一個(gè)數(shù)據(jù)源拆分成多個(gè)分區(qū)的特定方式。 |
那么我們上面的代碼可以修改為,加了了ConcurrentQueue和ConcurrentStack的最基本的操作。
Code
- /// <summary>
- /// 并行循環(huán)操作集合類,集合內(nèi)只取5個(gè)對(duì)象
- /// </summary>
- private void Demo7()
- {
- ConcurrentQueue<int> data = new ConcurrentQueue<int>();
- Parallel.For(0, Program.Data.Count, (i) =>
- {
- if (Program.Data[i] % 2 == 0)
- data.Enqueue(Program.Data[i]);//將對(duì)象加入到隊(duì)列末尾
- });
- int R;
- while (data.TryDequeue(out R))//返回隊(duì)列中開始處的對(duì)象
- {
- Console.WriteLine(R);
- }
-
- Console.WriteLine("執(zhí)行完成For.");
- }
-
- /// <summary>
- /// 并行循環(huán)操作集合類
- /// </summary>
- private void Demo8()
- {
- ConcurrentStack<int> data = new ConcurrentStack<int>();
- Parallel.ForEach(Program.Data, (i) =>
- {
- if (Program.Data[i] % 2 == 0)
- data.Push(Program.Data[i]);//將對(duì)象壓入棧中
- });
- int R;
- while (data.TryPop(out R))//彈出棧頂對(duì)象
- {
- Console.WriteLine(R);
- }
-
- Console.WriteLine("執(zhí)行完成ForEach.");
- }
ok,這里返回一個(gè)序列的問(wèn)題也解決了。
結(jié)論3:在并行循環(huán)內(nèi)重復(fù)操作的對(duì)象,必須要是thread-safe(線程安全)的。集合類的線程安全對(duì)象全部在System.Collections.Concurrent命名空間下。
四、返回集合運(yùn)算結(jié)果/含有局部變量的并行循環(huán)
使用循環(huán)的時(shí)候經(jīng)常也會(huì)用到迭代,那么在并行循環(huán)中叫做 含有局部變量的循環(huán) 。下面的代碼中詳細(xì)的解釋,這里就不啰嗦了。
Code
- /// <summary>
- /// 具有線程局部變量的For循環(huán)
- /// </summary>
- private void Demo9()
- {
- List<int> data = Program.Data;
- long total = 0;
-
- //這里定義返回值為long類型方便下面各個(gè)參數(shù)的解釋
- Parallel.For<long>(0, // For循環(huán)的起點(diǎn)
- data.Count, // For循環(huán)的終點(diǎn)
- () => 0, // 初始化局部變量的方法(long),既為下面的subtotal的初值
- (i, LoopState, subtotal) => // 為每個(gè)迭代調(diào)用一次的委托,i是當(dāng)前索引,LoopState是循環(huán)狀態(tài),subtotal為局部變量名
- {
- subtotal += data[i]; // 修改局部變量
- return subtotal; // 傳遞參數(shù)給下一個(gè)迭代
- },
- (finalResult) => Interlocked.Add(ref total, finalResult) //對(duì)每個(gè)線程結(jié)果執(zhí)行的最后操作,這里是將所有的結(jié)果相加
- );
- Console.WriteLine(total);
- }
-
- /// <summary>
- /// 具有線程局部變量的ForEach循環(huán)
- /// </summary>
- private void Demo10()
- {
- List<int> data = Program.Data;
- long total = 0;
-
- Parallel.ForEach<int, long>(data, // 要循環(huán)的集合對(duì)象
- () => 0, // 初始化局部變量的方法(long),既為下面的subtotal的初值
- (i, LoopState, subtotal) => // 為每個(gè)迭代調(diào)用一次的委托,i是當(dāng)前元素,LoopState是循環(huán)狀態(tài),subtotal為局部變量名
- {
- subtotal += i; // 修改局部變量
- return subtotal; // 傳遞參數(shù)給下一個(gè)迭代
- },
- (finalResult) => Interlocked.Add(ref total, finalResult) //對(duì)每個(gè)線程結(jié)果執(zhí)行的最后操作,這里是將所有的結(jié)果相加
- );
-
- Console.WriteLine(total);
- }
結(jié)論4:并行循環(huán)中的迭代,確實(shí)很傷人。代碼太難理解了。
五、PLinq(Linq的并行計(jì)算)
上面介紹完了For和ForEach的并行計(jì)算盛宴,微軟也沒忘記在Linq中加入并行計(jì)算。下面介紹Linq中的并行計(jì)算。
4.0中在System.Linq命名空間下加入了下面幾個(gè)新的類:
類 |
說(shuō)明 |
ParallelEnumerable |
提供一組用于查詢實(shí)現(xiàn) ParallelQuery{TSource} 的對(duì)象的方法。這是 Enumerable 的并行等效項(xiàng)。 |
ParallelQuery |
表示并行序列。 |
ParallelQuery<TSource> |
表示并行序列。 |
原理2:PLinq最多會(huì)開啟64個(gè)線程
原理3:PLinq會(huì)自己判斷是否可以進(jìn)行并行計(jì)算,如果不行則會(huì)以順序模式運(yùn)行。
原理4:PLinq會(huì)在昂貴的并行算法或成本較低的順序算法之間進(jìn)行選擇,默認(rèn)情況下它選擇順序算法。
在ParallelEnumerable中提供的并行化的方法
ParallelEnumerable 運(yùn)算符 |
說(shuō)明 |
AsParallel() |
PLINQ 的入口點(diǎn)。指定如果可能,應(yīng)并行化查詢的其余部分。 |
AsSequential() |
指定查詢的其余部分應(yīng)像非并行 LINQ 查詢一樣按順序運(yùn)行。 |
AsOrdered() |
指定 PLINQ 應(yīng)保留查詢的其余部分的源序列排序,直到例如通過(guò)使用 orderby 子句更改排序?yàn)橹埂?/td>
|
AsUnordered() |
指定查詢的其余部分的 PLINQ 不需要保留源序列的排序。 |
WithCancellation() |
指定 PLINQ 應(yīng)定期監(jiān)視請(qǐng)求取消時(shí)提供的取消標(biāo)記和取消執(zhí)行的狀態(tài)。 |
WithDegreeOfParallelism() |
指定 PLINQ 應(yīng)當(dāng)用來(lái)并行化查詢的處理器的最大數(shù)目。 |
WithMergeOptions() |
提供有關(guān) PLINQ 應(yīng)當(dāng)如何(如果可能)將并行結(jié)果合并回到使用線程上的一個(gè)序列的提示。 |
WithExecutionMode() |
指定 PLINQ 應(yīng)當(dāng)如何并行化查詢(即使默認(rèn)行為是按順序運(yùn)行查詢)。 |
ForAll() |
多線程枚舉方法,與循環(huán)訪問(wèn)查詢結(jié)果不同,它允許在不首先合并回到使用者線程的情況下并行處理結(jié)果。 |
Aggregate() 重載 |
對(duì)于 PLINQ 唯一的重載,它啟用對(duì)線程本地分區(qū)的中間聚合以及一個(gè)用于合并所有分區(qū)結(jié)果的最終聚合函數(shù)。 |
下面是PLinq的簡(jiǎn)單代碼
Code
- /// <summary>
- /// PLinq簡(jiǎn)介
- /// </summary>
- private void Demo11()
- {
- var source = Enumerable.Range(1, 10000);
-
- //查詢結(jié)果按source中的順序排序
- var evenNums = from num in source.AsParallel().AsOrdered()
- where num % 2 == 0
- select num;
-
- //ForAll的使用
- ConcurrentBag<int> concurrentBag = new ConcurrentBag<int>();
- var query = from num in source.AsParallel()
- where num % 10 == 0
- select num;
- query.ForAll((e) => concurrentBag.Add(e * e));
- }
上面代碼中使用了ForAll,F(xiàn)orAll和foreach的區(qū)別如下:
PLinq的東西很繁雜,但是都只是幾個(gè)簡(jiǎn)單的方法,熟悉下方法就好了。