我们之前的ChatServer
是维护了一个连接的用户表,每次向别的用户发消息都会从用户表中查看对端用户是否在线。然后再判断是直接发送,还是转为离线消息。
但是现在我们是集群服务器,有多个服务器维护用户。我们的ChatServerA
要聊天的对象在ChatServerB
,ChatServerA
在自己服务器的用户表中找不到。那么可能对端用户在线,它却给对端用户发送了离线消息。因此,我们需要保证跨服务器间的通信!
那我们如何实现,非常直观的想法,我们可以让后端的服务器之间互相连接。
上面的设计,让各个ChatServer服务器互相之间直接建立TCP连接进行通信,相当于在服务器网络之间进行广播。这样的设计使得各个服务器之间耦合度太高,不利于系统扩展,并且会占用系统大量的socket资源,各服务器之间的带宽压力很大,不能够节省资源给更多的客户端提供服务,因此绝对不是一个好的设计。
集群部署的服务器之间进行通信,最好的方式就是引入中间件消息队列,解耦各个服务器,使整个系统松耦合,提高服务器的响应能力,节省服务器的带宽资源,如下图所示:
在集群分布式环境中,经常使用的中间件消息队列有ActiveMQ、RabbitMQ、Kafka等,都是应用场景广泛并且性能很好的消息队列,供集群服务器之间,分布式服务之间进行消息通信。限于我们的项目业务类型并不是非常复杂,对并发请求量也没有太高的要求,因此我们的中间件消息队列选型的是基于发布-订阅模式的redis。
参考博客
Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。
Redis 客户端可以订阅任意数量的频道。
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端
class Redis
{
public:
Redis();
~Redis();
//连接Redis服务器
bool connect();
//向Redis指定的通道channel发布消息
bool publish(int channel, string message);
//向Redis指定的通道subscribe订阅消息
bool subscribe(int channel);
//取消订阅
bool unsubscribe(int channel);
//独立线程中接收订阅通道的消息
void observer_channel_message();
//初始化业务层上报通道消息的回调对象
void init_notify_handler(redis_handler handler);
private:
//hiredis同步上下文对象,负责publish消息
redisContext *publish_context_;
//负责subscribe消息
redisContext *subcribe_context_;
//回调操作,收到消息给service上报
redis_handler notify_message_handler_;
};
#include "redis.hpp"
#include
Redis::Redis() : publish_context_(nullptr), subcribe_context_(nullptr)
{
}
Redis::~Redis()
{
if (publish_context_ != nullptr)
{
redisFree(publish_context_);
}
if (subcribe_context_ != nullptr)
{
redisFree(subcribe_context_);
}
}
//连接Redis服务器
bool Redis::connect()
{
publish_context_ = redisConnect("127.0.0.1", 6379);
if (publish_context_ == nullptr)
{
cerr << "connect redis failed!" << endl;
return false;
}
subcribe_context_ = redisConnect("127.0.0.1", 6379);
if (subcribe_context_ == nullptr)
{
cerr << "connect redis failed!" << endl;
return false;
}
// 独立线程中接收订阅通道的消息
thread t([&]() {
observer_channel_message();
});
t.detach();
cout << "connect redis-server success!" << endl;
return true;
}
//向Redis指定的通道channel发布消息
bool Redis::publish(int channel, string message)
{
// 相当于publish 键 值
// redis 127.0.0.1:6379> PUBLISH runoobChat "Redis PUBLISH test"
redisReply *reply = (redisReply *)redisCommand(publish_context_, "PUBLISH %d %s", channel, message.c_str());
if (reply == nullptr)
{
cerr << "publish command failed!" << endl;
return false;
}
// 释放资源
freeReplyObject(reply);
return true;
}
// 向Redis指定的通道subscribe订阅消息
bool Redis::subscribe(int channel)
{
// redisCommand 会先把命令缓存到context中,然后调用RedisAppendCommand发送给redis
// redis执行subscribe是阻塞,不会响应,不会给我们一个reply
// redis 127.0.0.1:6379> SUBSCRIBE runoobChat
if (REDIS_ERR == redisAppendCommand(subcribe_context_, "SUBSCRIBE %d", channel))
{
cerr << "subscibe command failed" << endl;
return false;
}
int done = 0;
while (!done)
{
if (REDIS_ERR == redisBufferWrite(subcribe_context_, &done))
{
cerr << "subscribe command failed" << endl;
return false;
}
}
return true;
}
//取消订阅
bool Redis::unsubscribe(int channel)
{
//redisCommand 会先把命令缓存到context中,然后调用RedisAppendCommand发送给redis
//redis执行subscribe是阻塞,不会响应,不会给我们一个reply
if (REDIS_ERR == redisAppendCommand(subcribe_context_, "UNSUBSCRIBE %d", channel))
{
cerr << "subscibe command failed" << endl;
return false;
}
int done = 0;
while (!done)
{
if (REDIS_ERR == redisBufferWrite(subcribe_context_, &done))
{
cerr << "subscribe command failed" << endl;
return false;
}
}
return true;
}
//独立线程中接收订阅通道的消息
void Redis::observer_channel_message()
{
redisReply *reply = nullptr;
while (REDIS_OK == redisGetReply(subcribe_context_, (void **)&reply))
{
//reply里面是返回的数据有三个,0. message , 1.通道号,2.消息
if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr)
{
//给业务层上报消息
notify_message_handler_(atoi(reply->element[1]->str), reply->element[2]->str);
}
freeReplyObject(reply);
}
cerr << "----------------------- oberver_channel_message quit--------------------------" << endl;
}
//初始化业务层上报通道消息的回调对象
void Redis::init_notify_handler(redis_handler handler)
{
notify_message_handler_ = handler;
}
先运行两个服务端,分别监听6000
和6002
端口。然后开启两个客户端,它们向8000
端口发起请求,这是Nginx
负责负载均衡模块的关注端口。可以看到发起的两个客户端连接分别被分派到了两个服务端,现在还需要测试跨服务端的通信是否成功。
跨服务端通信测试:
发现对端没用收到消息,后来发现终端提示redis
发布命令出错了。后来发现就没有启动redis进程!
# 启动redis服务
redis-server redis.conf
可以看到,可以单一聊天和群组聊天。
Redis进阶——发布订阅详解_·梅花十三的博客-CSDN博客_redis发布订阅模式
Redis 发布订阅 | 菜鸟教程