• 【MediaSoup---源码篇】(五)接收RTP数据的处理


    通过前面的文章我们可以了解到,当Transport创建好的时候,socket也已经建立好,具备了相应的网络传输能力。下面来看一下socket接收到数据之后是如何处理的。

    UdpSocketHandler::OnUvRecv

    Socket接收数据

    1. inline void UdpSocketHandler::OnUvRecv(
    2. ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned int flags)
    3. {
    4. MS_TRACE();
    5. // NOTE: Ignore if there is nothing to read or if it was an empty datagram.
    6. if (nread == 0)
    7. return;
    8. // Check flags.
    9. if ((flags & UV_UDP_PARTIAL) != 0u)
    10. {
    11. MS_ERROR("received datagram was truncated due to insufficient buffer, ignoring it");
    12. return;
    13. }
    14. // Data received.
    15. if (nread > 0)
    16. {
    17. // Update received bytes.更新接收字节。
    18. this->recvBytes += nread;
    19. // Notify the subclass.通知子类。UdpSocket 是其子类
    20. UserOnUdpDatagramReceived(reinterpret_cast<uint8_t*>(buf->base), nread, addr);
    21. }
    22. // Some error.
    23. else
    24. {
    25. MS_DEBUG_DEV("read error: %s", uv_strerror(nread));
    26. }
    27. }
    UserOnUdpDatagramReceived

    具体由其子类UdpSocket实现;其中的listener就是创建transport时对应的具体transport。

    1. void UdpSocket::UserOnUdpDatagramReceived(const uint8_t* data, size_t len, const struct sockaddr* addr)
    2. {
    3. MS_TRACE();
    4. if (!this->listener)
    5. {
    6. MS_ERROR("no listener set");
    7. return;
    8. }
    9. // Notify the reader.通知读者。
    10. this->listener->OnUdpSocketPacketReceived(this, data, len, addr);
    11. }
    OnUdpSocketPacketReceived

    以PlainTransport为例

    1. //从udpsocket获得了接收数据
    2. inline void PlainTransport::OnUdpSocketPacketReceived(
    3. RTC::UdpSocket* socket, const uint8_t* data, size_t len, const struct sockaddr* remoteAddr)
    4. {
    5. MS_TRACE();
    6. //形成元组,记录IP等内容
    7. RTC::TransportTuple tuple(socket, remoteAddr);
    8. //进入到当前transport处理
    9. OnPacketReceived(&tuple, data, len);
    10. }
    PlainTransport::OnPacketReceived
    1. inline void PlainTransport::OnPacketReceived(RTC::TransportTuple* tuple, const uint8_t* data, size_t len)
    2. {
    3. MS_TRACE();
    4. // Increase receive transmission.增加接收传输。
    5. RTC::Transport::DataReceived(len);
    6. // Check if it's RTCP.检查它是否是RTCP。
    7. if (RTC::RTCP::Packet::IsRtcp(data, len))
    8. {
    9. OnRtcpDataReceived(tuple, data, len);
    10. }
    11. // Check if it's RTP.检查它是否是RTP。
    12. else if (RTC::RtpPacket::IsRtp(data, len))
    13. {
    14. OnRtpDataReceived(tuple, data, len);
    15. }
    16. // Check if it's SCTP.检查它是否是SCTP。
    17. else if (RTC::SctpAssociation::IsSctp(data, len))
    18. {
    19. OnSctpDataReceived(tuple, data, len);
    20. }
    21. else
    22. {
    23. MS_WARN_DEV("ignoring received packet of unknown type");
    24. }
    25. }
    RTP数据处理方式

    首先判断是否为加密的RTP(SRTP)数据,如果是则先进行解密;然后按照RTP格式把数据解析成RtpPacket;最后把整理好的packet透传给上层的Transport。

    1. inline void PlainTransport::OnRtpDataReceived(
    2. RTC::TransportTuple* tuple, const uint8_t* data, size_t len)
    3. {
    4. MS_TRACE();
    5. if (HasSrtp() && !IsSrtpReady())
    6. return;
    7. // Decrypt the SRTP packet.解密SRTP报文。
    8. auto intLen = static_cast<int>(len);
    9. if (HasSrtp() && !this->srtpRecvSession->DecryptSrtp(const_cast<uint8_t*>(data), &intLen))
    10. {
    11. RTC::RtpPacket* packet = RTC::RtpPacket::Parse(data, static_cast<size_t>(intLen));
    12. if (!packet)
    13. {
    14. MS_WARN_TAG(srtp, "DecryptSrtp() failed due to an invalid RTP packet");
    15. }
    16. else
    17. {
    18. MS_WARN_TAG(
    19. srtp,
    20. "DecryptSrtp() failed [ssrc:%" PRIu32 ", payloadType:%" PRIu8 ", seq:%" PRIu16 "]",
    21. packet->GetSsrc(),
    22. packet->GetPayloadType(),
    23. packet->GetSequenceNumber());
    24. delete packet;
    25. }
    26. return;
    27. }
    28. //解析socket数据,获取格式化后的RtpPacket
    29. RTC::RtpPacket* packet = RTC::RtpPacket::Parse(data, static_cast<size_t>(intLen));
    30. if (!packet)
    31. {
    32. MS_WARN_TAG(rtp, "received data is not a valid RTP packet");
    33. return;
    34. }
    35. // If we don't have a RTP tuple yet, check whether comedia mode is set.
    36. if (!this->tuple)
    37. {
    38. if (!this->comedia)
    39. {
    40. MS_DEBUG_TAG(rtp, "ignoring RTP packet while not connected");
    41. // Remove this SSRC.
    42. RecvStreamClosed(packet->GetSsrc());
    43. delete packet;
    44. return;
    45. }
    46. MS_DEBUG_TAG(rtp, "setting RTP tuple (comedia mode enabled)");
    47. auto wasConnected = IsConnected();
    48. this->tuple = new RTC::TransportTuple(tuple);
    49. if (!this->listenIp.announcedIp.empty())
    50. this->tuple->SetLocalAnnouncedIp(this->listenIp.announcedIp);
    51. // If not yet connected do it now.
    52. if (!wasConnected)
    53. {
    54. // Notify the Node PlainTransport.
    55. json data = json::object();
    56. this->tuple->FillJson(data["tuple"]);
    57. this->shared->channelNotifier->Emit(this->id, "tuple", data);
    58. RTC::Transport::Connected();
    59. }
    60. }
    61. // Otherwise, if RTP tuple is set, verify that it matches the origin
    62. // of the packet.
    63. else if (!this->tuple->Compare(tuple))
    64. {
    65. MS_DEBUG_TAG(rtp, "ignoring RTP packet from unknown IP:port");
    66. // Remove this SSRC.
    67. RecvStreamClosed(packet->GetSsrc());
    68. delete packet;
    69. return;
    70. }
    71. // Pass the packet to the parent transport.将数据包传递给父传输。
    72. RTC::Transport::ReceiveRtpPacket(packet);
    73. }
    Transport::ReceiveRtpPacket
    1. //当前调用来源于子类的OnRtpDataReceived中触发了当前接口
    2. void Transport::ReceiveRtpPacket(RTC::RtpPacket* packet)
    3. {
    4. MS_TRACE();
    5. packet->logger.recvTransportId = this->id;
    6. // Apply the Transport RTP header extension ids so the RTP listener can use them.
    7. // 应用传输RTP报头扩展id,以便RTP侦听器可以使用它们。
    8. packet->SetMidExtensionId(this->recvRtpHeaderExtensionIds.mid);
    9. packet->SetRidExtensionId(this->recvRtpHeaderExtensionIds.rid);
    10. packet->SetRepairedRidExtensionId(this->recvRtpHeaderExtensionIds.rrid);
    11. packet->SetAbsSendTimeExtensionId(this->recvRtpHeaderExtensionIds.absSendTime);
    12. packet->SetTransportWideCc01ExtensionId(this->recvRtpHeaderExtensionIds.transportWideCc01);
    13. auto nowMs = DepLibUV::GetTimeMs();
    14. // Feed the TransportCongestionControlServer.
    15. if (this->tccServer)
    16. {
    17. this->tccServer->IncomingPacket(nowMs, packet);
    18. }
    19. // Get the associated Producer.
    20. /*根据收到的packet,查找关联的producer。*/
    21. RTC::Producer* producer = this->rtpListener.GetProducer(packet);
    22. if (!producer)
    23. {
    24. packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::PRODUCER_NOT_FOUND);
    25. MS_WARN_TAG(
    26. rtp,
    27. "no suitable Producer for received RTP packet [ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]",
    28. packet->GetSsrc(),
    29. packet->GetPayloadType());
    30. // Tell the child class to remove this SSRC.告诉子类删除这个SSRC。
    31. RecvStreamClosed(packet->GetSsrc());
    32. delete packet;
    33. return;
    34. }
    35. // MS_DEBUG_DEV(
    36. // "RTP packet received [ssrc:%" PRIu32 ", payloadType:%" PRIu8 ", producerId:%s]",
    37. // packet->GetSsrc(),
    38. // packet->GetPayloadType(),
    39. // producer->id.c_str());
    40. // Pass the RTP packet to the corresponding Producer.
    41. /*将packet传给指定的producer,进行下一步处理。*/
    42. auto result = producer->ReceiveRtpPacket(packet);
    43. switch (result)/*根据packet包类型不同,进行不同通道的码率统计。*/
    44. {
    45. case RTC::Producer::ReceiveRtpPacketResult::MEDIA:
    46. this->recvRtpTransmission.Update(packet);/*媒体通道的码率统计*/
    47. break;
    48. case RTC::Producer::ReceiveRtpPacketResult::RETRANSMISSION:
    49. this->recvRtxTransmission.Update(packet); /*重传通道的码率统计*/
    50. break;
    51. case RTC::Producer::ReceiveRtpPacketResult::DISCARDED:
    52. // Tell the child class to remove this SSRC.
    53. RecvStreamClosed(packet->GetSsrc());
    54. break;
    55. default:;
    56. }
    57. /*释放rtp包*/
    58. delete packet;
    59. }
    Producer::ReceiveRtpPacket
    1. /*接收到transport传入的packet,对packet进行指定的处理。*/
    2. Producer::ReceiveRtpPacketResult Producer::ReceiveRtpPacket(RTC::RtpPacket* packet)
    3. {
    4. MS_TRACE();
    5. packet->logger.producerId = this->id;
    6. // Reset current packet.
    7. /*重置当前正在处理的packet*/
    8. this->currentRtpPacket = nullptr;
    9. // Count number of RTP streams.统计当前接收流的数目
    10. auto numRtpStreamsBefore = this->mapSsrcRtpStream.size();
    11. /*通过packet,获取对应的接收流。*/
    12. auto* rtpStream = GetRtpStream(packet);
    13. if (!rtpStream)/*没有查找到对应的rtp接收流*/
    14. {
    15. MS_WARN_TAG(rtp, "no stream found for received packet [ssrc:%" PRIu32 "]", packet->GetSsrc());
    16. packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_NOT_FOUND);
    17. return ReceiveRtpPacketResult::DISCARDED;/*将packet丢弃*/
    18. }
    19. // Pre-process the packet.
    20. /*对packet进行预处理:如果是视频,则添加头部扩展id。*/
    21. PreProcessRtpPacket(packet);
    22. ReceiveRtpPacketResult result;
    23. bool isRtx{ false };/*packet是否是rtx流中的packet*/
    24. // Media packet.
    25. /*是主流中的rtp包*/
    26. if (packet->GetSsrc() == rtpStream->GetSsrc())
    27. {
    28. /*设置返回结果,表示是媒体流,视频流或音频流。*/
    29. result = ReceiveRtpPacketResult::MEDIA;
    30. // Process the packet.
    31. /*rtp接收流处理接收到的packet*/
    32. if (!rtpStream->ReceivePacket(packet))
    33. {
    34. // May have to announce a new RTP stream to the listener.
    35. /*如果添加了新的rtp接收流,则通知其订阅者。*/
    36. if (this->mapSsrcRtpStream.size() > numRtpStreamsBefore)
    37. NotifyNewRtpStream(rtpStream); /*最终通知到的是与producer相关的consumer*/
    38. packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_DISCARDED);
    39. return result;
    40. }
    41. }
    42. // RTX packet.
    43. /*重传流中的rtp包*/
    44. else if (packet->GetSsrc() == rtpStream->GetRtxSsrc())
    45. {
    46. result = ReceiveRtpPacketResult::RETRANSMISSION;
    47. isRtx = true;
    48. // Process the packet.
    49. /*rtp接收流处理重传流中的packet*/
    50. if (!rtpStream->ReceiveRtxPacket(packet))
    51. {
    52. packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_NOT_FOUND);
    53. return result;
    54. }
    55. }
    56. // Should not happen.
    57. else
    58. {
    59. MS_ABORT("found stream does not match received packet");
    60. }
    61. /*判断packet是否是关键帧中的包*/
    62. if (packet->IsKeyFrame())
    63. {
    64. MS_DEBUG_TAG(
    65. rtp,
    66. "key frame received [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
    67. packet->GetSsrc(),
    68. packet->GetSequenceNumber());
    69. // Tell the keyFrameRequestManager.
    70. if (this->keyFrameRequestManager)
    71. this->keyFrameRequestManager->KeyFrameReceived(packet->GetSsrc()); /*更新关键帧*/
    72. }
    73. // May have to announce a new RTP stream to the listener.
    74. if (this->mapSsrcRtpStream.size() > numRtpStreamsBefore)
    75. {
    76. // Request a key frame for this stream since we may have lost the first packets
    77. // (do not do it if this is a key frame).
    78. if (this->keyFrameRequestManager && !this->paused && !packet->IsKeyFrame())
    79. this->keyFrameRequestManager->ForceKeyFrameNeeded(packet->GetSsrc());
    80. // Update current packet.
    81. this->currentRtpPacket = packet;
    82. NotifyNewRtpStream(rtpStream);
    83. // Reset current packet.
    84. this->currentRtpPacket = nullptr;
    85. }
    86. // If paused stop here.
    87. if (this->paused)
    88. return result;
    89. // May emit 'trace' event.
    90. EmitTraceEventRtpAndKeyFrameTypes(packet, isRtx);
    91. // Mangle the packet before providing the listener with it.
    92. /*在将packet发布至其订阅者之前,对其进行倾轧。
    93. 主要进行payload type,ssrc,header extension的处理*/
    94. if (!MangleRtpPacket(packet, rtpStream))
    95. return ReceiveRtpPacketResult::DISCARDED;
    96. // Post-process the packet.
    97. /*最后再对packet进行一次处理*/
    98. PostProcessRtpPacket(packet);
    99. /*将处理后的packet,发送到其订阅者transport中。*/
    100. this->listener->OnProducerRtpPacketReceived(this, packet);
    101. return result;
    102. }
    向上传递到Transport层
    1. inline void Transport::OnProducerRtpPacketReceived(RTC::Producer* producer, RTC::RtpPacket* packet)
    2. {
    3. MS_TRACE();
    4. //listener是上层的Router
    5. this->listener->OnTransportProducerRtpPacketReceived(this, producer, packet);
    6. }
    向上传递到Router层
    1. inline void Router::OnTransportProducerRtpPacketReceived(
    2. RTC::Transport* /*transport*/, RTC::Producer* producer, RTC::RtpPacket* packet)
    3. {
    4. MS_TRACE();
    5. packet->logger.routerId = this->id;
    6. //通过生产者,所以出订阅者列表
    7. auto& consumers = this->mapProducerConsumers.at(producer);
    8. //如果存在对应的订阅者
    9. if (!consumers.empty())
    10. {
    11. // Cloned ref-counted packet that RtpStreamSend will store for as long as
    12. // needed avoiding multiple allocations unless absolutely necessary.
    13. // Clone only happens if needed.
    14. std::shared_ptr sharedPacket;
    15. for (auto* consumer : consumers)
    16. {
    17. // Update MID RTP extension value.
    18. const auto& mid = consumer->GetRtpParameters().mid;
    19. if (!mid.empty())
    20. packet->UpdateMid(mid);
    21. //发送RTP数据
    22. consumer->SendRtpPacket(packet, sharedPacket);
    23. }
    24. }
    25. auto it = this->mapProducerRtpObservers.find(producer);
    26. if (it != this->mapProducerRtpObservers.end())
    27. {
    28. auto& rtpObservers = it->second;
    29. for (auto* rtpObserver : rtpObservers)
    30. {
    31. rtpObserver->ReceiveRtpPacket(producer, packet);
    32. }
    33. }
    34. }
    具体transport通道转发数据
    1. void PlainTransport::SendRtpPacket(
    2. RTC::Consumer* /*consumer*/, RTC::RtpPacket* packet, RTC::Transport::onSendCallback* cb)
    3. {
    4. MS_TRACE();
    5. if (!IsConnected())
    6. {
    7. if (cb)
    8. {
    9. (*cb)(false);
    10. delete cb;
    11. }
    12. return;
    13. }
    14. const uint8_t* data = packet->GetData();
    15. auto intLen = static_cast<int>(packet->GetSize());
    16. if (HasSrtp() && !this->srtpSendSession->EncryptRtp(&data, &intLen))
    17. {
    18. if (cb)
    19. {
    20. (*cb)(false);
    21. delete cb;
    22. }
    23. return;
    24. }
    25. auto len = static_cast<size_t>(intLen);
    26. //使用元组获发送RTP数据
    27. this->tuple->Send(data, len, cb);
    28. // Increase send transmission.增加发送传输的数据大小。
    29. RTC::Transport::DataSent(len);
    30. }
    1. void Send(const uint8_t* data, size_t len, RTC::TransportTuple::onSendCallback* cb = nullptr)
    2. {
    3. if (this->protocol == Protocol::UDP)
    4. this->udpSocket->Send(data, len, this->udpRemoteAddr, cb);
    5. else
    6. this->tcpConnection->Send(data, len, cb);
    7. }

    底层实际发送

    1. void UdpSocketHandler::Send(
    2. const uint8_t* data, size_t len, const struct sockaddr* addr, UdpSocketHandler::onSendCallback* cb)
    3. {
    4. MS_TRACE();
    5. if (this->closed)
    6. {
    7. if (cb)
    8. {
    9. (*cb)(false);
    10. delete cb;
    11. }
    12. return;
    13. }
    14. if (len == 0)
    15. {
    16. if (cb)
    17. {
    18. (*cb)(false);
    19. delete cb;
    20. }
    21. return;
    22. }
    23. // First try uv_udp_try_send(). In case it can not directly send the datagram
    24. // then build a uv_req_t and use uv_udp_send().
    25. uv_buf_t buffer = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len);
    26. const int sent = uv_udp_try_send(this->uvHandle, &buffer, 1, addr);
    27. // Entire datagram was sent. Done.
    28. if (sent == static_cast<int>(len))
    29. {
    30. // Update sent bytes.
    31. this->sentBytes += sent;
    32. if (cb)
    33. {
    34. (*cb)(true);
    35. delete cb;
    36. }
    37. return;
    38. }
    39. else if (sent >= 0)
    40. {
    41. MS_WARN_DEV("datagram truncated (just %d of %zu bytes were sent)", sent, len);
    42. // Update sent bytes.
    43. this->sentBytes += sent;
    44. if (cb)
    45. {
    46. (*cb)(false);
    47. delete cb;
    48. }
    49. return;
    50. }
    51. // Any error but legit EAGAIN. Use uv_udp_send().
    52. else if (sent != UV_EAGAIN)
    53. {
    54. MS_WARN_DEV("uv_udp_try_send() failed, trying uv_udp_send(): %s", uv_strerror(sent));
    55. }
    56. auto* sendData = new UvSendData(len);
    57. sendData->req.data = static_cast<void*>(sendData);
    58. std::memcpy(sendData->store, data, len);
    59. sendData->cb = cb;
    60. buffer = uv_buf_init(reinterpret_cast<char*>(sendData->store), len);
    61. int err = uv_udp_send(
    62. &sendData->req, this->uvHandle, &buffer, 1, addr, static_cast(onSend));
    63. if (err != 0)
    64. {
    65. // NOTE: uv_udp_send() returns error if a wrong INET family is given
    66. // (IPv6 destination on a IPv4 binded socket), so be ready.
    67. MS_WARN_DEV("uv_udp_send() failed: %s", uv_strerror(err));
    68. if (cb)
    69. (*cb)(false);
    70. // Delete the UvSendData struct (it will delete the store and cb too).
    71. delete sendData;
    72. }
    73. else
    74. {
    75. // Update sent bytes.
    76. this->sentBytes += len;
    77. }
    78. }

  • 相关阅读:
    Redis中的慢查询日志(一)
    SAP 采购订单免费标识自动勾选的判断依据
    博客系统(SSM)
    论人类下一代语言的可能—4.1算术
    Win10系统下torch.cuda.is_available()返回为False的问题解决
    HTML+CSS+JavaScript仿京东购物商城网站 web前端制作服装购物商城 html电商购物网站
    Centos7安装SCL源
    Java中常用的一些业务校验
    SQL Server详细使用教程(包含启动SQL server服务、建立数据库、建表的详细操作) 非常适合初学者
    想要进行期权模拟?这里是最佳选择!
  • 原文地址:https://blog.csdn.net/qq_40179458/article/details/133682124