您的位置:首页 > 编程语言 > C#

学习迭代器实现C#异步编程——仿async/await(一)

2014-03-08 23:08 295 查看
  .NET 4.5的async/await真是个神奇的东西,巧妙异常以致我不禁对其实现充满好奇,但一直难以窥探其门径。不意间读了此篇强文《Asynchronous Programming in C# using Iterators》,犹如醍醐灌顶,茅厕顿开,思路犹如尿崩。美玉不敢独享,故写此篇,将所学中一些思考与诸君共享,期抛砖引玉,擦出一些基情火花……

  强文《Asynchronous Programming in C# using Iterators》出自大牛,大牛眼界高远。故文中所述较为简略,而文中所附代码亦较为晦涩,鄙人驽钝,反复阅读思考数十遍,方品出些味道。故本篇会对原文代码一个最简化的提取,再进行分析。

  强文提到的用迭代器在C#中进行异步编程,最核心的思想就是通过yield return产生一个可IEnumerable<Asyncable>的集合,取出第一个Asyncable,执行Async方法并立即返回,将控制权交给上层调用方,同时Async方法在完成后会回调MoveNext继续遍历之前集合。(本篇提到的最底层的Async方法均是以Begin/End来实现,之前的随笔也说过async/await只是语法糖)

  大概画了个草图意思一下,花了很久时间也没能画得特别清晰明了,请见谅,有好的想法还望赐教。



  接下来我们根据具体的代码来分析,首先看一下Main方法。第一行是一个异步方法,我们期待的结果是第二行的输出在异步方法结束前执行。

static void Main(string[] args)
{
AsyncMethod("http://www.microsoft.com").Execute();

Console.WriteLine("我先执行,不等你了");

Console.ReadLine();
}


  AsyncMethod方法返回了一个IEnumerable<IAsync>的集合,这里需要注意的是AsyncMethod方法的返回值其实是一个类似状态机的类对象,这个对象本身不会执行内部的代码语句,我们需要一个Execute方法来开始遍历运行这个集合里的代码语句。而AsyncMethod方法里的语句又根据yield return的个数来划分成块,每一次MoveNext方法其实是执行一块的代码。也就是说存在多少个yield return,就会有多少次回调。

  

static IEnumerable<IAsync> AsyncMethod(string url)
{
WebRequest req = HttpWebRequest.Create(url);
Console.WriteLine("[{0}] starting", url);

// asynchronously get the response from http server
Async<WebResponse> response = req.GetResponseAsync();
yield return response;

Console.WriteLine("[{0}] got response", url);
Stream resp = response.Result.GetResponseStream();

foreach (var item in resp.ReadToEndAsync())
{
yield return item;
}

Console.WriteLine("done");
}


  GetResponseAsync方法看上去很简单,就是封装了一下Beginxx/Endxxx。为什么说看上去简单,后面会提到。AsyncPrimitive就是我们会接触到最底层的Asyncable对象了,本篇一切异步都是基于它来实现的。

public static Async<WebResponse> GetResponseAsync(this WebRequest req)
{
return new AsyncPrimitive<WebResponse>(req.BeginGetResponse, req.EndGetResponse);
}


  ReadToEndAsync和AsyncMethod方法一样,是建立在可返回Async<T>对象的已有Async方法的基础上。

public static IEnumerable<IAsync> ReadToEndAsync(this Stream stream)
{
MemoryStream ms = new MemoryStream();
int read = -1;
while (read != 0)
{
byte[] buffer = new byte[512];
Async<int> count = stream.ReadAsync(buffer, 0, 512);
yield return count;

Console.WriteLine("[{0}] got data: {1}", "url", count.Result);
ms.Write(buffer, 0, count.Result);
read = count.Result;
}
}


  ReadAsync同样是通过Beginxxx/Endxxx来实现异步。他和GetResponseAsync一样是建立在AsyncPrimitive对象上的Async方法。

public static Async<int> ReadAsync(this Stream stream, byte[] buffer, int offset, int count)
{
return new AsyncPrimitive<int>(
(callback, st) => stream.BeginRead(buffer, offset, count, callback, st),
stream.EndRead);
}


  下面让我们重点来看一下AsyncPrimitive类,你可能已经发现这个类和Task<T>.Factory.FromAsync方法有点相似。可是如果让自己来实现,怕不是想象的那么简单。

public class AsyncPrimitive<T> : Async<T>
{
Action<Action<T>> func;

public AsyncPrimitive(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, T> end)
{
this.func = (cont) => begin(delegate(IAsyncResult res) { cont(end(res)); }, null);
}

public override void ExecuteStep(Action cont)
{
func((res) =>
{
result = res;
completed = true;
cont();
});
}
}


  完全由委托、匿名方法和lambda表达式组成的类。在ExecuteStep被调用前,它不会做任何事情,仅仅是构建了一个Action<Action<T>>的委托。那么分析这个类才是本篇最主要的目的,但难点在于这货不是三言两语就能说清楚的,鄙人在晕乎了很久很久以后,将该类翻译如下,去除了所有的匿名方法和lambda表达式。看明白了这个类,就明白了通过迭代器是如何实现异步的。

public class AsyncPrimitive<T> : Async<T>
{
Action<Action<T>> func;

public AsyncPrimitive(Func<AsyncCallback, object, IAsyncResult> begin, Func<IAsyncResult, T> end)
{
this.Begin = begin;
this.End = end;

this.func = this.ActionActionT;
}

Func<IAsyncResult, T> End { get; set; }
Func<AsyncCallback, object, IAsyncResult> Begin { get; set; }
Action<T> RunInCallback { get; set; }
Action ActionOuter {get;set;}

private void Callback(IAsyncResult ar)
{
this.RunInCallback(this.End(ar));
}

private void ActionActionT(Action<T> cont)
{
this.RunInCallback = cont;
this.Begin(this.Callback, null);
}

private void ActionT(T res)
{
this.result = res;
this.completed = true;
this.ActionOuter();
}

public override void ExecuteStep(Action cont)
{
this.ActionOuter = cont;
this.func(this.ActionT);
}
}


  直观的就可以感觉到lambda帮助我们省略了多少代码,在简洁的同时,也增加了些许理解的难度。代码就是最好的注释,我实在没信心去用文字描述这个类如何工作。

  最后补充Execute方法,这个方法真正的开始执行Async方法,大体思路就是遍历集合,但不是通过while循环,而是通过callback来执行下一个MoveNext。

public static void Execute(this IEnumerable<IAsync> async)
{
AsyncExtensions.Run(async.GetEnumerator());
}

internal static void Run(IEnumerator<IAsync> en)
{
if (!en.MoveNext()) return;
en.Current.ExecuteStep
(() => AsyncExtensions.Run(en));
}


  附上可运行的工程供调试用。原文链接开头已给出,原文中也给出了原文代码的下载。推荐都下载比对着看,可能会更有帮助。

  本篇也是初写的时候信心满满,不知道被各位吐槽后会是怎样一副情景……之后还应该还会有第二篇,也许是明天,也许是明年……

代码下载
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: