• 流媒体分析之rtmp协议srs服务器数据收发


    根据上篇文章,rtmp 推流处理publishing 。do_publishing 处理SrsLiveSource及传入收发SrsPublishRecvThread协程。

    1. srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
    2. {
    3. srs_error_t err = srs_success;
    4. SrsRequest* req = info->req;
    5. if (_srs_config->get_refer_enabled(req->vhost)) {
    6. if ((err = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != srs_success) {
    7. return srs_error_wrap(err, "rtmp: referer check");
    8. }
    9. }
    10. if ((err = http_hooks_on_publish()) != srs_success) {
    11. return srs_error_wrap(err, "rtmp: callback on publish");
    12. }
    13. // TODO: FIXME: Should refine the state of publishing.
    14. if ((err = acquire_publish(source)) == srs_success) {
    15. // use isolate thread to recv,
    16. // @see: https://github.com/ossrs/srs/issues/237
    17. SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
    18. err = do_publishing(source, &rtrd);
    19. rtrd.stop();
    20. }
    21. // whatever the acquire publish, always release publish.
    22. // when the acquire error in the midlle-way, the publish state changed,
    23. // but failed, so we must cleanup it.
    24. // @see https://github.com/ossrs/srs/issues/474
    25. // @remark when stream is busy, should never release it.
    26. if (srs_error_code(err) != ERROR_SYSTEM_STREAM_BUSY) {
    27. release_publish(source);
    28. }
    29. http_hooks_on_unpublish();
    30. return err;
    31. }

     do_publishing  :协程执行,处理数据统计。

    1. srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd)
    2. {
    3. srs_error_t err = srs_success;
    4. SrsRequest* req = info->req;
    5. SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
    6. SrsAutoFree(SrsPithyPrint, pprint);
    7. // update the statistic when source disconveried.
    8. SrsStatistic* stat = SrsStatistic::instance();
    9. if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) {
    10. return srs_error_wrap(err, "rtmp: stat client");
    11. }
    12. // start isolate recv thread.
    13. // TODO: FIXME: Pass the callback here.
    14. if ((err = rtrd->start()) != srs_success) {
    15. return srs_error_wrap(err, "rtmp: receive thread");
    16. }
    17. // initialize the publish timeout.
    18. publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
    19. publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost);
    20. // set the sock options.
    21. set_sock_options();
    22. if (true) {
    23. bool mr = _srs_config->get_mr_enabled(req->vhost);
    24. srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);
    25. srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d",
    26. mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout), tcp_nodelay);
    27. }
    28. int64_t nb_msgs = 0;
    29. uint64_t nb_frames = 0;
    30. while (true) {
    31. if ((err = trd->pull()) != srs_success) {
    32. return srs_error_wrap(err, "rtmp: thread quit");
    33. }
    34. pprint->elapse();
    35. // cond wait for timeout.
    36. if (nb_msgs == 0) {
    37. // when not got msgs, wait for a larger timeout.
    38. // @see https://github.com/ossrs/srs/issues/441
    39. rtrd->wait(publish_1stpkt_timeout);
    40. } else {
    41. rtrd->wait(publish_normal_timeout);
    42. }
    43. // check the thread error code.
    44. if ((err = rtrd->error_code()) != srs_success) {
    45. return srs_error_wrap(err, "rtmp: receive thread");
    46. }
    47. // when not got any messages, timeout.
    48. if (rtrd->nb_msgs() <= nb_msgs) {
    49. return srs_error_new(ERROR_SOCKET_TIMEOUT, "rtmp: publish timeout %dms, nb_msgs=%d",
    50. nb_msgs? srsu2msi(publish_normal_timeout) : srsu2msi(publish_1stpkt_timeout), (int)nb_msgs);
    51. }
    52. nb_msgs = rtrd->nb_msgs();
    53. // Update the stat for video fps.
    54. // @remark https://github.com/ossrs/srs/issues/851
    55. SrsStatistic* stat = SrsStatistic::instance();
    56. if ((err = stat->on_video_frames(req, (int)(rtrd->nb_video_frames() - nb_frames))) != srs_success) {
    57. return srs_error_wrap(err, "rtmp: stat video frames");
    58. }
    59. nb_frames = rtrd->nb_video_frames();
    60. // reportable
    61. if (pprint->can_print()) {
    62. kbps->sample();
    63. bool mr = _srs_config->get_mr_enabled(req->vhost);
    64. srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);
    65. srs_trace("<- " SRS_CONSTS_LOG_CLIENT_PUBLISH " time=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d",
    66. (int)pprint->age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
    67. kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), mr, srsu2msi(mr_sleep),
    68. srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout));
    69. }
    70. }
    71. return err;
    72. }

    SrsPublishRecvThread::start   执行:trd.start 。trd指向SrsRecvThread 。

    1. srs_error_t SrsPublishRecvThread::start()
    2. {
    3. srs_error_t err = srs_success;
    4. if ((err = trd.start()) != srs_success) {
    5. err = srs_error_wrap(err, "publish recv thread");
    6. }
    7. ncid = cid = trd.cid();
    8. return err;
    9. }

     SrsRecvThread 执行SrsSTCoroutine。SrsSTCoroutine传入this对象cycle()

    1. srs_error_t SrsRecvThread::start()
    2. {
    3. srs_error_t err = srs_success;
    4. srs_freep(trd);
    5. trd = new SrsSTCoroutine("recv", this, _parent_cid);
    6. //change stack size to 256K, fix crash when call some 3rd-part api.
    7. ((SrsSTCoroutine*)trd)->set_stack_size(1 << 18);
    8. if ((err = trd->start()) != srs_success) {
    9. return srs_error_wrap(err, "recv thread");
    10. }
    11. return err;
    12. }

     SrsRecvThread::cycle

    1. srs_error_t SrsRecvThread::cycle()
    2. {
    3. srs_error_t err = srs_success;
    4. // the multiple messages writev improve performance large,
    5. // but the timeout recv will cause 33% sys call performance,
    6. // to use isolate thread to recv, can improve about 33% performance.
    7. rtmp->set_recv_timeout(SRS_UTIME_NO_TIMEOUT);
    8. pumper->on_start();
    9. if ((err = do_cycle()) != srs_success) {
    10. err = srs_error_wrap(err, "recv thread");
    11. }
    12. // reset the timeout to pulse mode.
    13. rtmp->set_recv_timeout(timeout);
    14. pumper->on_stop();
    15. return err;
    16. }

     do_cycle  执行。

    rtmp->recv_message ,rtmp 收数据。

    pumper->consume     数据推流到。pumper->consume   指向。 SrsPublishRecvThread::consume(SrsCommonMessage* msg)

    1. srs_error_t SrsRecvThread::do_cycle()
    2. {
    3. srs_error_t err = srs_success;
    4. while (true) {
    5. if ((err = trd->pull()) != srs_success) {
    6. return srs_error_wrap(err, "recv thread");
    7. }
    8. // When the pumper is interrupted, wait then retry.
    9. if (pumper->interrupted()) {
    10. srs_usleep(timeout);
    11. continue;
    12. }
    13. SrsCommonMessage* msg = NULL;
    14. // Process the received message.
    15. if ((err = rtmp->recv_message(&msg)) == srs_success) {
    16. err = pumper->consume(msg);
    17. }
    18. if (err != srs_success) {
    19. // Interrupt the receive thread for any error.
    20. trd->interrupt();
    21. // Notify the pumper to quit for error.
    22. pumper->interrupt(err);
    23. return srs_error_wrap(err, "recv thread");
    24. }
    25. }
    26. return err;
    27. }

     SrsPublishRecvThread::consume 。执行_conn->handle_publish_message 函数。

    _conn->handle_publish_message  指向SrsRtmpConn::handle_publish_message。

    1. srs_error_t SrsPublishRecvThread::consume(SrsCommonMessage* msg)
    2. {
    3. srs_error_t err = srs_success;
    4. // when cid changed, change it.
    5. if (ncid.compare(cid)) {
    6. _srs_context->set_id(ncid);
    7. cid = ncid;
    8. }
    9. _nb_msgs++;
    10. if (msg->header.is_video()) {
    11. video_frames++;
    12. }
    13. // log to show the time of recv thread.
    14. srs_verbose("recv thread now=%" PRId64 "us, got msg time=%" PRId64 "ms, size=%d",
    15. srs_update_system_time(), msg->header.timestamp, msg->size);
    16. // the rtmp connection will handle this message
    17. err = _conn->handle_publish_message(_source, msg);
    18. // must always free it,
    19. // the source will copy it if need to use.
    20. srs_freep(msg);
    21. if (err != srs_success) {
    22. return srs_error_wrap(err, "handle publish message");
    23. }
    24. // Yield to another coroutines.
    25. // @see https://github.com/ossrs/srs/issues/2194#issuecomment-777463768
    26. if (++nn_msgs_for_yield_ >= 15) {
    27. nn_msgs_for_yield_ = 0;
    28. srs_thread_yield();
    29. }
    30. return err;
    31. }

     handle_publish_message 处理。

    1. srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
    2. {
    3. srs_error_t err = srs_success;
    4. // process publish event.
    5. if (msg->header.is_amf0_command() || msg->header.is_amf3_command()) {
    6. SrsPacket* pkt = NULL;
    7. if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {
    8. return srs_error_wrap(err, "rtmp: decode message");
    9. }
    10. SrsAutoFree(SrsPacket, pkt);
    11. // for flash, any packet is republish.
    12. if (info->type == SrsRtmpConnFlashPublish) {
    13. // flash unpublish.
    14. // TODO: maybe need to support republish.
    15. srs_trace("flash flash publish finished.");
    16. return srs_error_new(ERROR_CONTROL_REPUBLISH, "rtmp: republish");
    17. }
    18. // for fmle, drop others except the fmle start packet.
    19. if (dynamic_cast<SrsFMLEStartPacket*>(pkt)) {
    20. SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);
    21. if ((err = rtmp->fmle_unpublish(info->res->stream_id, unpublish->transaction_id)) != srs_success) {
    22. return srs_error_wrap(err, "rtmp: republish");
    23. }
    24. return srs_error_new(ERROR_CONTROL_REPUBLISH, "rtmp: republish");
    25. }
    26. srs_trace("fmle ignore AMF0/AMF3 command message.");
    27. return err;
    28. }
    29. // video, audio, data message
    30. if ((err = process_publish_message(source, msg)) != srs_success) {
    31. return srs_error_wrap(err, "rtmp: consume message");
    32. }
    33. return err;
    34. }

       process_publish_message ,处理source->on_audio 及source->on_video 函数。

    1. srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
    2. {
    3. srs_error_t err = srs_success;
    4. // for edge, directly proxy message to origin.
    5. if (info->edge) {
    6. if ((err = source->on_edge_proxy_publish(msg)) != srs_success) {
    7. return srs_error_wrap(err, "rtmp: proxy publish");
    8. }
    9. return err;
    10. }
    11. // process audio packet
    12. if (msg->header.is_audio()) {
    13. if ((err = source->on_audio(msg)) != srs_success) {
    14. return srs_error_wrap(err, "rtmp: consume audio");
    15. }
    16. return err;
    17. }
    18. // process video packet
    19. if (msg->header.is_video()) {
    20. if ((err = source->on_video(msg)) != srs_success) {
    21. return srs_error_wrap(err, "rtmp: consume video");
    22. }
    23. return err;
    24. }
    25. // process aggregate packet
    26. if (msg->header.is_aggregate()) {
    27. if ((err = source->on_aggregate(msg)) != srs_success) {
    28. return srs_error_wrap(err, "rtmp: consume aggregate");
    29. }
    30. return err;
    31. }
    32. // process onMetaData
    33. if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
    34. SrsPacket* pkt = NULL;
    35. if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {
    36. return srs_error_wrap(err, "rtmp: decode message");
    37. }
    38. SrsAutoFree(SrsPacket, pkt);
    39. if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
    40. SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
    41. if ((err = source->on_meta_data(msg, metadata)) != srs_success) {
    42. return srs_error_wrap(err, "rtmp: consume metadata");
    43. }
    44. return err;
    45. }
    46. return err;
    47. }
    48. return err;
    49. }

    2. 数据发送:

    1. srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
    2. {
    3. srs_error_t err = srs_success;
    4. // Check page referer of player.
    5. SrsRequest* req = info->req;
    6. if (_srs_config->get_refer_enabled(req->vhost)) {
    7. if ((err = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != srs_success) {
    8. return srs_error_wrap(err, "rtmp: referer check");
    9. }
    10. }
    11. // When origin cluster enabled, try to redirect to the origin which is active.
    12. // A active origin is a server which is delivering stream.
    13. if (!info->edge && _srs_config->get_vhost_origin_cluster(req->vhost) && source->inactive()) {
    14. vector<string> coworkers = _srs_config->get_vhost_coworkers(req->vhost);
    15. for (int i = 0; i < (int)coworkers.size(); i++) {
    16. // TODO: FIXME: User may config the server itself as coworker, we must identify and ignore it.
    17. string host; int port = 0; string coworker = coworkers.at(i);
    18. string url = "http://" + coworker + "/api/v1/clusters?"
    19. + "vhost=" + req->vhost + "&ip=" + req->host + "&app=" + req->app + "&stream=" + req->stream
    20. + "&coworker=" + coworker;
    21. if ((err = SrsHttpHooks::discover_co_workers(url, host, port)) != srs_success) {
    22. // If failed to discovery stream in this coworker, we should request the next one util the last.
    23. // @see https://github.com/ossrs/srs/issues/1223
    24. if (i < (int)coworkers.size() - 1) {
    25. continue;
    26. }
    27. return srs_error_wrap(err, "discover coworkers, url=%s", url.c_str());
    28. }
    29. string rurl = srs_generate_rtmp_url(host, port, req->host, req->vhost, req->app, req->stream, req->param);
    30. srs_trace("rtmp: redirect in cluster, from=%s:%d, target=%s:%d, url=%s, rurl=%s",
    31. req->host.c_str(), req->port, host.c_str(), port, url.c_str(), rurl.c_str());
    32. // Ignore if host or port is invalid.
    33. if (host.empty() || port == 0) {
    34. continue;
    35. }
    36. bool accepted = false;
    37. if ((err = rtmp->redirect(req, rurl, accepted)) != srs_success) {
    38. srs_error_reset(err);
    39. } else {
    40. return srs_error_new(ERROR_CONTROL_REDIRECT, "redirected");
    41. }
    42. }
    43. return srs_error_new(ERROR_OCLUSTER_REDIRECT, "no origin");
    44. }
    45. // Set the socket options for transport.
    46. set_sock_options();
    47. // Create a consumer of source.
    48. SrsLiveConsumer* consumer = NULL;
    49. SrsAutoFree(SrsLiveConsumer, consumer);
    50. if ((err = source->create_consumer(consumer)) != srs_success) {
    51. return srs_error_wrap(err, "rtmp: create consumer");
    52. }
    53. if ((err = source->consumer_dumps(consumer)) != srs_success) {
    54. return srs_error_wrap(err, "rtmp: dumps consumer");
    55. }
    56. // Use receiving thread to receive packets from peer.
    57. SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());
    58. if ((err = trd.start()) != srs_success) {
    59. return srs_error_wrap(err, "rtmp: start receive thread");
    60. }
    61. // Deliver packets to peer.
    62. wakable = consumer;
    63. err = do_playing(source, consumer, &trd);
    64. wakable = NULL;
    65. trd.stop();
    66. // Drop all packets in receiving thread.
    67. if (!trd.empty()) {
    68. srs_warn("drop the received %d messages", trd.size());
    69. }
    70. return err;
    71. }

    do_playing  函数。

    consumer->dump_packets 从缓存 取出数据,

    rtmp->send_and_free_messages 发送数据。

    1. srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd)
    2. {
    3. srs_error_t err = srs_success;
    4. SrsRequest* req = info->req;
    5. srs_assert(req);
    6. srs_assert(consumer);
    7. // update the statistic when source disconveried.
    8. SrsStatistic* stat = SrsStatistic::instance();
    9. if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) {
    10. return srs_error_wrap(err, "rtmp: stat client");
    11. }
    12. // initialize other components
    13. SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
    14. SrsAutoFree(SrsPithyPrint, pprint);
    15. SrsMessageArray msgs(SRS_PERF_MW_MSGS);
    16. bool user_specified_duration_to_stop = (req->duration > 0);
    17. int64_t starttime = -1;
    18. // setup the realtime.
    19. realtime = _srs_config->get_realtime_enabled(req->vhost);
    20. // setup the mw config.
    21. // when mw_sleep changed, resize the socket send buffer.
    22. mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
    23. mw_sleep = _srs_config->get_mw_sleep(req->vhost);
    24. skt->set_socket_buffer(mw_sleep);
    25. // initialize the send_min_interval
    26. send_min_interval = _srs_config->get_send_min_interval(req->vhost);
    27. srs_trace("start play smi=%dms, mw_sleep=%d, mw_msgs=%d, realtime=%d, tcp_nodelay=%d",
    28. srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_msgs, realtime, tcp_nodelay);
    29. while (true) {
    30. // when source is set to expired, disconnect it.
    31. if ((err = trd->pull()) != srs_success) {
    32. return srs_error_wrap(err, "rtmp: thread quit");
    33. }
    34. // collect elapse for pithy print.
    35. pprint->elapse();
    36. // to use isolate thread to recv, can improve about 33% performance.
    37. while (!rtrd->empty()) {
    38. SrsCommonMessage* msg = rtrd->pump();
    39. if ((err = process_play_control_msg(consumer, msg)) != srs_success) {
    40. return srs_error_wrap(err, "rtmp: play control message");
    41. }
    42. }
    43. // quit when recv thread error.
    44. if ((err = rtrd->error_code()) != srs_success) {
    45. return srs_error_wrap(err, "rtmp: recv thread");
    46. }
    47. #ifdef SRS_PERF_QUEUE_COND_WAIT
    48. // wait for message to incoming.
    49. // @see https://github.com/ossrs/srs/issues/257
    50. consumer->wait(mw_msgs, mw_sleep);
    51. #endif
    52. // get messages from consumer.
    53. // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
    54. // @remark when enable send_min_interval, only fetch one message a time.
    55. int count = (send_min_interval > 0)? 1 : 0;
    56. if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {
    57. return srs_error_wrap(err, "rtmp: consumer dump packets");
    58. }
    59. // reportable
    60. if (pprint->can_print()) {
    61. kbps->sample();
    62. srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d/%d",
    63. (int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
    64. kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep), mw_msgs);
    65. }
    66. if (count <= 0) {
    67. #ifndef SRS_PERF_QUEUE_COND_WAIT
    68. srs_usleep(mw_sleep);
    69. #endif
    70. // ignore when nothing got.
    71. continue;
    72. }
    73. // only when user specifies the duration,
    74. // we start to collect the durations for each message.
    75. if (user_specified_duration_to_stop) {
    76. for (int i = 0; i < count; i++) {
    77. SrsSharedPtrMessage* msg = msgs.msgs[i];
    78. // foreach msg, collect the duration.
    79. // @remark: never use msg when sent it, for the protocol sdk will free it.
    80. if (starttime < 0 || starttime > msg->timestamp) {
    81. starttime = msg->timestamp;
    82. }
    83. duration += (msg->timestamp - starttime) * SRS_UTIME_MILLISECONDS;
    84. starttime = msg->timestamp;
    85. }
    86. }
    87. // sendout messages, all messages are freed by send_and_free_messages().
    88. // no need to assert msg, for the rtmp wilal assert it.
    89. printf("%s %d %d\n",__FUNCTION__,__LINE__,count);
    90. if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != srs_success) {
    91. return srs_error_wrap(err, "rtmp: send %d messages", count);
    92. }
    93. // if duration specified, and exceed it, stop play live.
    94. // @see: https://github.com/ossrs/srs/issues/45
    95. if (user_specified_duration_to_stop) {
    96. if (duration >= req->duration) {
    97. return srs_error_new(ERROR_RTMP_DURATION_EXCEED, "rtmp: time %d up %d", srsu2msi(duration), srsu2msi(req->duration));
    98. }
    99. }
    100. // apply the minimal interval for delivery stream in srs_utime_t.
    101. if (send_min_interval > 0) {
    102. srs_usleep(send_min_interval);
    103. }
    104. // Yield to another coroutines.
    105. // @see https://github.com/ossrs/srs/issues/2194#issuecomment-777437476
    106. srs_thread_yield();
    107. }
    108. return err;
    109. }

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

  • 相关阅读:
    【专利】一种光伏加工产品缺陷检测方法
    openEuler 22.03 LTS编译安装libreoffice并制作rpm包——筑梦之路
    老版本的Spring应用该如何应对CVE-2022-22965漏洞?
    优思学院|六西格玛将烹饪和美味提升至极致
    求 k 整除最大元素和(dp)
    4383 [八省联考 2018] 林克卡特树(WQS 二分+DP)
    给运行中的docker容器挂载目录——筑梦之路
    What is Fan-out
    windows nodejs 15.0.0下载安装
    RNN变体——LSTM原理及代码实现
  • 原文地址:https://blog.csdn.net/u012794472/article/details/126943634