从字面意思理解就是任务调度器,即将任务排队到线程中执行的管理器。并且能控制最大的并行任务数量。
其本身是一个抽象类,但是提供了一些静态公共属性与方法,先理解下这些。
即获取当前的任务调度器
以默认的任务调度器实例化一个TaskScheduler
获取Id标识
获取当前的同步上下文SynchronizationContext
事件:未能被正常捕获的异步中的异常触发事件,即一个Task执行后没有被合适的管理起来,就像是导弹发射出去后就不在管理了的时候。说的简单点,就是当异步Task没有被await、Wait()、Result处理时将不会触发;
并且这种异常事件不会立刻触发,需要强制GC回收才能触发。
其次,这个事件仅在release模式下有作用,debug会在throw异常的时候直接抛出异常结束任务。
GC.Collect();
GC.WaitForPendingFinalizers();
使用示例如下:
static void ExceptionTest()
{
TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;
Task.Factory.StartNew(() =>
{
throw new Exception("一个错误");
});
Thread.Sleep(3000);//确保task执行完了,但是不使用await的处理
GC.Collect();
GC.WaitForPendingFinalizers();
Console.WriteLine("end");
Console.ReadKey();
}
private static void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
{
Console.WriteLine("UnobservedTaskException:" + e.Exception.InnerException.Message);
}
运行输出如下:
UnobservedTaskException:一个错误
end
如果在release模式下,不使用该事件,程序将正常运行知道结束。
当触发异常时如果需要中断程序,需在程序配置中设置
<configuration>
<runtime>
<ThrowUnobservedTaskExceptions enabled="true"/>
runtime>
configuration>
public virtual int MaximumConcurrencyLevel { get; }
对于实例化的调度器对象,可以通过MaximumConcurrencyLevel获取最大并发数量。
也可在继承类中重载该属性,设置自定义的最大并发数量。
获取已排队的所有任务,为抽象方法,继承类需要实现该方法。需要注意的是它仅对于调试器(Debug)支持。
返回值IEnumerable是一个允许调试器遍历当前排队到此计划程序中的任务的枚举。
请务必注意,调用此方法时,进程中的所有其他线程都将冻结。 因此,请务必避免与其他可能导致阻塞的线程同步。 如果需要同步,并且无法获取此方法中的锁,则应引发异常,以便调试器不会阻止。
protected override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_syncObj, ref lockTaken);
if (lockTaken)
{
return _tasks.ToArray();
}
else throw new NotSupportedException();
}
finally
{
if (lockTaken) Monitor.Exit(_syncObj);
}
}
将Task安排到线程中执行,这里以官方文档示例简单说明:
https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.taskscheduler?view=net-6.0
protected sealed override void QueueTask(Task task)
{
lock (_tasks)
{
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
_currentThreadIsProcessingItems = true;
try
{
while (true)
{
Task item;
lock (_tasks)
{
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}
item = _tasks.First.Value;
_tasks.RemoveFirst();
}
base.TryExecuteTask(item);
}
}
finally { _currentThreadIsProcessingItems = false; }
}, null);
}
_tasks是类内任务集合
_delegatesQueuedOrRunning和_maxDegreeOfParallelism为内部成员,分别是正在运行的任务数量和最大并发数量。
QueueTask实际上是并行运行的,外部一次StartNew()就会执行一行,没执行一次_tasks就添加当次的Task对象。保证在这个调度器下创建的所有task均在内部成员_tasks中。
当没添加一个后_delegatesQueuedOrRunning自增,通过ThreadPool开辟线程,并从_tasks中取出第一个执行。
TryExecuteTask()是同步方法,即任务执行完毕后才会执行之后代码,所以while循环中的内容就是从_tasks中取出然后执行。
示例中的QueueTask实现,实际上_delegatesQueuedOrRunning不小于_maxDegreeOfParallelism时后续的task只做add处理了,所有实际上由_maxDegreeOfParallelism个NotifyThreadPoolOfPendingWork的实例(或副本)或者说内部的while循环在运行,当有每一个while循环,或者说每一个线程无法再从_tasks中取出task时光_delegatesQueuedOrRunning就自减,并结束循环。
最后,官方文档中有提到ConcurrentQueue,即前面提到的_tasks如果使用ConcurrentQueue类型,将更加方便高效。
从 .NET Framework 4 开始,此队列使用类似于类的无锁算法ConcurrentQueue。 使用此无锁实现,线程池在排队和取消队列工作项时花费的时间更少。 此性能优势可用于使用线程池的所有程序。
尝试以内连的方式运行task,该如何理解这个内敛呢?
官方对这个方法的说明是这样:
派生自 TaskScheduler 实现此函数的类,以支持在启动该任务对象等待的线程上内联执行任务。 内联执行是可选的,可以通过返回 false 拒绝请求。 但是,可以内联的任务越多,计划程序就越能缩放。 事实上,内联太少的计划程序可能容易出现死锁。 适当的实现应确保按计划程序保证的策略执行的请求可以成功内联执行。 例如,如果计划程序使用专用线程执行任务,则来自该线程的任何内联请求都应成功。
如果计划程序决定执行内联执行,则应通过调用具有提供的任务对象的基 TaskScheduler TryExecuteTask 方法来执行此操作,从而传播返回值。 如果计划程序决定遵守内联请求,则它也可能适合从其内部数据结构中删除内联任务。 但是,请注意,在某些情况下,可能会要求计划程序内联以前未使用 QueueTask 该方法提供给它的任务。
通过查看源码,这个方法的调用主要是在Task.Wait()、WaitAll()的时候。个人理解的意思是这样,这些方法是将异步转同步的,即异步方法不结束,主线程就阻塞的,即我让小明搬100块砖,Wait小明的时候我就是闲着的,尝试内联就是自己试试能不也参与搬砖,如果我不忙,那么我也参与搬砖,就等于两个人同时搬砖了。try的意思就是当前Wait()的线程不一定允许参与执行task,如果不可以那么还是由指定的线程执行,这样在一定程度上能够多一个并发数。
参考微软官方的例子,自定义一个任务调度器,然后执行如下任务:
static void Main()
{
List<Task> ts = new List<Task>();
MyTaskScheduler scheduler = new MyTaskScheduler(3);
TaskFactory taskFactory = new TaskFactory(scheduler);
for (int i = 0; i < 10; i++)
{
Task t = taskFactory.StartNew((n) =>
{
int s = 2000;
Thread.Sleep(s);
Console.WriteLine(">task threadid = {0},taskid = {1} index = {2}", Thread.CurrentThread.ManagedThreadId, Task.CurrentId,n);
}, i);
ts.Add(t);
}
Console.WriteLine("wait -> threadid={0}", Thread.CurrentThread.ManagedThreadId);
Task.WaitAll(ts.ToArray());
Console.WriteLine("end");
}
TryExecuteTaskInline方法内直接return false时,输出如下:
wait -> threadid=1
>task threadid = 5,taskid = 2 index = 1
>task threadid = 3,taskid = 1 index = 0
>task threadid = 7,taskid = 3 index = 2
>task threadid = 3,taskid = 5 index = 4
>task threadid = 5,taskid = 4 index = 3
>task threadid = 7,taskid = 6 index = 5
>task threadid = 5,taskid = 8 index = 7
>task threadid = 3,taskid = 7 index = 6
>task threadid = 7,taskid = 9 index = 8
>task threadid = 5,taskid = 10 index = 9
end
任务是三个三个执行的,主线程id为1,而所有任务在3、5、7线程上执行。
把TryExecuteTaskInline方法换成如下:
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
//如果当前还有线程在运行,就尝试内联执行,否则就返回
if (!_currentThreadIsProcessingItems) return false;
//输出task信息
Console.WriteLine("tryInLine TaskId={0},,PreviouslyQueued={1}", task.Id, taskWasPreviouslyQueued.ToString());
if (taskWasPreviouslyQueued)//如果被排队了
if (TryDequeue(task))//从队列中移除(带lock校验的,可能不成功)
return base.TryExecuteTask(task);//如果移除了就在当前线程执行
else
return false;
else
return base.TryExecuteTask(task);
}
运行结果如下:
wait -> threadid=1
tryInLine TaskId=10,,PreviouslyQueued=True
>task threadid = 1,taskid = 10 index = 9
>task threadid = 7,taskid = 3 index = 2
tryInLine TaskId=9,,PreviouslyQueued=True
>task threadid = 3,taskid = 1 index = 0
>task threadid = 4,taskid = 2 index = 1
>task threadid = 1,taskid = 9 index = 8
>task threadid = 4,taskid = 6 index = 5
>task threadid = 3,taskid = 5 index = 4
tryInLine TaskId=8,,PreviouslyQueued=True
>task threadid = 7,taskid = 4 index = 3
>task threadid = 4,taskid = 7 index = 6
>task threadid = 3,taskid = 8 index = 7
end
任务是四个四个执行,部分任务的执行线程与主线程一致,并且内连执行是从任务列表的后向前执行的,这样就很好理解了,我找了三个人搬砖,Wait的时候我没事干,我也帮忙搬砖,但总不能影响搬砖工的工作,只能从砖摞后面开始,这样汇聚的时候就两边人就都知道结束了,也不会和别人有干扰。
当然,当甩手掌柜,直接return false也是可以的。
TaskScheduler本身是一个抽象类,除了前面说的一些静态方法外,再来看下内部实现。
源码中只有三处实现:
ThreadPoolTaskScheduler、SynchronizationContextTaskScheduler、ConcurrentExclusiveTaskScheduler
源码位置:https://referencesource.microsoft.com/#mscorlib/system/threading/Tasks/ThreadPoolTaskScheduler.cs,b76a4a6f77962f28,references
namespace System.Threading.Tasks
{
///
/// An implementation of TaskScheduler that uses the ThreadPool scheduler
///
internal sealed class ThreadPoolTaskScheduler: TaskScheduler
{
///
/// Constructs a new ThreadPool task scheduler object
///
internal ThreadPoolTaskScheduler()
{
int id = base.Id; // force ID creation of the default scheduler
}
// static delegate for threads allocated to handle LongRunning tasks.
private static readonly ParameterizedThreadStart s_longRunningThreadWork = new ParameterizedThreadStart(LongRunningThreadWork);
private static void LongRunningThreadWork(object obj)
{
Contract.Requires(obj != null, "TaskScheduler.LongRunningThreadWork: obj is null");
Task t = obj as Task;
Contract.Assert(t != null, "TaskScheduler.LongRunningThreadWork: t is null");
t.ExecuteEntry(false);
}
///
/// Schedules a task to the ThreadPool.
///
/// The task to schedule.
[SecurityCritical]
protected internal override void QueueTask(Task task)
{
if ((task.Options & TaskCreationOptions.LongRunning) != 0)
{
// Run LongRunning tasks on their own dedicated thread.
Thread thread = new Thread(s_longRunningThreadWork);
thread.IsBackground = true; // Keep this thread from blocking process shutdown
thread.Start(task);
}
else
{
// Normal handling for non-LongRunning tasks.
bool forceToGlobalQueue = ((task.Options & TaskCreationOptions.PreferFairness) != 0);
ThreadPool.UnsafeQueueCustomWorkItem(task, forceToGlobalQueue);
}
}
///
/// This internal function will do this:
/// (1) If the task had previously been queued, attempt to pop it and return false if that fails.
/// (2) Propagate the return value from Task.ExecuteEntry() back to the caller.
///
/// IMPORTANT NOTE: TryExecuteTaskInline will NOT throw task exceptions itself. Any wait code path using this function needs
/// to account for exceptions that need to be propagated, and throw themselves accordingly.
///
[SecurityCritical]
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If the task was previously scheduled, and we can't pop it, then return false.
if (taskWasPreviouslyQueued && !ThreadPool.TryPopCustomWorkItem(task))
return false;
// Propagate the return value of Task.ExecuteEntry()
bool rval = false;
try
{
rval = task.ExecuteEntry(false); // handles switching Task.Current etc.
}
finally
{
// Only call NWIP() if task was previously queued
if(taskWasPreviouslyQueued) NotifyWorkItemProgress();
}
return rval;
}
[SecurityCritical]
protected internal override bool TryDequeue(Task task)
{
// just delegate to TP
return ThreadPool.TryPopCustomWorkItem(task);
}
[SecurityCritical]
protected override IEnumerable<Task> GetScheduledTasks()
{
return FilterTasksFromWorkItems(ThreadPool.GetQueuedWorkItems());
}
private IEnumerable<Task> FilterTasksFromWorkItems(IEnumerable<IThreadPoolWorkItem> tpwItems)
{
foreach (IThreadPoolWorkItem tpwi in tpwItems)
{
if (tpwi is Task)
{
yield return (Task)tpwi;
}
}
}
///
/// Notifies the scheduler that work is progressing (no-op).
///
internal override void NotifyWorkItemProgress()
{
ThreadPool.NotifyWorkItemProgress();
}
///
/// This is the only scheduler that returns false for this property, indicating that the task entry codepath is unsafe (CAS free)
/// since we know that the underlying scheduler already takes care of atomic transitions from queued to non-queued.
///
internal override bool RequiresAtomicStartTransition
{
get { return false; }
}
}
}
从源码来看,这是internal sealed的类,通过查找引用,其只有一处使用,即在TaskScheduler中。
默认的Task多线程均是使用该类来管理的,他通过线程池进行调度,将长任务(TaskCreationOptions.LongRunning)开辟新的线程执行,其他的使用ThreadPool.UnsafeQueueCustomWorkItem执行。
internal sealed class SynchronizationContextTaskScheduler : TaskScheduler
{
private SynchronizationContext m_synchronizationContext;
///
/// Constructs a SynchronizationContextTaskScheduler associated with
///
/// This constructor expects to be set.
internal SynchronizationContextTaskScheduler()
{
SynchronizationContext synContext = SynchronizationContext.Current;
// make sure we have a synccontext to work with
if (synContext == null)
{
throw new InvalidOperationException(Environment.GetResourceString("TaskScheduler_FromCurrentSynchronizationContext_NoCurrent"));
}
m_synchronizationContext = synContext;
}
///
/// Implemetation of for this scheduler class.
///
/// Simply posts the tasks to be executed on the associated .
///
///
[SecurityCritical]
protected internal override void QueueTask(Task task)
{
m_synchronizationContext.Post(s_postCallback, (object)task);
}
///
/// Implementation of for this scheduler class.
///
/// The task will be executed inline only if the call happens within
/// the associated .
///
///
///
[SecurityCritical]
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
if (SynchronizationContext.Current == m_synchronizationContext)
{
return TryExecuteTask(task);
}
else
return false;
}
// not implemented
[SecurityCritical]
protected override IEnumerable<Task> GetScheduledTasks()
{
return null;
}
///
/// Implementes the property for
/// this scheduler class.
///
/// By default it returns 1, because a based
/// scheduler only supports execution on a single thread.
///
public override Int32 MaximumConcurrencyLevel
{
get
{
return 1;
}
}
// preallocated SendOrPostCallback delegate
private static SendOrPostCallback s_postCallback = new SendOrPostCallback(PostCallback);
// this is where the actual task invocation occures
private static void PostCallback(object obj)
{
Task task = (Task) obj;
// calling ExecuteEntry with double execute check enabled because a user implemented SynchronizationContext could be buggy
task.ExecuteEntry(true);
}
}
从构造函数可以看出必须在当前同步上下文不为空时使用。
因为是internal sealed类,无法直接使用,唯一的使用方法是通过TaskScheduler.FromCurrentSynchronizationContext()实例化。
内部的Task的执行方式实际上是通过同步上下文的异步post执行的:SynchronizationContext.Post();
Concurrent是“并发”的意思。Exclusive是“独占”的意思。ConcurrentExclusiveTaskScheduler是内置的并发独占任务调度器,实例化时通过ConcurrentExclusiveSchedulerPair的构造函数实现,其所有的构造函数走的都是同一个构造函数。
public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler, int maxConcurrencyLevel, int maxItemsPerTask)
{
// Validate arguments
if (taskScheduler == null) throw new ArgumentNullException("taskScheduler");
if (maxConcurrencyLevel == 0 || maxConcurrencyLevel < -1) throw new ArgumentOutOfRangeException("maxConcurrencyLevel");
if (maxItemsPerTask == 0 || maxItemsPerTask < -1) throw new ArgumentOutOfRangeException("maxItemsPerTask");
Contract.EndContractBlock();
// Store configuration
m_underlyingTaskScheduler = taskScheduler;
m_maxConcurrencyLevel = maxConcurrencyLevel;
m_maxItemsPerTask = maxItemsPerTask;
// Downgrade to the underlying scheduler's max degree of parallelism if it's lower than the user-supplied level
int mcl = taskScheduler.MaximumConcurrencyLevel;
if (mcl > 0 && mcl < m_maxConcurrencyLevel) m_maxConcurrencyLevel = mcl;
// Treat UNLIMITED_PROCESSING/-1 for both MCL and MIPT as the biggest possible value so that we don't
// have to special case UNLIMITED_PROCESSING later on in processing.
if (m_maxConcurrencyLevel == UNLIMITED_PROCESSING) m_maxConcurrencyLevel = Int32.MaxValue;
if (m_maxItemsPerTask == UNLIMITED_PROCESSING) m_maxItemsPerTask = Int32.MaxValue;
// Create the concurrent/exclusive schedulers for this pair
m_exclusiveTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, 1, ProcessingMode.ProcessingExclusiveTask);
m_concurrentTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, m_maxConcurrencyLevel, ProcessingMode.ProcessingConcurrentTasks);
}
它在内部会创建两个ConcurrentExclusiveTaskScheduler调度器(所以类型是**Pair),独占的任务调度器m_exclusiveTaskScheduler(最大并发等级1)和并发任务调度器m_concurrentTaskScheduler。
TaskScheduler是一个抽象类,需要自定义新的调度器重载它。
public class MyTaskScheduler : TaskScheduler
{
protected override IEnumerable<Task> GetScheduledTasks()
{
throw new NotImplementedException();
}
protected override void QueueTask(Task task)
{
throw new NotImplementedException();
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
throw new NotImplementedException();
}
}
默认必须重载的方法有上面三个。
通过前文对这几个方法的说明,以实际示例展示如何自定义一个可设定最大并发数量的多线程调度器。
(以下实例仅供参考)
public class LimitedConcurrencyTaskScheduler : TaskScheduler
{
//任务队列
private LinkedList<Task> _tasks = new LinkedList<Task>();
//最大并发数量
private readonly int _maxThreadCount;
//当前运行的线程数
private int _aliveThreadCount = 0;
//有线程在忙
private bool _anyThreadWorking;
///
/// 是否正在运行
///
public bool IsBusy => _aliveThreadCount > 0;
///
/// 最大并发数量
///
public override int MaximumConcurrencyLevel => _maxThreadCount;
public LimitedConcurrencyTaskScheduler()
{
int num = Environment.ProcessorCount / 2;
_maxThreadCount = ((num < 1) ? 1 : num);
}
public LimitedConcurrencyTaskScheduler(int maxThreadCount)
{
_maxThreadCount = ((maxThreadCount < 1) ? 1 : maxThreadCount);
}
protected override void QueueTask(Task task)
{
lock (_tasks)
{
//加入队列
_tasks.AddLast(task);
if (_aliveThreadCount < _maxThreadCount)
{
//激活线程数小于最大并发限制时开新线程
Thread thread = new Thread(ThreadWorkItem)
{
IsBackground = true,
Priority = ThreadPriority.Normal
};
//激活线程数自增并启动线程
Interlocked.Increment(ref _aliveThreadCount);
if (!thread.IsAlive)
{
thread.Start();
}
}
}
}
///
/// 每个线程的处理内容
///
private void ThreadWorkItem()
{
try
{//不断从队列中取task并执行
_anyThreadWorking = true;
while (true)
{
Task task;
lock (_tasks)
{
if (_tasks.Count == 0)
{
break;
}
task = _tasks.First.Value;
_tasks.RemoveFirst();
}
base.TryExecuteTask(task);
}
}
finally
{
//队列清空后结束当前线程的工作
_anyThreadWorking = false;
Interlocked.Decrement(ref _aliveThreadCount);
}
}
///
/// debug调试时获取
///
///
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks;//只能返回目前还未执行的
}
///
/// 尝试移除
///
///
///
protected sealed override bool TryDequeue(Task task)
{
lock (_tasks) return _tasks.Remove(task);
}
///
/// 尝试以内联的方式运行
///
///
///
///
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
//如果任务正在执行,直接返回
if (!_anyThreadWorking) return false;
//输出task信息
Console.WriteLine("tryInLine TaskId={0},,PreviouslyQueued={1}", task.Id, taskWasPreviouslyQueued.ToString());
bool dequeuedAndNoExcuted = false;
bool inLine = false;
if (taskWasPreviouslyQueued)//如果被排队了
if (TryDequeue(task))//从队列中移除(带lock校验的,可能不成功)
{
inLine = base.TryExecuteTask(task);//如果移除了就在当前线程执行
dequeuedAndNoExcuted = !inLine;
}
else
inLine = false;
else
inLine = base.TryExecuteTask(task);
if (dequeuedAndNoExcuted)
{
lock (_tasks)
_tasks.AddLast(task);
}
return inLine;
}
}