• 尝试Redis发布-订阅模型


    场景

    我有程序,功能大概类似于一个程序进行生产数据,一个程序进行消费,起初我考虑到了各种MQ去解决这件事情,我们现有资源有Redis,引入MQ可能会导致资源,系统复杂性,实时性的一个问题,所以依然考虑使用Redis的发布-订阅模型来解决这问题。

    发布-订阅模型

    1. 订阅通道

    订阅者使用SUBSCRIBE命令来订阅一个或多个通道。例如:

    SUBSCRIBE channel1 channel2
    
    • 1

    2. 取消订阅通道

    订阅者可以使用UNSUBSCRIBE命令来取消订阅一个或多个通道。例如:

    UNSUBSCRIBE channel1 channel2
    
    • 1

    3. 接收消息

    一旦订阅了通道,订阅者将开始接收发布者发送到这些通道的消息。消息将以异步方式传递给订阅者。

    4.发布消息

    发布消费到通道,发布到了之后订阅者就可用监听到这个消息了。

    PUBLISH my_channel "Hello, subscribers!"
    
    • 1

    5.通配符的使用

    Redis还支持通配符订阅,让订阅者可以使用通配符来匹配多个通道。通配符有两种:

    :匹配一个通道名,例如 SUBSCRIBE news. 将订阅所有以 “news.” 开头的通道。

    ?:匹配一个通道名中的一个字符,例如 SUBSCRIBE news.?? 将订阅以 “news.” 开头且后面有两个字符的通道。

    存在的问题

    消息持久化和顺序问题

    如果Redis挂壁了,那么消息也会丢失的,这个其实可用采用Redis Stream来解决这个问题。

    发布者

    我们采用c#语言来简单写一个demo

    using StackExchange.Redis;
    using System;
    
    class Publisher
    {
        static void Main()
        {
            // 建立到Redis服务器的连接
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("your_redis_connection_string");
            IDatabase db = redis.GetDatabase();
    
            string streamName = "mystream";
    
            for (int i = 1; i <= 10; i++)
            {
                string messageId = db.StreamAdd(streamName, new[]
                {
                    new NameValueEntry("message", $"Message {i}")
                });
    
                Console.WriteLine($"Published: {messageId}");
            }
    
            // 关闭连接
            redis.Close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    在上述示例中,我们使用Redis的StreamAdd方法将消息发布到名为 “mystream” 的流中。每条消息都有一个唯一的消息ID,消息将按照它们添加到流的顺序进行排序。

    订阅者
    using StackExchange.Redis;
    using System;
    
    class Subscriber
    {
        static void Main()
        {
            // 建立到Redis服务器的连接
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("your_redis_connection_string");
            IDatabase db = redis.GetDatabase();
    
            string streamName = "mystream";
            string consumerGroup = "mygroup";
            string consumerName = "mysubscriber";
    
            // 创建消费者组
            db.StreamCreateConsumerGroup(streamName, consumerGroup, "0");
    
            while (true)
            {
                var messages = db.StreamReadGroup(streamName, consumerGroup, consumerName, "0", 10);
    
                foreach (var message in messages)
                {
                    Console.WriteLine($"Received: {message.Values[1]}");
                    // 在这里处理消息
                    // 可以实现消息确认等逻辑
                }
            }
    
            // 关闭连接
            redis.Close();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    在上述示例中,我们首先创建了一个消费者组,然后循环读取来自流 “mystream” 的消息。通过使用消费者组,我们可以确保每条消息只会被一个订阅者处理,并且即使订阅者离线一段时间,它也可以获取未处理的消息。

    使用Redis Streams,消息将持久化存储在Redis中,即使Redis服务器重启,消息也不会丢失。这使得Redis Streams成为处理消息的可靠工具,适用于日志记录、事件处理和消息队列等应用。

    安全性问题

    Redis的消息其实是裸奔的。解决这个问题的核心在于可以在Redis上设置访问控制,只允许授权的发布者和订阅者连接到Redis。此外,可以考虑使用TLS/SSL来加密连接。

    启用密码验证

    首先,可以配置Redis以限制访问只允许授权的客户端连接。在Redis的配置文件中(通常是redis.conf),可以使用以下配置项来启用访问控制:

    # 启用密码认证
    requirepass your_password
    
    • 1
    • 2

    也可以使用SSL加密协议来限制,这个要取得加密证书。

    无法保证消息可靠性

    这个问题事实上是无解的,接受不了就不要用,有人可能说发布消息之后,订阅者回电机制,这个事实上是伪的不能再伪的逻辑,我都收不到了,我甜蜜的怎么回电?还有人说存数据库,我就笑笑。

    结束

    我没有转语言,只是掌握的不只是一门语言,一个nice的IT工作者应该是不被语言限制的!

  • 相关阅读:
    20. 数据库操作
    【SpringCloud-学习笔记】DockerCompose
    Webpack5 搭建Vue项目(进阶版)
    Oracle通过DBLINK访问达梦数据库
    PACP学习笔记一:使用 PCAP 编程
    MySql第三篇---索引的创建与设计原则
    JavaEE:File类查询一个文件的路径(举例+源码 )
    网络基本概念
    python---文件的操作、异常处理、模块、包
    MVC三层架构
  • 原文地址:https://blog.csdn.net/weixin_45487988/article/details/132874475