• 使用ABP SignalR重构消息服务(二)


    使用ABP SignalR重构消息服务(二)

    上篇使用ABP SignalR重构消息服务(一)主要讲的是SignalR的基础知识和前端如何使用SignalR,这段时间也是落实方案设计。这篇我主要讲解SignalR源码(最近我手头工作比较忙@蟹老板)。

    SignalR源码分析(原地址,原地址已经停止维护了合并到了AspNetCore#

    使用SignalR我们主要是添加services.AddSignalR();,添加ChatHub类继承我们的Hub ,然后管道注入endpoints.MapHub<ChatHub>("/ChatHub");
    通过services.AddSignalR()可以看到使用的类是SignalRDependencyInjectionExtensions
    通过Hub类可以看到程序集是Microsoft.AspNetCore.SignalR.Core
    通过MapHub<ChatHub>可以看到使用的类是HubEndpointRouteBuilderExtensions

    SignalR服务注册#

    我们先分析services.AddSignalR()注入做了什么准备

    这里我们要讲一个东西Microsoft.AspNetCore.SignalR.Core类库有一个SignalRDependencyInjectionExtensions
    Microsoft.AspNetCore.SignalR类库也存在一个SignalRDependencyInjectionExtensions

    Microsoft.AspNetCore.SignalR类库中的SignalRDependencyInjectionExtensions解读

    public static class SignalRDependencyInjectionExtensions
    {
        // 单独注入SignalR配置
        public static ISignalRServerBuilder AddHubOptions<THub>(this ISignalRServerBuilder signalrBuilder, Action<HubOptions<THub>> configure) where THub : Hub
        {
            if (signalrBuilder == null)
            {
                throw new ArgumentNullException(nameof(signalrBuilder));
            }
    
            signalrBuilder.Services.AddSingleton<IConfigureOptions<HubOptions<THub>>, HubOptionsSetup<THub>>();
            signalrBuilder.Services.Configure(configure);
            return signalrBuilder;
        }
    
        //  添加SignalR服务
        public static ISignalRServerBuilder AddSignalR(this IServiceCollection services)
        {
            if (services == null)
            {
                throw new ArgumentNullException(nameof(services));
            }
            // ConnectionsDependencyInjectionExtensions拓展类 添加请求路由、添加身份验证、添加Http连接调度程序、添加Http连接管理器
            services.AddConnections();
            // 禁用WebSocket保持活动,因为SignalR有它自己的
            services.Configure<WebSocketOptions>(o => o.KeepAliveInterval = TimeSpan.Zero);
            services.TryAddSingleton<SignalRMarkerService>();
            services.TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<HubOptions>, HubOptionsSetup>());
            //调用 Microsoft.AspNetCore.SignalR.Core 类库中的 SignalRDependencyInjectionExtensions
            return services.AddSignalRCore();
        }
    
        // 添加SignalR服务。注入SignalR配置信息
        public static ISignalRServerBuilder AddSignalR(this IServiceCollection services, Action<HubOptions> configure)
        {
            if (services == null)
            {
                throw new ArgumentNullException(nameof(services));
            }
    
            var signalrBuilder = services.AddSignalR();
            services.Configure(configure);
            return signalrBuilder;
        }
    }
    

    Microsoft.AspNetCore.SignalR.Core类库中的SignalRDependencyInjectionExtensions解读
    这里面注入了SignalR中核心类,所以下面的代码我们一定要仔细研读了。

    public static class SignalRDependencyInjectionExtensions
    {    
        // 将最小的基本SignalR服务添加IServiceCollection 中
        public static ISignalRServerBuilder AddSignalRCore(this IServiceCollection services)
        {
            // 用于标记SignalR是否注入
            services.TryAddSingleton<SignalRCoreMarkerService>();
            // 注入默认集线器生命周期管理器
            services.TryAddSingleton(typeof(HubLifetimeManager<>), typeof(DefaultHubLifetimeManager<>));
            // 注入默认集线器协议解析器
            services.TryAddSingleton(typeof(IHubProtocolResolver), typeof(DefaultHubProtocolResolver));
            // 注入集线器上下文
            services.TryAddSingleton(typeof(IHubContext<>), typeof(HubContext<>));
            services.TryAddSingleton(typeof(IHubContext<,>), typeof(HubContext<,>));
            // 注入集线器中心连接处理程序
            services.TryAddSingleton(typeof(HubConnectionHandler<>), typeof(HubConnectionHandler<>));
            // 注入获取用户唯一标识方法
            services.TryAddSingleton(typeof(IUserIdProvider), typeof(DefaultUserIdProvider));
            // 注入默认中心调度员
            services.TryAddSingleton(typeof(HubDispatcher<>), typeof(DefaultHubDispatcher<>));
            // 注入默认激活中心
            services.TryAddScoped(typeof(IHubActivator<>), typeof(DefaultHubActivator<>));
            // 添加授权
            services.AddAuthorization();
    
            var builder = new SignalRServerBuilder(services);
            // 添加Protocol转json
            builder.AddJsonProtocol();
            return builder;
        }
    }
    

    SignalR集线器设计#

    通过Hub类可以看到程序集是Microsoft.AspNetCore.SignalR.Core

    // Hub 是一个抽象类
    public abstract class Hub : IDisposable
    {
        private bool _disposed;
        // 客户端链接
        private IHubCallerClients _clients = default!;
        // 集线器呼叫中心上下文
        private HubCallerContext _context = default!;
        // 集线器组管理
        private IGroupManager _groups = default!;
        // 客户端链接(管理所有用户链接)
        public IHubCallerClients Clients
        {
            get
            {
                CheckDisposed();
                return _clients;
            }
            set
            {
                CheckDisposed();
                _clients = value;
            }
        }
        // 集线器上下文(保存当前用户链接信息)
        public HubCallerContext Context
        {
            get
            {
                CheckDisposed();
                return _context;
            }
            set
            {
                CheckDisposed();
                _context = value;
            }
        }
        // 组管理(对于组进行添加或者删除)
        public IGroupManager Groups
        {
            get
            {
                CheckDisposed();
                return _groups;
            }
            set
            {
                CheckDisposed();
                _groups = value;
            }
        }
    
        // 连接方法(用于兼容用户连接操作)
        public virtual Task OnConnectedAsync()
        {
            return Task.CompletedTask;
        }
    
        // 链接释放方法(用于监控用户下线操作)
        public virtual Task OnDisconnectedAsync(Exception? exception)
        {
            return Task.CompletedTask;
        }
    
        protected virtual void Dispose(bool disposing)
        {
        }
    
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }
    
            Dispose(true);
    
            _disposed = true;
        }
    
        private void CheckDisposed()
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(GetType().Name);
            }
        }
    }
    

    SignalR中间件#

    通过MapHub可以看到使用的类是HubEndpointRouteBuilderExtensions

    app.UseEndpoints(endpoints =>
    {
      endpoints.MapHub<ChatHub>("/ChatHub");
    });
    

    HubEndpointRouteBuilderExtensions源代码

    public static class HubEndpointRouteBuilderExtensions
    {
        ................................
    
        // 注册集线器
        public static HubEndpointConventionBuilder MapHub<[DynamicallyAccessedMembers(HubAccessibility)] THub>(this IEndpointRouteBuilder endpoints, string pattern, Action<HttpConnectionDispatcherOptions>? configureOptions) where THub : Hub
        {
            // 这个就是我们上面注册SignalR保留来判断是否注入
            var marker = endpoints.ServiceProvider.GetService<SignalRMarkerService>();
    
            if (marker == null)
            {
                throw new InvalidOperationException("Unable to find the required services. Please add all the required services by calling " +
                                                    "'IServiceCollection.AddSignalR' inside the call to 'ConfigureServices(...)' in the application startup code.");
            }
            // SignalR配置信息
            var options = new HttpConnectionDispatcherOptions();
            configureOptions?.Invoke(options);
    
            // endpoints.MapConnections用来接收第一次连接请求,然后开启对于协议连接
            var conventionBuilder = endpoints.MapConnections(pattern, options, b =>
            {
                // SignalRConnectionBuilderExtensions拓展类(这里是一个重点,将我们的泛型集线器连接进行注入,就可以开始它的工作了)
                b.UseHub<THub>();
            });
    
            ....................................
            return new HubEndpointConventionBuilder(conventionBuilder);
        }
    }
    

    SignalRConnectionBuilderExtensions源代码

    public static class SignalRConnectionBuilderExtensions
    {
        public static IConnectionBuilder UseHub<[DynamicallyAccessedMembers(HubAccessibility)] THub>(this IConnectionBuilder connectionBuilder) where THub : Hub
        {
            var marker = connectionBuilder.ApplicationServices.GetService(typeof(SignalRCoreMarkerService));
            if (marker == null)
            {
                throw new InvalidOperationException("Unable to find the required services. Please add all the required services by calling " +
                    "'IServiceCollection.AddSignalR' inside the call to 'ConfigureServices(...)' in the application startup code.");
            }
            // 1.connectionBuilder.UseConnectionHandler拓展方法在 ConnectionBuilderExtensions中
            // 2.HubConnectionHandler这个不就是我们注入服务的集线器中心连接处理程序吗?
            return connectionBuilder.UseConnectionHandler<HubConnectionHandler<THub>>();
        }
    }
    

    ConnectionBuilderExtensions源代码

    public static class ConnectionBuilderExtensions
    {
        // 执行集线器的连接方法,到了这里就代表本次连接成功了
        public static IConnectionBuilder UseConnectionHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TConnectionHandler>(this IConnectionBuilder connectionBuilder) where TConnectionHandler : ConnectionHandler
        {
            var handler = ActivatorUtilities.GetServiceOrCreateInstance<TConnectionHandler>(connectionBuilder.ApplicationServices);
    
            // 这是一个终端中间件,所以没有必要使用'next'参数 
            return connectionBuilder.Run(connection => handler.OnConnectedAsync(connection));
        }
    }
    

    小结#

    通过services.AddSignalR()进行SignalR基础服务进行注册。
    通过Hub抽象工程,由不同的集线器继承,定义同一的连接、断开方法、客户端连接管理、群组管理、当前上下文信息。
    通过MapHub<ChatHub>通过中间件路由规则进行流量划分。
    当我们看完上面调用链路,脑中是不是已经有了一个清晰的方向了,它是怎么与前端进行连接的,并且对于注入的服务有一定的了解。

    HubConnectionHandler连接处理#

    我们已经知道进入中间件之后就会进入HubConnectionHandler.OnConnectedAsync()方法

        public override async Task OnConnectedAsync(ConnectionContext connection)
        {
            // 我们检查是否设置了HubOptions<THub>,因为它们优先于全局hub选项。  
            // 然后将keepAlive和handshakeTimeout值设置为HubOptionsSetup中的默认值,当它们显式地设置为null时。  
    
            var supportedProtocols = _hubOptions.SupportedProtocols ?? _globalHubOptions.SupportedProtocols;
            if (supportedProtocols == null || supportedProtocols.Count == 0)
            {
                throw new InvalidOperationException("There are no supported protocols");
            }
            // 默认握手超时15分钟
            var handshakeTimeout = _hubOptions.HandshakeTimeout ?? _globalHubOptions.HandshakeTimeout ?? HubOptionsSetup.DefaultHandshakeTimeout;
            // 集线器连接配置
            var contextOptions = new HubConnectionContextOptions()
            {
                KeepAliveInterval = _hubOptions.KeepAliveInterval ?? _globalHubOptions.KeepAliveInterval ?? HubOptionsSetup.DefaultKeepAliveInterval,
                ClientTimeoutInterval = _hubOptions.ClientTimeoutInterval ?? _globalHubOptions.ClientTimeoutInterval ?? HubOptionsSetup.DefaultClientTimeoutInterval,
                StreamBufferCapacity = _hubOptions.StreamBufferCapacity ?? _globalHubOptions.StreamBufferCapacity ?? HubOptionsSetup.DefaultStreamBufferCapacity,
                MaximumReceiveMessageSize = _maximumMessageSize,
                SystemClock = SystemClock,
                MaximumParallelInvocations = _maxParallelInvokes,
            };
    
            Log.ConnectedStarting(_logger);
            // 创建连接上下文,将用户信息添加到上下文中
            var connectionContext = new HubConnectionContext(connection, contextOptions, _loggerFactory);
    
            var resolvedSupportedProtocols = (supportedProtocols as IReadOnlyList<string>) ?? supportedProtocols.ToList();
            if (!await connectionContext.HandshakeAsync(handshakeTimeout, resolvedSupportedProtocols, _protocolResolver, _userIdProvider, _enableDetailedErrors))
            {
                return;
            }
    
            // 已建立connectionContext
    
            try
            {
                // 默认集线器生命周期管理器(DefaultHubLifetimeManager)将当前用户添加到连接池中
                await _lifetimeManager.OnConnectedAsync(connectionContext);
                // 获取我们对应的集线器,执行OnConnectedAsync()方法,这个时候就真正的开始执行我们写的代码了。
                // 里面有一个消息分配方法DispatchMessagesAsync(),获取我们交互的信息进行处理
                await RunHubAsync(connectionContext);
            }
            finally
            {
                connectionContext.Cleanup();
    
                Log.ConnectedEnding(_logger);
                // 当处理消息方法跳出,之后代表当前用户已经断开连接了,所以我们需要触发断线方法
                await _lifetimeManager.OnDisconnectedAsync(connectionContext);
            }
        }
    

    SignalR异步分派消息#

    //  异步分派消息
    private async Task DispatchMessagesAsync(HubConnectionContext connection)
        {
            var input = connection.Input;
            var protocol = connection.Protocol;
            connection.BeginClientTimeout();
    
            var binder = new HubConnectionBinder<THub>(_dispatcher, connection);
    
            while (true)
            {
                var result = await input.ReadAsync();
                var buffer = result.Buffer;
    
                try
                {
                    if (result.IsCanceled)
                    {
                        break;
                    }
                    // 存在消息
                    if (!buffer.IsEmpty)
                    {
                        bool messageReceived = false;
                        // 没有消息限制,只是解析和分派
                        if (_maximumMessageSize == null)
                        {
                            while (protocol.TryParseMessage(ref buffer, binder, out var message))
                            {
                                connection.StopClientTimeout();
                                // 我们接收到了消息,停止超时检查
                                messageReceived = true;
                                // 将接收的消息,根据不同的类型进行分发处理
                                await _dispatcher.DispatchMessageAsync(connection, message);
                            }
    
                            if (messageReceived)
                            {
                                // 处理完接收消息之后,开启超时检查
                                connection.BeginClientTimeout();
                            }
                        }
                        else
                        {
                            // 我们给解析器一个默认消息大小的滑动窗口  
                            var maxMessageSize = _maximumMessageSize.Value;
    
                            while (!buffer.IsEmpty)
                            {
                                var segment = buffer;
                                var overLength = false;
                                // 切分消息,慢慢进行处理
                                if (segment.Length > maxMessageSize)
                                {
                                    segment = segment.Slice(segment.Start, maxMessageSize);
                                    overLength = true;
                                }
    
                                if (protocol.TryParseMessage(ref segment, binder, out var message))
                                {
                                    connection.StopClientTimeout();
                                    // 我们接收到了消息,停止超时检查
                                    messageReceived = true;
                                    // 将接收的消息,根据不同的类型进行分发处理
                                    await _dispatcher.DispatchMessageAsync(connection, message);
                                }
                                else if (overLength)
                                {
                                    throw new InvalidDataException($"The maximum message size of {maxMessageSize}B was exceeded. The message size can be configured in AddHubOptions.");
                                }
                                else
                                {
                                    // No need to update the buffer since we didn't parse anything
                                    break;
                                }
    
                                // Update the buffer to the remaining segment
                                buffer = buffer.Slice(segment.Start);
                            }
    
                            if (messageReceived)
                            {
                                connection.BeginClientTimeout();
                            }
                        }
                    }
    
                    if (result.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Connection terminated while reading a message.");
                        }
                        break;
                    }
                }
                finally
                {
                    // 缓冲区被分割到它被消耗的地方,所以我们可以直接开始。  我们把检查标记为缓冲。 结束,如果我们没有收到完整的帧,我们将等待更多的数据  再读一遍之前。
                    input.AdvanceTo(buffer.Start, buffer.End);
                }
            }
    

    SignalR针对用户发送消息#

    针对于群发消息,我们知道有一个组的容器,我们只要将大家添加到一个组中就可以了,那么我们想根据用户发送消息1:1的模式,SignalR源码中是怎么处理的呢?

    在注册SignalR服务中我们可以看到这个services.TryAddSingleton(typeof(IUserIdProvider), typeof(DefaultUserIdProvider));

    public class DefaultUserIdProvider : IUserIdProvider
    {
        // 获取当前用户标识
        public virtual string? GetUserId(HubConnectionContext connection)
        {
            // 这个也就是为什么我们在不做任何处理之下想使用SignalR用户模式,需要在Jwt中添加一个ClaimTypes.NameIdentifier了
            return connection.User.FindFirst(ClaimTypes.NameIdentifier)?.Value;
        }
    }
    

    我们只需要自己定义一个实现类,将默认实现替换掉就可以了。

        // 用户模式发送源码
        public override Task SendUserAsync(string userId, string methodName, object?[] args, CancellationToken cancellationToken = default)
        {
            //  connection.UserIdentifier就是执行了GetUserId()方法获取的用户标识
            return SendToAllConnections(methodName, args, (connection, state) => string.Equals(connection.UserIdentifier, (string)state!, StringComparison.Ordinal), userId, cancellationToken);
        }
    

    SignalR项目使用设计#

    上面我们已经讲完,SignalR从连接==>处理消息以及用户模式的源码设计,相信小伙伴已经脑海中已经有点东西了,那么就开始项目中实践方式

    我主要负责提供基础SignalR库,给到不同的部门进行使用,所以我首先需要考虑到一个高内聚,低耦合的设计,这里我首先不能掺杂业务逻辑,但是又需要所有业务聚合到我这边,然后通过不同的业务进行不同的处理。
    设计思路:

    • 定义两个接口IReceiveMessageISendMessage,接口中分别有MessageType属性,HandlerAsync(input)方法
    • 定义一个公用的集线器注入IEnumerable<IReceiveMessage>IEnumerable<ISendMessage>添加Receive(input)Send(input)方法通过不同的入参中的MessageType属性,从注入集合中获取对应的消息实现进行处理

    集线器伪代码

        public class SignalRHub : Hub
        {
            private readonly IEnumerable<IReceiveMessage> _receiveMessages;
            private readonly IEnumerable<ISendMessage> _sendMessages;
    
            public SignalRHub(IEnumerable<IReceiveMessage> receiveMessages,
                IEnumerable<ISendMessage> sendMessages)
            {
                _receiveMessages = receiveMessages;
                _sendMessages = sendMessages;
            }
    
            public async Task Receive(SignalRReceiveMessage input)
            {
                await _receiveMessages.FirstOrDefault(x => string.Compare(x.MessageType, input.MessageType, true) == 0).HandlerAsync(input);
            }
    
            public async Task Send(SignalRSendMessage outInput) 
            {
                await _sendMessages.FirstOrDefault(x => string.Compare(x.MessageType, outInput.MessageType, true) == 0).HandlerAsync(outInput);
            }
        }
    

    业务实现示例

        public class NotificationSendMessage : ISendMessage, ISingletonDependency
        {
            public string MessageType
            {
                get => SignalRSendMessageEnum.Notification.ToString();
            }
    
            public Task HandlerAsync(SignalRSendMessage message)
            {
                //.......业务逻辑......
            }
        }
    

    这样我就只需要接收消息,进行转发给对应实现就可以了,我给同事提供了SignalR服务,又不干涉他们的业务。

  • 相关阅读:
    基于模板匹配的图像拼接技术研究-含Matlab代码
    【系统架构设计】计算机公共基础知识: 3 计算机网络
    如何做好测试?(十)回归测试 (Regression Testing, RT)
    KafkaConsumer 消费逻辑
    什么是分布式锁?几种分布式锁分别是怎么实现的?
    华为OD机考:单词接龙C++实现
    洛谷 P1909 [NOIP2016 普及组] 买铅笔
    shiro-第一篇-基本介绍及使用
    uni-app 之 下拉刷新,上拉加载,获取网络列表数据
    求,帮看看第四题1111111111凑字数
  • 原文地址:https://www.cnblogs.com/chenxi001/p/16104617.html