• GB28181学习(十七)——基于jrtplib实现tcp被动和主动发流


    前言

    GB/T28181-2022实时流的传输方式介绍:https://blog.csdn.net/www_dong/article/details/134255185

    基于jrtplib实现tcp被动和主动收流介绍:https://blog.csdn.net/www_dong/article/details/134451387

    本文主要介绍下级平台或设备发流功能,用于对接特定的SIP服务器或上级平台。

    UDP发流

    流程图

    在这里插入图片描述

    发送端流程

    • 初始化rtp参数;
    • 裸流数据做PS复用;
    • 组RTP包发送;

    设计

    1. 初始化rtp参数
    int CUdp::InitRtp_()
    {
    	RTPSessionParams sessionParams;
    	sessionParams.SetMinimumRTCPTransmissionInterval(10);
    	sessionParams.SetOwnTimestampUnit(1.0 / 90000.0);
    	sessionParams.SetAcceptOwnPackets(true);
    	sessionParams.SetMaximumPacketSize(1450);
    
    	RTPUDPv4TransmissionParams transParams;
    	transParams.SetRTPSendBuffer(2*1024*1024);
    	transParams.SetBindIP(m_ip);
    	transParams.SetPortbase((uint16_t)m_port);
    
    	if (0 != Create(sessionParams, &transParams))
    	{
    		return -1;
    	}
    
    	SetDefaultPayloadType((uint8_t)m_payload);
    	SetDefaultTimestampIncrement(3600);
    	SetDefaultMark(true);
    
    	RTPIPv4Address addr(ntohl(inet_addr(m_ip), (uint16_t)m_port);
    	if(0 != AddDestination(addr))
        {
    		return -1;
    	}
    
    	return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    1. 流数据复用为PS
    // 使用ireader开源库进行ps复用
    // 初始化
    CData2PS::CData2PS()
    {
    	struct ps_muxer_func_t func;
    	func.alloc = Alloc;
    	func.free = Free;
    	func.write = Packet;
    	m_ps = ps_muxer_create(&func, this);
    	// TODO codecid待补充
    	m_ps_stream = ps_muxer_add_stream(m_ps, PSI_STREAM_H264, nullptr, 0);
    }
    
    
    // 塞数据
    int CData2PS::InputData(void* data, int len)
    {
    	if (!m_ps)
    		return -1;
    
    	uint64_t clock = time64_now();
    	if (0 == m_ps_clock)
    		m_ps_clock = clock;
    
    	return ps_muxer_input(m_ps, m_ps_stream, 0, (clock - m_ps_clock) * 90, (clock - m_ps_clock) * 90, data, len);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    1. 发送rtp包
    // 调用jrtplib中SendPacket(data, len);接口发送数据
    
    // 以下为SendPacket部分源码
    // 主要流程:
    // 1. 构建packet
    // 2. 发送rtp数据
    int RTPSession::SendPacket(const void *data,size_t len,
                    uint8_t pt,bool mark,uint32_t timestampinc)
    {
    	int status;
    
    	if (!created)
    		return ERR_RTP_SESSION_NOTCREATED;
    	
    	BUILDER_LOCK
    	if ((status = packetbuilder.BuildPacket(data,len,pt,mark,timestampinc)) < 0)
    	{
    		BUILDER_UNLOCK
    		return status;
    	}
    	if ((status = SendRTPData(packetbuilder.GetPacket(),packetbuilder.GetPacketLength())) < 0)
    	{
    		BUILDER_UNLOCK
    		return status;
    	}
    	BUILDER_UNLOCK
    	
    	SOURCES_LOCK
    	sources.SentRTPPacket();
    	SOURCES_UNLOCK
    	PACKSENT_LOCK
    	sentpackets = true;
    	PACKSENT_UNLOCK
    	return 0;
    }
    
    // 构建包
    int RTPPacketBuilder::PrivateBuildPacket(const void *data,size_t len,
    	                  uint8_t pt,bool mark,uint32_t timestampinc,bool gotextension,
    	                  uint16_t hdrextID,const void *hdrextdata,size_t numhdrextwords)
    {
    	RTPPacket p(pt,data,len,seqnr,timestamp,ssrc,mark,numcsrcs,csrcs,gotextension,hdrextID,
    	            (uint16_t)numhdrextwords,hdrextdata,buffer,maxpacksize,GetMemoryManager());
    	int status = p.GetCreationError();
    
    	if (status < 0)
    		return status;
    	packetlength = p.GetPacketLength();
    
    	if (numpackets == 0) // first packet
    	{
    		lastwallclocktime = RTPTime::CurrentTime();
    		lastrtptimestamp = timestamp;
    		prevrtptimestamp = timestamp;
    	}
    	else if (timestamp != prevrtptimestamp)
    	{
    		lastwallclocktime = RTPTime::CurrentTime();
    		lastrtptimestamp = timestamp;
    		prevrtptimestamp = timestamp;
    	}
    	
    	numpayloadbytes += (uint32_t)p.GetPayloadLength();
    	numpackets++;
    	timestamp += timestampinc;
    	seqnr++;
    
    	return 0;
    }
    
    // 发送包
    int RTPSession::SendRTPData(const void *data, size_t len)
    {
    	if (!m_changeOutgoingData)
    		return rtptrans->SendRTPData(data, len);
    
    	void *pSendData = 0;
    	size_t sendLen = 0;
    	int status = 0;
    
    	status = OnChangeRTPOrRTCPData(data, len, true, &pSendData, &sendLen);
    	if (status < 0)
    		return status;
    
    	if (pSendData)
    	{
    		status = rtptrans->SendRTPData(pSendData, sendLen);
    		OnSentRTPOrRTCPData(pSendData, sendLen, true);
    	}
    
    	return status;
    }
    
    // 底层实现
    int RTPUDPv4Transmitter::SendRTPData(const void *data,size_t len)	
    {
    	if (!init)
    		return ERR_RTP_UDPV4TRANS_NOTINIT;
    
    	MAINMUTEX_LOCK
    	
    	if (!created)
    	{
    		MAINMUTEX_UNLOCK
    		return ERR_RTP_UDPV4TRANS_NOTCREATED;
    	}
    	if (len > maxpacksize)
    	{
    		MAINMUTEX_UNLOCK
    		return ERR_RTP_UDPV4TRANS_SPECIFIEDSIZETOOBIG;
    	}
    	
    	destinations.GotoFirstElement();
    	while (destinations.HasCurrentElement())
    	{
            // 调用sendto函数实现udp包的发送
    		sendto(rtpsock,(const char *)data,len,0,(const struct sockaddr *)destinations.GetCurrentElement().GetRTPSockAddr(),sizeof(struct sockaddr_in));
    		destinations.GotoNextElement();
    	}
    	
    	MAINMUTEX_UNLOCK
    	return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123

    tcp passive发流

    流程图

    在这里插入图片描述

    发送端流程:

    • 上级平台或sip服务器以主动方式连接,对于下级平台或者设备(数据发送端)为被动方式;
    • 下级平台或者设备(数据发送端)启动端口监听;
    • 接收上级平台或sip服务器tcp连接请求;
    • 向上级平台或sip服务器发送流数据;

    设计

    1. 创建socket、bind、listen,启动数据接收线程;
    // TcpServer为封装的socket类
    
    int CGBTcpServer::Start()
    {
    	if (0 != m_localPort || m_tcpServer.get())
    		return 0;
    
    	int ret = -1;
    	do 
    	{
    		m_tcpServer = std::make_shared(nullptr, this);
    		if (!m_tcpServer.get())
    			break;
    
    		ret = m_tcpServer->TcpCreate();
    		if (0 != ret)
    			break;
    
    		ret = m_tcpServer->TcpBind(m_localPort);
    		if (0 != ret)
    			break;
    
    		ret = m_tcpServer->TcpListen(5);
    		if (0 != ret)
    			break;
    
    		m_thread = std::thread(TCPData2PSThread, this);
    		return 0;
    	} while (0);
    
    	Stop();
    	return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    1. 在线程内等待连接,连接成功后接收数据并回调至应用层处理
    void CGBTcpServer::TCPData2PSWorker()
    {
    	if (!m_pspacker)
    		m_pspacker = new(std::nothrow) CData2PS(PSTCPDataCB, this);
    
    	bool bAccept = false;
    	while (m_running)
    	{
    		if (!bAccept)
    		{
    			if (0 == m_tcpServer->TcpAccept())
    			{
    				bAccept = true;
    				if (0 != InitRtp_())
    				{
    					break;
    				}
    			}
    
    			continue;
    		}
    
    		std::this_thread::sleep_for(std::chrono::seconds(1));
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    1. 初始化rtp参数
    int CGBTcpServer::InitRtp_()
    {
    	const int packetSize = 45678;
    	RTPSessionParams sessionparams;
    	sessionparams.SetProbationType(RTPSources::NoProbation);
    	sessionparams.SetOwnTimestampUnit(1.0 / packetSize);
    	sessionparams.SetMaximumPacketSize(packetSize + 64);
    
    	m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);
    	m_rtpTcpTransmitter->Init(true);
    	m_rtpTcpTransmitter->Create(65535, 0);
    
    	int status = Create(sessionparams, m_rtpTcpTransmitter);
    	if (status < 0)
    	{
    		return status;
    	}
    
    	status = AddDestination(RTPTCPAddress(m_tcpServer->GetClientSocket()));
    	if (0 != status)
    		return status;
    
    	SetDefaultPayloadType(96);
    	SetDefaultMark(false);
    	SetDefaultTimestampIncrement(160);
    	return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    1. 将数据复用为PS;
    2. tcp方式发包
    // 调用jrtplib中SendPacket(data, len);接口发送数据
    
    // 以下为tcp方式SendPacket部分源码
    int RTPTCPTransmitter::SendRTPData(const void *data,size_t len)	
    {
    	return SendRTPRTCPData(data, len);
    }
    
    int RTPTCPTransmitter::SendRTPRTCPData(const void *data, size_t len)
    {
    	if (!m_init)
    		return ERR_RTP_TCPTRANS_NOTINIT;
    
    	MAINMUTEX_LOCK
    	
    	if (!m_created)
    	{
    		MAINMUTEX_UNLOCK
    		return ERR_RTP_TCPTRANS_NOTCREATED;
    	}
        
        // #define RTPTCPTRANS_MAXPACKSIZE							65535
    	if (len > RTPTCPTRANS_MAXPACKSIZE)
    	{
    		MAINMUTEX_UNLOCK
    		return ERR_RTP_TCPTRANS_SPECIFIEDSIZETOOBIG;
    	}
    	
    	std::map::iterator it = m_destSockets.begin();
    	std::map::iterator end = m_destSockets.end();
    
    	vector errSockets;
    	int flags = 0;
    #ifdef RTP_HAVE_MSG_NOSIGNAL
    	flags = MSG_NOSIGNAL;
    #endif // RTP_HAVE_MSG_NOSIGNAL
    
    	while (it != end)
    	{
    		uint8_t lengthBytes[2] = { (uint8_t)((len >> 8)&0xff), (uint8_t)(len&0xff) };
    		SocketType sock = it->first;
    
            // 调用send接口发送数据
            // 1. 先发送2字节头(固定格式)
            // 2. 再发送数据
    		if (send(sock,(const char *)lengthBytes,2,flags) < 0 ||
    			send(sock,(const char *)data,len,flags) < 0)
    			errSockets.push_back(sock);
    		++it;
    	}
    	
    	MAINMUTEX_UNLOCK
    
    	if (errSockets.size() != 0)
    	{
    		for (size_t i = 0 ; i < errSockets.size() ; i++)
    			OnSendError(errSockets[i]);
    	}
    
    	// Don't return an error code to avoid the poll thread exiting
    	// due to one closed connection for example
    
    	return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    tcp active发流

    流程图

    在这里插入图片描述

    发送端流程:

    • 上级平台或sip服务器启动tcp监听连接,对于下级平台或者设备(数据发送端)为主动方式;
    • 下级平台或者设备(数据发送端)发起tcp连接;
    • 接收上级平台或sip服务器tcp响应;
    • 向上级平台或sip服务器发送流数据;

    设计

    1. 创建socket、connect、初始化rtp,启动数据接收线程
    // TcpClient为封装的客户端socket类
    
    int CGBTcpClient::Start()
    {
    	if (0 != m_localPort || m_tcpClient.get())
    		return 0;
    
    	int ret = -1;
    	do
    	{
    		m_tcpClient = std::make_shared(nullptr, this);
    		if (!m_tcpClient.get() || 0 != m_tcpClient->TcpCreate())
    			break;
    
    		ret = m_tcpClient->TcpConnectByTime(m_localIP.c_str(), m_localPort, 5);
    		if (0 != ret)
    			break;
    
    		ret = InitRtp_();
    		if (0 != ret)
    			break;
    
    		m_thread = std::thread(RTPPackerThread, this);
    		return 0;
    	} while (0);
    
    	Stop();
    	return ret;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    1. 初始化rtp参数
    int CGBTcpClient::InitRtp_()
    {
    	const int packSize = 45678;
    	RTPSessionParams sessionParams;
    	sessionParams.SetProbationType(RTPSources::NoProbation);
    	sessionParams.SetOwnTimestampUnit(90000.0 / 25.0);
    	sessionParams.SetMaximumPacketSize(packSize + 64);
    
    	m_rtpTcpTransmitter = new RTPTCPTransmitter(nullptr);
    	m_rtpTcpTransmitter->Init(true);
    	m_rtpTcpTransmitter->Create(65535, 0);
    
    	if (0 != Create(sessionParams, m_rtpTcpTransmitter))
    		return -1;
    
    	if (0 != AddDestination(RTPTCPAddress(m_tcpClient->GetClientSocket())))
    		return -1;
    
    	return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    1. 视音频数据复用为PS
    2. 发送数据,同tcp passive发流
  • 相关阅读:
    [GWCTF 2019]枯燥的抽奖 mt_rand 伪随机数
    【硬件架构的艺术】学习笔记(1)亚稳态的世界
    云原生之使用Docker部署Nas-Cab个人NAS平台
    什么是零代码?零代码与低代码有什么联系与区别?
    第九篇、线程同步(解决并发问题)
    美国疾控中心:持续减肥,降低32%患癌风险,降低48%癌死亡风险
    unity打包工具
    深度残差网络的实现与编程
    项目--苍穹外卖
    购物单-蓝桥杯
  • 原文地址:https://blog.csdn.net/www_dong/article/details/134562951