• 模拟实现.net中的Task机制:探索异步编程的奥秘


    .net中使用Task可以方便地编写异步程序,为了更好地理解Task及其调度机制,接下来模拟Task的实现,目的是搞清楚:

    1. Task是什么
    2. Task是如何被调度的

    基本的Task模拟实现

    从最基本的Task用法开始

    Task.Run(Action action)
    
    • 1

    这个命令的作用是将action作为一项任务提交给调度器,调度器会安排空闲线程来处理。
    我们使用Job来模拟Task

    public class Job
    {
        private readonly Action _work;
    
        public Job(Action work) => _work = work;
        public JobStatus Status { get; internal set; }
    
        internal protected virtual void Invoke()
        {
            Status = JobStatus.Running;
            _work();
            Status = JobStatus.Completed;
        }
    
        public void Start(JobScheduler? scheduler = null)
            => (scheduler ?? JobScheduler.Current).QueueJob(this);
    
        public static Job Run(Action work)
        {
            var job = new Job(work);
            job.Start();
            return job;
        }
    }
    
    public enum JobStatus
    {
        Created,
        Scheduled,
        Running,
        Completed
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    这里也定义了同Task一样的静态Run方法,使用方式也与Task类似

    Job.Run(() => Console.WriteLine($"Job1, thread:{Thread.CurrentThread.ManagedThreadId}"));
    
    • 1

    作为对比,使用Task时的写法如下,多了await关键字,后文会讨论。

    await Task.Run(()=>() => Console.WriteLine($"Task1, thread:{Thread.CurrentThread.ManagedThreadId}"));
    
    • 1

    调用Job.Run方法时,会基于给定的Action创建一个Job,然后执行job.Start(), 但Job没有立即开始执行,而是通过QueueJob方法提交给了调度器,由调度器来决定Job何时执行,在Job真正被执行时会调用其Invoke方法,此时给定的Action就会被执行了,同时会对应修改Job的状态,从Running到Completed。简单来说,.net的Task的基本工作过程与这个粗糙的Job一样,由此可见,Task/Job代表一项具有某种状态的操作

    基于线程池的调度

    但Task/Job的执行依赖与调度器,这里用JobScheduler来模拟,.net默认使用基于线程池的调度策略,我们也模拟实现一个ThreadPoolJobScheduler
    首先看下JobScheduler,作为抽象基类,其QueueJob方法将有具体的某个调度器(ThreadPoolJobScheduler)来实现:

    public abstract class JobScheduler
    {
        public abstract void QueueJob(Job job);
        public static JobScheduler Current { get; set; } = new ThreadPoolJobScheduler();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    ThreadPoolJobScheduler实现的QueueJob如下:

    public class ThreadPoolJobScheduler : JobScheduler
    {
        public override void QueueJob(Job job)
        {
            job.Status = JobStatus.Scheduled;
            var executionContext = ExecutionContext.Capture();
            ThreadPool.QueueUserWorkItem(_ => ExecutionContext.Run(executionContext!,
                _ => job.Invoke(), null));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    ThreadPoolJobScheduler会将Job提交给线程池,并将Job状态设置为Scheduled。

    使用指定线程进行调度

    JobScheduler的Current属性默认设置为基于线程的调度,如果有其它调度器也可以更换,但为什么要更换呢?这要从基于线程的调度的局限说起,对于一些具有较高优先级的任务,采用这个策略可能会无法满足需求,比如当线程都忙的时候,新的任务可能迟迟无法被执行。对于这种情况,.net可以通过设置TaskCreationOptions.LongRunning来解决,解析来先用自定义的调度器来解决这个问题:

    public class DedicatedThreadJobScheduler : JobScheduler
    {
        private readonly BlockingCollection _queues=new();
        private readonly Thread[] _threads;
    
        public DedicatedThreadJobScheduler(int threadCount)
        {
            _threads=new Thread[threadCount];
            for(int index=0; index< threadCount; index++)
            {
                _threads[index] =new Thread(Invoke);
            }
            Array.ForEach(_threads, thread=>thread.Start());
    
            void Invoke(object? state){
                while(true){
                    _queues.Take().Invoke();
                }
            }
        }
    
        public override void QueueJob(Job job)
        {
            _queues.Add(job);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    在启动DedicatedThreadJobScheduler时,会启动指定数量的线程,这些线程会不停地从队列中取出任务并执行。
    接下来看看.net的TaskCreationOptions.LongRunning怎么用:

    await Task.Factory.StartNew(LongRunningMethod, TaskCreationOptions.LongRunning);
    
    static void LongRunningMethod()
    {
        // Simulate a long-running operation
        Console.WriteLine("Long-running task started on thread {0}.", Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(10000);
        Console.WriteLine("Long-running task finished on thread {0}.", Thread.CurrentThread.ManagedThreadId);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    任务顺序的编排

    在使用Task时,经常会使用await关键字,来控制多个异步任务之间的顺序,await实际上是语法糖,在了解await之前,先来看看最基本的ContinueWith方法。

    var taskA = Task.Run(() => DateTime.Now);
    var taskB = taskA.ContinueWith(time => Console.WriteLine(time.Result));
    await taskB;
    
    • 1
    • 2
    • 3

    模仿Task,我们给Job也添加ContinueWith方法。

    public class Job
    {
        private readonly Action _work;
        private Job? _continue;
    
        public Job(Action work) => _work = work;
        public JobStatus Status { get; internal set; }
    
        internal protected virtual void Invoke()
        {
            Status = JobStatus.Running;
            _work();
            Status = JobStatus.Completed;
            _continue?.Start();
        }
    
        public void Start(JobScheduler? scheduler = null)
            => (scheduler ?? JobScheduler.Current).QueueJob(this);
    
        public static Job Run(Action work)
        {
            var job = new Job(work);
            job.Start();
            return job;
        }
    
        public Job ContinueWith(Action tobeContinued)
        {
            if (_continue == null)
            {
                var job = new Job(() => tobeContinued(this));
                _continue = job;
            }
            else
            {
                _continue.ContinueWith(tobeContinued);
            }
            return this;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    这个ContinueWith方法会将下一个待执行的Job放在_continue,这样多个顺序执行的Job就会构成一个链表。
    在当前Job的Invoke方法执行结束时,会触发下一个Job被调度。
    使用示例:

    Job.Run(() =>
    {
        Thread.Sleep(1000);
        Console.WriteLine("11");
    }).ContinueWith(_ =>
    {
        Thread.Sleep(1000);
        Console.WriteLine("12");
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    进一步使用await关键字来控制

    要像Task一样使用await,需要Job支持有GetAwaiter方法。任何一个类型,只要有了这个GetAwaiter方法,就可以对其使用await关键字了。
    c#的Task类中可以找到GetAwaiter

    public TaskAwaiter GetAwaiter();
    
    • 1

    然后TaskAwaiter继承了ICriticalNotifyCompletion接口

    public readonly struct TaskAwaiter : System.Runtime.CompilerServices.ICriticalNotifyCompletion
    
    • 1

    照猫画虎,也为Job添加一个最简单的JobAwaiter

    public class Job
    {
        ...
    
        public JobAwaiter GetAwaiter() => new(this);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    JobAwaiter的定义如下:

    public struct JobAwaiter : ICriticalNotifyCompletion
    {
        private readonly Job _job;
        public readonly bool IsCompleted => _job.Status == JobStatus.Completed;
    
        public JobAwaiter(Job job)
        {
            _job = job;
            if (job.Status == JobStatus.Created)
            {
                job.Start();
            }
        }
        
        public void GetResult() { }
    
        public void OnCompleted(Action continuation)
        {
            _job.ContinueWith(_ => continuation());
        }
    
        public void UnsafeOnCompleted(Action continuation)
        => OnCompleted(continuation);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    添加了await后,前面的代码也可以这样写:

    await F1();
    await F2();
    
    static Job F1() => new Job(() =>
    {
            Thread.Sleep(1000);
            Console.WriteLine("11");
    });
    
    static Job F2() => new Job(() =>
    {
            Thread.Sleep(1000);
            Console.WriteLine("12");
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    总结

    回顾开头的两个问题,现在可以尝试给出答案了。

    1. Task是什么,Task是一种有状态的操作(Created,Scheduled,Running,Completed),是对耗时操作的抽象,就像现实中的一项任务一样,它的执行需要相对较长的时间,它也有创建(Created),安排(Scheduled),执行(Running),完成(Completed)的基本过程。任务完成当然需要拿到结果的,这里的Job比较简单,没有模拟具体的结果;
    2. Task是如何被调度的,默认采用基于线程池的调度,即创建好Task后,由线程池中的空闲线程执行,具体什么时候执行、由哪个线程执行,开发者是不用关心的,在具体执行过程中,
      但由于.net全局线程池的局限,对于一些特殊场景无法满足时(比如需要立即执行Task),此时可以通过TaskCreationOptions更改调度行为;

    另外,await是语法糖,它背后的实现是基于GetAwaiter,由其返回ICriticalNotifyCompletion接口的实现,并对ContinueWith做了封装。

  • 相关阅读:
    ABAP中FIELD-SYMBOLS的详细用法
    用Markdown Nice写作
    基于用户行为的交易反欺诈探索
    Vue.js入门教程(五)
    InetAddress.getByName背后发生了什么
    vue3 element-plus 组件table表格 勾选框回显(初始化默认回显)完整静态代码
    如何使用 pyqt 实现 Groove 音乐播放器
    力扣(LeetCode)1620. 网络信号最好的坐标(C++)
    Win10配置IIS与 C#/.net项目的发布与IIS部署
    最新ChatGPT网站源码+支持GPT4.0+支持Midjourney绘画+支持国内全AI模型
  • 原文地址:https://blog.csdn.net/zhixin9001/article/details/134172184