• obs推流核心流程分析


     

    前置步骤和录屏是一样的,见我的上一篇文章

    https://www.cnblogs.com/billin/p/17219558.html

    在上文提到的 bool obs_output_actual_start(obs_output_t *output) 函数中,如果启用的是推流直播,函数指针会转到下面的 rtmp_stream_start

    // Trimmed and annotated version by the article author (Bilin).
    /*
     * Start RTMP streaming.
     *
     * Marks the stream as connecting and spawns connect_thread(), passing the
     * stream as the thread argument.
     *
     * data: the struct rtmp_stream to start.
     *
     * Returns true when the connect thread was created, false otherwise.
     */
    static bool rtmp_stream_start(void *data)
    {
    	struct rtmp_stream *stream = data;

    	os_atomic_set_bool(&stream->connecting, true);

    	if (pthread_create(&stream->connect_thread, NULL, connect_thread,
    			   stream) != 0) {
    		/* No thread was created, so nothing else will ever clear the
    		 * flag (connect_thread() is what resets it before returning).
    		 * Clear it here so the stream is not left "connecting". */
    		os_atomic_set_bool(&stream->connecting, false);
    		return false;
    	}

    	return true;
    }

    推流使用的是obs-outputs插件

    上面的函数只做一件事:新开一个线程来执行 static void *connect_thread(void *data),传给线程的参数是 stream。

    先看一下rtmp_stream结构

    // RTMP stream state.
    //
    // Holds everything the RTMP output needs while streaming: the owning
    // obs_output, queued packets, thread handles, the server path/key and
    // credentials, dynamic-bitrate (dbr) bookkeeping and the librtmp session.
    //
    // NOTE(review): this is a trimmed excerpt. The functions in this article
    // also reference members that are not declared here (for example
    // silent_reconnect, new_socket_loop, low_latency_mode, active,
    // disconnected, encode_error, stop_ts, stop_event,
    // send_thread_signaled_exit, buffer_has_data_event and
    // socket_thread_active).
    struct rtmp_stream {
    	// The obs_output this stream belongs to.
        obs_output_t *output;
        
        // Packet information.
    	struct circlebuf packets;
    	// Checked by send_thread() before calling send_headers(); reset to
    	// false when send_thread() finishes.
    	bool sent_headers;
    
    	bool got_first_video;
    	// Set by connect_thread() from the next queued packet when silently
    	// reconnecting.
    	int64_t start_dts_offset;
    
    	// Set to true by rtmp_stream_start(); set back to false at the end of
    	// connect_thread().
    	volatile bool connecting;
        // Handle of the connect thread.
    	pthread_t connect_thread;
        // Handle of the send thread.
    	pthread_t send_thread;
    
    	// Semaphore the send_thread() main loop waits on.
    	os_sem_t *send_sem;
        
        // Push (ingest) URL and stream key.
        // For example, for Bilibili the push URL is rtmp://live-push.bilivideo.com/live-bvc/
    	struct dstr path, key;
        // User name and password.
    	struct dstr username, password;
        // Encoder name:
        // the author uses "FMLE/3.0 (compatible; FMSc/1.0)".
    	struct dstr encoder_name;
    	// Address to bind to; try_connect() clears the bind address when this
    	// is empty or "default".
    	struct dstr bind_ip;
    
    
    	int64_t last_dts_usec;
    
    	uint64_t total_bytes_sent;
    	int dropped_frames;
    
    	// Dynamic bitrate (dbr) state. send_thread() holds dbr_mutex around
    	// dbr_add_frame() and restores dbr_orig_bitrate when it exits.
    	pthread_mutex_t dbr_mutex;
    	struct circlebuf dbr_frames;
    	size_t dbr_data_size;
    
    	long audio_bitrate;
    	long dbr_est_bitrate;
    	long dbr_orig_bitrate;
    	long dbr_prev_bitrate;
    	long dbr_cur_bitrate;
    	long dbr_inc_bitrate;
    	bool dbr_enabled;
        
        // The RTMP session object.
    	RTMP rtmp;
        
    	// Socket thread created by init_send() (Windows, new socket loop) and
    	// joined by send_thread(); write_buf is the buffer init_send()
    	// allocates for it, with its capacity stored in write_buf_size.
    	pthread_t socket_thread;
    	uint8_t *write_buf;
    	size_t write_buf_len;
    	size_t write_buf_size;
    	pthread_mutex_t write_buf_mutex;
    	
    };

    rtmp_stream结构里保存了推流的所有关键信息,包括推流地址,key,流的编码器等参数

    还保存了几个关键的线程句柄(pthread_t),用于多线程调度:连接线程 connect_thread 和发送线程 send_thread。

     

    //rtmp连接线程
    static void *connect_thread(void *data)
    {
    	struct rtmp_stream *stream = data;
    	int ret;
        //设置线程名
    	os_set_thread_name("rtmp-stream: connect_thread");
        
        //初始化
    	if (!silently_reconnecting(stream)) {
    		if (!init_connect(stream)) {
    			obs_output_signal_stop(stream->output,
    					       OBS_OUTPUT_BAD_PATH);
    			os_atomic_set_bool(&stream->silent_reconnect, false);
    			return NULL;
    		}
    	} else {
    		struct encoder_packet packet;
    		peek_next_packet(stream, &packet);
    		stream->start_dts_offset = get_ms_time(&packet, packet.dts);
    	}
        
        //连接
    	ret = try_connect(stream);
    
    	if (ret != OBS_OUTPUT_SUCCESS) {
    		obs_output_signal_stop(stream->output, ret);
    		info("Connection to %s failed: %d", stream->path.array, ret);
    	}
    
    	if (!stopping(stream))
    		pthread_detach(stream->connect_thread);
    
    	os_atomic_set_bool(&stream->silent_reconnect, false);
    	os_atomic_set_bool(&stream->connecting, false);
    	return NULL;
    }

    上面的方法是在单独的线程中执行的,主要是做了下面几件事:

    1 设置线程名为“rtmp-stream: connect_thread”

    2 初始化 init_connect

    3 通过调用try_connect(stream)执行实际连接逻辑

    // Configure the RTMP session and connect to the server.
    /*
     * Initializes the librtmp session in stream->rtmp, applies the URL,
     * credentials, encoder name, bind address and stream key, connects, and
     * on success hands over to init_send().
     *
     * stream: the stream to connect.
     *
     * Returns OBS_OUTPUT_BAD_PATH when the URL cannot be set up,
     * OBS_OUTPUT_CONNECT_FAILED when RTMP_Connect() fails,
     * OBS_OUTPUT_INVALID_STREAM when RTMP_ConnectStream() fails, and
     * otherwise whatever init_send() returns.
     */
    static int try_connect(struct rtmp_stream *stream)
    {
    	info("Connecting to RTMP URL %s...", stream->path.array);

    	// Initialize the RTMP session and point it at the server URL.
    	RTMP_Init(&stream->rtmp);
    	if (!RTMP_SetupURL(&stream->rtmp, stream->path.array))
    		return OBS_OUTPUT_BAD_PATH;

    	RTMP_EnableWrite(&stream->rtmp);

    	// Encoder name, passed to the session as flashVer below.
    	dstr_copy(&stream->encoder_name, "FMLE/3.0 (compatible; FMSc/1.0)");

    	// Credentials and encoder name.
    	set_rtmp_dstr(&stream->rtmp.Link.pubUser, &stream->username);
    	set_rtmp_dstr(&stream->rtmp.Link.pubPasswd, &stream->password);
    	set_rtmp_dstr(&stream->rtmp.Link.flashVer, &stream->encoder_name);
    	stream->rtmp.Link.swfUrl = stream->rtmp.Link.tcUrl;
    	stream->rtmp.Link.customConnectEncode = add_connect_data;

    	// An empty or "default" bind_ip means: do not bind explicitly.
    	const bool default_bind =
    		dstr_is_empty(&stream->bind_ip) ||
    		dstr_cmp(&stream->bind_ip, "default") == 0;

    	if (default_bind) {
    		memset(&stream->rtmp.m_bindIP, 0,
    		       sizeof(stream->rtmp.m_bindIP));
    	} else if (netif_str_to_addr(&stream->rtmp.m_bindIP.addr,
    				     &stream->rtmp.m_bindIP.addrLen,
    				     stream->bind_ip.array)) {
    		// The address length tells IPv6 apart from IPv4.
    		int addr_len = stream->rtmp.m_bindIP.addrLen;

    		info("Binding to IPv%d",
    		     addr_len == sizeof(struct sockaddr_in6) ? 6 : 4);
    	}

    	RTMP_AddStream(&stream->rtmp, stream->key.array);

    	stream->rtmp.m_outChunkSize = 4096;
    	stream->rtmp.m_bSendChunkSizeInfo = true;
    	stream->rtmp.m_bUseNagle = true;

    	// Connect to the server.
    	if (!RTMP_Connect(&stream->rtmp, NULL)) {
    		set_output_error(stream);
    		return OBS_OUTPUT_CONNECT_FAILED;
    	}

    	if (!RTMP_ConnectStream(&stream->rtmp, 0))
    		return OBS_OUTPUT_INVALID_STREAM;

    	info("Connection to %s successful", stream->path.array);

    	// Connected: set up the send path and get ready to stream.
    	return init_send(stream);
    }

    当连接成功,开始调用init_send函数,初始化send,准备推流

    需要注意,send 也是在单独的线程中处理的:视频数据量比较大,如果不单开线程,肯定会造成阻塞。

    // Initialize the RTMP send path.
    /*
     * Starts the send thread, optionally sets up the "new socket loop"
     * (non-blocking socket, write buffer and socket thread; Windows only),
     * marks the stream active, sends the stream metadata and begins data
     * capture.
     *
     * stream: a stream whose RTMP connection has just been established.
     *
     * Returns OBS_OUTPUT_SUCCESS on success, OBS_OUTPUT_ERROR when a thread
     * cannot be created, the socket cannot be made non-blocking or the new
     * socket loop is requested on an unsupported platform, and
     * OBS_OUTPUT_DISCONNECTED when metadata cannot be sent or more audio
     * tracks are configured than are supported.
     */
    static int init_send(struct rtmp_stream *stream)
    {
    	int ret;
    	obs_output_t *context = stream->output;

    	// Create the send thread; it runs send_thread() with stream as its
    	// argument.
    	ret = pthread_create(&stream->send_thread, NULL, send_thread, stream);
    	if (ret != 0) {
    		// Without a send thread nothing would ever be transmitted, so
    		// fail here instead of reporting the stream as active. Handled
    		// the same way as a socket thread creation failure below.
    		RTMP_Close(&stream->rtmp);
    		warn("Failed to create send thread");
    		return OBS_OUTPUT_ERROR;
    	}

    	if (stream->new_socket_loop) {
    		int one = 1;
    		// Switch the socket to non-blocking mode.
    #ifdef _WIN32
    		if (ioctlsocket(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) {
    			stream->rtmp.last_error_code = WSAGetLastError();
    #else
    		if (ioctl(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) {
    			stream->rtmp.last_error_code = errno;
    #endif
    			warn("Failed to set non-blocking socket");
    			return OBS_OUTPUT_ERROR;
    		}

    		os_event_reset(stream->send_thread_signaled_exit);

    		info("New socket loop enabled by user");
    		if (stream->low_latency_mode)
    			info("Low latency mode enabled by user");

    		// Drop any write buffer left over from a previous run.
    		if (stream->write_buf)
    			bfree(stream->write_buf);

    		// Sum the video and audio encoder bitrates to size the write
    		// buffer.
    		int total_bitrate = 0;

    		obs_encoder_t *vencoder = obs_output_get_video_encoder(context);
    		if (vencoder) {
    			obs_data_t *params = obs_encoder_get_settings(vencoder);
    			if (params) {
    				int bitrate =
    					obs_data_get_int(params, "bitrate");
    				if (!bitrate) {
    					warn("Video encoder didn't return a "
    					     "valid bitrate, new network "
    					     "code may function poorly. "
    					     "Low latency mode disabled.");
    					stream->low_latency_mode = false;
    					bitrate = 10000;
    				}
    				total_bitrate += bitrate;
    				obs_data_release(params);
    			}
    		}

    		obs_encoder_t *aencoder =
    			obs_output_get_audio_encoder(context, 0);
    		if (aencoder) {
    			obs_data_t *params = obs_encoder_get_settings(aencoder);
    			if (params) {
    				int bitrate =
    					obs_data_get_int(params, "bitrate");
    				if (!bitrate)
    					bitrate = 160;
    				total_bitrate += bitrate;
    				obs_data_release(params);
    			}
    		}

    		// to bytes/sec
    		int ideal_buffer_size = total_bitrate * 128;

    		// Never go below 131072 bytes (128 * 1024).
    		if (ideal_buffer_size < 131072)
    			ideal_buffer_size = 131072;

    		stream->write_buf_size = ideal_buffer_size;
    		stream->write_buf = bmalloc(ideal_buffer_size);

    #ifdef _WIN32
    		ret = pthread_create(&stream->socket_thread, NULL,
    				     socket_thread_windows, stream);
    #else
    		warn("New socket loop not supported on this platform");
    		return OBS_OUTPUT_ERROR;
    #endif

    		if (ret != 0) {
    			RTMP_Close(&stream->rtmp);
    			warn("Failed to create socket thread");
    			return OBS_OUTPUT_ERROR;
    		}

    		// Route librtmp writes through the socket thread's queue.
    		stream->socket_thread_active = true;
    		stream->rtmp.m_bCustomSend = true;
    		stream->rtmp.m_customSendFunc = socket_queue_data;
    		stream->rtmp.m_customSendParam = stream;
    	}

    	os_atomic_set_bool(&stream->active, true);

    	if (!send_meta_data(stream)) {
    		warn("Disconnected while attempting to send metadata");
    		set_output_error(stream);
    		return OBS_OUTPUT_DISCONNECTED;
    	}

    	// A second audio track (index 1) gets additional metadata.
    	obs_encoder_t *aencoder = obs_output_get_audio_encoder(context, 1);
    	if (aencoder && !send_additional_meta_data(stream)) {
    		warn("Disconnected while attempting to send additional "
    		     "metadata");
    		return OBS_OUTPUT_DISCONNECTED;
    	}

    	// A third audio track (index 2) is rejected.
    	if (obs_output_get_audio_encoder(context, 2) != NULL) {
    		warn("Additional audio streams not supported");
    		return OBS_OUTPUT_DISCONNECTED;
    	}

    	// Data capture is only begun on a regular start, not on a silent
    	// reconnect.
    	if (!silently_reconnecting(stream))
    		obs_output_begin_data_capture(stream->output, 0);

    	return OBS_OUTPUT_SUCCESS;
    }

     

    核心推流线程,在单独的线程里完成推流逻辑

     

    // Core streaming (send) thread.
    /*
     * Thread body started by init_send().
     *
     * Waits on stream->send_sem and, for each wake-up, takes the next queued
     * packet and sends it with send_packet(), sending the stream headers
     * first if they have not been sent yet. The loop ends when the semaphore
     * wait fails, the stream is stopping, a send fails, or a silent reconnect
     * is pending and a video keyframe arrives. Afterwards it logs why it
     * stopped, shuts down the socket thread (new socket loop only), closes
     * the RTMP session, signals the output, and on a silent reconnect
     * restarts the stream via rtmp_stream_start().
     *
     * data: the struct rtmp_stream being streamed.
     *
     * Always returns NULL.
     */
    static void *send_thread(void *data)
    {
    	struct rtmp_stream *stream = data;
        
        // Name the thread.
    	os_set_thread_name("rtmp-stream: send_thread");
    
        
        // Set the socket send buffer size.
    #if defined(_WIN32)
    	// Despite MSDN claiming otherwise, send buffer auto tuning on
    	// Windows 7 doesn't seem to work very well.
    	if (get_win_ver_int() == 0x601) {
    		DWORD cur_sendbuf_size;
    		DWORD desired_sendbuf_size = 524288;
    		socklen_t int_size = sizeof(int);
    
    		// Only ever grow the buffer: raise it to the desired size when
    		// the current size is smaller.
    		if (!getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET,
    				SO_SNDBUF, (char *)&cur_sendbuf_size,
    				&int_size) &&
    		    cur_sendbuf_size < desired_sendbuf_size) {
    
    			setsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET,
    				   SO_SNDBUF, (char *)&desired_sendbuf_size,
    				   sizeof(desired_sendbuf_size));
    		}
    	}
    
    	log_sndbuf_size(stream);
    #endif
        
        // Main streaming loop: one iteration per successful semaphore wait.
    	while (os_sem_wait(stream->send_sem) == 0) {
    		struct encoder_packet packet;
    		struct dbr_frame dbr_frame;
    
    		// Stopping with no stop timestamp set: leave right away.
    		if (stopping(stream) && stream->stop_ts == 0) {
    			break;
    		}
    
    		if (!get_next_packet(stream, &packet))
    			continue;
    
    		// Stopping with a stop timestamp: keep sending until
    		// can_shutdown_stream() accepts this packet as the end.
    		if (stopping(stream)) {
    			if (can_shutdown_stream(stream, &packet)) {
    				obs_encoder_packet_release(&packet);
    				break;
    			}
    		}
    
    		if (!stream->sent_headers) {
    			if (!send_headers(stream)) {
    				// NOTE(review): unlike the shutdown path above,
    				// `packet` is not released before this break -
    				// confirm whether it is leaked here.
    				os_atomic_set_bool(&stream->disconnected, true);
    				break;
    			}
    		}
    
    		/* silent reconnect signal received from server, reconnect on
    		 * next keyframe */
    		if (silently_reconnecting(stream) &&
    		    packet.type == OBS_ENCODER_VIDEO && packet.keyframe) {
    			reinsert_packet_at_front(stream, &packet);
    			break;
    		}
    
    		// With dynamic bitrate enabled, record the packet size and
    		// time the send.
    		if (stream->dbr_enabled) {
    			dbr_frame.send_beg = os_gettime_ns();
    			dbr_frame.size = packet.size;
    		}
    
    		if (send_packet(stream, &packet, false, packet.track_idx) < 0) {
    			os_atomic_set_bool(&stream->disconnected, true);
    			break;
    		}
    
    		if (stream->dbr_enabled) {
    			dbr_frame.send_end = os_gettime_ns();
    
    			pthread_mutex_lock(&stream->dbr_mutex);
    			dbr_add_frame(stream, &dbr_frame);
    			pthread_mutex_unlock(&stream->dbr_mutex);
    		}
    	}
    
    	bool encode_error = os_atomic_load_bool(&stream->encode_error);
    
    	// Log why the loop ended.
    	if (disconnected(stream)) {
    		info("Disconnected from %s", stream->path.array);
    	} else if (encode_error) {
    		info("Encoder error, disconnecting");
    	} else if (silently_reconnecting(stream)) {
    		info("Silent reconnect signal received from server");
    	} else {
    		info("User stopped the stream");
    	}
    
    #if defined(_WIN32)
    	log_sndbuf_size(stream);
    #endif
    
    	// New socket loop: signal the socket thread, wait for it to finish,
    	// then turn off librtmp's custom send hook.
    	if (stream->new_socket_loop) {
    		os_event_signal(stream->send_thread_signaled_exit);
    		os_event_signal(stream->buffer_has_data_event);
    		pthread_join(stream->socket_thread, NULL);
    		stream->socket_thread_active = false;
    		stream->rtmp.m_bCustomSend = false;
    	}
    
    	set_output_error(stream);
    
    	if (silently_reconnecting(stream)) {
    		/* manually close the socket to prevent librtmp from sending
    		 * unpublish / deletestream messages when we call RTMP_Close,
    		 * since we want to re-use this stream when we reconnect */
    		RTMPSockBuf_Close(&stream->rtmp.m_sb);
    		stream->rtmp.m_sb.sb_socket = -1;
    	}
    
    	RTMP_Close(&stream->rtmp);
    
    	/* reset bitrate on stop */
    	if (stream->dbr_enabled) {
    		if (stream->dbr_cur_bitrate != stream->dbr_orig_bitrate) {
    			stream->dbr_cur_bitrate = stream->dbr_orig_bitrate;
    			dbr_set_bitrate(stream);
    		}
    	}
    
    	// Not stopping means the loop ended on its own (disconnect or silent
    	// reconnect): detach this thread and, unless reconnecting, report the
    	// disconnect. Otherwise report the encoder error or end data capture.
    	if (!stopping(stream)) {
    		pthread_detach(stream->send_thread);
    		if (!silently_reconnecting(stream))
    			obs_output_signal_stop(stream->output,
    					       OBS_OUTPUT_DISCONNECTED);
    	} else if (encode_error) {
    		obs_output_signal_stop(stream->output, OBS_OUTPUT_ENCODE_ERROR);
    	} else {
    		obs_output_end_data_capture(stream->output);
    	}
    
    	// Queued packets are kept across a silent reconnect; otherwise they
    	// are freed and the stream is marked inactive.
    	if (!silently_reconnecting(stream)) {
    		free_packets(stream);
    		os_event_reset(stream->stop_event);
    		os_atomic_set_bool(&stream->active, false);
    	}
    
    	stream->sent_headers = false;
    
    	/* reset bitrate on stop */
    	// NOTE(review): this repeats the reset block after RTMP_Close() above;
    	// it looks redundant unless dbr_cur_bitrate can change in between -
    	// confirm before removing.
    	if (stream->dbr_enabled) {
    		if (stream->dbr_cur_bitrate != stream->dbr_orig_bitrate) {
    			stream->dbr_cur_bitrate = stream->dbr_orig_bitrate;
    			dbr_set_bitrate(stream);
    		}
    	}
    
    	// Silent reconnect: start connecting again.
    	if (silently_reconnecting(stream)) {
    		rtmp_stream_start(stream);
    	}
    
    	return NULL;
    }

     

  • 相关阅读:
    外包干了3天,技术退步明显.......
    前端生成二维码
    Zookeeper集群 + Kafka集群
    在PG或HGDB上启用块校验checksum
    latex 编译
    【JavaEE进阶系列 | 从小白到工程师】正则表达式的语法&使用
    Swagger API 文档
    《吐血整理》高级系列教程-吃透Fiddler抓包教程(30)-Fiddler如何抓取Android7.0以上的Https包-番外篇
    什么是过期域名?做网站用过期域名好不好?
    Tomcat域名访问文件出现访问不到的问题
  • 原文地址:https://www.cnblogs.com/billin/p/17222702.html