您的位置:首页 > 编程语言

TPL 和传统 .NET 异步编程一

2010-05-28 17:47 363 查看
TPL即任务并行库,是.NET Framework 4版本中的新鲜物,是System.Threading 和 System.Threading.Tasks 命名空间中的一组公共类型和 API。TPL 的目的在于简化向应用程序中添加并行性和并发性的过程,从而提高开发人员的工作效率。 TPL 会动态地按比例调节并发程度,以便最有效地使用所有可用的处理器。此外,TPL 还处理工作分区、ThreadPool 上的线程调度、取消支持、状态管理以及其他低级别的细节操作。通过使用 TPL,您可以在将精力集中于程序要完成的工作,同时最大程度地提高代码的性能。



TPL涉及的内容很多,在这里主要先介绍一下TPL与传统的.NET异步编程之间的关系。在以前的.NET Framework版本中,它 提供以下两种执行 I/O 绑定和计算绑定异步操作的标准模式:

异步编程模型 (APM),在该模型中异步操作由一对 Begin/End 方法(如 FileStream.BeginRead 和 Stream.EndRead)表示。

基于事件的异步模式 (EAP),在该模式中异步操作由名为“操作名称Async”和“操作名称Completed”的方法/事件对(例如 WebClient.DownloadStringAsync 和 WebClient.DownloadStringCompleted)表示。(EAP 是在 .NET Framework 2.0 版中引入的)。



而任务并行库 (TPL) 可通过各种方式与任一异步模式结合使用。可以将 APM 和 EAP 操作以 Task 的形式向库使用者公开,或者可以公开 APM 模式,但使用 Task 对象在内部实现它们。在这两种方案中,通过使用 Task 对象,可以简化代码并利用以下有用的功能:

在任务启动后,可以随时以任务延续的形式注册回调。

通过使用 ContinueWhenAll 和 ContinueWhenAny 方法或者 WaitAll 方法或 WaitAny 方法,协调多个为了响应 Begin_ 方法而执行的操作。

在同一 Task 对象中封装异步 I/O 绑定和计算绑定操作。

监视 Task 对象的状态。

使用 TaskCompletionSource<TResult> 将操作的状态封送到 Task 对象。



下面介绍一下TPL是如何与传统的.NET 异步相互结合在一起使用的。



[b]在任务中包装 APM 操作[/b]



System.Threading.Tasks.TaskFactory 和 System.Threading.Tasks.TaskFactory<TResult> 类都提供了 FromAsync 方法的一些重载,这些重载使您可以在一个 Task 实例或 Task<TResult> 实例中封装一个 APM Begin/End 方法对。各种重载可容纳任何具有零到三个输入参数的 Begin/End 方法对。
对于具有返回值的 End 方法(在 Visual Basic 中为 Function)的对,在创建 Task<TResult> 的 TaskFactory<TResult> 中使用这些方法。对于返回 void 的 End 方法(在 Visual Basic 中为 Sub),在创建 Task 的 TaskFactory 中使用这些方法。
对于 Begin 方法有三个以上的参数或包含 ref 或 out 参数的少数情况,提供了仅封装 End 方法的附加 FromAsync 重载。
下面的代码示例演示与 FileStream.BeginRead 和 FileStream.EndRead 方法匹配的 FromAsync 重载的签名。此重载采用三个输入参数,如下所示。

public Task<TResult> FromAsync<TArg1, TArg2, TArg3>(
    Func<TArg1, TArg2, TArg3, AsyncCallback, object, IAsyncResult> beginMethod, //BeginRead
     Func<IAsyncResult, TResult> endMethod, //EndRead
     TArg1 arg1, // the byte[] buffer
     TArg2 arg2, // the offset in arg1 at which to start writing data
     TArg3 arg3, // the maximum number of bytes to read
     object state // optional state information
    )




第一个参数是与 FileStream.BeginRead 方法的签名匹配的 Func<T1, T2, T3, T4, T5, TResult> 委托。第二个参数是采用 IAsyncResult 并返回 TResult 的 Func<T, TResult> 委托。由于 EndRead 返回一个整数,因此编译器将 TResult 的类型推断为 Int32,将任务的类型推断为 Task<Int32>。最后四个参数等同于 FileStream.BeginRead 方法中的那些参数:

在其中存储文件数据的缓冲区。

缓冲区中开始写入数据的偏移量。

要从文件中读取的最大数据量。

用于存储要传递给回调的用户定义状态数据的可选对象。



将 ContinueWith 用于回调功能

如果需要访问文件中的数据,而不只是访问字节数,则使用 FromAsync 方法是不够的。而应当使用 Task<String>,其 Result 属性包含文件数据。可以通过向原始任务添加延续任务来执行此操作。延续任务执行 AsyncCallback 委托通常执行的工作。当前面的任务完成并且数据缓冲区已填充时,会调用它。(FileStream 对象应在返回之前关闭)。



下面的示例演示如何返回 Task<String>,它封装了 FileStream 类的 BeginRead/EndRead 对。

const int MAX_FILE_SIZE = 14000000;
public static Task<string> GetFileStringAsync(string path)
{
    FileInfo fi = new FileInfo(path);
    byte[] data = null;
    data = new byte[fi.Length];
    FileStream fs = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, data.Length, true);
    //Task<int> returns the number of bytes read
    Task<int> task = Task<int>.Factory.FromAsync(
            fs.BeginRead, fs.EndRead, data, 0, data.Length, null);
    // It is possible to do other work here while waiting
    // for the antecedent task to complete.
    // ...
    // Add the continuation, which returns a Task<string>. 
    return task.ContinueWith((antecedent) =>
    {
        fs.Close();
        // Result = "number of bytes read" (if we need it.)
        if (antecedent.Result < 100)
        {
            return "Data is too small to bother with.";
        }
        else
        {
            // If we did not receive the entire file, the end of the
            // data buffer will contain garbage.
            if (antecedent.Result < data.Length)
                Array.Resize(ref data, antecedent.Result);
            // Will be returned in the Result property of the Task<string>
            // at some future point after the asynchronous file I/O operation completes.
            return new UTF8Encoding().GetString(data);
        }
    });
}


然后可以按如下所示调用该方法。

Task<string> t = GetFileStringAsync(path);          
// Do some other work:
// ...
try
{
     Console.WriteLine(t.Result.Substring(0, 500));
}
catch (AggregateException ae)
{
    Console.WriteLine(ae.InnerException.Message);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: