1.在上一个章节我们分析ffmpeg 调用libsrt 库实现对srt 协议实现。
下面是libsrt 对外接口:
- // Binding and connection management
- int srt_bind(SRTSOCKET u, const struct sockaddr * name, int namelen) { return CUDT::bind(u, name, namelen); }
- int srt_bind_acquire(SRTSOCKET u, UDPSOCKET udpsock) { return CUDT::bind(u, udpsock); }
- int srt_listen(SRTSOCKET u, int backlog) { return CUDT::listen(u, backlog); }
- SRTSOCKET srt_accept(SRTSOCKET u, struct sockaddr * addr, int * addrlen) { return CUDT::accept(u, addr, addrlen); }
- SRTSOCKET srt_accept_bond(const SRTSOCKET lsns[], int lsize, int64_t msTimeOut) { return CUDT::accept_bond(lsns, lsize, msTimeOut); }
- int srt_connect(SRTSOCKET u, const struct sockaddr * name, int namelen) { return CUDT::connect(u, name, namelen, SRT_SEQNO_NONE); }
- int srt_connect_debug(SRTSOCKET u, const struct sockaddr * name, int namelen, int forced_isn) { return CUDT::connect(u, name, namelen, forced_isn); }
- int srt_getpeername(SRTSOCKET u, struct sockaddr * name, int * namelen) { return CUDT::getpeername(u, name, namelen); }
- int srt_getsockname(SRTSOCKET u, struct sockaddr * name, int * namelen) { return CUDT::getsockname(u, name, namelen); }
- int srt_getsockopt(SRTSOCKET u, int level, SRT_SOCKOPT optname, void * optval, int * optlen)
- { return CUDT::getsockopt(u, level, optname, optval, optlen); }
- int srt_setsockopt(SRTSOCKET u, int level, SRT_SOCKOPT optname, const void * optval, int optlen)
- { return CUDT::setsockopt(u, level, optname, optval, optlen); }
- int srt_getsockflag(SRTSOCKET u, SRT_SOCKOPT opt, void* optval, int* optlen)
- { return CUDT::getsockopt(u, 0, opt, optval, optlen); }
- int srt_setsockflag(SRTSOCKET u, SRT_SOCKOPT opt, const void* optval, int optlen)
- { return CUDT::setsockopt(u, 0, opt, optval, optlen); }
- int srt_send(SRTSOCKET u, const char * buf, int len) { return CUDT::send(u, buf, len, 0); }
- int srt_recv(SRTSOCKET u, char * buf, int len) { return CUDT::recv(u, buf, len, 0); }
- int srt_sendmsg(SRTSOCKET u, const char * buf, int len, int ttl, int inorder) { return CUDT::sendmsg(u, buf, len, ttl, 0!= inorder); }
- int srt_recvmsg(SRTSOCKET u, char * buf, int len) { int64_t ign_srctime; return CUDT::recvmsg(u, buf, len, ign_srctime); }
2 . libsrt 库数据流程整体库如下:

2.1 srt_bind 分析
根据上图会调用 updateMux 创建CMultiplexer 类对象,该对象如下
CSndQueue* m_pSndQueue; // The sending queue
CRcvQueue* m_pRcvQueue; // The receiving queue
CChannel* m_pChannel; // The UDP channel for sending and receiving
- void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& addr, const UDPSOCKET* udpsock /*[[nullable]]*/)
- {
- ScopedLock cg(m_GlobControlLock);
-
- // If udpsock is provided, then this socket will be simply
- // taken for binding as a good deal. It would be nice to make
- // a sanity check to see if this UDP socket isn't already installed
- // in some multiplexer, but we state this UDP socket isn't accessible
- // anyway so this wouldn't be possible.
- if (!udpsock)
- {
- // If not, we need to see if there exist already a multiplexer bound
- // to the same endpoint.
- const int port = addr.hport();
- const CSrtConfig& cfgSocket = s->core().m_config;
- bool reuse_attempt = false;
- for (map
::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++i) - {
- CMultiplexer& m = i->second;
- // First, we need to find a multiplexer with the same port.
- if (m.m_iPort != port)
- {
- HLOGC(smlog.Debug,
- log << "bind: muxer @" << m.m_iID << " found, but for port " << m.m_iPort
- << " (requested port: " << port << ")");
- continue;
- }
- // If this is bound to the wildcard address, it can be reused if:
- // - addr is also a wildcard
- // - channel settings match
- // Otherwise it's a conflict.
- sockaddr_any sa;
- m.m_pChannel->getSockAddr((sa));
-
- HLOGC(smlog.Debug,
- log << "bind: Found existing muxer @" << m.m_iID << " : " << sa.str() << " - check against "
- << addr.str());
-
- if (sa.isany())
- {
- if (!addr.isany())
- {
- LOGC(smlog.Error,
- log << "bind: Address: " << addr.str()
- << " conflicts with existing wildcard binding: " << sa.str());
- throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
- }
-
- // Still, for ANY you need either the same family, or open
- // for families.
- if (m.m_mcfg.iIpV6Only != -1 && m.m_mcfg.iIpV6Only != cfgSocket.iIpV6Only)
- {
- LOGC(smlog.Error,
- log << "bind: Address: " << addr.str()
- << " conflicts with existing IPv6 wildcard binding: " << sa.str());
- throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
- }
-
- if ((m.m_mcfg.iIpV6Only == 0 || cfgSocket.iIpV6Only == 0) && m.m_iIPversion != addr.family())
- {
- LOGC(smlog.Error,
- log << "bind: Address: " << addr.str() << " conflicts with IPv6 wildcard binding: " << sa.str()
- << " : family " << (m.m_iIPversion == AF_INET ? "IPv4" : "IPv6") << " vs. "
- << (addr.family() == AF_INET ? "IPv4" : "IPv6"));
- throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
- }
- reuse_attempt = true;
- HLOGC(smlog.Debug, log << "bind: wildcard address - multiplexer reusable");
- }
- else if (addr.isany() && addr.family() == sa.family())
- {
- LOGC(smlog.Error,
- log << "bind: Wildcard address: " << addr.str()
- << " conflicts with existting IP binding: " << sa.str());
- throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
- }
- // If this is bound to a certain address, AND:
- else if (sa.equal_address(addr))
- {
- // - the address is the same as addr
- reuse_attempt = true;
- HLOGC(smlog.Debug, log << "bind: same IP address - multiplexer reusable");
- }
- else
- {
- HLOGC(smlog.Debug, log << "bind: IP addresses differ - ALLOWED to create a new multiplexer");
- }
- // Otherwise:
- // - the address is different than addr
- // - the address can't be reused, but this can go on with new one.
- // If this is a reusage attempt:
- if (reuse_attempt)
- {
- // - if the channel settings match, it can be reused
- if (channelSettingsMatch(m.m_mcfg, cfgSocket))
- {
- HLOGC(smlog.Debug, log << "bind: reusing multiplexer for port " << port);
- // reuse the existing multiplexer
- ++i->second.m_iRefCount;
- installMuxer((s), (i->second));
- return;
- }
- else
- {
- // - if not, it's a conflict
- LOGC(smlog.Error,
- log << "bind: Address: " << addr.str() << " conflicts with binding: " << sa.str()
- << " due to channel settings");
- throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
- }
- }
- // If not, proceed to the next one, and when there are no reusage
- // candidates, proceed with creating a new multiplexer.
-
- // Note that a binding to a different IP address is not treated
- // as a candidate for either reuseage or conflict.
- }
- }
-
- // a new multiplexer is needed
- CMultiplexer m;
- configureMuxer((m), s, addr.family());
-
- try
- {
- m.m_pChannel = new CChannel();
- m.m_pChannel->setConfig(m.m_mcfg);
-
- if (udpsock)
- {
- // In this case, addr contains the address
- // that has been extracted already from the
- // given socket
- m.m_pChannel->attach(*udpsock, addr);
- }
- else if (addr.empty())
- {
- // The case of previously used case of a NULL address.
- // This here is used to pass family only, in this case
- // just automatically bind to the "0" address to autoselect
- // everything.
- m.m_pChannel->open(addr.family());
- }
- else
- {
- // If at least the IP address is specified, then bind to that
- // address, but still possibly autoselect the outgoing port, if the
- // port was specified as 0.
- m.m_pChannel->open(addr);
- }
-
- m.m_pTimer = new CTimer;
- m.m_pSndQueue = new CSndQueue;
- m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
- m.m_pRcvQueue = new CRcvQueue;
- m.m_pRcvQueue->init(128, s->core().maxPayloadSize(), m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);
-
- // Rewrite the port here, as it might be only known upon return
- // from CChannel::open.
- m.m_iPort = installMuxer((s), m);
- m_mMultiplexer[m.m_iID] = m;
- }
- catch (const CUDTException&)
- {
- m.destroy();
- throw;
- }
- catch (...)
- {
- m.destroy();
- throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
- }
-
- HLOGC(smlog.Debug, log << "bind: creating new multiplexer for port " << m.m_iPort);
- }
2.2 发送队列CSndQueue worker 函数分析。
CSndQueue 对象初始化时候会创建线程:执行worker 函数
woker 函数:循环从sendbuf 缓存队列中取数据,并按srt 打包数据发送
- void* srt::CSndQueue::worker(void* param)
- {
- CSndQueue* self = (CSndQueue*)param;
-
- #if ENABLE_LOGGING
- THREAD_STATE_INIT(("SRT:SndQ:w" + Sprint(m_counter)).c_str());
- #else
- THREAD_STATE_INIT("SRT:SndQ:worker");
- #endif
-
- #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
- CTimer::rdtsc(self->m_ullDbgTime);
- self->m_ullDbgPeriod = uint64_t(5000000) * CTimer::getCPUFrequency();
- self->m_ullDbgTime += self->m_ullDbgPeriod;
- #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
-
- while (!self->m_bClosing)
- {
- const steady_clock::time_point next_time = self->m_pSndUList->getNextProcTime();
-
- #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
- self->m_WorkerStats.lIteration++;
- #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
-
- if (is_zero(next_time))
- {
- #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
- self->m_WorkerStats.lNotReadyTs++;
- #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
-
- // wait here if there is no sockets with data to be sent
- THREAD_PAUSED();
- if (!self->m_bClosing)
- {
- self->m_pSndUList->waitNonEmpty();
-
- #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
- self->m_WorkerStats.lCondWait++;
- #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
- }
- THREAD_RESUMED();
-
- continue;
- }
-
- // wait until next processing time of the first socket on the list
- const steady_clock::time_point currtime = steady_clock::now();
-
- #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
- if (self->m_ullDbgTime <= currtime)
- {
- fprintf(stdout,
- "SndQueue %lu slt:%lu nrp:%lu snt:%lu nrt:%lu ctw:%lu\n",
- self->m_WorkerStats.lIteration,
- self->m_WorkerStats.lSleepTo,
- self->m_WorkerStats.lNotReadyPop,
- self->m_WorkerStats.lSendTo,
- self->m_WorkerStats.lNotReadyTs,
- self->m_WorkerStats.lCondWait);
- memset(&self->m_WorkerStats, 0, sizeof(self->m_WorkerStats));
- self->m_ullDbgTime = currtime + self->m_ullDbgPeriod;
- }
- #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
-
- THREAD_PAUSED();
- if (currtime < next_time)
- {
- self->m_pTimer->sleep_until(next_time);
-
- #if defined(HAI_DEBUG_SNDQ_HIGHRATE)
- self->m_WorkerStats.lSleepTo++;
- #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
- }
- THREAD_RESUMED();
-
- // Get a socket with a send request if any.
- CUDT* u = self->m_pSndUList->pop();
- if (u == NULL)
- {
- #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
- self->m_WorkerStats.lNotReadyPop++;
- #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
- continue;
- }
-
- #define UST(field) ((u->m_b##field) ? "+" : "-") << #field << " "
- HLOGC(qslog.Debug,
- log << "CSndQueue: requesting packet from @" << u->socketID() << " STATUS: " << UST(Listening)
- << UST(Connecting) << UST(Connected) << UST(Closing) << UST(Shutdown) << UST(Broken) << UST(PeerHealth)
- << UST(Opened));
- #undef UST
-
- if (!u->m_bConnected || u->m_bBroken)
- {
- #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
- self->m_WorkerStats.lNotReadyPop++;
- #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
- continue;
- }
-
- // pack a packet from the socket
- CPacket pkt;
- // 取数据srt 打包
- const std::pair<bool, steady_clock::time_point> res_time = u->packData((pkt));
-
- // Check if payload size is invalid.
- if (res_time.first == false)
- {
- #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
- self->m_WorkerStats.lNotReadyPop++;
- #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
- continue;
- }
-
- const sockaddr_any addr = u->m_PeerAddr;
- const steady_clock::time_point next_send_time = res_time.second;
- if (!is_zero(next_send_time))
- self->m_pSndUList->update(u, CSndUList::DO_RESCHEDULE, next_send_time);
-
- HLOGC(qslog.Debug, log << self->CONID() << "chn:SENDING: " << pkt.Info());
- // 发送srt 数据
- self->m_pChannel->sendto(addr, pkt);
-
- #if defined(SRT_DEBUG_SNDQ_HIGHRATE)
- self->m_WorkerStats.lSendTo++;
- #endif /* SRT_DEBUG_SNDQ_HIGHRATE */
- }
-
- THREAD_EXIT();
- return NULL;
- }
2.3 接收队列CRcvQueue worker 函数分析。
循环从数据通道读数据 ,会判断数据包类型,如果是控制数据包调用processCtrl 函数处理;
如果是数据包,调用processData 函数将数据放到CRcvBuffer 缓存中
- void* srt::CRcvQueue::worker(void* param)
- {
- CRcvQueue* self = (CRcvQueue*)param;
- sockaddr_any sa(self->getIPversion());
- int32_t id = 0;
-
- #if ENABLE_LOGGING
- THREAD_STATE_INIT(("SRT:RcvQ:w" + Sprint(m_counter)).c_str());
- #else
- THREAD_STATE_INIT("SRT:RcvQ:worker");
- #endif
-
- CUnit* unit = 0;
- EConnectStatus cst = CONN_AGAIN;
- while (!self->m_bClosing)
- {
- bool have_received = false;
- EReadStatus rst = self->worker_RetrieveUnit((id), (unit), (sa));
- if (rst == RST_OK)
- {
- if (id < 0)
- {
- // User error on peer. May log something, but generally can only ignore it.
- // XXX Think maybe about sending some "connection rejection response".
- HLOGC(qrlog.Debug,
- log << self->CONID() << "RECEIVED negative socket id '" << id
- << "', rejecting (POSSIBLE ATTACK)");
- continue;
- }
-
- // NOTE: cst state is being changed here.
- // This state should be maintained through any next failed calls to worker_RetrieveUnit.
- // Any error switches this to rejection, just for a case.
-
- // Note to rendezvous connection. This can accept:
- // - ID == 0 - take the first waiting rendezvous socket
- // - ID > 0 - find the rendezvous socket that has this ID.
- if (id == 0)
- {
- // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
- cst = self->worker_ProcessConnectionRequest(unit, sa);
- }
- else
- {
- // Otherwise ID is expected to be associated with:
- // - an enqueued rendezvous socket
- // - a socket connected to a peer
- cst = self->worker_ProcessAddressedPacket(id, unit, sa);
- // CAN RETURN CONN_REJECT, but m_RejectReason is already set
- }
- HLOGC(qrlog.Debug, log << self->CONID() << "worker: result for the unit: " << ConnectStatusStr(cst));
- if (cst == CONN_AGAIN)
- {
- HLOGC(qrlog.Debug, log << self->CONID() << "worker: packet not dispatched, continuing reading.");
- continue;
- }
- have_received = true;
- }
- else if (rst == RST_ERROR)
- {
- // According to the description by CChannel::recvfrom, this can be either of:
- // - IPE: all errors except EBADF
- // - socket was closed in the meantime by another thread: EBADF
- // If EBADF, then it's expected that the "closing" state is also set.
- // Check that just to report possible errors, but interrupt the loop anyway.
- if (self->m_bClosing)
- {
- HLOGC(qrlog.Debug,
- log << self->CONID() << "CChannel reported error, but Queue is closing - INTERRUPTING worker.");
- }
- else
- {
- LOGC(qrlog.Fatal,
- log << self->CONID()
- << "CChannel reported ERROR DURING TRANSMISSION - IPE. INTERRUPTING worker anyway.");
- }
- cst = CONN_REJECT;
- break;
- }
- // OTHERWISE: this is an "AGAIN" situation. No data was read, but the process should continue.
- // take care of the timing event for all UDT sockets
- const steady_clock::time_point curtime_minus_syn =
- steady_clock::now() - microseconds_from(CUDT::COMM_SYN_INTERVAL_US);
- CRNode* ul = self->m_pRcvUList->m_pUList;
- while ((NULL != ul) && (ul->m_tsTimeStamp < curtime_minus_syn))
- {
- CUDT* u = ul->m_pUDT;
- if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
- {
- u->checkTimers();
- self->m_pRcvUList->update(u);
- }
- else
- {
- HLOGC(qrlog.Debug,
- log << CUDTUnited::CONID(u->m_SocketID) << " SOCKET broken, REMOVING FROM RCV QUEUE/MAP.");
- // the socket must be removed from Hash table first, then RcvUList
- self->m_pHash->remove(u->m_SocketID);
- self->m_pRcvUList->remove(u);
- u->m_pRNode->m_bOnList = false;
- }
- ul = self->m_pRcvUList->m_pUList;
- }
- if (have_received)
- {
- HLOGC(qrlog.Debug,
- log << "worker: RECEIVED PACKET --> updateConnStatus. cst=" << ConnectStatusStr(cst) << " id=" << id
- << " pkt-payload-size=" << unit->m_Packet.getLength());
- }
- // Check connection requests status for all sockets in the RendezvousQueue.
- // Pass the connection status from the last call of:
- // worker_ProcessAddressedPacket --->
- // worker_TryAsyncRend_OrStore --->
- // CUDT::processAsyncConnectResponse --->
- // CUDT::processConnectResponse
- self->m_pRendezvousQueue->updateConnStatus(rst, cst, unit);
- // XXX updateConnStatus may have removed the connector from the list,
- // however there's still m_mBuffer in CRcvQueue for that socket to care about.
- }
-
- HLOGC(qrlog.Debug, log << "worker: EXIT");
-
- THREAD_EXIT();
- return NULL;
- }