• aspnetcore使用websocket实时更新商品信息


    先演示一下效果,再展示代码逻辑。

    中间几次调用过程省略。。。

    暂时只用到了下面四个项目

    1.产品展示页面中第一次通过接口去获取数据库的列表数据

    复制代码
    /// 
    /// 获取指定的商品目录
    /// 
    /// 
    /// 
    /// 
    /// 
    [HttpGet]
    [Route("items")]
    [ProducesResponseType(typeof(PaginatedViewModel), StatusCodes.Status200OK)]
    [ProducesResponseType(typeof(IEnumerable), StatusCodes.Status200OK)]
    [ProducesResponseType(StatusCodes.Status400BadRequest)]
    public async Task Catalogs([FromQuery] int pageSize = 10, [FromQuery] int pageIndex = 0, string ids = null)
    {
        if (!string.IsNullOrEmpty(ids))
        {
            var items = await GetItemByIds(ids);
            if (!items.Any())
            {
                return BadRequest("ids value invalid. Must be comma-separated list of numbers");
            }
    
            return Ok(items);
        }
    
        var totalItems = await _catalogContext.Catalogs
            .LongCountAsync();
    
        var itemsOnPage = await _catalogContext.Catalogs
            .OrderBy(c => c.Name)
            .Skip(pageSize * pageIndex)
            .Take(pageSize)
            .ToListAsync();
        var result = itemsOnPage.Select(x => new ProductDto(x.Id.ToString(), x.Name, x.Price.ToString(), x.Stock.ToString(), x.ImgPath));
        var model = new PaginatedViewModel(pageIndex, pageSize, totalItems, result);
        return Ok(model);
    
    }
    复制代码

    2.在前端页面会把当前页面的产品列表id都发送到websocket中去

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    function updateAndSendProductIds(ids) {
        productIds = ids;
     
        // Check if the WebSocket is open
        if (socket.readyState === WebSocket.OPEN) {
            // Send the list of product IDs through the WebSocket connection
            socket.send(JSON.stringify(productIds));
        }
    }
     
    function fetchData() {
        
        const apiUrl = baseUrl + `/Catalog/items?pageSize=${pageSize}&pageIndex=${currentPage}`;
     
        axios.get(apiUrl)
            .then(response => {
                const data = response.data.data;
                displayProducts(baseUrl, data);
     
                const newProductIds = data.map(product => product.Id);
                // Check if the WebSocket is open
                updateAndSendProductIds(newProductIds);
                // 从响应中获取总页数
                const totalPages = Math.ceil(response.data.count / pageSize);
                displayPagination(totalPages);
     
                // 更新当前页数的显示
                const currentPageElement = document.getElementById('currentPage');
                currentPageElement.textContent = `当前页数: ${currentPage + 1} / 总页数: ${totalPages}`;
            })
            .catch(error => {
                console.error('获取数据失败:', error);
            });
    }

    3.websocket拿到了id数据可以精确的把当前页面的产品都查出来再推送给product.html页面,通过下面的ReceiveAsync方法获取html发送的数据,再通过timer定时器每秒钟Send方法实时的往页面发送获取到的数据,当然这个是不断的去从redis中去查的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    using System.Net.WebSockets;
    using System.Threading.Tasks;
    using System;
    using WsServer.Handler;
    using WsServer.Manager;
    using StackExchange.Redis;
    using Microsoft.Extensions.Configuration;
    using System.Collections.Generic;
    using Catalogs.Domain.Catalogs;
    using Catalogs.Domain.Dtos;
    using System.Net.Sockets;
     
    namespace WebScoket.Server.Services
    {
        ///
        /// 实时推送产品主要是最新的库存,其他信息也会更新
        ///
        public class ProductListHandler : WebSocketHandler
        {
            private System.Threading.Timer _timer;
            private readonly IDatabase _redisDb;
            //展示列表推送
            private string productIdsStr;
            public ProductListHandler(WebSocketConnectionManager webSocketConnectionManager,IConfiguration configuration) : base(webSocketConnectionManager)
            {
                ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(configuration["DistributedRedis:ConnectionString"] ?? throw new Exception("$未能获取distributedredis连接字符串"));
                _redisDb = redis.GetDatabase();
                _timer = new System.Threading.Timer(Send, null, TimeSpan.Zero, TimeSpan.FromSeconds(1));
            }
            private void Send(object state)
            {
                // 获取当前时间并发送给所有连接的客户端
                if (productIdsStr != null)
                {
                    string[] productIds = System.Text.Json.JsonSerializer.Deserialize<string[]>(productIdsStr);
                    string hashKeyToRetrieve = "products";
                    List products = new List();
     
                    foreach (var productId in productIds)
                    {
                        if(productId == "null") {
                            continue;
                        }
                        string retrievedProductValue = _redisDb.HashGet(hashKeyToRetrieve, productId);
                        if (!string.IsNullOrEmpty(retrievedProductValue))
                        {
                            //反序列化和构造函数冲突,改造了一下Catalog
                            Catalog catalog = System.Text.Json.JsonSerializer.Deserialize(retrievedProductValue);
                            products.Add(new ProductDto(catalog.Id.ToString(), catalog.Name, catalog.Price.ToString(), catalog.Stock.ToString(), catalog.ImgPath));
                        }
                    }
                    if (products.Count > 0)
                    {
                         SendMessageToAllAsync(System.Text.Json.JsonSerializer.Serialize(products)).Wait();
                    }
                    else
                    {
                        SendMessageToAllAsync("NoProduct").Wait();
                    }
                }
            }
            public override async Task ReceiveAsync(WebSocket socket, WebSocketReceiveResult result, byte[] buffer)
            {
                //每次页面有刷新就会拿到展示的id列表
                productIdsStr = System.Text.Encoding.UTF8.GetString(buffer, 0, result.Count);
            }
        }
    }

    4.html页面就可以拿到最新数据再去绑定到页面

    1
    2
    3
    4
    5
    6
    7
    8
    9
    socket.addEventListener('message', (event) => {
        if (event.data == "NoProduct") {
            clearProductList();
        }
        // Handle the received product data and update the product list
        const productData = JSON.parse(event.data);
        // Update the product list with the received data (call your displayProducts function)
        displayProducts(baseUrl, productData);
    });

     

    整个流程就这么简单,但是这里需要保持数据库和redis的数据实时同步,否则页面展示的就不是最新的数据就没意义了。

    再回到Catalog.Service服务中。

     private async Task DeleteCache()
     {
         //await _redisDb.HashDeleteAsync("products",id); //没必要了
         await _channel.Writer.WriteAsync("delete_catalog_fromredis");
     }

    再做更新、新增、删除等动作的时候就调用一下DeleteCache方法,往后台服务发送一个channel,当后台收到后就做redis删除并且从初始化sqlserver到redis列表同步的操作

    复制代码
    using System.Reflection;
    using System.Threading.Channels;
    using Catalogs.Infrastructure.Database;
    using Microsoft.EntityFrameworkCore;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using StackExchange.Redis;
    
    namespace Catalogs.WebApi.BackgroudServices
    {
        /// <summary>
        /// 记得任何删除了或者购买了产品后需要删除改产品的键
        /// summary>
        public class InitProductListToRedisService : BackgroundService
        {
            private readonly IServiceScopeFactory _serviceScopeFactory;
            private readonly IDatabase _redisDb;
            private readonly Channel<string> _channel;
            private readonly ILogger _logger;
            public InitProductListToRedisService(IServiceScopeFactory serviceScopeFactory, IConfiguration configuration, Channel<string> channel, ILogger<InitProductListToRedisService> logger)
            {
                _serviceScopeFactory = serviceScopeFactory;
                ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(configuration["DistributedRedis:ConnectionString"] ?? throw new Exception("$未能获取distributedredis连接字符串"));
                _redisDb = redis.GetDatabase();
                _channel = channel;
                _logger = logger;
            }
            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                await Init();
    
                while (!_channel.Reader.Completion.IsCompleted)
                {
                    var msg = await _channel.Reader.ReadAsync();
                    if(msg == "delete_catalog_fromredis")
                    {
                        await Init();
                    }
                }
            }
    
            private async Task Init()
            {
                using var scope = _serviceScopeFactory.CreateScope();
                try
                {
                    CatalogContext _context = scope.ServiceProvider.GetRequiredService<CatalogContext>();
                    string hashKey = "products";
                    var products = await _context.Catalogs.ToListAsync();
                   
                       await _redisDb.KeyDeleteAsync(hashKey);
                    
                        foreach (var product in products)
                        {
                            
                            string productField = product.Id.ToString();
                            string productValue = System.Text.Json.JsonSerializer.Serialize(product);
    
                            _redisDb.HashSet(hashKey, new HashEntry[] { new HashEntry(productField, productValue) });
                        }
    
                        _logger.LogInformation($"ProductList is over stored in Redis Hash.");           
                }
                catch(Exception ex)
                {
                    _logger.LogError($"ProductLis stored in Redis Hash error.");
                }
            }
        }
    }
    复制代码

    这里还有优化的空间可以只针对怕products的hashset的某个id去更新、删除、新增一条数据。

    示例代码:

    liuzhixin405/efcore-template (github.com)

     

  • 相关阅读:
    关于python的内存管理机制和python变量占用内存及时释放的问题
    java毕业设计《数据结构与算法》网上教学系统mybatis+源码+调试部署+系统+数据库+lw
    nginx实现路由重定向功能 避免服务器出现 404 Not Found
    spring actuator未授权之heapdump自动化利用【JDumpSpider】
    CSS - 深入理解选择器的使用方式
    React在实际开发中Variables与Prop的实战运用
    Canal-监听数据库表的变化
    【小程序】首屏渲染优化
    分布式锁的实现
    python访问多个文件夹并清除其中不符合要求的文件
  • 原文地址:https://www.cnblogs.com/morec/p/17947739