• .NET Core多线程 (2) 异步 - 上


    合集:.NET Core多线程温故知新

     

    去年换工作时系统复习了一下.NET Core多线程相关专题,学习了一线码农老哥的《.NET 5多线程编程实战》课程,我将复习的知识进行了总结形成本专题。

    本篇,我们来复习一下异步的相关知识点,预计阅读时间10分钟。

    理解异步的本质

    (1)异步是什么?

    举个例子,在高峰期去餐厅吃饭,会先排队拿个小票,然后去逛一下玩玩,等到排到时会被通知就餐,这时再回到餐厅就可以点餐了。

    同步示意图:

    异步示意图:

    (2)同步有什么弊端

    时间片切换成本高!

    • CPU密集型操作:编码解码、图形计算、正则表达式等
    • IO密集型操作:和硬件打交道,和DB打交道等
    • 线程太多的烦恼/代价:
    • 新开Thread是有开销的(时间、空间)
    • GC回收会冻结所有线程寻找引用根(gcroot)

    程序有可能会卡死!

    • Thread会和网络驱动程序打交道(外网络地址)
    • ThreadPool中的WorkQueue任务(4000+)得不到处理
    • 异步:async/await

    (3)C#如何使用异步?

    ThreadPool线程池分类:

      • workThread:

        • 适用于CPU密集型,在WinDbg中标签为 ThreadPool Worker

      • IOThread:

        • 适用于IO密集型,在WinDbg中标签为 ThreadPool Completion Port

    HttpClient案例演示:在下面的代码中GetContentLengthAsync异步方法中的线程就用的IOThread,可以通过WinDbg验证。

    复制代码
    namespace ConsoleApp3
    {
        class Program
        {
            static void Main(string[] args)
            {
                GetContentLengthAsync("http://cnblogs.com");
    
                Console.WriteLine($"主线程:{Environment.CurrentManagedThreadId}, 准备退出!");
                Console.ReadLine();
            }
    
            static async Task<int> GetContentLengthAsync(string url)
            {
                using (HttpClient client = new HttpClient())
                {
                    var content = await client.GetStringAsync(url);
    
                    Console.WriteLine($"当前线程:{Environment.CurrentManagedThreadId}, content={content.Length}");
    
                    return content.Length;
                }
            }
        }
    }
    复制代码

    异步的底层:IO完成端口

    (1)理解IO完成端口

    异步的核心:callback机制

    IO完成端口:这是一个Windows内核对象,我们常称之为IOCP。IOCP是一个异步I/O的Windows API,它可以高效地将I/O事件通知给应用程序,类似于Linux中的Epoll。因此,.NET Framework是基于IOCP来实现的异步,而.NET Core则增加了基于epoll来实现异步,因为它要支持跨平台而不只是Windows。SafeHandle:文件句柄、网络句柄...

    核心步骤:

    • 初始化时将SafeHandle、ThreadPool与IO完成端口进行绑定(比如:FileStream在Init时)

    • (主线程)创建IO完成端口:CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads)

    • (主线程)将消息塞到IO完成端口的Queue队列:PostQueuedCompletionStatus

    • (子线程)从IO完成端口的Queue队列中获取消息:GetQueuedCompletionStatu

    (2)实现一个简单的IO完成端口

    自定义一个IOCP类,代码如下:

    复制代码
    public class IOCP
    {
        [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
        public static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);
    
        [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
        public static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort,
            out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey,
            out IntPtr lpOverlapped, uint dwMilliseconds);
    
        [DllImport("Kernel32", CharSet = CharSet.Auto)]
        public static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped);
    }
    复制代码

    调用端代码如下:

    复制代码
    // 1. 创建IO完成端口
    var safehandle = IOCP.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, IntPtr.Zero, 1);
    
    var thread = new Thread(() =>
    {
        Console.WriteLine($"工作线程: {Environment.CurrentManagedThreadId} 开始获取数据...");
        while (true)
        {
            // 3. get数据
            IOCP.GetQueuedCompletionStatus(safehandle, out var ipn, out var ipc, out var lop, int.MaxValue);
    
            var receiveData = Convert.ToString(GCHandle.FromIntPtr(lop).Target);
    
            Console.WriteLine($"工作线程: {Environment.CurrentManagedThreadId} 获取数据成功!{receiveData}");
    
            Thread.Sleep(1000);
        }
    });
    
    thread.Start();
    
    // 2. post 数据
    var data = (IntPtr)GCHandle.Alloc("hello world");
    
    IOCP.PostQueuedCompletionStatus(safehandle, 4096, IntPtr.Zero, data);
    
    Console.WriteLine($"主线程: {Environment.CurrentManagedThreadId} 塞入数据成功!");
    
    Console.ReadLine();
    复制代码

    加深对异步的理解

    我们都知道 ContinueWith 主要起 延续任务的作用,写起来十分繁琐!.NET 4.5推出了语法糖async/await大大简化了异步编程的工作量

    下面展示使用ContinueWith 和 async/await 的两种方式的代码量:

    复制代码
    /// 
    /// continutewith 的版本
    /// 
    /// 
    static Taskstring>> GetContentListContinute()
    {
        var list = new List<string>();
    
        SqlConnection connection = new SqlConnection("Server=LocalHost; Persist Security Info=False;Integrated Security=SSPI;Database= PostDB;");
    
        var task = connection.OpenAsync().ContinueWith(t =>
            {
                SqlCommand command = new SqlCommand("select PostContent from Post", connection);
    
                return command.ExecuteReaderAsync().ContinueWith(t2 =>
                {
                    var reader = t2.Result;
    
                    return GetContent(reader, list).ContinueWith(t3 =>
                    {
                        return list;
                    });
                }).Unwrap();
    
            }).Unwrap();
    
        return task;
    }
    
    static Task<bool> GetContent(SqlDataReader reader, List<string> list)
    {
        return reader.ReadAsync().ContinueWith(t =>
        {
            var hasRow = t.Result;
    
            if (hasRow)
            {
                list.Add(reader.GetString(0));  //读取reader的值
                GetContent(reader, list);
            }
    
            return false;
        });
    }
    
    /// 
    /// await+async 的异步写法
    /// 
    /// 
    static async Taskstring>> GetContentListAsync()
    {
        List<string> list = new List<string>();
    
        SqlConnection connection = new SqlConnection("Server=LocalHost; Persist Security Info=False;Integrated Security=SSPI;Database= PostDB;");
    
        await connection.OpenAsync();
    
        SqlCommand command = new SqlCommand("select PostContent from Post", connection);
    
        var reader = command.ExecuteReader();
    
        while (await reader.ReadAsync())
        {
            list.Add(reader.GetString(0));
        }
    
        return list;
    }
    复制代码

    async/await语法糖的底层原理

    从编译后的IL代码来看,async/await只是编译器提供的语法糖,它并不是一种新的异步模型,而只是一种简化异步代码编写的方式。

    从反编译后的代码来看,对于async/await的方法编译器会新生成一个实现了IAsyncStateMachine接口的状态机类。

    (1)IAsyncStateMachine接口定义:

    复制代码
    public interface IAsyncStateMachine
    {
        void MoveNext();
        void SetStateMachine(IAsyncStateMachine stateMachine);
    }
    复制代码

    (2)IAsyncStateMachine实现类的基本执行步骤

    • step1.初始化一个异步状态机machine

    • step2.初始化一个AsyncTaskMethodBuilder的实例,赋予machine.builder

    • step3.设置异步状态机的状态为-1,将类传入到状态机内部

    • step4.调用machine.builder的start方法

    • step5.返回machine.builder.Task

    (3).NET提供异步方式的总结:

    • .NET 4.5开始提供的async/await,本质是.NET 4.0的Task + 状态机

    • .NET 4.0开始提供的Task,本质是.NET 3.5提供的Thread+ThreadPool+等待/取消等API操作

    小结

    本篇,我们复习了异步相关的基础知识,但由于内容太多,因此将其拆分为了两篇推文。下一篇,我们继续异步相关知识。

    参考资料

    一线码农,腾讯课堂《.NET 5多线程编程实战

    不明作者,《Task调度与await》

     

  • 相关阅读:
    高级安装程序将应用程序转换为MSIX
    PMP的最新发展趋势?你可知?
    朔雪流量复制器的前端
    在创建的关于tensorflow的envs环境中无法import出pip中显示的包
    centos常见的命令
    Linux常用命令
    XCTF1-web unseping
    Docker部署GItLab
    Python Web开发(十一):ORM 对关联表的操作
    MATLAB矩阵
  • 原文地址:https://www.cnblogs.com/edisonchou/p/dotnet_multithread_learning_notes_chap2.html