在accept一个链接后,创建对应的SrsRtmpConn。
SrsRtmpConn自身是一个协程的子类,运行后进行rtmp协议中的handshake、connect、create stream。并且判断是publish之后,创建SrsRecvThread来接受推流。
但是有一点比较奇怪的写法就是,在handshake之后,connect之前就根据发的包内容来判断是一个publish还是play。这点和我认知的rtmp协议不同,在我的认知里面,rtmp协议在create stream会发play或者push的message表示自己是一个什么样的角色,一般都是通过这个方法来判断的。
SrsRecvThread是一个协程,运行后会开始接受message数据。根据对应message执行不同的函数,并且把message放入到SrsConsumer队列中,
在放入SrsConsumer的队列中后会通过条件srs_cond_signal(mw_wait),通知等待的协程可以开始消费message了
这个是接受tcp链接的代码,rtmp的tcp监听也是在这边的。accept一个fd后,调用on_tcp_client处理这个链接。
这边有一点需要注意的是,srs的io操作大部分是state thread库函数做的。调用accept的socket是一个非阻塞式的,但是st_accept用起来像阻塞式的,实际上是一个非阻塞式的。
- srs_error_t SrsTcpListener::cycle()
- {
- srs_error_t err = srs_success;
-
- while (true) {
- if ((err = trd->pull()) != srs_success) {
- return srs_error_wrap(err, "tcp listener");
- }
- /*接受一个链接*/
- srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
- if(fd == NULL){
- return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
- }
-
- if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
- return srs_error_wrap(err, "set closeexec");
- }
- /*调用处理函数*/
- if ((err = handler->on_tcp_client(fd)) != srs_success) {
- return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
- }
- }
-
- return err;
- }
-
本文福利, C++音视频学习资料包、技术视频,内容包括(音视频开发,面试题,FFmpeg ,webRTC ,rtmp ,hls ,rtsp ,ffplay ,srs)↓↓↓↓↓↓见下面↓↓文章底部点击领取↓↓
on_tcp_client最后调用会调用fd2con,将fd生成对应的SrsRtmpConn对像。
- srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn)
- {
- /*.....*/
-
- if (type == SrsListenerRtmpStream) {
- *pconn = new SrsRtmpConn(this, stfd, ip, port);
- } else if (type == SrsListenerHttpApi) {
- *pconn = new SrsHttpApi(this, stfd, http_api_mux, ip, port);
- } else if (type == SrsListenerHttpStream) {
- *pconn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip, port);
- } else {
- srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port);
- srs_close_stfd(stfd);
- return err;
- }
- return err;
- }
并且SrsRtmpConn是一个st可以执行的协程类,最后会调用do_cycle进行handshake、connect、create stream。
- srs_error_t SrsRtmpConn::do_cycle()
- {
- /*......*/
- //握手
- if ((err = rtmp->handshake()) != srs_success) {
- return srs_error_wrap(err, "rtmp handshake");
- }
-
- //进行下一步操作
- if ((err = service_cycle()) != srs_success) {
- err = srs_error_wrap(err, "service cycle");
- }
- /*........*/
- return err;
- }
在service_cycle中就是调用stream_service_cycle
- srs_error_t SrsRtmpConn::service_cycle()
- {
- /*.....*/
- //这个while看起来像是做推流错误恢复处理,具体我也没有看明白
- while (true) {
- /*.....*/
- //调用这个进入接受message
- err = stream_service_cycle();
- // for other system control message, fatal error.
- return srs_error_wrap(err, "rtmp: reject");
- }
-
- return err;
- }
这个函数做的东西比较多,一开始回去确认身份。然后根据不同的推流端进行一些特例化的操作,大致就是connect,createstream,然后开始推流。
- srs_error_t SrsRtmpConn::stream_service_cycle()
- {
- //验证身份是play还是push
- if ((err = rtmp->identify_client(info->res->stream_id, info->type, req->stream, req->duration)) != srs_success) {
- return srs_error_wrap(err, "rtmp: identify client");
- }
- /*......*/
- //根据不同的身份做不同的操作,一般start play中就是做create stream和connect
- switch (info->type) {
- case SrsRtmpConnPlay: {
- // response connection start play
- if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {
- return srs_error_wrap(err, "rtmp: start play");
- }
- if ((err = http_hooks_on_play()) != srs_success) {
- return srs_error_wrap(err, "rtmp: callback on play");
- }
- err = playing(source);
- http_hooks_on_stop();
- return err;
- }
- case SrsRtmpConnFMLEPublish: {
- if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) {
- return srs_error_wrap(err, "rtmp: start FMLE publish");
- }
- return publishing(source);
- }
- case SrsRtmpConnHaivisionPublish: {
- if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) {
- return srs_error_wrap(err, "rtmp: start HAIVISION publish");
- }
- return publishing(source);
- }
- case SrsRtmpConnFlashPublish: {
- if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) {
- return srs_error_wrap(err, "rtmp: start FLASH publish");
- }
- return publishing(source);
- }
- default: {
- return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type);
- }
- }
- return err;
- }
创建一个协程开使推流,调用do_publishing中启动协程开始推流,并做了一些链接的错误处理。
- rs_error_t SrsRtmpConn::publishing(SrsSource* source)
- {
-
- /*......*/
- // TODO: FIXME: Should refine the state of publishing.
- if ((err = acquire_publish(source)) == srs_success) {
- //创建一个协程开始开始推流
- SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
- err = do_publishing(source, &rtrd);
- rtrd.stop();
- }
- /*......*/
- return err;
- }
在do_publishing中调用SrsPublishRecvThread的start,最后会开启协程。
- srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* rtrd)
- {
- srs_error_t err = srs_success;
-
- SrsRequest* req = info->req;
- SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
- SrsAutoFree(SrsPithyPrint, pprint);
-
- // start isolate recv thread.
- //调用SrsPublishRecvThread的start开启协程,专门做接受数据
- if ((err = rtrd->start()) != srs_success) {
- return srs_error_wrap(err, "rtmp: receive thread");
- }
-
- /*......*/
- return err;
- }
start最后会调用SrsRecvThread协程,中间就省略很多无用的代码。
- srs_error_t SrsRecvThread::do_cycle()
- {
- srs_error_t err = srs_success;
-
- while (true) {
- /*......*/
-
- // Process the received message.
- //recv_message接受数据
- if ((err = rtmp->recv_message(&msg)) == srs_success) {
- //consume消费一个数据
- err = pumper->consume(msg);
- }
- /*.......*/
- }
-
- return err;
- }
-
最后会放入到source的consumer中
- if (!drop_for_reduce) {
- for (int i = 0; i < (int)consumers.size(); i++) {
- SrsConsumer* consumer = consumers.at(i);
- //把msg放入到consumer中
- if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
- return srs_error_wrap(err, "consume message");
- }
- }
- }
最后会通知所有的消费者,有message写入,可以开始消费
- srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
- {
- /*......*/
- if ((err = queue->enqueue(msg, NULL)) != srs_success) {
- return srs_error_wrap(err, "enqueue message");
- }
-
- #ifdef SRS_PERF_QUEUE_COND_WAIT
- // fire the mw when msgs is enough.
- if (mw_waiting) {
-
- if (atc && duration < 0) {
- //通知消费者
- srs_cond_signal(mw_wait);
- mw_waiting = false;
- return err;
- }
-
- // when duration ok, signal to flush.
- if (match_min_msgs && duration > mw_duration) {
- //通知消费者
- srs_cond_signal(mw_wait);
- mw_waiting = false;
- return err;
- }
- }
- #endif
-
- return err;
- }
本文福利, C++音视频学习资料包、技术视频,内容包括(音视频开发,面试题,FFmpeg ,webRTC ,rtmp ,hls ,rtsp ,ffplay ,srs)↓↓↓↓↓↓见下面↓↓文章底部点击领取↓↓