• muduo源码剖析之TcpServer服务端


    简介

    TcpServer拥有Acceptor类,新连接到达时new TcpConnection后续客户端和TcpConnection类交互。TcpServer管理连接和启动线程池,用Acceptor接受连接。

    服务端封装 - muduo的server端维护了多个tcpconnection
    注意TcpServer本身不带Channel,而是使用Acceptor的Channel

    成员及属性解析

    主要接口

    回调setters

    这些回调函数会在新连接建立时传递给TcpConnction对象

    start

    启动threadPool_线程池
    在runInLoop中执行acceptor的listen
    这里专门设置了一个started_标记,防止多次运行start

    核心实现:newConnection

    从accept回调中获得的fd动态创建新的TcpConnection对象
    为连接对象注册各类回调函数
    将连接对象存入connectionMap_映射表里

    主要成员

    loop

    这个loop也是acceptor的loop

    acceptor
    threadPool

    一个EventLoopThreadPool,用来存放io线程的EventLoopThread

    socket

    服务端用来监听的socket

    ConnectionMap

    一个连接名和实例(TcpConnectionPtr)的映射容器

    TcpServer中回调的传递示意简图:

    da77bb9302ff4da9896f0154839d5b82

    源码剖析

    源码已经编写注释

    TcpServer.h

    // Copyright 2010, Shuo Chen.  All rights reserved.
    // http://code.google.com/p/muduo/
    //
    // Use of this source code is governed by a BSD-style license
    // that can be found in the License file.
    
    // Author: Shuo Chen (chenshuo at chenshuo dot com)
    //
    // This is a public header file, it must only include public header files.
    
    #ifndef MUDUO_NET_TCPSERVER_H
    #define MUDUO_NET_TCPSERVER_H
    
    #include "muduo/base/Atomic.h"
    #include "muduo/base/Types.h"
    #include "muduo/net/TcpConnection.h"
    
    #include 
    
    namespace muduo
    {
    namespace net
    {
    
    class Acceptor;
    class EventLoop;
    class EventLoopThreadPool;
    
    ///
    /// TCP server, supports single-threaded and thread-pool models.
    ///
    /// This is an interface class, so don't expose too much details.
    class TcpServer : noncopyable
    {
     public:
      typedef std::function<void(EventLoop*)> ThreadInitCallback;
      enum Option
      {
        kNoReusePort,//不设置端口复用
        kReusePort,//设置端口复用
      };
    
      //TcpServer(EventLoop* loop, const InetAddress& listenAddr);
      TcpServer(EventLoop* loop,//mainloop
                const InetAddress& listenAddr,
                const string& nameArg,
                Option option = kNoReusePort);
      ~TcpServer();  // force out-line dtor, for std::unique_ptr members.
    
      const string& ipPort() const { return ipPort_; }
      const string& name() const { return name_; }
      EventLoop* getLoop() const { return loop_; }
    
      /// Set the number of threads for handling input.
      ///
      /// Always accepts new connection in loop's thread.
      /// Must be called before @c start
      /// @param numThreads
      /// - 0 means all I/O in loop's thread, no thread will created.
      ///   this is the default value.
      /// - 1 means all I/O in another thread.
      /// - N means a thread pool with N threads, new connections
      ///   are assigned on a round-robin basis.
      void setThreadNum(int numThreads);//设置subloop数量
      void setThreadInitCallback(const ThreadInitCallback& cb)//设置subloop线程初始化时调用的函数
      { threadInitCallback_ = cb; }
      /// valid after calling start()
      std::shared_ptr<EventLoopThreadPool> threadPool()//返回EventLoopThread线程池
      { return threadPool_; }
    
      /// Starts the server if it's not listening.
      ///
      /// It's harmless to call it multiple times.
      /// Thread safe.
      void start();
    
      /// Set connection callback.
      /// Not thread safe.
      void setConnectionCallback(const ConnectionCallback& cb)
      { connectionCallback_ = cb; }
    
      /// Set message callback.
      /// Not thread safe.
      void setMessageCallback(const MessageCallback& cb)
      { messageCallback_ = cb; }
    
    
      
    
      /// Set write complete callback.
      /// Not thread safe.
      void setWriteCompleteCallback(const WriteCompleteCallback& cb)
      { writeCompleteCallback_ = cb; }
    
     private:
      /// Not thread safe, but in loop
      void newConnection(int sockfd, const InetAddress& peerAddr);
      /// Thread safe.
      void removeConnection(const TcpConnectionPtr& conn);
      /// Not thread safe, but in loop
      void removeConnectionInLoop(const TcpConnectionPtr& conn);
    
      typedef std::map<string, TcpConnectionPtr> ConnectionMap;
    
      EventLoop* loop_;  // the acceptor loop
      const string ipPort_;
      const string name_;
      std::unique_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
      std::shared_ptr<EventLoopThreadPool> threadPool_;
      ConnectionCallback connectionCallback_;
      MessageCallback messageCallback_;
      WriteCompleteCallback writeCompleteCallback_;
      ThreadInitCallback threadInitCallback_;
      AtomicInt32 started_;
      // always in loop thread
      int nextConnId_;
      ConnectionMap connections_;
    };
    
    }  // namespace net
    }  // namespace muduo
    
    #endif  // MUDUO_NET_TCPSERVER_H
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124

    TcpServer.cc

    // Copyright 2010, Shuo Chen.  All rights reserved.
    // http://code.google.com/p/muduo/
    //
    // Use of this source code is governed by a BSD-style license
    // that can be found in the License file.
     
    // Author: Shuo Chen (chenshuo at chenshuo dot com)
     
    #include "muduo/net/TcpServer.h"
     
    #include "muduo/base/Logging.h"
    #include "muduo/net/Acceptor.h"
    #include "muduo/net/EventLoop.h"
    #include "muduo/net/EventLoopThreadPool.h"
    #include "muduo/net/SocketsOps.h"
     
    #include   // snprintf
     
    using namespace muduo;
    using namespace muduo::net;
     
    TcpServer::TcpServer(EventLoop* loop,
                         const InetAddress& listenAddr,
                         const string& nameArg,
                         Option option)
      : loop_(CHECK_NOTNULL(loop)), //TcpServer所在的主线程下运行的事件驱动循环
        ipPort_(listenAddr.toIpPort()),/* 服务器负责监听的本地ip和端口 */
        name_(nameArg),/* 服务器名字,创建时传入 */
        acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),/* Acceptor对象,负责监听客户端连接请求,运行在主线程的EventLoop中 */
        threadPool_(new EventLoopThreadPool(loop, name_)),/* 事件驱动线程池,池中每个线程运行一个EventLoop */
        connectionCallback_(defaultConnectionCallback),/* 用户传入,有tcp连接到达或tcp连接关闭时调用,传给TcpConnection */
        messageCallback_(defaultMessageCallback),/* 用户传入,对端发来消息时调用,传给TcpConnection */
        nextConnId_(1) /* TcpConnection特有id,每增加一个TcpConnection,nextConnId_加一 */
    { 
      /* 
       * 设置回调函数,当有客户端请求时,Acceptor接收客户端请求,然后调用这里设置的回调函数
       * 回调函数用于创建TcpConnection连接
       */
      acceptor_->setNewConnectionCallback(
          std::bind(&TcpServer::newConnection, this, _1, _2));
    }
     
    TcpServer::~TcpServer()
    {
      loop_->assertInLoopThread();
      LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";
     
      for (auto& item : connections_)
      {
        TcpConnectionPtr conn(item.second);
        item.second.reset();
        conn->getLoop()->runInLoop(
          std::bind(&TcpConnection::connectDestroyed, conn));
      }
    }
     
    void TcpServer::setThreadNum(int numThreads)
    {
      assert(0 <= numThreads);
      threadPool_->setThreadNum(numThreads);
    }
     
    void TcpServer::start()
    {
      if (started_.getAndSet(1) == 0)
      {
        threadPool_->start(threadInitCallback_);//启动线程池,threadInitCallback_创建好所有线程后调用的回调函数
     
        assert(!acceptor_->listenning());
        loop_->runInLoop(       //直接调用linsten函数
            std::bind(&Acceptor::listen, get_pointer(acceptor_)));
      }
    }
     
    /* 
     * Acceptor接收客户端请求后调用的回调函数
     * @param sockfd: 已经接收完成(三次握手完成)后的客户端套接字
     * @param peerAddr: 客户端地址
     * 
     * Acceptor只负责接收客户端请求
     * TcpServer需要生成一个TcpConnection用于管理tcp连接
     * 
     * 1.TcpServer内有一个EventLoopThreadPool,即事件循环线程池,池子中每个线程都是一个EventLoop
     * 2.每个EventLoop包含一个Poller用于监听注册到这个EventLoop上的所有Channel
     * 3.当建立起一个新的TcpConnection时,这个连接会放到线程池中的某个EventLoop中
     * 4.TcpServer中的baseLoop只用来检测客户端的连接
     * 
     * 从libevent的角度看就是
     * 1.EventLoopThreadPool是一个struct event_base的池子,池子中全是struct event_base
     * 2.TcpServer独占一个event_base,这个event_base不在池子中
     * 3.TcpConnection会扔到这个池子中的某个event_base中
     */
    void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
    {
      loop_->assertInLoopThread();
      EventLoop* ioLoop = threadPool_->getNextLoop();//从事件驱动线程池中取出一个线程给TcpConnection 
       /* 为TcpConnection生成独一无二的名字 */
      char buf[64];
      snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
      ++nextConnId_;
      string connName = name_ + buf;
     
      LOG_INFO << "TcpServer::newConnection [" << name_
               << "] - new connection [" << connName
               << "] from " << peerAddr.toIpPort();
      /* 
       * 根据sockfd获取tcp连接在本地的<地址,端口>
       * getsockname(int fd, struct sockaddr*, int *size);
       */
      InetAddress localAddr(sockets::getLocalAddr(sockfd));
      // FIXME poll with zero timeout to double confirm the new connection
      // FIXME use make_shared if necessary
      /* 创建一个新的TcpConnection代表一个Tcp连接 */
      TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                              connName,
                                              sockfd,
                                              localAddr,
                                              peerAddr));
       /* 添加到所有tcp 连接的map中,键是tcp连接独特的名字(服务器名+客户端<地址,端口>) */
      connections_[connName] = conn;
       /* 为tcp连接设置回调函数(由用户提供) */
      conn->setConnectionCallback(connectionCallback_);
      conn->setMessageCallback(messageCallback_);
      conn->setWriteCompleteCallback(writeCompleteCallback_);
      /* 
       * 关闭回调函数,由TcpServer设置,作用是将这个关闭的TcpConnection从map中删除
       * 当poll返回后,发现被激活的原因是EPOLLHUP,此时需要关闭tcp连接
       * 调用Channel的CloseCallback,进而调用TcpConnection的handleClose,进而调用removeConnection
       */
      conn->setCloseCallback(
          std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
     
      /* 
       * 连接建立后,调用TcpConnection连接建立成功的函数
       * 1.新建的TcpConnection所在事件循环是在事件循环线程池中的某个线程
       * 2.所以TcpConnection也就属于它所在的事件驱动循环所在的那个线程
       * 3.调用TcpConnection的函数时也就应该在自己所在线程调用
       * 4.所以需要调用runInLoop在自己的那个事件驱动循环所在线程调用这个函数
       * 5.当前线程是TcpServer的主线程,不是TcpConnection的线程,如果在这个线程直接调用会阻塞监听客户端请求
       * 6.其实这里不是因为线程不安全,即使在这个线程调用也不会出现线程不安全,因为TcpConnection本就是由这个线程创建的
       */
      ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
    }
     
    void TcpServer::removeConnection(const TcpConnectionPtr& conn)
    {
      // FIXME: unsafe
      loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
    }
     
    void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
    {
      //关闭连接,把fd从epoll中del掉,要释放connector(包括描述符)和channel
      loop_->assertInLoopThread();
      LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
               << "] - connection " << conn->name();
      size_t n = connections_.erase(conn->name());
      (void)n;
      assert(n == 1);
      EventLoop* ioLoop = conn->getLoop();
      ioLoop->queueInLoop(
          std::bind(&TcpConnection::connectDestroyed, conn));
    }
     
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
  • 相关阅读:
    Android 得到安装包的信息
    基于单片机微波炉加热箱系统设计
    API实战教程:使用身份证OCR识别API构建一个应用
    称重驱动二次开发教程
    C/C++ 每日一练:计算斐波那契数列的第 n 项(递归、记忆化、迭代)
    Linux中scp命令复制文件
    RDP协议流程详解(二)Basic Settings Exchange 阶段
    【图像配准】Canny边缘检测+模板配准红外可见光双路数据
    螺旋矩阵||真的很有趣!(蓝桥杯宝贝们看过来)
    【JavaWeb - 网页编程】一 HTML
  • 原文地址:https://blog.csdn.net/weixin_50448879/article/details/134454632