您的位置:首页 > 编程语言 > C#

24 C# 第十九章(二) 基于 .Net 4 TPL 的同步和线程处理模式

2013-09-23 08:41 411 查看

异步编程模式 (APM: Asynchronous Programming Model)

多线程编程的主要问题:

1) 监视异步操作的状态,知道何时完成。(最好不用使用轮询的方式)

2) 线程池。避免启动和终止线程的开销。

3) 避免死锁。

4) 为不同的操作提供原子性访问。

异步编程模式就是为解决上述问题而产生的。给定一个要长时间运行的同步方法X(),APM使用一个BeginX()方法,以异步方式启动与X()等价的工作;并用一个EndX()方法结束。这里基本把所有的线程控制和调度相关的工作都交给了系统来完成。

异步编程模式的产生是由于开发人员写的多线程的代码越来越多,从而总结出的一系列的对问题的解决方法。

APM一个简单的实例:

using System;
using System.IO;
using System.Net;
using System.Linq;

namespace TPL_Sync_APM_WebRequest
{
class Program
{
static void Main(string[] args)
{
string url = "http://www.sina.com.cn";

Console.WriteLine("URL : {0}", url);

WebRequest webreq = WebRequest.Create(url);

IAsyncResult asyncResult = webreq.BeginGetResponse(null, null);
while (!asyncResult.AsyncWaitHandle.WaitOne(100))
{
Console.Write('.');
}

WebResponse webrsp = webreq.EndGetResponse(asyncResult);

using (StreamReader reader = new StreamReader(webrsp.GetResponseStream()))
{
Console.WriteLine();
int length = reader.ReadToEnd().Length;
Console.WriteLine("length = {0} ", length);
}

Console.ReadKey();
}
}
}


EndX() 方法的作用:

1) 会阻塞线程继续执行,直到请求工作完成。

2) 如果方法需要返回数据,这个数据可以从EndX方法中回调。

3) 如果执行请求的工作发生异常,可在调用EndX时重新引发异常,确保异常会被调用的代码发现。

4) 如果任何资源需要在调用X后被清理,EndX将负责清理。

APM签名

BeginX 和 EndX APM方法合并起来应与签名的同步版本匹配。

EndX 的返回参数应与 X方法的返回参数一致。

例如 一个方法 bool TryDosomething(string url, ref string data, out string[] links)

System.IAsyncResult BeginTryDosomething(string url, ref string data, out string[] links, System.AsyncCallback callback, object state)

bool TryDosomething(string url, ref string data, out string[] links)

bool EndTryDosomething(string url, ref string data, out string[] links, System.IAyncResult result)

BeginX 参数,除了与 X() 函数一致外它还包含了2个其他参数,

1) 一个回调参数,是方法结束时要调用的一个 System.AsyncCallback 委托。

2) 一个Object 型的状态参数。

使用 TPL 调用 APM

这里把TPL和APM结合起来进行使用。使一个长时间的任务运行在一独立的线程中。

程序实例:

using System;
using System.IO;
using System.Net;
using System.Linq;
using System.Threading.Tasks;
using System.Collections.Generic;

namespace TPL_Sync_APM_WebRequestTP_TPLworkwithAPM
{
class Program
{
public static readonly object ConsoleSyncObject = new object();
static void Main(string[] args)
{
string[] urls = new string[] {"http://www.sina.com.cn",
"http://www.sohu.com.cn",
"http://www.baidu.com",
"http://www.163.com",
"http://www.qq.com/"
};

//Console.WriteLine("URL : {0}", urls);
int line = 0;

Task<WebResponse>[] taskWithState = urls.Select(
url=>DisplayPageSizeAsync(url, line++)).ToArray();

while (!Task.WaitAll(taskWithState.ToArray(), 50))
{
Console.Write('.');
}

Console.SetCursorPosition(0, line);
Console.WriteLine("Press any key to continue ...");
Console.ReadKey();
}

private static Task<WebResponse> DisplayPageSizeAsync(string url, int line)
{
lock (ConsoleSyncObject)
{
Console.WriteLine(url);
}
//Console.WriteLine("DisplayPageSizeAsync  running  ...");
WebRequest webreq = WebRequest.Create(url);
WebRequestState stat = new WebRequestState(webreq, line);

Task<WebResponse> task = Task<WebResponse>.Factory.FromAsync(webreq.BeginGetResponse,
GetResponseAsyncCompleted,
stat);
return task;
}

private static WebResponse GetResponseAsyncCompleted(IAsyncResult asyncResult)
{
//Console.WriteLine("GetResponseAsyncCompleted  running  ...");

WebRequestState stat = (WebRequestState)asyncResult.AsyncState;

HttpWebResponse response = (HttpWebResponse)stat.WebRequest.EndGetResponse(asyncResult);

Stream stream = response.GetResponseStream();

using (StreamReader reader = new StreamReader(stream))
{
int len = reader.ReadToEnd().Length;
DisplayPageSize(stat, len);
}

return response;
}

private static void DisplayProgress(IEnumerable<Task<WebResponse>> taskWithState)
{
//Console.WriteLine("DisplayProgress  running  ...");

foreach (WebRequestState state in taskWithState
.Where(task => !task.IsCompleted)
.Select(task => (WebRequestState)task.AsyncState))
{
DisplayProgress(state);
}
}

private static void DisplayPageSize(WebRequestState completedState, int length)
{
lock (ConsoleSyncObject)
{
//Console.WriteLine("DisplayPageSize  running  ...");

Console.SetCursorPosition(completedState.ConsoleColumn, completedState.ConsoleLine);
Console.Write(FormatBytes(length));
completedState.ConsoleColumn += length.ToString().Length;
}
}

private static void DisplayProgress(WebRequestState state)
{
int left = state.ConsoleColumn;
int top = state.ConsoleLine;

lock (ConsoleSyncObject)
{
//Console.WriteLine("DisplayProgress  running  ...");

if (left >= Console.BufferWidth - int.MaxValue.ToString().Length)
{
left = state.Url.Length;
Console.SetCursorPosition(left, top);
Console.Write("".PadRight(Console.BufferWidth - state.Url.Length));
state.ConsoleColumn = left;
}
else
{
state.ConsoleColumn++;
}

Console.SetCursorPosition(left, top);
Console.Write(".");
}
}

static public string FormatBytes(long bytes)
{
string[] magnitudes = new string[] { "GM", "MB", "KB", "Bytes" };

//Console.WriteLine("FormatBytes  running  ...");

long max = (long)Math.Pow(1024, magnitudes.Length);

return string.Format("{1:##.##} {0}",
magnitudes.FirstOrDefault(magnitude => bytes > (max /= 1024)) ?? "0 Bytes",
(decimal)bytes / (decimal)max).Trim();
}
}

class WebRequestState
{
public WebRequestState(WebRequest webRequest, int line)
{
WebRequest = webRequest;
ConsoleLine = line;
ConsoleColumn = Url.Length + 1;
}

public WebRequestState(WebRequest webRequest)
{
WebRequest = webRequest;
}

public WebRequest WebRequest { get; private set; }

public string Url
{
get
{
return WebRequest.RequestUri.ToString();
}
}

public int ConsoleLine { get; set; }
public int ConsoleColumn { get; set; }
}
}


这里关于异步编程模型,书中还介绍了其他几种方式,异步委托调用,基于事件的异步模式,Background Worker模式等等。

是把委托,事件等功能与异步编程模式结合起来使用,达到程序目的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: