• 计算机网络 -- 多人聊天室


    一 程序介绍和核心功能

      这是基于 UDP 协议实现的一个网络程序,主要功能是 构建一个多人聊天室,当某个用户发送消息时,其他用户可以立即收到,形成一个群聊。

       这个程序由一台服务器和n个客户端组成,服务器扮演了一个接受信息和分发信息的角色,将信息发送给所有已知的用户主机。

      

    二 程序结构 

      将服务器接收消息看作生产商品、分发消息看作消费商品,这不就是一个生动形象的 「生产者消费者模型」 吗?

    「生产者消费者模型」 必备 321

    • 3三组关系
    • 2两个角色
    • 1一个交易场所

      其中两个角色可以分别创建两个线程,一个负责接收消息,放入 「生产者消费者模型」,另一个则是负责从 「生产者消费者模型」 中拿去消息,分发给用户主机。

      这对我们客户端也有相似的地方,但是与服务器不同,我们每个 客户端都认为自己只需要与服务器 1对1 连接就可以了,因此我们 每个客户端都只需要即使接收和发送 资源就可以了,只需要创建两个线程即可。

    这里的交易场所可以选则 阻塞队列,也可以选择 环形队列。

     三 服务器

    在引入 「生产者消费者模型」 后,服务器头文件结构将会变成下面这个样子

    • 启动服务器,原初始化服务器、启动线程
    • 接收消息,将收到的消息存入环形队列
    • 发送消息,从环形队列中获取消息,并派发给线程

    3.1 引入生产者消费者模型 

    这里我们直接使用一个vector数组模拟实现环形队列,同时借用信号量实现生产者消费者模型。

    RingQueue.hpp 头文件

    1. #pragma once
    2. #include
    3. #include
    4. namespace My_RingQueue
    5. {
    6. const int DEF_CAP=10;
    7. template<class T>
    8. class RingQueue
    9. {
    10. public:
    11. RingQueue(size_t cap = DEF_CAP)
    12. :_cap(cap)
    13. ,_pro_step(0)
    14. ,_con_step(0)
    15. {
    16. _queue.resize(_cap);
    17. // 初始化信号量
    18. sem_init(&_pro_sem, 0, _cap);
    19. sem_init(&_con_sem, 0, 0);
    20. }
    21. ~RingQueue(){
    22. // 销毁信号量
    23. sem_destroy(&_pro_sem);
    24. sem_destroy(&_con_sem);
    25. }
    26. // 生产商品
    27. void Push(const T &inData){
    28. // 申请信号量
    29. P(&_pro_sem);
    30. // 生产
    31. _queue[_pro_step++] = inData;
    32. _pro_step %= _cap;
    33. // 释放信号量
    34. V(&_con_sem);
    35. }
    36. // 消费商品
    37. void Pop(T *outData){
    38. // 申请信号量
    39. P(&_con_sem);
    40. // 消费
    41. *outData = _queue[_con_step++];
    42. _con_step %= _cap;
    43. // 释放信号量
    44. V(&_pro_sem);
    45. }
    46. private:
    47. void P(sem_t *sem){
    48. sem_wait(sem);
    49. }
    50. void V(sem_t *sem){
    51. sem_post(sem);
    52. }
    53. private:
    54. std::vector _queue; //这个环形队列我们直接使用数组实现
    55. size_t _cap;
    56. sem_t _pro_sem; //生产者信号量
    57. sem_t _con_sem; //消费者信号量
    58. size_t _pro_step; // 生产者下标
    59. size_t _con_step; // 消费者下标
    60. };
    61. }

    3.2 客户端代码

    3.2.1 引入用户信息

    在首次接收到某个用户的信息时,需要将其进行标识,以便后续在进行消息广播时分发给他

    有点类似于用户首次发送消息,就被拉入了 “群聊”。

    目前可以使用 IP + Port 的方式标识用户,确保用户的唯一性,这里选取 unordered_map 这种哈希表结构,方便快速判断用户是否已存在

    • key用户标识符
    • value用户客户端的 sockaddr_in 结构体

    注意: 这里的哈希表后面会涉及多线程的访问,需要加锁保护。

    3.2.2 LockGuard小组件

    利用RAII思想实现锁的自动化

    1. #pragma once
    2. #include
    3. class LockGuard{
    4. public:
    5. LockGuard(pthread_mutex_t *pmtx)
    6. :_mtx(pmtx)
    7. {
    8. pthread_mutex_lock(_mtx);
    9. }
    10. ~LockGuard(){
    11. pthread_mutex_unlock(_mtx);
    12. }
    13. private:
    14. pthread_mutex_t *_mtx;
    15. };

    3.2.3 Thread.hpp头文件

    用自己的线程库

    1. #pragma once
    2. #include
    3. #include
    4. #include
    5. #include
    6. enum class Status{
    7. NEW=0,//代表新建线程
    8. RUNNING,//代表运行
    9. EXIT //已退出线程
    10. };
    11. // 参数、返回值为 void 的函数类型
    12. //typedef void (*func_t)(void*);
    13. using func_t = std::function<void(void*)>; // 使用包装器设定函数类型
    14. class Thread{
    15. public:
    16. Thread(int num=0,func_t func=nullptr,void *args=nullptr)
    17. :_tid(0)
    18. ,_status(Status::NEW)
    19. ,_func(func)
    20. ,_args(args)
    21. {
    22. //写入线程名字
    23. char name[128];
    24. snprintf(name,sizeof name,"thraed-%d",num);
    25. _name=name;
    26. }
    27. ~Thread(){}
    28. //获取线程id
    29. pthread_t getTID() const{
    30. return _tid;
    31. }
    32. //获取线程名字
    33. std::string getName() const{
    34. return _name;
    35. }
    36. //获取线程状态
    37. Status getStatus() const{
    38. return _status;
    39. }
    40. // 回调方法
    41. static void* runHelper(void* args){
    42. Thread* myThis = static_cast(args);
    43. // 很简单,回调用户传进来的 func 函数即可
    44. myThis->_func(myThis->_args);
    45. return nullptr;
    46. }
    47. // 启动线程
    48. void run(){
    49. int ret = pthread_create(&_tid, nullptr, runHelper, this);
    50. if(ret != 0){
    51. std::cerr << "create thread fail!" << std::endl;
    52. exit(1); // 创建线程失败,直接退出
    53. }
    54. _status = Status::RUNNING; // 更改状态为 运行中
    55. }
    56. // 线程等待
    57. void join(){
    58. int ret = pthread_join(_tid, nullptr);
    59. if(ret != 0){
    60. std::cerr << "thread join fail!" << std::endl;
    61. exit(1); // 等待失败,直接退出
    62. }
    63. _status = Status::EXIT; // 更改状态为 退出
    64. }
    65. private:
    66. pthread_t _tid; // 线程 ID
    67. std::string _name; // 线程名
    68. Status _status; // 线程状态
    69. func_t _func; // 线程回调函数
    70. void* _args; // 传递给回调函数的参数
    71. };

    3.2.4 server.hpp 代码

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. #include"err.hpp"
    12. #include"RingQueue.hpp"
    13. #include
    14. #include"Thread.hpp"
    15. #include"LockGuard.hpp"
    16. #include
    17. namespace My_server{
    18. //端口号默认值
    19. const uint16_t default_port=8888;
    20. class server
    21. {
    22. private:
    23. /* data */
    24. int _sock;// 服务端套接字
    25. uint16_t _port;//端口号
    26. My_RingQueue::RingQueue _rq; //阻塞队列
    27. std::unordered_mapstruct sockaddr_in> _userTable; // <用户标识符, sockaddr_in 结构体>
    28. pthread_mutex_t _mtx; // 互斥锁,保护哈希表
    29. Thread* _producer;//生产者线程
    30. Thread* _consumer;//消费者线程
    31. public:
    32. server(uint16_t port=default_port)
    33. :_port(port)
    34. {
    35. pthread_mutex_init(&_mtx,nullptr);
    36. //创建线程,因为类内成员有隐含的this指针,需要bind固定该参数
    37. _producer = new Thread(1,std::bind(&server::RecvMessage,this));
    38. _consumer = new Thread(2,std::bind(&server::BroadcastMessage,this));
    39. }
    40. ~server(){
    41. //等待线程结束
    42. _producer->join();
    43. _consumer->join();
    44. //销毁互斥锁
    45. pthread_mutex_destroy(&_mtx);
    46. //释放对象
    47. delete _producer;
    48. delete _consumer;
    49. }
    50. //初始化服务器
    51. void StartServer(){
    52. //1 创建套接字
    53. _sock = socket(AF_INET,SOCK_DGRAM,0);
    54. if(_sock==-1){
    55. std::cout<<"Create Socket Fail:: "<<strerror(errno)<
    56. exit(SOCKET_ERR);
    57. }
    58. //创建成功
    59. std::cout<<"Create Success Socket: "<<_sock<
    60. //2. 绑定IP地址和端口号
    61. struct sockaddr_in local;
    62. bzero(&local,sizeof(local));// 将结构体内容置0
    63. //填充字段
    64. local.sin_family= AF_INET; //设置为网络通信
    65. local.sin_port=htons(_port);//主机序列转换为网络序列
    66. local.sin_addr.s_addr=INADDR_ANY; //服务器端要绑定任何可用IP
    67. //绑定 IP 地址和端口号
    68. if(bind(_sock,(const sockaddr*)&local,sizeof(local))){
    69. std::cout<<"Bind IP&&Port Fail: "<<strerror(errno)<
    70. exit(BIND_ERR);
    71. }
    72. //绑定成功
    73. std::cout<<" Bind IP&&Port Success"<< std::endl;
    74. _producer->run();
    75. _consumer->run();
    76. }
    77. //接收信息
    78. void RecvMessage(){
    79. //服务器不断运行,使用需要使用 一个whilc(true) 死循环
    80. char buff[1024];
    81. while(true){
    82. //1 作为客户端 要接收信息
    83. struct sockaddr_in peer;// 客户端结构体
    84. socklen_t len = sizeof(peer); //客户端结构体大小
    85. ssize_t n=recvfrom(_sock,buff,sizeof(buff)-1,0,(struct sockaddr*)&peer,&len);
    86. if(n>0){
    87. buff[n]='\0';
    88. }
    89. else{
    90. continue;
    91. }
    92. //2. 处理数据
    93. std::string clientIp=inet_ntoa(peer.sin_addr);// 获取服务端IP地址
    94. uint16_t clientPort = ntohs(peer.sin_port);// 获取端口号
    95. printf("Server get message from [%s:%d]$ %s\n",clientIp.c_str(),clientPort,buff);
    96. //3 判断是否在聊天室加入该用户
    97. std::string user = clientIp + "-" + std::to_string(clientPort);
    98. //花括号作用域内使用锁 限定RAII锁的作用域
    99. {
    100. LockGuard lockguard(&_mtx);
    101. if(_userTable.count(user)==0){ //首次出现,加入用户表
    102. _userTable[user]=peer;
    103. }
    104. }
    105. //4 将信息添加至环形队列
    106. std::string msg="["+ clientIp +":"+std::to_string(clientPort)+"] say#" + buff;
    107. _rq.Push(msg);
    108. }
    109. }
    110. // 广播消息
    111. void BroadcastMessage(){
    112. while(true) {
    113. // 1.从环形队列中获取消息
    114. std::string msg;
    115. _rq.Pop(&msg);
    116. // 2.将消息发给用户
    117. // TODO
    118. std::vector arr;
    119. {
    120. LockGuard lockguard(&_mtx);
    121. for(auto &user:_userTable){
    122. arr.push_back(user.second);
    123. }
    124. }
    125. for(auto &add:arr){
    126. //向客户端发送信息
    127. sendto(_sock,msg.c_str(),msg.size(),0,(const sockaddr*)&add,sizeof(add));
    128. }
    129. }
    130. }
    131. };
    132. }

    3.2.5 server.cc源文件

    几乎不需要更改

    1. #include
    2. #include"server.hpp"
    3. using namespace My_server;
    4. int main()
    5. {
    6. std::unique_ptr msvr(new server());
    7. //初始化服务器
    8. msvr->StartServer();
    9. return 0;
    10. }

    四 客户端

      有了之前 server.hpp 服务器头文件多线程化的经验后,改造 client.hpp 客户端头文件就很简单了,同样是创建两个线程,一个负责发送消息,一个负责接收消息

    4.1 client.hpp头文件

    1. #pragma once
    2. #include
    3. #include
    4. #include "err.hpp"
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. namespace My_client{
    12. class client{
    13. private:
    14. /* data */
    15. std::string server_ip;//服务端 IP 地址
    16. uint16_t server_port;//服务器端口号
    17. int _sock;
    18. struct sockaddr_in _svr;
    19. public:
    20. //构造函数
    21. client(const std::string& ip,uint16_t port)
    22. :server_ip(ip)
    23. ,server_port(port)
    24. {}
    25. //析构函数
    26. ~client(){
    27. }
    28. // 初始化客户端
    29. void InitClient() {
    30. //1. 创建套接字
    31. _sock=socket(AF_INET,SOCK_DGRAM,0);
    32. if(_sock==-1){
    33. std::cout << "Create Socket Fail: " << strerror(errno) << std::endl;
    34. exit(SOCKET_ERR);
    35. }
    36. std::cout<<"Create Success Socket:"<<_sock<
    37. //2. 构建服务器的sockaddr_in 结构体信息
    38. bzero(&_svr,sizeof(_svr));
    39. _svr.sin_family=AF_INET;
    40. // 绑定服务器IP地址
    41. _svr.sin_addr.s_addr=inet_addr(server_ip.c_str());
    42. //绑定服务器端口号
    43. _svr.sin_port=htons(server_port);
    44. }
    45. // 启动客户端
    46. void StartClient() {
    47. char buff[1024];
    48. // 1. 启动客户端
    49. while(true){
    50. std::string msg;
    51. std::cout<<"Input Message# ";
    52. std::getline(std::cin,msg);
    53. ssize_t n=sendto(_sock,msg.c_str(),msg.size(),0,(const struct sockaddr*)&_svr, sizeof(_svr));
    54. if(n==-1){
    55. std::cout<<"Send Message Fail: "<<strerror(errno)<
    56. continue;
    57. }
    58. //2 因为是回响 使用也要接收信息
    59. socklen_t len = sizeof(_svr);
    60. n = recvfrom(_sock,buff,sizeof(buff)-1,0,(struct sockaddr *)&_svr,&len);
    61. if(n>0){
    62. buff[n]='\0';
    63. }
    64. else{
    65. continue;
    66. }
    67. //可以再次获取 IP地址和 端口号
    68. std::string ip=inet_ntoa(_svr.sin_addr);
    69. uint16_t port=ntohs(_svr.sin_port);
    70. printf("Client get message from [%s:%d]# %s\n",ip.c_str(), port, buff);
    71. }
    72. }
    73. };
    74. }

    4.2 client.cc 客户端源文件

    1. #include
    2. #include"client.hpp"
    3. #include"err.hpp"
    4. using namespace My_client;
    5. void Usage(const char* program){
    6. std::cout<<"Usage:"<
    7. std::cout<<"\t"<"ServerIP ServerPort" << std::endl;
    8. }
    9. int main(int argc,char *argv[]){
    10. if(argc!=3){
    11. //启动方式是错误的,提升错误信息
    12. Usage(argv[0]);
    13. return USAGE_ERR;
    14. }
    15. std::string ip = argv[1];
    16. uint16_t port = std::stoi(argv[2]);
    17. std::unique_ptr mcit(new client(ip,port));
    18. //启动客户端
    19. mcit->StartClient();
    20. return 0;
    21. }

    示例:

  • 相关阅读:
    docsify项目部署(华为云+宝塔+centos+docker+nginx)踩坑指南
    电子商务、搜索引擎
    Dell清除BIOS密码及硬盘锁
    JavaScript对象
    Go语学习笔记 - gorm使用 - 表增删改查 Web框架Gin(八)
    CUDA和cuDNN安装配置
    dubbo(四)异常处理
    JavaScript基础教程笔记(一)
    二十二、商城 - 商品录入-FastDFS(10)
    结冰过程渲染-Ovito实现
  • 原文地址:https://blog.csdn.net/sushhsishdgsusk/article/details/137990775