C# 并行编程 之 命令式任务并行 (.Net Framework 4.0)
2015-04-22 08:58
459 查看
此文为个人学习《C#并行编程高级教程》的笔记,总结并调试了一些文章中的代码示例。 在以后开发过程中可以加以运用。
Task 的使用非常简单,定义工作函数,创建Task,Task开始运行,Wait/WaitAll 等待Task结束。
Sample 1 并行任务的创建
要实现线程的取消需要一下步骤。
1) 首先要在线程中加入取消检查点(ThrowIfCancellationRequested),这里是在工作函数的起始处和while循环的运行中。
2) 在Main函数中定义CancellationTokenSource对象并把它作为参数传递给工作Task。
3) 在运行时调用CancellationTokenSource.Cancel(),触发线程的取消。
4) 在Main函数中捕获取消异常(OperationCanceledException),线程终止,取消成功。
Sample 2 线程的取消
使用返回值的步骤:
1) 第一步当然要给工作函数定义返回值,并且在实现的过程中返回有效值。
2) 定义Task时,要使用var而不能使用Task了,而且还得使用Task.Factory。 var t1 = Task.Factory.StartNew(() => StartT1()); 。后面取得结果时需要用到t1.Result,如果使用把t1定义为Task它是没有Result属性的。
3) 运行Task。
4) 取得结果,主要使用的是 t1.Result 属性。
5) 关于 TaskCreationOptions.LongRunning,我们可以指定操作系统运行任务时的一些方式,当然这些方式还是由系统控制的,未必真的能够满足我们定义任务的需要。
https://msdn.microsoft.com/zh-tw/library/system.threading.tasks.taskcreationoptions(v=vs.110).aspx
Sample 3 线程返回值
用ContinueWith和Wait函数组合使用便可以控制任务的运行顺序。
Sample 4 控制线程的运行顺序
代码示例工程下载
最基本的使用,并行任务的创建
在 .Net Framework 4 中出现了Task 的概念。在以往的多线程程序中虽然使用的是thread,但大多数的时候我们还是会把业务处理划分为Task。这样更接近于人类的思考方式,毕竟Thread不能说明它和业务的关联。这里C#直接提供了task,也算是对开发者简便了一些。Task 的使用非常简单,定义工作函数,创建Task,Task开始运行,Wait/WaitAll 等待Task结束。
Sample 1 并行任务的创建
using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace CSParallel_Program { class ThreadWork1 { public ThreadWork1() { } public void run() { System.Console.WriteLine("ThreadWork1 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork1 : " + i); } System.Console.WriteLine("ThreadWork1 run } "); } } class ThreadWork2 { public ThreadWork2() { } public void run() { System.Console.WriteLine("ThreadWork2 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork2 : " + i*i); } System.Console.WriteLine("ThreadWork2 run } "); } } class Program { static void StartT1() { ThreadWork1 work1 = new ThreadWork1(); work1.run(); } static void StartT2() { ThreadWork2 work2 = new ThreadWork2(); work2.run(); } static void Main(string[] args) { Task t1 = new Task(() => StartT1()); Task t2 = new Task(() => StartT2()); Console.WriteLine("Sample 3-1 Main {"); Console.WriteLine("Main t1 t2 started {"); t1.Start(); t2.Start(); Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1,t2); Console.WriteLine("Main wait t1 t2 end }"); Console.WriteLine("Sample 3-1 Main }"); Console.ReadKey(); } } }
线程的取消
Task 提供了线程取消的操作,使用起来也是非常的简单。它本质上是靠异常来打断线程的执行,并且把Task的状态置为Cancel状态(但这一点在测试程序中,Task的状态并没有置为Cancel,而是Faild,也许换个PC能解决这个问题也不一定)。要实现线程的取消需要一下步骤。
1) 首先要在线程中加入取消检查点(ThrowIfCancellationRequested),这里是在工作函数的起始处和while循环的运行中。
2) 在Main函数中定义CancellationTokenSource对象并把它作为参数传递给工作Task。
3) 在运行时调用CancellationTokenSource.Cancel(),触发线程的取消。
4) 在Main函数中捕获取消异常(OperationCanceledException),线程终止,取消成功。
Sample 2 线程的取消
using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace CSParallel_Program { class ThreadWork1 { public ThreadWork1() { } public void run_with_cancel(System.Threading.CancellationToken ct) { //try //{ System.Console.WriteLine("ThreadWork1 run { "); ct.ThrowIfCancellationRequested(); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork1 : " + i); System.Threading.Thread.Sleep(200); ct.ThrowIfCancellationRequested(); } System.Console.WriteLine("ThreadWork1 run } "); //} //catch (OperationCanceledException ex) //{ // System.Console.WriteLine("ThreadWork1 Get Exception : " + ex.ToString()); //} } } class ThreadWork2 { public ThreadWork2() { } public void run_with_cancel(System.Threading.CancellationToken ct) { //try //{ ct.ThrowIfCancellationRequested(); System.Console.WriteLine("ThreadWork2 run { "); for (int i = 0; i < 100; i++) { System.Console.WriteLine("ThreadWork2 : " + i * i); System.Threading.Thread.Sleep(300); ct.ThrowIfCancellationRequested(); } System.Console.WriteLine("ThreadWork2 run } "); //} //catch (OperationCanceledException ex) //{ // System.Console.WriteLine("ThreadWork2 Get Exception : " + ex.ToString()); //} } } class Program { static void StartT1(System.Threading.CancellationToken ct) { ThreadWork1 work1 = new ThreadWork1(); work1.run_with_cancel(ct); } static void StartT2(System.Threading.CancellationToken ct) { ThreadWork2 work2 = new ThreadWork2(); work2.run_with_cancel(ct); } static void Main(string[] args) { System.Threading.CancellationTokenSource cts = new System.Threading.CancellationTokenSource(); System.Threading.CancellationToken ct = cts.Token; Task t1 = new Task(() => StartT1(ct)); Task t2 = new Task(() => StartT2(ct)); Console.WriteLine("Sample 3-1 Main {"); Console.WriteLine("Main t1 t2 started {"); t1.Start(); t2.Start(); Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main sleep 2 seconds and CANCEL {"); System.Threading.Thread.Sleep(2000); cts.Cancel(); Console.WriteLine("Main sleep 2 seconds and CANCEL }"); try { Console.WriteLine("Main wait t1 t2 end {"); if (!Task.WaitAll(new Task[] { t1, t2 }, 5000)) { Console.WriteLine("Worker1 and Worker2 NOT complete within 5 seconds"); Console.WriteLine("Worker1 Status: " + t1.Status); Console.WriteLine("Worker2 Status: " + t2.Status); } else { Console.WriteLine("Worker1 and Worker2 complete within 5 seconds"); } Console.WriteLine("Main wait t1 t2 end }"); } catch (AggregateException agg_ex) { foreach (Exception ex in agg_ex.InnerExceptions) { Console.WriteLine("Agg Exceptions: " + ex.ToString()); Console.WriteLine(""); } } if (t1.IsCanceled) { Console.WriteLine("Worker 1 is CANCELED"); } if (t2.IsCanceled) { Console.WriteLine("Worker 2 is CANCELED"); } Console.WriteLine("Sample 3-1 Main }"); Console.ReadKey(); } } }
线程的返回值
Task可以提供返回值服务,这个与Thread还是有些区别的,在Thread中,无论是C#还是C++会通过共享的队列等方式返回测试结果。使用返回值的步骤:
1) 第一步当然要给工作函数定义返回值,并且在实现的过程中返回有效值。
2) 定义Task时,要使用var而不能使用Task了,而且还得使用Task.Factory。 var t1 = Task.Factory.StartNew(() => StartT1()); 。后面取得结果时需要用到t1.Result,如果使用把t1定义为Task它是没有Result属性的。
3) 运行Task。
4) 取得结果,主要使用的是 t1.Result 属性。
5) 关于 TaskCreationOptions.LongRunning,我们可以指定操作系统运行任务时的一些方式,当然这些方式还是由系统控制的,未必真的能够满足我们定义任务的需要。
https://msdn.microsoft.com/zh-tw/library/system.threading.tasks.taskcreationoptions(v=vs.110).aspx
Sample 3 线程返回值
using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace CSParallel_Program { class ThreadWork1 { public ThreadWork1() { } public List<string> run() { List<string> RetList = new List<string>(); System.Console.WriteLine("ThreadWork1 run { "); System.Console.WriteLine("ThreadWork1 running ... ... "); for (int i = 0; i < 100; i++) { RetList.Add("ThreadWork1 : " + i); //System.Console.WriteLine("ThreadWork1 : " + i); } System.Console.WriteLine("ThreadWork1 run } "); return RetList; } } class ThreadWork2 { public ThreadWork2() { } public List<string> run() { List<string> RetList = new List<string>(); System.Console.WriteLine("ThreadWork2 run { "); System.Console.WriteLine("ThreadWork2 running ... ... "); for (int i = 0; i < 100; i++) { RetList.Add("ThreadWork2 : " + i); //System.Console.WriteLine("ThreadWork2 : " + i * i); } System.Console.WriteLine("ThreadWork2 run } "); return RetList; } } class Program { static List<string> StartT1() { ThreadWork1 work1 = new ThreadWork1(); return work1.run(); } static List<string> StartT2() { ThreadWork2 work2 = new ThreadWork2(); return work2.run(); } static void Main(string[] args) { // For return value we can't use this one : new Task(() => StartT2()); // The problem is the compiler will not know there is a return value. // if we use t2.Result after that, there will be a compiler error. var t1 = Task.Factory.StartNew(() => StartT1()); var t2 = Task.Factory.StartNew(() => StartT2()); Console.WriteLine("Sample 3-3 Main {"); // If we use Task.Factory.StartNew, it's no need to use t1.start() //Console.WriteLine("Main t1 t2 started {"); //t1.Start(); //t2.Start(); //Console.WriteLine("Main t1 t2 started }"); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1, t2); Console.WriteLine("Main wait t1 t2 end }"); var t3 = Task.Factory.StartNew(() => { Console.WriteLine("============= T1 Result ============="); for (int i = 0; i < t1.Result.Count; i++) { Console.WriteLine(t1.Result[i]); } Console.WriteLine("============= ========= =============\n\n"); Console.WriteLine("============= T2 Result ============="); for (int i = 0; i < t2.Result.Count; i++) { Console.WriteLine(t2.Result[i]); } Console.WriteLine("============= ========= =============\n\n"); }, TaskCreationOptions.LongRunning ); Console.WriteLine("Sample 3-3 Main }"); Console.ReadKey(); } } }
控制任务的执行顺序
Task还提供了简单的控制任务执行顺序的方式 ContinueWith()。用ContinueWith和Wait函数组合使用便可以控制任务的运行顺序。
Sample 4 控制线程的运行顺序
using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace CSParallel_Program { class ThreadWork1 { public ThreadWork1() { } public List<string> run() { List<string> RetList = new List<string>(); System.Console.WriteLine("ThreadWork1 run { "); System.Console.WriteLine("ThreadWork1 running ... ... "); for (int i = 0; i < 100; i++) { RetList.Add("ThreadWork1 : " + i); //System.Console.WriteLine("ThreadWork1 : " + i); } System.Console.WriteLine("ThreadWork1 run } "); return RetList; } } class ThreadWork2 { public ThreadWork2() { } public List<string> run() { List<string> RetList = new List<string>(); System.Console.WriteLine("ThreadWork2 run { "); System.Console.WriteLine("ThreadWork2 running ... ... "); for (int i = 0; i < 100; i++) { RetList.Add("ThreadWork2 : " + i); //System.Console.WriteLine("ThreadWork2 : " + i * i); } System.Console.WriteLine("ThreadWork2 run } "); return RetList; } } class Program { static void StartT0() { System.Console.WriteLine("Hello I am T0 Task, sleep 3 seconds. when I am ready others GO!"); for (int i = 0; i < 3; i++) { Console.WriteLine("StartT0 sleeping ... ... " + i); System.Threading.Thread.Sleep(1000); } } static List<string> StartT1() { ThreadWork1 work1 = new ThreadWork1(); return work1.run(); } static List<string> StartT2() { ThreadWork2 work2 = new ThreadWork2(); return work2.run(); } static void Main(string[] args) { Console.WriteLine("Sample 3-4 Main {"); // The sequence of the task is: // T0 (Wait 3s) --> | // | --> T1 (Cacluate) | // | --> T2 (Cacluate) | // | --> T3 (Print) var t0 = Task.Factory.StartNew(() => StartT0()); var t1 = t0.ContinueWith((t) => StartT1()); var t2 = t0.ContinueWith((t) => StartT2()); Console.WriteLine("Main wait t1 t2 end {"); Task.WaitAll(t1, t2); Console.WriteLine("Main wait t1 t2 end }"); var t3 = Task.Factory.StartNew(() => { Console.WriteLine("============= T1 Result ============="); for (int i = 0; i < t1.Result.Count; i++) { Console.WriteLine(t1.Result[i]); } Console.WriteLine("============= ========= =============\n\n"); Console.WriteLine("============= T2 Result ============="); for (int i = 0; i < t2.Result.Count; i++) { Console.WriteLine(t2.Result[i]); } Console.WriteLine("============= ========= =============\n\n"); }, TaskCreationOptions.LongRunning); Console.WriteLine("Sample 3-4 Main }"); Console.ReadKey(); } } }
代码示例工程下载
相关文章推荐
- C# 并行编程 之 命令式任务并行 (.Net Framework 4.0)
- C# 并行编程 之 并发集合 (.Net Framework 4.0)(转)
- C# 并行编程 之 并发集合 (.Net Framework 4.0)
- C# 并行编程 之 并发集合 (.Net Framework 4.0)
- C# 4.0的并行任务
- C# 并行编程 之 Barrier的使用 - 通过屏障同步并发任务
- C#命令式任务并行开发
- C#4.0入门 第二章 任务并行库—第一页 只使用双核(转)
- C#4.0入门 第二章 任务并行库—第二页 活用了内核吗?(转)
- C#4.0入门 第二章 任务并行库—第三页 性能的差异(转)
- C# 并行编程 之 Barrier的使用 - 通过屏障同步并发任务
- C# 并行编程 之 Barrier的使用 - 通过屏障同步并发任务
- C# 4.0 的新特性之并行运算 Parallel
- [转]异步、多线程、任务、并行编程之一:选择合适的多线程模型
- C# 并行编程 之 PLINQ 规约操作和聚合函数
- C#学习笔记二:并行编程基础:在 PLINQ 和 TPL 中的 Lambda 表达式
- 【C#】52. 使用Flatten方法处理并行任务抛出的异常
- 5天玩转C#并行和多线程编程 —— 第五天 多线程编程大总结
- C# 4.0新特性-"协变"与"逆变"以及背后的编程思想