• GB28181学习(十六)——基于jrtplib实现tcp被动和主动收流


    前言

    GB/T28181-2022实时流的传输方式介绍:https://blog.csdn.net/www_dong/article/details/134255185

    tcp passive收流

    流程图

    在这里插入图片描述

    注意:

    • m字段指定传输方式为TCP/RTP/AVP;
    • sdp信息中增加"a=setup:passive";
    • SIP服务器启动端口监听,设备发起tcp连接请求;

    设计

    1. 创建socket、bind、listen,启动数据接收线程;
    // TcpServer为封装的socket类
    
    int CGBTcpServerStreamReceiver::Start()
    {
    	if (m_localIP.empty() || m_localPort <= 0)
    		return -1;
    
    	if (m_tcpServer.get())
    		return 0;
    
    	m_tcpServer = std::make_shared(TcpDataCB, this);
    	if (!m_tcpServer.get())
    		return -1;
    
    	if (0 != m_tcpServer->TcpCreate() 
    		|| 0 != m_tcpServer->TcpBind(m_localPort) 
    		|| 0 != m_tcpServer->TcpListen(5))
    		return -1;
    
    	m_thread = std::thread(TcpDataThread, this);
    	return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    1. 在线程内等待连接,连接成功后接收数据并回调至应用层处理
    void CGBTcpServerStreamReceiver::TcpDataWorker()
    {
    	bool bAccept = false;
    	uint8_t payload;
    	while (m_running)
    	{
    		if (!bAccept)
    		{
                // 等待设备连接
    			if (0 == m_tcpServer->TcpAccept())
    			{
    				bAccept = true;
                    // 连接成功后,初始化rtp参数
    				if (0 != InitRtpSession_())
    				{
    					break;
    				}
    			}
    
    			continue;
    		}
    
    		Poll();
    		BeginDataAccess();
    
            // 开始接收数据
    		if (GotoFirstSourceWithData())
    		{
    			do
    			{
    				RTPPacket* packet = nullptr;
    				while (nullptr != (packet = GetNextPacket()))
    				{
    					payload = packet->GetPayloadType();
    					if (0 == payload)
    					{
    						DeletePacket(packet);
    						continue;
    					}
    
    					struct rtp_packet_tcp data;
    					data.mark = packet->HasMarker();
    					data.pts = packet->GetTimestamp();
    					data.seq = packet->GetSequenceNumber();
    					data.data = packet->GetPayloadData();
    					data.len = (int)packet->GetPayloadLength();
    
    					m_payload = payload;
    
    					if (m_lastSeq < 0)
    					{
    						m_lastSeq = data.seq - 1;
    					}
    
    					if (m_lastSeq = data.seq - 1)
    					{
    						PackData_(data.data, data.len);
    					}
    
    					DeletePacket(packet);
    				}
    
    			} while (GotoNextSourceWithData());
    		}
    
    		EndDataAccess();
    		Sleep(30);
    	}
    
    	Destroy();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    1. 初始化rtp参数
    int CGBTcpServerStreamReceiver::InitRtpSession_()
    {
    	const int packSize = 45678;
    	RTPSessionParams sessionParams;
    	sessionParams.SetProbationType(RTPSources::NoProbation);
    	sessionParams.SetOwnTimestampUnit(90000.0 / 25.0);
    	sessionParams.SetMaximumPacketSize(packSize + 64);
    
    	m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);
    	m_rtpTcpTransmitter->Init(true);
    	m_rtpTcpTransmitter->Create(65535, 0);
    
    	if (0 != Create(sessionParams, m_rtpTcpTransmitter))
    		return -1;
    
    	if (0 != AddDestination(RTPTCPAddress(m_tcpServer->GetClientSocket())))
    		return -1;
    
    	return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    注意:RTP over TCP模式比RTP over UDP模式多了两字节的长度字段。

    tcp active收流

    流程图

    在这里插入图片描述

    注意:

    • m字段指定传输方式为TCP/RTP/AVP;
    • sdp信息中增加"a=setup:active";
    • 设备返回200 OK,报文的SDP信息中包含tcp监听端口;
    • SIP服务器根据设备监听端口发起TCP连接请求;

    设计

    1. 创建socket、connect、初始化rtp,启动数据接收线程
    // TcpClient为封装的客户端socket类
    
    int CGBTcpClientStreamReceiver::Start(int streamType)
    {
    	if (m_localIP.empty() || m_localPort <= 0)
    		return -1;
    
    	if (m_tcpClient.get())
    		return 0;
    
        // 创建socket
    	m_tcpClient = std::make_shared(TcpDataCB, this);
    	if (!m_tcpClient.get() || 0 != m_tcpClient->TcpCreate())
    		return -1;
    
    	// connect
    	int ret = m_tcpClient->TcpConnectByTime(m_localIP.c_str(), m_localPort, 5);
    	if (0 != ret)
    		return -1;
    
        // 初始化rtp session
    	if (0 != InitRtpSession_())
    		return -1;
    
        // 启动接收线程
    	m_thread = std::thread(TcpDataThread, this);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    1. 初始化rtp参数
    int CGBTcpClientStreamReceiver::InitRtpSession_()
    {
    	const int packSize = 45678;
    	RTPSessionParams sessionParams;
    	sessionParams.SetProbationType(RTPSources::NoProbation);
    	sessionParams.SetOwnTimestampUnit(90000.0 / 25.0);
    	sessionParams.SetMaximumPacketSize(packSize + 64);
    
    	m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);
    	m_rtpTcpTransmitter->Init(true);
    	m_rtpTcpTransmitter->Create(65535, 0);
    
    	if (0 != Create(sessionParams, m_rtpTcpTransmitter))
    		return -1;
    
        // 添加客户端socket
    	if (0 != AddDestination(RTPTCPAddress(m_tcpClient->GetClientSocket())))
    		return -1;
    
    	return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    1. 在线程内等待连接,连接成功后接收数据并回调至应用层处理

    同tcp passive收流流程。

    rtp解包工具

    基于qt+ireader库实现tcp解包(ps封装+264载荷)

    解包流程

    1. 打开文件,创建解码器
    void RtpUnpackDlg::StartRtpUnpack()
    {
    	if (m_thread.joinable())
    		return;
    
    	QString filePath = ui.le_filePath->text();
    	if (filePath.isEmpty())
    	{
    		QMessageBox::critical(this, QString::fromLocal8Bit("错误"), QString::fromLocal8Bit("文件路径为空"), QMessageBox::Ok);
    		return;
    	}
    
    	m_rtpPayloadParam.payload = 100;
    	m_rtpPayloadParam.encoding = "PS";
    	m_rtpPayloadParam.frtp = fopen(filePath.toStdString().c_str(), "rb");
    	if (!m_rtpPayloadParam.frtp)
    	{
    		QMessageBox::critical(this, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("打开输入文件失败"), QMessageBox::Ok);
    		return;
    	}
    
    	m_rtpPayloadParam.fout = fopen(outFilePath.toStdString().c_str(), "wb");
    	if (!m_rtpPayloadParam.fout)
    	{
    		QMessageBox::critical(this, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("打开输出文件失败"), QMessageBox::Ok);
    		return;
    	}
    
    	struct rtp_payload_t handler;
    	handler.alloc = RtpPacketAlloc;
    	handler.free = RtpPacketFree;
    	handler.packet = RtpDecodePacket;
    	m_rtpPayloadParam.decoder = rtp_payload_decode_create(100, "PS", &handler, this);
    
    	m_thread = std::thread(DataReadThread, this);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    1. 读数据
    void RtpUnpackDlg::RtpDataReadWorker()
    {
    	while (m_running)
    	{
    		// 先读两个字节的头
    		unsigned char s2[2];
    		if (2 != fread(s2, 1, 2, m_rtpPayloadParam.frtp))
    			break;
    
    		m_rtpPayloadParam.size = (s2[0] << 8) | s2[1];
    		assert(ctx.size < sizeof(ctx.packet));
    		if (m_rtpPayloadParam.size != (int)fread(m_rtpPayloadParam.packet, 1, m_rtpPayloadParam.size, m_rtpPayloadParam.frtp))
    			break;
    
            // 塞数据
    		if (m_rtpPayloadParam.packet[1] < RTCP_FIR || m_rtpPayloadParam.packet[1] > RTCP_LIMIT)
    			rtp_payload_decode_input(m_rtpPayloadParam.decoder, m_rtpPayloadParam.packet, m_rtpPayloadParam.size);
    	}
    
    	fclose(m_rtpPayloadParam.frtp);
    	fclose(m_rtpPayloadParam.fout);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    1. 解包
    int RtpUnpackDlg::DecodePacket(const void* packet, int bytes, uint32_t timestamp, int flags)
    {
    	static const unsigned char start_code[4] = { 0, 0, 0, 1 };
    	static unsigned char buffer[2 * 1024 * 1024] = {0, };
    
    	size_t size = 0;
    	if (0 == strcmp("H264", m_rtpPayloadParam.encoding) 
    		|| 0 == strcmp("H265", m_rtpPayloadParam.encoding)
    		|| 0 == strcmp("PS", m_rtpPayloadParam.encoding))
    	{
    		memcpy(buffer, start_code, sizeof(start_code));
    		size += sizeof(start_code);
    	}
    
    	memcpy(buffer + size, packet, bytes);
    	size += bytes;
    
    	fwrite(buffer, 1, size, m_rtpPayloadParam.fout);
    
        // 新增界面播放功能
    	if (m_playWidget)
    		m_playWidget->AddData(CODEC_VIDEO_H264, (void*)buffer, size);
    
    	return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    界面示例

    在这里插入图片描述

    参考:https://github.com/ireader中的demo示例

  • 相关阅读:
    Yii2 关联查询结果AR对象 如何取到表以外的字段
    求数据流中的中位数问题
    SSH访问Centos7中文显示乱码
    RabbitMQ消息的可靠传输和防止消息丢失
    Enzo丨Enzo SAVIEW PLUS AP试剂解决方案
    DeepSpeed: 大模型训练框架 | 京东云技术团队
    nginx基础概念
    【.net core】yisha框架单页面双列表联动效果示例
    [CSAW‘22] 世界这么大
    WPF UI样式介绍
  • 原文地址:https://blog.csdn.net/www_dong/article/details/134451387