redis需要>5.0
Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。
它实现了大部分消息队列的功能:
本次主要实现基本的消息发送接受确认,消费组有需要的可以看参考的文章
info
XADD streamName id field value [field value ...]
# 消息队列名称后面的 「*」 ,表示让 Redis 为插入的消息自动生成唯一 ID,当然也可以自己定义。
# 消息 ID 由两部分组成:当前毫秒内的时间戳; 顺序编号。从 0 为起始值,用于区分同一时间内产生的多个命令
XADD queue01 * name wjl age 25 gender male
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XREAD COUNT 1 BLOCK 0 STREAMS queue01 0-0
# 指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息,0-0 从第一条开始读取, $ 表示从最后一条向后开始读取,只接收新消息。
# 如果想使用 XREAD 进行顺序消费,每次读取后要记住返回的消息 ID,下次调用 XREAD 就将上一次返回的消息 ID 作为参数传递到下一次调用就可以继续消费后续的消息了。
这里只是开胃菜,通过 XREAD 读取的数据其实并没有被删除,当重新执行 XREAD COUNT 1 BLOCK 0 STREAMS queue01 0-0
指令的时候又会重新读取到。
# Stream 通过 XGROUP CREATE 指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。
# 随便再插入一些数据
XADD queue01 * name zhangsan age 52 gender male
XADD queue01 * name lisi age 34 gender male
XADD queue01 * name xiaomei age 24 gender famale
# 创建消费组的指令
# 格式
XGROUP CREATE stream group start_id
# stream:指定队列的名字;
# group:指定消费组名字;
# start_id:指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息,0-0 从第一条开始读取, $ 表示从最后一条向后开始读取,只接收新消息。
# MKSTREAM:默认情况下,XGROUP CREATE命令在目标流不存在时返回错误。可以使用可选MKSTREAM子命令作为 之后的最后一个参数来自动创建流。
# 新建group01消费组
XGROUP CREATE queue01 group01 0-0 MKSTREAM
XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]
XREADGROUP GROUP group01 consumer01 COUNT 1 BLOCK 0 STREAMS queue01 >
# >:命令的最后参数 >,表示从尚未被消费的消息开始读取;
# BLOCK 0:表示阻塞读取,要是大于0就是等待多少毫秒
如果消息队列中的消息被消费组的一个消费者消费了,这条消息就不会再被这个消费组的其他消费者读取到。
XREADGROUP GROUP groupName consumerName
XPENDING queue01 group01
1 # 未读消息条数
1696822787364-0 #所有消费者读取的消息最小和最大 ID;这是最小
1696822787364-0 #所有消费者读取的消息最小和最大 ID;这是最大
consumer01
1
查看消费者读取了哪些数据
XPENDING queue01 group01 - + 10 consumer01
XACK key group-key ID [ID ...]
XACK queue01 group01 1696822787364-0
再次查询未读消息
XPENDING queue01 group01
XREADGROUP GROUP group01 consumer01 COUNT 1 BLOCK 0 STREAMS queue01 >
使用FreeRedis类库,熟悉了上面的流程,直接上代码
using FreeRedis;
namespace RedisMQStu01
{
internal class Program
{
async static Task Main(string[] args)
{
var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
var queueName = "queue01";//队列的名字
var groupName = "group01";//读取队列的群组的名字
var consumerName = "consumer01";//消费者的名字
//添加数据
await cli.XAddAsync(queueName, "name", "wjl", "age", 25, "gender", "male");
await cli.XAddAsync(queueName, "name", "zhangsan", "age", 52, "gender", "male");
await cli.XAddAsync(queueName, "name", "lisi", "age", 34, "gender", "male");
await cli.XAddAsync(queueName, "name", "xiaomei", "age", 24, "gender", "famale");
//创建群组,如果数据存在则不需要执行了,第一次需要执行
await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);
//读取群组消息
var ids = new Dictionary<string, string>();
ids.Add("queue01", ">");
var result = await cli.XReadGroupAsync(groupName, consumerName,
1, 0, noack: false, ids);
//查看已读未确认的消息
var unReadResults = await cli.XPendingAsync(queueName, groupName);
await Console.Out.WriteLineAsync($"未读消息条数为:{unReadResults.count}");
foreach (var item in result)
{
await Console.Out.WriteLineAsync(item.key);//群组名字
foreach (var entry in item.entries)
{
await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列id
await Console.Out.WriteAsync($"\t");
foreach (var field in entry.fieldValues)
{
await Console.Out.WriteAsync($"\t{field.ToString()}");
}
await Console.Out.WriteLineAsync();
//确认消息
await cli.XAckAsync(queueName,groupName, entry.id);
}
}
await Console.Out.WriteLineAsync("完成");
}
}
}
上面的代码是生产者和消费者在一块,不满足生产环境要求,因为生产环境大多需要分开,生产者只负责生产,消费者只负责消费
using FreeRedis;
namespace RedisMQProductor01
{
internal class Program
{
///
/// redis消息队列的生产者
///
///
///
async static Task Main(string[] args)
{
var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
var queueName = "queue01";//队列的名字
//添加数据
await cli.XAddAsync(queueName, "name", "wjl", "age", 25, "gender", "male");
await cli.XAddAsync(queueName, "name", "zhangsan", "age", 52, "gender", "male");
await cli.XAddAsync(queueName, "name", "lisi", "age", 34, "gender", "male");
await cli.XAddAsync(queueName, "name", "xiaomei", "age", 24, "gender", "famale");
await Console.Out.WriteLineAsync("生产者添加数据完成");
}
}
}
using FreeRedis;
namespace RedisMQConsumer01
{
///
/// redis消息队列的消费者
///
internal class Program
{
async static Task Main(string[] args)
{
var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
var queueName = "queue01";//队列的名字
var groupName = "group01";//读取队列的群组的名字
var consumerName = "consumer01";//消费者的名字
//如果数据存在则不需要执行了,第一次需要执行
var info = await cli.XInfoGroupsAsync(queueName);
if (info == null || info.Length < 1)
{
//创建群组
await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);
}
//读取群组消息
var ids = new Dictionary<string, string>();
ids.Add("queue01", ">");
//block的值是0表示无限等待
var result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);
while (true)
{
if (result != null && result.Length > 0)
{
foreach (var item in result)
{
await Console.Out.WriteLineAsync(item.key);//群组名字
foreach (var entry in item.entries)
{
await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列id
await Console.Out.WriteAsync($"\t");
foreach (var field in entry.fieldValues)
{
await Console.Out.WriteAsync($"\t{field.ToString()}");
}
await Console.Out.WriteLineAsync();
//确认消息
await cli.XAckAsync(queueName, groupName, entry.id);
}
}
await Console.Out.WriteLineAsync("===============本次处理完毕===============");
}
//继续等待
result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);
}
}
}
}
先启动生产者在启动消费者查看效果
改善之后可以先启动消费者然后等待生产者投递数据即可
using FreeRedis;
using Newtonsoft.Json;
using SqlSugar;
namespace CelueStu02
{
///
/// 备份策略消费者
///
internal class Program
{
async static Task Main(string[] args)
{
var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
var queueName = "queue01";//队列的名字
var groupName = "group01";//读取队列的群组的名字
var consumerName = "consumer01";//消费者的名字
try
{
var streamInfo = cli.XInfoStream(queueName);
}
catch
{
await cli.XAddAsync(queueName, "student", "");
}
//如果数据存在则不需要执行了,第一次需要执行
var info = await cli.XInfoGroupsAsync(queueName);
if (info == null || info.Length < 1)
{
//创建群组
await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);
}
//读取群组消息
var ids = new Dictionary<string, string>();
ids.Add("queue01", ">");
//block的值是0表示无限等待
var result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);
ConnectionConfig connectionConfig = new ConnectionConfig()
{
ConnectionString = "",//自己写数据库链接字符串
IsAutoCloseConnection = true,
DbType = DbType.SqlServer
};
using SqlSugarClient db = new SqlSugarClient(connectionConfig);
//初始化表格
db.CodeFirst.InitTables(typeof(Student));
while (true)
{
if (result != null && result.Length > 0)
{
foreach (var item in result)
{
await Console.Out.WriteLineAsync(item.key);//群组名字
foreach (var entry in item.entries)
{
await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列id
for (int i = 0; i < entry.fieldValues.Length; i++)
{
var field = entry.fieldValues[i];
if (field.ToString() == "student")
{
var studentListJson = entry.fieldValues[i + 1]?.ToString() ?? "";
if (string.IsNullOrWhiteSpace(studentListJson))
{
continue;
}
var students = JsonConvert.DeserializeObject<List<Student>>(studentListJson);
await db.Storageable(students).ExecuteCommandAsync();
}
}
//确认消息
await cli.XAckAsync(queueName, groupName, entry.id);
}
}
await Console.Out.WriteLineAsync("===============本次处理完毕===============");
}
//继续等待
result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);
}
}
}
}
using FreeRedis;
using Newtonsoft.Json;
using SqlSugar;
namespace CelueStu01
{
///
/// 备份策略生产者
///
internal class Program
{
async static Task Main(string[] args)
{
var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");
var queueName = "queue01";//队列的名字
var perProcessNumber = 1000;//每次处理的数据条数
int totalPage = 0;//总页码数
ConnectionConfig connectionConfig = new ConnectionConfig()
{
ConnectionString = "",
IsAutoCloseConnection = true,
DbType = DbType.SqlServer
};
using (SqlSugarClient db = new SqlSugarClient(connectionConfig))
{
//初始化表格
db.CodeFirst.InitTables(typeof(Student));
do
{
int count = await db.Queryable<Student>().CountAsync();
totalPage = count % perProcessNumber == 0 ? count / perProcessNumber : (count / perProcessNumber) + 1;
var students = await db.Queryable<Student>().ToPageListAsync(totalPage, perProcessNumber);
//批量发送,redis频繁写入会报rdb错误,限制一下写入频率
await cli.XAddAsync(queueName, "student", JsonConvert.SerializeObject(students));
List<int> deleteStudents = students.Select(p => p.Id).ToList();
if (deleteStudents.Any())
{
//批量删除
await db.Deleteable<Student>().Where(p => deleteStudents.Contains(p.Id)).ExecuteCommandAsync();
}
totalPage -= 1;
//Thread.Sleep(2000);
} while (totalPage > 0);
}
await Console.Out.WriteLineAsync("生产者添加数据完成");
}
}
}