• Net6Configuration & Options 源码分析 Part3 IOptionsMonitor 是如何接收到配置文件变更并同步数据源的


    配置源的同步 IOptionsMonitor 使用

    //以下demo演示使用IOptionsMonitor重新加载配置并当重新加载配置是执行回调函数

    var configuration = new ConfigurationBuilder().AddJsonFile(path: "profile.json",
                                                               optional: false,
                                                               reloadOnChange: true).Build();
    new ServiceCollection().AddOptions().Configure<Profile>(configuration).BuildServiceProvider().GetRequiredService<IOptionsMonitor<Profile>>().OnChange(profile => Console.WriteLine($"data reload: {profile.Age}"));
    Console.Read();
    
    public class Profile
    {
        public int Age { get; set; }
    }
    

    配置源的同步 IOptionsMonitor 源码分析

    当文件变更时如何向外发送通知的以及 Reload data。

    以JsonConfiguration为例:
    FileConfigurationProvider通过FileProvider.Watch当文件发生改变的时候会调用Load,load方法做了两件事情,1.调用子类同名虚方完成具体数据的reload data(由具体实现类:JsonConfigurationProvider)2。提供调用OnReload(由父类ConfigurationProvider实现)。完成对外发送data change的通知。OnReload内调用了_reloadToken.OnReload发送回调通知并产生一个新的ConfigurationReloadToken重新赋值给_reloadToken,通知注册到FileConfigurationProvider._reloadToken的回调,那么想接收到文件改变的消息只需要通过GetReloadToken()得到_reloadToken属性并将回调函数注册进去即可。
    如下是此三个类的继承关系JsonConfiguration->FileConfigurationProvider->ConfigurationProvider
    知道了这些在看下ConfigurationRoot。

    public abstract class FileConfigurationProvider : ConfigurationProvider, IDisposable
    {
        public FileConfigurationProvider(FileConfigurationSource source!!)
        {
            Source = source;
            if (Source.ReloadOnChange && Source.FileProvider != null)
            {
                _changeTokenRegistration = ChangeToken.OnChange(
                    () => Source.FileProvider.Watch(Source.Path!),
                    () =>
                    {
                        // 重新从JsonFile Load 数据并
                        Load(reload: true);
                    });
            }
        }
    
        private void Load(bool reload)
        {
            IFileInfo? file = Source.FileProvider?.GetFileInfo(Source.Path ?? string.Empty);
            using Stream stream = OpenRead(file);
            try
            {
                // 此处调用具体实现类的Load 方法例如JsonConfigurationProvider
                Load(stream);
            }
           
            // 发送OnReload 并重新生成ConfigurationReloadToken共下次使用。
            OnReload();
        }
    }
    
    public class JsonConfigurationProvider : FileConfigurationProvider
    {
        public JsonConfigurationProvider(JsonConfigurationSource source) : base(source) { }
    
        public override void Load(Stream stream)
        {
            Data = JsonConfigurationFileParser.Parse(stream);
        }
    }
    
    public abstract class ConfigurationProvider : IConfigurationProvider
    {
        protected void OnReload()
        {
            ConfigurationReloadToken previousToken = Interlocked.Exchange(ref _reloadToken, new ConfigurationReloadToken());
            previousToken.OnReload();
        }
    
        public IChangeToken GetReloadToken()
        {
            return _reloadToken;
        }
    }
    

    ConfigurationRoot会循环调用把所有的providers 并通过IConfigurationProvider.GetReloadToken()得到FileConfigurationProvider._reloadToken,然后注册上RaiseChanged作为回调函数。以文件系统为例,当文件发生改动时会调用此回调函数,此回调函数又会调用ConfigurationRoot的_changeToken.OnReload()向外发送通知。
    ConfigurationChangeTokenSource:注册的时机为ConfigurationChangeTokenSource.Configure.

    我们作为使用者注册的回调事件就是注册在OptionsMonitor._onChange中。当用户使用OptionsMonitor时,其在构造方法通过DI拿到使用ConfigurationChangeTokenSource作为包装类,其包装的是ConfigurationRoot._changeToken,并把自身的事件OptionsMonitor._onChange作为回调函数注册在包装类ConfigurationChangeTokenSource.包装的ConfigurationRoot._changeToken中。自此完成了整个回调链条。

    // ConfigurationRoot向IConfigurationProvider注册回调函数拼接回调链条。
    public class ConfigurationRoot : IConfigurationRoot, IDisposable
    {
        _providers = providers;
        _changeTokenRegistrations = new List<IDisposable>(providers.Count);
        foreach (IConfigurationProvider p in providers)
        {
            p.Load();
            // 回调链条拼接
            _changeTokenRegistrations.Add(ChangeToken.OnChange(() => p.GetReloadToken(), () => RaiseChanged()));
        }
    
        private void RaiseChanged()
        {
            ConfigurationReloadToken previousToken = Interlocked.Exchange(ref _changeToken, new ConfigurationReloadToken());
            previousToken.OnReload();
        }
    }
    
    // ConfigurationChangeTokenSource 包装类与注册 OptionsConfigurationServiceCollectionExtensions
    public class ConfigurationChangeTokenSource<TOptions> : IOptionsChangeTokenSource<TOptions>
    {
        private IConfiguration _config;
    
        public ConfigurationChangeTokenSource(IConfiguration config) : this(Options.DefaultName, config){}
    
        public IChangeToken GetChangeToken()
        {
            return _config.GetReloadToken();
        }
    }
    
    public static class OptionsConfigurationServiceCollectionExtensions
    {
         public static IServiceCollection Configure<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] TOptions>(this IServiceCollection services!!, string? name, IConfiguration config!!, Action<BinderOptions>? configureBinder) where TOptions : class
        {
            services.AddOptions();
            services.AddSingleton<IOptionsChangeTokenSource<TOptions>>(new ConfigurationChangeTokenSource<TOptions>(name, config));
            return services.AddSingleton<IConfigureOptions<TOptions>>(new NamedConfigureFromConfigurationOptions<TOptions>(name, config, configureBinder));
        }
    }
    
    public class OptionsMonitor<[DynamicallyAccessedMembers(Options.DynamicallyAccessedMembers)] TOptions> : IOptionsMonitor<TOptions>, IDisposable
        where TOptions : class
    {
        internal event Action<TOptions, string>? _onChange;
        public OptionsMonitor(IOptionsFactory<TOptions> factory, IEnumerable<IOptionsChangeTokenSource<TOptions>> sources, IOptionsMonitorCache<TOptions> cache)
        {
            ChangeToken.OnChange(
                              () => source.GetChangeToken(),
                              (name) => InvokeChanged(name),
                              source.Name);
            private void InvokeChanged(string? name)
            {
                name = name ?? Options.DefaultName;
                _cache.TryRemove(name);
                TOptions options = Get(name);
                if (_onChange != null)
                {
                    _onChange.Invoke(options, name);
                }
            }
     
        }
         public IDisposable OnChange(Action<TOptions, string> listener)
        {
            var disposable = new ChangeTrackerDisposable(this, listener);
            _onChange += disposable.OnChange;
            return disposable;
        }
    }
    
    
    

    总结

    整个过程回调使用了两个ConfigurationReloadToken分别是。1. FileConfigurationProvider提供了一个ConfigurationReloadToken 2.提供了一个ConfigurationRoot._changeToken 。回调链条的拼接是。1.FileConfigurationProvider构造函数中文件的Watch与FileConfigurationProvider._reloadToken同时在这里也完成了数据的reload data 2 ConfigurationRoot的构造函数中与IConfigurationProvider._reloadToken进行的回调链条拼接 。第三次拼接是把用户注册的回调函注册在OptionsMonito的event上,OptionsMonito在构造函数中通过DI容器获取到ConfigurationRoot._changeToken中包装类。并把event作为回调函数进行注册.

    通过以上代码分析,当我们向创建一个具有相同通知机制的回调链条并且有多次通知 需要利用CancellationToken与 ChangeToken.OnChange 进行链接,同时要注意每次链接后向下发送消息时,要重新生成changeToken,因为changeToken的特性是只能发送一次消息。向多次必须重新生成ChangeToken例如

    ConfigurationReloadToken previousToken = Interlocked.Exchange(ref _changeToken, new ConfigurationReloadToken());
    previousToken.OnReload();
    
  • 相关阅读:
    GitLab CI/CD使用经验,来自于莫纳什大学的考试任务解析
    文件系统理论详解,Linux操作系统原理与应用
    多御安全浏览器宝藏功能全新升级,建议低调使用
    2023-9-11 台阶-Nim游戏
    续航605km,价格 11.77 万起带激光雷达,你卷我也卷
    大数据技术从业者注意了!使用代理IP时避开这些误区
    HBase 常见问题总结(一)
    线程和进程、浏览器中的JavaScript线程、JavaScript的宏任务和微任务
    做软件测试是在浪费时间吗,有未来吗?
    Python条件语句的用法
  • 原文地址:https://www.cnblogs.com/hts92/p/16012929.html