• C# redis通过stream实现消息队列以及ack机制


    redis实现

    查看redis版本

    redis需要>5.0
    Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。

    它实现了大部分消息队列的功能:

    • 消息 ID 系列化生成;
    • 消息遍历;
    • 消息的阻塞和非阻塞读;
    • Consumer Groups 消费组;
    • ACK 确认机制。
    • 支持多播。

    本次主要实现基本的消息发送接受确认,消费组有需要的可以看参考的文章

    info
    
    • 1

    在这里插入图片描述

    插入消息

    XADD streamName id field value [field value ...]
    # 消息队列名称后面的 「*」 ,表示让 Redis 为插入的消息自动生成唯一 ID,当然也可以自己定义。
    # 消息 ID 由两部分组成:当前毫秒内的时间戳; 顺序编号。从 0 为起始值,用于区分同一时间内产生的多个命令
    XADD queue01 * name wjl age 25 gender male
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    读取消息

    XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
    XREAD COUNT 1 BLOCK 0 STREAMS queue01  0-0
    # 指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息,0-0 从第一条开始读取, $ 表示从最后一条向后开始读取,只接收新消息。
    # 如果想使用 XREAD 进行顺序消费,每次读取后要记住返回的消息 ID,下次调用 XREAD 就将上一次返回的消息 ID 作为参数传递到下一次调用就可以继续消费后续的消息了。
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述
    这里只是开胃菜,通过 XREAD 读取的数据其实并没有被删除,当重新执行 XREAD COUNT 1 BLOCK 0 STREAMS queue01 0-0指令的时候又会重新读取到。

    创建消费组

    # Stream 通过 XGROUP CREATE 指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。
    # 随便再插入一些数据
    XADD queue01 * name zhangsan age 52 gender male
    XADD queue01 * name lisi age 34 gender male
    XADD queue01 * name xiaomei age 24 gender famale
    # 创建消费组的指令
    # 格式
    XGROUP CREATE stream group start_id
    # stream:指定队列的名字;
    # group:指定消费组名字;
    # start_id:指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息,0-0 从第一条开始读取, $ 表示从最后一条向后开始读取,只接收新消息。
    # MKSTREAM:默认情况下,XGROUP CREATE命令在目标流不存在时返回错误。可以使用可选MKSTREAM子命令作为 之后的最后一个参数来自动创建流。
    
    # 新建group01消费组
    XGROUP CREATE queue01 group01 0-0 MKSTREAM
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    读取群组消息

    XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]
    
    XREADGROUP GROUP group01 consumer01 COUNT 1 BLOCK 0 STREAMS queue01 >
    # >:命令的最后参数 >,表示从尚未被消费的消息开始读取;
    # BLOCK 0:表示阻塞读取,要是大于0就是等待多少毫秒
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    如果消息队列中的消息被消费组的一个消费者消费了,这条消息就不会再被这个消费组的其他消费者读取到。

    在这里插入图片描述

    查看已读未确认消息

    XREADGROUP GROUP groupName consumerName
    XPENDING queue01 group01 
    
    • 1
    • 2

    在这里插入图片描述

    1 # 未读消息条数
    1696822787364-0 #所有消费者读取的消息最小和最大 ID;这是最小
    1696822787364-0 #所有消费者读取的消息最小和最大 ID;这是最大
    consumer01
    1
    
    • 1
    • 2
    • 3
    • 4
    • 5

    查看消费者读取了哪些数据

    XPENDING queue01 group01 - + 10 consumer01
    
    • 1

    在这里插入图片描述

    确认消息

    XACK key group-key ID [ID ...]
    
    XACK queue01 group01 1696822787364-0
    
    • 1
    • 2
    • 3

    在这里插入图片描述
    再次查询未读消息

    XPENDING queue01 group01 
    XREADGROUP GROUP group01 consumer01 COUNT 1 BLOCK 0 STREAMS queue01 >
    
    • 1
    • 2

    在这里插入图片描述
    在这里插入图片描述

    C#操作redis实现

    使用FreeRedis类库,熟悉了上面的流程,直接上代码

    using FreeRedis;
    
    namespace RedisMQStu01
    {
        internal class Program
        {
            async static Task Main(string[] args)
            {
                var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
                var queueName = "queue01";//队列的名字
                var groupName = "group01";//读取队列的群组的名字
                var consumerName = "consumer01";//消费者的名字
                //添加数据
                await cli.XAddAsync(queueName, "name", "wjl", "age", 25, "gender", "male");
                await cli.XAddAsync(queueName, "name", "zhangsan", "age", 52, "gender", "male");
                await cli.XAddAsync(queueName, "name", "lisi", "age", 34, "gender", "male");
                await cli.XAddAsync(queueName, "name", "xiaomei", "age", 24, "gender", "famale");
                //创建群组,如果数据存在则不需要执行了,第一次需要执行
                await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);
                //读取群组消息
                var ids = new Dictionary<string, string>();
                ids.Add("queue01", ">");
                var result = await cli.XReadGroupAsync(groupName, consumerName,
                        1, 0, noack: false, ids);
                //查看已读未确认的消息
                var unReadResults = await cli.XPendingAsync(queueName, groupName);
                await Console.Out.WriteLineAsync($"未读消息条数为:{unReadResults.count}");
                foreach (var item in result)
                {
                    await Console.Out.WriteLineAsync(item.key);//群组名字
                    foreach (var entry in item.entries)
                    {
                        await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列id
                        await Console.Out.WriteAsync($"\t");
                        foreach (var field in entry.fieldValues)
                        {
                            await Console.Out.WriteAsync($"\t{field.ToString()}");
                        }
                        await Console.Out.WriteLineAsync();
                        //确认消息
                        await cli.XAckAsync(queueName,groupName, entry.id);
                    }
                }
                await Console.Out.WriteLineAsync("完成");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    上面的代码是生产者和消费者在一块,不满足生产环境要求,因为生产环境大多需要分开,生产者只负责生产,消费者只负责消费

    生产者

    using FreeRedis;
    
    namespace RedisMQProductor01
    {
        internal class Program
        {
            /// 
            /// redis消息队列的生产者
            /// 
            /// 
            /// 
            async static Task Main(string[] args)
            {
                var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
                var queueName = "queue01";//队列的名字
                //添加数据
                await cli.XAddAsync(queueName, "name", "wjl", "age", 25, "gender", "male");
                await cli.XAddAsync(queueName, "name", "zhangsan", "age", 52, "gender", "male");
                await cli.XAddAsync(queueName, "name", "lisi", "age", 34, "gender", "male");
                await cli.XAddAsync(queueName, "name", "xiaomei", "age", 24, "gender", "famale");
                await Console.Out.WriteLineAsync("生产者添加数据完成");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    消费者

    using FreeRedis;
    
    namespace RedisMQConsumer01
    {
        /// 
        /// redis消息队列的消费者
        /// 
        internal class Program
        {
            async static Task Main(string[] args)
            {
                var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
                var queueName = "queue01";//队列的名字
                var groupName = "group01";//读取队列的群组的名字
                var consumerName = "consumer01";//消费者的名字
                //如果数据存在则不需要执行了,第一次需要执行
                var info = await cli.XInfoGroupsAsync(queueName);
                if (info == null || info.Length < 1)
                {
                    //创建群组
                    await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);
                }
                //读取群组消息
                var ids = new Dictionary<string, string>();
                ids.Add("queue01", ">");
                //block的值是0表示无限等待
                var result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);
                while (true)
                {
                    if (result != null && result.Length > 0)
                    {
                        foreach (var item in result)
                        {
                            await Console.Out.WriteLineAsync(item.key);//群组名字
                            foreach (var entry in item.entries)
                            {
                                await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列id
                                await Console.Out.WriteAsync($"\t");
                                foreach (var field in entry.fieldValues)
                                {
                                    await Console.Out.WriteAsync($"\t{field.ToString()}");
                                }
                                await Console.Out.WriteLineAsync();
                                //确认消息
                                await cli.XAckAsync(queueName, groupName, entry.id);
                            }
                        }
                        await Console.Out.WriteLineAsync("===============本次处理完毕===============");
                    }
                    //继续等待
                    result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    先启动生产者在启动消费者查看效果
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    方法改善

    改善之后可以先启动消费者然后等待生产者投递数据即可

    消费者

    using FreeRedis;
    using Newtonsoft.Json;
    using SqlSugar;
    
    namespace CelueStu02
    {
        /// 
        /// 备份策略消费者
        /// 
        internal class Program
        {
            async static Task Main(string[] args)
            {
                var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
                var queueName = "queue01";//队列的名字
                var groupName = "group01";//读取队列的群组的名字
                var consumerName = "consumer01";//消费者的名字
                try
                {
                    var streamInfo = cli.XInfoStream(queueName);
                }
                catch
                {
                    await cli.XAddAsync(queueName, "student", "");
                }
    
                //如果数据存在则不需要执行了,第一次需要执行
                var info = await cli.XInfoGroupsAsync(queueName);
                if (info == null || info.Length < 1)
                {
                    //创建群组
                    await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);
                }
                //读取群组消息
                var ids = new Dictionary<string, string>();
                ids.Add("queue01", ">");
                //block的值是0表示无限等待
                var result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);
                ConnectionConfig connectionConfig = new ConnectionConfig()
                {
                    ConnectionString = "",//自己写数据库链接字符串
                    IsAutoCloseConnection = true,
                    DbType = DbType.SqlServer
                };
                using SqlSugarClient db = new SqlSugarClient(connectionConfig);
                //初始化表格
                db.CodeFirst.InitTables(typeof(Student));
    
                while (true)
                {
                    if (result != null && result.Length > 0)
                    {
                        foreach (var item in result)
                        {
                            await Console.Out.WriteLineAsync(item.key);//群组名字
                            foreach (var entry in item.entries)
                            {
                                await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列id
                                for (int i = 0; i < entry.fieldValues.Length; i++)
                                {
                                    var field = entry.fieldValues[i];
                                    if (field.ToString() == "student")
                                    {
                                        var studentListJson = entry.fieldValues[i + 1]?.ToString() ?? "";
                                        if (string.IsNullOrWhiteSpace(studentListJson))
                                        {
                                            continue;
                                        }
                                        var students = JsonConvert.DeserializeObject<List<Student>>(studentListJson);
                                        await db.Storageable(students).ExecuteCommandAsync();
                                    }
                                }
                                //确认消息
                                await cli.XAckAsync(queueName, groupName, entry.id);
                            }
                        }
                        await Console.Out.WriteLineAsync("===============本次处理完毕===============");
                    }
                    //继续等待
                    result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    生产者

    using FreeRedis;
    using Newtonsoft.Json;
    using SqlSugar;
    
    namespace CelueStu01
    {
        /// 
        /// 备份策略生产者
        /// 
        internal class Program
        {
            async static Task Main(string[] args)
            {
                var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
                var queueName = "queue01";//队列的名字
                var perProcessNumber = 1000;//每次处理的数据条数
                int totalPage = 0;//总页码数
                ConnectionConfig connectionConfig = new ConnectionConfig()
                {
                    ConnectionString = "",
                    IsAutoCloseConnection = true,
                    DbType = DbType.SqlServer
                };
                using (SqlSugarClient db = new SqlSugarClient(connectionConfig))
                {
                    //初始化表格
                    db.CodeFirst.InitTables(typeof(Student));
                    do
                    {
                        int count = await db.Queryable<Student>().CountAsync();
                        totalPage = count % perProcessNumber == 0 ? count / perProcessNumber : (count / perProcessNumber) + 1;
                        var students = await db.Queryable<Student>().ToPageListAsync(totalPage, perProcessNumber);
                        //批量发送,redis频繁写入会报rdb错误,限制一下写入频率
                        await cli.XAddAsync(queueName, "student", JsonConvert.SerializeObject(students));
                        List<int> deleteStudents = students.Select(p => p.Id).ToList();
                        if (deleteStudents.Any())
                        {
                            //批量删除
                            await db.Deleteable<Student>().Where(p => deleteStudents.Contains(p.Id)).ExecuteCommandAsync();
                        }
                        totalPage -= 1;
                        //Thread.Sleep(2000);
                    } while (totalPage > 0);
                }
                await Console.Out.WriteLineAsync("生产者添加数据完成");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    参考

  • 相关阅读:
    Super Marker插件——标记资源,提高效率
    canvas入门教学(1)
    json提取-响应报文中是json数组
    时间复杂度
    linux文件IO
    mpu6050姿态解算与卡尔曼滤波(5)可应用于51单片机的卡尔曼滤波器
    JDBC介绍
    Oracle 逻辑备份(数据迁移)
    多维度深入剖析QLC SSD硬件延迟的来源
    Ubuntu2004字体不清晰,排查流程
  • 原文地址:https://blog.csdn.net/qq_36437991/article/details/133697946