• C++实现集群聊天服务器


    C++实现集群聊天服务器

    JSON

    Json是一种轻量级的数据交换模式(也叫做数据序列化方式)。Json采用完全独立于编程语言的文本格式来存储和表示数据。见解和清晰的层次结构使得Json称为理想的数据交换语言。易于阅读和编写。同时也易于支持机器解析和生成,有效地提升网络传出效率。

    这里讲的网络传输,就涉及到序列化和反序列化。以客户端和服务器端通信为例,一般情况下,客户端给服务器端发送信息,发送的信息可能是字符串、整型等信息,需要先转化为字节流数据,这就是序列化;同样,服务器接收到客户端发来的字节流信息,需要转化成原始的数据格式,这就是反序列化。

    JSON for Modern C++是一个C++下的JSON库,具有以下特点:直观的语法、仅需使用头文件json.hpp依赖、C++11标准编写、类似于STL使用json、STL和json可以互相转化、严谨的测试(所有类都经过严格的单元测试)。

    数据序列化

    在网络中,常见的数据传输序列化格式有XML、Json、ProtoBuf,其中ProtoBuf最为常用,其数据压缩编码传输占用带宽小,同样的数据信息,是Json的1/10,XML的1/20,但是使用起来稍微比Json复杂。

    Json使用

    头文件引入和重命名#include"json.hpp" using json = nlohmann::json;。然后就可以是使用json类似于对象的使用方法使用json了。

    #include"json.hpp"
    using json = nlohmann::json;
    #include 
    #include 
    using namespace std;
    
    void func1()
    {
        json js;
        js["from"] = "zhangsan";
        js["message_type"] = 2;
        js["to"] = "lisi";
        js["message"] = "Hi, what are you doing?";
        cout << js << endl;
    	string str=js.dump();//序列化,转化成字符串格式
    	cout<<str.c_str()<<endl;
        // 模拟从网络接收到json字符串,通过json::parse函数把json字符串字节流转化为json对象
        json js2 = json::parse(temp);
        cout << js2 << endl;
    }
    void func2()
    {
        json js;
        js["id"] = {1, 2, 3, 4, 5};
        js["name"] = "zhang san";
        js["msg"]["zhang san"] = "hello world";
        js["msg"]["liu shuo"] = "hello china";
        js["msg"] = {{"zhang san", "hello world"}, {"liu shuo", "hello china"}};
        cout << js << endl;
    }
    int main()
    {
        func1();
        func2();
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    key采用哈希表,是无顺序的结构。

    json的序列化——json_obj.dump()。json反序列化——json::parse(json_str)

    cmake常规使用

    首先给出一个代码样例,通过代码样例基本上可以看懂一些常用的cmake命令:

    cmake_minimum_required(VERSION 3.0) #CMake最小版本
    
    project(main)#定义当前工程的名字
    
    # set表示创建一个变量,并初始化对应的值
    set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} -g) #配置编译选项
    
    # include_directories()#头文件搜索目录
    
    # link_directories() #库文件搜索目录
    
    # 设置需要编译的源文件列表,其实也就是定义一个SRC_LIST变量名
    set(SRC_LIST ./muduo_server.cpp)
    
    # 设置可执行文件最终存储的目录
    set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
    
    
    # 把指定目录下的所有源文件名字放入变量名SRC_LIST里面
    # aux_source_directory(./ SRC_LIST)
    
    # 表示生成可执行文件server,由SRC_LIST变量所定义的源文件编译而来
    add_executable(server ${SRC_LIST})
    
    # 表示这个server目标程序,需要连接 muduo_net muduo_base pthread 等库文件
    target_link_libraries(server muduo_net muduo_base pthread)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    一般C++开源项目标准目录结构如下图所示:

    一般在build目录下进行cmake ..进行编译,然后会在build目录下生成编译过程中的中间文件,其中会存在一个Makefile文件,在执行make命令来生成最终的可执行文件。

    PROJECT_NAME : 通过 project() 指定项目名称
    PROJECT_SOURCE_DIR : 工程的根目录
    PROJECT_BINARY_DIR : 执行 cmake 命令的目录
    CMAKE_CURRENT_SOURCE_DIR : 当前 CMakeList.txt 文件所在的目录
    CMAKE_CURRENT_BINARY_DIR : 编译目录,可使用 add subdirectory 来修改
    EXECUTABLE_OUTPUT_PATH : 二进制可执行文件输出位置
    LIBRARY_OUTPUT_PATH : 库文件输出位置
    BUILD_SHARED_LIBS : 默认的库编译方式 ( shared 或 static ) ,默认为 static
    CMAKE_C_FLAGS : 设置 C 编译选项
    CMAKE_CXX_FLAGS : 设置 C++ 编译选项
    CMAKE_CXX_FLAGS_DEBUG : 设置编译类型 Debug 时的编译选项
    CMAKE_CXX_FLAGS_RELEASE : 设置编译类型 Release 时的编译选项
    CMAKE_GENERATOR : 编译器名称
    CMAKE_COMMAND : CMake 可执行文件本身的全路径
    CMAKE_BUILD_TYPE : 工程编译生成的版本, Debug / Release

    Muduo

    muduo网络库给用户提供了两个主要的类:

    1. TcpServer:用于编写服务器程序的
    2. TcpClient:用于编写客户端程序的

    epoll+线程池:
    优点:能够把网络I/O的代码和业务代码区分开、用户的断开和连接,用户可读写事件

    Muduo服务端

    下面提供了muduo进行服务器I/O和worker线程分离的代码示例:

    #include 
    #include 
    #include 
    #include 
    #include
    using namespace std;
    using namespace muduo;
    using namespace muduo::net;
    
    // 通用模板
    class ChatServer
    {
    public:
        ChatServer(EventLoop *loop,
                   const InetAddress &listenAddr,
                   const string &nameArg) : _tcpserver(loop, listenAddr, nameArg), _loop(loop)
        {
            // 给服务器注册用户连接的创建和断开的回调
            _tcpserver.setConnectionCallback(bind(&ChatServer::onConnection, this, placeholders::_1));
    
            // 给服务器注册用户读写事件回调
            _tcpserver.setMessageCallback(bind(&ChatServer::onMessage, this, placeholders::_1, placeholders::_2, placeholders::_3));
    
            // 设置服务器线程数量,1个I/O线程,3个worker线程
            _tcpserver.setThreadNum(4);
        }
        //开启事件循环
        void start()
        {
            _tcpserver.start();
        }
    
    private:
        // 专门处理用户的连接创建和断开  epoll、listenfd、accept
        void onConnection(const TcpConnectionPtr &conn)
        {
            if(conn->connected())
            {
                cout << conn->peerAddress().toIpPort() << " -> " << conn->localAddress().toIpPort() << " state online;"<<endl;
            }
            else
            {
                cout << conn->peerAddress().toIpPort() << " -> " << conn->localAddress().toIpPort() << " state offline;" << endl;
                conn->shutdown();//关闭连接
            }
        }
        // 专门处理用户的读写事件
        void onMessage(const TcpConnectionPtr &conn, // 连接
                       Buffer *buffer,                 // 缓冲区
                       Timestamp time)               // 接收到信息的时间信息
        {
            string buf = buffer->retrieveAllAsString();
            cout << "recv data: " << buf << " time: " << time.toString() << endl;
            conn->send(buf);
        }
        TcpServer _tcpserver;
        EventLoop *_loop;
    };
    int main()
    {
        EventLoop loop;
        InetAddress addr("127.0.0.1", 6000);//本机地址
        ChatServer server(&loop,addr,"ChatServer");
        //将listenfd通过epoll_ctl ->(传递给) epoll
        server.start();
        //按照epoll_wait以阻塞方式等待新用户连接,已连接用户的读写事件等
        loop.loop();
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    编译命令:

    server:muduo_server.cpp
    	g++ $+ -o $@ -lmuduo_net -lmuduo_base -lpthread -std=c++11
    
    • 1
    • 2

    $+ :表示依赖项,这里表示muduo_server.cpp 。$@:表示目标项,这里表示server。

    基于muduo网络库开发服务器程序,大概步骤如下:
    1、组合TcpServer对象
    2、创建EventLoop事件循环对象的指针
    3、明确TcpServer构造函数需要什么参数(TcpServer无默认构造函数),输出ChatServer构造函数
    4、在当前服务器的构造函数中,注册处理连接的回调函数和处理事件的连接函数
    5、设置合适的服务器端线程数量,muduo会自己分配I/O线程和worker线程(一般设置1个I/O,n-1个worker线程)

    负载均衡器

    一台服务器在32位Linx系统环境下,大致的并发量sockfd大约是1024个,大约支持20000个人进行同时聊天。使用ulimit -n命令查看系统允许当前用户进程打开的文件数限制,一般情况下每个进程最多允许打开1024个文件,还需要去除给当前用户进程必然打开的标准输入、标准输出、标准错误、服务器监听、进程通信等文件,剩下可以给客户端socket连接的文件数大概只有1014个左右,也就是说,基于Linux的通信程序最多运行同时1024个TCP并发连接。Linux下高并发socket最大连接数所受的各种限制点击查看更多。

    在实际环境中可能会存在多个服务器同时在后台运行,当开始通信时,需要选定聊天的服务器。


    LVS:负载均衡器常使用的设备。

    nginx负载均衡器:相当于把服务器串联起来,在用户连接服务器后,ngnix负载均衡器将对client分配服务器,如果一台服务器时支持2W用户的连接,那么三台服务器就支持6W用户的连接。

    聊天服务器属于长连接的业务。

    redis是基于发布-订阅模式,类似于设计模式的观察者模式

    nginx安装

    nginx在1.9版本之前只支持HTTP协议的web服务器的负载均衡,之后的版本开始支持TCP长连接的负载均衡。但是,nginx默认情况下没有编译TCP负载均衡模块,需要使用--with-stream进行激活。

    进入nginx官网下载对应的nginx的压缩包。

    我们使用的ubuntu系统,所以下载第二列的nginx-1.25.2版本,下载后得到安装包。使用tar -zxvf nginx-1.25.2.tar.gz进行解压。解压后目录里面存在auto CHANGES CHANGES.ru conf configure contrib html LICENSE man README src等文件夹和文件。在执行./configure --with-stream开启基于TCP的负载均衡。

    安装过程中可能存在库文件的丢失,这里我遇到了zlib,PCRE等缺失。可以按照下方命令进行安装:

    sudo apt install zlib1g
    sudo apt install zlib1g-dev
    sudo apt-get install libpcre3 libpcre3-dev
    
    • 1
    • 2
    • 3

    然后再执行命令(可能需要管理员权限):

    ./configure --with-stream
    make
    make install
    
    • 1
    • 2
    • 3
    nginx -s reload #重新加载配置文件,例如添加服务器配置
    nginx -s stop #停止nginx服务
    
    • 1
    • 2

    需要在nginx的配置文件加入以下内容(nginx配置文件在/usr/local/nginx/conf/nginx.conf,可执行文件在/usr/local/nginx/sbin/nginx

    stream {
        upstream MyServer {
            server 127.0.0.1:6000 weight=1 max_fails=3 fail_timeout=30s;
            server 127.0.0.1:6002 weight=1 max_fails=3 fail_timeout=30s;
        }
        server {
            proxy_connect_timeout 1s;
            listen 8000;
            proxy_pass MyServer;
            tcp_nodelay on;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    上图显示了一个客户端连接服务器(连接的是nginx提供的ip和port),nginx将会给每个服务器按照配置进行分发客户端相应,间接等于客户端直连服务器(还是需要通过负载均衡器nginx),不影响用户之间的通信(非跨服务器通信)。

    配置之后需要进行重新加载配置nginx -s reload

    redis安装

    ubuntu安装redis非常简单:

    sudo apt-get install redis-server
    
    • 1

    查看redis的运行

    ps -ef | grep redis
    netstat -tanp
    
    • 1
    • 2

    默认运行在6379端口tcp 0 0 127.0.0.1:6379 0.0.0.0:* LISTEN 144028/redis-server

    redis是一个强大的缓存服务器,支持多种数据结构,如字符串、list列表、set集合、map映射表等结构,支持数据的持久化存储(存储在硬盘中),经常被用于高并发的服务器环境设计中。

    redis其实类似于mysql,是client/server设计的。redis本身支持事务处理,多线程对key自增自减是线程安全的。

    redis-cli #启动redis客户端
    
    • 1

    key-value

    root@xiehou--ubuntu:~# redis-cli
    127.0.0.1:6379> get "abc"
    (nil)
    127.0.0.1:6379> set "abc" 122
    OK
    127.0.0.1:6379> get "abc"
    "122"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7


    redis的发布-订阅机制:发布-订阅模式包含了两种角色,分别是消息的发布者和消息的订阅者。订阅者可以定义一个或者多个频道channel,发布者可以指向向某个频道channel发送消息,所有订阅此频道的订阅者都会收到此消息。

    订阅的命令是subscribe。进入订阅模式后,处于此状态的客户端不能使用除subscribe、unsubscribe、psubscribe和punsubscribe这四个属于发布订阅的命令之外,否则就会报错。

    进入订阅状态后客户端可能收到3种类型的回复。每种类型的回复都包含3个值,第一个值是消息的
    类型,根据消类型的不同,第二个和第三个参数的含义可能不同。消息类型的取值可能是以下3个:

    1. subscribe:表示订阅成功的反馈信息。第二个值是订阅成功的频道名称,第三个是当前客户端订阅的频道数量。
    2. message:表示接收到的消息,第二个值表示产生消息的频道名称,第三个值是消息的内容。
    3. unsubscribe:表示成功取消订阅某个频道。第二个值是对应的频道名称,第三个值是当前客户端订阅的频道数量,当此值为0时客户端会退出订阅状态,之后就可以执行其他非"发布/订阅"模式的命令了。


    带输入参数的调试gdb --args ./chatserver 127.0.0.1 8000。或者先运行gdb ./chatserver,然后在run 127.0.0.1 8000。在某个cpp文件中打断点break chatservice.cpp:23

    项目地址:https://gitee.com/xiehou-design/ChatServer

  • 相关阅读:
    大数据集群修改服务器ip
    【深度学习】yolov5 tag7.0 实例分割 从0到1的体会
    记录使用IDEA时出现的小问题
    Linux命令入门教程(四):文本编辑篇
    Qt QImage和QPixmap区别
    交换--STP协议
    java 使用多线程模拟 大气测试数据传感器 和 计算机显示数据
    设计模式:策略模式、工厂模式、模板模式混合使用
    Nginx源码分析 --时间 日期 日志
    petite-vue源码剖析-v-if和v-for的工作原理
  • 原文地址:https://blog.csdn.net/qq_45041871/article/details/132521310