线程池学习之ThreadPool Class
2011-08-12 09:52
309 查看
先存着,还没仔细看。哈哈
// ==++==
//
//
// Copyright (c) 2006 Microsoft Corporation. All rights reserved.
//
// The use and distribution terms for this software are contained in the file
// named license.txt, which can be found in the root of this distribution.
// By using this software in any fashion, you are agreeing to be bound by the
// terms of this license.
//
// You must not remove this notice, or any other, from this software.
//
//
// ==--==
/*=============================================================================
**
** Class: ThreadPool
**
**
** Purpose: Class for creating and managing a threadpool
**
**
=============================================================================*/
/*
* Below you'll notice two sets of APIs that are separated by the
* use of 'Unsafe' in their names. The unsafe versions are called
* that because they do not propagate the calling stack onto the
* worker thread. This allows code to lose the calling stack and
* thereby elevate its security privileges. Note that this operation
* is much akin to the combined ability to control security policy
* and control security evidence. With these privileges, a person
* can gain the right to load assemblies that are fully trusted which
* then assert full trust and can call any code they want regardless
* of the previous stack information.
*/
namespace System.Threading {
using System.Security;
using System.Threading;
using System.Runtime.Remoting;
using System.Security.Permissions;
using System;
using Microsoft.Win32;
using System.Runtime.CompilerServices;
using System.Runtime.ConstrainedExecution;
using System.Runtime.InteropServices;
/// <summary>
/// Holds the native handle of a wait registered with the thread pool together with the
/// <see cref="WaitHandle"/> being waited on. The finalizer cleans up a registration that was
/// never successfully unregistered through <see cref="Unregister"/>.
/// </summary>
internal sealed class RegisteredWaitHandleSafe : CriticalFinalizerObject
{
// Sentinel meaning "no native registration"; also the initial value of registeredWaitHandle.
private static readonly IntPtr InvalidHandle = Win32Native.INVALID_HANDLE_VALUE;
// Native registration handle. InvalidHandle or IntPtr.Zero means there is nothing to unregister.
private IntPtr registeredWaitHandle;
// The wait object whose SafeWaitHandle is add-ref'd by SetWaitObject.
private WaitHandle m_internalWaitObject;
// Written by DangerousAddRef in SetWaitObject; true means a matching DangerousRelease is still owed.
private bool bReleaseNeeded = false;
// Hand-rolled lock word: 0 = free, 1 = held. Taken with Interlocked.CompareExchange.
private int m_lock = 0;
/// <summary>Creates an instance that has no native registration yet.</summary>
internal RegisteredWaitHandleSafe()
{
registeredWaitHandle = InvalidHandle;
}
/// <summary>Returns the stored native registration handle (read without taking the lock).</summary>
internal IntPtr GetHandle()
{
return registeredWaitHandle;
}
/// <summary>Stores the native registration handle (written without taking the lock).</summary>
internal void SetHandle(IntPtr handle)
{
registeredWaitHandle = handle;
}
/// <summary>
/// Remembers <paramref name="waitObject"/> and, when it is non-null, takes a reference on its
/// SafeWaitHandle; <c>bReleaseNeeded</c> records whether that reference must later be released.
/// </summary>
/// <param name="waitObject">The wait object being registered; may be null.</param>
[ReliabilityContract(Consistency.WillNotCorruptState, Cer.MayFail)]
internal void SetWaitObject(WaitHandle waitObject)
{
// needed for DangerousAddRef
RuntimeHelpers.PrepareConstrainedRegions();
try
{
}
finally
{
// All work is done in the finally block of an otherwise empty try; presumably so the
// field store and the add-ref cannot be separated by an asynchronous exception.
m_internalWaitObject = waitObject;
if (waitObject != null)
{
m_internalWaitObject.SafeWaitHandle.DangerousAddRef(ref bReleaseNeeded);
}
}
}
/// <summary>
/// Unregisters the native wait if a valid handle is stored. On success the SafeWaitHandle
/// reference is released (if one is owed), the handle is reset to InvalidHandle and
/// finalization of this object is suppressed.
/// </summary>
/// <param name="waitObject">
/// Object to be notified when all callbacks to delegates have completed; its SafeWaitHandle
/// (or null when the argument is null) is passed to UnregisterWaitNative.
/// </param>
/// <returns>
/// The value returned by UnregisterWaitNative, or false when no valid handle was stored.
/// </returns>
[ReliabilityContract(Consistency.WillNotCorruptState, Cer.MayFail)]
internal bool Unregister(
WaitHandle waitObject // object to be notified when all callbacks to delegates have completed
)
{
bool result = false;
// needed for DangerousRelease
RuntimeHelpers.PrepareConstrainedRegions();
try
{
}
finally
{
// lock(this) cannot be used reliably in Cer since thin lock could be
// promoted to syncblock and that is not a guaranteed operation
bool bLockTaken = false;
// Spin until the CompareExchange below swaps m_lock from 0 to 1.
do
{
if (Interlocked.CompareExchange(ref m_lock, 1, 0) == 0)
{
bLockTaken = true;
try
{
if (ValidHandle())
{
result = UnregisterWaitNative(GetHandle(), waitObject == null ? null : waitObject.SafeWaitHandle);
if (result == true)
{
if (bReleaseNeeded)
{
m_internalWaitObject.SafeWaitHandle.DangerousRelease();
bReleaseNeeded = false;
}
// if result not true don't release/suppress here so finalizer can make another attempt
SetHandle(InvalidHandle);
m_internalWaitObject = null;
GC.SuppressFinalize(this);
}
}
}
finally
{
// Release the lock even if the native call or the release above throws.
m_lock = 0;
}
}
// Runs on every iteration, including the one that acquired (and already released) the lock.
Thread.SpinWait(1); // yield to processor
}
while (!bLockTaken);
}
return result;
}
/// <summary>True when the stored handle is neither InvalidHandle nor IntPtr.Zero.</summary>
private bool ValidHandle()
{
return (registeredWaitHandle != InvalidHandle && registeredWaitHandle != IntPtr.Zero);
}
/// <summary>
/// Finalizer: if a valid handle is still stored, hands it to WaitHandleCleanupNative, releases
/// the SafeWaitHandle reference if one is owed, and resets the stored state.
/// </summary>
~RegisteredWaitHandleSafe()
{
// if the app has already unregistered the wait, there is nothing to cleanup
// we can detect this by checking the handle. Normally, there is no race here
// so no need to protect reading of handle. However, if this object gets
// resurrected and then someone does an unregister, it would introduce a race
// PrepareConstrainedRegions call not needed since finalizer already in Cer
// lock(this) cannot be used reliably even in Cer since thin lock could be
// promoted to syncblock and that is not a guaranteed operation
// NOTE(review): the DangerousRelease below is nested under ValidHandle(), so if
// SetWaitObject took a reference but no valid handle was ever stored, the reference
// does not appear to be released here - confirm whether that is intended.
bool bLockTaken = false;
do
{
if (Interlocked.CompareExchange(ref m_lock, 1, 0) == 0)
{
bLockTaken = true;
try
{
if (ValidHandle())
{
WaitHandleCleanupNative(registeredWaitHandle);
if (bReleaseNeeded)
{
m_internalWaitObject.SafeWaitHandle.DangerousRelease();
bReleaseNeeded = false;
}
SetHandle(InvalidHandle);
m_internalWaitObject = null;
}
}
finally
{
m_lock = 0;
}
}
Thread.SpinWait(1); // yield to processor
}
while (!bLockTaken);
}
// Runtime-implemented (InternalCall): called by the finalizer with the still-registered handle.
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern void WaitHandleCleanupNative(IntPtr handle);
// Runtime-implemented (InternalCall): called by Unregister; its bool result decides whether
// the managed state is torn down.
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern bool UnregisterWaitNative(IntPtr handle, SafeHandle waitObject);
}
/// <summary>
/// Public handle for a wait registered with the thread pool. Every member forwards to a
/// private <c>RegisteredWaitHandleSafe</c> instance created in the constructor.
/// </summary>
[System.Runtime.InteropServices.ComVisible(true)]
public sealed class RegisteredWaitHandle : MarshalByRefObject
{
    // The object that actually owns the native handle and the wait-object reference.
    private RegisteredWaitHandleSafe internalRegisteredWait;

    /// <summary>Creates the wrapper together with its inner safe object.</summary>
    internal RegisteredWaitHandle()
    {
        this.internalRegisteredWait = new RegisteredWaitHandleSafe();
    }

    /// <summary>Stores the native registration handle on the inner object.</summary>
    internal void SetHandle(IntPtr handle)
    {
        RegisteredWaitHandleSafe inner = this.internalRegisteredWait;
        inner.SetHandle(handle);
    }

    /// <summary>Passes the wait object on to the inner object.</summary>
    internal void SetWaitObject(WaitHandle waitObject)
    {
        RegisteredWaitHandleSafe inner = this.internalRegisteredWait;
        inner.SetWaitObject(waitObject);
    }

    /// <summary>
    /// Unregisters the wait. This is the only public method on this class.
    /// </summary>
    /// <param name="waitObject">Object to be notified when all callbacks to delegates have completed.</param>
    /// <returns>Whatever the inner object's Unregister returns.</returns>
    [System.Runtime.InteropServices.ComVisible(true)]
    public bool Unregister(WaitHandle waitObject)
    {
        bool unregistered = this.internalRegisteredWait.Unregister(waitObject);
        return unregistered;
    }
}
/// <summary>
/// Signature of the callbacks accepted by ThreadPool.QueueUserWorkItem and
/// ThreadPool.UnsafeQueueUserWorkItem.
/// </summary>
/// <param name="state">
/// The state object supplied when the work item was queued; the single-argument
/// QueueUserWorkItem overload supplies null.
/// </param>
[System.Runtime.InteropServices.ComVisible(true)]
public delegate void WaitCallback(Object state);
/// <summary>
/// Signature of the callbacks accepted by ThreadPool.RegisterWaitForSingleObject and
/// ThreadPool.UnsafeRegisterWaitForSingleObject.
/// </summary>
/// <param name="state">The state object supplied when the wait was registered.</param>
/// <param name="timedOut">Tells the callback whether the wait was signalled or timed out.</param>
[System.Runtime.InteropServices.ComVisible(true)]
public delegate void WaitOrTimerCallback(Object state, bool timedOut); // signalled or timed out
/// <summary>
/// Bundles a <see cref="WaitCallback"/>, its state object and (optionally) a captured
/// ExecutionContext so that a queued work item can later be invoked through
/// <see cref="PerformWaitCallback"/>.
/// </summary>
internal class _ThreadPoolWaitCallback
{
    private WaitCallback _waitCallback;
    // Null when the work item was queued through an Unsafe API or while context flow was suppressed.
    private ExecutionContext _executionContext;
    private Object _state;

    // Cached delegate handed to ExecutionContext.Run.
    internal static ContextCallback _ccb = new ContextCallback(WaitCallback_Context);

    /// <summary>Body run inside the captured context: unwraps the helper and invokes the user callback.</summary>
    internal static void WaitCallback_Context(Object state)
    {
        _ThreadPoolWaitCallback helper = (_ThreadPoolWaitCallback)state;
        helper._waitCallback(helper._state);
    }

    /// <summary>
    /// Records the callback and state, and captures the caller's ExecutionContext when
    /// <paramref name="compressStack"/> is true and flow is not suppressed.
    /// </summary>
    internal _ThreadPoolWaitCallback(WaitCallback waitCallback, Object state, bool compressStack, ref StackCrawlMark stackMark)
    {
        _waitCallback = waitCallback;
        _state = state;

        // Nothing to capture for the Unsafe path or when flow is suppressed.
        if (!compressStack || ExecutionContext.IsFlowSuppressed())
        {
            return;
        }

        // clone the execution context
        _executionContext = ExecutionContext.Capture(ref stackMark);
        ExecutionContext.ClearSyncContext(_executionContext);
    }

    /// <summary>
    /// Call back helper: invokes the user callback directly when no context was captured,
    /// otherwise runs it under the captured ExecutionContext.
    /// </summary>
    /// <param name="state">The <c>_ThreadPoolWaitCallback</c> instance created when the item was queued.</param>
    internal static void PerformWaitCallback(Object state)
    {
        _ThreadPoolWaitCallback helper = (_ThreadPoolWaitCallback)state;
        BCLDebug.Assert(helper != null, "Null state passed to PerformWaitCallback!");

        ExecutionContext captured = helper._executionContext;
        if (captured != null)
        {
            ExecutionContext.Run(captured, _ccb, helper);
            return;
        }

        // call directly if it is an unsafe call OR EC flow is suppressed
        helper._waitCallback(helper._state);
    }
}
/// <summary>
/// Bundles a <see cref="WaitOrTimerCallback"/>, its state object and (optionally) a captured
/// ExecutionContext so that a registered wait can later be invoked through
/// <see cref="PerformWaitOrTimerCallback"/>.
/// </summary>
internal class _ThreadPoolWaitOrTimerCallback
{
    private WaitOrTimerCallback _waitOrTimerCallback;
    // Null when registered through an Unsafe API or while context flow was suppressed.
    private ExecutionContext _executionContext;
    private Object _state;

    // ContextCallback takes a single argument, so the timedOut flag is encoded by
    // choosing between two cached delegates (one for true, one for false).
    private static ContextCallback _ccbt = new ContextCallback(WaitOrTimerCallback_Context_t);
    private static ContextCallback _ccbf = new ContextCallback(WaitOrTimerCallback_Context_f);

    /// <summary>
    /// Records the callback and state, and captures the caller's ExecutionContext when
    /// <paramref name="compressStack"/> is true and flow is not suppressed.
    /// </summary>
    internal _ThreadPoolWaitOrTimerCallback(WaitOrTimerCallback waitOrTimerCallback, Object state, bool compressStack, ref StackCrawlMark stackMark)
    {
        _waitOrTimerCallback = waitOrTimerCallback;
        _state = state;

        // Nothing to capture for the Unsafe path or when flow is suppressed.
        if (!compressStack || ExecutionContext.IsFlowSuppressed())
        {
            return;
        }

        // capture the execution context
        _executionContext = ExecutionContext.Capture(ref stackMark);
        ExecutionContext.ClearSyncContext(_executionContext);
    }

    // Context entry point used when the wait timed out.
    private static void WaitOrTimerCallback_Context_t(Object state)
    {
        WaitOrTimerCallback_Context(state, true);
    }

    // Context entry point used when the wait did not time out.
    private static void WaitOrTimerCallback_Context_f(Object state)
    {
        WaitOrTimerCallback_Context(state, false);
    }

    // Shared body of the two entry points above: unwraps the helper and invokes the user callback.
    private static void WaitOrTimerCallback_Context(Object state, bool timedOut)
    {
        _ThreadPoolWaitOrTimerCallback target = (_ThreadPoolWaitOrTimerCallback)state;
        target._waitOrTimerCallback(target._state, timedOut);
    }

    /// <summary>
    /// Call back helper: invokes the user callback directly when no context was captured,
    /// otherwise runs it under a copy of the captured ExecutionContext.
    /// </summary>
    /// <param name="state">The <c>_ThreadPoolWaitOrTimerCallback</c> instance created at registration.</param>
    /// <param name="timedOut">Forwarded unchanged to the user callback.</param>
    internal static void PerformWaitOrTimerCallback(Object state, bool timedOut)
    {
        _ThreadPoolWaitOrTimerCallback helper = (_ThreadPoolWaitOrTimerCallback)state;
        BCLDebug.Assert(helper != null, "Null state passed to PerformWaitOrTimerCallback!");

        if (helper._executionContext != null)
        {
            // A fresh copy of the captured context is made for every invocation.
            ExecutionContext.Run(helper._executionContext.CreateCopy(), timedOut ? _ccbt : _ccbf, helper);
            return;
        }

        // call directly if it is an unsafe call OR EC flow is suppressed
        helper._waitOrTimerCallback(helper._state, timedOut);
    }
}
/// <summary>
/// Callback signature for I/O completion notifications; each parameter is described inline.
/// Marked non-CLS-compliant (it uses uint and a pointer type).
/// </summary>
// NOTE(review): nothing in this file invokes this delegate; presumably it is consumed by
// the overlapped-I/O code elsewhere - confirm against callers.
[CLSCompliant(false)]
[System.Runtime.InteropServices.ComVisible(true)]
unsafe public delegate void IOCompletionCallback(uint errorCode, // Error code
uint numBytes, // No. of bytes transferred
NativeOverlapped* pOVERLAP // ptr to OVERLAP structure
);
/// <summary>
/// Managed entry points to the runtime's thread pool. Most members validate their arguments
/// and forward to runtime-implemented (InternalCall) methods declared at the bottom of the class.
/// APIs whose name starts with "Unsafe" do not capture the caller's ExecutionContext
/// (they pass compressStack = false) and are guarded by a LinkDemand for
/// ControlEvidence | ControlPolicy.
/// </summary>
[HostProtection(Synchronization=true, ExternalThreading=true)]
public static class ThreadPool
{
    /// <summary>Forwards the requested maximum thread counts to SetMaxThreadsNative.</summary>
    /// <returns>The value returned by SetMaxThreadsNative.</returns>
    [SecurityPermissionAttribute(SecurityAction.Demand, ControlThread = true)]
    public static bool SetMaxThreads(int workerThreads, int completionPortThreads)
    {
        return SetMaxThreadsNative(workerThreads, completionPortThreads);
    }

    /// <summary>Retrieves the maximum thread counts from GetMaxThreadsNative.</summary>
    public static void GetMaxThreads(out int workerThreads, out int completionPortThreads)
    {
        GetMaxThreadsNative(out workerThreads, out completionPortThreads);
    }

    /// <summary>Forwards the requested minimum thread counts to SetMinThreadsNative.</summary>
    /// <returns>The value returned by SetMinThreadsNative.</returns>
    [SecurityPermissionAttribute(SecurityAction.Demand, ControlThread = true)]
    public static bool SetMinThreads(int workerThreads, int completionPortThreads)
    {
        return SetMinThreadsNative(workerThreads, completionPortThreads);
    }

    /// <summary>Retrieves the minimum thread counts from GetMinThreadsNative.</summary>
    public static void GetMinThreads(out int workerThreads, out int completionPortThreads)
    {
        GetMinThreadsNative(out workerThreads, out completionPortThreads);
    }

    /// <summary>Retrieves the available thread counts from GetAvailableThreadsNative.</summary>
    public static void GetAvailableThreads(out int workerThreads, out int completionPortThreads)
    {
        GetAvailableThreadsNative(out workerThreads, out completionPortThreads);
    }

    /// <summary>
    /// Validates a signed millisecond timeout and converts it to the unsigned value taken by the
    /// native registration call.
    /// </summary>
    /// <param name="milliseconds">Timeout in milliseconds; -1 is mapped to UInt32.MaxValue.</param>
    /// <param name="paramName">Name of the public parameter, used in the exception.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// The value is less than -1 or greater than Int32.MaxValue.
    /// </exception>
    private static uint ToNativeTimeout(long milliseconds, String paramName)
    {
        if (milliseconds < -1)
            throw new ArgumentOutOfRangeException(paramName, Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegOrNegative1"));
        // Without this bound a long would be silently truncated to its low 32 bits.
        if (milliseconds > (long) Int32.MaxValue)
            throw new ArgumentOutOfRangeException(paramName, Environment.GetResourceString("ArgumentOutOfRange_LessEqualToIntegerMaxVal"));
        // Spell out the -1 mapping instead of relying on an unchecked narrowing cast.
        return milliseconds == -1 ? UInt32.MaxValue : (UInt32)milliseconds;
    }

    /// <summary>
    /// Registers a wait on <paramref name="waitObject"/>, capturing the caller's ExecutionContext.
    /// The unsigned timeout is passed to the native layer unchanged.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="callBack"/> is null.</exception>
    /// <exception cref="InvalidOperationException"><paramref name="waitObject"/> is a transparent proxy.</exception>
    [CLSCompliant(false)]
    public static RegisteredWaitHandle RegisterWaitForSingleObject(
        WaitHandle waitObject,
        WaitOrTimerCallback callBack,
        Object state,
        uint millisecondsTimeOutInterval,
        bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
        )
    {
        // The mark must be a local of the public entry point so the stack crawl finds its caller.
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
        return RegisterWaitForSingleObject(waitObject,callBack,state,millisecondsTimeOutInterval,executeOnlyOnce,ref stackMark,true);
    }

    /// <summary>
    /// Same as the uint RegisterWaitForSingleObject overload but does not capture the caller's
    /// ExecutionContext.
    /// </summary>
    [CLSCompliant(false),
    SecurityPermissionAttribute( SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.ControlEvidence | SecurityPermissionFlag.ControlPolicy)]
    public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
        WaitHandle waitObject,
        WaitOrTimerCallback callBack,
        Object state,
        uint millisecondsTimeOutInterval,
        bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
        )
    {
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
        return RegisterWaitForSingleObject(waitObject,callBack,state,millisecondsTimeOutInterval,executeOnlyOnce,ref stackMark,false);
    }

    /// <summary>
    /// Shared implementation behind every public (Unsafe)RegisterWaitForSingleObject overload.
    /// </summary>
    /// <param name="compressStack">True to capture the caller's ExecutionContext for the callback.</param>
    /// <exception cref="InvalidOperationException"><paramref name="waitObject"/> is a transparent proxy.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="callBack"/> is null.</exception>
    private static RegisteredWaitHandle RegisterWaitForSingleObject(
        WaitHandle waitObject,
        WaitOrTimerCallback callBack,
        Object state,
        uint millisecondsTimeOutInterval,
        bool executeOnlyOnce, // NOTE: we do not allow other options that allow the callback to be queued as an APC
        ref StackCrawlMark stackMark,
        bool compressStack
        )
    {
        if (RemotingServices.IsTransparentProxy(waitObject))
            throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WaitOnTransparentProxy"));
        // Reject a null callback before allocating the RegisteredWaitHandle (which owns a
        // finalizable object), and report the actual parameter name.
        if (callBack == null)
            throw new ArgumentNullException("callBack");

        RegisteredWaitHandle registeredWaitHandle = new RegisteredWaitHandle();
        _ThreadPoolWaitOrTimerCallback callBackHelper = new _ThreadPoolWaitOrTimerCallback(callBack, state, compressStack, ref stackMark);
        // call SetWaitObject before native call so that waitObject won't be closed before threadpoolmgr registration
        // this could occur if callback were to fire before SetWaitObject does its addref
        registeredWaitHandle.SetWaitObject(waitObject);
        // The helper, not the user's state object, is what the native layer receives as state.
        IntPtr nativeRegisteredWaitHandle = RegisterWaitForSingleObjectNative(waitObject,
                                                                               callBackHelper,
                                                                               millisecondsTimeOutInterval,
                                                                               executeOnlyOnce,
                                                                               registeredWaitHandle,
                                                                               ref stackMark,
                                                                               compressStack);
        registeredWaitHandle.SetHandle(nativeRegisteredWaitHandle);
        return registeredWaitHandle;
    }

    /// <summary>Registers a wait with an int millisecond timeout (-1 maps to UInt32.MaxValue).</summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="millisecondsTimeOutInterval"/> is less than -1.</exception>
    public static RegisteredWaitHandle RegisterWaitForSingleObject(
        WaitHandle waitObject,
        WaitOrTimerCallback callBack,
        Object state,
        int millisecondsTimeOutInterval,
        bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
        )
    {
        uint nativeTimeout = ToNativeTimeout(millisecondsTimeOutInterval, "millisecondsTimeOutInterval");
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
        return RegisterWaitForSingleObject(waitObject,callBack,state,nativeTimeout,executeOnlyOnce,ref stackMark,true);
    }

    /// <summary>Int-timeout variant that does not capture the caller's ExecutionContext.</summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="millisecondsTimeOutInterval"/> is less than -1.</exception>
    [SecurityPermissionAttribute( SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.ControlEvidence | SecurityPermissionFlag.ControlPolicy)]
    public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
        WaitHandle waitObject,
        WaitOrTimerCallback callBack,
        Object state,
        int millisecondsTimeOutInterval,
        bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
        )
    {
        uint nativeTimeout = ToNativeTimeout(millisecondsTimeOutInterval, "millisecondsTimeOutInterval");
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
        return RegisterWaitForSingleObject(waitObject,callBack,state,nativeTimeout,executeOnlyOnce,ref stackMark,false);
    }

    /// <summary>Registers a wait with a long millisecond timeout (-1 maps to UInt32.MaxValue).</summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="millisecondsTimeOutInterval"/> is less than -1 or greater than
    /// Int32.MaxValue (larger values used to be truncated silently).
    /// </exception>
    public static RegisteredWaitHandle RegisterWaitForSingleObject(
        WaitHandle waitObject,
        WaitOrTimerCallback callBack,
        Object state,
        long millisecondsTimeOutInterval,
        bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
        )
    {
        uint nativeTimeout = ToNativeTimeout(millisecondsTimeOutInterval, "millisecondsTimeOutInterval");
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
        return RegisterWaitForSingleObject(waitObject,callBack,state,nativeTimeout,executeOnlyOnce,ref stackMark,true);
    }

    /// <summary>Long-timeout variant that does not capture the caller's ExecutionContext.</summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="millisecondsTimeOutInterval"/> is less than -1 or greater than Int32.MaxValue.
    /// </exception>
    [SecurityPermissionAttribute( SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.ControlEvidence | SecurityPermissionFlag.ControlPolicy)]
    public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
        WaitHandle waitObject,
        WaitOrTimerCallback callBack,
        Object state,
        long millisecondsTimeOutInterval,
        bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
        )
    {
        uint nativeTimeout = ToNativeTimeout(millisecondsTimeOutInterval, "millisecondsTimeOutInterval");
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
        return RegisterWaitForSingleObject(waitObject,callBack,state,nativeTimeout,executeOnlyOnce,ref stackMark,false);
    }

    /// <summary>
    /// Registers a wait with a TimeSpan timeout; the span is truncated to whole milliseconds.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// The timeout in milliseconds is less than -1 or greater than Int32.MaxValue.
    /// </exception>
    public static RegisteredWaitHandle RegisterWaitForSingleObject(
        WaitHandle waitObject,
        WaitOrTimerCallback callBack,
        Object state,
        TimeSpan timeout,
        bool executeOnlyOnce
        )
    {
        uint nativeTimeout = ToNativeTimeout((long)timeout.TotalMilliseconds, "timeout");
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
        return RegisterWaitForSingleObject(waitObject,callBack,state,nativeTimeout,executeOnlyOnce,ref stackMark,true);
    }

    /// <summary>TimeSpan-timeout variant that does not capture the caller's ExecutionContext.</summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// The timeout in milliseconds is less than -1 or greater than Int32.MaxValue.
    /// </exception>
    [SecurityPermissionAttribute( SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.ControlEvidence | SecurityPermissionFlag.ControlPolicy)]
    public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
        WaitHandle waitObject,
        WaitOrTimerCallback callBack,
        Object state,
        TimeSpan timeout,
        bool executeOnlyOnce
        )
    {
        uint nativeTimeout = ToNativeTimeout((long)timeout.TotalMilliseconds, "timeout");
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
        return RegisterWaitForSingleObject(waitObject,callBack,state,nativeTimeout,executeOnlyOnce,ref stackMark,false);
    }

    /// <summary>Queues <paramref name="callBack"/> with a state object, capturing the caller's ExecutionContext.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="callBack"/> is null.</exception>
    public static bool QueueUserWorkItem(
        WaitCallback callBack, // NOTE: we do not expose options that allow the callback to be queued as an APC
        Object state
        )
    {
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
        return QueueUserWorkItemHelper(callBack,state,ref stackMark,true);
    }

    /// <summary>Queues <paramref name="callBack"/> with a null state object.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="callBack"/> is null.</exception>
    public static bool QueueUserWorkItem(
        WaitCallback callBack // NOTE: we do not expose options that allow the callback to be queued as an APC
        )
    {
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
        return QueueUserWorkItemHelper(callBack,null,ref stackMark,true);
    }

    /// <summary>Queues <paramref name="callBack"/> without capturing the caller's ExecutionContext.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="callBack"/> is null.</exception>
    [SecurityPermissionAttribute( SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.ControlEvidence | SecurityPermissionFlag.ControlPolicy)]
    public static bool UnsafeQueueUserWorkItem(
        WaitCallback callBack, // NOTE: we do not expose options that allow the callback to be queued as an APC
        Object state
        )
    {
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
        return QueueUserWorkItemHelper(callBack,state,ref stackMark,false);
    }

    /// <summary>
    /// Shared implementation of the QueueUserWorkItem family: wraps the callback and state in a
    /// _ThreadPoolWaitCallback and hands that wrapper to the native queue.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="callBack"/> is null.</exception>
    private static bool QueueUserWorkItemHelper(WaitCallback callBack, Object state, ref StackCrawlMark stackMark, bool compressStack )
    {
        // Report the actual parameter name rather than the delegate type name.
        if (callBack == null)
            throw new ArgumentNullException("callBack");

        _ThreadPoolWaitCallback callBackHelper = new _ThreadPoolWaitCallback(callBack, state, compressStack, ref stackMark);
        return QueueUserWorkItem(callBackHelper, ref stackMark, compressStack);
    }

    // Runtime-implemented (InternalCall) queueing primitive; state is the _ThreadPoolWaitCallback wrapper.
    [MethodImplAttribute(MethodImplOptions.InternalCall)]
    private static extern bool QueueUserWorkItem(Object state, ref StackCrawlMark stackMark, bool compressStack );

    // Runtime-implemented (InternalCall) primitive behind UnsafeQueueNativeOverlapped.
    [MethodImplAttribute(MethodImplOptions.InternalCall)]
    unsafe private static extern bool PostQueuedCompletionStatus(NativeOverlapped* overlapped);

    /// <summary>Forwards <paramref name="overlapped"/> to PostQueuedCompletionStatus.</summary>
    /// <returns>The value returned by PostQueuedCompletionStatus.</returns>
    [CLSCompliant(false)]
    [SecurityPermissionAttribute( SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.ControlEvidence | SecurityPermissionFlag.ControlPolicy)]
    unsafe public static bool UnsafeQueueNativeOverlapped(NativeOverlapped* overlapped)
    {
        return PostQueuedCompletionStatus(overlapped);
    }

    // Native methods:
    [MethodImplAttribute(MethodImplOptions.InternalCall)]
    private static extern bool SetMinThreadsNative(int workerThreads, int completionPortThreads);

    [MethodImplAttribute(MethodImplOptions.InternalCall)]
    private static extern bool SetMaxThreadsNative(int workerThreads, int completionPortThreads);

    [MethodImplAttribute(MethodImplOptions.InternalCall)]
    private static extern void GetMinThreadsNative(out int workerThreads, out int completionPortThreads);

    [MethodImplAttribute(MethodImplOptions.InternalCall)]
    private static extern void GetMaxThreadsNative(out int workerThreads, out int completionPortThreads);

    [MethodImplAttribute(MethodImplOptions.InternalCall)]
    private static extern void GetAvailableThreadsNative(out int workerThreads, out int completionPortThreads);

    [MethodImplAttribute(MethodImplOptions.InternalCall)]
    private static extern IntPtr RegisterWaitForSingleObjectNative(
        WaitHandle waitHandle,
        Object state,
        uint timeOutInterval,
        bool executeOnlyOnce,
        RegisteredWaitHandle registeredWaitHandle,
        ref StackCrawlMark stackMark,
        bool compressStack
        );

    /// <summary>Obsolete raw-handle overload; forwards to BindIOCompletionCallbackNative.</summary>
    [Obsolete("ThreadPool.BindHandle(IntPtr) has been deprecated. Please use ThreadPool.BindHandle(SafeHandle) instead.", false)]
    [SecurityPermissionAttribute( SecurityAction.Demand, Flags = SecurityPermissionFlag.UnmanagedCode)]
    public static bool BindHandle(
        IntPtr osHandle
        )
    {
        return BindIOCompletionCallbackNative(osHandle);
    }

    /// <summary>
    /// Binds <paramref name="osHandle"/> through BindIOCompletionCallbackNative, holding a
    /// reference on the SafeHandle for the duration of the native call.
    /// </summary>
    /// <returns>The value returned by BindIOCompletionCallbackNative.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="osHandle"/> is null.</exception>
    [SecurityPermissionAttribute( SecurityAction.Demand, Flags = SecurityPermissionFlag.UnmanagedCode)]
    public static bool BindHandle(SafeHandle osHandle)
    {
        if (osHandle == null)
            throw new ArgumentNullException("osHandle");

        bool ret = false;
        bool mustReleaseSafeHandle = false;
        RuntimeHelpers.PrepareConstrainedRegions();
        try {
            osHandle.DangerousAddRef(ref mustReleaseSafeHandle);
            ret = BindIOCompletionCallbackNative(osHandle.DangerousGetHandle());
        }
        finally {
            // Release only if DangerousAddRef reported that it took the reference.
            if (mustReleaseSafeHandle)
                osHandle.DangerousRelease();
        }
        return ret;
    }

    [MethodImplAttribute(MethodImplOptions.InternalCall)]
    [ReliabilityContract(Consistency.WillNotCorruptState, Cer.MayFail)]
    private static extern bool BindIOCompletionCallbackNative(IntPtr fileHandle);
}
}
// ==++==
//
//
// Copyright (c) 2006 Microsoft Corporation. All rights reserved.
//
// The use and distribution terms for this software are contained in the file
// named license.txt, which can be found in the root of this distribution.
// By using this software in any fashion, you are agreeing to be bound by the
// terms of this license.
//
// You must not remove this notice, or any other, from this software.
//
//
// ==--==
/*=============================================================================
**
** Class: ThreadPool
**
**
** Purpose: Class for creating and managing a threadpool
**
**
=============================================================================*/
/*
* Below you'll notice two sets of APIs that are separated by the
* use of 'Unsafe' in their names. The unsafe versions are called
* that because they do not propagate the calling stack onto the
* worker thread. This allows code to lose the calling stack and
* thereby elevate its security privileges. Note that this operation
* is much akin to the combined ability to control security policy
* and control security evidence. With these privileges, a person
* can gain the right to load assemblies that are fully trusted which
* then assert full trust and can call any code they want regardless
* of the previous stack information.
*/
namespace System.Threading {
using System.Security;
using System.Threading;
using System.Runtime.Remoting;
using System.Security.Permissions;
using System;
using Microsoft.Win32;
using System.Runtime.CompilerServices;
using System.Runtime.ConstrainedExecution;
using System.Runtime.InteropServices;
/// <summary>
/// Holds the native handle of a wait registered with the thread pool together with the
/// <see cref="WaitHandle"/> being waited on. The finalizer cleans up a registration that was
/// never successfully unregistered through <see cref="Unregister"/>.
/// </summary>
internal sealed class RegisteredWaitHandleSafe : CriticalFinalizerObject
{
// Sentinel meaning "no native registration"; also the initial value of registeredWaitHandle.
private static readonly IntPtr InvalidHandle = Win32Native.INVALID_HANDLE_VALUE;
// Native registration handle. InvalidHandle or IntPtr.Zero means there is nothing to unregister.
private IntPtr registeredWaitHandle;
// The wait object whose SafeWaitHandle is add-ref'd by SetWaitObject.
private WaitHandle m_internalWaitObject;
// Written by DangerousAddRef in SetWaitObject; true means a matching DangerousRelease is still owed.
private bool bReleaseNeeded = false;
// Hand-rolled lock word: 0 = free, 1 = held. Taken with Interlocked.CompareExchange.
private int m_lock = 0;
/// <summary>Creates an instance that has no native registration yet.</summary>
internal RegisteredWaitHandleSafe()
{
registeredWaitHandle = InvalidHandle;
}
/// <summary>Returns the stored native registration handle (read without taking the lock).</summary>
internal IntPtr GetHandle()
{
return registeredWaitHandle;
}
/// <summary>Stores the native registration handle (written without taking the lock).</summary>
internal void SetHandle(IntPtr handle)
{
registeredWaitHandle = handle;
}
/// <summary>
/// Remembers <paramref name="waitObject"/> and, when it is non-null, takes a reference on its
/// SafeWaitHandle; <c>bReleaseNeeded</c> records whether that reference must later be released.
/// </summary>
/// <param name="waitObject">The wait object being registered; may be null.</param>
[ReliabilityContract(Consistency.WillNotCorruptState, Cer.MayFail)]
internal void SetWaitObject(WaitHandle waitObject)
{
// needed for DangerousAddRef
RuntimeHelpers.PrepareConstrainedRegions();
try
{
}
finally
{
// All work is done in the finally block of an otherwise empty try; presumably so the
// field store and the add-ref cannot be separated by an asynchronous exception.
m_internalWaitObject = waitObject;
if (waitObject != null)
{
m_internalWaitObject.SafeWaitHandle.DangerousAddRef(ref bReleaseNeeded);
}
}
}
/// <summary>
/// Unregisters the native wait if a valid handle is stored. On success the SafeWaitHandle
/// reference is released (if one is owed), the handle is reset to InvalidHandle and
/// finalization of this object is suppressed.
/// </summary>
/// <param name="waitObject">
/// Object to be notified when all callbacks to delegates have completed; its SafeWaitHandle
/// (or null when the argument is null) is passed to UnregisterWaitNative.
/// </param>
/// <returns>
/// The value returned by UnregisterWaitNative, or false when no valid handle was stored.
/// </returns>
[ReliabilityContract(Consistency.WillNotCorruptState, Cer.MayFail)]
internal bool Unregister(
WaitHandle waitObject // object to be notified when all callbacks to delegates have completed
)
{
bool result = false;
// needed for DangerousRelease
RuntimeHelpers.PrepareConstrainedRegions();
try
{
}
finally
{
// lock(this) cannot be used reliably in Cer since thin lock could be
// promoted to syncblock and that is not a guaranteed operation
bool bLockTaken = false;
// Spin until the CompareExchange below swaps m_lock from 0 to 1.
do
{
if (Interlocked.CompareExchange(ref m_lock, 1, 0) == 0)
{
bLockTaken = true;
try
{
if (ValidHandle())
{
result = UnregisterWaitNative(GetHandle(), waitObject == null ? null : waitObject.SafeWaitHandle);
if (result == true)
{
if (bReleaseNeeded)
{
m_internalWaitObject.SafeWaitHandle.DangerousRelease();
bReleaseNeeded = false;
}
// if result not true don't release/suppress here so finalizer can make another attempt
SetHandle(InvalidHandle);
m_internalWaitObject = null;
GC.SuppressFinalize(this);
}
}
}
finally
{
// Release the lock even if the native call or the release above throws.
m_lock = 0;
}
}
// Runs on every iteration, including the one that acquired (and already released) the lock.
Thread.SpinWait(1); // yield to processor
}
while (!bLockTaken);
}
return result;
}
/// <summary>True when the stored handle is neither InvalidHandle nor IntPtr.Zero.</summary>
private bool ValidHandle()
{
return (registeredWaitHandle != InvalidHandle && registeredWaitHandle != IntPtr.Zero);
}
/// <summary>
/// Finalizer: if a valid handle is still stored, hands it to WaitHandleCleanupNative, releases
/// the SafeWaitHandle reference if one is owed, and resets the stored state.
/// </summary>
~RegisteredWaitHandleSafe()
{
// if the app has already unregistered the wait, there is nothing to cleanup
// we can detect this by checking the handle. Normally, there is no race here
// so no need to protect reading of handle. However, if this object gets
// resurrected and then someone does an unregister, it would introduce a race
// PrepareConstrainedRegions call not needed since finalizer already in Cer
// lock(this) cannot be used reliably even in Cer since thin lock could be
// promoted to syncblock and that is not a guaranteed operation
// NOTE(review): the DangerousRelease below is nested under ValidHandle(), so if
// SetWaitObject took a reference but no valid handle was ever stored, the reference
// does not appear to be released here - confirm whether that is intended.
bool bLockTaken = false;
do
{
if (Interlocked.CompareExchange(ref m_lock, 1, 0) == 0)
{
bLockTaken = true;
try
{
if (ValidHandle())
{
WaitHandleCleanupNative(registeredWaitHandle);
if (bReleaseNeeded)
{
m_internalWaitObject.SafeWaitHandle.DangerousRelease();
bReleaseNeeded = false;
}
SetHandle(InvalidHandle);
m_internalWaitObject = null;
}
}
finally
{
m_lock = 0;
}
}
Thread.SpinWait(1); // yield to processor
}
while (!bLockTaken);
}
// Runtime-implemented (InternalCall): called by the finalizer with the still-registered handle.
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern void WaitHandleCleanupNative(IntPtr handle);
// Runtime-implemented (InternalCall): called by Unregister; its bool result decides whether
// the managed state is torn down.
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern bool UnregisterWaitNative(IntPtr handle, SafeHandle waitObject);
}
/// <summary>
/// Public handle for a wait registered with the thread pool. Every member forwards to a
/// private <c>RegisteredWaitHandleSafe</c> instance created in the constructor.
/// </summary>
[System.Runtime.InteropServices.ComVisible(true)]
public sealed class RegisteredWaitHandle : MarshalByRefObject
{
    // The object that actually owns the native handle and the wait-object reference.
    private RegisteredWaitHandleSafe internalRegisteredWait;

    /// <summary>Creates the wrapper together with its inner safe object.</summary>
    internal RegisteredWaitHandle()
    {
        this.internalRegisteredWait = new RegisteredWaitHandleSafe();
    }

    /// <summary>Stores the native registration handle on the inner object.</summary>
    internal void SetHandle(IntPtr handle)
    {
        RegisteredWaitHandleSafe inner = this.internalRegisteredWait;
        inner.SetHandle(handle);
    }

    /// <summary>Passes the wait object on to the inner object.</summary>
    internal void SetWaitObject(WaitHandle waitObject)
    {
        RegisteredWaitHandleSafe inner = this.internalRegisteredWait;
        inner.SetWaitObject(waitObject);
    }

    /// <summary>
    /// Unregisters the wait. This is the only public method on this class.
    /// </summary>
    /// <param name="waitObject">Object to be notified when all callbacks to delegates have completed.</param>
    /// <returns>Whatever the inner object's Unregister returns.</returns>
    [System.Runtime.InteropServices.ComVisible(true)]
    public bool Unregister(WaitHandle waitObject)
    {
        bool unregistered = this.internalRegisteredWait.Unregister(waitObject);
        return unregistered;
    }
}
/// <summary>
/// Signature of the callbacks wrapped by _ThreadPoolWaitCallback for queued thread-pool work items.
/// </summary>
/// <param name="state">The state object that was stored alongside the callback.</param>
[System.Runtime.InteropServices.ComVisible(true)]
public delegate void WaitCallback(Object state);
/// <summary>
/// Signature of the callbacks accepted by ThreadPool.RegisterWaitForSingleObject and
/// ThreadPool.UnsafeRegisterWaitForSingleObject.
/// </summary>
/// <param name="state">The state object supplied when the wait was registered.</param>
/// <param name="timedOut">Tells the callback whether the wait was signalled or timed out.</param>
[System.Runtime.InteropServices.ComVisible(true)]
public delegate void WaitOrTimerCallback(Object state, bool timedOut); // signalled or timed out
/// <summary>
/// Bundles a <see cref="WaitCallback"/>, its state object and (optionally) a captured
/// ExecutionContext so that a queued work item can later be invoked through
/// <see cref="PerformWaitCallback"/>.
/// </summary>
internal class _ThreadPoolWaitCallback
{
    private WaitCallback _waitCallback;
    // Null when the helper was built with compressStack = false or while context flow was suppressed.
    private ExecutionContext _executionContext;
    private Object _state;

    // Cached delegate handed to ExecutionContext.Run.
    internal static ContextCallback _ccb = new ContextCallback(WaitCallback_Context);

    /// <summary>Body run inside the captured context: unwraps the helper and invokes the user callback.</summary>
    internal static void WaitCallback_Context(Object state)
    {
        _ThreadPoolWaitCallback helper = (_ThreadPoolWaitCallback)state;
        helper._waitCallback(helper._state);
    }

    /// <summary>
    /// Records the callback and state, and captures the caller's ExecutionContext when
    /// <paramref name="compressStack"/> is true and flow is not suppressed.
    /// </summary>
    internal _ThreadPoolWaitCallback(WaitCallback waitCallback, Object state, bool compressStack, ref StackCrawlMark stackMark)
    {
        _waitCallback = waitCallback;
        _state = state;

        // Nothing to capture when compressStack is false or flow is suppressed.
        if (!compressStack || ExecutionContext.IsFlowSuppressed())
        {
            return;
        }

        // clone the execution context
        _executionContext = ExecutionContext.Capture(ref stackMark);
        ExecutionContext.ClearSyncContext(_executionContext);
    }

    /// <summary>
    /// Call back helper: invokes the user callback directly when no context was captured,
    /// otherwise runs it under the captured ExecutionContext.
    /// </summary>
    /// <param name="state">The <c>_ThreadPoolWaitCallback</c> instance wrapping the work item.</param>
    internal static void PerformWaitCallback(Object state)
    {
        _ThreadPoolWaitCallback helper = (_ThreadPoolWaitCallback)state;
        BCLDebug.Assert(helper != null, "Null state passed to PerformWaitCallback!");

        ExecutionContext captured = helper._executionContext;
        if (captured != null)
        {
            ExecutionContext.Run(captured, _ccb, helper);
            return;
        }

        // call directly if it is an unsafe call OR EC flow is suppressed
        helper._waitCallback(helper._state);
    }
}
/// <summary>
/// Bundles a <see cref="WaitOrTimerCallback"/>, its state object and (optionally) a captured
/// ExecutionContext so that a registered wait can later be invoked through
/// <see cref="PerformWaitOrTimerCallback"/>.
/// </summary>
internal class _ThreadPoolWaitOrTimerCallback
{
    private WaitOrTimerCallback _waitOrTimerCallback;
    // Null when registered through an Unsafe API or while context flow was suppressed.
    private ExecutionContext _executionContext;
    private Object _state;

    // ContextCallback takes a single argument, so the timedOut flag is encoded by
    // choosing between two cached delegates (one for true, one for false).
    private static ContextCallback _ccbt = new ContextCallback(WaitOrTimerCallback_Context_t);
    private static ContextCallback _ccbf = new ContextCallback(WaitOrTimerCallback_Context_f);

    /// <summary>
    /// Records the callback and state, and captures the caller's ExecutionContext when
    /// <paramref name="compressStack"/> is true and flow is not suppressed.
    /// </summary>
    internal _ThreadPoolWaitOrTimerCallback(WaitOrTimerCallback waitOrTimerCallback, Object state, bool compressStack, ref StackCrawlMark stackMark)
    {
        _waitOrTimerCallback = waitOrTimerCallback;
        _state = state;

        // Nothing to capture for the Unsafe path or when flow is suppressed.
        if (!compressStack || ExecutionContext.IsFlowSuppressed())
        {
            return;
        }

        // capture the execution context
        _executionContext = ExecutionContext.Capture(ref stackMark);
        ExecutionContext.ClearSyncContext(_executionContext);
    }

    // Context entry point used when the wait timed out.
    private static void WaitOrTimerCallback_Context_t(Object state)
    {
        WaitOrTimerCallback_Context(state, true);
    }

    // Context entry point used when the wait did not time out.
    private static void WaitOrTimerCallback_Context_f(Object state)
    {
        WaitOrTimerCallback_Context(state, false);
    }

    // Shared body of the two entry points above: unwraps the helper and invokes the user callback.
    private static void WaitOrTimerCallback_Context(Object state, bool timedOut)
    {
        _ThreadPoolWaitOrTimerCallback target = (_ThreadPoolWaitOrTimerCallback)state;
        target._waitOrTimerCallback(target._state, timedOut);
    }

    /// <summary>
    /// Call back helper: invokes the user callback directly when no context was captured,
    /// otherwise runs it under a copy of the captured ExecutionContext.
    /// </summary>
    /// <param name="state">The <c>_ThreadPoolWaitOrTimerCallback</c> instance created at registration.</param>
    /// <param name="timedOut">Forwarded unchanged to the user callback.</param>
    internal static void PerformWaitOrTimerCallback(Object state, bool timedOut)
    {
        _ThreadPoolWaitOrTimerCallback helper = (_ThreadPoolWaitOrTimerCallback)state;
        BCLDebug.Assert(helper != null, "Null state passed to PerformWaitOrTimerCallback!");

        if (helper._executionContext != null)
        {
            // A fresh copy of the captured context is made for every invocation.
            ExecutionContext.Run(helper._executionContext.CreateCopy(), timedOut ? _ccbt : _ccbf, helper);
            return;
        }

        // call directly if it is an unsafe call OR EC flow is suppressed
        helper._waitOrTimerCallback(helper._state, timedOut);
    }
}
/// <summary>
/// Callback signature for I/O completion notifications; each parameter is described inline.
/// Marked non-CLS-compliant (it uses uint and a pointer type).
/// </summary>
// NOTE(review): nothing in this file invokes this delegate; presumably it is consumed by
// the overlapped-I/O code elsewhere - confirm against callers.
[CLSCompliant(false)]
[System.Runtime.InteropServices.ComVisible(true)]
unsafe public delegate void IOCompletionCallback(uint errorCode, // Error code
uint numBytes, // No. of bytes transferred
NativeOverlapped* pOVERLAP // ptr to OVERLAP structure
);
[HostProtection(Synchronization=true, ExternalThreading=true)]
public static class ThreadPool
{
/// <summary>Forwards the requested maximum thread counts to SetMaxThreadsNative.</summary>
/// <returns>The value returned by SetMaxThreadsNative.</returns>
[SecurityPermissionAttribute(SecurityAction.Demand, ControlThread = true)]
public static bool SetMaxThreads(int workerThreads, int completionPortThreads)
{
    bool accepted = SetMaxThreadsNative(workerThreads, completionPortThreads);
    return accepted;
}
/// <summary>Retrieves the maximum thread counts from GetMaxThreadsNative.</summary>
public static void GetMaxThreads(out int workerThreads, out int completionPortThreads)
{
    int workers;
    int completionPorts;
    GetMaxThreadsNative(out workers, out completionPorts);

    workerThreads = workers;
    completionPortThreads = completionPorts;
}
/// <summary>Forwards the requested minimum thread counts to SetMinThreadsNative.</summary>
/// <returns>The value returned by SetMinThreadsNative.</returns>
[SecurityPermissionAttribute(SecurityAction.Demand, ControlThread = true)]
public static bool SetMinThreads(int workerThreads, int completionPortThreads)
{
    bool accepted = SetMinThreadsNative(workerThreads, completionPortThreads);
    return accepted;
}
/// <summary>Retrieves the minimum thread counts from GetMinThreadsNative.</summary>
public static void GetMinThreads(out int workerThreads, out int completionPortThreads)
{
    int workers;
    int completionPorts;
    GetMinThreadsNative(out workers, out completionPorts);

    workerThreads = workers;
    completionPortThreads = completionPorts;
}
/// <summary>Retrieves the available thread counts from GetAvailableThreadsNative.</summary>
public static void GetAvailableThreads(out int workerThreads, out int completionPortThreads)
{
    int workers;
    int completionPorts;
    GetAvailableThreadsNative(out workers, out completionPorts);

    workerThreads = workers;
    completionPortThreads = completionPorts;
}
/// <summary>
/// Registers a wait on <paramref name="waitObject"/> with an unsigned millisecond timeout,
/// asking the shared implementation to capture the caller's context (compressStack = true).
/// </summary>
/// <returns>The RegisteredWaitHandle produced by the shared private overload.</returns>
[CLSCompliant(false)]
public static RegisteredWaitHandle RegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    Object state,
    uint millisecondsTimeOutInterval,
    bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
    )
{
    // The mark is a local of this public entry point and is passed down by reference.
    StackCrawlMark callerMark = StackCrawlMark.LookForMyCaller;
    RegisteredWaitHandle registration = RegisterWaitForSingleObject(
        waitObject, callBack, state, millisecondsTimeOutInterval, executeOnlyOnce, ref callerMark, true);
    return registration;
}
// "Unsafe" unsigned-timeout overload. Identical to the safe uint overload
// except that it passes compressStack = false to the private worker; per the
// note at the top of this file, the Unsafe APIs do not propagate the calling
// stack onto the worker thread. The LinkDemand below requires ControlEvidence
// and ControlPolicy from the immediate caller.
[CLSCompliant(false),
SecurityPermissionAttribute( SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.ControlEvidence | SecurityPermissionFlag.ControlPolicy)]
public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject( // throws RegisterWaitException
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    Object state,
    uint millisecondsTimeOutInterval,
    bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
    )
{
    // The crawl mark has to be a local of this public entry point and is
    // passed by reference; do not hoist it into a shared helper.
    StackCrawlMark callerMark = StackCrawlMark.LookForMyCaller;
    RegisteredWaitHandle registration = RegisterWaitForSingleObject(
        waitObject,
        callBack,
        state,
        millisecondsTimeOutInterval,
        executeOnlyOnce,
        ref callerMark,
        false);
    return registration;
}
// Shared worker behind every public (Unsafe)RegisterWaitForSingleObject
// overload in this class.
//
//   waitObject                  - handle to wait on; must not be a transparent proxy
//   callBack                    - delegate to register; must not be null
//   state                       - caller state, wrapped together with callBack in a
//                                 _ThreadPoolWaitOrTimerCallback before going native
//   millisecondsTimeOutInterval - timeout forwarded unchanged to the native call
//   executeOnlyOnce             - forwarded unchanged to the native call
//   stackMark                   - crawl mark owned by the public entry point
//   compressStack               - true for the safe overloads, false for the Unsafe ones
//
// Returns the RegisteredWaitHandle that wraps the native registration.
// Throws InvalidOperationException when waitObject is a transparent proxy and
// ArgumentNullException (paramName "callBack") when callBack is null.
private static RegisteredWaitHandle RegisterWaitForSingleObject( // throws RegisterWaitException
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    Object state,
    uint millisecondsTimeOutInterval,
    bool executeOnlyOnce, // NOTE: we do not allow other options that allow the callback to be queued as an APC
    ref StackCrawlMark stackMark,
    bool compressStack
    )
{
    if (RemotingServices.IsTransparentProxy(waitObject))
        throw new InvalidOperationException(Environment.GetResourceString("InvalidOperation_WaitOnTransparentProxy"));

    // Reject a null callback before allocating anything, and report the
    // actual parameter name rather than the delegate's type name.
    if (callBack == null)
        throw new ArgumentNullException("callBack");

    RegisteredWaitHandle registeredWaitHandle = new RegisteredWaitHandle();
    _ThreadPoolWaitOrTimerCallback callBackHelper = new _ThreadPoolWaitOrTimerCallback(callBack, state, compressStack, ref stackMark);
    state = (Object)callBackHelper;

    // call SetWaitObject before native call so that waitObject won't be closed before threadpoolmgr registration
    // this could occur if callback were to fire before SetWaitObject does its addref
    registeredWaitHandle.SetWaitObject(waitObject);
    IntPtr nativeRegisteredWaitHandle = RegisterWaitForSingleObjectNative(waitObject,
                                                                          state,
                                                                          millisecondsTimeOutInterval,
                                                                          executeOnlyOnce,
                                                                          registeredWaitHandle,
                                                                          ref stackMark,
                                                                          compressStack);
    registeredWaitHandle.SetHandle(nativeRegisteredWaitHandle);
    return registeredWaitHandle;
}
// Signed 32-bit timeout overload. Rejects timeouts below -1 with
// ArgumentOutOfRangeException, then forwards to the private worker with the
// timeout cast to UInt32 and compressStack = true.
public static RegisteredWaitHandle RegisterWaitForSingleObject( // throws RegisterWaitException
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    Object state,
    int millisecondsTimeOutInterval,
    bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
    )
{
    if (millisecondsTimeOutInterval < -1)
    {
        throw new ArgumentOutOfRangeException("millisecondsTimeOutInterval", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegOrNegative1"));
    }

    // The crawl mark has to be a local of this public entry point.
    StackCrawlMark callerMark = StackCrawlMark.LookForMyCaller;
    UInt32 nativeTimeout = (UInt32)millisecondsTimeOutInterval;
    return RegisterWaitForSingleObject(waitObject, callBack, state, nativeTimeout, executeOnlyOnce, ref callerMark, true);
}
// "Unsafe" signed 32-bit timeout overload. Rejects timeouts below -1 with
// ArgumentOutOfRangeException, then forwards to the private worker with the
// timeout cast to UInt32 and compressStack = false (per the note at the top
// of this file, the Unsafe APIs do not propagate the calling stack onto the
// worker thread). The LinkDemand requires ControlEvidence and ControlPolicy.
[SecurityPermissionAttribute( SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.ControlEvidence | SecurityPermissionFlag.ControlPolicy)]
public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject( // throws RegisterWaitException
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    Object state,
    int millisecondsTimeOutInterval,
    bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
    )
{
    if (millisecondsTimeOutInterval < -1)
    {
        throw new ArgumentOutOfRangeException("millisecondsTimeOutInterval", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegOrNegative1"));
    }

    // The crawl mark has to be a local of this public entry point.
    StackCrawlMark callerMark = StackCrawlMark.LookForMyCaller;
    UInt32 nativeTimeout = (UInt32)millisecondsTimeOutInterval;
    return RegisterWaitForSingleObject(waitObject, callBack, state, nativeTimeout, executeOnlyOnce, ref callerMark, false);
}
// 64-bit timeout overload. Validates the timeout, converts it to the UInt32
// the private worker expects, and forwards with compressStack = true.
//
//   millisecondsTimeOutInterval - must lie in [-1, Int32.MaxValue]; -1 is
//                                 mapped to UInt32.MaxValue.
//
// Throws ArgumentOutOfRangeException when the timeout is below -1 or above
// Int32.MaxValue. The upper bound matches the TimeSpan overloads of this
// class; without it a plain (UInt32) cast silently drops the high 32 bits,
// so e.g. 4294967296 would be registered as a 0 ms timeout.
public static RegisteredWaitHandle RegisterWaitForSingleObject( // throws RegisterWaitException
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    Object state,
    long millisecondsTimeOutInterval,
    bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
    )
{
    if (millisecondsTimeOutInterval < -1)
        throw new ArgumentOutOfRangeException("millisecondsTimeOutInterval", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegOrNegative1"));
    if (millisecondsTimeOutInterval > (long) Int32.MaxValue)
        throw new ArgumentOutOfRangeException("millisecondsTimeOutInterval", Environment.GetResourceString("ArgumentOutOfRange_LessEqualToIntegerMaxVal"));

    // The crawl mark has to be a local of this public entry point.
    StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;

    // Spell out the -1 mapping instead of relying on an unchecked wrap-around cast.
    UInt32 nativeTimeout = (millisecondsTimeOutInterval == -1) ? UInt32.MaxValue : (UInt32)millisecondsTimeOutInterval;
    return RegisterWaitForSingleObject(waitObject, callBack, state, nativeTimeout, executeOnlyOnce, ref stackMark, true);
}
// "Unsafe" 64-bit timeout overload. Validates the timeout, converts it to the
// UInt32 the private worker expects, and forwards with compressStack = false
// (per the note at the top of this file, the Unsafe APIs do not propagate the
// calling stack onto the worker thread). The LinkDemand requires
// ControlEvidence and ControlPolicy from the immediate caller.
//
//   millisecondsTimeOutInterval - must lie in [-1, Int32.MaxValue]; -1 is
//                                 mapped to UInt32.MaxValue.
//
// Throws ArgumentOutOfRangeException when the timeout is below -1 or above
// Int32.MaxValue. The upper bound matches the TimeSpan overloads of this
// class; without it a plain (UInt32) cast silently drops the high 32 bits,
// so e.g. 4294967296 would be registered as a 0 ms timeout.
[SecurityPermissionAttribute( SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.ControlEvidence | SecurityPermissionFlag.ControlPolicy)]
public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject( // throws RegisterWaitException
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    Object state,
    long millisecondsTimeOutInterval,
    bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
    )
{
    if (millisecondsTimeOutInterval < -1)
        throw new ArgumentOutOfRangeException("millisecondsTimeOutInterval", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegOrNegative1"));
    if (millisecondsTimeOutInterval > (long) Int32.MaxValue)
        throw new ArgumentOutOfRangeException("millisecondsTimeOutInterval", Environment.GetResourceString("ArgumentOutOfRange_LessEqualToIntegerMaxVal"));

    // The crawl mark has to be a local of this public entry point.
    StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;

    // Spell out the -1 mapping instead of relying on an unchecked wrap-around cast.
    UInt32 nativeTimeout = (millisecondsTimeOutInterval == -1) ? UInt32.MaxValue : (UInt32)millisecondsTimeOutInterval;
    return RegisterWaitForSingleObject(waitObject, callBack, state, nativeTimeout, executeOnlyOnce, ref stackMark, false);
}
// TimeSpan overload. Converts the timeout to whole milliseconds (the double
// TotalMilliseconds is cast to long), rejects values below -1 or above
// Int32.MaxValue with ArgumentOutOfRangeException, then forwards to the
// private worker with the value cast to UInt32 and compressStack = true.
public static RegisteredWaitHandle RegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    Object state,
    TimeSpan timeout,
    bool executeOnlyOnce
    )
{
    long timeoutMs = (long)timeout.TotalMilliseconds;

    if (timeoutMs < -1)
    {
        throw new ArgumentOutOfRangeException("timeout", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegOrNegative1"));
    }
    if (timeoutMs > (long) Int32.MaxValue)
    {
        throw new ArgumentOutOfRangeException("timeout", Environment.GetResourceString("ArgumentOutOfRange_LessEqualToIntegerMaxVal"));
    }

    // The crawl mark has to be a local of this public entry point.
    StackCrawlMark callerMark = StackCrawlMark.LookForMyCaller;
    UInt32 nativeTimeout = (UInt32)timeoutMs;
    return RegisterWaitForSingleObject(waitObject, callBack, state, nativeTimeout, executeOnlyOnce, ref callerMark, true);
}
// "Unsafe" TimeSpan overload. Converts the timeout to whole milliseconds (the
// double TotalMilliseconds is cast to long), rejects values below -1 or above
// Int32.MaxValue with ArgumentOutOfRangeException, then forwards to the
// private worker with the value cast to UInt32 and compressStack = false (per
// the note at the top of this file, the Unsafe APIs do not propagate the
// calling stack onto the worker thread). The LinkDemand requires
// ControlEvidence and ControlPolicy from the immediate caller.
[SecurityPermissionAttribute( SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.ControlEvidence | SecurityPermissionFlag.ControlPolicy)]
public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
    WaitHandle waitObject,
    WaitOrTimerCallback callBack,
    Object state,
    TimeSpan timeout,
    bool executeOnlyOnce
    )
{
    long timeoutMs = (long)timeout.TotalMilliseconds;

    if (timeoutMs < -1)
    {
        throw new ArgumentOutOfRangeException("timeout", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegOrNegative1"));
    }
    if (timeoutMs > (long) Int32.MaxValue)
    {
        throw new ArgumentOutOfRangeException("timeout", Environment.GetResourceString("ArgumentOutOfRange_LessEqualToIntegerMaxVal"));
    }

    // The crawl mark has to be a local of this public entry point.
    StackCrawlMark callerMark = StackCrawlMark.LookForMyCaller;
    UInt32 nativeTimeout = (UInt32)timeoutMs;
    return RegisterWaitForSingleObject(waitObject, callBack, state, nativeTimeout, executeOnlyOnce, ref callerMark, false);
}
// Queues callBack with the supplied state object by delegating to
// QueueUserWorkItemHelper with compressStack = true, and returns the
// helper's result unchanged.
public static bool QueueUserWorkItem(
    WaitCallback callBack, // NOTE: we do not expose options that allow the callback to be queued as an APC
    Object state
    )
{
    // The crawl mark has to be a local of this public entry point, so this
    // overload must not be routed through another public overload.
    StackCrawlMark callerMark = StackCrawlMark.LookForMyCaller;
    bool queued = QueueUserWorkItemHelper(callBack, state, ref callerMark, true);
    return queued;
}
// Queues callBack with a null state object by delegating to
// QueueUserWorkItemHelper with compressStack = true, and returns the
// helper's result unchanged.
public static bool QueueUserWorkItem(
    WaitCallback callBack // NOTE: we do not expose options that allow the callback to be queued as an APC
    )
{
    // The crawl mark has to be a local of this public entry point, so this
    // overload must not be routed through the two-argument overload.
    StackCrawlMark callerMark = StackCrawlMark.LookForMyCaller;
    bool queued = QueueUserWorkItemHelper(callBack, null, ref callerMark, true);
    return queued;
}
// "Unsafe" variant of QueueUserWorkItem: delegates to QueueUserWorkItemHelper
// with compressStack = false (per the note at the top of this file, the
// Unsafe APIs do not propagate the calling stack onto the worker thread) and
// returns the helper's result unchanged. The LinkDemand requires
// ControlEvidence and ControlPolicy from the immediate caller.
[SecurityPermissionAttribute( SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.ControlEvidence | SecurityPermissionFlag.ControlPolicy)]
public static bool UnsafeQueueUserWorkItem(
    WaitCallback callBack, // NOTE: we do not expose options that allow the callback to be queued as an APC
    Object state
    )
{
    // The crawl mark has to be a local of this public entry point.
    StackCrawlMark callerMark = StackCrawlMark.LookForMyCaller;
    bool queued = QueueUserWorkItemHelper(callBack, state, ref callerMark, false);
    return queued;
}
// Shared worker behind QueueUserWorkItem and UnsafeQueueUserWorkItem.
//
//   callBack      - delegate to queue; must not be null
//   state         - caller state, wrapped together with callBack in a
//                   _ThreadPoolWaitCallback before being handed to the runtime
//   stackMark     - crawl mark owned by the public entry point
//   compressStack - true for the safe overloads, false for the Unsafe one
//
// Returns the result of the InternalCall QueueUserWorkItem.
// Throws ArgumentNullException (paramName "callBack") when callBack is null.
private static bool QueueUserWorkItemHelper(WaitCallback callBack, Object state, ref StackCrawlMark stackMark, bool compressStack )
{
    // Report the actual parameter name rather than the delegate's type name.
    if (callBack == null)
        throw new ArgumentNullException("callBack");

    _ThreadPoolWaitCallback callBackHelper = new _ThreadPoolWaitCallback(callBack, state, compressStack, ref stackMark);
    state = (Object)callBackHelper;
    return QueueUserWorkItem(state, ref stackMark, compressStack);
}
// Runtime-implemented (InternalCall) entry point that QueueUserWorkItemHelper
// calls with the wrapped callback object, the caller's crawl mark, and the
// compressStack flag. No managed body exists for it.
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern bool QueueUserWorkItem(Object state, ref StackCrawlMark stackMark, bool compressStack );
// Runtime-implemented (InternalCall) entry point used by
// UnsafeQueueNativeOverlapped; takes the raw NativeOverlapped pointer.
[MethodImplAttribute(MethodImplOptions.InternalCall)]
unsafe private static extern bool PostQueuedCompletionStatus(NativeOverlapped* overlapped);
// Passes the raw NativeOverlapped pointer to the InternalCall
// PostQueuedCompletionStatus and returns its result unchanged. The pointer is
// not validated here. The LinkDemand requires ControlEvidence and
// ControlPolicy from the immediate caller.
[CLSCompliant(false)]
[SecurityPermissionAttribute( SecurityAction.LinkDemand, Flags = SecurityPermissionFlag.ControlEvidence | SecurityPermissionFlag.ControlPolicy)]
unsafe public static bool UnsafeQueueNativeOverlapped(NativeOverlapped* overlapped)
{
    bool posted = PostQueuedCompletionStatus(overlapped);
    return posted;
}
// Native methods:
// Everything in this group is declared InternalCall, i.e. implemented by the
// runtime with no managed body. Each one backs the public ThreadPool member
// of the same name minus the "Native" suffix.
// Backs SetMinThreads.
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern bool SetMinThreadsNative(int workerThreads, int completionPortThreads);
// Backs SetMaxThreads.
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern bool SetMaxThreadsNative(int workerThreads, int completionPortThreads);
// Backs GetMinThreads.
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern void GetMinThreadsNative(out int workerThreads, out int completionPortThreads);
// Backs GetMaxThreads.
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern void GetMaxThreadsNative(out int workerThreads, out int completionPortThreads);
// Backs GetAvailableThreads.
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern void GetAvailableThreadsNative(out int workerThreads, out int completionPortThreads);
// Called by the private RegisterWaitForSingleObject worker. Receives the wait
// handle, the wrapped callback object as state, the timeout, the
// executeOnlyOnce flag, the managed RegisteredWaitHandle, the caller's crawl
// mark and the compressStack flag; the returned IntPtr is stored on the
// RegisteredWaitHandle via SetHandle.
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern IntPtr RegisterWaitForSingleObjectNative(
WaitHandle waitHandle,
Object state,
uint timeOutInterval,
bool executeOnlyOnce,
RegisteredWaitHandle registeredWaitHandle,
ref StackCrawlMark stackMark,
bool compressStack
);
// Deprecated raw-handle overload (see the Obsolete message, which points to
// the SafeHandle overload). Passes osHandle straight to
// BindIOCompletionCallbackNative and returns its result unchanged.
// The attribute demands SecurityPermission with UnmanagedCode from callers.
[Obsolete("ThreadPool.BindHandle(IntPtr) has been deprecated. Please use ThreadPool.BindHandle(SafeHandle) instead.", false)]
[SecurityPermissionAttribute( SecurityAction.Demand, Flags = SecurityPermissionFlag.UnmanagedCode)]
public static bool BindHandle(
    IntPtr osHandle
    )
{
    bool bound = BindIOCompletionCallbackNative(osHandle);
    return bound;
}
// SafeHandle overload of BindHandle. Passes the handle's raw value to
// BindIOCompletionCallbackNative and returns that call's result.
//   osHandle - handle to bind; must not be null.
// Throws ArgumentNullException when osHandle is null.
// The attribute demands SecurityPermission with UnmanagedCode from callers.
[SecurityPermissionAttribute( SecurityAction.Demand, Flags = SecurityPermissionFlag.UnmanagedCode)]
public static bool BindHandle(SafeHandle osHandle)
{
if (osHandle == null)
throw new ArgumentNullException("osHandle");
bool ret = false;
bool mustReleaseSafeHandle = false;
// Prepare the try/finally below as a constrained execution region; the
// statement order from here to the end of the finally block is deliberate.
RuntimeHelpers.PrepareConstrainedRegions();
try {
// Take a reference on the SafeHandle for the duration of the native call;
// mustReleaseSafeHandle records whether a matching release is owed.
osHandle.DangerousAddRef(ref mustReleaseSafeHandle);
ret = BindIOCompletionCallbackNative(osHandle.DangerousGetHandle());
}
finally {
// Release only if DangerousAddRef actually took the reference.
if (mustReleaseSafeHandle)
osHandle.DangerousRelease();
}
return ret;
}
// Runtime-implemented (InternalCall) worker behind both BindHandle overloads;
// takes the raw OS handle value. Its reliability contract declares that it
// will not corrupt state but may fail.
[MethodImplAttribute(MethodImplOptions.InternalCall)]
[ReliabilityContract(Consistency.WillNotCorruptState, Cer.MayFail)]
private static extern bool BindIOCompletionCallbackNative(IntPtr fileHandle);
}
}
相关文章推荐
- 关于线程池ThreadPool的学习
- C# 多线程学习(六)线程池(ThreadPool)——线程资源的复用和自动管理
- 重点学习线程池ThreadPool
- [深入学习C#]C#实现多线程的方法:线程(Thread类)和线程池(ThreadPool)
- C# 线程池ThreadPool的学习
- Java性能优化学习之 巧用线程池ThreadPool
- java核心知识点学习----重点学习线程池ThreadPool
- [深入学习C#]C#实现多线程的方法:线程(Thread类)和线程池(ThreadPool)
- muduo网络库学习(八)事件驱动循环线程池EventLoopThreadPool
- android 多线程 - 线程池 Executors.newFixedThreadPool 的使用例子
- Java_线程池ThreadPool
- Java newCachedThreadPool 创建线程池
- Mysql线程池系列二(Oracle Mysql Thread pool的安装和原理)
- spring ThreadPoolTaskExecutor的线程池类实现多线程
- 概要ThreadPool 线程池
- Android 线程与线程池 Thread&ThreadPool
- ThreadPoolTaskExecutor 创建线程池管理
- C#多线程实现方法——线程池(Thread Pool)
- C#线程池ThreadPool的理解
- 自定义线程池ThreadPool