• 分享自己平时使用的socket多客户端通信的代码技术点和软件使用


    前言

    说到linux下多进程通信,有好几种,之前也在喵哥的公众号回复过,这里再拿出来,重新写一遍:多进程通信有管道,而管道分为匿名和命名管道 ,后者比前者优势在于可以进行无亲缘进程通信;此外信号也是进程通信的一种,比如我们最常用的就是设置ctrl+c的kill信号发送给进程;其次信号量一般来说是一种同步机制但是也可以认为是通信,需要注意的是信号量、共享内存、消息队列在使用时候也有posix和system v的区别;还有我们今天的主角套接字( socket ) :套接字也是一种进程间通信机制。

    线程间的通信的话,共享变量,此外在unpipc书描述的话,同步也属于通讯机制,那么就要补充上线程中我们最多用的互斥量、条件变量、读写锁、记录锁和线程中的信号量使用。

    今天想分享一些socket编程的例子,socket嵌入式。linux开发很常用,用于进程间通信很方便,也有很多介绍,今天我也也来做自己的介绍分享。和别人不一样的地方,我主要想分享socket 服务端在linux写的代码,使用vscode调试执行,并且同时分享自己使用tcp监控软件去判断socket通信正确性。

    作者:良知犹存

    转载授权以及围观:欢迎关注微信公众号:羽林君

    或者添加作者个人微信:become_me


    socket通信基本函数介绍

    在这里有一个简单demo演示以及函数的介绍,大家打开这个链接就可以看到哈:

    socket重要函数

    socket通信有些固定的函数,这里先给大家做简单的分享:

    • int socket(int domain, int type, int protocol);

    该函数用于创建一个socket描述符,它唯一标识一个socket,这个socket描述字跟文件描述字一样,后续的操作都有用到它,把它作为参数,通过它来进行一些读写操作。创建socket的时候,也可以指定不同的参数创建不同的socket描述符,socket函数的三个参数分别为:

    1.domain:参数domain表示该套接字使用的协议族,在Linux系统中支持多种协议族,对于TCP/IP协议来说,选择AF_INET就足以,当然如果你的IP协议的版本支持IPv6,那么可以选择AF_INET6,可选的协议族具体见:

    - AF_UNIX, AF_LOCAL: 本地通信-AF_INET : IPv4
    - AF_INET6 : IPv6
    - AF_IPX : IPX - Novell 协议
    - AF_NETLINK : 内核用户界面设备
    - AF_X25 : ITU-T X.25 / ISO-8208 协议
    - AF_AX25 : 业余无线电 AX.25 协议
    - AF_ATMPVC : 访问原始ATM PVC
    - AF_APPLETALK : AppleTalk
    - AF_PACKET : 底层数据包接口
    - AF_ALG : 内核加密API的AF_ALG接口
    

    2.type:参数type指定了套接字使用的服务类型,可能的类型有以下几种:

    - SOCK_STREAM:提供可靠的(即能保证数据正确传送到对方)面向连接的Socket服务,多用于资料(如文件)传输,如TCP协议。
    - SOCK_DGRAM:是提供无保障的面向消息的Socket 服务,主要用于在网络上发广播信息,如UDP协议,提供无连接不可靠的数据报交付服务。
    - SOCK_SEQPACKET:为固定最大长度的数据报提供有序的,可靠的,基于双向连接的数据传输路径。
    - SOCK_RAW:表示原始套接字,它允许应用程序访问网络层的原始数据包,这个套接字用得比较少,暂时不用理会它。
    - SOCK_RDM:提供不保证排序的可靠数据报层。
    

    3.protocol:参数protocol指定了套接字使用的协议,在IPv4中,只有TCP协议提供SOCK_STREAM这种可靠的服务,只有UDP协议提供SOCK_DGRAM服务,对于这两种协议,protocol的值均为0,因为当protocol为0时,会自动选择type类型对应的默认协议。

    • int bind(int sockfd, struct sockaddr *my_addr, socklen_t addrlen);

    在进行网络通信的时候,必须把一个套接字与一个IP地址或端口号相关联,这个bind就是绑定的过程。

    • int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

    这个connect()函数用于客户端中,将sockfd与远端IP地址、端口号进行绑定,在TCP客户端中调用这个函数将发生握手过程(会发送一个TCP连接请求),并最终建立一个TCP连接,而对于UDP协议来说,调用这个函数只是在sockfd中记录远端IP地址与端口号,而不发送任何数据,参数信息与bind()函数是一样的。

    • int listen(int s, int backlog);

    listen()函数只能在TCP服务器进程中使用,让服务器进程进入监听状态,等待客户端的连接请求,listen()函数在一般在bind()函数之后调用,在accept()函数之前调用,它的函数原型是:

    • int accept(int s, struct sockaddr *addr, socklen_t *addrlen);

    accept()函数就是用于处理连接请求的,accept()函数用于TCP服务器中,等待着远端主机的连接请求,并且建立一个新的TCP连接,在调用这个函数之前需要通过调用listen()函数让服务器进入监听状态,如果队列中没有未完成连接套接字,并且套接字没有标记为非阻塞模式,accept()函数的调用会阻塞应用程序直至与远程主机建立TCP连接;如果一个套接字被标记为非阻塞式而队列中没有未完成连接套接字, 调用accept()函数将立即返回EAGAIN。

    • ssize_t read(int fd, void *buf, size_t count);

    read 从描述符 fd 中读取 count 字节的数据并放入从 buf 开始的缓冲区中.

    • ssize_t recv(int sockfd, void *buf, size_t len, int flags);

    不论是客户还是服务器应用程序都可以用recv()函数从TCP连接的另一端接收数据,它与read()函数的功能是差不多的。

    • ssize_t write(int fd, const void *buf, size_t count);

    write()函数一般用于处于稳定的TCP连接中传输数据,当然也能用于UDP协议中,它向套接字描述符 fd 中写入 count 字节的数据,数据起始地址由 buf 指定,函数调用成功返回写的字节数,失败返回-1,并设置errno变量。

    • int send(int s, const void *msg, size_t len, int flags);

    无论是客户端还是服务器应用程序都可以用send()函数来向TCP连接的另一端发送数据。

    • int sendto(int s, const void *msg, size_t len, int flags, const struct sockaddr *to, socklen_t tolen);

    sendto()函数与send函数非常像,但是它会通过 struct sockaddr 指向的 to 结构体指定要发送给哪个远端主机,在to参数中需要指定远端主机的IP地址、端口号等,而tolen参数则是指定to 结构体的字节长度。

    • int close(int fd);

    close()函数是用于关闭一个指定的套接字,在关闭套接字后,将无法使用对应的套接字描述符

    TCP客户端一般流程

    • 调用socket()函数创建一个套接字描述符。
    • 调用connect()函数连接到指定服务器中,端口号为服务器监听的端口号。
    • 调用write()函数发送数据。
    • 调用close()函数终止连接。
     // 创建套接字描述符
    ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1// 建立TCP连接
    (connect(sockfd, (struct sockaddr *)&server, sizeof(struct sockaddr))
    write(sockfd, buffer, sizeof(buffer))
    close(sockfd);
    

    TCP服务器一般流程

    • 服务器的代码流程如下:
    • 调用socket()函数创建一个套接字描述符。
    • 调用bind()函数绑定监听的端口号。
    • 调用listen()函数让服务器进入监听状态。
    • 调用accept()函数处理来自客户端的连接请求。
    • 调用read()函数接收客户端发送的数据。
    • 调用close()函数终止连接。
    // socket create and verification
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    // binding newly created socket to given IP and verification   
    if ((bind(sockfd, (struct sockaddr*)&server, sizeof(server))) != 0) 
    // now server is ready to listen and verification
    if ((listen(sockfd, 5)) != 0) {
    // accept the data packet from client and verification
    connfd = accept(sockfd, (struct sockaddr*)&client, &len);
    if (read(connfd, buff, sizeof(buff)) <= 0) {
    close(connfd);
    close(sockfd);
    

    这里也顺带分享一个socket 阻塞和非阻塞的机制 前面提到accept函数中,描述套接字没有标记为非阻塞模式,accept()函数的调用会阻塞应用程序直至与远程主机建立TCP连接;如果一个套接字被标记为非阻塞式而队列中没有未完成连接套接字, 调用accept()函数将立即返回EAGAIN。但是socket默认初始化是阻塞的,正常初始化后accept没有收到客户端的链接请求的话,就会一直的等待阻塞当前线程,直到有客户端进行链接请求。

    那么如何才能把socket设置为非阻塞呢?用ioctl(sockfd, FIONBIO, &mode);

    //-------------------------
    
    // Set the socket I/O mode: In this case FIONBIO
    
    // enables or disables the blocking mode for the
    
    // socket based on the numerical value of iMode.
    
    // If iMode = 0, blocking is enabled;
    
    // If iMode != 0, non-blocking mode is enabled.
    
    u_long iMode = 1;  //non-blocking mode is enabled.
    
    ioctlsocket(m_socket, FIONBIO, &iMode); //设置为非阻塞模式
    

    一般大家介绍会说使用ioctlsocket,但是有些系统使用会报错。如下:

    ioctlsocket会报错,所以使用 ioctl就好了,操作都是一样的。

     #include <sys/ioctl.h>
    ioctl(sockfd, FIONBIO, &mode);
    

    这是一个简单的图表分析,来自下面文章链接,大家有兴趣也可以自行查看。

    阻塞非阻塞的介绍 链接

    代码实例

    代码有test_socket_client.cpptest_socket_server.htest_socket_server.cpp 三个文件,交互机制以及实现功能如下:

    首先test_socket_client.cpp 是客户端代码,用来测试链接服务器端交互,用select进行接收数据,并监听执行终端是否有输入信息,输入信息立刻发送。

    test_socket_server.h是test_socket_server.cpp使用定义的类和api的头文件,而在test_socket_server.cpp实现了定义了一个支持多客户端连接的通信接口,同时也时刻检测执行终端输入信息,并广播到全部链接的客户端;而客户端发过来的信息,服务端针对的点对点收发,即接收到特定客户端的信息只发送到该客户端。其中使用了std::future + std::async实现了通信的异步操作,并使用 impl模式包裹了socket接口。在监听执行终端信息时候分别使用了std::conditionstd::async实现,大家可以通过宏开关自行选择测试。

    还有些其他的技术使用,多线程的调度以及流的输出,忽略SIGPIPE信号用来控制客户端链接断开之后代码正常运行等,再后面我一一给大家分析介绍。

    test_socket_client.cpp 这个文件就是随便找了一个socket客户端代码,这个test_socket_client代码来源是网络,大家也可以自己去写或者网上自己找相关的用例,因为本次的重要部分是服务端server代码,所以这块就贴一下代码。

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <errno.h>
    #include <sys/socket.h>
    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <sys/types.h>
    #include <unistd.h>
    #include <sys/time.h>
    
    //g++ test_socket_client.cpp -o  test_socket_client
    
    #define BUFLEN 1024
    #define PORT 8555
    
    int main(int argc, char **argv)
    {
        int sockfd;
        struct sockaddr_in s_addr;
        socklen_t len;
        unsigned int port;
        char buf[BUFLEN];
        fd_set rfds;
        struct timeval tv;
        int retval, maxfd; 
        
        /*建立socket*/
        if((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1){
            perror("socket");
            exit(errno);
        }else
            printf("socket create success!\n");
    
        /*设置服务器ip*/
        memset(&s_addr,0,sizeof(s_addr));
        s_addr.sin_family = AF_INET;
        s_addr.sin_port = htons(PORT);
        if (inet_aton("127.0.0.1", (struct in_addr *)&s_addr.sin_addr.s_addr) == 0) {
            perror("127.0.0.1");
            exit(errno);
        }
      
        /*开始连接服务器*/ 
        while(connect(sockfd,(struct sockaddr*)&s_addr,sizeof(struct sockaddr)) == -1){
            perror("connect");
            sleep(1);
            exit(errno);
        }
    
        while(1){
            FD_ZERO(&rfds);
            FD_SET(0, &rfds);
            maxfd = 0;
            FD_SET(sockfd, &rfds);
            if(maxfd < sockfd)
                maxfd = sockfd;
            tv.tv_sec = 6;
            tv.tv_usec = 0;
            retval = select(maxfd+1, &rfds, NULLNULL, &tv);
            if(retval == -1){
                printf("select出错,客户端程序退出\n");
                break;
            }else if(retval == 0){
                printf("waiting...\n");
                continue;
            }else{
                /*服务器发来了消息*/
                if(FD_ISSET(sockfd,&rfds)){
                    /******接收消息*******/
                    bzero(buf,BUFLEN);
                    len = recv(sockfd,buf,BUFLEN,0);
                    if(len > 0)
                        printf("服务器发来的消息是:%s\n",buf);
                    else{
                        if(len < 0 )
                            printf("接受消息失败!\n");
                        else
                            printf("服务器退出了,聊天终止!\n");
                    break; 
                    }
                }
                /*用户输入信息了,开始处理信息并发送*/
                if(FD_ISSET(0, &rfds)){ 
                    /******发送消息*******/ 
                    bzero(buf,BUFLEN);
                    fgets(buf,BUFLEN,stdin);
                   
                    if(!strncasecmp(buf,"quit",4)){
                        printf("client 请求终止聊天!\n");
                        break;
                    }
                        len = send(sockfd,buf,strlen(buf),0);
                    if(len > 0)
                        printf("\t消息发送成功:%s\n",buf); 
                    else{
                        printf("消息发送失败!\n");
                        break; 
                    } 
                }
            }
        
        }
        /*关闭连接*/
        close(sockfd);
        return 0;
    }
    

    test_socket_server.h 使用的头文件,定义一些外部api

    #ifndef _TEST_SOCKET_H
    #define _TEST_SOCKET_H
    
    #include <functional>
    #include <memory>
    #include <thread>
    #include <vector>
    
    namespace linx_socket {
    
    int Writen(int fd, const void *vptr, int n);
    int Readn(int fd, void *vptr, int maxlen);
    int CreatSocket(const char *ip, int port);
    int StartLisen(int fd);
    bool Close(int fd);
    
    }  // namespace linx_socket
    
    class DevSocket  {
     public:
      using CallBack  = std::function<void(int ,std::vector<uint8_t>&&)>;
      DevSocket();
      DevSocket(const CallBack& callback);
      bool Send(int fd,const std::vector<uint8_t> &data) const ;
      // std::vector<uint8_t> Recive() const ; //当建立连接后 就会在线程里面循环读取客户端发来的信息, 所以不需要专门写rx函数
      ~DevSocket(){}
    
      private:
      class Socket;
      std::unique_ptr<Socket> SocketImpl;
    
    };
    
    #endif
    

    test_socket_server.cpp

    里面包含的#include "log.h"这个文件是我自己写的log输出文件,打印时间和颜色等,看着比较方便,大家使用代码时候自行替换成自己需要printf或者std::cout或者自己的打印文件

    #include <stdio.h>
    #include <algorithm>
    #include <array>
    #include <chrono>
    #include <boost/thread/mutex.hpp>
    #include <mutex>
    #include <condition_variable>
    #include <iostream>
    #include <iterator>
    #include <string>
    #include <thread>
    #include <vector>
    #include <arpa/inet.h>
    #include <errno.h>
    #include <net/if.h>
    #include <netinet/in.h>
    #include <netinet/tcp.h>
    #include <sys/socket.h>
    #include <unistd.h>
    #include <future>
    
    #include "test_socket_server.h"
    #include "log.h"
    // g++ test_socket_server_optimiza_2.cpp -o  test_socket_server_optimiza -lboost_thread -lpthread
    namespace linx_socket
    {
    
        constexpr int socket_que_size = 3;
    
        //使用select进行写入
        int Writen(int fd, const void *vptr, int n)
        {
            ssize_t nleft = n;
            const char *ptr = (const char *)vptr;
            fd_set write_fd_set;
            struct timeval timeout;
            while (nleft > 0)
            {
                ssize_t nwriten = 0;
    
                timeout.tv_sec = 1;
                timeout.tv_usec = 0;
                FD_ZERO(&write_fd_set);
                FD_SET(fd, &write_fd_set);
                int s_ret = select(FD_SETSIZE, NULL, &write_fd_set, NULL, &timeout);
    
                if (s_ret < 0)
                {
                    EXC_ERROR("-------write_fd_set error------------");
                    return -1;
                }
                else if (s_ret == 0)
                {
                    usleep(100 * 1000);
                    EXC_ERROR("-------write_fd_set timeout ------------");
                    continue;
                }
    
                if ((nwriten = write(fd, ptr, nleft)) < 0)
                {
                    if (nwriten < 0 && errno == EINTR)
                    {
                        nwriten = 0;
                    }
                    else
                    {
                        EXC_ERROR("-------nwriten error = %d ------------", nwriten);
                        return -1;
                    }
                }
                nleft -= nwriten;
                ptr += nwriten;
            }
            return n;
        }
    
        //使用select进行读取
        int Readn(int fd, void *vptr, int maxlen)
        {
            bool ret = false;
            ssize_t nread = 0;
            fd_set read_fd_set;
            struct timeval timeout;
            while (!ret)
            {
                // EXC_INFO("Readn begine.");
                timeout.tv_sec = 1;
                timeout.tv_usec = 0;
                FD_ZERO(&read_fd_set);
                FD_SET(fd, &read_fd_set);
                int s_ret = select(FD_SETSIZE, &read_fd_set, NULLNULL, &timeout);
    
                if (s_ret < 0)
                {
                    EXC_ERROR("-------select error------------");
                    return -1;
                }
                else if (s_ret == 0)
                {
                    usleep(100 * 1000);
                    // EXC_ERROR("-------select timeout ------------");
                    continue;
                }
    
                if ((nread = read(fd, vptr, maxlen)) < 0)
                {
                    if (errno == EINTR)
                    {
                        EXC_ERROR("buff = %d, fd=%d, errno=%d.", vptr, fd, errno);
                        nread = 0;
                    }
                    else
                    {
                        EXC_ERROR("buff = %d, fd=%d, errno=%d.", vptr, fd, errno);
                        return -1;
                    }
                }
                else
                {
                    if (nread == 0)
                    {
                        EXC_ERROR("buff = %d, fd=%d, nread=%d. data:%s", vptr, fd, nread, vptr);
                    }
                    // else
                    // {
                    //     EXC_INFO("buff = %d, fd=%d, nread=%d. data:%s", vptr, fd, nread, vptr);
                    // }
                    ret = 1;
                }
            }
            return nread;
        }
        //进行处理来自客户端的连接请求
        int IsListened(int fd)
        {
            struct sockaddr_in c_addr;
            socklen_t c_lent = sizeof(c_addr);
            int fd_c = accept(fd, (struct sockaddr *)&c_addr, &c_lent);
    
            if (fd_c < 0)
            {
                if (errno == EPROTO || errno == ECONNABORTED)
                {
                    return -1;
                }
            }
            EXC_INFO("accept %s: %d sucess"inet_ntoa(c_addr.sin_addr), ntohs(c_addr.sin_port));
            return fd_c;
        }
    
        //创建一个套接字描述符
        int CreatSocket(const char *ip, int port)
        {
            int ret = -1;
            // EXC_INFO("CreatSocket");
            int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
            if (fd < 0)
            {
                return -1;
            }
            int reuse = 1;
            //设置套接字的一些选项 SOL_SOCKET:表示在Socket层 SO_REUSEADDR(允许重用本地地址和端口)
            if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
            {
                return -1;
            }
    
            struct sockaddr_in s_addr;
            memset(&s_addr, 0sizeof(s_addr));
            s_addr.sin_addr.s_addr = htonl(INADDR_ANY);
            s_addr.sin_port = htons(port);
            s_addr.sin_family = AF_INET;
            if (bind(fd, (struct sockaddr *)&s_addr, sizeof(s_addr)) < 0)
            {
                EXC_ERROR("bind %s: %d error"inet_ntoa(s_addr.sin_addr), ntohs(s_addr.sin_port));
                close(fd);
                return -2;
            }
            if (listen(fd, socket_que_size) < 0)
            {
                close(fd);
                return -3;
            }
            return fd;
        }
    
        int CreatSocket(const char *ip, int port, int socket_que_size)
        {
            int ret = -1;
            EXC_INFO("");
            int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
            if (fd < 0)
            {
                return -1;
            }
            struct sockaddr_in s_addr;
            memset(&s_addr, 0sizeof(s_addr));
            s_addr.sin_addr.s_addr = htonl(INADDR_ANY);
            s_addr.sin_port = htons(port);
            s_addr.sin_family = AF_INET;
            if (bind(fd, (struct sockaddr *)&s_addr, sizeof(s_addr)) < 0)
            {
                close(fd);
                return -2;
            }
            if (listen(fd, socket_que_size) < 0)
            {
                close(fd);
                return -3;
            }
            return fd;
        }
    
        bool Close(int fd)
        {
            close(fd);
            return true;
        }
    } // namespace linx_socket
    
    class Connection
    {
    public:
        Connection(int fd, DevSocket::CallBack c) : call_back_f_(c), fd_(fd)
        {
            read_sta = std::async(std::launch::async, [this]()
                                  { Read(); }); //循环读取socket连接的数据
    
    
        };
    
        void Read()
        {
            while (!kill_thread_)
            {
                if (fd_ < 0)
                    break;
                std::array<uint8_t, kBuffSize> buf;
                int len = linx_socket::Readn(fd_, buf.data(), kBuffSize);
                if (len > 0)
                {
                    if (call_back_f_)
                    {
                        data_parser_mutex_.lock();
                        call_back_f_(fd_, {buf.begin(), buf.begin() + len});
                        data_parser_mutex_.unlock();
                    }
                }
                else if (len < 0)
                {
                    kill_thread_ = true;
                    EXC_ERROR("read error, fd= %d, rev len= %d.", fd_, len);
                    break;
                }
                else if (len == 0)
                {
                    std::this_thread::sleep_for(std::chrono::seconds(1));
                    EXC_ERROR("call_back_f_ = %d, fd=%d, rev len=%d.", call_back_f_, fd_, len);
                }
            }
        }
    
        bool Write(std::vector<uint8_t> data)
        {
            if (linx_socket::Writen(fd_, data.data(), data.size()) < 0)
            {
                kill_thread_ = true;
                EXC_ERROR("Writen error.");
                return false;
            }
            return true;
        }
        bool GetIsKillThread() return kill_thread_; }
    
        ~Connection()
        {
            EXC_INFO("kill_thread_ is %d", kill_thread_);
    
            kill_thread_ = true;
            if (fd_ != -1)
            {
                linx_socket::Close(fd_);
                fd_ = -1;
            }
        }
        std::future<void> &GetReadSta() return read_sta; }
        int GetFd() return fd_; }
    
    private:
        int fd_ = -1;
        bool kill_thread_ = false;
        DevSocket::CallBack call_back_f_ = nullptr/**/
        boost::mutex data_parser_mutex_;
        std::future<void> read_sta;
        constexpr static int kBuffSize = 1024;
    };
    
    class DevSocket::Socket
    {
    public:
        Socket(){};
        Socket(std::pair<std::string, int> port, const CallBack &callback_)
            : call_back_f_(callback_)
        {
            EXC_WARN("Socket ");
            int n;
            if ((n = linx_socket::CreatSocket(port.first.c_str(), port.second)) < 0)
            {
                throw std::string("CreatSocket  error ") + std::to_string(n);
            }
            fd = n;
            auto threa_func = [this]()
            {
                while (!kill_thread_)
                {
                    //循环std::launch::async 传递的可调用对象异步执行 
                    std::future<int> listened_status = std::async(std::launch::async, [this]()
                                                                  {
                                                                      EXC_INFO("Listened .");
                                                                      return linx_socket::IsListened(fd);
                                                                  });
    
                    //lister 套接字有没有侦听到连接,任务没返回,没有侦听到连接套接字
                    while (listened_status.wait_for(std::chrono::seconds(0)) !=
                           std::future_status::ready)
                    {
                        if (kill_thread_)
                            return;
                        for (auto it = connections_.begin(); it != connections_.end();)
                        {
                            //任务返回了,说明该连接结束了
                            if ((*it)->GetReadSta().wait_for(std::chrono::seconds(0)) ==
                                std::future_status::ready)
                            {
                                if ((*it)->GetReadSta().valid())
                                {
                                    EXC_ERROR("connection_kill_thread is %d, socket_kill_thread_ is =%d", (*it)->GetIsKillThread(), kill_thread_);
                                    (*it)->GetReadSta().get();//主动退出
                                }
                                EXC_INFO("dis connection_ fd=%d.", (*it)->GetFd());
                                boost::mutex::scoped_lock lock(connection_mutex_);
                                it = connections_.erase(it);
                                if (connections_.size() <= 0)
                                {
                                    EXC_ERROR("all is dis connected");
                                }
                            }
                            if (it != connections_.end())
                            {
                                ++it;
                            }
                        }
    
                        std::this_thread::sleep_for(std::chrono::milliseconds(10));
                    }
             //    EXC_INFO( "==================== thread id: %d" ,std::this_thread::get_id());
    
                    //有新的连接
                    int clien_fd = listened_status.get();
                    if (clien_fd > 0)
                    {
                        boost::mutex::scoped_lock lock(connection_mutex_);
                        connections_.push_back(
                            std::make_shared<Connection>(clien_fd, call_back_f_));
                        EXC_INFO("connection_ fd=%d.", clien_fd);
                    }
                }
            };
    
            // EXC_INFO("before move threa_func=%d.", threa_func);
    
            thread_ = std::thread(std::move(threa_func)); //左值变右值传入 减少拷贝
    
            // EXC_INFO("after  move threa_func=%d.", threa_func);
        }
    
        bool SendData(const std::vector<uint8_t> &data, std::shared_ptr<Connection> connection)
        {
            boost::mutex::scoped_lock lock(connection_mutex_);
            return connection->Write(data);
        }
        std::vector<std::shared_ptr<Connection>> GetConnections()
        {
            return connections_;
        }
        ~Socket()
        {
            kill_thread_ = true;
            if (fd != -1)
            {
                linx_socket::Close(fd);
            }
            if (thread_.joinable())
            {
                thread_.join();
            }
        }
    
    private:
        int fd = -1;
        bool kill_thread_ = false;
        CallBack call_back_f_ = nullptr;
        std::thread thread_;
        std::vector<std::shared_ptr<Connection>> connections_;
        boost::mutex connection_mutex_;
    };
    
    #define HOST "127.0.0.1" // 根据你服务器的IP地址修改
    #define PORT 8555        // 根据你服务器进程绑定的端口号修改
    
    DevSocket::DevSocket()
    {
        EXC_WARN("new  DevSocket");
        std::pair<std::string, int> par{HOST, PORT};
        SocketImpl = std::unique_ptr<Socket>(new Socket(par, nullptr));
    }
    
    DevSocket::DevSocket(const CallBack &callback)
    {
        EXC_WARN("new  DevSocket");
        std::pair<std::string, int> par{HOST, PORT};
        SocketImpl = std::unique_ptr<Socket>(new Socket(par, callback));
    }
    
    bool DevSocket::Send(int fd, const std::vector<uint8_t> &data) const
    {
        for (auto connection : SocketImpl->GetConnections())
        {
            if (nullptr == connection)
                continue;
            if (fd == connection->GetFd() || fd == 0//fd ==0 全部发送
            {
                int ret = SocketImpl->SendData(data, connection);
                EXC_WARN("fd %d  send status :%d", connection->GetFd(), ret);
            }
        }
        return true;
    }
    
    std::ostream &operator<<(std::ostream &out, std::vector<uint8_t> &data)
    {
        EXC_WARN("operator 1<<<<<<<<<<<<<<");
    
        out << "hex ";
        out << std::hex;
        for (auto &d : data)
        {
            out << "0x" << std::hex << (int)d << " ";
        }
        out << std::endl;
        EXC_WARN("operator 2<<<<<<<<<<<<<");
        return out;
    }
    
    #include <signal.h>
    void pipesig_handler(int sig)
    {
        EXC_ERROR("receive signal %d", sig);
    }
    #if 1 //std::async 控制发送
    int main(int argc, char *argv[])
    {
        DevSocket *Device;
        // 为SIGPIPE添加信号处理函数,处理完程序继续执行 1
        signal(SIGPIPE, pipesig_handler);
        bool SendFlag = false;
        std::vector<uint8_t> send_data; 
        int read_fd{-1};
        try
        {
            EXC_INFO("device socket init");
            Device =
                new DevSocket([&](int fd, std::vector<uint8_t> &&d)
                              {
                                  EXC_INFO("recive call fd :%d", fd);
                                  send_data = d;
                                  SendFlag = true;
                                  std::ostringstream ss;
                                  ss << "recive data:[";
                                  std::for_each(send_data.begin(), send_data.end(),
                                                [&](uint8_t temp)
                                                { ss << " " << temp << ","; });
                                  EXC_WARN("%s]", ss.str().c_str());
                                //   std::cout << send_data; //使用operator<< 函数
                              });
        }
        catch (const std::string s)
        {
            EXC_INFO("Device.emplace_back:%s", s.c_str());
            return EXIT_FAILURE;
        }
        const int BUFLEN = 1024;
        char buf[BUFLEN];
        std::thread input_keyboard = std::thread([&]
                                                 {
                                                     while (true)
                                                     {
                                                         memset(buf, 0sizeof(buf));
                                                         /*fgets函数:从流中读取BUFLEN-1个字符*/
                                                         fgets(buf, BUFLEN, stdin);
                                                         EXC_INFO("from terminal:%s", buf);
    
                                                         if (!strncasecmp(buf, "quit"4))
                                                         {
                                                             EXC_INFO("server quit!");
                                                             exit(0);
                                                         }
                                                         std::vector<uint8_t> send_msg;
    
                                                         for (int i = 0; buf[i] != '\0'; i++)
                                                         {
                                                            //  EXC_INFO("data:index[%d] :%d", i, buf[i]);
                                                             send_msg.emplace_back(buf[i]);
                                                         }
                                                         Device->Send(0, send_msg); //代表全部链接的都发送 fd =0
                                                     }
                                                 });
        while (true)
        {
            // EXC_INFO(" ");
    
     std::future<bool> send_future = std::async(std::launch::async, [&]()
                                                                  {
                                                                        while(true)
                                                                        {
                                                                            if(SendFlag)
                                                                            {
                                                                                return true;
                                                                            }
                                                                            std::this_thread::sleep_for(std::chrono::milliseconds(20));
                                                                            return false;
                                                                        }
                                                                  });
            {
    
                if (send_future.wait_for(std::chrono::milliseconds(30)) == std::future_status::ready) //子线程已执行完
                {
                    // EXC_INFO( "ready...");
                    if(send_future.get())
                    {
                        SendFlag=false;
                        std::ostringstream ss;
                        ss.clear();
                        ss << "send date :[";
                        std::for_each(send_data.begin(), send_data.end(),
                                    [&](uint8_t &temp)
                                    { ss << " " << temp << ","; });
                        EXC_INFO("%s]", ss.str().c_str());
                        
                        if (!send_data.empty())
                        {
                            Device->Send(read_fd, send_data);
                            send_data.clear();
                        }
                    }
                }
            }
    
        }
        input_keyboard.join();
    }
    #elif 1 //std::condition_variable 选择发送
    int main(int argc, char *argv[])
    {
        std::mutex SendMutex;
        std::condition_variable SendCondition;
        bool SendFlag = false;
        DevSocket *Device;
        // 为SIGPIPE添加信号处理函数,处理完程序继续执行 1
        signal(SIGPIPE, pipesig_handler);
    
        std::vector<uint8_t> send_data; //(8, 1);
        int read_fd{-1};
        EXC_WARN("");
        {
            std::ostringstream ss;
            ss << "send_date 1:[";
            std::for_each(send_data.begin(), send_data.end(),
                          [&](uint8_t temp)
                          { ss << " " << temp << ","; });
            EXC_INFO("%s]", ss.str().c_str());
        }
        const int BUFLEN = 1024;
        char buf[BUFLEN];
    
        try
        {
            EXC_INFO("Device.emplace_back");
            Device =
                new DevSocket([&](int fd,std::vector<uint8_t> &&d)
                              {
                                  EXC_INFO("recive call fd :%d",fd);
                                  {
                                      std::lock_guard<std::mutex> m(SendMutex);
                                      send_data = d;
                                      SendFlag = true;
    
                                      std::ostringstream ss;
                                      ss.clear();
                                      ss << "recive 2:[";
                                      std::for_each(send_data.begin(), send_data.end(),
                                                    [&](uint8_t temp)
                                                    { ss << " " << temp << ","; });
                                      EXC_WARN("%s]", ss.str().c_str());
                                      std::cout << send_data;
                                  }
                                  SendCondition.notify_one();
                                  EXC_INFO("");
                              });
        }
        catch (const std::string s)
        {
            EXC_INFO("Device.emplace_back:%s", s.c_str());
            return EXIT_FAILURE;
        }
        std::thread input_keyboard = std::thread([&]
                                                 {
                                                     while (true)
                                                     {
                                                         memset(buf, 0sizeof(buf));
                                                         /*fgets函数:从流中读取BUFLEN-1个字符*/
                                                         fgets(buf, BUFLEN, stdin);
                                                         EXC_INFO("from terminal:%s", buf);
    
                                                         if (!strncasecmp(buf, "quit"4))
                                                         {
                                                             EXC_INFO("server quit!");
                                                             exit(0);
                                                         }
                                                         std::vector<uint8_t> send_msg;
    
                                                         for (int i = 0; buf[i] != '\0'; i++)
                                                         {
                                                             EXC_INFO("data:index[%d] :%d", i, buf[i]);
                                                             send_msg.emplace_back(buf[i]);
                                                         }
                                                         Device->Send(0,send_msg);//代表全部链接的都发送 fd =0
                                                     }
                                                 });
        while (true)
        {
            EXC_INFO(" ");
    
            {
                std::unique_lock<std::mutex> m(SendMutex);
                SendCondition.wait(m, [&]
                                   { return SendFlag; });
                SendFlag = false;
            }
    
            {
                std::ostringstream ss;
                ss.clear();
                ss << "send_date 3:[";
                std::for_each(send_data.begin(), send_data.end(),
                              [&](uint8_t &temp)
                              { ss << " " << temp << ","; });
                EXC_ERROR("%s]", ss.str().c_str());
            }
    
            EXC_INFO("");
            if (!send_data.empty())
            {
                Device->Send(read_fd,send_data);
                send_data.clear();
            }
        }
        input_keyboard.join();
    }
    #endif
    

    分析介绍服务端代码使用到的技术点

    代码展示完毕,接下来给大家一点点分析里面用的一些关键点:

    使用的技术点:

    std::future + std::async

    使用代码:std::future<void> read_sta = std::async(std::launch::async, [this]() { Read(); });

    异步调用往往不知道何时返回,但是如果异步调用的过程需要同步,或者说后一个异步调用需要使用前一个异步调用的结果。这个时候就要用到future。

    首先std::future是一个类模板,其对象存储将来的值。提供了一种访问该值的机制,即使用get()成员函数。但是,如果此时在get()函数可用之前访问它的未来关联值,则get()函数将阻塞当前线程,直到get()函数准备好它的数据。std::future期待一个返回,从一个异步调用的角度来说,future更像是执行函数的返回值,C++标准库使用std::future为一次性事件建模,如果一个事件需要等待特定的一次性事件,那么这线程可以获取一个future对象来代表这个事件。

    线程可以周期性的在这个future上等待一小段时间,检查future是否已经ready,如果没有,该线程可以先去做另一个任务,一旦future就绪,该future就无法复位(无法再次使用这个future等待这个事件),所以future代表的是一次性事件。

    std::future对象是std::async、std::promise、std::packaged_task的底层对象,用来传递其他线程中操作的数据结果。这就是我们会有std::future + std::asyncstd::future + std::promisestd::future + std::packaged_task的组合使用。几者使用的方法大同小异,std::async是函数,std::promise和std::packaged_task是类, 相信对比这篇文章之后大家会有更加详细的理解用法,这里我就不多做赘述了。

    本次使用了std::async函数,以及配合使用了wait_for()函数和get()函数,使用这两个部分原因是阻塞动作,因为std::async创建异步任务时候创建一个线程去执行任务,使用以上两个函数可以进行确认异步线程的状态,两者的区别是使用get函数时候,要是异步线程没有执行完成,当前线程会原地阻塞直接异步线程执行完成;而wait_for()调用也会在当前位置阻塞,但wait_for有阻塞时间的参数,如果参数为std::chrono::seconds(0),那么就不会阻塞当前线程。

    而在本次的代码里面,std::future本次请求返回是void,也就是无需要具体的返回,可以理解为线程结束的话,get()函数就可以准备好了的。

    题外话:在实际开发中,有时候某线程的值不止被一个线程所需要,而get()却只能只用一次,std::future自身问题,它只容许一个线程等待结果。若我们要让多个线程等待同一个目标事件,这时可以通过std::shared_future达到多次通过get()获取值的目的。

    注:get()函数只能使用一次,因为get()函数的设计是一个移动语义,相当于将future对象中的值转移到了get()调用者中,所以再次get()就会报出异常。

    std::condition_variable

    处理一次性事件,我们std::condition_variable可以用于异步事件的重复通知,condition_variable可以用于异步事件的重复通知是条件变量,和条件变量pthread_cond_t类似,而std::condition_variable在Linux 下也有使用 Pthread 库中的 pthread_cond_*() 函数提供了与条件变量相关的功能,所以两者使用方法都是类似的,效果也是一样的。

    std::condition_variable 对象通常使用 std::unique_lockstd::mutex 来等待,当 std::condition_variable 对象的某个 wait 函数被调用的时候,它使用 std::unique_lock(通过 std::mutex) 来锁住当前线程。当前线程会一直被阻塞,直到另外一个线程在相同的 std::condition_variable 对象上调用了 notification 函数来唤醒当前线程,使用 notify_all可以通知所有等待的线程,notify_one则只会唤醒一个线程。

    signal信号处理

    信号是进程通信一种手段,除了通信,很多代码跑飞的问题,都是内核通知信号到进程的,所以解决bug时候我们也会这里面为什么要忽略这个信号呢,后面gdb调试可以看到详细的信息,这里直接说原因,是因为本次代码是 一个socket服务端对应多个客户端,而中间交互过程中,会有一些客户端链接也有客户端断开,而当服务器完整close这个连接时,若客户端端接着发数据。根据TCP协议的规定,会收到一个RST响应,client再往这个服务器发送数据时,系统会发出一个SIGPIPE信号给进程,告诉进程这个连接已经断开了,不要再写了。这个时候进程会game over,所以为了避免进程退出, 可以捕获SIGPIPE信号, 或者忽略它, 给它设置SIG_IGN信号处理函数。

    直接把这个信号忽略掉

    signal(SIGPIPE, SIG_IGN);
    

    为SIGPIPE添加信号处理函数,处理完程序继续执行

    void pipesig_handler(int sig)
    {
      EXC_ERROR("receive signal %d", sig);
    }
      // 为SIGPIPE添加信号处理函数,处理完程序继续执行 1
      signal(SIGPIPE, pipesig_handler);  
    

    运行原理分析:

    • linx_socket命名空间写了socket通信基本一些接口,基于select的read write读写函数、socket创建函数、用来处理客户端链接的IsListened函数、close函数关闭socket,这部分代码用来做后面类成员函数的基本调用的”库“函数; Connection类实现了每一个客户端链接成功后,都会执行Read函数,Read是一个while循环,使用std::async启动之后,,循环退出的条件在析构函数置位;此外还有Write函数做外部接口。

    • DevSocket类是最终使用的socket通信的外部接口,其中使用impl模式包装外部接口,在DevSocket类里面定义一个Socket类,这个类最重要的就是Socket(std::pair<std::string, int> port, const CallBack &callback_) : call_back_f_(callback_)这个构造函数,除了使用CreatSocket函数初始化建立一个socket设备描述符,还启动一个lambda线程函数threa_func,该线程一直循环执行std::async去创建线程去调用linx_socket::IsListened(fd)处理随时来的客户端链接请求。

      A:做了这一步之后listened_status.wait_for开始无延时判断linx_socket::IsListened函数的执行状态,在没有客户端有链接请求的时候,listened_status.wait_for会返回std::future_status::timeout,然后循环判断所有connections_中的read_staRead函数是否已经执行完成,而Read函数只有读取失败这里才会退出循环,执行完函数,这时候其实对应这个链接结束。

      B:而当istened_status.wait_for返回std::future_status::ready之后 connections_.push_back(std::make_shared<Connection>(clien_fd, call_back_f_));这段代码把新的客户端链接添加到connections_中去,然后一直循环执行A B动作。

      除了Socket这个构造函数,还有SendData去调用我们上面提到Connection的Write函数,包装成新的外部接口。

    • &operator<<函数重载了 << 运算符,方便输出std::vector<uint8_t>类型数据,这个类型数据是std::vector<uint8_t> send_data; 大家可以打开我上面的注释代码测试验证。

    • main函数,这部分有两处,用#if #elif进行选择,分别使用了std::asyncstd::condition_variable实现收到的数据之后唤醒主线程,再令主线程把收到的数据转发到对应链接的客户端;其次还启动了input_keyboard这个线程,用来监控,终端界面输入的字符,转发到所有链接的客户端。

    调试方法

    这里我分享两种调试方法,gdbvscode
    首先代码gdb调试时候,编译记得加上-g

    g++ test_socket_server.cpp -o  test_socket_server -g -lboost_thread -lpthread
    

    直接gdb + 编译好的可执行文件

    gdb test_socket_server_optimiza 进入之后使用layout src再加 l命令查看源码调试,很方便。

    gdb调试时候遇到这个报错received signal SIGPIPE, Broken pipe. 需要忽略SIGPIPE信号

    vscode c++程序中添加外部动态链接库 帮助调试

    在每个vscode打开的工程目录下,都有.vscode目录,里面会有几个.json文件,打开打开tasks.json文件,在 "${fileDirname}/${fileBasenameNoExtension}",继续增加自己链接需要库 "-lboost_thread","-lpthread",如下所示:

    这样子就可以正常调试了

    通信过程分析

    下图是我执行代码的log日志输出效果,那么我们怎么查看底层的传输数据呢?

    我这里分享使用的两个软件可以互相配合使用:

    tcpdump 抓包分析

    tcpdump,就是:dump the traffic on a network,根据使用者的定义对网络上的数据包进行截获的包分析工具。 tcpdump可以将网络中传送的数据包的“头”完全截获下来提供分析。它支持针对网络层、协议、主机、网络或端口的过滤,并提供and、or、not等逻辑语句来帮助你去掉无用的信息。

    它有很多命令操作,链接

    我直接实时显示了数据 sudo tcpdump host 127.0.0.1 and port 8555 -i lo

    这里面详细信息分析,大家直接可以看这篇文章进行对比,这位仁兄写的很详细,链接

    但是数据有时候无法实时查看,这个时候把数据保存起来,然后用Wireshark进行分析 sudo tcpdump host 127.0.0.1 and port 8555 -i lo -w socket_test.pcap

    然后使用wireshark socket_test.pcap打开

    .pcap文件直接使用Wireshark打开就可以看到了,这里面的小demo应该可以帮到你。

    Wireshark抓包分析

    Wireshark 是一款自由开源的网络协议分析器,它在全球被广泛使用。通过使用 Wireshark,你可以实时捕获网络的传入和传出数据包,并将其用于网络故障排除、数据包分析、软件和通信协议开发等。 windows和Ubuntu都可以使用, 本次我使用场景是Ubuntu。

    Wireshark 可以在 Ubuntu 的 Universe 存储库中找到。你可以启用 universe 存储库,然后按如下方式安装:

    sudo add-apt-repository universe
    sudo apt install wireshark
    

    安装时候有wireshark-common设置,我选择了默认的否,里面提示也是建议禁用它。 后期大家自己想更改的话,也可以使用 sudo dpkg-reconfigure wireshark-common 命令重新修改。

    使用sudo wireshark打开软件

    打开保存好的.pcap wireshark socket_test.pcap

    实时监控sudo wireshark

    筛选栏进行设置port,我的端口是8555,所以如此设置tcp.port == 8555,就可以看到实时交互的底层数据了。

    这里只是配合自己的demo进行简单的软件简单使用分享,更为详细使用,大家可以网上自行搜索。

    结语

    这就是我自己的一些socket相关的代码和软件使用分享。如果大家有更好的想法和需求,也欢迎大家加我好友交流分享哈。


    作者:良知犹存,白天努力工作,晚上原创公号号主。公众号内容除了技术还有些人生感悟,一个认真输出内容的职场老司机,也是一个技术之外丰富生活的人,摄影、音乐 and 篮球。关注我,与我一起同行。

                                  ‧‧‧‧‧‧‧‧‧‧‧‧‧‧‧‧  END  ‧‧‧‧‧‧‧‧‧‧‧‧‧‧‧‧
    

    推荐阅读

    【1】jetson nano开发使用的基础详细分享

    【2】Linux开发coredump文件分析实战分享

    【3】CPU中的程序是怎么运行起来的 必读

    【4】cartographer环境建立以及建图测试

    【5】设计模式之简单工厂模式、工厂模式、抽象工厂模式的对比

    本公众号全部原创干货已整理成一个目录,回复[ 资源 ]即可获得。

  • 相关阅读:
    C++ STL详解
    【MySQL索引与优化篇】索引的数据结构
    QT开发之串口通信(四)
    Radash库使用说明——数组方法篇(全)
    极限多标签学习之SwiftXML
    Oracle/PLSQL: Remainder Function
    【多线程进阶】--- 常见锁策略,CAS,synchronized底层工作原理,JUC,线程安全的集合类,死锁
    2023年软考时间流程安排:
    C++ 类和对象篇(三) 空类和6个默认成员函数
    CSS常用的长度单位
  • 原文地址:https://www.cnblogs.com/conscience-remain/p/16411194.html