面试时经常会问到网络库,好久没看过这块知识了,动手实现一下,用到了以下一些知识点
下面是服务器代码:epollserver.cpp
- #include <arpa/inet.h>
- #include <fcntl.h>
- #include <netinet/in.h>
- #include <pthread.h>
- #include <semaphore.h>
- #include <sys/epoll.h>
- #include <sys/socket.h>
- #include <sys/types.h>
- #include <unistd.h>
-
- #include <cerrno>
- #include <cstring>
- #include <iostream>
- #include <string>
-
- using namespace std;
- // Lets the type be written as plain `sockaddr_in`, without the `struct` keyword.
- typedef struct sockaddr_in sockaddr_in;
-
- // State shared between the worker threads, voteAction and policeCenter.
- sem_t sem; // semaphore: posted by voteAction, waited on by policeCenter
- pthread_cond_t cond; // condition variable: signalled on a "vote" message, waited on by voteAction
- pthread_mutex_t mutex; // mutex handed to pthread_cond_wait together with cond
-
- int pipeHandle[2]; // pipe carrying sender ids: [0] is the read end, [1] is the write end
-
- // Puts descriptor `sock` into non-blocking mode by adding O_NONBLOCK to its
- // current file status flags. Nothing is returned: a failing fcntl call is
- // only reported on stdout and the descriptor is left as it was.
- void set_non_blocking(int sock)
- {
-     const int current = fcntl(sock, F_GETFL);
-     if (current < 0) {
-         cout << "error fcntl(sock, GETFL)! " << endl;
-         return;
-     }
-
-     if (fcntl(sock, F_SETFL, current | O_NONBLOCK) >= 0) {
-         return;
-     }
-     cout << "error fcntl(sock, F_SETFL, opts)! " << endl;
- }
-
- // Alarm thread body. Each round it waits on the global condition variable
- // `cond`, takes one sender id from the pipe, prints the alarm line, puts the
- // id back into the pipe for policeCenter and posts `sem`.
- //
- // data: unused.
- // Returns NULL, but the loop never terminates so the return is not reached.
- //
- // NOTE(review): there is no shared predicate guarding the wait, so a signal
- // sent while this thread is not waiting is lost; the id then stays in the
- // pipe until the next signal arrives.
- void* voteAction(void* data) {
-     (void)data;
-     while (1)
-     {
-         // pthread_cond_wait requires the caller to hold the mutex; it
-         // releases it while blocked and re-acquires it before returning.
-         pthread_mutex_lock(&mutex);
-         pthread_cond_wait(&cond, &mutex);
-         pthread_mutex_unlock(&mutex);
-
-         pthread_t senderId;
-         ssize_t got = read(pipeHandle[0], &senderId, sizeof(senderId));
-         if (got != (ssize_t)sizeof(senderId)) {
-             // No complete id was read: do not print or forward garbage.
-             continue;
-         }
-         cout << "vote action: people " << senderId << " vote action happen, please call 110" << endl;
-
-         // Forward the id to policeCenter and only wake it if the id was queued.
-         ssize_t put = write(pipeHandle[1], &senderId, sizeof(senderId));
-         if (put != (ssize_t)sizeof(senderId)) {
-             continue;
-         }
-         sem_post(&sem);
-     }
-     return NULL;
- }
-
- // Police-center thread body. Blocks on the global semaphore `sem`; each time
- // it is posted, one sender id is read from the pipe and reported on stdout.
- //
- // data: unused.
- // Returns NULL, but the loop never terminates so the return is not reached.
- void* policeCenter(void* data)
- {
-     (void)data;
-     while (1) {
-         if (sem_wait(&sem) != 0) {
-             // The wait did not succeed (e.g. it was interrupted); try again.
-             continue;
-         }
-         pthread_t senderId;
-         ssize_t got = read(pipeHandle[0], &senderId, sizeof(senderId));
-         if (got != (ssize_t)sizeof(senderId)) {
-             // Without a complete id there is nothing meaningful to print.
-             continue;
-         }
-         cout << "110 center recevice people" << senderId << " notify vote event happened" << endl;
-     }
-     return NULL;
- }
-
-
- // Arguments handed to one recvfromclient worker thread for one epoll event.
- struct my_params {
- int main_listenfd; // listening socket; the worker compares con_fd against it to detect new connections
- int con_fd; // descriptor the epoll event was reported for
- int sock_event; // event mask copied from epoll_event.events
- int epoll_fd; // epoll instance the worker passes to epoll_ctl
- };
- // Returns the IPv4 address of the peer connected to `fd` in dotted form, or
- // an empty string when the peer cannot be determined.
- static string peer_address(int fd)
- {
-     struct sockaddr_in peer;
-     socklen_t len = sizeof(peer);
-     memset(&peer, 0, sizeof(peer));
-     if (getpeername(fd, (struct sockaddr*)&peer, &len) != 0) {
-         return string();
-     }
-     // inet_ntop writes into a caller-owned buffer, so concurrent workers do
-     // not share a static result buffer.
-     char text[INET_ADDRSTRLEN] = "";
-     if (inet_ntop(AF_INET, &peer.sin_addr, text, sizeof(text)) == NULL) {
-         return string();
-     }
-     return string(text);
- }
-
- // Re-registers `sockfd` on `epfd` with the event mask `mask`; the socket is
- // closed when the registration cannot be changed.
- static void rearm_client(int epfd, int sockfd, unsigned int mask)
- {
-     struct epoll_event ev;
-     memset(&ev, 0, sizeof(ev));
-     ev.data.fd = sockfd;
-     ev.events = mask;
-     if (epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev) != 0) {
-         cout << "epoll_ctl mod error" << endl;
-         close(sockfd);
-     }
- }
-
- // Accepts every connection queued on `listenfd` and registers each one on
- // `epfd` for edge-triggered input.
- static void accept_pending_clients(int listenfd, int epfd)
- {
-     cout << "accept connection, fd is " << listenfd << endl;
-     // The listener is registered edge-triggered, so one notification may
-     // stand for several queued connections: accept until the queue is empty.
-     for (;;) {
-         struct sockaddr_in client_addr;
-         socklen_t addrlen = sizeof(client_addr);
-         memset(&client_addr, 0, sizeof(client_addr));
-         int connfd = accept(listenfd, (struct sockaddr*)&client_addr, &addrlen);
-         if (connfd < 0) {
-             if (errno == EINTR) {
-                 continue;
-             }
-             if (errno != EAGAIN && errno != EWOULDBLOCK) {
-                 cout << "connect fd < 0" << endl;
-             }
-             return;
-         }
-
-         set_non_blocking(connfd);
-         char text[INET_ADDRSTRLEN] = "";
-         inet_ntop(AF_INET, &client_addr.sin_addr, text, sizeof(text));
-         cout << "connect from " << text << endl;
-
-         struct epoll_event ev;
-         memset(&ev, 0, sizeof(ev));
-         ev.data.fd = connfd;
-         ev.events = EPOLLIN | EPOLLET;
-         if (epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev) != 0) {
-             // An unregistered socket would never be serviced: drop it.
-             cout << "epoll_ctl add error" << endl;
-             close(connfd);
-         }
-     }
- }
-
- // Reads one message from `sockfd`, reacts to the "exit" and "vote" commands
- // and then switches the socket to wait for writability so a reply is sent.
- static void handle_client_input(int sockfd, int epfd)
- {
-     if (sockfd < 0)
-     {
-         cout << "epoll in sockfd < 0" << endl;
-         return;
-     }
-
-     char buffer[1024];
-     ssize_t ret;
-     do {
-         // One byte is kept free so the data can always be NUL-terminated.
-         ret = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
-     } while (ret < 0 && errno == EINTR);
-
-     if (ret < 0)
-     {
-         if (errno == EAGAIN || errno == EWOULDBLOCK) {
-             // Nothing to read right now; the socket stays registered for input.
-             return;
-         }
-         cout << "recv error" << endl;
-         close(sockfd);
-         return;
-     }
-     if (ret == 0) // the peer closed the connection: recv reports end of stream as 0
-     {
-         // Look the peer up before closing; afterwards the descriptor is gone.
-         const string peer = peer_address(sockfd);
-         close(sockfd);
-         cout << peer << "closed" << endl;
-         return;
-     }
-     buffer[ret] = '\0';
-
-     const string message(buffer);
-     if (message == "exit")
-     {
-         cout << peer_address(sockfd) << " closed connect" << endl;
-         close(sockfd);
-         return;
-     }
-     if (message == "vote") // raise the alarm
-     {
-         // The pipe carries pthread_t values, so send this thread's pthread id.
-         pthread_t sender = pthread_self();
-         if (write(pipeHandle[1], &sender, sizeof(sender)) == (ssize_t)sizeof(sender)) {
-             pthread_mutex_lock(&mutex);
-             pthread_cond_signal(&cond); // wake the alarm thread
-             pthread_mutex_unlock(&mutex);
-         } else {
-             cout << "vote notify error" << endl;
-         }
-     }
-     cout << "receive :" << buffer << endl;
-     rearm_client(epfd, sockfd, EPOLLOUT | EPOLLET);
- }
-
- // Sends the fixed reply "ok" on `sockfd` and switches the socket back to
- // waiting for input.
- static void handle_client_output(int sockfd, int epfd)
- {
-     const char reply[] = "ok";
-     // MSG_NOSIGNAL: a peer that already went away yields an error return
-     // instead of a SIGPIPE delivered to the whole server process.
-     ssize_t ret = send(sockfd, reply, strlen(reply), MSG_NOSIGNAL);
-     if (ret <= 0)
-     {
-         if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
-             // Not writable after all; stay registered for output and retry later.
-             return;
-         }
-         cout << "send error" << endl;
-         close(sockfd);
-         return;
-     }
-     cout << "send: " << reply << endl;
-     rearm_client(epfd, sockfd, EPOLLIN | EPOLLET);
- }
-
- // Thread entry point that services a single epoll event.
- //
- // args: pointer to a struct my_params describing the event. The struct is
- //       copied before anything else is done, and is not freed here.
- // Returns NULL in every case.
- //
- // An event on the listening socket accepts new clients; otherwise EPOLLIN is
- // handled before EPOLLOUT, and only one of the two is handled per call.
- void* recvfromclient(void* args)
- {
-     const struct my_params params = *(struct my_params*)args;
-
-     if (params.con_fd == params.main_listenfd)
-     {
-         accept_pending_clients(params.main_listenfd, params.epoll_fd);
-     }
-     else if (params.sock_event & EPOLLIN)
-     {
-         handle_client_input(params.con_fd, params.epoll_fd);
-     }
-     else if (params.sock_event & EPOLLOUT)
-     {
-         handle_client_output(params.con_fd, params.epoll_fd);
-     }
-
-     // Returning from the start routine ends the thread just like pthread_exit.
-     return NULL;
- }
-
-
- // Server entry point. Opens a non-blocking TCP listener on port 8000, starts
- // the alarm (voteAction) and police-center (policeCenter) threads, then loops
- // on epoll_wait and hands every reported event to a recvfromclient worker
- // thread. Returns -1 when any setup step fails; the event loop itself never
- // ends.
- int main()
- {
-     const int kMaxEvents = 20; // events fetched per epoll_wait call
-
-     // AF_INET: IPv4; SOCK_STREAM: reliable, ordered, two-way byte stream (TCP);
-     // protocol 0 selects the default protocol for that socket type.
-     int listenfd = socket(AF_INET, SOCK_STREAM, 0);
-     if (listenfd == -1)
-     {
-         cout << "socket create fail" << endl;
-         return -1;
-     }
-     set_non_blocking(listenfd);
-
-     struct sockaddr_in serveraddr;
-     memset(&serveraddr, 0, sizeof(serveraddr));
-     serveraddr.sin_family = AF_INET;
-     serveraddr.sin_port = htons(8000);
-     serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
-
-     if (bind(listenfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)) != 0)
-     {
-         cout << "bind error" << endl;
-         close(listenfd);
-         return -1;
-     }
-
-     if (listen(listenfd, 5) != 0)
-     {
-         cout << "Listen error" << endl;
-         close(listenfd);
-         return -1;
-     }
-
-     int epfd = epoll_create(256);
-     if (epfd < 0)
-     {
-         cout << "epoll create fail" << endl;
-         close(listenfd);
-         return -1;
-     }
-     struct epoll_event ev; // used to register the listener
-     memset(&ev, 0, sizeof(ev));
-     ev.data.fd = listenfd;
-     ev.events = EPOLLIN | EPOLLET;
-     if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) != 0)
-     {
-         cout << "epoll ctl fail" << endl;
-         close(epfd);
-         close(listenfd);
-         return -1;
-     }
-
-     // Everything the helper threads touch must exist before they start.
-     pthread_cond_init(&cond, NULL);
-     pthread_mutex_init(&mutex, NULL);
-     sem_init(&sem, 0, 0); // pshared = 0: shared between threads of this process only
-     int res = pipe(pipeHandle);
-     if (res < 0)
-     {
-         cout << "create pipe fail" << endl;
-         close(epfd);
-         close(listenfd);
-         return -1;
-     }
-
-     // pthread functions report failure as a non-zero error number, not as -1.
-     pthread_t voteThread;
-     res = pthread_create(&voteThread, NULL, voteAction, NULL); // alarm thread
-     if (res != 0)
-     {
-         cout << "crete vote thread fail" << endl;
-         close(epfd);
-         close(listenfd);
-         return -1;
-     }
-
-     pthread_t policeThread;
-     res = pthread_create(&policeThread, NULL, policeCenter, NULL);
-     if (res != 0)
-     {
-         cout << "crete police center thread fail" << endl;
-         close(epfd);
-         close(listenfd);
-         return -1;
-     }
-
-     cout << "*******************************welcome connect to server******************************" << endl;
-
-     struct epoll_event events[kMaxEvents]; // filled in by epoll_wait
-     struct my_params params[kMaxEvents];
-     pthread_t workers[kMaxEvents];
-     bool started[kMaxEvents];
-
-     while (1)
-     {
-         int nfds = epoll_wait(epfd, events, kMaxEvents, 1000);
-         if (nfds <= 0)
-         {
-             continue;
-         }
-
-         for (int i = 0; i < nfds; i++)
-         {
-             params[i].main_listenfd = listenfd;
-             params[i].con_fd = events[i].data.fd;
-             params[i].sock_event = events[i].events;
-             params[i].epoll_fd = epfd;
-             res = pthread_create(&workers[i], NULL, recvfromclient, (void*)&params[i]);
-             started[i] = (res == 0);
-             if (!started[i])
-             {
-                 cout << "thread create fail" << endl;
-             }
-         }
-
-         // Each worker holds a pointer into `params`, so all of them are
-         // joined before the next epoll_wait round reuses the array.
-         for (int i = 0; i < nfds; i++)
-         {
-             if (!started[i])
-             {
-                 continue;
-             }
-             res = pthread_join(workers[i], NULL);
-             if (res != 0)
-             {
-                 cout << "thread join fail" << endl;
-             }
-         }
-     }
-
-     // Not reached: the loop above has no exit.
-     pthread_join(voteThread, NULL);
-     pthread_join(policeThread, NULL);
-     close(epfd);
-     close(listenfd);
-     return 0;
- }
以下是服务器的编译文件:CMakeLists.txt
- cmake_minimum_required(VERSION 3.16.3)
- project(epollserver LANGUAGES CXX)
-
- # Locate the platform thread library and link it to the server target only,
- # instead of adding it to every target in the directory.
- set(THREADS_PREFER_PTHREAD_FLAG ON)
- find_package(Threads REQUIRED)
-
- add_executable(server epollserver.cpp)
- target_link_libraries(server PRIVATE Threads::Threads)
以上文件都是服务器端的代码,需要放在同一个目录下
以下是客户端代码:epollclient.cpp
- #include <arpa/inet.h>
- #include <netinet/in.h>
- #include <sys/socket.h>
- #include <unistd.h>
- #include <cstring>
- #include <iostream>
- #include <string>
-
- using namespace std;
-
- // Client entry point. Connects to 127.0.0.1:8000, then repeatedly reads one
- // whitespace-delimited word from stdin, sends it to the server and prints the
- // reply. The loop ends on "exit", on end of input, on a send failure or when
- // the server closes the connection. Returns -1 when the socket cannot be
- // created or connected, 0 otherwise.
- int main()
- {
-     int sockfd = socket(AF_INET, SOCK_STREAM, 0);
-     if (sockfd < 0) // 0 is a valid descriptor; only negative values signal failure
-     {
-         cout << "socket error";
-         return -1;
-     }
-
-     struct sockaddr_in remote_addr;
-     memset(&remote_addr, 0, sizeof(remote_addr));
-     remote_addr.sin_family = AF_INET;
-     remote_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
-     remote_addr.sin_port = htons(8000);
-
-     if (connect(sockfd, (struct sockaddr*)&remote_addr, sizeof(remote_addr)) < 0)
-     {
-         cout << "connect eror";
-         close(sockfd);
-         return -1;
-     }
-     cout << "connected to server" << endl;
-
-     char buffer[1024];
-
-     while (1)
-     {
-         cout << "please enter message:";
-         // A std::string grows as needed, so an over-long word cannot overrun
-         // a fixed-size array.
-         string message;
-         if (!(cin >> message))
-         {
-             break; // end of input
-         }
-
-         ssize_t len = send(sockfd, message.data(), message.size(), 0);
-         if (len <= 0)
-         {
-             cout << "send error" << endl;
-             break;
-         }
-
-         if (message == "exit")
-         {
-             cout << "good bye" << endl;
-             break;
-         }
-
-         // Keep one byte free for the terminating NUL.
-         len = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
-         if (len <= 0)
-         {
-             // 0 means the server closed the connection, negative is an error;
-             // in both cases there is nothing left to talk to.
-             cout << "server closed connection" << endl;
-             break;
-         }
-         buffer[len] = '\0';
-         cout << "Received:" << buffer << endl;
-     }
-     close(sockfd);
-     return 0;
- }
以下是客户端的编译文件:CMakeLists.txt
- cmake_minimum_required(VERSION 3.16.3)
- # Only a C++ compiler is needed, same as the server project.
- project(epollclient LANGUAGES CXX)
-
- add_executable(client epollclient.cpp)
如果不会使用 CMake,可以直接使用下面的命令编译
客户端命令编译:g++ epollclient.cpp -o client -lpthread
服务器命令编译:g++ epollserver.cpp -o server -lpthread