• C#异步编程是怎么回事(番外)


    在上一篇通信协议碰到了多线程,阻塞、非阻塞、锁、信号量...,会碰到很多问题。因此我感觉很有必要研究多线程异步编程

    首先以一个例子开始

    image

    我说明一下这个例子。
    这是一个演示异步编程的例子。

    • 输入job [name],在一个同步的Main方法中,以一发即忘的方式调用异步方法StartJob()
    • 输入time,调用同步方法PrintCurrentTime()输出时间。
    • 输出都带上线程ID,便于观察。
      可以看到,主线程不会阻塞。主线程在同步方法中使用一发即忘的方式调用异步方法时,在异步方法中碰到阻塞时,主线程返回同步方法中继续执行。而异步方法在另一个线程中继续执行。
      程序如下
    internal class Program
    {
        static void Main(string[] args)
        {
            while (true)
            {
                Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Enter 'time' to get current time or 'job [name]' to start a job:");
                string input = Console.ReadLine();
    
                if (input.StartsWith("time"))
                {
                    // 输出当前时间
                    PrintCurrentTime();
                }
                else if (input.StartsWith("job"))
                {
                    // 启动一个异步任务,执行指定的工作
                    string[] parts = input.Split(new char[] { ' ' }, 2);
                    string jobName = parts.Length > 1 ? parts[1] : string.Empty;
                    StartJob(jobName);
                }
                else
                {
                    Console.WriteLine("Invalid input. Please try again.");
                }
            }
        }
    
        static void PrintCurrentTime()
        {
            Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Current time: {DateTime.Now}");
        }
    
        static async void StartJob(string jobName)
        {
            // 获取主线程的线程 ID
            int mainThreadId = Thread.CurrentThread.ManagedThreadId;
    
            // 检查是否在主线程上
            bool onMainThread = Thread.CurrentThread.ManagedThreadId == mainThreadId;
    
            Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Starting job '{jobName}'. This will take 10 seconds...");
    
            // 输出主线程上下文移动情况
            Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Main thread context moved to new thread: {(!onMainThread)}");
    
            await Task.Delay(10000); // 模拟任务需要10秒钟完成
    
            // 输出任务完成信息及上下文移动情况
            Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Job '{jobName}' completed. Main thread context moved to new thread: {(!onMainThread)}");
        }
    
    }
    

    上下文流转

    一个方法从一个线程代码栈被切换,或者说被剪切到另一个线程代码栈上去,可以称为上下文流转
    这对于理解异步编程是一个重要的点。
    但由于上面的程序缺少必要变量,我需要在不同位置加几个变量,来展示上下文确实被移动了。

    static async void StartJob(string jobName)
    {
    	int mainThreadId = Thread.CurrentThread.ManagedThreadId;
    	// 检查是否在主线程上
    	bool onMainThread = Thread.CurrentThread.ManagedThreadId == mainThreadId;
    	...
    }
    

    image
    可以看到onMainThread一直为False,这个变量从线程1移动到线程5
    而且bool是值类型,在栈上面,这说明StartJob这段代码确实移动到线程5的栈上面去了。(每个线程都有一个调用栈)

    使用VS调试窗口监视线程

    想要再进一步,更清晰的话说明上下文流转的话,那就得监视这两个线程栈的内容了。万幸的是 vs提供了这个功能,调试 > 窗口 > 并行堆栈

    • 命中断点时,StartJob方法在主线程24876上
      image

    • 10秒后再次命中,StartJob方法跑到了任务线程上。而主线程现在在Main函数的Console.ReadLine()那里阻塞
      image

    • 代码阻塞与线程阻塞
      在上面的例子中我们引出两种现象,代码阻塞线程阻塞
      代码阻塞时,线程不一定阻塞,原线程没有阻塞,去执行别的代码了,而由新线程接手当前上下文和调用栈阻塞在这里,比如这里的await Task.Delay(10000)
      代码阻塞时线程也可能阻塞,比如lock(lockObj)Console.ReadLine()
      为了方便,我们姑且这样命名吧

      • 代码阻塞时,线程不阻塞称之为等待await
      • 代码阻塞时,线程也阻塞称之为阻塞block
    • 为什么有两个箭头
      这里为什么有线程24666和27548两个NET TP Worker(.NET Thread Pool (TP) Worker)?据chatGPT解释,Delay语句在线程池中找了一个线程去执行,一旦延迟时间到达,StartJob会在其中一个线程池线程上恢复执行。计时是一个线程,恢复上下文是另一个线程。Delay就代表了我们的那个耗时线程(不是异步方法所在线程)。
      既然有两个线程的联动,其中就出现了一些熟悉的东西。信号量Semaphore,一次性信号量的消耗TrySetResult,但详细过程我还不清楚。
      MSDN上的例子也是这样
      image

    以同步的方式进行异步编程

    原来把异步方法的上下文移动到新线程N,保证主线程不阻塞(脱离主线程U)。然后N用第三个线程C执行耗时任务,最后把C结果给位于N中的上下文。
    站在代码编写者的角度,不特意去看线程的话,就不会注意到异步方法的上下文从一个线程跑到另一个线程上去了。这就是所谓的以同步的方式进行异步编程。
    那么线程N的执行就明晰了。先保存上下文,然后启用新线程C进行耗时任务,并阻塞。等C使用信号量或其他什么通知N时,N再根据C的结果继续执行。
    可以这样总结

    • asyncawait是一个语法糖。
    • 以同步的方式进行异步编程的方式是使用语法糖,以同步的方式书写代码,然后编译成适当的异步的实现。

    我列出几种可能的异步的实现

    1. 异步状态机

    • 异步状态机是C#编译async语法糖的实现方式
    • 异步方法StartJob将会被编译成一个同步方法StartJobAsync和一个状态机StartJobAsyncMachine
    • 状态机流转上下文的方式是将新线程用到的变量提升为字段,储存于可被线程共享的进程堆中
    • MoveNext方法可以被不同线程执行,这是关键
    点击查看代码
    internal class Program
    {
        ...
    
        internal static void StartJobAsync(string jobName)
        {
            StartJobAsyncMachine stateMachine = new StartJobAsyncMachine();
            stateMachine.builder = AsyncVoidMethodBuilder.Create();
            stateMachine.jobName = jobName;
            stateMachine.state = -1;
            stateMachine.builder.Start(ref stateMachine);
        }
    
        public sealed class StartJobAsyncMachine : IAsyncStateMachine
        {
            public int state;
    
            public AsyncVoidMethodBuilder builder;
    
            private TaskAwaiter taskAwaiter;
    
            //形参会编译成public字段
            public string jobName;
            //被新线程使用的局部变量会编译成private字段
            private bool onMainThread;
    
            private void MoveNext()
            {
                int num = state;
                try
                {
                    TaskAwaiter awaiter;
                    if (num != 0)
                    {
                        // 获取主线程的线程 ID
                        int mainThreadId = Thread.CurrentThread.ManagedThreadId;
    
                        // 检查是否在主线程上
                        onMainThread = Thread.CurrentThread.ManagedThreadId == mainThreadId;
    
                        Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Starting job '{jobName}'. This will take 10 seconds...");
    
                        // 输出主线程上下文移动情况
                        Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Main thread context moved to new thread: {(!onMainThread)}");
                        awaiter = Task.Delay(10000).GetAwaiter();
    
                        if (!awaiter.IsCompleted)
                        {
                            num = (state = 0);
                            taskAwaiter = awaiter;
                            StartJobAsyncMachine stateMachine = this;
                            builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
                            return;
                        }
                    }
                    else
                    {
                        awaiter = taskAwaiter;
                        taskAwaiter = default(TaskAwaiter);
                        num = (state = -1);
                    }
                    awaiter.GetResult();
                    // 输出任务完成信息及上下文移动情况
                    Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Job '{jobName}' completed. Main thread context moved to new thread: {(!onMainThread)}");
                }
                catch (Exception exception)
                {
                    state = -2;
                    builder.SetException(exception);
                    return;
                }
                state = -2;
                builder.SetResult();
            }
    
            void IAsyncStateMachine.MoveNext()
            {
                this.MoveNext();
            }
    
            private void SetStateMachine(IAsyncStateMachine stateMachine)
            {
            }
    
            void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
            {
                this.SetStateMachine(stateMachine);
            }
    
        }
    }
    

    StartJobAsync的调用和原方法等效。我在Main中在加一种指令jobMachine调用StartJobAsync。原来的改为job空格

    else if (input.StartsWith("jobMachine "))
    {
        // 启动一个异步任务,执行指定的工作
        string[] parts = input.Split(new char[] { ' ' }, 2);
        string jobName = parts.Length > 1 ? parts[1] : string.Empty;
        StartJobAsync(jobName);
    }
    

    image

    2. 协程

    这种方法到底叫协程还是异步迭代器,我不太分得清,但目的是能够达到的,我暂且就叫做协程好了。
    虽然这种做法就像脱裤子放屁,因为协程最后也会编译成状态机。这个例子主要是为了演示直观。
    理论上,C#中的异步/等待(async/await)语法并不是直接编译成协程的,而是由编译器生成状态机(state machine)来管理异步操作。但是,我们可以通过理解协程的工作原理以及C#异步/等待模型的特性,来描绘一种可能的编译结果。
    这里我写了一个基于协程的异步的实现。效果和原来的等同。

    • 原理
      和状态机实现基本一样。对于每个async方法生成一个协程。而且在异步方法嵌套时,那么async方法内部的async方法在编译时就不需要开一个新线程了。要不然得多少线程。
    internal class Program
    {
        static void Main(string[] args)
        {
            while (true)
            {
                ...
                else if (input.StartsWith("jobCorotine "))
                {
                    // 启动一个异步任务,执行指定的工作
                    string[] parts = input.Split(new char[] { ' ' }, 2);
                    string jobName = parts.Length > 1 ? parts[1] : string.Empty;
                    StartJobAsync_2(jobName);
                }
                ...
            }
        }
    
        #region 异步协程
        static void StartJobAsync_2(string jobName)
        {
            StartJobAsyncCorotine startJobCorotine = new StartJobAsyncCorotine();
            startJobCorotine.jobName = jobName;
            var enumerator = startJobCorotine.DelayedOperations();
            var iterator = enumerator.GetEnumerator();
            bool next = false;
            while (true)
            {
                next = iterator.MoveNext();
                if (!iterator.Current.IsCompleted)
                {
                    //异步方法中存在耗时任务,切换到新线程
                    break;
                }
                next = false;
            }
            if (next == false)
            {
                return;
            }
            //异步方法存在耗时任务,切换上下文到新线程
            Task.Run(() =>
            {
                do
                {
                    if (!iterator.Current.IsCompleted)
                    {
                        //创建耗时任务线程进行耗时任务
                        Task.Run(() =>
                        {
                            iterator.Current.GetResult();
                        }).Wait();
                    }
                }
                while (iterator.MoveNext());
            });
        }
    
        public sealed class StartJobAsyncCorotine
        {
            //形参因为需要运行时赋值,只能写成字段的形式
            public string jobName;
    
            public int Count = 1;
    
            public IEnumerable DelayedOperations()
            {
                TaskAwaiter awaiter1;
    
                // 获取主线程的线程 ID
                int mainThreadId = Thread.CurrentThread.ManagedThreadId;
    
                // 检查是否在主线程上
                bool onMainThread = Thread.CurrentThread.ManagedThreadId == mainThreadId;
    
                Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Starting job '{jobName}'. This will take 10 seconds...");
    
                // 输出主线程上下文移动情况
                Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Main thread context moved to new thread: {(!onMainThread)}");
    
                awaiter1 = Task.Delay(10000).GetAwaiter(); // 模拟任务需要10秒钟完成
                //出去判断这是否是耗时任务以切换线程
                yield return awaiter1;
    
                // 输出任务完成信息及上下文移动情况
                Console.WriteLine($"(Thread ID: {Thread.CurrentThread.ManagedThreadId}) Job '{jobName}' completed. Main thread context moved to new thread: {(!onMainThread)}");
            }
        }
        #endregion
    }
    
    • 效果确实和原来一样

    image

    3. 闭包

    这真不需要多说,通过闭包进行捕获上下文真的是太常见了,Ajax中用到吐🤮

    带返回值的上下文流转

    StartJob是没有返回值的,假如我们需要一个返回值呢,比如一个bool,用于判断接下来的执行流程。
    调用异步方法StartJob的同步方法Main之间存在着绝对的分界线——两个线程。同步方法不会被交给异步方法中的那个新线程,没法在同步方法中以同步的方式进行异步编程
    唯一的一点看头是,至少Task还给我们留下了一个回调ContinueWith可用。但条件允许的话,何不把回调的内容写在异步方法内部呢?

  • 相关阅读:
    vb.net加本地SQL server数据库圣经
    分布式事务中的那些事——微服务总结(二)
    【YOLOv8分割】在 预测阶段 如何预测整个目录并输出二值mask图像
    华纳云:如何使用Docker进行有效的日志管理?
    [glacierctf 2022] 只会3个
    20道常被问到的JavaScript题目
    LeetCode知识点总结 - 89
    安全狗受邀亮相第二届工控中国大会
    340. 至多包含 K 个不同字符的最长子串
    鸿蒙HarmonyOS实战-Stage模型(信息传递载体Want)
  • 原文地址:https://www.cnblogs.com/ggtc/p/18229724