通过前面的文章我们可以了解到,当创建好Transport的时候,socket已经建立好,并具备了相应的网络传输能力。下面我们来看一下socket接收到数据之后是如何处理的。
Socket接收数据
- // Handles the outcome of a UDP read: filters out empty, truncated and failed
- // reads, accounts for the received bytes and hands the datagram to the subclass
- // through UserOnUdpDatagramReceived().
- inline void UdpSocketHandler::OnUvRecv(
- ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned int flags)
- {
- MS_TRACE();
-
- // Nothing to read, or an empty datagram: ignore it.
- if (nread == 0)
- {
- return;
- }
-
- // The UV_UDP_PARTIAL flag marks a datagram that did not fit in the buffer.
- const bool truncated = (flags & UV_UDP_PARTIAL) != 0u;
-
- if (truncated)
- {
- MS_ERROR("received datagram was truncated due to insufficient buffer, ignoring it");
-
- return;
- }
-
- // A negative value carries an error code instead of a byte count.
- if (nread < 0)
- {
- MS_DEBUG_DEV("read error: %s", uv_strerror(nread));
-
- return;
- }
-
- // Data received: update the received bytes counter.
- this->recvBytes += nread;
-
- // Notify the subclass.
- UserOnUdpDatagramReceived(reinterpret_cast<uint8_t*>(buf->base), nread, addr);
- }
具体由其子类UdpSocket实现,其中listener就是创建transport时的那个具体transport。
- void UdpSocket::UserOnUdpDatagramReceived(const uint8_t* data, size_t len, const struct sockaddr* addr)
- {
- MS_TRACE();
-
- if (!this->listener)
- {
- MS_ERROR("no listener set");
-
- return;
- }
-
- // Notify the reader.通知读者。
- this->listener->OnUdpSocketPacketReceived(this, data, len, addr);
- }
以PlainTransport为例
- // Called with the data a UdpSocket has received: wraps the socket and the
- // remote address into a TransportTuple and continues in OnPacketReceived().
- inline void PlainTransport::OnUdpSocketPacketReceived(
- RTC::UdpSocket* socket, const uint8_t* data, size_t len, const struct sockaddr* remoteAddr)
- {
- MS_TRACE();
-
- // Build the tuple describing where this datagram came from.
- RTC::TransportTuple sourceTuple(socket, remoteAddr);
-
- // Continue processing within this transport.
- OnPacketReceived(&sourceTuple, data, len);
- }
- // Accounts for the received data and dispatches it by packet type. The checks
- // run in the order RTCP, RTP, SCTP; data matching none of them is ignored.
- inline void PlainTransport::OnPacketReceived(RTC::TransportTuple* tuple, const uint8_t* data, size_t len)
- {
- MS_TRACE();
-
- // Increase receive transmission.
- RTC::Transport::DataReceived(len);
-
- // RTCP?
- if (RTC::RTCP::Packet::IsRtcp(data, len))
- {
- OnRtcpDataReceived(tuple, data, len);
-
- return;
- }
-
- // RTP?
- if (RTC::RtpPacket::IsRtp(data, len))
- {
- OnRtpDataReceived(tuple, data, len);
-
- return;
- }
-
- // SCTP?
- if (RTC::SctpAssociation::IsSctp(data, len))
- {
- OnSctpDataReceived(tuple, data, len);
-
- return;
- }
-
- // None of the known types.
- MS_WARN_DEV("ignoring received packet of unknown type");
- }
首先判断数据是否为加密的RTP(SRTP)数据,如果是则先解密;然后按照既定格式把RTP数据解析为RtpPacket;最后把整理好的packet透传到上层Transport。
- inline void PlainTransport::OnRtpDataReceived(
- RTC::TransportTuple* tuple, const uint8_t* data, size_t len)
- {
- MS_TRACE();
- /* Handles data already classified as RTP: waits for SRTP readiness, decrypts when SRTP is in
- use, parses the data into an RtpPacket, checks the sender against the stored transport tuple
- (or adopts the sender's tuple in comedia mode) and finally passes the packet to
- RTC::Transport::ReceiveRtpPacket(). Every early-return path that holds a parsed packet
- deletes it before returning. */
- if (HasSrtp() && !IsSrtpReady()) // SRTP is in use but not ready yet: drop the data.
- return;
-
- // Decrypt the SRTP packet. intLen is handed over by pointer and is the length used for parsing afterwards.
- auto intLen = static_cast<int>(len);
-
- if (HasSrtp() && !this->srtpRecvSession->DecryptSrtp(const_cast<uint8_t*>(data), &intLen))
- {
- RTC::RtpPacket* packet = RTC::RtpPacket::Parse(data, static_cast<size_t>(intLen)); // Parsed only to log which packet failed.
-
- if (!packet)
- {
- MS_WARN_TAG(srtp, "DecryptSrtp() failed due to an invalid RTP packet");
- }
- else
- {
- MS_WARN_TAG(
- srtp,
- "DecryptSrtp() failed [ssrc:%" PRIu32 ", payloadType:%" PRIu8 ", seq:%" PRIu16 "]",
- packet->GetSsrc(),
- packet->GetPayloadType(),
- packet->GetSequenceNumber());
-
- delete packet;
- }
-
- return;
- }
- // Parse the received data into an RtpPacket.
- RTC::RtpPacket* packet = RTC::RtpPacket::Parse(data, static_cast<size_t>(intLen));
-
- if (!packet)
- {
- MS_WARN_TAG(rtp, "received data is not a valid RTP packet");
-
- return;
- }
-
- // If we don't have a RTP tuple yet, check whether comedia mode is set.
- if (!this->tuple)
- {
- if (!this->comedia)
- {
- MS_DEBUG_TAG(rtp, "ignoring RTP packet while not connected");
-
- // Remove this SSRC.
- RecvStreamClosed(packet->GetSsrc());
-
- delete packet;
-
- return;
- }
-
- MS_DEBUG_TAG(rtp, "setting RTP tuple (comedia mode enabled)");
- // Capture the connection state before the tuple is set below.
- auto wasConnected = IsConnected();
- // Keep a heap-allocated TransportTuple built from the sender's tuple (presumably a copy).
- this->tuple = new RTC::TransportTuple(tuple);
-
- if (!this->listenIp.announcedIp.empty())
- this->tuple->SetLocalAnnouncedIp(this->listenIp.announcedIp);
-
- // If not yet connected do it now.
- if (!wasConnected)
- {
- // Notify the Node PlainTransport. NOTE: the local `data` below shadows the function parameter `data`.
- json data = json::object();
-
- this->tuple->FillJson(data["tuple"]);
-
- this->shared->channelNotifier->Emit(this->id, "tuple", data);
-
- RTC::Transport::Connected();
- }
- }
- // Otherwise, if RTP tuple is set, verify that it matches the origin
- // of the packet.
- else if (!this->tuple->Compare(tuple))
- {
- MS_DEBUG_TAG(rtp, "ignoring RTP packet from unknown IP:port");
-
- // Remove this SSRC.
- RecvStreamClosed(packet->GetSsrc());
-
- delete packet;
-
- return;
- }
-
- // Pass the packet to the parent transport.
- RTC::Transport::ReceiveRtpPacket(packet);
- }
- // Common RTP entry point of the base Transport; it is reached from the
- // subclass' OnRtpDataReceived(). Tags the packet with this transport's id and
- // header extension ids, feeds the congestion control server when there is one,
- // routes the packet to its Producer and updates the receive statistics.
- // The packet is deleted here on every path.
- void Transport::ReceiveRtpPacket(RTC::RtpPacket* packet)
- {
- MS_TRACE();
-
- packet->logger.recvTransportId = this->id;
-
- // Apply the Transport RTP header extension ids so the RTP listener can use them.
- const auto& extensionIds = this->recvRtpHeaderExtensionIds;
-
- packet->SetMidExtensionId(extensionIds.mid);
- packet->SetRidExtensionId(extensionIds.rid);
- packet->SetRepairedRidExtensionId(extensionIds.rrid);
- packet->SetAbsSendTimeExtensionId(extensionIds.absSendTime);
- packet->SetTransportWideCc01ExtensionId(extensionIds.transportWideCc01);
-
- const auto receivedAtMs = DepLibUV::GetTimeMs();
-
- // Feed the TransportCongestionControlServer.
- if (this->tccServer)
- {
- this->tccServer->IncomingPacket(receivedAtMs, packet);
- }
-
- // Look up the Producer associated with the received packet.
- RTC::Producer* producer = this->rtpListener.GetProducer(packet);
-
- if (producer == nullptr)
- {
- packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::PRODUCER_NOT_FOUND);
-
- MS_WARN_TAG(
- rtp,
- "no suitable Producer for received RTP packet [ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]",
- packet->GetSsrc(),
- packet->GetPayloadType());
-
- // Tell the child class to remove this SSRC.
- RecvStreamClosed(packet->GetSsrc());
-
- delete packet;
-
- return;
- }
-
- // Pass the RTP packet to the corresponding Producer for further processing.
- const auto outcome = producer->ReceiveRtpPacket(packet);
-
- // Update the statistics that match how the Producer classified the packet.
- if (outcome == RTC::Producer::ReceiveRtpPacketResult::MEDIA)
- {
- // Media stream statistics.
- this->recvRtpTransmission.Update(packet);
- }
- else if (outcome == RTC::Producer::ReceiveRtpPacketResult::RETRANSMISSION)
- {
- // Retransmission (RTX) stream statistics.
- this->recvRtxTransmission.Update(packet);
- }
- else if (outcome == RTC::Producer::ReceiveRtpPacketResult::DISCARDED)
- {
- // Tell the child class to remove this SSRC.
- RecvStreamClosed(packet->GetSsrc());
- }
-
- // Release the RTP packet.
- delete packet;
- }
- /* Processes an RTP packet handed over by the transport and reports how it was classified.
- Returns MEDIA or RETRANSMISSION depending on which SSRC of the matched stream the packet
- carries (also when the stream itself rejects the packet), or DISCARDED when no stream is
- found or MangleRtpPacket() fails. The packet is not deleted in this function. */
- Producer::ReceiveRtpPacketResult Producer::ReceiveRtpPacket(RTC::RtpPacket* packet)
- {
- MS_TRACE();
-
- packet->logger.producerId = this->id;
-
- // Reset current packet.
- /* No packet is being processed yet. */
- this->currentRtpPacket = nullptr;
-
- // Remember how many RTP streams exist now, to detect below whether one was added while handling this packet.
- auto numRtpStreamsBefore = this->mapSsrcRtpStream.size();
- /* Look up the receive stream for this packet. */
- auto* rtpStream = GetRtpStream(packet);
-
- if (!rtpStream)/* No matching RTP receive stream was found. */
- {
- MS_WARN_TAG(rtp, "no stream found for received packet [ssrc:%" PRIu32 "]", packet->GetSsrc());
-
- packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_NOT_FOUND);
-
- return ReceiveRtpPacketResult::DISCARDED;/* Report the packet as discarded. */
- }
-
- // Pre-process the packet.
- /* Per the original note: for video this adds header extension ids (the helper is not visible here). */
- PreProcessRtpPacket(packet);
-
- ReceiveRtpPacketResult result;
- bool isRtx{ false };/* Whether the packet belongs to the RTX stream. */
-
- // Media packet.
- /* The packet carries the SSRC of the main stream. */
- if (packet->GetSsrc() == rtpStream->GetSsrc())
- {
- /* Classify the packet as media. */
- result = ReceiveRtpPacketResult::MEDIA;
-
- // Process the packet.
- /* Let the receive stream process the packet; a false return means it was rejected. */
- if (!rtpStream->ReceivePacket(packet))
- {
- // May have to announce a new RTP stream to the listener.
- /* If a new receive stream was added, announce it even though this packet is dropped. */
- if (this->mapSsrcRtpStream.size() > numRtpStreamsBefore)
- NotifyNewRtpStream(rtpStream); /* Per the original note, this ultimately reaches the consumers of this producer (not visible here). */
-
- packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_DISCARDED);
-
- return result;
- }
- }
- // RTX packet.
- /* The packet carries the SSRC of the retransmission stream. */
- else if (packet->GetSsrc() == rtpStream->GetRtxSsrc())
- {
- result = ReceiveRtpPacketResult::RETRANSMISSION;
- isRtx = true;
-
- // Process the packet.
- /* Let the receive stream process the RTX packet. NOTE(review): a rejected RTX packet is logged with RECV_RTP_STREAM_NOT_FOUND - confirm this is intended. */
- if (!rtpStream->ReceiveRtxPacket(packet))
- {
- packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_NOT_FOUND);
-
- return result;
- }
- }
- // Should not happen.
- else
- {
- MS_ABORT("found stream does not match received packet");
- }
- /* Check whether the packet belongs to a key frame. */
- if (packet->IsKeyFrame())
- {
- MS_DEBUG_TAG(
- rtp,
- "key frame received [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
- packet->GetSsrc(),
- packet->GetSequenceNumber());
-
- // Tell the keyFrameRequestManager.
- if (this->keyFrameRequestManager)
- this->keyFrameRequestManager->KeyFrameReceived(packet->GetSsrc()); /* Record that a key frame arrived for this SSRC. */
- }
-
- // May have to announce a new RTP stream to the listener.
- if (this->mapSsrcRtpStream.size() > numRtpStreamsBefore)
- {
- // Request a key frame for this stream since we may have lost the first packets
- // (do not do it if this is a key frame).
- if (this->keyFrameRequestManager && !this->paused && !packet->IsKeyFrame())
- this->keyFrameRequestManager->ForceKeyFrameNeeded(packet->GetSsrc());
-
- // Update current packet (set only for the duration of the notification below).
- this->currentRtpPacket = packet;
-
- NotifyNewRtpStream(rtpStream);
-
- // Reset current packet.
- this->currentRtpPacket = nullptr;
- }
-
- // If paused stop here.
- if (this->paused)
- return result;
-
- // May emit 'trace' event.
- EmitTraceEventRtpAndKeyFrameTypes(packet, isRtx);
-
- // Mangle the packet before providing the listener with it.
- /* The packet is rewritten before being published to the subscribers.
- Per the original note this mainly covers payload type, ssrc and header extensions (the helper is not visible here). */
- if (!MangleRtpPacket(packet, rtpStream))
- return ReceiveRtpPacketResult::DISCARDED;
-
- // Post-process the packet.
- /* One last processing step on the packet. */
- PostProcessRtpPacket(packet);
- /* Hand the processed packet to the listener, which forwards it towards the subscribers. */
- this->listener->OnProducerRtpPacketReceived(this, packet);
-
- return result;
- }
- inline void Transport::OnProducerRtpPacketReceived(RTC::Producer* producer, RTC::RtpPacket* packet)
- {
- MS_TRACE();
- /* Relays the packet, together with this transport and the producer, to this transport's listener.
- Per the original note the listener is the upper-level Router. */
- this->listener->OnTransportProducerRtpPacketReceived(this, producer, packet);
- }
- // Fans out an RTP packet received by one of this Router's Producers: first to
- // every Consumer listed for that Producer, then to every RTP observer
- // registered for it.
- //
- // transport: unused.
- // producer:  the Producer that received the packet; it must be present in
- //            mapProducerConsumers because the lookup uses at().
- // packet:    the received packet; its MID extension value is updated per Consumer.
- inline void Router::OnTransportProducerRtpPacketReceived(
- RTC::Transport* /*transport*/, RTC::Producer* producer, RTC::RtpPacket* packet)
- {
- MS_TRACE();
-
- packet->logger.routerId = this->id;
- // Use the Producer to look up its list of Consumers.
- auto& consumers = this->mapProducerConsumers.at(producer);
- // Only when the Producer has Consumers.
- if (!consumers.empty())
- {
- // Cloned ref-counted packet that RtpStreamSend will store for as long as
- // needed avoiding multiple allocations unless absolutely necessary.
- // Clone only happens if needed.
- std::shared_ptr<RTC::RtpPacket> sharedPacket;
-
- for (auto* consumer : consumers)
- {
- // Update MID RTP extension value.
- const auto& mid = consumer->GetRtpParameters().mid;
-
- if (!mid.empty())
- packet->UpdateMid(mid);
- // Send the RTP packet through this Consumer.
- consumer->SendRtpPacket(packet, sharedPacket);
- }
- }
-
- // Deliver the packet to the RTP observers of this Producer, if any.
- auto it = this->mapProducerRtpObservers.find(producer);
-
- if (it != this->mapProducerRtpObservers.end())
- {
- auto& rtpObservers = it->second;
-
- for (auto* rtpObserver : rtpObservers)
- {
- rtpObserver->ReceiveRtpPacket(producer, packet);
- }
- }
- }
- // Sends an RTP packet to the remote endpoint through the transport tuple,
- // encrypting it first when SRTP is in use. When the transport is not connected
- // or encryption fails, the callback (if any) is invoked with false and deleted,
- // and nothing is sent.
- void PlainTransport::SendRtpPacket(
- RTC::Consumer* /*consumer*/, RTC::RtpPacket* packet, RTC::Transport::onSendCallback* cb)
- {
- MS_TRACE();
-
- // Reports a failed send to the caller and releases the callback.
- const auto reportFailure = [cb]()
- {
- if (cb)
- {
- (*cb)(false);
- delete cb;
- }
- };
-
- if (!IsConnected())
- {
- reportFailure();
-
- return;
- }
-
- const uint8_t* payload = packet->GetData();
- auto payloadLen = static_cast<int>(packet->GetSize());
-
- // With SRTP the payload pointer and length are replaced by the encrypted ones.
- if (HasSrtp() && !this->srtpSendSession->EncryptRtp(&payload, &payloadLen))
- {
- reportFailure();
-
- return;
- }
-
- const auto bytesToSend = static_cast<size_t>(payloadLen);
-
- // Send the RTP data through the tuple.
- this->tuple->Send(payload, bytesToSend, cb);
-
- // Increase send transmission.
- RTC::Transport::DataSent(bytesToSend);
- }
- // Sends the data over the UDP socket (to the stored remote address) when the
- // tuple's protocol is UDP, and over the TCP connection otherwise.
- void Send(const uint8_t* data, size_t len, RTC::TransportTuple::onSendCallback* cb = nullptr)
- {
- if (this->protocol != Protocol::UDP)
- {
- this->tcpConnection->Send(data, len, cb);
-
- return;
- }
-
- this->udpSocket->Send(data, len, this->udpRemoteAddr, cb);
- }
底层实际发送
- // Sends one datagram to the given address.
- //
- // data/len: payload to send; an empty payload is rejected.
- // addr:     destination address.
- // cb:       optional completion callback. On every path that finishes inside
- //           this function it is invoked with the result and deleted. When the
- //           datagram is queued with uv_udp_send() the callback is stored in the
- //           UvSendData request instead.
- //
- // uv_udp_try_send() is attempted first; only if it cannot send the datagram is
- // the payload copied into a UvSendData request and queued with uv_udp_send().
- void UdpSocketHandler::Send(
- const uint8_t* data, size_t len, const struct sockaddr* addr, UdpSocketHandler::onSendCallback* cb)
- {
- MS_TRACE();
-
- // The handler is already closed: report failure.
- if (this->closed)
- {
- if (cb)
- {
- (*cb)(false);
- delete cb;
- }
-
- return;
- }
-
- // Nothing to send: report failure.
- if (len == 0)
- {
- if (cb)
- {
- (*cb)(false);
- delete cb;
- }
-
- return;
- }
-
- // First try uv_udp_try_send(). In case it can not directly send the datagram
- // then build a uv_req_t and use uv_udp_send().
-
- uv_buf_t buffer = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len);
- const int sent = uv_udp_try_send(this->uvHandle, &buffer, 1, addr);
-
- // Entire datagram was sent. Done.
- if (sent == static_cast<int>(len))
- {
- // Update sent bytes.
- this->sentBytes += sent;
-
- if (cb)
- {
- (*cb)(true);
- delete cb;
- }
-
- return;
- }
- // Only part of the datagram was sent: count those bytes but report failure.
- else if (sent >= 0)
- {
- MS_WARN_DEV("datagram truncated (just %d of %zu bytes were sent)", sent, len);
-
- // Update sent bytes.
- this->sentBytes += sent;
-
- if (cb)
- {
- (*cb)(false);
- delete cb;
- }
-
- return;
- }
- // Any error but legit EAGAIN. Use uv_udp_send().
- else if (sent != UV_EAGAIN)
- {
- MS_WARN_DEV("uv_udp_try_send() failed, trying uv_udp_send(): %s", uv_strerror(sent));
- }
-
- // Copy the payload into a request object that is handed to uv_udp_send().
- auto* sendData = new UvSendData(len);
-
- sendData->req.data = static_cast<void*>(sendData);
- std::memcpy(sendData->store, data, len);
- sendData->cb = cb;
-
- buffer = uv_buf_init(reinterpret_cast<char*>(sendData->store), len);
-
- int err = uv_udp_send(
- &sendData->req, this->uvHandle, &buffer, 1, addr, static_cast<uv_udp_send_cb>(onSend));
-
- if (err != 0)
- {
- // NOTE: uv_udp_send() returns error if a wrong INET family is given
- // (IPv6 destination on a IPv4 binded socket), so be ready.
- MS_WARN_DEV("uv_udp_send() failed: %s", uv_strerror(err));
-
- if (cb)
- (*cb)(false);
-
- // Delete the UvSendData struct (it will delete the store and cb too).
- delete sendData;
- }
- else
- {
- // Update sent bytes.
- this->sentBytes += len;
- }
- }