• srs流媒体服务器推流的流程


    简述

    在accept一个链接后,创建对应的SrsRtmpConn。

    SrsRtmpConn自身是一个协程的子类,运行后进行rtmp协议中的handshake、connect、create stream。并且判断是publish之后,创建SrsRecvThread来接受推流。

    但是有一点比较奇怪的写法就是,在handshake之后,connect之前就根据发的包内容来判断是一个publish还是play。这点和我认知的rtmp协议不同,在我的认知里面,rtmp协议在create stream会发play或者push的message表示自己是一个什么样的角色,一般都是通过这个方法来判断的。

    SrsRecvThread是一个协程,运行后会开始接受message数据。根据对应message执行不同的函数,并且把message放入到SrsConsumer队列中,

    在放入SrsConsumer的队列中后会通过条件srs_cond_signal(mw_wait),通知等待的协程可以开始消费message了

    accept请求

    这个是接受tcp链接的代码,rtmp的tcp监听也是在这边的。accept一个fd后,调用on_tcp_client处理这个链接。

    这边有一点需要注意的是,srs的io操作大部分是state thread库函数做的。调用accept的socket是一个非阻塞式的,但是st_accept用起来像阻塞式的,实际上是一个非阻塞式的。

    1. srs_error_t SrsTcpListener::cycle()
    2. {
    3. srs_error_t err = srs_success;
    4. while (true) {
    5. if ((err = trd->pull()) != srs_success) {
    6. return srs_error_wrap(err, "tcp listener");
    7. }
    8. /*接受一个链接*/
    9. srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
    10. if(fd == NULL){
    11. return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
    12. }
    13. if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
    14. return srs_error_wrap(err, "set closeexec");
    15. }
    16. /*调用处理函数*/
    17. if ((err = handler->on_tcp_client(fd)) != srs_success) {
    18. return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
    19. }
    20. }
    21. return err;
    22. }

    本文福利, C++音视频学习资料包、技术视频,内容包括(音视频开发,面试题,FFmpeg webRTC rtmp hls rtsp ffplay srs↓↓↓↓↓↓见下面↓↓文章底部点击领取↓↓

    创建SrsRtmpConn

    on_tcp_client最后调用会调用fd2con,将fd生成对应的SrsRtmpConn对像。

    1. srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn)
    2. {
    3. /*.....*/
    4. if (type == SrsListenerRtmpStream) {
    5. *pconn = new SrsRtmpConn(this, stfd, ip, port);
    6. } else if (type == SrsListenerHttpApi) {
    7. *pconn = new SrsHttpApi(this, stfd, http_api_mux, ip, port);
    8. } else if (type == SrsListenerHttpStream) {
    9. *pconn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip, port);
    10. } else {
    11. srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port);
    12. srs_close_stfd(stfd);
    13. return err;
    14. }
    15. return err;
    16. }

    并且SrsRtmpConn是一个st可以执行的协程类,最后会调用do_cycle进行handshake、connect、create stream。

    1. srs_error_t SrsRtmpConn::do_cycle()
    2. {
    3. /*......*/
    4. //握手
    5. if ((err = rtmp->handshake()) != srs_success) {
    6. return srs_error_wrap(err, "rtmp handshake");
    7. }
    8. //进行下一步操作
    9. if ((err = service_cycle()) != srs_success) {
    10. err = srs_error_wrap(err, "service cycle");
    11. }
    12. /*........*/
    13. return err;
    14. }

    在service_cycle中就是调用stream_service_cycle

    1. srs_error_t SrsRtmpConn::service_cycle()
    2. {
    3. /*.....*/
    4. //这个while看起来像是做推流错误恢复处理,具体我也没有看明白
    5. while (true) {
    6. /*.....*/
    7. //调用这个进入接受message
    8. err = stream_service_cycle();
    9. // for other system control message, fatal error.
    10. return srs_error_wrap(err, "rtmp: reject");
    11. }
    12. return err;
    13. }

    这个函数做的东西比较多,一开始回去确认身份。然后根据不同的推流端进行一些特例化的操作,大致就是connect,createstream,然后开始推流。

    1. srs_error_t SrsRtmpConn::stream_service_cycle()
    2. {
    3. //验证身份是play还是push
    4. if ((err = rtmp->identify_client(info->res->stream_id, info->type, req->stream, req->duration)) != srs_success) {
    5. return srs_error_wrap(err, "rtmp: identify client");
    6. }
    7. /*......*/
    8. //根据不同的身份做不同的操作,一般start play中就是做create stream和connect
    9. switch (info->type) {
    10. case SrsRtmpConnPlay: {
    11. // response connection start play
    12. if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {
    13. return srs_error_wrap(err, "rtmp: start play");
    14. }
    15. if ((err = http_hooks_on_play()) != srs_success) {
    16. return srs_error_wrap(err, "rtmp: callback on play");
    17. }
    18. err = playing(source);
    19. http_hooks_on_stop();
    20. return err;
    21. }
    22. case SrsRtmpConnFMLEPublish: {
    23. if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) {
    24. return srs_error_wrap(err, "rtmp: start FMLE publish");
    25. }
    26. return publishing(source);
    27. }
    28. case SrsRtmpConnHaivisionPublish: {
    29. if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) {
    30. return srs_error_wrap(err, "rtmp: start HAIVISION publish");
    31. }
    32. return publishing(source);
    33. }
    34. case SrsRtmpConnFlashPublish: {
    35. if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) {
    36. return srs_error_wrap(err, "rtmp: start FLASH publish");
    37. }
    38. return publishing(source);
    39. }
    40. default: {
    41. return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type);
    42. }
    43. }
    44. return err;
    45. }

    创建SrsRecvThread协程开始接受数据

    创建一个协程开使推流,调用do_publishing中启动协程开始推流,并做了一些链接的错误处理。

    1. rs_error_t SrsRtmpConn::publishing(SrsSource* source)
    2. {
    3. /*......*/
    4. // TODO: FIXME: Should refine the state of publishing.
    5. if ((err = acquire_publish(source)) == srs_success) {
    6. //创建一个协程开始开始推流
    7. SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
    8. err = do_publishing(source, &rtrd);
    9. rtrd.stop();
    10. }
    11. /*......*/
    12. return err;
    13. }

    在do_publishing中调用SrsPublishRecvThread的start,最后会开启协程。

    1. srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* rtrd)
    2. {
    3. srs_error_t err = srs_success;
    4. SrsRequest* req = info->req;
    5. SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
    6. SrsAutoFree(SrsPithyPrint, pprint);
    7. // start isolate recv thread.
    8. //调用SrsPublishRecvThread的start开启协程,专门做接受数据
    9. if ((err = rtrd->start()) != srs_success) {
    10. return srs_error_wrap(err, "rtmp: receive thread");
    11. }
    12. /*......*/
    13. return err;
    14. }

    start最后会调用SrsRecvThread协程,中间就省略很多无用的代码。

    1. srs_error_t SrsRecvThread::do_cycle()
    2. {
    3. srs_error_t err = srs_success;
    4. while (true) {
    5. /*......*/
    6. // Process the received message.
    7. //recv_message接受数据
    8. if ((err = rtmp->recv_message(&msg)) == srs_success) {
    9. //consume消费一个数据
    10. err = pumper->consume(msg);
    11. }
    12. /*.......*/
    13. }
    14. return err;
    15. }

    放入消费队列,通知消费者

    最后会放入到source的consumer中

    1. if (!drop_for_reduce) {
    2. for (int i = 0; i < (int)consumers.size(); i++) {
    3. SrsConsumer* consumer = consumers.at(i);
    4. //把msg放入到consumer中
    5. if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
    6. return srs_error_wrap(err, "consume message");
    7. }
    8. }
    9. }

    最后会通知所有的消费者,有message写入,可以开始消费

    1. srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
    2. {
    3. /*......*/
    4. if ((err = queue->enqueue(msg, NULL)) != srs_success) {
    5. return srs_error_wrap(err, "enqueue message");
    6. }
    7. #ifdef SRS_PERF_QUEUE_COND_WAIT
    8. // fire the mw when msgs is enough.
    9. if (mw_waiting) {
    10. if (atc && duration < 0) {
    11. //通知消费者
    12. srs_cond_signal(mw_wait);
    13. mw_waiting = false;
    14. return err;
    15. }
    16. // when duration ok, signal to flush.
    17. if (match_min_msgs && duration > mw_duration) {
    18. //通知消费者
    19. srs_cond_signal(mw_wait);
    20. mw_waiting = false;
    21. return err;
    22. }
    23. }
    24. #endif
    25. return err;
    26. }

    本文福利, C++音视频学习资料包、技术视频,内容包括(音视频开发,面试题,FFmpeg webRTC rtmp hls rtsp ffplay srs↓↓↓↓↓↓见下面↓↓文章底部点击领取↓↓ 

  • 相关阅读:
    网络编程入门
    Spark RDD转换成DataFrame的两种方式
    Harbor断电重启postgres报错 could not locate a valid checkpoint record
    互联网大厂面试必问的40个SpringBoot面试题【建议收藏】
    MySQL的预编译入门
    Nodejs -- Express 路由原理及设置模块化路由
    Git面经
    Vue中如何进行分布式搜索与全文搜索(如Elasticsearch)
    双十二买什么牌子电容笔?值得买的平价电容笔推荐
    C语言 求两个正整数的最大公约数及最小公倍数
  • 原文地址:https://blog.csdn.net/m0_60259116/article/details/126123946