您的位置:首页 > 编程语言 > C#

[C#]I/O完成端口的类定义和测试实例

2008-12-24 13:54 344 查看
从William Kennedy那里整理过来的,不同之处在于他自己定义了一个Overlapped,而我们这里直接使用

System.Threading.NativeOverlapped。

附一段我以前的Win32下的IOCP文档,如果您了解IOCP也可以直接跳过看后面的C#测试示范:

我们采用的是I/O Completion Port(以下简称IOCP)处理机制。

简单的讲,当服务应用程序初始化时,它应该先创建一个I/O CP。我们在请求到来后,将得到的数据打包用PostQueuedCompletionStatus发送到IOCP中。这时需要创建一些个线程(7个线程/每CPU,再多就没有意义了)来处理发送到IOCP端口的消息。实现步骤大致如下:

1 先在主线程中调用CreateIoCompletionPort创建IOCP。

CreateIoCompletionPort的前三个参数只在把设备同Completion Port相关联时才有用。

此时我们只需传递INVALID_HANDLE_VALUE,NULL和0即可。

第四个参数告诉端口同时能运行的最多线程数,这里设置为0,表示默认为当前计算机的CPU数目。

2 我们的ThreadFun线程函数执行一些初始化之后,将进入一个循环,该循环会在服务进程终止时才结束。

在循环中,调用GetQueuedCompletionStatus,这样就把当前线程的ID放入一个等待线程队列中,I/O CP内核对象就总能知道哪个线程在等待处理完成的I/O请求。

如果在IDLE_THREAD_TIMEOUT规定的时间内I/O CP上还没有出现一个Completion Packet,则转入下一次循环。在这里我们设置的IDLE_THREAD_TIMEOUT为1秒。

当端口的I/O完成队列中出现一项时,完成端口就唤醒等待线程队列中的这个线程,该线程将得到完成的I/O项中的信息: 传输的字节数、完成键和OVERLAPPED结构的地址。

在我们的程序中可以用智能指针、BSTR或者int来接收这个OVERLAPPED结构的地址的值,从而得到消息;然后在这个线程中处理消息。

GetQueuedCompletionStatus的第一个参数hCompletionPort指出了要监视哪一个端口,这里我们传送先前从CreateIoCompletionPort返回的端口句柄。

需要注意的是:

第一, 线程池中的线程数目是有限制的,和CPU数目有关系。

第二, IOCP是一种较为完美的睡眠/唤醒 线程机制;线程当前没有任务要处理时,就进入睡眠状态,从而不占用CPU资源,直到被内核唤醒;

第三, 最近一次刚执行完的线程,下次任务来的时候还会唤醒它;所以有可能比较少被调用的线程以后被调用的几率也少。

测试代码:




using System;


using System.Threading; // Included for the Thread.Sleep call


using Continuum.Threading;


using System.Runtime.InteropServices;




namespace IOCPDemo
{
    //=============================================================================
    /// <summary> Console sample that drives <c>IOCPThreadPool</c> with a single event. </summary>
    public class UtilThreadingSample
    {
        //*****************************************************************************
        /// <summary>
        /// Builds a pool, posts one event carrying the value 1234, pauses briefly,
        /// then disposes the pool.
        /// </summary>
        static void Main()
        {
            // Pool arguments, in order: max concurrency (0), min threads (10),
            // max threads (20), and the callback invoked for every posted event.
            IOCPThreadPool.USER_FUNCTION callback = new IOCPThreadPool.USER_FUNCTION(IOCPThreadFunction);
            IOCPThreadPool pool = new IOCPThreadPool(0, 10, 20, callback);

            // Single post; the original listing keeps a commented-out
            // "for(int i =1;i<10000;i++)" around this call for load testing.
            pool.PostEvent(1234);

            // Short pause before shutting the pool down.
            Thread.Sleep(100);

            pool.Dispose();
        }

        //********************************************************************
        /// <summary>
        /// Callback handed to the pool. Prints the posted value and then sleeps
        /// for three seconds; any exception is reported on the console.
        /// </summary>
        /// <param name="iValue"> The value provided by the thread posting the event </param>
        public static void IOCPThreadFunction(int iValue)
        {
            try
            {
                string text = iValue.ToString();
                Console.WriteLine("Value: {0}", text);
                Thread.Sleep(3000);
            }
            catch (Exception error)
            {
                Console.WriteLine(error.Message);
            }
        }
    }
}



类代码:


using System;


using System.Threading;


using System.Runtime.InteropServices;




namespace IOCPThreading
{
    /// <summary>
    /// A thread pool whose workers block on a Win32 I/O completion port.
    /// Work is queued with <see cref="PostEvent(int)"/> / <see cref="PostEvent()"/> and
    /// delivered to the <see cref="USER_FUNCTION"/> supplied at construction.
    /// </summary>
    /// <remarks>
    /// The pool starts with the requested minimum number of threads and adds one
    /// thread at a time (up to the maximum) whenever every existing thread is busy.
    /// Worker threads are foreground threads, so <see cref="Dispose"/> must be called
    /// to let the process exit. Requires compilation with /unsafe.
    /// </remarks>
    public sealed class IOCPThreadPool : IDisposable
    {
        // Handles and completion keys are pointer-sized in Win32 (HANDLE / ULONG_PTR),
        // so they are declared as IntPtr/UIntPtr to stay correct on 64-bit processes.
        [DllImport("kernel32.dll", SetLastError = true)]
        private static extern IntPtr CreateIoCompletionPort(IntPtr hFile, IntPtr hExistingCompletionPort, UIntPtr uiCompletionKey, UInt32 uiNumberOfConcurrentThreads);

        [DllImport("kernel32.dll", SetLastError = true)]
        [return: MarshalAs(UnmanagedType.Bool)]
        private static extern Boolean CloseHandle(IntPtr hObject);

        [DllImport("kernel32.dll", SetLastError = true)]
        [return: MarshalAs(UnmanagedType.Bool)]
        private static unsafe extern Boolean PostQueuedCompletionStatus(IntPtr hCompletionPort, UInt32 uiSizeOfArgument, IntPtr pUserArg, NativeOverlapped* pOverlapped);

        [DllImport("kernel32.dll", SetLastError = true)]
        [return: MarshalAs(UnmanagedType.Bool)]
        private static unsafe extern Boolean GetQueuedCompletionStatus(IntPtr hCompletionPort, out UInt32 uiSizeOfArgument, out IntPtr pUserArg, NativeOverlapped** ppOverlapped, UInt32 uiMilliseconds);

        /// <summary> Win32 INVALID_HANDLE_VALUE, i.e. (HANDLE)-1. </summary>
        private static readonly IntPtr INVALID_HANDLE_VALUE = new IntPtr(-1);

        /// <summary> Win32 INFINITE wait timeout. </summary>
        private const UInt32 INFINITE = 0xffffffff;

        /// <summary> Byte count posted with <see cref="PostEvent(int)"/> packets. </summary>
        private const UInt32 VALUE_EVENT_SIZE = 4;

        /// <summary> Byte count posted with <see cref="PostEvent()"/> packets. </summary>
        private const UInt32 EMPTY_EVENT_SIZE = 0;

        /// <summary>
        /// Byte count that marks a shutdown packet. The marker travels in the
        /// "bytes transferred" slot rather than in the completion key so that no
        /// user value passed to <see cref="PostEvent(int)"/> can be mistaken for it.
        /// </summary>
        private const UInt32 SHUTDOWN_IOCPTHREAD = 0xffffffff;

        /// <summary> Signature of the callback invoked for every posted event. </summary>
        /// <param name="iValue"> The value given to PostEvent (0 for the parameterless overload) </param>
        public delegate void USER_FUNCTION(int iValue);

        /// <summary> Completion port handle; IntPtr.Zero once closed. </summary>
        private IntPtr m_hHandle;

        /// <summary> Concurrency value passed to CreateIoCompletionPort. </summary>
        private readonly Int32 m_iMaxConcurrency;

        /// <summary> Number of threads started by the constructor. </summary>
        private readonly Int32 m_iMinThreadsInPool;

        /// <summary> Upper bound used when growing the pool. </summary>
        private readonly Int32 m_iMaxThreadsInPool;

        /// <summary> Guards pool growth and the transition into the disposed state. </summary>
        private readonly Object m_pCriticalSection = new Object();

        /// <summary> Callback invoked by the workers. </summary>
        private readonly USER_FUNCTION m_pfnUserFunction;

        /// <summary> Set once Dispose has begun (or construction failed). </summary>
        private volatile Boolean m_bDisposeFlag;

        private Int32 m_iCurThreadsInPool;
        private Int32 m_iActThreadsInPool;
        private Int32 m_iCurWorkInPool;

        /// <summary> The current number of threads in the thread pool. </summary>
        /// <remarks> The setter is retained only for source compatibility. </remarks>
        public Int32 GetCurThreadsInPool
        {
            get { return Interlocked.CompareExchange(ref m_iCurThreadsInPool, 0, 0); }
            set { Interlocked.Exchange(ref m_iCurThreadsInPool, value); }
        }

        private Int32 IncCurThreadsInPool() { return Interlocked.Increment(ref m_iCurThreadsInPool); }
        private Int32 DecCurThreadsInPool() { return Interlocked.Decrement(ref m_iCurThreadsInPool); }

        /// <summary> The number of threads currently running the user function. </summary>
        /// <remarks> The setter is retained only for source compatibility. </remarks>
        public Int32 GetActThreadsInPool
        {
            get { return Interlocked.CompareExchange(ref m_iActThreadsInPool, 0, 0); }
            set { Interlocked.Exchange(ref m_iActThreadsInPool, value); }
        }

        private Int32 IncActThreadsInPool() { return Interlocked.Increment(ref m_iActThreadsInPool); }
        private Int32 DecActThreadsInPool() { return Interlocked.Decrement(ref m_iActThreadsInPool); }

        /// <summary> The number of posted events that no worker has picked up yet. </summary>
        /// <remarks> The setter is retained only for source compatibility. </remarks>
        public Int32 GetCurWorkInPool
        {
            get { return Interlocked.CompareExchange(ref m_iCurWorkInPool, 0, 0); }
            set { Interlocked.Exchange(ref m_iCurWorkInPool, value); }
        }

        private Int32 IncCurWorkInPool() { return Interlocked.Increment(ref m_iCurWorkInPool); }
        private Int32 DecCurWorkInPool() { return Interlocked.Decrement(ref m_iCurWorkInPool); }

        /// <summary> Creates the completion port and starts the minimum number of workers. </summary>
        /// <param name="iMaxConcurrency"> Passed to CreateIoCompletionPort as the number of concurrent threads (0 = one per CPU) </param>
        /// <param name="iMinThreadsInPool"> Number of worker threads started immediately </param>
        /// <param name="iMaxThreadsInPool"> Ceiling for automatic pool growth </param>
        /// <param name="pfnUserFunction"> Callback invoked for every posted event </param>
        /// <exception cref="ArgumentNullException"> <paramref name="pfnUserFunction"/> is null </exception>
        /// <exception cref="System.ComponentModel.Win32Exception"> The completion port could not be created </exception>
        public IOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool, Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction)
        {
            if (pfnUserFunction == null)
                throw new ArgumentNullException("pfnUserFunction");

            m_iMaxConcurrency = iMaxConcurrency;
            m_iMinThreadsInPool = iMinThreadsInPool;
            m_iMaxThreadsInPool = iMaxThreadsInPool;
            m_pfnUserFunction = pfnUserFunction;

            // Create an IO Completion Port that is not associated with any file handle
            m_hHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, IntPtr.Zero, UIntPtr.Zero, unchecked((UInt32) m_iMaxConcurrency));

            if (m_hHandle == IntPtr.Zero)
            {
                // Read the error code before any other call can overwrite it
                Int32 iError = Marshal.GetLastWin32Error();
                m_bDisposeFlag = true;
                GC.SuppressFinalize(this);
                throw new System.ComponentModel.Win32Exception(iError, "Unable To Create IO Completion Port");
            }

            try
            {
                // Allocate and start the minimum number of threads specified
                for (Int32 iThread = 0; iThread < m_iMinThreadsInPool; ++iThread)
                    StartWorker();
            }
            catch
            {
                // Stop whatever workers did start and release the port, then
                // surface the original failure with its stack trace intact.
                Dispose();
                throw;
            }
        }

        /// <summary>
        /// Safety net for a pool that was never disposed. Only closes the port
        /// handle: a finalizer must never block waiting for worker threads.
        /// </summary>
        ~IOCPThreadPool()
        {
            CloseCompletionPort();
        }

        /// <summary>
        /// Stops every worker thread, waits for them to exit, and closes the
        /// completion port. Safe to call more than once; later calls return
        /// immediately. Do not call from inside the user function: the calling
        /// worker would wait for itself.
        /// </summary>
        public void Dispose()
        {
            Int32 iThreadsToStop;

            // Flip the flag and snapshot the thread count under the growth lock so
            // that no worker can be added after the snapshot has been taken.
            lock (m_pCriticalSection)
            {
                if (m_bDisposeFlag)
                    return;

                m_bDisposeFlag = true;
                iThreadsToStop = GetCurThreadsInPool;
            }

            // Queue one shutdown packet per worker
            Boolean bAllPosted = true;
            for (Int32 iThread = 0; iThread < iThreadsToStop; ++iThread)
            {
                unsafe
                {
                    if (!PostQueuedCompletionStatus(m_hHandle, SHUTDOWN_IOCPTHREAD, IntPtr.Zero, null))
                        bAllPosted = false;
                }
            }

            // If a shutdown packet could not be queued, close the port right away:
            // a failed wait makes each worker leave its loop, so the wait below
            // cannot hang on a worker that will never be told to stop.
            if (!bAllPosted)
                CloseCompletionPort();

            // Wait here until all the threads are gone
            while (GetCurThreadsInPool != 0)
                Thread.Sleep(100);

            CloseCompletionPort();
            GC.SuppressFinalize(this);
        }

        /// <summary> Closes the port handle exactly once, however many callers race here. </summary>
        private void CloseCompletionPort()
        {
            IntPtr hPort = Interlocked.Exchange(ref m_hHandle, IntPtr.Zero);
            if (hPort != IntPtr.Zero)
                CloseHandle(hPort);
        }

        /// <summary> Creates, names, counts and starts one worker thread. </summary>
        private void StartWorker()
        {
            Thread thThread = new Thread(new ThreadStart(IOCPFunction));
            thThread.Name = "IOCP " + thThread.GetHashCode();

            // Count the thread before it runs so its own decrement on exit can
            // never be observed ahead of this increment.
            IncCurThreadsInPool();
            try
            {
                thThread.Start();
            }
            catch
            {
                DecCurThreadsInPool();
                throw;
            }
        }

        /// <summary>
        /// Adds one worker when the pool is below its maximum, every existing
        /// worker is busy, and the pool is not being disposed. Growth is best
        /// effort: a failure to start a thread is traced, not propagated.
        /// </summary>
        private void AddWorkerIfSaturated()
        {
            lock (m_pCriticalSection)
            {
                if (m_bDisposeFlag)
                    return;

                Int32 iCurThreads = GetCurThreadsInPool;
                if (iCurThreads >= m_iMaxThreadsInPool || GetActThreadsInPool != iCurThreads)
                    return;

                try
                {
                    StartWorker();
                }
                catch (Exception ex)
                {
                    System.Diagnostics.Trace.TraceError("IOCPThreadPool: unable to add a worker thread: {0}", ex);
                }
            }
        }

        /// <summary>
        /// Worker thread body: waits on the completion port and runs the user
        /// function for each event until a shutdown packet arrives or the wait fails.
        /// </summary>
        private void IOCPFunction()
        {
            try
            {
                while (true)
                {
                    UInt32 uiNumberOfBytes;
                    IntPtr pKey;
                    Boolean bDequeued;

                    unsafe
                    {
                        NativeOverlapped* pOv;

                        // Wait for an event
                        bDequeued = GetQueuedCompletionStatus(m_hHandle, out uiNumberOfBytes, out pKey, &pOv, INFINITE);
                    }

                    // With an infinite timeout a failed wait means the port is no
                    // longer usable (for example it was closed), so stop the worker.
                    if (!bDequeued)
                        break;

                    // Was this thread told to shutdown
                    if (uiNumberOfBytes == SHUTDOWN_IOCPTHREAD)
                        break;

                    // Only real work items were counted when they were posted
                    DecCurWorkInPool();

                    IncActThreadsInPool();
                    try
                    {
                        try
                        {
                            // The key carries the posted int; take the low 32 bits back
                            m_pfnUserFunction(unchecked((Int32) pKey.ToInt64()));
                        }
                        catch (Exception ex)
                        {
                            // A faulty callback must not take the worker down or
                            // leave the active-thread count permanently raised.
                            System.Diagnostics.Trace.TraceError("IOCPThreadPool: user function threw: {0}", ex);
                        }

                        // Checked while this thread still counts as active
                        AddWorkerIfSaturated();
                    }
                    finally
                    {
                        DecActThreadsInPool();
                    }
                }
            }
            catch (Exception ex)
            {
                // Last line of defence: an escaping exception would end the process
                System.Diagnostics.Trace.TraceError("IOCPThreadPool: worker thread failed: {0}", ex);
            }
            finally
            {
                // Decrement the thread pool count
                DecCurThreadsInPool();
            }
        }

        /// <summary>
        /// Queues one packet on the port and grows the pool if it is saturated.
        /// Does nothing once the pool is being disposed.
        /// </summary>
        /// <exception cref="System.ComponentModel.Win32Exception"> The packet could not be queued </exception>
        private void PostWork(UInt32 uiNumberOfBytes, IntPtr pKey)
        {
            // Only add work if we are not disposing
            if (m_bDisposeFlag)
                return;

            // Count the item first so a fast worker cannot decrement before we increment
            IncCurWorkInPool();

            Boolean bPosted;
            unsafe
            {
                bPosted = PostQueuedCompletionStatus(m_hHandle, uiNumberOfBytes, pKey, null);
            }

            if (!bPosted)
            {
                Int32 iError = Marshal.GetLastWin32Error();
                DecCurWorkInPool();

                // Losing a race with Dispose is treated like posting after disposal
                if (m_bDisposeFlag)
                    return;

                throw new System.ComponentModel.Win32Exception(iError, "Unable To Post To IO Completion Port");
            }

            AddWorkerIfSaturated();
        }

        /// <summary> Posts an event; a worker will call the user function with <paramref name="iValue"/>. </summary>
        /// <param name="iValue"> Value handed to the user function; every int value is valid </param>
        /// <exception cref="System.ComponentModel.Win32Exception"> The event could not be queued </exception>
        public void PostEvent(int iValue)
        {
            PostWork(VALUE_EVENT_SIZE, new IntPtr(iValue));
        }

        /// <summary> Posts an event without a value; the user function receives 0. </summary>
        /// <exception cref="System.ComponentModel.Win32Exception"> The event could not be queued </exception>
        public void PostEvent()
        {
            PostWork(EMPTY_EVENT_SIZE, IntPtr.Zero);
        }
    }
}

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: