C# 并行编程 之 Barrier的使用 - 通过屏障同步并发任务
2015-05-14 09:28
567 查看
基本信息
Barrier 是 .Net 提供的一直并发的机制,它允许多个任务同步他们不同阶段的并发工作。这里的关键点是【多个任务】和【不同阶段】。
假设有4个相同的任务(Task),每个任务都有4个阶段(Phase),当他们并发工作时,只有当所有任务的相同步骤都完成时,所有任务才可以开始下一个步骤。
如图所示:
这里的 Barrier 就是 .NetFramework 提供的一种机制。它像一个篱笆(屏障),把所有任务的阶段隔离开来,当前阶段不完成,不会开始下一个阶段。
代码示例:
using System; using System.Threading; using System.Threading.Tasks; namespace Sample5_1_barrier { class Program { private static void Phase0Doing(int TaskID) { Console.WriteLine("Task : #{0} ===== Phase 0", TaskID); } private static void Phase1Doing(int TaskID) { Console.WriteLine("Task : #{0} ***** Phase 1", TaskID); } private static void Phase2Doing(int TaskID) { Console.WriteLine("Task : #{0} ^^^^^ Phase 2", TaskID); } private static void Phase3Doing(int TaskID) { Console.WriteLine("Task : #{0} $$$$$ Phase 3", TaskID); } private static int _TaskNum = 4; private static Task[] _Tasks; private static Barrier _Barrier; static void Main(string[] args) { _Tasks = new Task[_TaskNum]; _Barrier = new Barrier(_TaskNum, (barrier) => { Console.WriteLine("-------------------------- Current Phase:{0} --------------------------", _Barrier.CurrentPhaseNumber); }); for (int i = 0; i < _TaskNum; i++) { _Tasks[i] = Task.Factory.StartNew((num) => { var taskid = (int)num; Phase0Doing(taskid); _Barrier.SignalAndWait(); Phase1Doing(taskid); _Barrier.SignalAndWait(); Phase2Doing(taskid); _Barrier.SignalAndWait(); Phase3Doing(taskid); _Barrier.SignalAndWait(); }, i); } var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) => { Task.WaitAll(_Tasks); Console.WriteLine("========================================"); Console.WriteLine("All Phase is completed"); _Barrier.Dispose(); }); finalTask.Wait(); Console.ReadLine(); } } }
测试结果:
使用屏障时的异常处理
如果进入屏障后,工作的代码出现了异常,这个异常会被包装在BarrierPostPhaseException中,而且所有任务都能够捕捉到这个异常。原始的异常可以通过NarrierPostPhaseException 对象的InnerException进行访问。代码示例: 注意其中抛出异常的位置,和捕获异常的位置。
using System; using System.Threading; using System.Threading.Tasks; namespace Sample5_1_barrier { class Program { private static void Phase0Doing(int TaskID) { Console.WriteLine("Task : #{0} ===== Phase 0", TaskID); } private static void Phase1Doing(int TaskID) { Console.WriteLine("Task : #{0} ***** Phase 1", TaskID); } private static void Phase2Doing(int TaskID) { Console.WriteLine("Task : #{0} ^^^^^ Phase 2", TaskID); } private static void Phase3Doing(int TaskID) { Console.WriteLine("Task : #{0} $$$$$ Phase 3", TaskID); } private static int _TaskNum = 4; private static Task[] _Tasks; private static Barrier _Barrier; static void Main(string[] args) { _Tasks = new Task[_TaskNum]; _Barrier = new Barrier(_TaskNum, (barrier) => { Console.WriteLine("-------------------------- Current Phase:{0} --------------------------", _Barrier.CurrentPhaseNumber); if (_Barrier.CurrentPhaseNumber == 1) throw new InvalidOperationException("Phase 2 need to be TERMINTED!!!!!"); }); for (int i = 0; i < _TaskNum; i++) { _Tasks[i] = Task.Factory.StartNew((num) => { var taskid = (int)num; Phase0Doing(taskid); _Barrier.SignalAndWait(); Phase1Doing(taskid); try { _Barrier.SignalAndWait(); } catch (BarrierPostPhaseException bpp_ex) { Console.WriteLine("Got an Exception in Phase1: " + bpp_ex.InnerException); } Phase2Doing(taskid); _Barrier.SignalAndWait(); Phase3Doing(taskid); _Barrier.SignalAndWait(); }, i); } var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) => { Task.WaitAll(_Tasks); Console.WriteLine("========================================"); Console.WriteLine("All Phase is completed"); _Barrier.Dispose(); }); finalTask.Wait(); Console.ReadLine(); } } }
Barrier 关于超时的处理
这里会使用Barrier.SignalAndWait(TIMEOUT)),来对超时进行判断。示例代码中Phase 2的Task 3会等待10秒,超过了超时时间 2秒,在Barrier中会检查到Task的Phase2超时了并返回错误。示例代码:
using System; using System.Threading; using System.Threading.Tasks; namespace Sample5_1_barrier { class Program { private static void Phase0Doing(int TaskID) { Console.WriteLine("Task : #{0} ===== Phase 0", TaskID); } private static void Phase1Doing(int TaskID) { Console.WriteLine("Task : #{0} ***** Phase 1", TaskID); } private static void Phase2Doing(int TaskID) { int i = 0; Console.WriteLine("Task : #{0} ^^^^^ Phase 2", TaskID); if (TaskID == 3) while (i < 10) { System.Threading.Thread.Sleep(1000); i++; } } private static void Phase3Doing(int TaskID) { Console.WriteLine("Task : #{0} $$$$$ Phase 3", TaskID); } private static int _TaskNum = 4; private static Task[] _Tasks; private static Barrier _Barrier; private static int TIMEOUT = 2000; static void Main(string[] args) { var cts = new System.Threading.CancellationTokenSource(); var ct = cts.Token; _Tasks = new Task[_TaskNum]; _Barrier = new Barrier(_TaskNum, (barrier) => { Console.WriteLine("-------------------------- Current Phase:{0} --------------------------", _Barrier.CurrentPhaseNumber); //if (_Barrier.CurrentPhaseNumber == 1) // throw new InvalidOperationException("Phase 2 need to be TERMINTED!!!!!"); }); for (int i = 0; i < _TaskNum; i++) { _Tasks[i] = Task.Factory.StartNew((num) => { var taskid = (int)num; Phase0Doing(taskid); if (!_Barrier.SignalAndWait(TIMEOUT)) { Console.WriteLine("`````````````` This Phase {0} TIMEOUT ``````````````", _Barrier.CurrentPhaseNumber); throw new OperationCanceledException("Phase 0 canceled: ", ct); } Phase1Doing(taskid); if (!_Barrier.SignalAndWait(TIMEOUT)) { Console.WriteLine("`````````````` This Phase {0} TIMEOUT ``````````````", _Barrier.CurrentPhaseNumber); throw new OperationCanceledException("Phase 1 canceled: ", ct); } Phase2Doing(taskid); if (!_Barrier.SignalAndWait(TIMEOUT)) { Console.WriteLine("`````````````` This Phase {0} TIMEOUT ``````````````", _Barrier.CurrentPhaseNumber); throw new OperationCanceledException("Phase 2 canceled: ", ct); } Phase3Doing(taskid); if (!_Barrier.SignalAndWait(TIMEOUT)) { Console.WriteLine("`````````````` This Phase {0} TIMEOUT ``````````````", _Barrier.CurrentPhaseNumber); throw new OperationCanceledException("Phase 3 canceled: ", ct); } }, i, ct); } var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) => { Task.WaitAll(_Tasks); Console.WriteLine("========================================"); Console.WriteLine("All Phase is completed"); }, ct); try { finalTask.Wait(); } catch (AggregateException aex) { Console.WriteLine("Task failed And Canceled" + aex.ToString()); } finally { _Barrier.Dispose(); } Console.ReadLine(); } } }
相关文章推荐
- C# 并行编程 之 Barrier的使用 - 通过屏障同步并发任务
- C# 并行编程 之 Barrier的使用 - 通过屏障同步并发任务
- Atitit 三种并发编程模型 艾龙 attilax总结 1. 并发系统可以使用不同的并发模型去实现。 1 2. 并行工作者 并行工作者模型。进来的任务分配给不同的工作者 银行模式 2 2.1.
- C++ 并行与分布式编程 chapter5 任务间并发的同步(1)
- C++ 并行与分布式编程 chapter5 任务间并发的同步(2)
- Java并发编程之2——同步工具类的使用(CountDownLatch,CyclicBarrier,BlockungQueue,Semaphore)
- 使用C#和.NET 4编写的并行应用程序“多核并发编程的规则”
- C# 并行编程 之 限制资源的并发访问 使用SemaphoreSlim
- C# 并行编程 之 轻量级手动重置事件的使用
- C# 并行编程 之 PLINQ 基本使用
- C# 并行编程 之 PLINQ并行度的指定 和 ForAll的使用
- C#5.0之后推荐使用TPL(Task Parallel Libray 任务并行库) 和PLINQ(Parallel LINQ, 并行Linq). 其次是TAP(Task-based Asynchronous Pattern, 基于任务的异步模式)
- C# 并行编程 之 ThreadPool的基本使用
- java并发编程,通过Future取消任务
- C#多任务并行阶段控制—— Threading.Barrier
- java多线程--同步屏障CyclicBarrier的使用
- 并发编程实战 2.1. 使用synchronized实现同步
- geotrellis使用(六)Scala并发(并行)编程
- Java并发编程-同步辅助类之CyclicBarrier
- C#并行编程-并发集合