• 流媒体分析之srt 之libsrt 分析:


    1. 在上一个章节中,我们分析了 ffmpeg 如何调用 libsrt 库来实现对 SRT 协议的支持。

    下面是libsrt  对外接口:

    1. // Binding and connection management
    2. int srt_bind(SRTSOCKET u, const struct sockaddr * name, int namelen) { return CUDT::bind(u, name, namelen); }
    3. int srt_bind_acquire(SRTSOCKET u, UDPSOCKET udpsock) { return CUDT::bind(u, udpsock); }
    4. int srt_listen(SRTSOCKET u, int backlog) { return CUDT::listen(u, backlog); }
    5. SRTSOCKET srt_accept(SRTSOCKET u, struct sockaddr * addr, int * addrlen) { return CUDT::accept(u, addr, addrlen); }
    6. SRTSOCKET srt_accept_bond(const SRTSOCKET lsns[], int lsize, int64_t msTimeOut) { return CUDT::accept_bond(lsns, lsize, msTimeOut); }
    7. int srt_connect(SRTSOCKET u, const struct sockaddr * name, int namelen) { return CUDT::connect(u, name, namelen, SRT_SEQNO_NONE); }
    8. int srt_connect_debug(SRTSOCKET u, const struct sockaddr * name, int namelen, int forced_isn) { return CUDT::connect(u, name, namelen, forced_isn); }
    9. int srt_getpeername(SRTSOCKET u, struct sockaddr * name, int * namelen) { return CUDT::getpeername(u, name, namelen); }
    10. int srt_getsockname(SRTSOCKET u, struct sockaddr * name, int * namelen) { return CUDT::getsockname(u, name, namelen); }
    11. int srt_getsockopt(SRTSOCKET u, int level, SRT_SOCKOPT optname, void * optval, int * optlen)
    12. { return CUDT::getsockopt(u, level, optname, optval, optlen); }
    13. int srt_setsockopt(SRTSOCKET u, int level, SRT_SOCKOPT optname, const void * optval, int optlen)
    14. { return CUDT::setsockopt(u, level, optname, optval, optlen); }
    15. int srt_getsockflag(SRTSOCKET u, SRT_SOCKOPT opt, void* optval, int* optlen)
    16. { return CUDT::getsockopt(u, 0, opt, optval, optlen); }
    17. int srt_setsockflag(SRTSOCKET u, SRT_SOCKOPT opt, const void* optval, int optlen)
    18. { return CUDT::setsockopt(u, 0, opt, optval, optlen); }
    19. int srt_send(SRTSOCKET u, const char * buf, int len) { return CUDT::send(u, buf, len, 0); }
    20. int srt_recv(SRTSOCKET u, char * buf, int len) { return CUDT::recv(u, buf, len, 0); }
    21. int srt_sendmsg(SRTSOCKET u, const char * buf, int len, int ttl, int inorder) { return CUDT::sendmsg(u, buf, len, ttl, 0!= inorder); }
    22. int srt_recvmsg(SRTSOCKET u, char * buf, int len) { int64_t ign_srctime; return CUDT::recvmsg(u, buf, len, ign_srctime); }

     2. libsrt 库的整体数据流程如下:

     2.1 srt_bind 分析

       根据上图,srt_bind 会调用 updateMux 创建 CMultiplexer 类对象,该对象的主要成员如下:

        CSndQueue*    m_pSndQueue; // The sending queue

        CRcvQueue*    m_pRcvQueue; // The receiving queue

        CChannel*     m_pChannel;  // The UDP channel for sending and receiving

    1. void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* udpsock /*[[nullable]]*/)
    2. {
    3. ScopedLock cg(m_GlobControlLock);
    4. // If udpsock is provided, then this socket will be simply
    5. // taken for binding as a good deal. It would be nice to make
    6. // a sanity check to see if this UDP socket isn't already installed
    7. // in some multiplexer, but we state this UDP socket isn't accessible
    8. // anyway so this wouldn't be possible.
    9. if (!udpsock)
    10. {
    11. // If not, we need to see if there exist already a multiplexer bound
    12. // to the same endpoint.
    13. const int port = addr.hport();
    14. const CSrtConfig& cfgSocket = s->core().m_config;
    15. bool reuse_attempt = false;
    16. for (map::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++i)
    17. {
    18. CMultiplexer& m = i->second;
    19. // First, we need to find a multiplexer with the same port.
    20. if (m.m_iPort != port)
    21. {
    22. HLOGC(smlog.Debug,
    23. log << "bind: muxer @" << m.m_iID << " found, but for port " << m.m_iPort
    24. << " (requested port: " << port << ")");
    25. continue;
    26. }
    27. // If this is bound to the wildcard address, it can be reused if:
    28. // - addr is also a wildcard
    29. // - channel settings match
    30. // Otherwise it's a conflict.
    31. sockaddr_any sa;
    32. m.m_pChannel->getSockAddr((sa));
    33. HLOGC(smlog.Debug,
    34. log << "bind: Found existing muxer @" << m.m_iID << " : " << sa.str() << " - check against "
    35. << addr.str());
    36. if (sa.isany())
    37. {
    38. if (!addr.isany())
    39. {
    40. LOGC(smlog.Error,
    41. log << "bind: Address: " << addr.str()
    42. << " conflicts with existing wildcard binding: " << sa.str());
    43. throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
    44. }
    45. // Still, for ANY you need either the same family, or open
    46. // for families.
    47. if (m.m_mcfg.iIpV6Only != -1 && m.m_mcfg.iIpV6Only != cfgSocket.iIpV6Only)
    48. {
    49. LOGC(smlog.Error,
    50. log << "bind: Address: " << addr.str()
    51. << " conflicts with existing IPv6 wildcard binding: " << sa.str());
    52. throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
    53. }
    54. if ((m.m_mcfg.iIpV6Only == 0 || cfgSocket.iIpV6Only == 0) && m.m_iIPversion != addr.family())
    55. {
    56. LOGC(smlog.Error,
    57. log << "bind: Address: " << addr.str() << " conflicts with IPv6 wildcard binding: " << sa.str()
    58. << " : family " << (m.m_iIPversion == AF_INET ? "IPv4" : "IPv6") << " vs. "
    59. << (addr.family() == AF_INET ? "IPv4" : "IPv6"));
    60. throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
    61. }
    62. reuse_attempt = true;
    63. HLOGC(smlog.Debug, log << "bind: wildcard address - multiplexer reusable");
    64. }
    65. else if (addr.isany() && addr.family() == sa.family())
    66. {
    67. LOGC(smlog.Error,
    68. log << "bind: Wildcard address: " << addr.str()
    69. << " conflicts with existting IP binding: " << sa.str());
    70. throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
    71. }
    72. // If this is bound to a certain address, AND:
    73. else if (sa.equal_address(addr))
    74. {
    75. // - the address is the same as addr
    76. reuse_attempt = true;
    77. HLOGC(smlog.Debug, log << "bind: same IP address - multiplexer reusable");
    78. }
    79. else
    80. {
    81. HLOGC(smlog.Debug, log << "bind: IP addresses differ - ALLOWED to create a new multiplexer");
    82. }
    83. // Otherwise:
    84. // - the address is different than addr
    85. // - the address can't be reused, but this can go on with new one.
    86. // If this is a reusage attempt:
    87. if (reuse_attempt)
    88. {
    89. // - if the channel settings match, it can be reused
    90. if (channelSettingsMatch(m.m_mcfg, cfgSocket))
    91. {
    92. HLOGC(smlog.Debug, log << "bind: reusing multiplexer for port " << port);
    93. // reuse the existing multiplexer
    94. ++i->second.m_iRefCount;
    95. installMuxer((s), (i->second));
    96. return;
    97. }
    98. else
    99. {
    100. // - if not, it's a conflict
    101. LOGC(smlog.Error,
    102. log << "bind: Address: " << addr.str() << " conflicts with binding: " << sa.str()
    103. << " due to channel settings");
    104. throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
    105. }
    106. }
    107. // If not, proceed to the next one, and when there are no reusage
    108. // candidates, proceed with creating a new multiplexer.
    109. // Note that a binding to a different IP address is not treated
    110. // as a candidate for either reuseage or conflict.
    111. }
    112. }
    113. // a new multiplexer is needed
    114. CMultiplexer m;
    115. configureMuxer((m), s, addr.family());
    116. try
    117. {
    118. m.m_pChannel = new CChannel();
    119. m.m_pChannel->setConfig(m.m_mcfg);
    120. if (udpsock)
    121. {
    122. // In this case, addr contains the address
    123. // that has been extracted already from the
    124. // given socket
    125. m.m_pChannel->attach(*udpsock, addr);
    126. }
    127. else if (addr.empty())
    128. {
    129. // The case of previously used case of a NULL address.
    130. // This here is used to pass family only, in this case
    131. // just automatically bind to the "0" address to autoselect
    132. // everything.
    133. m.m_pChannel->open(addr.family());
    134. }
    135. else
    136. {
    137. // If at least the IP address is specified, then bind to that
    138. // address, but still possibly autoselect the outgoing port, if the
    139. // port was specified as 0.
    140. m.m_pChannel->open(addr);
    141. }
    142. m.m_pTimer = new CTimer;
    143. m.m_pSndQueue = new CSndQueue;
    144. m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
    145. m.m_pRcvQueue = new CRcvQueue;
    146. m.m_pRcvQueue->init(128, s->core().maxPayloadSize(), m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);
    147. // Rewrite the port here, as it might be only known upon return
    148. // from CChannel::open.
    149. m.m_iPort = installMuxer((s), m);
    150. m_mMultiplexer[m.m_iID] = m;
    151. }
    152. catch (const CUDTException&)
    153. {
    154. m.destroy();
    155. throw;
    156. }
    157. catch (...)
    158. {
    159. m.destroy();
    160. throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
    161. }
    162. HLOGC(smlog.Debug, log << "bind: creating new multiplexer for port " << m.m_iPort);
    163. }

    2.2  发送队列CSndQueue  worker 函数分析。  

    CSndQueue  对象初始化时候会创建线程:执行worker  函数

      worker 函数:循环从发送缓存(sendbuf)队列中取数据,并按 SRT 协议打包后发送

    1. void* srt::CSndQueue::worker(void* param)
    2. {
    3. CSndQueue* self = (CSndQueue*)param;
    4. #if ENABLE_LOGGING
    5. THREAD_STATE_INIT(("SRT:SndQ:w" + Sprint(m_counter)).c_str());
    6. #else
    7. THREAD_STATE_INIT("SRT:SndQ:worker");
    8. #endif
    9. #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
    10. CTimer::rdtsc(self->m_ullDbgTime);
    11. self->m_ullDbgPeriod = uint64_t(5000000) * CTimer::getCPUFrequency();
    12. self->m_ullDbgTime += self->m_ullDbgPeriod;
    13. #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
    14. while (!self->m_bClosing)
    15. {
    16. const steady_clock::time_point next_time = self->m_pSndUList->getNextProcTime();
    17. #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
    18. self->m_WorkerStats.lIteration++;
    19. #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
    20. if (is_zero(next_time))
    21. {
    22. #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
    23. self->m_WorkerStats.lNotReadyTs++;
    24. #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
    25. // wait here if there is no sockets with data to be sent
    26. THREAD_PAUSED();
    27. if (!self->m_bClosing)
    28. {
    29. self->m_pSndUList->waitNonEmpty();
    30. #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
    31. self->m_WorkerStats.lCondWait++;
    32. #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
    33. }
    34. THREAD_RESUMED();
    35. continue;
    36. }
    37. // wait until next processing time of the first socket on the list
    38. const steady_clock::time_point currtime = steady_clock::now();
    39. #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
    40. if (self->m_ullDbgTime <= currtime)
    41. {
    42. fprintf(stdout,
    43. "SndQueue %lu slt:%lu nrp:%lu snt:%lu nrt:%lu ctw:%lu\n",
    44. self->m_WorkerStats.lIteration,
    45. self->m_WorkerStats.lSleepTo,
    46. self->m_WorkerStats.lNotReadyPop,
    47. self->m_WorkerStats.lSendTo,
    48. self->m_WorkerStats.lNotReadyTs,
    49. self->m_WorkerStats.lCondWait);
    50. memset(&self->m_WorkerStats, 0, sizeof(self->m_WorkerStats));
    51. self->m_ullDbgTime = currtime + self->m_ullDbgPeriod;
    52. }
    53. #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
    54. THREAD_PAUSED();
    55. if (currtime < next_time)
    56. {
    57. self->m_pTimer->sleep_until(next_time);
    58. #if defined(HAI_DEBUG_SNDQ_HIGHRATE)
    59. self->m_WorkerStats.lSleepTo++;
    60. #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
    61. }
    62. THREAD_RESUMED();
    63. // Get a socket with a send request if any.
    64. CUDT* u = self->m_pSndUList->pop();
    65. if (u == NULL)
    66. {
    67. #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
    68. self->m_WorkerStats.lNotReadyPop++;
    69. #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
    70. continue;
    71. }
    72. #define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " "
    73. HLOGC(qslog.Debug,
    74. log << "CSndQueue: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening)
    75. << UST(Connecting) << UST(Connected) << UST(Closing) << UST(Shutdown) << UST(Broken) << UST(PeerHealth)
    76. << UST(Opened));
    77. #undef UST
    78. if (!u->m_bConnected || u->m_bBroken)
    79. {
    80. #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
    81. self->m_WorkerStats.lNotReadyPop++;
    82. #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
    83. continue;
    84. }
    85. // pack a packet from the socket
    86. CPacket pkt;
    87. // 取数据srt 打包
    88. const std::pair<bool, steady_clock::time_point> res_time = u->packData((pkt));
    89. // Check if payload size is invalid.
    90. if (res_time.first == false)
    91. {
    92. #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
    93. self->m_WorkerStats.lNotReadyPop++;
    94. #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
    95. continue;
    96. }
    97. const sockaddr_any addr = u->m_PeerAddr;
    98. const steady_clock::time_point next_send_time = res_time.second;
    99. if (!is_zero(next_send_time))
    100. self->m_pSndUList->update(u, CSndUList::DO_RESCHEDULE, next_send_time);
    101. HLOGC(qslog.Debug, log << self->CONID() << "chn:SENDING: " << pkt.Info());
    102. // 发送srt 数据
    103. self->m_pChannel->sendto(addr, pkt);
    104. #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
    105. self->m_WorkerStats.lSendTo++;
    106. #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
    107. }
    108. THREAD_EXIT();
    109. return NULL;
    110. }

    2.3  接收队列CRcvQueue  worker 函数分析。

    循环从数据通道读取数据,并判断数据包类型:如果是控制包,调用 processCtrl 函数处理;

    如果是数据包,调用 processData 函数将数据放到 CRcvBuffer 缓存中。

    1. void* srt::CRcvQueue::worker(void* param)
    2. {
    3. CRcvQueue* self = (CRcvQueue*)param;
    4. sockaddr_any sa(self->getIPversion());
    5. int32_t id = 0;
    6. #if ENABLE_LOGGING
    7. THREAD_STATE_INIT(("SRT:RcvQ:w" + Sprint(m_counter)).c_str());
    8. #else
    9. THREAD_STATE_INIT("SRT:RcvQ:worker");
    10. #endif
    11. CUnit* unit = 0;
    12. EConnectStatus cst = CONN_AGAIN;
    13. while (!self->m_bClosing)
    14. {
    15. bool have_received = false;
    16. EReadStatus rst = self->worker_RetrieveUnit((id), (unit), (sa));
    17. if (rst == RST_OK)
    18. {
    19. if (id < 0)
    20. {
    21. // User error on peer. May log something, but generally can only ignore it.
    22. // XXX Think maybe about sending some "connection rejection response".
    23. HLOGC(qrlog.Debug,
    24. log << self->CONID() << "RECEIVED negative socket id '" << id
    25. << "', rejecting (POSSIBLE ATTACK)");
    26. continue;
    27. }
    28. // NOTE: cst state is being changed here.
    29. // This state should be maintained through any next failed calls to worker_RetrieveUnit.
    30. // Any error switches this to rejection, just for a case.
    31. // Note to rendezvous connection. This can accept:
    32. // - ID == 0 - take the first waiting rendezvous socket
    33. // - ID > 0 - find the rendezvous socket that has this ID.
    34. if (id == 0)
    35. {
    36. // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
    37. cst = self->worker_ProcessConnectionRequest(unit, sa);
    38. }
    39. else
    40. {
    41. // Otherwise ID is expected to be associated with:
    42. // - an enqueued rendezvous socket
    43. // - a socket connected to a peer
    44. cst = self->worker_ProcessAddressedPacket(id, unit, sa);
    45. // CAN RETURN CONN_REJECT, but m_RejectReason is already set
    46. }
    47. HLOGC(qrlog.Debug, log << self->CONID() << "worker: result for the unit: " << ConnectStatusStr(cst));
    48. if (cst == CONN_AGAIN)
    49. {
    50. HLOGC(qrlog.Debug, log << self->CONID() << "worker: packet not dispatched, continuing reading.");
    51. continue;
    52. }
    53. have_received = true;
    54. }
    55. else if (rst == RST_ERROR)
    56. {
    57. // According to the description by CChannel::recvfrom, this can be either of:
    58. // - IPE: all errors except EBADF
    59. // - socket was closed in the meantime by another thread: EBADF
    60. // If EBADF, then it's expected that the "closing" state is also set.
    61. // Check that just to report possible errors, but interrupt the loop anyway.
    62. if (self->m_bClosing)
    63. {
    64. HLOGC(qrlog.Debug,
    65. log << self->CONID() << "CChannel reported error, but Queue is closing - INTERRUPTING worker.");
    66. }
    67. else
    68. {
    69. LOGC(qrlog.Fatal,
    70. log << self->CONID()
    71. << "CChannel reported ERROR DURING TRANSMISSION - IPE. INTERRUPTING worker anyway.");
    72. }
    73. cst = CONN_REJECT;
    74. break;
    75. }
    76. // OTHERWISE: this is an "AGAIN" situation. No data was read, but the process should continue.
    77. // take care of the timing event for all UDT sockets
    78. const steady_clock::time_point curtime_minus_syn =
    79. steady_clock::now() - microseconds_from(CUDT::COMM_SYN_INTERVAL_US);
    80. CRNode* ul = self->m_pRcvUList->m_pUList;
    81. while ((NULL != ul) && (ul->m_tsTimeStamp < curtime_minus_syn))
    82. {
    83. CUDT* u = ul->m_pUDT;
    84. if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
    85. {
    86. u->checkTimers();
    87. self->m_pRcvUList->update(u);
    88. }
    89. else
    90. {
    91. HLOGC(qrlog.Debug,
    92. log << CUDTUnited::CONID(u->m_SocketID) << " SOCKET broken, REMOVING FROM RCV QUEUE/MAP.");
    93. // the socket must be removed from Hash table first, then RcvUList
    94. self->m_pHash->remove(u->m_SocketID);
    95. self->m_pRcvUList->remove(u);
    96. u->m_pRNode->m_bOnList = false;
    97. }
    98. ul = self->m_pRcvUList->m_pUList;
    99. }
    100. if (have_received)
    101. {
    102. HLOGC(qrlog.Debug,
    103. log << "worker: RECEIVED PACKET --> updateConnStatus. cst=" << ConnectStatusStr(cst) << " id=" << id
    104. << " pkt-payload-size=" << unit->m_Packet.getLength());
    105. }
    106. // Check connection requests status for all sockets in the RendezvousQueue.
    107. // Pass the connection status from the last call of:
    108. // worker_ProcessAddressedPacket --->
    109. // worker_TryAsyncRend_OrStore --->
    110. // CUDT::processAsyncConnectResponse --->
    111. // CUDT::processConnectResponse
    112. self->m_pRendezvousQueue->updateConnStatus(rst, cst, unit);
    113. // XXX updateConnStatus may have removed the connector from the list,
    114. // however there's still m_mBuffer in CRcvQueue for that socket to care about.
    115. }
    116. HLOGC(qrlog.Debug, log << "worker: EXIT");
    117. THREAD_EXIT();
    118. return NULL;
    119. }

  • 相关阅读:
    江同志是怎样逆划水的?
    Playcanvas后处理-辉光bloom
    RPA在票据处理中的应用
    深度学习面试题目01
    推荐一款好用的代码可视化工具
    【力扣算法简单五十题】21.验证回文串
    c++处理tcp粘包问题以及substr方法
    PageRank算法
    8月26日计算机视觉理论学习笔记——医疗影像分割
    Ubuntu Seata开机自启动服务
  • 原文地址:https://blog.csdn.net/u012794472/article/details/126783288