• NETCore中实现一个轻量无负担的极简任务调度ScheduleTask


    至于任务调度这个基础功能,重要性不言而喻,大多数业务系统都会用到,世面上有很多成熟的三方库比如Quartz,Hangfire,Coravel
    这里我们不讨论三方的库如何使用 而是从0开始自己制作一个简易的任务调度,如果只是到分钟级别的粒度基本够用

    技术栈用到了:BackgroundServiceNCrontab

    第一步我们定义一个简单的任务约定,不干别的就是一个执行方法:

        public interface IScheduleTask
        {
            Task ExecuteAsync();
        }
        public abstract class ScheduleTask : IScheduleTask
        {
            public virtual Task ExecuteAsync()
            {
                return Task.CompletedTask;
            }
        }
    

    第二步定义特性标注任务执行周期等信的metadata

        [AttributeUsage(AttributeTargets.Class, AllowMultiple = true, Inherited = false)]
        public class ScheduleTaskAttribute(string cron) : Attribute
        {
            /// 
            /// 支持的cron表达式格式 * * * * *:https://en.wikipedia.org/wiki/Cron
            /// 最小单位为分钟
            /// 
            public string Cron { get; set; } = cron;
            public string? Description { get; set; }
            /// 
            /// 是否异步执行.默认false会阻塞接下来的同类任务
            /// 
            public bool IsAsync { get; set; } = false;
            /// 
            /// 是否初始化即启动,默认false
            /// 
            public bool IsStartOnInit { get; set; } = false;
        }
    

    第三步我们定义一个调度器约定,不干别的就是判断当前的任务是否可以执行:

        public interface IScheduler
        {
            /// 
            /// 判断当前的任务是否可以执行
            /// 
            bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime);
        }
    

    好了,基础步骤就完成了,如果我们需要实现配置级别的任务调度或者动态的任务调度 那我们再抽象一个Store:

        public class ScheduleTaskMetadata(Type scheduleTaskType, string cron)
        {
            public Type ScheduleTaskType { get; set; } = scheduleTaskType;
            public string Cron { get; set; } = cron;
            public string? Description { get; set; }
            public bool IsAsync { get; set; } = false;
            public bool IsStartOnInit { get; set; } = false;
        }
        public interface IScheduleMetadataStore
        {
            /// 
            /// 获取所有ScheduleTaskMetadata
            /// 
            Task> GetAllAsync();
        }
    

    实现一个Configuration级别的Store

        internal class ConfigurationScheduleMetadataStore(IConfiguration configuration) : IScheduleMetadataStore
        {
            const string Key = "BiwenQuickApi:Schedules";
    
            public Task> GetAllAsync()
            {
                var options = configuration.GetSection(Key).GetChildren();
    
                if (options?.Any() is true)
                {
                    var metadatas = options.Select(x =>
                    {
                        var type = Type.GetType(x[nameof(ConfigurationScheduleOption.ScheduleType)]!);
                        if (type is null)
                            throw new ArgumentException($"Type {x[nameof(ConfigurationScheduleOption.ScheduleType)]} not found!");
    
                        return new ScheduleTaskMetadata(type, x[nameof(ConfigurationScheduleOption.Cron)]!)
                        {
                            Description = x[nameof(ConfigurationScheduleOption.Description)],
                            IsAsync = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsAsync)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsAsync)]!),
                            IsStartOnInit = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]!),
                        };
                    });
                    return Task.FromResult(metadatas);
                }
                return Task.FromResult(Enumerable.Empty());
            }
        }
    

    然后呢,我们可能需要多任务调度的事件做一些操作或者日志存储.比如失败了该干嘛,完成了回调其他后续业务等.我们再来定义一下具体的事件IEvent,具体可以参考我上一篇文章:
    https://www.cnblogs.com/vipwan/p/18184088

        public abstract class ScheduleTaskEvent(IScheduleTask scheduleTask, DateTime eventTime) : IEvent
        {
            /// 
            /// 任务
            /// 
            public IScheduleTask ScheduleTask { get; set; } = scheduleTask;
            /// 
            /// 触发时间
            /// 
            public DateTime EventTime { get; set; } = eventTime;
        }
        /// 
        /// 执行完成
        /// 
        public sealed class TaskSuccessedEvent(IScheduleTask scheduleTask, DateTime eventTime, DateTime endTime) : ScheduleTaskEvent(scheduleTask, eventTime)
        {
            /// 
            /// 执行结束的时间
            /// 
            public DateTime EndTime { get; set; } = endTime;
        }
        /// 
        /// 执行开始
        /// 
        public sealed class TaskStartedEvent(IScheduleTask scheduleTask, DateTime eventTime) : ScheduleTaskEvent(scheduleTask, eventTime);
        /// 
        /// 执行失败
        /// 
        public sealed class TaskFailedEvent(IScheduleTask scheduleTask, DateTime eventTime, Exception exception) : ScheduleTaskEvent(scheduleTask, eventTime)
        {
            /// 
            /// 异常信息
            /// 
            public Exception Exception { get; private set; } = exception;
        }
    

    接下来我们再实现基于NCrontab的简易调度器,这个调度器主要是解析Cron表达式判断传入时间是否可以执行ScheduleTask,具体的代码:

        internal class SampleNCrontabScheduler : IScheduler
        {
            /// 
            /// 暂存上次执行时间
            /// 
            private static ConcurrentDictionary LastRunTimes = new();
    
            public bool CanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime)
            {
                var now = DateTime.Now;
                var haveExcuteTime = LastRunTimes.TryGetValue(scheduleMetadata, out var time);
                if (!haveExcuteTime)
                {
                    var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
                    LastRunTimes.TryAdd(scheduleMetadata, nextStartTime);
    
                    //如果不是初始化启动,则不执行
                    if (!scheduleMetadata.IsStartOnInit)
                        return false;
                }
                if (now >= time)
                {
                    var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
                    //更新下次执行时间
                    LastRunTimes.TryUpdate(scheduleMetadata, nextStartTime, time);
                    return true;
                }
                return false;
            }
        }
    

    然后就是核心的BackgroundService了,这里我用的IdleTime心跳来实现,粒度分钟,当然内部也可以封装Timer等实现更复杂精度更高的调度,这里就不展开讲了,代码如下:

    
        internal class ScheduleBackgroundService : BackgroundService
        {
            private static readonly TimeSpan _pollingTime
    #if DEBUG
              //轮询20s 测试环境下,方便测试。
              = TimeSpan.FromSeconds(20);
    #endif
    #if !DEBUG
             //轮询60s 正式环境下,考虑性能轮询时间延长到60s
             = TimeSpan.FromSeconds(60);
    #endif
            //心跳10s.
            private static readonly TimeSpan _minIdleTime = TimeSpan.FromSeconds(10);
            private readonly ILogger _logger;
            private readonly IServiceProvider _serviceProvider;
            public ScheduleBackgroundService(ILogger logger, IServiceProvider serviceProvider)
            {
                _logger = logger;
                _serviceProvider = serviceProvider;
            }
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var pollingDelay = Task.Delay(_pollingTime, stoppingToken);
                    try
                    {
                        await RunAsync(stoppingToken);
                    }
                    catch (Exception ex)
                    {
                        //todo:
                        _logger.LogError(ex.Message);
                    }
                    await WaitAsync(pollingDelay, stoppingToken);
                }
            }
            private async Task RunAsync(CancellationToken stoppingToken)
            {
                using var scope = _serviceProvider.CreateScope();
                var tasks = scope.ServiceProvider.GetServices();
                if (tasks is null || !tasks.Any())
                {
                    return;
                }
                //调度器
                var scheduler = scope.ServiceProvider.GetRequiredService();
                async Task DoTaskAsync(IScheduleTask task, ScheduleTaskAttribute metadata)
                {
                    if (scheduler.CanRun(metadata, DateTime.Now))
                    {
                        var eventTime = DateTime.Now;
                        //通知启动
                        _ = new TaskStartedEvent(task, eventTime).PublishAsync(default);
                        try
                        {
                            if (metadata.IsAsync)
                            {
                                //异步执行
                                _ = task.ExecuteAsync();
                            }
                            else
                            {
                                //同步执行
                                await task.ExecuteAsync();
                            }
                            //执行完成
                            _ = new TaskSuccessedEvent(task, eventTime, DateTime.Now).PublishAsync(default);
                        }
                        catch (Exception ex)
                        {
                            _ = new TaskFailedEvent(task, DateTime.Now, ex).PublishAsync(default);
                        }
                    }
                };
                //注解中的task
                foreach (var task in tasks)
                {
                    if (stoppingToken.IsCancellationRequested)
                    {
                        break;
                    }
                    //标注的metadatas
                    var metadatas = task.GetType().GetCustomAttributes();
    
                    if (!metadatas.Any())
                    {
                        continue;
                    }
                    foreach (var metadata in metadatas)
                    {
                        await DoTaskAsync(task, metadata);
                    }
                }
                //store中的scheduler
                var stores = _serviceProvider.GetServices().ToArray();
    
                //并行执行,提高性能
                Parallel.ForEach(stores, async store =>
                {
                    if (stoppingToken.IsCancellationRequested)
                    {
                        return;
                    }
                    var metadatas = await store.GetAllAsync();
                    if (metadatas is null || !metadatas.Any())
                    {
                        return;
                    }
                    foreach (var metadata in metadatas)
                    {
                        var attr = new ScheduleTaskAttribute(metadata.Cron)
                        {
                            Description = metadata.Description,
                            IsAsync = metadata.IsAsync,
                            IsStartOnInit = metadata.IsStartOnInit,
                        };
    
                        var task = scope.ServiceProvider.GetRequiredService(metadata.ScheduleTaskType) as IScheduleTask;
                        if (task is null)
                        {
                            return;
                        }
                        await DoTaskAsync(task, attr);
                    }
                });
            }
    
            private static async Task WaitAsync(Task pollingDelay, CancellationToken stoppingToken)
            {
                try
                {
                    await Task.Delay(_minIdleTime, stoppingToken);
                    await pollingDelay;
                }
                catch (OperationCanceledException)
                {
                }
            }
        }
    

    最后收尾阶段我们老规矩扩展一下IServiceCollection:

            internal static IServiceCollection AddScheduleTask(this IServiceCollection services)
            {
                foreach (var task in ScheduleTasks)
                {
                    services.AddTransient(task);
                    services.AddTransient(typeof(IScheduleTask), task);
                }
                //调度器
                services.AddScheduler();
                //配置文件Store:
    	services.AddScheduleMetadataStore();
                //BackgroundService
               services.AddHostedService();
                return services;
            }
            /// 
            /// 注册调度器AddScheduler
            /// 
            public static IServiceCollection AddScheduler<T>(this IServiceCollection services) where T : class, IScheduler
            {
                services.AddSingleton();
                return services;
            }
    
            /// 
            /// 注册ScheduleMetadataStore
            /// 
            public static IServiceCollection AddScheduleMetadataStore<T>(this IServiceCollection services) where T : class, IScheduleMetadataStore
            {
                services.AddSingleton();
                return services;
            }
    

    老规矩我们来测试一下:

        //通过特性标注的方式执行:
        [ScheduleTask(Constants.CronEveryMinute)] //每分钟一次
        [ScheduleTask("0/3 * * * *")]//每3分钟执行一次
        public class KeepAlive(ILogger logger) : IScheduleTask
        {
            public async Task ExecuteAsync()
            {
                //执行5s
                await Task.Delay(TimeSpan.FromSeconds(5));
                logger.LogInformation("keep alive!");
            }
        }
    	public class DemoConfigTask(ILogger logger) : IScheduleTask
        {
            public Task ExecuteAsync()
            {
                logger.LogInformation("Demo Config Schedule Done!");
                return Task.CompletedTask;
            }
        }
    

    通过配置文件的方式配置Store:

    {
      "BiwenQuickApi": {
        "Schedules": [
          {
            "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
            "Cron": "0/5 * * * *",
            "Description": "Every 5 mins",
            "IsAsync": true,
            "IsStartOnInit": false
          },
          {
            "ScheduleType": "Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
            "Cron": "0/10 * * * *",
            "Description": "Every 10 mins",
            "IsAsync": false,
            "IsStartOnInit": true
          }
        ]
      }
    }
    

    我们还可以实现自己的Store,这里以放到内存为例,如果有兴趣 你可以可以自行开发一个面板管理:

        public class DemoStore : IScheduleMetadataStore
        {
            public Task> GetAllAsync()
            {
                //模拟从数据库或配置文件中获取ScheduleTaskMetadata
                IEnumerable metadatas =
                    [
                        new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2))
                        {
                            Description="测试的Schedule"
                        },
                    ];
                return Task.FromResult(metadatas);
            }
        }
    	//然后注册这个Store:
    	builder.Services.AddScheduleMetadataStore();
    

    所有的一切都大功告成,最后我们来跑一下Demo,成功了:
    image

    当然这里是自己的固定思维设计的一个简约版,还存在一些不足,欢迎板砖轻拍指正!

    2024/05/16更新:
    提供同一时间单一运行中的任务实现

    /// 
    /// 模拟一个只能同时存在一个的任务.一分钟执行一次,但是耗时两分钟.
    /// 
    /// 
    [ScheduleTask(Constants.CronEveryMinute, IsStartOnInit = true)]
        public class OnlyOneTask(ILogger logger) : OnlyOneRunningScheduleTask
        {
            public override Task OnAbort()
            {
                logger.LogWarning($"[{DateTime.Now}]任务被打断.因为有一个相同的任务正在执行!");
                return Task.CompletedTask;
            }
    
            public override async Task ExecuteAsync()
            {
                var now = DateTime.Now;
                //模拟一个耗时2分钟的任务
                await Task.Delay(TimeSpan.FromMinutes(2));
                logger.LogInformation($"[{now}] ~ {DateTime.Now} 执行一个耗时两分钟的任务!");
            }
        }
    

    源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi
    https://github.com/vipwan/Biwen.QuickApi/tree/master/Biwen.QuickApi/Scheduling

  • 相关阅读:
    9、正则表达式入门
    【leetcode】 剑指 Offer学习计划(java版本含注释)(下)
    .NET餐厅管理系统菜品添加页面前端
    janus-gateway安装(docker方式)(centos7)
    ZZNUOJ_Java软件的下载安装和写代码
    cfssl使用方法重新整理说明
    vue项目实战-完成路由组件的搭建
    关于外网java后端服务访问内网minio中间件,因连接minio超时,启动失败问题
    《嵌入式 - 深入剖析STM32》STM32 启动流程详解(GCC)
    自动化脚本一键安装 jdk,hadoop,hive
  • 原文地址:https://www.cnblogs.com/vipwan/p/18194062/biwen-quickapi-scheduletask