
C# Task: What Is It? How Is the Return Value Implemented? How Is Wait Implemented?

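2017-11-26 16:12
Task has a large API surface and there are plenty of examples online. Here we look at what a Task actually is (Task is mostly used for multithreaded work, so it has to be tied to threads somehow) and how its return value is produced.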

Let's start with one of the simplest APIs, the StartNew<TResult> method of TaskFactory, in TaskFactory.cs:

public Task<TResult> StartNew<TResult>(Func<Object, TResult> function, Object state)
{
StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
Task currTask = Task.InternalCurrent;
return Task<TResult>.StartNew(currTask, function, state, m_defaultCancellationToken,
m_defaultCreationOptions, InternalTaskOptions.None, GetDefaultScheduler(currTask), ref stackMark);
}
private TaskScheduler GetDefaultScheduler(Task currTask)
{
if (m_defaultScheduler != null) return m_defaultScheduler;
else if ((currTask != null)&& ((currTask.CreationOptions & TaskCreationOptions.HideScheduler) == 0))
return currTask.ExecutingTaskScheduler;
else return TaskScheduler.Default;
}
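
For orientation, the overload quoted above can be exercised from user code roughly as follows. This is a minimal sketch, not part of the reference source; the class and method names (StartNewDemo, DoubleOnTask) are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class StartNewDemo
{
    // Calls TaskFactory.StartNew<TResult>(Func<object, TResult>, object):
    // the second argument is handed to the delegate as its "state" parameter.
    public static int DoubleOnTask(int input)
    {
        Task<int> task = Task.Factory.StartNew<int>(state => (int)state * 2, input);
        return task.Result; // blocks until the delegate has produced a value
    }

    public static void Main()
    {
        Console.WriteLine(DoubleOnTask(21)); // 42

        // Outside of any running task, the current scheduler is the default one.
        Console.WriteLine(ReferenceEquals(TaskScheduler.Current, TaskScheduler.Default)); // True
    }
}
```

The state object is boxed here, which is fine for a demonstration; real code would typically pass a reference type or capture the value in a closure.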


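So this is ultimately equivalent to calling Task<TResult>.StartNew. In the usual case, where the factory has no scheduler of its own and the call is not made from inside another task, GetDefaultScheduler returns TaskScheduler.Default. TaskScheduler.cs implements it as follows: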

public abstract class TaskScheduler
{
private static readonly TaskScheduler s_defaultTaskScheduler = new ThreadPoolTaskScheduler();
public static TaskScheduler Default
{
get
{
return s_defaultTaskScheduler;
}
}
internal void InternalQueueTask(Task task)
{
Contract.Requires(task != null);
task.FireTaskScheduledIfNeeded(this);
this.QueueTask(task);
}
}


The default TaskScheduler.Default is a ThreadPoolTaskScheduler instance. Now let's look at the implementation of Task<TResult>, in Future.cs:

public class Task<TResult> : Task
{
internal static Task<TResult> StartNew(Task parent, Func<object, TResult> function, object state, CancellationToken cancellationToken,
TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler, ref StackCrawlMark stackMark)
{
if (function == null)
{
throw new ArgumentNullException("function");
}
if (scheduler == null)
{
throw new ArgumentNullException("scheduler");
}
if ((internalOptions & InternalTaskOptions.SelfReplicating) != 0)
{
throw new ArgumentOutOfRangeException("creationOptions", Environment.GetResourceString("TaskT_ctor_SelfReplicating"));
}

// Create and schedule the future.
Task<TResult> f = new Task<TResult>(function, state, parent, cancellationToken, creationOptions, internalOptions | InternalTaskOptions.QueuedByRuntime, scheduler, ref stackMark);

f.ScheduleAndStart(false);
return f;
}
internal void ScheduleAndStart(bool needsProtection)
{
Contract.Assert(m_taskScheduler != null, "expected a task scheduler to have been selected");
Contract.Assert((m_stateFlags & TASK_STATE_STARTED) == 0, "task has already started");

// Set the TASK_STATE_STARTED bit
if (needsProtection)
{
if (!MarkStarted())
{
// A cancel has snuck in before we could get started.  Quietly exit.
return;
}
}
else
{
m_stateFlags |= TASK_STATE_STARTED;
}

if (s_asyncDebuggingEnabled)
{
AddToActiveTasks(this);
}

if (AsyncCausalityTracer.LoggingOn && (Options & (TaskCreationOptions)InternalTaskOptions.ContinuationTask) == 0)
{
//For all other task than TaskContinuations we want to log. TaskContinuations log in their constructor
AsyncCausalityTracer.TraceOperationCreation(CausalityTraceLevel.Required, this.Id, "Task: "+((Delegate)m_action).Method.Name, 0);
}
try
{
m_taskScheduler.InternalQueueTask(this);
}
catch (ThreadAbortException tae)
{
AddException(tae);
FinishThreadAbortedTask(true, false);
}
catch (Exception e)
{
TaskSchedulerException tse = new TaskSchedulerException(e);
AddException(tse);
Finish(false);
if ((Options & (TaskCreationOptions)InternalTaskOptions.ContinuationTask) == 0)
{
// m_contingentProperties.m_exceptionsHolder *should* already exist after AddException()
Contract.Assert(
(m_contingentProperties != null) &&
(m_contingentProperties.m_exceptionsHolder != null) &&
(m_contingentProperties.m_exceptionsHolder.ContainsFaultList),
"Task.ScheduleAndStart(): Expected m_contingentProperties.m_exceptionsHolder to exist " +
"and to have faults recorded.");

m_contingentProperties.m_exceptionsHolder.MarkAsHandled(false);
}
// re-throw the exception wrapped as a TaskSchedulerException.
throw tse;
}
}

internal Task(
Func<object, TResult> valueSelector, object state, Task parent, CancellationToken cancellationToken,
TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler, ref StackCrawlMark stackMark) :
this(valueSelector, state, parent, cancellationToken, creationOptions, internalOptions, scheduler)
{
PossiblyCaptureContext(ref stackMark);
}
internal void PossiblyCaptureContext(ref StackCrawlMark stackMark)
{
Contract.Assert(m_contingentProperties == null || m_contingentProperties.m_capturedContext == null, "Captured an ExecutionContext when one was already captured.");
CapturedContext = ExecutionContext.Capture(ref stackMark, ExecutionContext.CaptureOptions.IgnoreSyncCtx | ExecutionContext.CaptureOptions.OptimizeDefaultCase);
}
internal override void InnerInvoke()
{
// Invoke the delegate
Contract.Assert(m_action != null);
var func = m_action as Func<TResult>;
if (func != null)
{
m_result = func();
return;
}
var funcWithState = m_action as Func<object, TResult>;
if (funcWithState != null)
{
m_result = funcWithState(m_stateObject);
return;
}
Contract.Assert(false, "Invalid m_action in Task<TResult>");
}

public TResult Result
{
get { return IsWaitNotificationEnabledOrNotRanToCompletion ? GetResultCore(waitCompletionNotification: true) : m_result; }
}

}


Task<TResult>.StartNew first calls the constructor, which calls PossiblyCaptureContext; that method calls ExecutionContext.Capture to capture the calling thread's execution context. Back in StartNew, Task<TResult>.ScheduleAndStart is called next. ScheduleAndStart mainly calls TaskScheduler.InternalQueueTask, which in turn mainly calls QueueTask. QueueTask is overridden in subclasses, so here it is ThreadPoolTaskScheduler.QueueTask that runs, in ThreadPoolTaskScheduler.cs:

internal sealed class ThreadPoolTaskScheduler: TaskScheduler
{
private static readonly ParameterizedThreadStart s_longRunningThreadWork = new ParameterizedThreadStart(LongRunningThreadWork);

private static void LongRunningThreadWork(object obj)
{
Contract.Requires(obj != null, "TaskScheduler.LongRunningThreadWork: obj is null");
Task t = obj as Task;
Contract.Assert(t != null, "TaskScheduler.LongRunningThreadWork: t is null");
t.ExecuteEntry(false);
}
protected internal override void QueueTask(Task task)
{
if ((task.Options & TaskCreationOptions.LongRunning) != 0)
{
// Run LongRunning tasks on their own dedicated thread.
Thread thread = new Thread(s_longRunningThreadWork);
thread.IsBackground = true; // Keep this thread from blocking process shutdown
thread.Start(task);
}
else
{
// Normal handling for non-LongRunning tasks.
bool forceToGlobalQueue = ((task.Options & TaskCreationOptions.PreferFairness) != 0);
ThreadPool.UnsafeQueueCustomWorkItem(task, forceToGlobalQueue);
}
}
}


If the Task was created with TaskCreationOptions.LongRunning, a new dedicated thread is created to run it; otherwise it is queued to the thread pool.
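
The two branches of QueueTask can be observed from the outside by asking the running delegate what kind of thread it is on. The sketch below assumes the default scheduler on a desktop or server runtime; LongRunningDemo and RanOnPoolThread are illustrative names, not library APIs.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningDemo
{
    // Starts a task on the default scheduler and reports whether
    // its delegate executed on a thread-pool thread.
    public static bool RanOnPoolThread(TaskCreationOptions options)
    {
        Task<bool> task = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            CancellationToken.None,
            options,
            TaskScheduler.Default);
        return task.Result;
    }

    public static void Main()
    {
        // Ordinary task: queued to the thread pool.
        Console.WriteLine(RanOnPoolThread(TaskCreationOptions.None));
        // LongRunning task: the default scheduler gives it a dedicated thread.
        Console.WriteLine(RanOnPoolThread(TaskCreationOptions.LongRunning));
    }
}
```

LongRunning is only a hint to the scheduler; a custom TaskScheduler is free to ignore it, which is why the sketch pins TaskScheduler.Default explicitly.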

The dedicated thread invokes the s_longRunningThreadWork callback (which calls the task's ExecuteEntry). In the thread-pool case the pool calls the Task's ExecuteWorkItem method (which also just calls ExecuteEntry), in Task.cs:

public class Task : IThreadPoolWorkItem, IAsyncResult, IDisposable
{
[ThreadStatic]
internal static Task t_currentTask;
internal static Task InternalCurrent
{
get { return t_currentTask; }
}
void IThreadPoolWorkItem.ExecuteWorkItem()
{
ExecuteEntry(false);
}
internal bool ExecuteEntry(bool bPreventDoubleExecution)
{
if (bPreventDoubleExecution || ((Options & (TaskCreationOptions)InternalTaskOptions.SelfReplicating) != 0))
{
int previousState = 0;

// Do atomic state transition from queued to invoked. If we observe a task that's already invoked,
// we will return false so that TaskScheduler.ExecuteTask can throw an exception back to the custom scheduler.
// However we don't want this exception to be throw if the task was already canceled, because it's a
// legitimate scenario for custom schedulers to dequeue a task and mark it as canceled (example: throttling scheduler)
if (!AtomicStateUpdate(TASK_STATE_DELEGATE_INVOKED,
TASK_STATE_DELEGATE_INVOKED | TASK_STATE_COMPLETED_MASK,
ref previousState) && (previousState & TASK_STATE_CANCELED) == 0)
{
// This task has already been invoked.  Don't invoke it again.
return false;
}
}
else
{
// Remember that we started running the task delegate.
m_stateFlags |= TASK_STATE_DELEGATE_INVOKED;
}

if (!IsCancellationRequested && !IsCanceled)
{
ExecuteWithThreadLocal(ref t_currentTask);
}
else if (!IsCanceled)
{
int prevState = Interlocked.Exchange(ref m_stateFlags, m_stateFlags | TASK_STATE_CANCELED);
if ((prevState & TASK_STATE_CANCELED) == 0)
{
CancellationCleanupLogic();
}
}

return true;
}
private void ExecuteWithThreadLocal(ref Task currentTaskSlot)
{
// Remember the current task so we can restore it after running, and then
Task previousTask = currentTaskSlot;

// ETW event for Task Started
var etwLog = TplEtwProvider.Log;
Guid savedActivityID = new Guid();
bool etwIsEnabled = etwLog.IsEnabled();
if (etwIsEnabled)
{
if (etwLog.TasksSetActivityIds)
EventSource.SetCurrentThreadActivityId(TplEtwProvider.CreateGuidForTaskID(this.Id), out savedActivityID);
// previousTask holds the actual "current task" we want to report in the event
if (previousTask != null)
etwLog.TaskStarted(previousTask.m_taskScheduler.Id, previousTask.Id, this.Id);
else
etwLog.TaskStarted(TaskScheduler.Current.Id, 0, this.Id);
}

if (AsyncCausalityTracer.LoggingOn)
AsyncCausalityTracer.TraceSynchronousWorkStart(CausalityTraceLevel.Required, this.Id, CausalitySynchronousWork.Execution);

try
{
// place the current task into TLS.
currentTaskSlot = this;

ExecutionContext ec = CapturedContext;
if (ec == null)
{
// No context, just run the task directly.
Execute();
}
else
{
if (IsSelfReplicatingRoot || IsChildReplica)
{
CapturedContext = CopyExecutionContext(ec);
}

// Run the task.  We need a simple shim that converts the
// object back into a Task object, so that we can Execute it.

// Lazily initialize the callback delegate; benign ----
var callback = s_ecCallback;
if (callback == null) s_ecCallback = callback = new ContextCallback(ExecutionContextCallback);
#if PFX_LEGACY_3_5
ExecutionContext.Run(ec, callback, this);
#else
ExecutionContext.Run(ec, callback, this, true);
#endif
}

if (AsyncCausalityTracer.LoggingOn)
AsyncCausalityTracer.TraceSynchronousWorkCompletion(CausalityTraceLevel.Required, CausalitySynchronousWork.Execution);

Finish(true);
}
finally
{
currentTaskSlot = previousTask;

// ETW event for Task Completed
if (etwIsEnabled)
{
// previousTask holds the actual "current task" we want to report in the event
if (previousTask != null)
etwLog.TaskCompleted(previousTask.m_taskScheduler.Id, previousTask.Id, this.Id, IsFaulted);
else
etwLog.TaskCompleted(TaskScheduler.Current.Id, 0, this.Id, IsFaulted);

if (etwLog.TasksSetActivityIds)
EventSource.SetCurrentThreadActivityId(savedActivityID);
}
}
}
private static void ExecutionContextCallback(object obj)
{
Task task = obj as Task;
Contract.Assert(task != null, "expected a task object");
task.Execute();
}
private void Execute()
{
if (IsSelfReplicatingRoot)
{
ExecuteSelfReplicating(this);
}
else
{
try
{
InnerInvoke();
}
catch (ThreadAbortException tae)
{
// Don't record the TAE or call FinishThreadAbortedTask for a child replica task --
// it's already been done downstream.
if (!IsChildReplica)
{
// Record this exception in the task's exception list
HandleException(tae);

// This is a ThreadAbortException and it will be rethrown from this catch clause, causing us to
// skip the regular Finish codepath. In order not to leave the task unfinished, we now call
// FinishThreadAbortedTask here.
FinishThreadAbortedTask(true, true);
}
}
catch (Exception exn)
{
// Record this exception in the task's exception list
HandleException(exn);
}
}
}
}
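
The capture in PossiblyCaptureContext combined with ExecutionContext.Run in ExecuteWithThreadLocal is what lets ambient data follow a task onto another thread. A small sketch of the visible effect, assuming a runtime that provides AsyncLocal<T> (.NET Framework 4.6 or later, or .NET Core); ContextFlowDemo and its members are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContextFlowDemo
{
    // AsyncLocal values are stored in the ExecutionContext.
    private static readonly AsyncLocal<string> Ambient = new AsyncLocal<string>();

    // The value set by the caller is visible inside the task body.
    public static string ReadInsideTask(string value)
    {
        Ambient.Value = value;
        Task<string> task = Task.Factory.StartNew(
            () => Ambient.Value,
            CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);
        return task.Result;
    }

    // A change made inside the task does not flow back to the caller.
    public static string ReadAfterTaskChangedIt(string value)
    {
        Ambient.Value = value;
        Task task = Task.Factory.StartNew(
            () => { Ambient.Value = "changed inside task"; },
            CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);
        task.Wait();
        return Ambient.Value;
    }

    public static void Main()
    {
        Console.WriteLine(ReadInsideTask("request-1"));
        Console.WriteLine(ReadAfterTaskChangedIt("request-2"));
    }
}
```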


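At this point it should be clear what a Task is. Reading on: ExecuteEntry, through ExecuteWithThreadLocal, mainly calls ExecutionContext.Run(ec, callback, this, true). The callback here is a ContextCallback instance whose body calls the Task's Execute method, and Execute mainly calls InnerInvoke, which is overridden in the subclass Task<TResult>. Two statements in InnerInvoke matter most: m_result = func() and m_result = funcWithState(m_stateObject). This is where our own method is actually invoked and the return value is assigned, which explains the return value. Once our method has returned, Finish must be called to mark the task as finished: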

/// <summary>
/// Signals completion of this particular task.
/// The bUserDelegateExecuted parameter indicates whether this Finish() call comes following the
/// full execution of the user delegate.
/// If bUserDelegateExecuted is false, it mean user delegate wasn't invoked at all (either due to
/// a cancellation request, or because this task is a promise style Task). In this case, the steps
/// involving child tasks (i.e. WaitForChildren) will be skipped.
///
/// </summary>
internal void Finish(bool bUserDelegateExecuted)
{
if (!bUserDelegateExecuted)
{
// delegate didn't execute => no children. We can safely call the remaining finish stages
FinishStageTwo();
}
else
{
var props = m_contingentProperties;

if (props == null || // no contingent properties means no children, so it's safe to complete ourselves
(props.m_completionCountdown == 1 && !IsSelfReplicatingRoot) ||
// Count of 1 => either all children finished, or there were none. Safe to complete ourselves
// without paying the price of an Interlocked.Decrement.
// However we need to exclude self replicating root tasks from this optimization, because
// they can have children joining in, or finishing even after the root task delegate is done.
Interlocked.Decrement(ref props.m_completionCountdown) == 0) // Reaching this sub clause means there may be remaining active children,
// and we could be racing with one of them to call FinishStageTwo().
// So whoever does the final Interlocked.Dec is responsible to finish.
{
FinishStageTwo();
}
else
{
// Apparently some children still remain. It will be up to the last one to process the completion of this task on their own thread.
// We will now yield the thread back to ThreadPool. Mark our state appropriately before getting out.

// We have to use an atomic update for this and make sure not to overwrite a final state,
// because at this very moment the last child's thread may be concurrently completing us.
// Otherwise we risk overwriting the TASK_STATE_RAN_TO_COMPLETION, _CANCELED or _FAULTED bit which may have been set by that child task.
// Note that the concurrent update by the last child happening in FinishStageTwo could still wipe out the TASK_STATE_WAITING_ON_CHILDREN flag,
// but it is not critical to maintain, therefore we don't need to introduce a full atomic update into FinishStageTwo

AtomicStateUpdate(TASK_STATE_WAITING_ON_CHILDREN, TASK_STATE_FAULTED | TASK_STATE_CANCELED | TASK_STATE_RAN_TO_COMPLETION);
}

// Now is the time to prune exceptional children. We'll walk the list and removes the ones whose exceptions we might have observed after they threw.
// we use a local variable for exceptional children here because some other thread may be nulling out m_contingentProperties.m_exceptionalChildren
List<Task> exceptionalChildren = props != null ? props.m_exceptionalChildren : null;

if (exceptionalChildren != null)
{
lock (exceptionalChildren)
{
exceptionalChildren.RemoveAll(s_IsExceptionObservedByParentPredicate); // RemoveAll has better performance than doing it ourselves
}
}
}
}
internal void FinishStageTwo()
{
AddExceptionsFromChildren();
// At this point, the task is done executing and waiting for its children,
// we can transition our task to a completion state.
int completionState;
if (ExceptionRecorded)
{
completionState = TASK_STATE_FAULTED;
if (AsyncCausalityTracer.LoggingOn)
AsyncCausalityTracer.TraceOperationCompletion(CausalityTraceLevel.Required, this.Id, AsyncCausalityStatus.Error);

if (Task.s_asyncDebuggingEnabled)
{
RemoveFromActiveTasks(this.Id);
}
}
else if (IsCancellationRequested && IsCancellationAcknowledged)
{
// We transition into the TASK_STATE_CANCELED final state if the task's CT was signalled for cancellation,
// and the user delegate acknowledged the cancellation request by throwing an OCE,
// and the task hasn't otherwise transitioned into faulted state. (TASK_STATE_FAULTED trumps TASK_STATE_CANCELED)
//
// If the task threw an OCE without cancellation being requested (while the CT not being in signaled state),
// then we regard it as a regular exception

completionState = TASK_STATE_CANCELED;
if (AsyncCausalityTracer.LoggingOn)
AsyncCausalityTracer.TraceOperationCompletion(CausalityTraceLevel.Required, this.Id, AsyncCausalityStatus.Canceled);

if (Task.s_asyncDebuggingEnabled)
{
RemoveFromActiveTasks(this.Id);
}
}
else
{
completionState = TASK_STATE_RAN_TO_COMPLETION;
if (AsyncCausalityTracer.LoggingOn)
AsyncCausalityTracer.TraceOperationCompletion(CausalityTraceLevel.Required, this.Id, AsyncCausalityStatus.Completed);

if (Task.s_asyncDebuggingEnabled)
{
RemoveFromActiveTasks(this.Id);
}
}

// Use Interlocked.Exchange() to effect a memory fence, preventing
// any SetCompleted() (or later) instructions from sneak back before it.
Interlocked.Exchange(ref m_stateFlags, m_stateFlags | completionState);

// Set the completion event if it's been lazy allocated.
// And if we made a cancellation registration, it's now unnecessary.
var cp = m_contingentProperties;
if (cp != null)
{
cp.SetCompleted();
cp.DeregisterCancellationCallback();
}

// ready to run continuations and notify parent.
FinishStageThree();
}

internal bool AtomicStateUpdate(int newBits, int illegalBits)
{
// This could be implemented in terms of:
//     internal bool AtomicStateUpdate(int newBits, int illegalBits, ref int oldFlags);
// but for high-throughput perf, that delegation's cost is noticeable.
SpinWait sw = new SpinWait();
do
{
int oldFlags = m_stateFlags;
if ((oldFlags & illegalBits) != 0) return false;
if (Interlocked.CompareExchange(ref m_stateFlags, oldFlags | newBits, oldFlags) == oldFlags)
{
return true;
}
sw.SpinOnce();
} while (true);
}
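
The three completion states chosen in FinishStageTwo correspond to the public TaskStatus values RanToCompletion, Faulted and Canceled. The sketch below drives a task into each of them; FinalStateDemo and its helper names are illustrative only.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class FinalStateDemo
{
    // Waits for the task, swallowing the AggregateException that Wait
    // throws for faulted or canceled tasks, then reports the final status.
    private static TaskStatus Settle(Task task)
    {
        try { task.Wait(); }
        catch (AggregateException) { }
        return task.Status;
    }

    public static TaskStatus RunOk()
    {
        return Settle(Task.Factory.StartNew(() => 1));
    }

    public static TaskStatus RunThrowing()
    {
        return Settle(Task.Factory.StartNew<int>(
            () => { throw new InvalidOperationException("boom"); }));
    }

    public static TaskStatus RunCanceled()
    {
        var cts = new CancellationTokenSource();
        var started = new ManualResetEventSlim(false);
        Task task = Task.Factory.StartNew(() =>
        {
            started.Set();
            cts.Token.WaitHandle.WaitOne();           // block until Cancel() is called
            cts.Token.ThrowIfCancellationRequested(); // acknowledge with an OCE
        }, cts.Token);
        started.Wait();  // make sure the delegate is already running
        cts.Cancel();
        return Settle(task);
    }

    public static void Main()
    {
        Console.WriteLine(RunOk());
        Console.WriteLine(RunThrowing());
        Console.WriteLine(RunCanceled());
    }
}
```

Note that the Canceled state requires the exception to carry the same token the task was created with; an OperationCanceledException thrown for any other reason is recorded as a fault, as the comment in FinishStageTwo says.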


After FinishStageTwo, FinishStageThree is called, and it calls FinishContinuations. FinishContinuations fetches the continuation with object continuationObject = Interlocked.Exchange(ref m_continuationObject, s_taskCompletionSentinel); and then invokes it in the way that matches its type.

internal void FinishStageThree()
{
// Release the action so that holding this task object alive doesn't also
// hold alive the body of the task.  We do this before notifying a parent,
// so that if notifying the parent completes the parent and causes
// its synchronous continuations to run, the GC can collect the state
// in the interim.  And we do it before finishing continuations, because
// continuations hold onto the task, and therefore are keeping it alive.
m_action = null;

// Notify parent if this was an attached task
if (m_parent != null
&& ((m_parent.CreationOptions & TaskCreationOptions.DenyChildAttach) == 0)
&& (((TaskCreationOptions)(m_stateFlags & OptionsMask)) & TaskCreationOptions.AttachedToParent) != 0)
{
m_parent.ProcessChildCompletion(this);
}

// Activate continuations (if any).
FinishContinuations();
}
internal void FinishContinuations()
{
// Atomically store the fact that this task is completing.  From this point on, the adding of continuations will
// result in the continuations being run/launched directly rather than being added to the continuation list.
object continuationObject = Interlocked.Exchange(ref m_continuationObject, s_taskCompletionSentinel);
TplEtwProvider.Log.RunningContinuation(Id, continuationObject);

// If continuationObject == null, then we don't have any continuations to process
if (continuationObject != null)
{

if (AsyncCausalityTracer.LoggingOn)
AsyncCausalityTracer.TraceSynchronousWorkStart(CausalityTraceLevel.Required, this.Id, CausalitySynchronousWork.CompletionNotification);

// Skip synchronous execution of continuations if this task's thread was aborted
bool bCanInlineContinuations = !(((m_stateFlags & TASK_STATE_THREAD_WAS_ABORTED) != 0) ||
(Thread.CurrentThread.ThreadState == ThreadState.AbortRequested) ||
((m_stateFlags & (int)TaskCreationOptions.RunContinuationsAsynchronously) != 0));

// Handle the single-Action case
Action singleAction = continuationObject as Action;
if (singleAction != null)
{
AwaitTaskContinuation.RunOrScheduleAction(singleAction, bCanInlineContinuations, ref t_currentTask);
LogFinishCompletionNotification();
return;
}

// Handle the single-ITaskCompletionAction case
ITaskCompletionAction singleTaskCompletionAction = continuationObject as ITaskCompletionAction;
if (singleTaskCompletionAction != null)
{
if (bCanInlineContinuations)
{
singleTaskCompletionAction.Invoke(this);
}
else
{
ThreadPool.UnsafeQueueCustomWorkItem(new CompletionActionInvoker(singleTaskCompletionAction, this), forceGlobal: false);
}
LogFinishCompletionNotification();
return;
}

// Handle the single-TaskContinuation case
TaskContinuation singleTaskContinuation = continuationObject as TaskContinuation;
if (singleTaskContinuation != null)
{
singleTaskContinuation.Run(this, bCanInlineContinuations);
LogFinishCompletionNotification();
return;
}

// Not a single; attempt to cast as list
List<object> continuations = continuationObject as List<object>;

if (continuations == null)
{
LogFinishCompletionNotification();
return;  // Not a single or a list; just return
}

//
// Begin processing of continuation list
//

// Wait for any concurrent adds or removes to be retired
lock (continuations) { }
int continuationCount = continuations.Count;

// Fire the asynchronous continuations first ...
for (int i = 0; i < continuationCount; i++)
{
// Synchronous continuation tasks will have the ExecuteSynchronously option,
// and we're looking for asynchronous tasks...
var tc = continuations[i] as StandardTaskContinuation;
if (tc != null && (tc.m_options & TaskContinuationOptions.ExecuteSynchronously) == 0)
{
TplEtwProvider.Log.RunningContinuationList(Id, i, tc);
continuations[i] = null; // so that we can skip this later
tc.Run(this, bCanInlineContinuations);
}
}

// ... and then fire the synchronous continuations (if there are any).
// This includes ITaskCompletionAction, AwaitTaskContinuations, and
// Action delegates, which are all by default implicitly synchronous.
for (int i = 0; i < continuationCount; i++)
{
object currentContinuation = continuations[i];
if (currentContinuation == null) continue;
continuations[i] = null; // to enable free'ing up memory earlier
TplEtwProvider.Log.RunningContinuationList(Id, i, currentContinuation);

// If the continuation is an Action delegate, it came from an await continuation,
// and we should use AwaitTaskContinuation to run it.
Action ad = currentContinuation as Action;
if (ad != null)
{
AwaitTaskContinuation.RunOrScheduleAction(ad, bCanInlineContinuations, ref t_currentTask);
}
else
{
// If it's a TaskContinuation object of some kind, invoke it.
TaskContinuation tc = currentContinuation as TaskContinuation;
if (tc != null)
{
// We know that this is a synchronous continuation because the
// asynchronous ones have been weeded out
tc.Run(this, bCanInlineContinuations);
}
// Otherwise, it must be an ITaskCompletionAction, so invoke it.
else
{
Contract.Assert(currentContinuation is ITaskCompletionAction, "Expected continuation element to be Action, TaskContinuation, or ITaskContinuationAction");
var action = (ITaskCompletionAction)currentContinuation;

if (bCanInlineContinuations)
{
action.Invoke(this);
}
else
{
ThreadPool.UnsafeQueueCustomWorkItem(new CompletionActionInvoker(action, this), forceGlobal: false);
}
}
}
}

LogFinishCompletionNotification();
}
}
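
The sentinel exchange at the top of FinishContinuations can be isolated into a much smaller model. The following is a hypothetical, single-continuation sketch (ContinuationSlot is not a framework type, and the real field also handles lists and several continuation kinds): a callback registered before completion is run by Complete, while one registered afterwards sees the sentinel and runs immediately.

```csharp
using System;
using System.Text;
using System.Threading;

// Simplified stand-in for the m_continuationObject field.
public sealed class ContinuationSlot
{
    private static readonly object CompletedSentinel = new object();
    private object _continuation; // null, an Action, or CompletedSentinel

    public void Register(Action callback)
    {
        // Try to store the callback if the slot is still empty.
        object previous = Interlocked.CompareExchange(ref _continuation, callback, null);
        if (previous == null) return; // stored; Complete() will run it later
        if (ReferenceEquals(previous, CompletedSentinel))
        {
            callback(); // already completed: run directly instead of queuing
            return;
        }
        throw new InvalidOperationException("This sketch supports a single continuation.");
    }

    public void Complete()
    {
        // Atomically publish "completed" and take whatever was registered.
        object previous = Interlocked.Exchange(ref _continuation, CompletedSentinel);
        Action callback = previous as Action;
        if (callback != null) callback();
    }
}

public static class ContinuationSlotDemo
{
    public static string Trace()
    {
        var log = new StringBuilder();
        var slot = new ContinuationSlot();
        slot.Register(() => log.Append("early;")); // stored, not run yet
        log.Append("completing;");
        slot.Complete();                           // runs "early"
        slot.Register(() => log.Append("late;"));  // runs immediately
        return log.ToString();
    }

    public static void Main()
    {
        Console.WriteLine(Trace()); // completing;early;late;
    }
}
```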


When we read the Result of a Task<TResult> and the Task has not finished yet, we have to wait for it: GetResultCore(waitCompletionNotification: true) of Task<TResult> is called, and that method eventually calls Task's InternalWait method.

/// The core wait function, which is only accessible internally. It's meant to be used in places in TPL code where
/// the current context is known or cached.
internal bool InternalWait(int millisecondsTimeout, CancellationToken cancellationToken)
{
// ETW event for Task Wait Begin
var etwLog = TplEtwProvider.Log;
bool etwIsEnabled = etwLog.IsEnabled();
if (etwIsEnabled)
{
Task currentTask = Task.InternalCurrent;
etwLog.TaskWaitBegin(
(currentTask != null ? currentTask.m_taskScheduler.Id : TaskScheduler.Default.Id), (currentTask != null ? currentTask.Id : 0),
this.Id, TplEtwProvider.TaskWaitBehavior.Synchronous, 0, System.Threading.Thread.GetDomainID());
}

bool returnValue = IsCompleted;

// If the event hasn't already been set, we will wait.
if (!returnValue)
{
// Alert a listening debugger that we can't make forward progress unless it slips threads.
// We call NOCTD for two reasons:
//    1. If the task runs on another thread, then we'll be blocked here indefinitely.
//    2. If the task runs inline but takes some time to complete, it will suffer ThreadAbort with possible state corruption,
//       and it is best to prevent this unless the user explicitly asks to view the value with thread-slipping enabled.
Debugger.NotifyOfCrossThreadDependency();

// We will attempt inline execution only if an infinite wait was requested
// Inline execution doesn't make sense for finite timeouts and if a cancellation token was specified
// because we don't know how long the task delegate will take.
if (millisecondsTimeout == Timeout.Infinite && !cancellationToken.CanBeCanceled &&
WrappedTryRunInline() && IsCompleted) // TryRunInline doesn't guarantee completion, as there may be unfinished children.
{
returnValue = true;
}
else
{
returnValue = SpinThenBlockingWait(millisecondsTimeout, cancellationToken);
}
}

Contract.Assert(IsCompleted || millisecondsTimeout != Timeout.Infinite);

// ETW event for Task Wait End
if (etwIsEnabled)
{
Task currentTask = Task.InternalCurrent;
if (currentTask != null)
{
etwLog.TaskWaitEnd(currentTask.m_taskScheduler.Id, currentTask.Id, this.Id);
}
else
{
etwLog.TaskWaitEnd(TaskScheduler.Default.Id, 0, this.Id);
}
// logically the continuation is empty so we immediately fire
etwLog.TaskWaitContinuationComplete(this.Id);
}

return returnValue;
}
/// Waits for the task to complete, for a timeout to occur, or for cancellation to be requested.
/// The method first spins and then falls back to blocking on a new event.
private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken cancellationToken)
{
bool infiniteWait = millisecondsTimeout == Timeout.Infinite;
uint startTimeTicks = infiniteWait ? 0 : (uint)Environment.TickCount;
bool returnValue = SpinWait(millisecondsTimeout);
if (!returnValue)
{
var mres = new SetOnInvokeMres();
try
{
AddCompletionAction(mres, addBeforeOthers: true);
if (infiniteWait)
{
returnValue = mres.Wait(Timeout.Infinite, cancellationToken);
}
else
{
uint elapsedTimeTicks = ((uint)Environment.TickCount) - startTimeTicks;
if (elapsedTimeTicks < millisecondsTimeout)
{
returnValue = mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken);
}
}
}
finally
{
if (!IsCompleted) RemoveContinuation(mres);
// Don't Dispose of the MRES, because the continuation off of this task may
// still be running.  This is ok, however, as we never access the MRES' WaitHandle,
// and thus no finalizable resources are actually allocated.
}
}
return returnValue;
}
private void AddCompletionAction(ITaskCompletionAction action, bool addBeforeOthers)
{
if (!AddTaskContinuation(action, addBeforeOthers))
action.Invoke(this); // run the action directly if we failed to queue the continuation (i.e., the task completed)
}

// Record a continuation task or action.
// Return true if and only if we successfully queued a continuation.
private bool AddTaskContinuation(object tc, bool addBeforeOthers)
{
Contract.Requires(tc != null);

// Make sure that, if someone calls ContinueWith() right after waiting for the predecessor to complete,
// we don't queue up a continuation.
if (IsCompleted) return false;

// Try to just jam tc into m_continuationObject
if ((m_continuationObject != null) || (Interlocked.CompareExchange(ref m_continuationObject, tc, null) != null))
{
// If we get here, it means that we failed to CAS tc into m_continuationObject.
// Therefore, we must go the more complicated route.
return AddTaskContinuationComplex(tc, addBeforeOthers);
}
else return true;
}
private sealed class SetOnInvokeMres : ManualResetEventSlim, ITaskCompletionAction
{
internal SetOnInvokeMres() : base(false, 0) { }
public void Invoke(Task completingTask) { Set(); }
}


SpinThenBlockingWait first spins, checking whether the task has completed. If it has not, it uses a SetOnInvokeMres instance to perform the wait (SetOnInvokeMres is a subclass of ManualResetEventSlim; the blocking is done by mres.Wait(Timeout.Infinite, cancellationToken) or mres.Wait((int)(millisecondsTimeout - elapsedTimeTicks), cancellationToken)). Before blocking, though, it calls AddCompletionAction, which calls AddTaskContinuation. One statement in AddTaskContinuation is key: if ((m_continuationObject != null) || (Interlocked.CompareExchange(ref m_continuationObject, tc, null) != null)). Put simply, it stores the SetOnInvokeMres instance in m_continuationObject. Because that registration succeeds, action.Invoke(this) in AddCompletionAction is not called at this point. Instead, once the task has finished, singleTaskCompletionAction.Invoke(this) in FinishContinuations sets the ManualResetEventSlim and releases the waiter. If m_continuationObject is a real TaskContinuation instance, singleTaskContinuation.Run(this, bCanInlineContinuations) is called instead.
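
From the caller's side, the wait path described above shows up as two behaviours: a timed Wait returns false while the delegate is still running, and Result blocks until the completion action has fired. A minimal sketch using only public APIs; WaitDemo is an illustrative name and the 50 ms timeout is an arbitrary choice.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WaitDemo
{
    public static string Run()
    {
        using (var gate = new ManualResetEventSlim(false))
        {
            // The delegate cannot finish until the gate is opened.
            Task<int> task = Task.Factory.StartNew(
                () => { gate.Wait(); return 42; },
                CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);

            // Finite timeout: spins, then blocks, then gives up.
            bool finishedEarly = task.Wait(50);

            gate.Set();               // let the delegate complete
            int result = task.Result; // blocks until the task has finished
            return finishedEarly + "/" + result;
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // False/42
    }
}
```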

To summarize the call sequence:

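TaskFactory.StartNew
->Task<TResult>.StartNew [first calls the constructor, which calls PossiblyCaptureContext to capture the execution context, then calls its own ScheduleAndStart method]
->TaskScheduler.InternalQueueTask
->ThreadPoolTaskScheduler.QueueTask [this is where the Task is handed to a thread]
->Task.ExecuteEntry [in the thread-pool case it is entered through IThreadPoolWorkItem.ExecuteWorkItem]
->Task.ExecuteWithThreadLocal [takes the captured execution context, copying it for self-replicating tasks, then calls ExecutionContext.Run with the context and the callback]
->Task.ExecutionContextCallback
->Task.Execute
->Task<TResult>.InnerInvoke [this actually calls our own method and assigns the return value]
->Task.Finish [marks the Task as finished]
->Task.FinishStageTwo
->Task.FinishStageThree
->Task.FinishContinuations [triggers Set() on the event a waiter is blocked on]
-------------------------
The Result property of Task<TResult> is the Task's return value. If the Task has already finished it is returned directly; otherwise Task<TResult>.GetResultCore is called
->Task.InternalWait
->Task.SpinThenBlockingWait [calls Wait on the SetOnInvokeMres instance]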
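
The whole sequence can be condensed into a toy model. This is a deliberately simplified sketch, not the real implementation: MiniTask<TResult> is a hypothetical type that keeps only the three ideas traced above (queue the work to a thread, store the delegate's return value in a field, signal an event that Result waits on). It has no scheduler, cancellation, continuations or child tasks.

```csharp
using System;
using System.Threading;

public sealed class MiniTask<TResult>
{
    private readonly Func<TResult> _function;
    private readonly ManualResetEventSlim _done = new ManualResetEventSlim(false);
    private TResult _result;
    private Exception _error;

    private MiniTask(Func<TResult> function) { _function = function; }

    public static MiniTask<TResult> StartNew(Func<TResult> function)
    {
        if (function == null) throw new ArgumentNullException("function");
        var task = new MiniTask<TResult>(function);
        // Plays the role of QueueTask: hand the work to a pool thread.
        ThreadPool.QueueUserWorkItem(_ => task.Execute());
        return task;
    }

    private void Execute()
    {
        try { _result = _function(); }      // like InnerInvoke: run user code, keep the value
        catch (Exception e) { _error = e; } // like HandleException: record the failure
        finally { _done.Set(); }            // like Finish: wake up any waiter
    }

    public TResult Result
    {
        get
        {
            _done.Wait(); // returns at once if the work has already finished
            if (_error != null) throw new AggregateException(_error);
            return _result;
        }
    }
}

public static class MiniTaskDemo
{
    public static void Main()
    {
        MiniTask<int> task = MiniTask<int>.StartNew(() => 6 * 7);
        Console.WriteLine(task.Result); // 42
    }
}
```

The real Task avoids allocating an event per task by registering the event as a continuation only when somebody actually waits, which is the point of the SetOnInvokeMres trick.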