• 如何在long-running task中调用async方法


    什么是 long-running thread

    long-running task 是指那些长时间运行的任务,比如在一个 while True 中执行耗时较长的同步处理。

    下面的例子中,我们不断从队列中尝试取出数据,并对这些数据进行处理,这样的任务就适合交给一个 long-running task 来处理。

    var queue = new BlockingCollection<string>();
    
    Task.Factory.StartNew(() =>
    {
        while (true)
        {
            // BlockingCollection.Take() 方法会阻塞当前线程,直到队列中有数据可以取出。
            var input = queue.Take();
            Console.WriteLine($"You entered: {input}");
        }
    }, TaskCreationOptions.LongRunning);
    
    
    while (true)
    {
        var input = Console.ReadLine();
        queue.Add(input);
    }
    

    在 .NET 中,我们可以使用 Task.Factory.StartNew 方法并传入 TaskCreationOptions.LongRunning 来创建一个 long-running task。

    虽然这种方式创建的 long-running task 和默认创建的 task 一样,都是分配给 ThreadPoolTaskScheduler 来调度的, 但 long-running task 会被分配到一个新的 Background 线程上执行,而不是交给 ThreadPool 中的线程来执行。

    class ThreadPoolTaskScheduler : TaskScheduler
    {
        // ...
        protected internal override void QueueTask(Task task)
        {
            TaskCreationOptions options = task.Options;
            if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0)
            {
                // 在一个新的 Background 线程上执行 long-running task。
                new Thread(s_longRunningThreadWork)
                {
                    IsBackground = true,
                    Name = ".NET Long Running Task"
                }.UnsafeStart(task);
            }
            else
            {
                // 非 long-running task 交给 ThreadPool 中的线程来执行。
                ThreadPool.UnsafeQueueUserWorkItemInternal(task, (options & TaskCreationOptions.PreferFairness) == 0);
            }
        }
        // ...
    }
    

    为什么long-running task要和普通的task分开调度

    如果一个task持续占用一个线程,那么这个线程就不能被其他的task使用,这和 ThreadPool 的设计初衷是相违背的。

    如果在 ThreadPool 中创建了大量的 long-running task,那么就会导致
    ThreadPool 中的线程不够用,从而影响到其他的 task 的执行。

    在 long-running task await 一个 async 方法后会发生什么

    有时候,我们需要在 long-running task 中调用一个 async 方法。比如下面的例子中,我们需要在 long-running task 中调用一个 async
    的方法来处理数据。

    var queue = new BlockingCollection<string>();
    
    Task.Factory.StartNew(async () =>
    {
        while (true)
        {
            var input = queue.Take();
            Console.WriteLine($"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
            await ProcessAsync(input);
            Console.WriteLine($"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
        }
    }, TaskCreationOptions.LongRunning);
    
    async Task ProcessAsync(string input)
    {
        // 模拟一个异步操作。
        await Task.Delay(100);
        Console.WriteLine($"You entered: {input}, thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
    }
    
    while (true)
    {
        var input = Console.ReadLine();
    
        queue.Add(input);
    }
    
    TaskScheduler InternalCurrentTaskScheduler()
    {
        var propertyInfo = typeof(TaskScheduler).GetProperty("InternalCurrent", BindingFlags.Static | BindingFlags.NonPublic);
        return (TaskScheduler)propertyInfo.GetValue(null);
    }
    

    连续输入 1、2、3、4,输出如下:

    1
    Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
    You entered: 1, thread id: 4, task scheduler: , thread pool: True
    After process: thread id: 4, task scheduler: , thread pool: True
    2
    Before process: thread id: 4, task scheduler: , thread pool: True
    You entered: 2, thread id: 4, task scheduler: , thread pool: True
    After process: thread id: 4, task scheduler: , thread pool: True
    3
    Before process: thread id: 4, task scheduler: , thread pool: True
    You entered: 3, thread id: 4, task scheduler: , thread pool: True
    After process: thread id: 4, task scheduler: , thread pool: True
    4
    Before process: thread id: 4, task scheduler: , thread pool: True
    You entered: 4, thread id: 4, task scheduler: , thread pool: True
    After process: thread id: 4, task scheduler: , thread pool: True
    

    从执行结果中可以看出,第一次 await 之前,当前线程是 long-running task 所在的线程(thread id: 9),此后就变成了 ThreadPool
    中的线程(thread id: 4)。

    至于为什么之后一直是 ThreadPool 中的线程(thread id: 4),这边做一下简单的解释。在我以前一篇介绍 await 的文章中介绍了 await 的执行过程,以及 await 之后的代码会在哪个线程上执行。

    https://www.cnblogs.com/eventhorizon/p/15912383.html

    1. 第一次 await 前,当前线程是 long-running task 所在的线程(thread id: 9),绑定了 TaskScheduler(ThreadPoolTaskScheduler),也就是说 await 之后的代码会被调度到 ThreadPool 中执行。
    2. 第一次 await 之后的代码被调度到 ThreadPool 中的线程(thread id: 4)上执行。
    3. ThreadPool 中的线程不会绑定 TaskScheduler,也就意味着之后的代码还是会在 ThreadPool 中的线程上执行,并且是本地队列优先,所以一直是 thread id: 4 这个线程在从本地队列中取出任务在执行。

    线程池的介绍请参考我另一篇博客
    https://www.cnblogs.com/eventhorizon/p/15316955.html

    回到本文的主题,如果在 long-running task 使用了 await 调用一个 async 方法,就会导致为 long-running task 分配的独立线程提前退出,和我们的预期不符。

    long-running task 中 调用 一个 async 方法的可能姿势

    使用 Task.Wait#

    在 long-running task 中调用一个 async 方法,可以使用 Task.Wait 来阻塞当前线程,直到 async 方法执行完毕。
    对于 Task.Factory.StartNew 创建出来的 long-running task 来说,因为其绑定了 ThreadPoolTaskScheduler,就算是使用 Task.Wait
    阻塞了当前线程,也不会导致死锁。
    并且 Task.Wait 会把异常抛出来,所以我们可以在 catch 中处理异常。

    // ...
    Task.Factory.StartNew( () =>
    {
        while (true)
        {
            var input = queue.Take();
            Console.WriteLine($"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
            ProcessAsync(input).Wait();
            Console.WriteLine($"After process: thread id: {Thread.CurrentThread.ManagedThreadId}");
        }
    }, TaskCreationOptions.LongRunning);
    // ...
    

    输出如下:

    1
    Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
    You entered: 1, thread id: 5, task scheduler: , thread pool: True
    After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
    2
    Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
    You entered: 2, thread id: 5, task scheduler: , thread pool: True
    After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
    3
    Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
    You entered: 3, thread id: 5, task scheduler: , thread pool: True
    After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
    4
    Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
    You entered: 4, thread id: 5, task scheduler: , thread pool: True
    After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
    

    Task.Wait 并不会对 async 方法内部产生影响,所以 async 方法内部的代码还是按照正常的逻辑执行。这边 ProcessAsync 方法内部打印的
    thread id 没变纯粹是因为 ThreadPool 目前就只创建了一个线程,你可以疯狂输入看看结果。

    关于 Task.Wait 的使用,可以参考我另一篇博客
    https://www.cnblogs.com/eventhorizon/p/17481757.html

    使用自定义的 TaskScheduler 来创建 long-running task#

    Task.Factory.StartNew(async () =>
    {
        while (true)
        {
            var input = queue.Take();
            Console.WriteLine(
                $"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
            await ProcessAsync(input);
            Console.WriteLine(
                $"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
        }
    }, CancellationToken.None, TaskCreationOptions.None, new CustomerTaskScheduler());
    
    class CustomerTaskScheduler : TaskScheduler
    {
        // 这边的 BlockingCollection 只是举个例子,如果是普通的队列,配合锁也是可以的。
        private readonly BlockingCollection _tasks = new BlockingCollection();
    
        public CustomerTaskScheduler()
        {
            var thread = new Thread(() =>
            {
                foreach (var task in _tasks.GetConsumingEnumerable())
                {
                    TryExecuteTask(task);
                }
            })
            {
                IsBackground = true
            };
            thread.Start();
        }
    
        protected override IEnumerable GetScheduledTasks()
        {
            return _tasks;
        }
    
        protected override void QueueTask(Task task)
        {
            _tasks.Add(task);
        }
    
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }
    }
    

    输出如下:

    1
    Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
    You entered: 1, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
    After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
    2
    Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
    You entered: 2, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
    After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
    3
    Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
    You entered: 3, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
    After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
    4
    Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
    You entered: 4, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
    After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
    

    因为修改了上下文绑定的 TaskScheduler,会影响到 async 方法内部 await 回调的执行。

    这种做法不推荐使用,因为可能会导致死锁。

    如果我将 await 改成 Task.Wait,就会导致死锁。

    Task.Factory.StartNew(() =>
    {
        while (true)
        {
            var input = queue.Take();
            Console.WriteLine(
                $"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
            ProcessAsync(input).Wait();
            Console.WriteLine(
                $"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
        }
    }, CancellationToken.None, TaskCreationOptions.None, new CustomerTaskScheduler());
    

    输出如下:

    1
    Before process: thread id: 7, task scheduler: CustomerTaskScheduler, thread pool: False
    

    后面就没有输出了,因为死锁了,除非我们在 ProcessAsync 方法内部每个 await 的 Task 后加上ConfigureAwait(false)。

    同理,同学们也可以尝试用 SynchronizationContext 来实现类似的效果,同样有死锁的风险。

    总结

    如果你想要在一个 long-running task 中执行 async 方法,使用 await 关键字会导致 long-running task 的独立线程提前退出。

    比较推荐的做法是使用 Task.Wait。如果连续执行多个 async 方法,建议将这些 async 方法封装成一个新方法,然后只 Wait 这个新方法的 Task。

  • 相关阅读:
    Java之多线程(6个demo)
    深度解读李彦宏的“不要卷模型,要卷应用”
    作为外贸业务员,为什么我经常随机轻松 就“捡“到精准潜在客户
    elasticdump官方教程
    Docker入门
    动态规划:总结
    Hadoop(二): YARN(资源管理器 RM)、HBase高可用集群搭建
    弘辽科技:淘宝怎么才能获得更多流量?要做点什么?
    LVS-DR模式 +keepalived
    【Pytorch】torch.nn.LeakyReLU()
  • 原文地址:https://www.cnblogs.com/eventhorizon/p/17497359.html