这是基于 UDP 协议实现的一个网络程序,主要功能是 构建一个多人聊天室,当某个用户发送消息时,其他用户可以立即收到,形成一个群聊。
这个程序由一台服务器和n个客户端组成,服务器扮演了一个接受信息和分发信息的角色,将信息发送给所有已知的用户主机。
将服务器接收消息看作生产商品、分发消息看作消费商品,这不就是一个生动形象的 「生产者消费者模型」 吗?
「生产者消费者模型」 必备 321
3:三组关系2:两个角色1:一个交易场所
其中两个角色可以分别创建两个线程,一个负责接收消息,放入 「生产者消费者模型」,另一个则是负责从 「生产者消费者模型」 中拿去消息,分发给用户主机。
这对我们客户端也有相似的地方,但是与服务器不同,我们每个 客户端都认为自己只需要与服务器 1对1 连接就可以了,因此我们 每个客户端都只需要即使接收和发送 资源就可以了,只需要创建两个线程即可。
这里的交易场所可以选则 阻塞队列,也可以选择 环形队列。

在引入 「生产者消费者模型」 后,服务器头文件结构将会变成下面这个样子
- 启动服务器,原初始化服务器、启动线程
- 接收消息,将收到的消息存入环形队列
- 发送消息,从环形队列中获取消息,并派发给线程
这里我们直接使用一个vector数组模拟实现环形队列,同时借用信号量实现生产者消费者模型。
RingQueue.hpp 头文件
- #pragma once
-
- #include
- #include
-
- namespace My_RingQueue
- {
- const int DEF_CAP=10;
-
- template<class T>
- class RingQueue
- {
- public:
- RingQueue(size_t cap = DEF_CAP)
- :_cap(cap)
- ,_pro_step(0)
- ,_con_step(0)
- {
- _queue.resize(_cap);
- // 初始化信号量
- sem_init(&_pro_sem, 0, _cap);
- sem_init(&_con_sem, 0, 0);
- }
-
- ~RingQueue(){
- // 销毁信号量
- sem_destroy(&_pro_sem);
- sem_destroy(&_con_sem);
- }
-
- // 生产商品
- void Push(const T &inData){
- // 申请信号量
- P(&_pro_sem);
- // 生产
- _queue[_pro_step++] = inData;
- _pro_step %= _cap;
- // 释放信号量
- V(&_con_sem);
- }
-
- // 消费商品
- void Pop(T *outData){
- // 申请信号量
- P(&_con_sem);
- // 消费
- *outData = _queue[_con_step++];
- _con_step %= _cap;
- // 释放信号量
- V(&_pro_sem);
- }
-
- private:
- void P(sem_t *sem){
- sem_wait(sem);
- }
-
- void V(sem_t *sem){
- sem_post(sem);
- }
-
- private:
- std::vector
_queue; //这个环形队列我们直接使用数组实现 - size_t _cap;
- sem_t _pro_sem; //生产者信号量
- sem_t _con_sem; //消费者信号量
- size_t _pro_step; // 生产者下标
- size_t _con_step; // 消费者下标
- };
- }
在首次接收到某个用户的信息时,需要将其进行标识,以便后续在进行消息广播时分发给他
有点类似于用户首次发送消息,就被拉入了 “群聊”。
目前可以使用 IP + Port 的方式标识用户,确保用户的唯一性,这里选取 unordered_map 这种哈希表结构,方便快速判断用户是否已存在
key:用户标识符value:用户客户端的 sockaddr_in 结构体注意: 这里的哈希表后面会涉及多线程的访问,需要加锁保护。
利用RAII思想实现锁的自动化
- #pragma once
-
- #include
-
- class LockGuard{
- public:
-
- LockGuard(pthread_mutex_t *pmtx)
- :_mtx(pmtx)
- {
- pthread_mutex_lock(_mtx);
- }
-
- ~LockGuard(){
- pthread_mutex_unlock(_mtx);
- }
- private:
- pthread_mutex_t *_mtx;
- };
用自己的线程库
- #pragma once
-
- #include
- #include
- #include
- #include
-
- enum class Status{
- NEW=0,//代表新建线程
- RUNNING,//代表运行
- EXIT //已退出线程
- };
- // 参数、返回值为 void 的函数类型
- //typedef void (*func_t)(void*);
- using func_t = std::function<void(void*)>; // 使用包装器设定函数类型
-
- class Thread{
- public:
- Thread(int num=0,func_t func=nullptr,void *args=nullptr)
- :_tid(0)
- ,_status(Status::NEW)
- ,_func(func)
- ,_args(args)
- {
- //写入线程名字
- char name[128];
- snprintf(name,sizeof name,"thraed-%d",num);
- _name=name;
- }
-
- ~Thread(){}
-
- //获取线程id
- pthread_t getTID() const{
- return _tid;
- }
-
- //获取线程名字
- std::string getName() const{
- return _name;
- }
-
-
- //获取线程状态
- Status getStatus() const{
- return _status;
- }
-
- // 回调方法
- static void* runHelper(void* args){
- Thread* myThis = static_cast
(args); - // 很简单,回调用户传进来的 func 函数即可
- myThis->_func(myThis->_args);
-
- return nullptr;
- }
-
- // 启动线程
- void run(){
- int ret = pthread_create(&_tid, nullptr, runHelper, this);
- if(ret != 0){
- std::cerr << "create thread fail!" << std::endl;
- exit(1); // 创建线程失败,直接退出
- }
- _status = Status::RUNNING; // 更改状态为 运行中
- }
-
- // 线程等待
- void join(){
- int ret = pthread_join(_tid, nullptr);
- if(ret != 0){
- std::cerr << "thread join fail!" << std::endl;
- exit(1); // 等待失败,直接退出
- }
- _status = Status::EXIT; // 更改状态为 退出
- }
- private:
- pthread_t _tid; // 线程 ID
- std::string _name; // 线程名
- Status _status; // 线程状态
- func_t _func; // 线程回调函数
- void* _args; // 传递给回调函数的参数
- };
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include"err.hpp"
- #include"RingQueue.hpp"
- #include
- #include"Thread.hpp"
- #include"LockGuard.hpp"
- #include
-
-
- namespace My_server{
-
- //端口号默认值
- const uint16_t default_port=8888;
-
- class server
- {
- private:
- /* data */
- int _sock;// 服务端套接字
- uint16_t _port;//端口号
- My_RingQueue::RingQueue
_rq; //阻塞队列 - std::unordered_map
struct sockaddr_in> _userTable; // <用户标识符, sockaddr_in 结构体> -
- pthread_mutex_t _mtx; // 互斥锁,保护哈希表
-
- Thread* _producer;//生产者线程
- Thread* _consumer;//消费者线程
-
- public:
- server(uint16_t port=default_port)
- :_port(port)
- {
- pthread_mutex_init(&_mtx,nullptr);
-
- //创建线程,因为类内成员有隐含的this指针,需要bind固定该参数
- _producer = new Thread(1,std::bind(&server::RecvMessage,this));
- _consumer = new Thread(2,std::bind(&server::BroadcastMessage,this));
- }
- ~server(){
-
- //等待线程结束
- _producer->join();
- _consumer->join();
-
- //销毁互斥锁
- pthread_mutex_destroy(&_mtx);
-
- //释放对象
- delete _producer;
- delete _consumer;
- }
-
- //初始化服务器
- void StartServer(){
-
- //1 创建套接字
- _sock = socket(AF_INET,SOCK_DGRAM,0);
- if(_sock==-1){
- std::cout<<"Create Socket Fail:: "<<strerror(errno)<
- exit(SOCKET_ERR);
- }
-
- //创建成功
- std::cout<<"Create Success Socket: "<<_sock<
-
- //2. 绑定IP地址和端口号
- struct sockaddr_in local;
- bzero(&local,sizeof(local));// 将结构体内容置0
-
- //填充字段
- local.sin_family= AF_INET; //设置为网络通信
- local.sin_port=htons(_port);//主机序列转换为网络序列
- local.sin_addr.s_addr=INADDR_ANY; //服务器端要绑定任何可用IP
-
- //绑定 IP 地址和端口号
- if(bind(_sock,(const sockaddr*)&local,sizeof(local))){
- std::cout<<"Bind IP&&Port Fail: "<<strerror(errno)<
- exit(BIND_ERR);
- }
-
- //绑定成功
- std::cout<<" Bind IP&&Port Success"<< std::endl;
-
- _producer->run();
- _consumer->run();
- }
-
- //接收信息
- void RecvMessage(){
-
- //服务器不断运行,使用需要使用 一个whilc(true) 死循环
- char buff[1024];
- while(true){
- //1 作为客户端 要接收信息
- struct sockaddr_in peer;// 客户端结构体
- socklen_t len = sizeof(peer); //客户端结构体大小
-
- ssize_t n=recvfrom(_sock,buff,sizeof(buff)-1,0,(struct sockaddr*)&peer,&len);
-
- if(n>0){
- buff[n]='\0';
- }
- else{
- continue;
- }
-
- //2. 处理数据
- std::string clientIp=inet_ntoa(peer.sin_addr);// 获取服务端IP地址
- uint16_t clientPort = ntohs(peer.sin_port);// 获取端口号
- printf("Server get message from [%s:%d]$ %s\n",clientIp.c_str(),clientPort,buff);
-
- //3 判断是否在聊天室加入该用户
- std::string user = clientIp + "-" + std::to_string(clientPort);
-
- //花括号作用域内使用锁 限定RAII锁的作用域
- {
- LockGuard lockguard(&_mtx);
- if(_userTable.count(user)==0){ //首次出现,加入用户表
- _userTable[user]=peer;
- }
- }
-
- //4 将信息添加至环形队列
- std::string msg="["+ clientIp +":"+std::to_string(clientPort)+"] say#" + buff;
- _rq.Push(msg);
- }
- }
- // 广播消息
- void BroadcastMessage(){
-
- while(true) {
- // 1.从环形队列中获取消息
- std::string msg;
- _rq.Pop(&msg);
-
- // 2.将消息发给用户
- // TODO
- std::vector
arr; -
- {
- LockGuard lockguard(&_mtx);
- for(auto &user:_userTable){
- arr.push_back(user.second);
- }
- }
-
- for(auto &add:arr){
- //向客户端发送信息
- sendto(_sock,msg.c_str(),msg.size(),0,(const sockaddr*)&add,sizeof(add));
- }
- }
- }
- };
-
- }
3.2.5 server.cc源文件
几乎不需要更改
- #include
- #include"server.hpp"
-
- using namespace My_server;
-
- int main()
- {
- std::unique_ptr
msvr(new server()) ; -
- //初始化服务器
- msvr->StartServer();
-
-
- return 0;
- }
四 客户端
有了之前 server.hpp 服务器头文件多线程化的经验后,改造 client.hpp 客户端头文件就很简单了,同样是创建两个线程,一个负责发送消息,一个负责接收消息
4.1 client.hpp头文件
- #pragma once
-
- #include
- #include
- #include "err.hpp"
- #include
- #include
- #include
- #include
- #include
- #include
-
- namespace My_client{
-
- class client{
- private:
- /* data */
- std::string server_ip;//服务端 IP 地址
- uint16_t server_port;//服务器端口号
- int _sock;
- struct sockaddr_in _svr;
- public:
- //构造函数
- client(const std::string& ip,uint16_t port)
- :server_ip(ip)
- ,server_port(port)
- {}
- //析构函数
- ~client(){
- }
- // 初始化客户端
- void InitClient() {
-
- //1. 创建套接字
- _sock=socket(AF_INET,SOCK_DGRAM,0);
- if(_sock==-1){
- std::cout << "Create Socket Fail: " << strerror(errno) << std::endl;
- exit(SOCKET_ERR);
- }
-
- std::cout<<"Create Success Socket:"<<_sock<
-
- //2. 构建服务器的sockaddr_in 结构体信息
- bzero(&_svr,sizeof(_svr));
- _svr.sin_family=AF_INET;
- // 绑定服务器IP地址
- _svr.sin_addr.s_addr=inet_addr(server_ip.c_str());
- //绑定服务器端口号
- _svr.sin_port=htons(server_port);
- }
- // 启动客户端
- void StartClient() {
-
- char buff[1024];
- // 1. 启动客户端
- while(true){
- std::string msg;
- std::cout<<"Input Message# ";
- std::getline(std::cin,msg);
-
- ssize_t n=sendto(_sock,msg.c_str(),msg.size(),0,(const struct sockaddr*)&_svr, sizeof(_svr));
-
- if(n==-1){
- std::cout<<"Send Message Fail: "<<strerror(errno)<
- continue;
- }
-
- //2 因为是回响 使用也要接收信息
- socklen_t len = sizeof(_svr);
- n = recvfrom(_sock,buff,sizeof(buff)-1,0,(struct sockaddr *)&_svr,&len);
-
- if(n>0){
- buff[n]='\0';
- }
- else{
- continue;
- }
-
- //可以再次获取 IP地址和 端口号
- std::string ip=inet_ntoa(_svr.sin_addr);
- uint16_t port=ntohs(_svr.sin_port);
-
- printf("Client get message from [%s:%d]# %s\n",ip.c_str(), port, buff);
- }
- }
- };
- }
4.2 client.cc 客户端源文件
- #include
- #include"client.hpp"
- #include"err.hpp"
-
-
- using namespace My_client;
-
- void Usage(const char* program){
- std::cout<<"Usage:"<
- std::cout<<"\t"<
"ServerIP ServerPort" << std::endl; - }
-
- int main(int argc,char *argv[]){
-
- if(argc!=3){
- //启动方式是错误的,提升错误信息
- Usage(argv[0]);
- return USAGE_ERR;
- }
-
- std::string ip = argv[1];
- uint16_t port = std::stoi(argv[2]);
-
- std::unique_ptr
mcit(new client(ip,port)) ; -
- //启动客户端
- mcit->StartClient();
-
- return 0;
- }
示例:

-
相关阅读:
docsify项目部署(华为云+宝塔+centos+docker+nginx)踩坑指南
电子商务、搜索引擎
Dell清除BIOS密码及硬盘锁
JavaScript对象
Go语学习笔记 - gorm使用 - 表增删改查 Web框架Gin(八)
CUDA和cuDNN安装配置
dubbo(四)异常处理
JavaScript基础教程笔记(一)
二十二、商城 - 商品录入-FastDFS(10)
结冰过程渲染-Ovito实现
-
原文地址:https://blog.csdn.net/sushhsishdgsusk/article/details/137990775