承接上篇文章:RTMP 推流由 SrsRtmpConn::publishing 处理。publishing 创建接收协程 SrsPublishRecvThread,并将其与 SrsLiveSource 一起传入 do_publishing。
- // Runs one RTMP publish session on this connection: optional referer check,
- // the on_publish hook, acquiring the source, pumping messages through
- // do_publishing() with a dedicated receive thread, and the final cleanup.
- // @param source the live source that receives the published stream.
- // @return srs_success, or the error of the stage that failed.
- srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source)
- {
- srs_error_t err = srs_success;
- SrsRequest* req = info->req;
-
- // The referer is verified only when the vhost enables the check.
- const bool refer_enabled = _srs_config->get_refer_enabled(req->vhost);
- if (refer_enabled) {
- err = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost));
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: referer check");
- }
- }
-
- err = http_hooks_on_publish();
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: callback on publish");
- }
-
- // TODO: FIXME: Should refine the state of publishing.
- err = acquire_publish(source);
- if (err == srs_success) {
- // Receive in an isolated thread.
- // @see: https://github.com/ossrs/srs/issues/237
- SrsPublishRecvThread recv_thread(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
- err = do_publishing(source, &recv_thread);
- recv_thread.stop();
- }
-
- // Release the publish state no matter how acquire/publish ended: a failure
- // half-way through acquire may already have changed the state.
- // @see https://github.com/ossrs/srs/issues/474
- // @remark a busy stream belongs to another publisher, so never release it.
- const bool stream_busy = (srs_error_code(err) == ERROR_SYSTEM_STREAM_BUSY);
- if (!stream_busy) {
- release_publish(source);
- }
-
- http_hooks_on_unpublish();
-
- return err;
- }
do_publishing:启动接收协程(rtrd->start),随后在循环中等待消息、检查超时,并更新统计信息。
- // Supervises a publish session: registers the client in the statistics,
- // starts the receive thread, then loops waiting for messages, detecting
- // publish timeouts, updating the video frame statistics and printing a
- // periodic report.
- // @param source the live source being published to.
- // @param rtrd the receive thread that reads and consumes the messages.
- // @return an error when the thread quits, the receiver fails, or no new
- // message arrives within the timeout; the loop never ends normally.
- srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd)
- {
- srs_error_t err = srs_success;
-
- SrsRequest* req = info->req;
- SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();
- SrsAutoFree(SrsPithyPrint, pprint);
-
- // Update the statistic when the source is discovered.
- SrsStatistic* stat = SrsStatistic::instance();
- err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type);
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: stat client");
- }
-
- // Start the isolated receive thread.
- // TODO: FIXME: Pass the callback here.
- err = rtrd->start();
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: receive thread");
- }
-
- // Load the publish timeouts of this vhost.
- publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);
- publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost);
-
- // Apply the socket options.
- set_sock_options();
-
- // Scoped block: log the effective settings once at start.
- {
- bool mr = _srs_config->get_mr_enabled(req->vhost);
- srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);
- srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d",
- mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout), tcp_nodelay);
- }
-
- // Counters seen at the previous iteration, used to detect progress.
- int64_t nb_msgs = 0;
- uint64_t nb_frames = 0;
- for (;;) {
- err = trd->pull();
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: thread quit");
- }
-
- pprint->elapse();
-
- // Wait on the condition with a timeout; before the first message
- // arrives the larger first-packet timeout is used.
- // @see https://github.com/ossrs/srs/issues/441
- if (nb_msgs != 0) {
- rtrd->wait(publish_normal_timeout);
- } else {
- rtrd->wait(publish_1stpkt_timeout);
- }
-
- // Propagate any error raised by the receive thread.
- err = rtrd->error_code();
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: receive thread");
- }
-
- // No new message since the last iteration means the wait timed out.
- if (rtrd->nb_msgs() <= nb_msgs) {
- return srs_error_new(ERROR_SOCKET_TIMEOUT, "rtmp: publish timeout %dms, nb_msgs=%d",
- nb_msgs? srsu2msi(publish_normal_timeout) : srsu2msi(publish_1stpkt_timeout), (int)nb_msgs);
- }
- nb_msgs = rtrd->nb_msgs();
-
- // Report the number of video frames received since the last iteration.
- // @remark https://github.com/ossrs/srs/issues/851
- SrsStatistic* frame_stat = SrsStatistic::instance();
- err = frame_stat->on_video_frames(req, (int)(rtrd->nb_video_frames() - nb_frames));
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: stat video frames");
- }
- nb_frames = rtrd->nb_video_frames();
-
- // Nothing more to do until the pithy printer allows a report.
- if (!pprint->can_print()) {
- continue;
- }
-
- kbps->sample();
- bool mr = _srs_config->get_mr_enabled(req->vhost);
- srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);
- srs_trace("<- " SRS_CONSTS_LOG_CLIENT_PUBLISH " time=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d",
- (int)pprint->age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
- kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), mr, srsu2msi(mr_sleep),
- srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout));
- }
-
- return err;
- }
SrsPublishRecvThread::start 调用 trd.start(),其中 trd 是 SrsRecvThread 类型的成员。
- // Starts the underlying receive thread and records its context id.
- // The ids are recorded even when the start fails.
- // @return srs_success, or the wrapped error from trd.start().
- srs_error_t SrsPublishRecvThread::start()
- {
- srs_error_t err = trd.start();
- if (err != srs_success) {
- err = srs_error_wrap(err, "publish recv thread");
- }
-
- // Both the current and the "next" context id begin as the thread's id.
- cid = trd.cid();
- ncid = cid;
-
- return err;
- }
SrsRecvThread::start 创建并启动 SrsSTCoroutine 协程;创建时传入 this,协程运行后会回调该对象的 cycle()。
- // Creates a fresh "recv" coroutine bound to this object and starts it.
- // Any coroutine left over from a previous start is freed first.
- // @return srs_success, or the wrapped error from the coroutine start.
- srs_error_t SrsRecvThread::start()
- {
- srs_freep(trd);
-
- SrsSTCoroutine* coroutine = new SrsSTCoroutine("recv", this, _parent_cid);
- // Use a 256K stack (1 << 18) to fix crashes when calling some 3rd-party APIs.
- coroutine->set_stack_size(1 << 18);
- trd = coroutine;
-
- srs_error_t err = trd->start();
- if (err != srs_success) {
- return srs_error_wrap(err, "recv thread");
- }
-
- return err;
- }
SrsRecvThread::cycle:协程入口,先取消接收超时并通知 pumper 开始,再执行 do_cycle,结束后恢复超时并通知 pumper 停止。
- // Coroutine entry: disables the receive timeout, runs do_cycle() between the
- // pumper's on_start()/on_stop() notifications, then restores the timeout.
- // @return srs_success, or the wrapped error from do_cycle().
- srs_error_t SrsRecvThread::cycle()
- {
- // Batched writev greatly improves performance, but receiving with a
- // timeout costs about 33% in system calls; receiving in an isolated
- // thread without timeout recovers about 33% performance.
- rtmp->set_recv_timeout(SRS_UTIME_NO_TIMEOUT);
-
- pumper->on_start();
-
- srs_error_t err = do_cycle();
- if (err != srs_success) {
- err = srs_error_wrap(err, "recv thread");
- }
-
- // Restore the timeout to pulse mode.
- rtmp->set_recv_timeout(timeout);
-
- pumper->on_stop();
-
- return err;
- }
do_cycle 循环执行以下两步:
rtmp->recv_message:从 RTMP 连接接收一条消息。
pumper->consume:把收到的消息交给 pumper 处理;推流场景下 pumper 指向 SrsPublishRecvThread,因此实际调用的是 SrsPublishRecvThread::consume(SrsCommonMessage* msg)。
- // Receive loop: reads one RTMP message at a time and hands it to the pumper.
- // On any receive/consume error the coroutine and the pumper are both
- // interrupted and the wrapped error is returned.
- // @return an error when the coroutine is asked to quit or a message fails.
- srs_error_t SrsRecvThread::do_cycle()
- {
- srs_error_t err = srs_success;
-
- for (;;) {
- err = trd->pull();
- if (err != srs_success) {
- return srs_error_wrap(err, "recv thread");
- }
-
- // While the pumper is interrupted, sleep and retry later.
- if (pumper->interrupted()) {
- srs_usleep(timeout);
- continue;
- }
-
- // Receive one message and let the pumper consume it.
- SrsCommonMessage* msg = NULL;
- err = rtmp->recv_message(&msg);
- if (err == srs_success) {
- err = pumper->consume(msg);
- }
-
- if (err == srs_success) {
- continue;
- }
-
- // Interrupt the receive thread for any error.
- trd->interrupt();
-
- // Notify the pumper to quit for error.
- pumper->interrupt(err);
-
- return srs_error_wrap(err, "recv thread");
- }
-
- return err;
- }
SrsPublishRecvThread::consume:更新消息与视频帧计数后,调用 _conn->handle_publish_message 处理消息。
其中 _conn 指向 SrsRtmpConn,因此实际调用的是 SrsRtmpConn::handle_publish_message。
- // Consumes one received message: counts it, forwards it to the connection's
- // handle_publish_message(), frees it, and periodically yields.
- // @param msg the received message; it is always freed here.
- // @return srs_success, or the wrapped error from handle_publish_message().
- srs_error_t SrsPublishRecvThread::consume(SrsCommonMessage* msg)
- {
- // Switch the context id when a new one was requested.
- if (ncid.compare(cid) != 0) {
- _srs_context->set_id(ncid);
- cid = ncid;
- }
-
- // Count every message, and video messages separately.
- ++_nb_msgs;
- if (msg->header.is_video()) {
- ++video_frames;
- }
-
- // Log to show the time of the receive thread.
- srs_verbose("recv thread now=%" PRId64 "us, got msg time=%" PRId64 "ms, size=%d",
- srs_update_system_time(), msg->header.timestamp, msg->size);
-
- // The RTMP connection handles this message.
- srs_error_t err = _conn->handle_publish_message(_source, msg);
-
- // Always free the message, even on error; the source copies it if needed.
- srs_freep(msg);
-
- if (err != srs_success) {
- return srs_error_wrap(err, "handle publish message");
- }
-
- // Yield to other coroutines once every 15 messages.
- // @see https://github.com/ossrs/srs/issues/2194#issuecomment-777463768
- nn_msgs_for_yield_++;
- if (nn_msgs_for_yield_ >= 15) {
- nn_msgs_for_yield_ = 0;
- srs_thread_yield();
- }
-
- return err;
- }
handle_publish_message:AMF0/AMF3 命令消息在此处理(用于结束或重新推流),其余的音视频及数据消息转交 process_publish_message。
- // Handles one message from the publisher. Media and data messages go to
- // process_publish_message(); AMF0/AMF3 command messages end the publish
- // (flash: any command, FMLE: the FMLE start packet) or are ignored.
- // @param source the live source being published to.
- // @param msg the received message; not freed here.
- // @return srs_success, ERROR_CONTROL_REPUBLISH when publishing ends, or a
- // wrapped error.
- srs_error_t SrsRtmpConn::handle_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
- {
- srs_error_t err = srs_success;
-
- // Anything that is not a command is video, audio or data.
- const bool is_command = msg->header.is_amf0_command() || msg->header.is_amf3_command();
- if (!is_command) {
- err = process_publish_message(source, msg);
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: consume message");
- }
- return err;
- }
-
- // Process the publish event carried by the command.
- SrsPacket* pkt = NULL;
- err = rtmp->decode_message(msg, &pkt);
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: decode message");
- }
- SrsAutoFree(SrsPacket, pkt);
-
- // For flash, any packet is republish (i.e. flash unpublish).
- // TODO: maybe need to support republish.
- if (info->type == SrsRtmpConnFlashPublish) {
- srs_trace("flash flash publish finished.");
- return srs_error_new(ERROR_CONTROL_REPUBLISH, "rtmp: republish");
- }
-
- // For FMLE, drop everything except the FMLE start packet.
- SrsFMLEStartPacket* unpublish = dynamic_cast<SrsFMLEStartPacket*>(pkt);
- if (unpublish) {
- err = rtmp->fmle_unpublish(info->res->stream_id, unpublish->transaction_id);
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: republish");
- }
- return srs_error_new(ERROR_CONTROL_REPUBLISH, "rtmp: republish");
- }
-
- srs_trace("fmle ignore AMF0/AMF3 command message.");
- return err;
- }
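process_publish_message:按消息类型分发,分别调用 source->on_audio、source->on_video、source->on_aggregate 及 source->on_meta_data;edge 模式下则直接调用 source->on_edge_proxy_publish 转发给源站。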
- // Dispatches one media/data message to the source by message type. On an
- // edge the message is proxied to the origin instead. Messages of any other
- // type, and data packets that are not onMetaData, are silently ignored.
- // @param source the live source being published to.
- // @param msg the received message; not freed here.
- // @return srs_success, or the wrapped error of the handler that failed.
- srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg)
- {
- srs_error_t err = srs_success;
-
- // For edge, directly proxy the message to origin.
- if (info->edge) {
- err = source->on_edge_proxy_publish(msg);
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: proxy publish");
- }
- return err;
- }
-
- // Audio packet.
- if (msg->header.is_audio()) {
- err = source->on_audio(msg);
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: consume audio");
- }
- return err;
- }
-
- // Video packet.
- if (msg->header.is_video()) {
- err = source->on_video(msg);
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: consume video");
- }
- return err;
- }
-
- // Aggregate packet.
- if (msg->header.is_aggregate()) {
- err = source->on_aggregate(msg);
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: consume aggregate");
- }
- return err;
- }
-
- // Everything below handles AMF0/AMF3 data only (onMetaData).
- const bool is_data = msg->header.is_amf0_data() || msg->header.is_amf3_data();
- if (!is_data) {
- return err;
- }
-
- SrsPacket* pkt = NULL;
- err = rtmp->decode_message(msg, &pkt);
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: decode message");
- }
- SrsAutoFree(SrsPacket, pkt);
-
- SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
- if (metadata) {
- err = source->on_meta_data(msg, metadata);
- if (err != srs_success) {
- return srs_error_wrap(err, "rtmp: consume metadata");
- }
- }
-
- return err;
- }
2. 数据发送(播放):由 SrsRtmpConn::playing 处理。
- // Runs one RTMP play session on this connection: optional referer check,
- // origin-cluster redirect when this origin has no active stream, consumer
- // creation, then delivery via do_playing() while a receive thread collects
- // the player's control messages.
- // @param source the live source to play from.
- // @return srs_success or the error that ended the session; a successful
- // redirect is reported as ERROR_CONTROL_REDIRECT.
- srs_error_t SrsRtmpConn::playing(SrsLiveSource* source)
- {
- srs_error_t err = srs_success;
-
- // Check page referer of player.
- SrsRequest* req = info->req;
- if (_srs_config->get_refer_enabled(req->vhost)) {
- if ((err = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != srs_success) {
- return srs_error_wrap(err, "rtmp: referer check");
- }
- }
-
- // When origin cluster enabled, try to redirect to the origin which is active.
- // An active origin is a server which is delivering stream.
- if (!info->edge && _srs_config->get_vhost_origin_cluster(req->vhost) && source->inactive()) {
- vector<string> coworkers = _srs_config->get_vhost_coworkers(req->vhost);
- for (int i = 0; i < (int)coworkers.size(); i++) {
- // TODO: FIXME: User may config the server itself as coworker, we must identify and ignore it.
- string host; int port = 0; string coworker = coworkers.at(i);
-
- string url = "http://" + coworker + "/api/v1/clusters?"
- + "vhost=" + req->vhost + "&ip=" + req->host + "&app=" + req->app + "&stream=" + req->stream
- + "&coworker=" + coworker;
- if ((err = SrsHttpHooks::discover_co_workers(url, host, port)) != srs_success) {
- // If failed to discover the stream in this coworker, request the next one until the last.
- // @see https://github.com/ossrs/srs/issues/1223
- if (i < (int)coworkers.size() - 1) {
- // Release the discarded error before retrying; otherwise the next
- // assignment to err overwrites it and the error object leaks.
- srs_error_reset(err);
- continue;
- }
- return srs_error_wrap(err, "discover coworkers, url=%s", url.c_str());
- }
-
- string rurl = srs_generate_rtmp_url(host, port, req->host, req->vhost, req->app, req->stream, req->param);
- srs_trace("rtmp: redirect in cluster, from=%s:%d, target=%s:%d, url=%s, rurl=%s",
- req->host.c_str(), req->port, host.c_str(), port, url.c_str(), rurl.c_str());
-
- // Ignore if host or port is invalid.
- if (host.empty() || port == 0) {
- continue;
- }
-
- // A failed redirect is discarded and the next coworker is tried.
- bool accepted = false;
- if ((err = rtmp->redirect(req, rurl, accepted)) != srs_success) {
- srs_error_reset(err);
- } else {
- return srs_error_new(ERROR_CONTROL_REDIRECT, "redirected");
- }
- }
-
- return srs_error_new(ERROR_OCLUSTER_REDIRECT, "no origin");
- }
-
- // Set the socket options for transport.
- set_sock_options();
-
- // Create a consumer of source; it is freed automatically on return.
- SrsLiveConsumer* consumer = NULL;
- SrsAutoFree(SrsLiveConsumer, consumer);
- if ((err = source->create_consumer(consumer)) != srs_success) {
- return srs_error_wrap(err, "rtmp: create consumer");
- }
- if ((err = source->consumer_dumps(consumer)) != srs_success) {
- return srs_error_wrap(err, "rtmp: dumps consumer");
- }
-
- // Use receiving thread to receive packets from peer.
- SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());
-
- if ((err = trd.start()) != srs_success) {
- return srs_error_wrap(err, "rtmp: start receive thread");
- }
-
- // Deliver packets to peer; wakable is set only while delivering.
- wakable = consumer;
- err = do_playing(source, consumer, &trd);
- wakable = NULL;
-
- trd.stop();
-
- // Drop all packets in receiving thread.
- if (!trd.empty()) {
- srs_warn("drop the received %d messages", trd.size());
- }
-
- return err;
- }
do_playing 函数循环执行以下两步:
consumer->dump_packets:从 consumer 的缓存队列中取出待发送的消息;
rtmp->send_and_free_messages:把消息发送给播放端,并释放这些消息。
- // Delivery loop of a play session: handles the player's control messages
- // collected by the receive thread, dumps packets from the consumer and sends
- // them to the peer, and stops when the requested duration is exceeded.
- // @param source the live source being played.
- // @param consumer the consumer to dump packets from; must not be NULL.
- // @param rtrd the receive thread that queues messages from the player.
- // @return an error when the thread quits, the receiver fails, sending fails,
- // or the duration is exceeded; the loop never ends normally.
- srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd)
- {
- srs_error_t err = srs_success;
-
- SrsRequest* req = info->req;
- srs_assert(req);
- srs_assert(consumer);
-
- // Update the statistic when the source is discovered.
- SrsStatistic* stat = SrsStatistic::instance();
- if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) {
- return srs_error_wrap(err, "rtmp: stat client");
- }
-
- // initialize other components
- SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
- SrsAutoFree(SrsPithyPrint, pprint);
-
- SrsMessageArray msgs(SRS_PERF_MW_MSGS);
- bool user_specified_duration_to_stop = (req->duration > 0);
- int64_t starttime = -1;
-
- // setup the realtime.
- realtime = _srs_config->get_realtime_enabled(req->vhost);
- // setup the mw config.
- // when mw_sleep changed, resize the socket send buffer.
- mw_msgs = _srs_config->get_mw_msgs(req->vhost, realtime);
- mw_sleep = _srs_config->get_mw_sleep(req->vhost);
- skt->set_socket_buffer(mw_sleep);
- // initialize the send_min_interval
- send_min_interval = _srs_config->get_send_min_interval(req->vhost);
-
- srs_trace("start play smi=%dms, mw_sleep=%d, mw_msgs=%d, realtime=%d, tcp_nodelay=%d",
- srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_msgs, realtime, tcp_nodelay);
-
- while (true) {
- // when source is set to expired, disconnect it.
- if ((err = trd->pull()) != srs_success) {
- return srs_error_wrap(err, "rtmp: thread quit");
- }
-
- // collect elapse for pithy print.
- pprint->elapse();
-
- // Handle the control messages queued by the isolated receive thread;
- // receiving in an isolated thread can improve about 33% performance.
- while (!rtrd->empty()) {
- SrsCommonMessage* msg = rtrd->pump();
- if ((err = process_play_control_msg(consumer, msg)) != srs_success) {
- return srs_error_wrap(err, "rtmp: play control message");
- }
- }
-
- // quit when recv thread error.
- if ((err = rtrd->error_code()) != srs_success) {
- return srs_error_wrap(err, "rtmp: recv thread");
- }
-
- #ifdef SRS_PERF_QUEUE_COND_WAIT
- // wait for message to incoming.
- // @see https://github.com/ossrs/srs/issues/257
- consumer->wait(mw_msgs, mw_sleep);
- #endif
-
- // get messages from consumer.
- // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
- // @remark when enable send_min_interval, only fetch one message a time.
- int count = (send_min_interval > 0)? 1 : 0;
- if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {
- return srs_error_wrap(err, "rtmp: consumer dump packets");
- }
-
- // reportable
- if (pprint->can_print()) {
- kbps->sample();
- srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d/%d",
- (int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
- kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep), mw_msgs);
- }
-
- if (count <= 0) {
- #ifndef SRS_PERF_QUEUE_COND_WAIT
- srs_usleep(mw_sleep);
- #endif
- // ignore when nothing got.
- continue;
- }
-
- // only when user specifies the duration,
- // we start to collect the durations for each message.
- if (user_specified_duration_to_stop) {
- for (int i = 0; i < count; i++) {
- SrsSharedPtrMessage* msg = msgs.msgs[i];
-
- // foreach msg, collect the duration.
- // @remark: never use msg after it is sent, for the protocol sdk will free it.
- if (starttime < 0 || starttime > msg->timestamp) {
- starttime = msg->timestamp;
- }
- duration += (msg->timestamp - starttime) * SRS_UTIME_MILLISECONDS;
- starttime = msg->timestamp;
- }
- }
-
- // sendout messages, all messages are freed by send_and_free_messages().
- // no need to assert msg, for the rtmp will assert it.
- // NOTE: the per-batch debug printf to stdout was removed from this hot
- // path; the periodic srs_trace report above already logs the count.
- if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != srs_success) {
- return srs_error_wrap(err, "rtmp: send %d messages", count);
- }
-
- // if duration specified, and exceed it, stop play live.
- // @see: https://github.com/ossrs/srs/issues/45
- if (user_specified_duration_to_stop) {
- if (duration >= req->duration) {
- return srs_error_new(ERROR_RTMP_DURATION_EXCEED, "rtmp: time %d up %d", srsu2msi(duration), srsu2msi(req->duration));
- }
- }
-
- // apply the minimal interval for delivery stream in srs_utime_t.
- if (send_min_interval > 0) {
- srs_usleep(send_min_interval);
- }
-
- // Yield to another coroutines.
- // @see https://github.com/ossrs/srs/issues/2194#issuecomment-777437476
- srs_thread_yield();
- }
-
- return err;
- }