您的位置:首页 > 编程语言 > C#

C# 并行编程 之 Barrier的使用 - 通过屏障同步并发任务

2015-05-14 09:28 567 查看

基本信息

Barrier 是 .Net 提供的一直并发的机制,它允许多个任务同步他们不同阶段的并发工作。

这里的关键点是【多个任务】和【不同阶段】。

假设有4个相同的任务(Task),每个任务都有4个阶段(Phase),当他们并发工作时,只有当所有任务的相同步骤都完成时,所有任务才可以开始下一个步骤。

如图所示:



这里的 Barrier 就是 .NetFramework 提供的一种机制。它像一个篱笆(屏障),把所有任务的阶段隔离开来,当前阶段不完成,不会开始下一个阶段。

代码示例:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Sample5_1_barrier
{
class Program
{
private static void Phase0Doing(int TaskID)
{
Console.WriteLine("Task : #{0}   =====  Phase 0", TaskID);
}

private static void Phase1Doing(int TaskID)
{
Console.WriteLine("Task : #{0}   *****  Phase 1", TaskID);
}

private static void Phase2Doing(int TaskID)
{
Console.WriteLine("Task : #{0}   ^^^^^  Phase 2", TaskID);
}

private static void Phase3Doing(int TaskID)
{
Console.WriteLine("Task : #{0}   $$$$$  Phase 3", TaskID);
}

private static int _TaskNum = 4;
private static Task[] _Tasks;
private static Barrier _Barrier;

static void Main(string[] args)
{
_Tasks = new Task[_TaskNum];
_Barrier = new Barrier(_TaskNum, (barrier) =>
{
Console.WriteLine("-------------------------- Current Phase:{0} --------------------------",
_Barrier.CurrentPhaseNumber);
});

for (int i = 0; i < _TaskNum; i++)
{
_Tasks[i] = Task.Factory.StartNew((num) =>
{
var taskid = (int)num;

Phase0Doing(taskid);
_Barrier.SignalAndWait();

Phase1Doing(taskid);
_Barrier.SignalAndWait();

Phase2Doing(taskid);
_Barrier.SignalAndWait();

Phase3Doing(taskid);
_Barrier.SignalAndWait();

}, i);
}

var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) =>
{
Task.WaitAll(_Tasks);
Console.WriteLine("========================================");
Console.WriteLine("All Phase is completed");

_Barrier.Dispose();
});

finalTask.Wait();

Console.ReadLine();
}
}
}


测试结果:



使用屏障时的异常处理

如果进入屏障后,工作的代码出现了异常,这个异常会被包装在BarrierPostPhaseException中,而且所有任务都能够捕捉到这个异常。原始的异常可以通过NarrierPostPhaseException 对象的InnerException进行访问。

代码示例: 注意其中抛出异常的位置,和捕获异常的位置。

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Sample5_1_barrier
{
class Program
{
private static void Phase0Doing(int TaskID)
{
Console.WriteLine("Task : #{0}   =====  Phase 0", TaskID);
}

private static void Phase1Doing(int TaskID)
{
Console.WriteLine("Task : #{0}   *****  Phase 1", TaskID);

}

private static void Phase2Doing(int TaskID)
{
Console.WriteLine("Task : #{0}   ^^^^^  Phase 2", TaskID);
}

private static void Phase3Doing(int TaskID)
{
Console.WriteLine("Task : #{0}   $$$$$  Phase 3", TaskID);
}

private static int _TaskNum = 4;
private static Task[] _Tasks;
private static Barrier _Barrier;

static void Main(string[] args)
{
_Tasks = new Task[_TaskNum];
_Barrier = new Barrier(_TaskNum, (barrier) =>
{
Console.WriteLine("-------------------------- Current Phase:{0} --------------------------",
_Barrier.CurrentPhaseNumber);
if (_Barrier.CurrentPhaseNumber == 1)
throw new InvalidOperationException("Phase 2 need to be TERMINTED!!!!!");
});

for (int i = 0; i < _TaskNum; i++)
{
_Tasks[i] = Task.Factory.StartNew((num) =>
{
var taskid = (int)num;

Phase0Doing(taskid);
_Barrier.SignalAndWait();

Phase1Doing(taskid);
try
{
_Barrier.SignalAndWait();
}
catch (BarrierPostPhaseException bpp_ex)
{
Console.WriteLine("Got an Exception in Phase1: " + bpp_ex.InnerException);
}

Phase2Doing(taskid);
_Barrier.SignalAndWait();

Phase3Doing(taskid);
_Barrier.SignalAndWait();

}, i);
}

var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) =>
{
Task.WaitAll(_Tasks);
Console.WriteLine("========================================");
Console.WriteLine("All Phase is completed");

_Barrier.Dispose();
});

finalTask.Wait();

Console.ReadLine();
}
}
}


Barrier 关于超时的处理

这里会使用Barrier.SignalAndWait(TIMEOUT)),来对超时进行判断。示例代码中Phase 2的Task 3会等待10秒,超过了超时时间 2秒,在Barrier中会检查到Task的Phase2超时了并返回错误。

示例代码:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Sample5_1_barrier
{
class Program
{

private static void Phase0Doing(int TaskID)
{
Console.WriteLine("Task : #{0}   =====  Phase 0", TaskID);
}

private static void Phase1Doing(int TaskID)
{
Console.WriteLine("Task : #{0}   *****  Phase 1", TaskID);
}

private static void Phase2Doing(int TaskID)
{
int i = 0;
Console.WriteLine("Task : #{0}   ^^^^^  Phase 2", TaskID);
if (TaskID == 3)
while (i < 10)
{
System.Threading.Thread.Sleep(1000);
i++;
}
}

private static void Phase3Doing(int TaskID)
{
Console.WriteLine("Task : #{0}   $$$$$  Phase 3", TaskID);
}

private static int _TaskNum = 4;
private static Task[] _Tasks;
private static Barrier _Barrier;
private static int TIMEOUT = 2000;

static void Main(string[] args)
{
var cts = new System.Threading.CancellationTokenSource();
var ct = cts.Token;

_Tasks = new Task[_TaskNum];
_Barrier = new Barrier(_TaskNum, (barrier) =>
{
Console.WriteLine("-------------------------- Current Phase:{0} --------------------------",
_Barrier.CurrentPhaseNumber);
//if (_Barrier.CurrentPhaseNumber == 1)
//    throw new InvalidOperationException("Phase 2 need to be TERMINTED!!!!!");
});

for (int i = 0; i < _TaskNum; i++)
{
_Tasks[i] = Task.Factory.StartNew((num) =>
{
var taskid = (int)num;

Phase0Doing(taskid);
if (!_Barrier.SignalAndWait(TIMEOUT))
{
Console.WriteLine("``````````````  This Phase {0} TIMEOUT  ``````````````", _Barrier.CurrentPhaseNumber);
throw new OperationCanceledException("Phase 0 canceled: ", ct);
}

Phase1Doing(taskid);
if (!_Barrier.SignalAndWait(TIMEOUT))
{
Console.WriteLine("``````````````  This Phase {0} TIMEOUT  ``````````````", _Barrier.CurrentPhaseNumber);
throw new OperationCanceledException("Phase 1 canceled: ", ct);
}

Phase2Doing(taskid);
if (!_Barrier.SignalAndWait(TIMEOUT))
{
Console.WriteLine("``````````````  This Phase {0} TIMEOUT  ``````````````", _Barrier.CurrentPhaseNumber);
throw new OperationCanceledException("Phase 2 canceled: ", ct);
}

Phase3Doing(taskid);
if (!_Barrier.SignalAndWait(TIMEOUT))
{
Console.WriteLine("``````````````  This Phase {0} TIMEOUT  ``````````````", _Barrier.CurrentPhaseNumber);
throw new OperationCanceledException("Phase 3 canceled: ", ct);
}

}, i, ct);
}

var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) =>
{
Task.WaitAll(_Tasks);
Console.WriteLine("========================================");
Console.WriteLine("All Phase is completed");
}, ct);

try
{
finalTask.Wait();
}
catch (AggregateException aex)
{
Console.WriteLine("Task failed And Canceled" + aex.ToString());
}
finally
{
_Barrier.Dispose();
}
Console.ReadLine();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: