您的位置:首页 > 其它

Asynchronous Programming Model in .NET 2.0

2007-05-15 00:11 423 查看
[align=center]Asynchronous Programming Model (APM) in.NET 2.0[/align]
[align=center] [/align]
[align=center]Xiaolin (Colin) Peng[/align]
[align=center] [/align]
[align=center]WitStream Technologies Inc.[/align]

1. Asynchronous programming model (APM)

1.1 Asynchronous pattern

Asynchronous programming is to write computer programs to carry out computer operations asynchronously. The main reason to use asynchronous operations is to better use the CPU cycles, therefore achieve high-performance in your applications.

Asynchronous programming is usually done through process/thread API. Application developers identify tasks that can be executed in parallel, then create a process/thread to each of those tasks. In runtime, depending on the number of actual tasks and the number of CPUs available, multiple processes/threads may be running at the same time to execute the tasks in parallel.

Asynchronous pattern is implemented in .NET Framework Class libraries. This pattern is designed around methods that usually take long time to execute, for example, in doing extensive I/O operations, database transactions, remoting, scientific computations, etc.

Classes designed to use asynchronous pattern for methods that can potentially block have BeginXXX method to start a task and EndXXX method to complete the task, XXX represents the actual name of the blocking task, for example, BeginRead/EndRead methods in System.IO.Stream-derived classes.

We will see some examples using the asynchronous pattern.

1.1 I/O-bound asynchronous operations

All the System.IO.Stream-derived classes implement the asynchronous pattern. They have BeginRead/EndRead, BeginWrite/EndWrite methods.

Generally there are two ways to call the asynchronous Read/Write methods: with callback or without callback.

This is an example of blocking call without using callback (see [1] for more details).

void AsynchronousReadWithNoCallback()
{
//open a file to read
FileStream fs = new FileStream(fileName, FileMode.Open,
FileAccess.Read, FileShare.Read,
1024, FileOptions.Asynchronous);

byte[] buffer = new byte[512];

//do asychronous read
IAsyncResult ar = fs.BeginRead(bugfer, 0,
buffer.Length, null, null);

//you can do someting here
long sum = 0L;
for (int i = 0; i < 100; i++)
{
sum += i * i;
//you can check if the asynchronous read is
//complete here
if( ar.IsCompleted == false)
System.Console.WriteLine("Asynchronous is still “ +
“noy completed yet");
}

//blocking, wait for the read to complete
int bytesRead = fs.EndRead(ar);

//close the file and do other stuff
fs.Close();
}

BeginRead () creates thread pool thread to execute the Read () operation, you can examine the “IsCompleted” flag from IAsyncResult returned from the BeginRead method to see if the read is complete. Calling EndRead() will block the main thread (the thread executing AsynchronousReadWithNoCallback method) until the Read () operation completes.

Here is an example of asynchronous read with callback.

void AsynchronousReadWithCallback()
{
//open a file to read
FileStream fs = new FileStream(fileName, FileMode.Open,
FileAccess.Read, FileShare.Read,
1024, FileOptions.Asynchronous);

byte[] buffer = new byte[512];

//do asychronous read with callback
//we MUST pass fs as argument to callback method
//since we want to complete the read and close the file
//stream, ReadIsComplete will be called when the async
//read is complete
IAsyncResult ar = fs.BeginRead(bugfer, 0, buffer.Length,
this.ReadIsComplete, fs);

//you are free to do anything here
long sum = 0L;
for (int i = 0; i < 100; i++)
{
//just crazy to waste some CPU cycle
sum += i * i;
}

}

void ReadIsComplete(IAsyncResult ar)
{
//this method will be called when async read is complete
//get the argument passed from the BeginRead() method
FileStream fs = (FileStream)ar.AsyncState;

//get the results, we must call it
int bytesRead = fs.EndRead(ar);

//close the file stream
fs.Close();
}

The main difference here is that the calling thread does not wait for the completion of the asynchronous operation (so it can do some other important computing of its own). When the Read () completes, the callback method ReadIsComplete will be called.

In both examples, the FileStream’s EndRead() method is called(in different places). As matter of fact, the EndXXX() method has to be called in Asynchronous Programming Model implementation.

You should not mix the use of asynchronous calls with synchronous calls from the same class. If both methods are called at the same time in runtime, then unpredictable behavior may occur.

1.1 Threading within UI

For a single-threaded UI application, if the main thread makes a blocking call, then the UI will be poor responsive.

The solution is to create worker threads to do the compute-intensive operations; this will free the main UI thread to respond to user interactions. Worker thread typically updates UI its current progress.

ProgressBar component is a typical control used in WinForm, this control is used to show the progress of some task execution in the background.

The following code segment shows how to do asynchronous operation and how to update the main UI.

public delegate void UpdateUIDelegate(int val);

public partial class Form1 : Form
{
Thread worker;
public Form1()
{
InitializeComponent();

//let’s create a thread to do some compute bound operation
WorkerClass workerClass = new WorkerClass();
workerClass.UICallback = this.SetProgressbarValue;
workerClass.Parent = this;
worker = new Thread(
new ThreadStart(workerClass.ComputeBoundOperation));
worker.Start();
}

public void SetProgressbarValue(int val)
{
this.progressBar1.Value = val;
this.label1.Text = val.ToString() + "%";
}

protected override void Dispose(bool disposing)
{
worker.Abort();
if (disposing && (components != null))
{
components.Dispose();
}
base.Dispose(disposing);
}

}

class WorkerClass
{
private UpdateUIDelegate callback;
///<summary>
/// set the UI callback to update the UI
///</summary>
public UpdateUIDelegate UICallback
{
set { this.callback = value; }
}

private Form1 parent;
///<summary>
/// set the master control
///</summary>
public Form1 Parent
{
set { this.parent = value; }
}

///<summary>
/// this method may take lots of time to complete
///</summary>
public void ComputeBoundOperation()
{
for (int i = 0; i < 100; i++)
{
if (parent != null)
parent.BeginInvoke(callback, new object[] { i });
Thread.Sleep(1000);
}
}
}

The UI is very simple, it contains two controls: a ProgressBar and a Label. When the form is created, a worker thread is created and started running.

SetProgressbarValue()method in Form1 class is implemented to update the ProgressBar and the Label controls so user can see how much operation is done in the background. This method can not be directly called from the thread class because SetProgressbarValue ()directly sets the UI components (only the UI thread can set controls). That is why a delegate type UpdateUIDelegate has to be created and passed to the worker thread through the UICallback property in the worker class.

The update of UI from the worker thread occures in the following line of code:

parent.BeginInvoke(callback, new object[] { i });

Here parent points to the main UI object (a WinForm).

Since background worker thread is needed within the contect UI, Microsoft .NET 2.0 Framework Class Library specifically introduced the BackgroundWorkertype to simplify the asynchronous programming with UI [4].

2 APM using .NET delegate

2.1 Delegate in .NET

Delegate is a .Net specific type; it is like a function pointer in C/C++. It is very straightforward to define:

public delegate void UpdateUIDelegate(int val);

It is really like to declare a function prototype in C/C++, except it has a keyword delegate.

What is more interesting is that the .NET compiler generates a lot more type data for a delegate (keep in mind a delegate is a type/class in .NET).

Here is what C# compiler generates for the type UpdateUIDelegate:

Internal sealed class UpdateUIDelegate: MulticastDelegate
{
public UpdateUIDelegate(Object object, IntPtr method);
public void Invoke(Int32 val);
public IAsyncResult BeginInvoke(Int32 val, AsyncCallback callback,
Object object);
public void EndInvoke(IAsyncResult result);
}

The BeginInvoke/EndInvoke methods indicate that the delegate can be invoked asynchronously.

2.2 Asynchronous operation with delegate

Let’s say we have a method that is compute-bound. Calling this method sequentially may block the main thread and slow down the performance of the application; we know the right approach is to call this method asynchronously.

With the BeginInvoke/EndInvoke method in a delegate, this is easy to achieve.

Let’s see a method that does lots of database operations and potentially is very slow:

private JournalEntry JournalEntryBuilder(Statement statement,
StatementId statementId,
StatementGroup statementGroup,
bool isStatementPPA,
IList<StatementLineItem> lineItems)

{
//database-bound operations
}

This is the code to call this method sequentially:

Statement statement; //assign value to it
StatementId statementId; //assign value to it
StatementGroup statementGroup; //assign value to it

foreach (IList<CustomStatementLineItem>
aggregatedStatementLineItems
in processedStatementLineItems)
{
JournalEntry je = this.JournalEntryBuilder(statement,
statementId,
statementGroup,
false,
aggregatedStatementLineItems);
}

Obviously any execution of JournalEntryBuilder method will block the execution of caller thread if it takes excessive amount of time.

To use delegate to call JournalEntryBuilder method asynchronously, you need to define a delegate that has the same prototype as the method:

internal delegate JournalEntry JournalEntryBuilderDelegate(
Statement statement,
StatementId statementId,
StatementGroup statementGroup,
bool isStatementPPA,
IList<CustomStatementLineItem> lineItems);

Then define an IAsyncCallback method:

private void JournalEntryBuildComplete(IAsyncResult ar)
{
JournalEntryBuilderDelegate builderDelegate =
(JournalEntryBuilderDelegate)ar.AsyncState;

//get the result, this EndInvoke has to be called
try
{
JournalEntry je = builderDelegate.EndInvoke(ar);
}
catch (Exception ex)
{
}
}

The callback method can get the argument passed to it in the BeginInvoke () call by accessing the AsyncState property in IAsyncResult argument. Inside the callback method, delegate has to call EndInvoke () method to make sure the thread created for the asynchronous operation is properly de-allocated.

Here is the code segment to call JournalEntryBuilder asynchronously:

foreach (IList<CustomStatementLineItem>
aggregatedStatementLineItems in processedStatementLineItems)
{
#region Create Journal Entry
//use asynchronous/thread pool to perform
//journal entry creation
JournalEntryBuilderDelegate builderDelegate =
new JournalEntryBuilderDelegate(this.JournalEntryBuilder);
IAsyncResult ar = builderDelegate.BeginInvoke(
statement,
statementId,
statementGroup,
isPPA,
aggregatedStatementLineItems,
this.JournalEntryBuildComplete,
builderDelegate);
#endregion Create Journal Entry
}

Calling BeginInvoke () method on a delegate queues the compute-bound operation to the CLR’s thread pool by internally calling ThreadPool’s QueueUserWorkItem() method. So the compute-bound method is called by threads and it is subjected to thread synchronizations for shared resources.

2.3 Waiting for the computation results

If no callback is passed to the BeginInvoke method, and you like to query the status of the asynchronous operation, you can use IAsyncResult.IsCompletedor IAsyncResult.AsyncWaitHandle.WaitOne(0, false).

If the caller thread just wants to wait until the operation complete, you can periodically poll the IsCompleted flag:

while (ar.IsCompleted == false)
{
//do somthing if you like
Thread.Sleep(10);
}

Or call
ar.AsyncWaitHandle.WaitOne();

Of course, a callback is the preferred solution.

One way to wait for the completion of asynchronous operations is as follows.

WaitHandle[] waitHandles =
new WaitHandle[processedStatementLineItems.Count];
int i = 0;
foreach (IList<CustomStatementLineItem>
aggregatedStatementLineItems in processedStatementLineItems)
{
#region Create Journal Entry
//use asynchronous/thread pool to perform
//journal entry creation
JournalEntryBuilderDelegate builderDelegate =
new JournalEntryBuilderDelegate(this.JournalEntryBuilder);
IAsyncResult ar = builderDelegate.BeginInvoke(
statement,
statementId,
statementGroup,
isPPA,
aggregatedStatementLineItems,
this.JournalEntryBuildComplete,
builderDelegate);
waitHandles[i++] = ar.AsyncWaitHandle;
#endregion Create Journal Entry
}

//wait for the journal entry creation complete
WaitHandle.WaitAll(waitHandles);

WaitHandle.WaitAll allows the calling thread to wait for multiple threads to finish the asynchronous operations.

3 Implement APM pattern

3.1 An example

To understand more about the APM pattern, let’s see how to implement it in your own class.

The following example is to provide an asynchronous operation for calling the method AddSqures(), which adds the squares of nature numbers less than or equal to a given number. The purpose here is to show you how to implement the APM pattern.

Before we do any actual implementation, we need to know what methods and types we need to implement. Let’s assume the class name for providing the asynchronous operations is called MyAsyncExample, this must have the following minimum methods to represent an implementation of the APM pattern:

class MyAsyncExample
{
public long AddSquares(int n);
public IAsyncResult BeginAddSquares(int n,
AsyncCallback callback, object state);
public long EndAddSquares(IAsyncResult ar);
}

The first method AddSquares () is for user who does not want to use asynchronous programming. The method BeginAddSquares()/EndAddSquares() is for asynchronous calls.

BeginAddSquares () returns a type of IAsyncResult, which means we will have to provide an implementation of the interface.

class AsyncResult : IAsyncResult
{
}

To implement this interface, we must implement the following four properties:

public object AsyncState
{get;}

public WaitHandle AsyncWaitHandle
{get;}

public bool CompletedSynchronously
{get;}

public bool IsCompleted
{get;}

The AsyncState is for returning the state object caller supplies in the BeginAddSquares method. AsyncWaitHandle is for caller to wait on a particular operation. CompletedSynchronously is for caller to check if the operation is done synchronously or asychronously. IsCompleted is for caller to check if the operation is complete.

Here is the actual example code; explanation is given in section 3.2:

class AsyncResult : IAsyncResult, IDisposable
{
private object state;
private AsyncCallback callback;
private ManualResetEvent waitHandle;
private bool doneSynchronously;
private bool isCompleted;
private int arg;
private long result;

public int Argument
{
get { return this.arg; }
set { this.arg = value; }
}

public long Result
{
get{ return this.result;}
set{ this.result = value;}
}

public void Dispose()
{
if (waitHandle != null)
{
waitHandle.Close();
waitHandle = null;
state = null;
callback = null;
}
}

public AsyncCallback Callback
{
get { return this.callback; }
set { this.callback = value; }
}

#region IAsyncResult properties
public object AsyncState
{
get { return this.state; }
set { this.state = value; }
}

public WaitHandle AsyncWaitHandle
{
get { return this.waitHandle; }
set { this.waitHandle = (ManualResetEvent)value; }
}

public bool CompletedSynchronously
{
get { return this.doneSynchronously; }
set { this.doneSynchronously = value; }
}

public bool IsCompleted
{
get
{
if (isCompleted == true)
return isCompleted;
else
return waitHandle.WaitOne(0, false);
}
set { this.isCompleted = value; }
}
#endregion
}

class MyAsyncExample
{
///<summary>
/// this method compute the sum of the square of all
/// nature numbers less than or equal n
///</summary>
///<param name="n"></param>
///<returns></returns>
public long AddSquares(int n)
{
long sum = 0L;
for (int i = 1; i <= n; i++)
sum += i * i;

return sum;
}

///<summary>
/// this method is called by thread pool thread
///</summary>
///<param name="state"></param>
private void DoWork(object state)
{
AsyncResult ar = state as AsyncResult;
if (ar == null)
return;

ar.Result = this.AddSquares(ar.Argument);

//signal compelte
//signal thread complete
ar.IsCompleted = true;
((ManualResetEvent)ar.AsyncWaitHandle).Set();
//call callback
if (ar.Callback != null)
ar.Callback(ar);
}

///<summary>
/// asynchronous version of SumSquares
///</summary>
///<param name="n"></param>
///<param name="callback"></param>
///<param name="state"></param>
///<returns></returns>
public IAsyncResult BeginAddSquares(int n,
AsyncCallback callback, object state)
{
//create a thread to execute SumSquares
//create an instance of our own implementation
//of AsyncResult class
//set state to it
//return it

AsyncResult ar = new AsyncResult();
ar.Argument = n;
ar.AsyncState = state;
ar.Callback = callback;
if (n < 10)
{
long result = this.AddSquares(n);

ar.Result = result;
ar.AsyncWaitHandle = null;
ar.CompletedSynchronously = true;
ar.IsCompleted = true;

if (callback != null)
callback(ar);

}
else
{
//create a thread to do it
ar.AsyncWaitHandle = new ManualResetEvent(false);
ThreadPool.QueueUserWorkItem(
new WaitCallback(this.DoWork), ar);
}
return ar;
}

///<summary>
/// get the result of operation
///</summary>
///<param name="ar"></param>
///<returns></returns>
public long EndAddSquares(IAsyncResult ar)
{
//this method can be called from the caller thread
//or from the calculation thread

//make right argument is passed to me
if (ar == null)
throw new ArgumentException("ar");

AsyncResult myAr = (AsyncResult)ar;
if (myAr == null)
throw new ArgumentException("ar");

//when complete
if (myAr.IsCompleted == true)
return myAr.Result;
else
{
//wait if the thread created earlier is not complete
myAr.AsyncWaitHandle.WaitOne();
//when finish
long result = myAr.Result;
myAr.Dispose();
return result;
}
}
}

3.2 General Steps to implement your own APM pattern

Let’s start with the code segment using asynchronous operations in MyAsyncExample.

void CallAsynExample()
{
WaitHandle[] handles = new WaitHandle[10];
for (int i = 1; i <= 10; i++)
{
MyAsyncExample example = new MyAsyncExample(i);
IAsyncResult ar = example.BeginAddSquares(i*10,
this.AsynchOperationComplete, example);
handles[i-1] = ar.AsyncWaitHandle;
}

//wait all operations to complete
WaitHandle.WaitAll(handles);
System.Console.WriteLine("All operations are complete");
}

void AsynchOperationComplete(IAsyncResult ar)
{
//got to have this exmple because I need to call the
//EndXXX method
MyAsyncExample example = (MyAsyncExample)ar.AsyncState;
long result = example.EndAddSquares(ar);
System.Console.WriteLine("Result ({0}) = {1}",
example.Index, result);
}

The above example shows the calling of MyAsyncExample.BeginAddSquares () with a callback. Since the callback has to take an argument of IAsyncResult, in this case, AsyncResult, and in the callback, we have to call MyAsyncExample.EndAddSquares() method, so we have to pass the instance of MyAsyncExampleas the third argument to the BeginAddSquares () method so the instance of MyAsyncExample can be carried over to the callback method.

The computation result is returned from EndAddSquares () call.

Now let’s start with implementation of IAsyncResult:

class AsyncResult : IAsyncResult, IDisposable
{
private object state;
private AsyncCallback callback;
private ManualResetEvent waitHandle;
private bool doneSynchronously;
private bool isCompleted;
private int arg;
private long result;
}

The state field is for implementing AsyncState property and holding any user supplied argument type (the third argument in the BeginAddSquares() call), so its type is of object(user’s callback will cast objecttype back to its original type).

The callback is for holding the callback delegate specified as the second argument in the BeginAddSquares() method. Thread pool callback DoWork() will call this callback AsynchOperationComplete when it finishes the asynchronous operations.

The waitHandle is for implementing the AsyncWaitHandle property, its type is WaitHandle. We use the EventWaitHandleManualResetEvent in this example. The manual reset event object will signal the completion of the asynchronous operation when its Set() method is called.

The doneSynchronouslyis for implementing the CompletedSynchronously property and is used to indicate if the operation is executed synchronously or asynchronously. In our example, if the argument to the method AddSquares() is less than 10, then it is executed synchronously. If it is greater than or equal to 10, then it is executed asynchronously.

The isCompleted is for implementing the IsCompleted property and is used to indicate if the operation is complete.

The arg field is defined here because DoWork () needs to know the argument user sets in the BeginAddSquares () method.

The result field is defined here because the actual calculation result needs to be returned when user calls the EndAddSquares (). Keep in mind, the actual calculation is not done in EndAddSquares () but DoWork (), and AsyncResult is the only argument passed to EndAddSquares (), so AsyncResult has to remember the calculation result.

Let’s take a close look at how the event object and thread pool thread are created and used.

In BeginAddSquares () method, an AsyncResult object is created, all the arguments from the method call are set to the AsyncResult object. Then a synchronous call to AddSquares () is made if the integer argument is less than 10, and AsyncResult’s CompletedSynchronously is set to true. If the integer argument is greater than or equal to 10, an asynchronous call to AddSquares () is made. Pay close attention to how the asynchronous operation is carried out. First, a manual reset event object is created and set to the AsyncResult object’s AsyncWaitHandle property, secondly a thread pool thread is created to execute the DoWork() method. DoWork is added here just simply because we need to have a method that has the WaitCallback prototype so we can pass it to the ThreadPool.QueueUserWorkItem() method.

In DoWork() method, AddSquares() is called first. The computation result is set to the AsyncResult object’s result prroperty, and the manual reset object is signaled. The reason we need to signal the manual object is that, the caller, after calling BeginAddSquares () method, may make a blocking call of EndAddSquares (), and is waiting for the completion of the thread pool thread.

The EndAddSquares() method implementation is straightforward, if the asynchronous operation is already complete, it returns result from the AsyncResult object, and otherwise, it waits for the completion of the asynchronous call by calling:

myAr.AsyncWaitHandle.WaitOne();

Reference
[1] Jeffrey Richter, CLR via C#, Microsoft Press, 2006

[2] Anthony Jones, Jim Ohlund, Lance Olson, Network Programming for the Microsoft .NET Framework, Micosoft Press, 2004

[3] Colin Peng, Introduction to Asynchronous Programming, http://www.witstream.com/research/IntroductionAsynchronousprogramming.htm, WitStream Technologies Inc. 2007
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: