您的位置:首页 > 编程语言 > C#

读书笔记:《C#并行编程高级教程》-第二章:命令式数据并行

2014-06-06 17:18 323 查看
《C#并行编程高级教程》-第二章:命令式数据并行

System.Threading.Tasks.Parallel类

三个基本的方法:

Parallel.For,Parallel.ForEach,Parallel.Invoke

Parallel.Invoke:定义

public static void Invoke( params Action[] actions)

public static void Invoke( ParallelOptions parallelOptions, params Action[] actions)

特点:传递给Invoke的方法在并行执行过程中没有特定的执行顺序,假设传递给Invoke的方法有四个,即使电脑有四个或以上的逻辑内核也不能能保证四个方法并发的同时启动,因为可能有些内核正处於繁忙状态;底层逻辑会根据系统状态指定最佳的执行的计划

由于Invoke加载的方法各自需要的运行时间不同,所以就需要其中运行时间最长的才能返回控制,这会导致很多逻辑内核长时间处于闲置状态,因此使用这个方法的时候一定要测量运行的结果、实现的加速比以及内核使用率;而且由于它调用的是固定数目的委托,如果逻辑内核数大于委托数,那么会有一些内核处于闲置状态;由于无法保证执行顺序,委托之间的任何相关或交互性都会带来意想不到的副作用。

(PS:交错式并发:假设每个任务可以分为1,2,3...过程,那么交错式并发就是在一个单独的内核上先依次执行所有任务的过程1,再依次执行所有任务的过程2,,,这样造成并行运行的假象。 并发:多个内核分别顺序执行其中某个任务的1,2,3...过程)

加速比=(串行执行时间)/(并行执行时间)



循环并行化

Parallel.For

public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body)  //(有多个重载)

fromInclusive开始索引(含);toExclusive 结束索引(不含); body 将为每个迭代调用一次的委托。
//原始串行for
var md5M=MD5.Create();
for(int i=0;i<100;i++)
{
byte[] data=Encoding.Unicode.GetBytes("CSDN"+i.ToString());
byte[] result=md5M.ComputerHash(data);
string str=ConvertToHexString(result);
}
//并行版本
Parallel.For(1,100,(int i)=>
{
var md5M=MD5.Create();
byte[] data=Encoding.Unicode.GetBytes("CSDN"+i.ToString());
byte[] result=md5M.ComputerHash(data);
string str=ConvertToHexString(result);
});


特点:并不是顺序迭代,即第50号迭代可能在第2号迭代之前运行。 不支持浮点数递增(即使是普通的for使用浮点数递增也会因为浮点精度的问题而变得相当危险) 对于循环共享的变量,由于执行是无序的或者并发的,所以在访问共享变量时会发生冲突或者变得混乱。比较好的解决办法就是把这些共享的局部变量转变为委托方法内部的局部变量,如上例中的md5M。还有一种方法就是同步结构。 并行化的代码能够随着内核的增长而扩展,这对于Invoke是不可能的。也由于并行循环对每个内核完成的工作进行负载均衡,因此一般也会比Invoke获得更高的加速比。 由于在对并行迭代的分配和协调会带来额外的开销,因此,其加速比并不会随着内核的增加而从线性增加。

Parallel.ForEach
public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body)

public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)


//并行版本  分区器
Parallel.ForEach(Partitioner.Create(1,100),range=>
{
var md5M=MD5.Create();
for(int i=range.Item1;i<range.Item2;i++)
{
byte[] data=Encoding.Unicode.GetBytes("CSDN"+i.ToString());
byte[] result=md5M.ComputerHash(data);
string str=ConvertToHexString(result);
}
});


这里使用分区器将[1,100)分成很多个带有上下界的分区,创建了一个Tuple<int ,int>,并采用的是默认的分区大小,也可以指定分区大小作为第三个参数传入分区器:一般计算公式为:分区范围大小=(元素个数)/(逻辑内核个数)+1:

Partitioner.Create(1,100,(int)(100/Environment.ProcessorCount)+1)
注意:第一个达到串行内层for循环的并不是从1开始的第一个分区!

//并行版本  使用带有IEnumerable接口的数据源
private static IEnumerable<int> GenerateMD5InputData()
{ return Enumerable.Range(1,100);} //第二个参数100指的是生成的序列化整数的数目而非上限
var inputData=GenerateMD5InputData();
Parallel.ForEach(inputData,(int i)=>
{
var md5M=MD5.Create();
for(int i=range.Item1;i<range.Item2;i++)
{
byte[] data=Encoding.Unicode.GetBytes("CSDN"+i.ToString());
byte[] result=md5M.ComputerHash(data);
string str=ConvertToHexString(result);
}
});
这段代码比普通的串行化代码耗时还长,部分原因在于:Enumerable实际上是串行遍历的,Parallel.ForEach加载的所以支持任务执行的线程在访问迭代器的时候都被迫串行化。为了避免这种串行化,可以创建一个能够进行静态范围分区的自定义分区器。

退出并行循环

使用ParallelLoopState作为委托的第二个参数:

<pre name="code" class="csharp">Parallel.ForEach(inputData,(int i,ParallelLoopState  loopState)=>{...
if(条件)
loopState.Break();
....});



ParallelLoopState停止Parallel.For和Parallel.ForEach的方法:

Break:告诉循环在执行了当前迭代之后应该尽快的停止执行。假如当前正在迭代50,那么小于50的迭代都会被处理完;

Stop:告诉循环在执行了当前迭代之后应该尽快的停止执行。假如当前正在迭代50,那么小于50的迭代不能保证会被处理完;

还可以用如下方法:在委托体中的迭代代码之前加入这样的判断:

if(loopState.ShouldExitCurrentIteration)
{
return;
}


ShouldExitCurrentIteration请求既可以是当前迭代也可以是其他迭代发出的。在并行循环外部取消循环尅采用cancellation的形式。

分析并行循环结果

ParallelLoopState的属性:IsCompleted,LowestBreakIteration.HasValue:

IsCompleted 循环运行完成

(!IsCompleted)&&(!LowestBreakIteration.HasValue) 因为stop语句循环终止

(!IsCompleted)&&(LowestBreakIteration.HasValue) 因为break语句循环终止,LowestBreakIteration属性的值为调用break语句的值最小的那个迭代的值(可能多个迭代都调用了break)

扑捉并行循环异常

try{
Parallel.For(...........)
}
catch(AggregateException ex)
{
foreach(Exception innerEx in ex.InnerException)
{ ........}
}
AggregateException包含了在一个或者多个迭代中产生的异常,并且把异常放在只读属性InnerException中

指定并行度

即指定使用最大的内核数,而不是全部的内核数

var p=new ParallelOptions();
p.MaxDegreeOfParallelism=maxDegree;
Parallel.For(1,100,p,(int i)=>{....});
其中,maxDegree,int型,即为指定的并行度,如果她为-1则表示利用所有可用内核。

注意:为了更好的扩展性,应该使用Environment.ProcessorCount(逻辑内核的数目,而非硬件内核的数目)的计算式来指定并行度。如何使用某个常数,比如4,那如果到了一个有8个内核的计算机,那么其性能就不好了。默认情况下。如果没有指定并行度,那么TPL就会容许通过启发式算法提高或降低线程的数目,通常都会高于ProcessCount因为这样可以更好的支持CPU和I/O混合型的工作负载。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: