您的位置:首页 > Web前端

IOCP Thread Pool 在 C# 的Safe实现

2010-08-24 17:05 579 查看
IOCP Thread Pool 在 C# 的Safe实现



IOCP是一种高性能的I/O模型,更多资料可以google下。
在.Net Framework下,没有提供IOCP的类库,我们需要引入Win32 API来建立IOCP Thread Pool。



[DllImport("Kernel32", CharSet = CharSet.Auto)]
            private static extern SafeFileHandle CreateIoCompletionPort(IntPtr hFile, IntPtr hExistingCompletionPort, IntPtr puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);
            /// <summary> Win32Func: Closes an IO Completion Port Thread Pool </summary>
            [DllImport("Kernel32", CharSet = CharSet.Auto)]
            private static extern Boolean CloseHandle(SafeHandle hObject);
            /// <summary> Win32Func: Posts a context based event into an IO Completion Port Thread Pool </summary>
            [DllImport("Kernel32", CharSet = CharSet.Auto)]
            private static extern Boolean PostQueuedCompletionStatus(SafeFileHandle hCompletionPort, UInt32 uiSizeOfArgument, IntPtr dwCompletionKey, IntPtr pOverlapped);
            /// <summary> Win32Func: Waits on a context based event from an IO Completion Port Thread Pool.
            ///           All threads in the pool wait in this Win32 Function </summary>
            [DllImport("Kernel32", CharSet = CharSet.Auto)]
            private static extern Boolean GetQueuedCompletionStatus(SafeFileHandle hCompletionPort, out UInt32 pSizeOfArgument, out IntPtr dwCompletionKey, out IntPtr ppOverlapped, UInt32 uiMilliseconds);


我们用CreateIOCompletionPort获得一个IOCP对象的句柄,用PostQueuedCompetionStatus把状态对象(Socket编程下一般系传socket)放进队列,开启一定量线程来运行GetQueuedCompletionStatus监听,GetQueuedCompletionStatus函数会阻塞调用线程。

由于与非托管代码打交道,要实现Safe的代码,有几点需要注意。

1.我们要传递一个状态对象地址给非托管代码,由于GC的关系,我们不能直接传递地址,因为GC在回收的过程中,会移动在堆上的对象,造成地址改变。一般来说,GC移动会修改托管代码里面地址指向,但我们现在把地址传递出托管代码范围,GC也无能为力了。情况如图。







针对这种情况,可以用GCHandle类解决,GCHandle类的Alloc方法为对象注册,Alloc方法有两个参数,第二个参数系GCHandleType类型枚举,默认情况系Normal。当我们要GC不移动对象的时候,例如有个byte[]的Buffer需要非托管代码填充,可以使用Pinned。

GCHandle gch = GCHandle.Alloc(obj);
            PostQueuedCompletionStatus(GetHandle, (uint)Marshal.SizeOf(gch), IntPtr.Zero, (IntPtr)gch);

2.PostQueuedCompletionStatus函数的第二个参数是要传送数据的长度,直接使用sizeof系unsafe代码,这里使用Marshal.SizeOf方法。

3.还有需要注意的是,传递的对象必须实现[StructLayout(LayoutKind.Sequential)]标签,以保证该对象的成员在内存里连续分配,遵守C++方式。

4.利用SafeFileHandle引用内核对象更安全,这个类能实现引用计数。

至此,已经讲述完实现Safe代码的要点了。

IOCPThreadPool的实现代码:

using System; 
using System.Threading; 
using System.Runtime.InteropServices; 
using Microsoft.Win32.SafeHandles; 
namespace Continuum.SafeThreading 
{ 
    [StructLayout(LayoutKind.Sequential)] 
public class MyData 
    { 
private int value; 
public int Value 
        { 
get { return value; } 
set { this.value = value; } 
        } 
    } 
// Classes 
//============================================ 
/// <summary> This class provides the ability to create a thread pool to manage work.  The 
///           class abstracts the Win32 IOCompletionPort API so it requires the use of 
///           unmanaged code.  Unfortunately the .NET framework does not provide this functionality </summary> 
public sealed class SafeIOCPThreadPool 
    { 
// Win32 Function Prototypes 
/// <summary> Win32Func: Create an IO Completion Port Thread Pool </summary> 
        [DllImport("Kernel32", CharSet = CharSet.Auto)] 
private static extern SafeFileHandle CreateIoCompletionPort(IntPtr hFile, IntPtr hExistingCompletionPort, IntPtr puiCompletionKey, UInt32 uiNumberOfConcurrentThreads); 
/// <summary> Win32Func: Closes an IO Completion Port Thread Pool </summary> 
        [DllImport("Kernel32", CharSet = CharSet.Auto)] 
private static extern Boolean CloseHandle(SafeHandle hObject); 
/// <summary> Win32Func: Posts a context based event into an IO Completion Port Thread Pool </summary> 
        [DllImport("Kernel32", CharSet = CharSet.Auto)] 
private static extern Boolean PostQueuedCompletionStatus(SafeFileHandle hCompletionPort, UInt32 uiSizeOfArgument, IntPtr dwCompletionKey, IntPtr pOverlapped); 
/// <summary> Win32Func: Waits on a context based event from an IO Completion Port Thread Pool. 
///           All threads in the pool wait in this Win32 Function </summary> 
        [DllImport("Kernel32", CharSet = CharSet.Auto)] 
private static extern Boolean GetQueuedCompletionStatus(SafeFileHandle hCompletionPort, out UInt32 pSizeOfArgument, out IntPtr dwCompletionKey, out IntPtr ppOverlapped, UInt32 uiMilliseconds); 
// Constants 
/// <summary> SimTypeConst: This represents the Win32 Invalid Handle Value Macro </summary> 
private readonly IntPtr INVALID_HANDLE_VALUE = new IntPtr(-1); 
/// <summary> SimTypeConst: This represents the Win32 INFINITE Macro </summary> 
private readonly UInt32 INIFINITE = 0xffffffff; 
/// <summary> SimTypeConst: This tells the IOCP Function to shutdown </summary> 
private readonly IntPtr SHUTDOWN_IOCPTHREAD = new IntPtr(0x7fffffff); 
// Delegate Function Types 
/// <summary> DelType: This is the type of user function to be supplied for the thread pool </summary> 
public delegate void USER_FUNCTION(MyData obj); 
// Private Properties 
private SafeFileHandle m_hHandle; 
/// <summary> SimType: Contains the IO Completion Port Thread Pool handle for this instance </summary> 
private SafeFileHandle GetHandle { get { return m_hHandle; } set { m_hHandle = value; } } 
private Int32 m_uiMaxConcurrency; 
/// <summary> SimType: The maximum number of threads that may be running at the same time </summary> 
private Int32 GetMaxConcurrency { get { return m_uiMaxConcurrency; } set { m_uiMaxConcurrency = value; } } 
private Int32 m_iMinThreadsInPool; 
/// <summary> SimType: The minimal number of threads the thread pool maintains </summary> 
private Int32 GetMinThreadsInPool { get { return m_iMinThreadsInPool; } set { m_iMinThreadsInPool = value; } } 
private Int32 m_iMaxThreadsInPool; 
/// <summary> SimType: The maximum number of threads the thread pool maintains </summary> 
private Int32 GetMaxThreadsInPool { get { return m_iMaxThreadsInPool; } set { m_iMaxThreadsInPool = value; } } 
private Object m_pCriticalSection; 
/// <summary> RefType: A serialization object to protect the class state </summary> 
private Object GetCriticalSection { get { return m_pCriticalSection; } set { m_pCriticalSection = value; } } 
private USER_FUNCTION m_pfnUserFunction; 
/// <summary> DelType: A reference to a user specified function to be call by the thread pool </summary> 
private USER_FUNCTION GetUserFunction { get { return m_pfnUserFunction; } set { m_pfnUserFunction = value; } } 
private Boolean m_bDisposeFlag; 
/// <summary> SimType: Flag to indicate if the class is disposing </summary> 
private Boolean IsDisposed { get { return m_bDisposeFlag; } set { m_bDisposeFlag = value; } } 
// Public Properties 
private Int32 m_iCurThreadsInPool; 
/// <summary> SimType: The current number of threads in the thread pool </summary> 
public Int32 GetCurThreadsInPool { get { return m_iCurThreadsInPool; } set { m_iCurThreadsInPool = value; } } 
/// <summary> SimType: Increment current number of threads in the thread pool </summary> 
private Int32 IncCurThreadsInPool() { return Interlocked.Increment(ref m_iCurThreadsInPool); } 
/// <summary> SimType: Decrement current number of threads in the thread pool </summary> 
private Int32 DecCurThreadsInPool() { return Interlocked.Decrement(ref m_iCurThreadsInPool); } 
private Int32 m_iActThreadsInPool; 
/// <summary> SimType: The current number of active threads in the thread pool </summary> 
public Int32 GetActThreadsInPool { get { return m_iActThreadsInPool; } set { m_iActThreadsInPool = value; } } 
/// <summary> SimType: Increment current number of active threads in the thread pool </summary> 
private Int32 IncActThreadsInPool() { return Interlocked.Increment(ref m_iActThreadsInPool); } 
/// <summary> SimType: Decrement current number of active threads in the thread pool </summary> 
private Int32 DecActThreadsInPool() { return Interlocked.Decrement(ref m_iActThreadsInPool); } 
private Int32 m_iCurWorkInPool; 
/// <summary> SimType: The current number of Work posted in the thread pool </summary> 
public Int32 GetCurWorkInPool { get { return m_iCurWorkInPool; } set { m_iCurWorkInPool = value; } } 
/// <summary> SimType: Increment current number of Work posted in the thread pool </summary> 
private Int32 IncCurWorkInPool() { return Interlocked.Increment(ref m_iCurWorkInPool); } 
/// <summary> SimType: Decrement current number of Work posted in the thread pool </summary> 
private Int32 DecCurWorkInPool() { return Interlocked.Decrement(ref m_iCurWorkInPool); } 
// Constructor, Finalize, and Dispose  
//*********************************************** 
/// <summary> Constructor </summary> 
/// <param name = "iMaxConcurrency"> SimType: Max number of running threads allowed </param> 
/// <param name = "iMinThreadsInPool"> SimType: Min number of threads in the pool </param> 
/// <param name = "iMaxThreadsInPool"> SimType: Max number of threads in the pool </param> 
/// <param name = "pfnUserFunction"> DelType: Reference to a function to call to perform work </param> 
/// <exception cref = "Exception"> Unhandled Exception </exception> 
public SafeIOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool, Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction) 
        { 
try 
            { 
// Set initial class state 
                GetMaxConcurrency = iMaxConcurrency; 
                GetMinThreadsInPool = iMinThreadsInPool; 
                GetMaxThreadsInPool = iMaxThreadsInPool; 
                GetUserFunction = pfnUserFunction; 
// Init the thread counters 
                GetCurThreadsInPool = 0; 
                GetActThreadsInPool = 0; 
                GetCurWorkInPool = 0; 
// Initialize the Monitor Object 
                GetCriticalSection = new Object(); 
// Set the disposing flag to false 
                IsDisposed = false; 
// Create an IO Completion Port for Thread Pool use 
                GetHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, IntPtr.Zero, IntPtr.Zero, (UInt32)GetMaxConcurrency); 
// Test to make sure the IO Completion Port was created 
if (GetHandle.IsInvalid) 
throw new Exception("Unable To Create IO Completion Port"); 
// Allocate and start the Minimum number of threads specified 
                Int32 iStartingCount = GetCurThreadsInPool; 
                ThreadStart tsThread = new ThreadStart(IOCPFunction); 
for (Int32 iThread = 0; iThread < GetMinThreadsInPool; ++iThread) 
                { 
// Create a thread and start it 
                    Thread thThread = new Thread(tsThread); 
                    thThread.Name = "IOCP " + thThread.GetHashCode(); 
                    thThread.Start(); 
// Increment the thread pool count 
                    IncCurThreadsInPool(); 
                    Console.WriteLine(thThread.Name); 
                } 
            } 
catch (Exception) 
            { 
throw; 
            } 
        } 
//*********************************************** 
/// <summary> Finalize called by the GC </summary> 
~SafeIOCPThreadPool() 
        { 
if (!IsDisposed) 
                Dispose(); 
        } 
//********************************************** 
/// <summary> Called when the object will be shutdown.  This 
///           function will wait for all of the work to be completed 
///           inside the queue before completing </summary> 
public void Dispose() 
        { 
try 
            { 
// Flag that we are disposing this object 
                IsDisposed = true; 
// Get the current number of threads in the pool 
                Int32 iCurThreadsInPool = GetCurThreadsInPool; 
// Shutdown all thread in the pool 
for (Int32 iThread = 0; iThread < iCurThreadsInPool; ++iThread) 
                { 
bool bret = PostQueuedCompletionStatus(GetHandle, 4, SHUTDOWN_IOCPTHREAD, IntPtr.Zero); 
                } 
// Wait here until all the threads are gone 
while (GetCurThreadsInPool != 0) Thread.Sleep(100); 
// Close the IOCP Handle 
                CloseHandle(GetHandle); 
            } 
catch 
            { 
            } 
        } 
// Private Methods 
//******************************************* 
/// <summary> IOCP Worker Function that calls the specified user function </summary> 
private void IOCPFunction() 
        { 
            UInt32 uiNumberOfBytes; 
            IntPtr dwCompletionKey; 
            IntPtr lpOverlapped; 
try 
            { 
while (true) 
                { 
// Wait for an event 
                    GetQueuedCompletionStatus(GetHandle, out uiNumberOfBytes, out dwCompletionKey, out lpOverlapped, INIFINITE); 
if(uiNumberOfBytes <= 0) 
                    { 
continue; 
                    } 
// Decrement the number of events in queue 
                    DecCurWorkInPool(); 
// Was this thread told to shutdown 
if (dwCompletionKey == SHUTDOWN_IOCPTHREAD) 
break; 
// Increment the number of active threads 
                    IncActThreadsInPool(); 
try 
                    { 
// Call the user function 
                        GCHandle gch = GCHandle.FromIntPtr(lpOverlapped); 
                        MyData obj = (MyData) gch.Target; 
                        GetUserFunction(obj); 
                    } 
catch 
                    { 
throw; 
                    } 
// Get a lock 
                    Monitor.Enter(GetCriticalSection); 
try 
                    { 
// If we have less than max threads currently in the pool 
if (GetCurThreadsInPool < GetMaxThreadsInPool) 
                        { 
// Should we add a new thread to the pool 
if (GetActThreadsInPool == GetCurThreadsInPool) 
                            { 
if (IsDisposed == false) 
                                { 
// Create a thread and start it 
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction); 
                                    Thread thThread = new Thread(tsThread); 
                                    thThread.Name = "IOCP " + thThread.GetHashCode(); 
                                    thThread.Start(); 
// Increment the thread pool count 
                                    IncCurThreadsInPool(); 
                                } 
                            } 
                        } 
                    } 
catch 
                    { 
                    } 
// Relase the lock 
                    Monitor.Exit(GetCriticalSection); 
// Increment the number of active threads 
                    DecActThreadsInPool(); 
                } 
            } 
catch 
            { 
            } 
// Decrement the thread pool count 
            DecCurThreadsInPool(); 
        } 
// Public Methods 
//****************************************** 
/// <summary> IOCP Worker Function that calls the specified user function </summary> 
/// <param name="obj"> SimType: A value to be passed with the event </param> 
/// <exception cref = "Exception"> Unhandled Exception </exception> 
public void PostEvent(MyData obj) 
        { 
try 
            { 
// Only add work if we are not disposing 
if (IsDisposed == false) 
                { 
// Post an event into the IOCP Thread Pool 
                    GCHandle gch = GCHandle.Alloc(obj); 
                    PostQueuedCompletionStatus(GetHandle, (uint)Marshal.SizeOf(gch), IntPtr.Zero, (IntPtr)gch); 
// Increment the number of item of work 
                    IncCurWorkInPool(); 
// Get a lock 
                    Monitor.Enter(GetCriticalSection); 
try 
                    { 
// If we have less than max threads currently in the pool 
if (GetCurThreadsInPool < GetMaxThreadsInPool) 
                        { 
// Should we add a new thread to the pool 
if (GetActThreadsInPool == GetCurThreadsInPool) 
                            { 
if (IsDisposed == false) 
                                { 
// Create a thread and start it 
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction); 
                                    Thread thThread = new Thread(tsThread); 
                                    thThread.Name = "IOCP " + thThread.GetHashCode(); 
                                    thThread.Start(); 
// Increment the thread pool count 
                                    IncCurThreadsInPool(); 
                                } 
                            } 
                        } 
                    } 
catch 
                    { 
                    } 
// Release the lock 
                    Monitor.Exit(GetCriticalSection); 
                } 
            } 
catch (Exception e) 
            { 
throw e; 
            } 
catch 
            { 
throw new Exception("Unhandled Exception"); 
            } 
        } 
//***************************************** 
/// <summary> IOCP Worker Function that calls the specified user function </summary> 
/// <exception cref = "Exception"> Unhandled Exception </exception> 
public void PostEvent() 
        { 
try 
            { 
// Only add work if we are not disposing 
if (IsDisposed == false) 
                { 
// Post an event into the IOCP Thread Pool 
                    PostQueuedCompletionStatus(GetHandle, 0, IntPtr.Zero, IntPtr.Zero); 
// Increment the number of item of work 
                    IncCurWorkInPool(); 
// Get a lock 
                    Monitor.Enter(GetCriticalSection); 
try 
                    { 
// If we have less than max threads currently in the pool 
if (GetCurThreadsInPool < GetMaxThreadsInPool) 
                        { 
// Should we add a new thread to the pool 
if (GetActThreadsInPool == GetCurThreadsInPool) 
                            { 
if (IsDisposed == false) 
                                { 
// Create a thread and start it 
                                    ThreadStart tsThread = new ThreadStart(IOCPFunction); 
                                    Thread thThread = new Thread(tsThread); 
                                    thThread.Name = "IOCP " + thThread.GetHashCode(); 
                                    thThread.Start(); 
// Increment the thread pool count 
                                    IncCurThreadsInPool(); 
                                } 
                            } 
                        } 
                    } 
catch 
                    { 
                    } 
// Release the lock 
                    Monitor.Exit(GetCriticalSection); 
                } 
            } 
catch (Exception e) 
            { 
throw e; 
            } 
catch 
            { 
throw new Exception("Unhandled Exception"); 
            } 
        } 
    } 
}

测试代码:

using System; 
using System.Threading;  // Included for the Thread.Sleep call 
using Continuum.SafeThreading; 
namespace SafeSample 
{ 
//============================================ 
/// <summary> Sample class for the threading class </summary> 
public class UtilThreadingSample 
    { 
//*******************************************  
/// <summary> Test Method </summary> 
static void Main() 
        { 
// Create the MSSQL IOCP Thread Pool 
            SafeIOCPThreadPool pThreadPool = new SafeIOCPThreadPool(0, 5, 10, new SafeIOCPThreadPool.USER_FUNCTION(IOCPThreadFunction)); 
for (int i = 0; i < 100; i++) 
            { 
                pThreadPool.PostEvent(new MyData(){Value = i}); 
            } 
            pThreadPool.Dispose(); 
            Console.WriteLine("Disposed"); 
            Console.ReadLine(); 
        } 
private static object syncRoot = new object(); 
//***************************************** 
/// <summary> Function to be called by the IOCP thread pool.  Called when 
///           a command is posted for processing by the SocketManager </summary> 
/// <param name="obj"> The value provided by the thread posting the event </param> 
static public void IOCPThreadFunction(MyData obj) 
        { 
try 
            { 
                    Console.WriteLine("Value: {0},Thread:{1}", obj.Value, Thread.CurrentThread.Name); 
            } 
catch (Exception pException) 
            { 
                Console.WriteLine(pException.Message); 
            } 
        } 
    } 
}

参考资料:

IOCP Thread Pooling in C# - Part I
http://www.devarticles.com/c/a/C-Sharp/IOCP-Thread-Pooling-in-C-sharp-Part-I/

IOCP Thread Pooling in C# - Part II
http://www.devarticles.com/c/a/C-Sharp/IOCP-Thread-Pooling-in-C-sharp-Part-II/

蛙蛙推荐:在c#使用IOCP(完成端口)的简单示例
http://www.cnblogs.com/onlytiancai/archive/2009/01/05/1241571.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: