• (十五)使用Redis实现发布订阅功能


    如何保证支持跨服务器通信

    我们之前的ChatServer是维护了一个连接的用户表,每次向别的用户发消息都会从用户表中查看对端用户是否在线。然后再判断是直接发送,还是转为离线消息。
    但是现在我们是集群服务器,有多个服务器维护用户。我们的ChatServerA要聊天的对象在ChatServerBChatServerA在自己服务器的用户表中找不到。那么可能对端用户在线,它却给对端用户发送了离线消息。因此,我们需要保证跨服务器间的通信!
    那我们如何实现,非常直观的想法,我们可以让后端的服务器之间互相连接。

    在这里插入图片描述

    上面的设计,让各个ChatServer服务器互相之间直接建立TCP连接进行通信,相当于在服务器网络之间进行广播。这样的设计使得各个服务器之间耦合度太高,不利于系统扩展,并且会占用系统大量的socket资源,各服务器之间的带宽压力很大,不能够节省资源给更多的客户端提供服务,因此绝对不是一个好的设计。
    集群部署的服务器之间进行通信,最好的方式就是引入中间件消息队列,解耦各个服务器,使整个系统松耦合,提高服务器的响应能力,节省服务器的带宽资源,如下图所示:

    在这里插入图片描述

    在集群分布式环境中,经常使用的中间件消息队列有ActiveMQ、RabbitMQ、Kafka等,都是应用场景广泛并且性能很好的消息队列,供集群服务器之间,分布式服务之间进行消息通信。限于我们的项目业务类型并不是非常复杂,对并发请求量也没有太高的要求,因此我们的中间件消息队列选型的是基于发布-订阅模式的redis。

    Redis的安装

    参考博客

    Redis发布-订阅

    Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。
    Redis 客户端可以订阅任意数量的频道。
    下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
    在这里插入图片描述

    当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端
    在这里插入图片描述

    Redis发布-订阅代码实现

    class Redis
    {
    public:
        Redis();
        ~Redis();
    
        //连接Redis服务器
        bool connect();
    
        //向Redis指定的通道channel发布消息
        bool publish(int channel, string message);
    
        //向Redis指定的通道subscribe订阅消息
        bool subscribe(int channel);
    
        //取消订阅
        bool unsubscribe(int channel);
    
        //独立线程中接收订阅通道的消息
        void observer_channel_message();
    
        //初始化业务层上报通道消息的回调对象
        void init_notify_handler(redis_handler handler);
    
    private:
        //hiredis同步上下文对象,负责publish消息
        redisContext *publish_context_;
    
        //负责subscribe消息
        redisContext *subcribe_context_;
    
        //回调操作,收到消息给service上报
        redis_handler notify_message_handler_;
    };
    
    
    #include "redis.hpp"
    #include 
    
    Redis::Redis() : publish_context_(nullptr), subcribe_context_(nullptr)
    {
    }
    
    Redis::~Redis()
    {
        if (publish_context_ != nullptr)
        {
            redisFree(publish_context_);
        }
    
        if (subcribe_context_ != nullptr)
        {
            redisFree(subcribe_context_);
        }
    }
    
    //连接Redis服务器
    bool Redis::connect()
    {
        publish_context_ = redisConnect("127.0.0.1", 6379);
        if (publish_context_ == nullptr)
        {
            cerr << "connect redis failed!" << endl;
            return false;
        }
    
        subcribe_context_ = redisConnect("127.0.0.1", 6379);
        if (subcribe_context_ == nullptr)
        {
            cerr << "connect redis failed!" << endl;
            return false;
        }
    
        // 独立线程中接收订阅通道的消息
        thread t([&]() {
            observer_channel_message();
        });
        t.detach();
    
        cout << "connect redis-server success!" << endl;
        return true;
    }
    
    //向Redis指定的通道channel发布消息
    bool Redis::publish(int channel, string message)
    {
        // 相当于publish 键 值
        // redis 127.0.0.1:6379> PUBLISH runoobChat "Redis PUBLISH test"
        redisReply *reply = (redisReply *)redisCommand(publish_context_, "PUBLISH %d %s", channel, message.c_str());
        if (reply == nullptr)
        {
            cerr << "publish command failed!" << endl;
            return false;
        }
    
        // 释放资源
        freeReplyObject(reply);
        return true;
    }
    
    // 向Redis指定的通道subscribe订阅消息
    bool Redis::subscribe(int channel)
    {
        // redisCommand 会先把命令缓存到context中,然后调用RedisAppendCommand发送给redis
        // redis执行subscribe是阻塞,不会响应,不会给我们一个reply
        // redis 127.0.0.1:6379> SUBSCRIBE runoobChat
        if (REDIS_ERR == redisAppendCommand(subcribe_context_, "SUBSCRIBE %d", channel))
        {
            cerr << "subscibe command failed" << endl;
            return false;
        }
    
        int done = 0;
        while (!done)
        {
            if (REDIS_ERR == redisBufferWrite(subcribe_context_, &done))
            {
                cerr << "subscribe command failed" << endl;
                return false;
            }
        }
    
        return true;
    }
    
    //取消订阅
    bool Redis::unsubscribe(int channel)
    {
        //redisCommand 会先把命令缓存到context中,然后调用RedisAppendCommand发送给redis
        //redis执行subscribe是阻塞,不会响应,不会给我们一个reply
        if (REDIS_ERR == redisAppendCommand(subcribe_context_, "UNSUBSCRIBE %d", channel))
        {
            cerr << "subscibe command failed" << endl;
            return false;
        }
    
        int done = 0;
        while (!done)
        {
            if (REDIS_ERR == redisBufferWrite(subcribe_context_, &done))
            {
                cerr << "subscribe command failed" << endl;
                return false;
            }
        }
    
        return true;
    }
    
    //独立线程中接收订阅通道的消息
    void Redis::observer_channel_message()
    {
        redisReply *reply = nullptr;
        while (REDIS_OK == redisGetReply(subcribe_context_, (void **)&reply))
        {
            //reply里面是返回的数据有三个,0. message , 1.通道号,2.消息
            if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr)
            {
                //给业务层上报消息
                notify_message_handler_(atoi(reply->element[1]->str), reply->element[2]->str);
            }
    
            freeReplyObject(reply);
        }
    
        cerr << "----------------------- oberver_channel_message quit--------------------------" << endl;
    }
    
    //初始化业务层上报通道消息的回调对象
    void Redis::init_notify_handler(redis_handler handler)
    {
        notify_message_handler_ = handler;
    }
    
    

    跨服务器通信的测试

    先运行两个服务端,分别监听60006002端口。然后开启两个客户端,它们向8000端口发起请求,这是Nginx负责负载均衡模块的关注端口。可以看到发起的两个客户端连接分别被分派到了两个服务端,现在还需要测试跨服务端的通信是否成功。

    在这里插入图片描述

    在这里插入图片描述

    跨服务端通信测试:

    在这里插入图片描述

    在这里插入图片描述

    发现对端没用收到消息,后来发现终端提示redis发布命令出错了。后来发现就没有启动redis进程!

    # 启动redis服务
    redis-server redis.conf
    

    在这里插入图片描述

    可以看到,可以单一聊天和群组聊天。

    参考

    Redis进阶——发布订阅详解_·梅花十三的博客-CSDN博客_redis发布订阅模式
    Redis 发布订阅 | 菜鸟教程

  • 相关阅读:
    基于STC15系列库操作LED灯
    shell实战案例:系统性能监控脚本
    大规模语言模型--训练成本
    mysql面试题33:Blob和text有什么区别
    快速解决mfc140u.dll丢失问题,找不到mfc140u.dll修复方法分享
    GO: 快速升级Go版本
    数据建模中利用3σ剔除异常值进行数据清洗
    Mybatis主键自动生成
    Linux gcc和make学习
    解说天下之操作系统
  • 原文地址:https://blog.csdn.net/weixin_46272577/article/details/127093852