• 海思3559万能平台:RTSP优化buffpool的引入


    前言

      在只有一路编码进行rtsp播放和保存时。之前的思路是没有任何问题的,设计比较简单,但是如果我们的运算量上来了,也不仅仅一个通道编码了,编码速率,保存速率,rtsp发送包的速率的差异会越来越大,而每一包的大小又不可能相同,这个时候就需要引入一个缓冲池来平衡输入输出的速率不一致(简单这么理解一下),且解决包大小不同的问题
      理论部分节选自知乎专栏https://zhuanlan.zhihu.com/p/533321012
      代码部分给出了缓冲池功能源码以及调用部分代码

    缓冲管理

      最开始的缓冲引入是为了缓和 CPU 与 I/O 设备速度不匹配的矛盾,提高 CPU 和 I/O 设备的并行性,在现代操作系统中,几乎所有的 I/O 设备在与处理机交换数据时都用了缓冲区。缓冲管理的主要职责是组织好这些缓冲区,并提供获得和释放缓冲区的手段。
      事实上,凡在数据到达速率与其离去速率不同的地方,都可设置缓冲区,以缓和它们之间速率不匹配的矛盾。众所周知,CPU的运算速率远远高于 I/O 设备的速率,如果没有缓冲区,则在输出数据时,必然会由于打印机的速度跟不上而使 CPU 停下来等待;然而在计算阶段,打印机又空闲无事。显然,如果在打印机或控制器中设置一缓冲区,用于快速暂存程序的输出数据,以后由打印机“慢慢地”从中取出数据打印,这样,就可提高 CPU 的工作效率。类似地,在输入设备与 CPU 之间也设置缓冲区,也可使 CPU 的工作效率得以提高。
      减少对 CPU 的中断频率,放宽对 CPU 中断响应时间的限制。在远程通信系统中,如果从远地终端发来的数据仅用一位缓冲来接收,如下图(a)所示,则必须在每收到一位数据时便中断一次 CPU,这样,对于速率为 9.6 Kb/s 的数据通信来说,就意味着其中断 CPU的频率也为 9.6 Kb/s,即每 100 μs 就要中断 CPU 一次,而且 CPU 必须在 100 μs 内予以响应,否则缓冲区内的数据将被冲掉。倘若设置一个具有 8 位的缓冲(移位)寄存器,如下图(b)所示,则可使 CPU 被中断的频率降低为原来的 1/8;若再设置一个 8 位寄存器,如下图©所示,则又可把 CPU 对中断的响应时间放宽到 800 μs。
    在这里插入图片描述
      提高 CPU 和 I/O 设备之间的并行性。缓冲的引入可显著地提高 CPU 和 I/O 设备间的并行操作程度,提高系统的吞吐量和设备的利用率。例如,在 CPU 和打印机之间设置了缓冲区后,便可使 CPU 与打印机并行工作。

    单缓冲(Single Buffer)

      在单缓冲情况下,每当用户进程发出一 I/O 请求时,操作系统便在主存中为之分配一缓冲区,如下图所示。在块设备输入时,假定从磁盘把一块数据输入到缓冲区的时间为 T,操作系统将该缓冲区中的数据传送到用户区的时间为 M,而 CPU 对这一块数据处理(计算)的时间为 C。由于 T 和 C 是可以并行的(如下图),当 T>C 时,系统对每一块数据的处理时间为 M+T,反之则为 M+C,故可把系统对每一块数据的处理时间表示为Max(C,T)+M。
    在这里插入图片描述
      在字符设备输入时,缓冲区用于暂存用户输入的一行数据,在输入期间,用户进程被挂起以等待数据输入完毕;在输出时,用户进程将一行数据输入到缓冲区后,继续进行处理。当用户进程已有第二行数据输出时,如果第一行数据尚未被提取完毕,则此时用户进程应阻塞。

    双缓冲(Double Buffer)

      为了加快输入和输出速度,提高设备利用率,人们又引入了双缓冲区机制,也称为缓冲对换(Buffer Swapping)。在设备输入时,先将数据送入第一缓冲区,装满后便转向第二缓冲区。此时操作系统可以从第一缓冲区中移出数据,并送入用户进程(如下图)。接着由 CPU 对数据进行计算。在双缓冲时,系统处理一块数据的时间可以粗略地认为是Max(C,T)。如果 CT,则可使 CPU 不必等待设备输入。对于字符设备,若采用行输入方式,则采用双缓冲通常能消除用户的等待时间,即用户在输入完第一行之后,在 CPU 执行第一行中的命令时,用户可继续向第二缓冲区输入下一行数据。
    在这里插入图片描述
      如果我们在实现两台机器之间的通信时,仅为它们配置了单缓冲,如下图(a)所示,那么,它们之间在任一时刻都只能实现单方向的数据传输。例如,只允许把数据从 A 机传送到 B 机,或者从 B 机传送到 A 机,而绝不允许双方同时向对方发送数据。为了实现双向数据传输,必须在两台机器中都设置两个缓冲区,一个用作发送缓冲区,另一个用作接收缓冲区,如下图(b)所示。

    在这里插入图片描述

    循环缓冲

      当输入与输出或生产者与消费者的速度基本相匹配时,采用双缓冲能获得较好的效果,可使生产者和消费者基本上能并行操作。但若两者的速度相差甚远,双缓冲的效果则不够理想,不过可以随着缓冲区数量的增加,使情况有所改善。因此,又引入了多缓冲机制。可将多个缓冲组织成循环缓冲形式。对于用作输入的循环缓冲,通常是提供给输入进程或计算进程使用,输入进程不断向空缓冲区输入数据,而计算进程则从中提取数据进行计算。

    循环缓冲的组成

      多个缓冲区。在循环缓冲中包括多个缓冲区,其每个缓冲区的大小相同。作为输入的多缓冲区可分为三种类型:用于装输入数据的空缓冲区 R、已装满数据的缓冲区 G 以及计算进程正在使用的现行工作缓冲区 C,如下图所示。
    在这里插入图片描述
      多个指针。作为输入的缓冲区可设置三个指针:用于指示计算进程下一个可用缓冲区 G 的指针 Nextg、指示输入进程下次可用的空缓冲区 R 的指针 Nexti,以及用于指示计算进程正在使用的缓冲区 C 的指针 Current。

    循环缓冲区的使用

      计算进程和输入进程可利用下述两个过程来使用循环缓冲区。

      Getbuf 过程。当计算进程要使用缓冲区中的数据时,可调用 Getbuf 过程。该过程将由指针 Nextg 所指示的缓冲区提供给进程使用,相应地,须把它改为现行工作缓冲区,并令 Current 指针指向该缓冲区的第一个单元,同时将 Nextg 移向下一个 G 缓冲区。类似地,每当输入进程要使用空缓冲区来装入数据时,也调用 Getbuf 过程,由该过程将指针 Nexti所指示的缓冲区提供给输入进程使用,同时将 Nexti 指针移向下一个 R 缓冲区。
      Releasebuf 过程。当计算进程把 C 缓冲区中的数据提取完毕时,便调用 Releasebuf过程,将缓冲区 C 释放。此时,把该缓冲区由当前(现行)工作缓冲区 C 改为空缓冲区 R。类似地,当输入进程把缓冲区装满时,也应调用 Releasebuf 过程,将该缓冲区释放,并改为 G缓冲区。

    进程同步

      使用输入循环缓冲,可使输入进程和计算进程并行执行。相应地,指针 Nexti 和指针Nextg 将不断地沿着顺时针方向移动,这样就可能出现下述两种情况:

      Nexti 指针追赶上 Nextg 指针。这意味着输入进程输入数据的速度大于计算进程处理数据的速度,已把全部可用的空缓冲区装满,再无缓冲区可用。此时,输入进程应阻塞,直到计算进程把某个缓冲区中的数据全部提取完,使之成为空缓冲区 R,并调用 Releasebuf过程将它释放时,才将输入进程唤醒。这种情况被称为系统受计算限制。
      Nextg 指针追赶上 Nexti 指针。这意味着输入数据的速度低于计算进程处理数据的速度,使全部装有输入数据的缓冲区都被抽空,再无装有数据的缓冲区供计算进程提取数据。这时,计算进程只能阻塞,直至输入进程又装满某个缓冲区,并调用 Releasebuf 过程将它释放时,才去唤醒计算进程。这种情况被称为系统受 I/O 限制。

    缓冲池

      上述的缓冲区仅适用于某特定的 I/O 进程和计算进程,因而它们属于专用缓冲。当系统较大时,将会有许多这样的循环缓冲,这不仅要消耗大量的内存空间,而且其利用率不高。为了提高缓冲区的利用率,目前广泛流行公用缓冲池(Buffer Pool),在池中设置了多个可供若干个进程共享的缓冲区。

    缓冲池的组成

      对于既可用于输入又可用于输出的公用缓冲池,其中至少应含有以下三种类型的缓冲区:

      ① 空(闲)缓冲区;

      ② 装满输入数据的缓冲区;

      ③ 装满输出数据的缓冲区。

      为了管理上的方便,可将相同类型的缓冲区链成一个队列,于是可形成以下三个队列:

      空缓冲队列 emq。这是由空缓冲区所链成的队列。其队首指针 F(emq)和队尾指针L(emq)分别指向该队列的首缓冲区和尾缓冲区。
      输入队列 inq。这是由装满输入数据的缓冲区所链成的队列。其队首指针 F(inq)和队尾指针 L(inq)分别指向该队列的首缓冲区和尾缓冲区。
      输出队列 outq。这是由装满输出数据的缓冲区所链成的队列。其队首指针 F(outq)和队尾指针 L(outq)分别指向该队列的首缓冲区和尾缓冲区。
      除了上述三个队列外,还应具有四种工作缓冲区:① 用于收容输入数据的工作缓冲区;② 用于提取输入数据的工作缓冲区;③ 用于收容输出数据的工作缓冲区;④ 用于提取输出数据的工作缓冲区。

    Getbuf 过程和 Putbuf 过程

      在“数据结构”课程中,曾介绍过队列和对队列进行操作的两个过程,它们是:

      Addbuf(type,number)过程。该过程用于将由参数number 所指示的缓冲区B挂在type队列上。
      Takebuf(type)过程。该过程用于从 type 所指示的队列的队首摘下一个缓冲区。
      这两个过程能否用于对缓冲池中的队列进行操作呢?答案是否定的。因为缓冲池中的队列本身是临界资源,多个进程在访问一个队列时,既应互斥,又须同步。为此,需要对这两个过程加以改造,以形成可用于对缓冲池中的队列进行操作的 Getbuf 和 Putbuf 过程。

      为使诸进程能互斥地访问缓冲池队列,可为每一队列设置一个互斥信号量 MS(type)。此外,为了保证诸进程同步地使用缓冲区,又为每个缓冲队列设置了一个资源信号量RS(type)。既可实现互斥又可保证同步的 Getbuf 过程和 Putbuf 过程描述如下:
    在这里插入图片描述

    缓冲区的工作方式

      缓冲区可以工作在收容输入、提取输入、收容输出和提取输出四种工作方式下,如下图所示。
    在这里插入图片描述
      收容输入。在输入进程需要输入数据时,便调用 Getbuf(emq)过程,从空缓冲队列emq 的队首摘下一空缓冲区,把它作为收容输入工作缓冲区 hin。然后,把数据输入其中,装满后再调用 Putbuf(inq,hin)过程,将该缓冲区挂在输入队列 inq 上。
      提取输入。当计算进程需要输入数据时,调用 Getbuf(inq)过程,从输入队列 inq 的队首取得一个缓冲区,作为提取输入工作缓冲区(sin),计算进程从中提取数据。计算进程用完该数据后,再调用 Putbuf(emq,sin)过程,将该缓冲区挂到空缓冲队列 emq 上。
      收容输出。当计算进程需要输出时,调用 Getbuf(emq)过程从空缓冲队列 emq 的队首取得一个空缓冲区,作为收容输出工作缓冲区 hout。当其中装满输出数据后,又调用Putbuf(outq,hout)过程,将该缓冲区挂在 outq 末尾。
      提取输出。由输出进程调用 Getbuf(outq)过程,从输出队列的队首取得一装满输出数据的缓冲区,作为提取输出工作缓冲区 sout。在数据提取完后,再调用 Putbuf(emq,sout)过程,将该缓冲区挂在空缓冲队列末尾。

    代码

    bufferpool.cpp

    // BufferPool.cpp : 闁跨喐鏋婚幏鐑芥晸閺傘倖瀚� DLL 鎼存棃鏁撻惌顐ゎ劜閹风兘鏁撻弬銈嗗閻楋繝鏁撻弬銈嗗闁跨喐鏋婚幏鐑芥晸閺傘倖瀚归柨鐔告灮閹风兘鏁撻敓锟�
    //
    
    //#include "stdafx.h"
    #include "BufferPool.h"
    
    #include "ThreadUtils.h"
    
    
    BuffPool::BuffPool(const char *id) : EmptyBuffs(0), FullBuffs(0)
    {
    
    	last_empty = 0;
    	last_full  = 0;
    	next_full  = 0;
    	buffsz     = 0;
    	numbuf     = 0;
    	pooladdr   = 0;
    	pname      = id;
    }
    
    BuffPool::~BuffPool()
    {
    	try
    	{
    		Buffer *currp;
    	
    		EmptyPool.Lock();
    			while((currp = last_empty))
    			{last_empty = last_empty->next; delete currp;}
    			EmptyPool.UnLock();
    	
    				FullPool.Lock();
    			while((currp = next_full))
    			{next_full = next_full->next; delete currp;}
    			FullPool.UnLock();
    			free(pooladdr);
    			pooladdr = NULL;
    	}
    	
    	catch ( ... )
    	{
    	}
    }
    
    
    int BuffPool::Allocate(int buffnum, int  bsize)
    {
    	char *buffaddr = NULL;
    	int bnum = buffnum;
    
        //闁跨喐鏋婚幏宄扳偓锟�
        numempty=buffnum;
        numfull=0;
    	Buffer *new_empty;
    	buffaddr = (char*)malloc(bsize*buffnum);
    	if(!buffaddr)
    	{
    		fprintf(stderr,"malloc buffer pool failed\n");
    		return -1;
    	}
    	pooladdr = buffaddr;
    
    	EmptyPool.Lock();//闁跨喐鏋婚幏鐑芥晸閺傘倖瀚�
    
    	buffsz = bsize;//闁跨喐鏋婚幏鐑芥晸閺傘倖瀚归柨鐔告灮閹风兘鏁撻弬銈嗗闁跨噦鎷�
    	while(bnum--)
    	{
    		if (!(new_empty = new Buffer(this)))
    		{
    			fprintf(stderr,"new buffer failed\n");
    			break;
    		}
    		new_empty->data = (char *)buffaddr;
    		new_empty->next = last_empty;
    		last_empty = new_empty;
    		buffaddr += buffsz;
    		EmptyBuffs.Post();//闁跨喓绮ㄦ禍銈勭闁跨喐鏋婚幏鐑芥晸閺傘倖瀚瑰┃锟�
    	}
    	numbuf += buffnum-(bnum+1);
    	EmptyPool.UnLock();
    
    	return -(bnum+1);
    }
    
    
    Buffer *BuffPool::getEmptyBuff()
    {
    	Buffer *buffp = 0;
    
    	while(!buffp)
    	{
    		EmptyBuffs.Wait();//闁跨喖銈烘潏鐐闁跨喐鏋婚幏閿嬬爱
    
    		EmptyPool.Lock();
    		buffp = last_empty;
    		last_empty = buffp->next;
    		EmptyPool.UnLock();
    	}
    	buffp->dlen = 0;
    	return buffp;
    }
    
    void BuffPool::putEmptyBuff(Buffer *buffp)
    {
    
    	EmptyPool.Lock();
    	buffp->next = last_empty;
    	last_empty  = buffp;
    
        numempty+=1;
        numfull-=1;
        //printf("numempty is %d numfull is %d\n",numempty,numfull);
    	EmptyPool.UnLock();
    
    	EmptyBuffs.Post();//闁跨喖鍙洪崙銈嗗闁跨喐鏋婚幏閿嬬爱
    }
    
    
    Buffer *BuffPool::getFullBuff()
    {
    	Buffer *buffp = 0;
    
    	while(!buffp)
    	{
    		FullBuffs.Wait();
    
    		FullPool.Lock();
    		buffp = next_full;
    		if (!(next_full = buffp->next)) last_full = 0;
    		FullPool.UnLock();
    	}
    
    	return buffp;
    }
    
    Buffer *BuffPool::getFullBuff(HANDLE_FUNCTION handler)
    {
    	Buffer *buffp = this->getFullBuff();
    	if (buffp == NULL) 
    		return NULL;
    
    	int r = (*handler)(buffp);
    	if (r < 0)
    		fprintf(stderr,"Handler occure error,Please check it\n");
    	return buffp;
    }
    
    void BuffPool::putFullBuff(Buffer *buffp)
    {
    
    	FullPool.Lock();
    	if (last_full) 
    		last_full->next = buffp;
    	else 
    		next_full = buffp;
    	last_full = buffp;
    	buffp->next = 0;
    
        //闁跨喐鏋婚幏鐑芥晸閺傘倖瀚�
        numempty-=1;
        numfull+=1;
        //printf("numempty is %d numfull is %d\n",numempty,numfull);
    
    	FullPool.UnLock();
    
    	FullBuffs.Post();
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170

    buffpool.h

    #ifndef __BUFFPOOL_H__
    #define __BUFFPOOL_H__
    
    #include 
    #include 
    #include "ThreadUtils.h"
    
    #ifdef BUFFERPOOL_EXPORTS
    #define BUFFERPOOL_API __declspec(dllexport)
    #else
    #define BUFFERPOOL_API __declspec(dllimport)
    #endif
    
    
    
    
    class Buffer;
    
    typedef int (*HANDLE_FUNCTION)(Buffer *);
    
    class /*BUFFERPOOL_API*/ BuffPool
    {
        //自己添加代码
        //int
    public:
    	int          Allocate(int buffnum, int bsize);
    
    	int          BuffCount() {return numbuf;}
    
    	int          BufferSize() {return buffsz;}
    
    	Buffer		*getEmptyBuff();//返回空指针的地址  xin.han
    
    	void		putEmptyBuff(Buffer *buff);
    
    	Buffer		*getFullBuff();//返回最后一个有效数据的地址 xin.han
    
    	Buffer		*getFullBuff(HANDLE_FUNCTION handler);
    
    	void         putFullBuff(Buffer *buff);
    
    	BuffPool(const char *id);
    	~BuffPool();
    
        int         numempty;
        int         numfull;
    
    private:
    
    	BufferPoolMutex EmptyPool; //互斥锁
    	BufferPoolMutex FullPool;
    
    	BufferPoolSemaphore EmptyBuffs;//资源锁
    	BufferPoolSemaphore FullBuffs;
    
    	int         numbuf;
    //    int         numempty;
    //    int         numfull;
    	int         buffsz;//缓冲块的容量
    	const char *pname;
    	char *pooladdr; //内存池首地址
    
    	Buffer *next_full;
    	Buffer *last_full;
    	Buffer *last_empty;
    };
    
    
    class /*BUFFERPOOL_API*/ Buffer
    {
    public:
    	Buffer  *next;
    	int           dlen;//缓冲块中有效数据量
    	char         *data;
    
    	inline void   Recycle() {Owner->putEmptyBuff(this);}
    
    	Buffer(BuffPool *oP, char *bp=0, int data_len=0)
    		: next(0), dlen(data_len),data(bp), Owner(oP){}
    	~Buffer() {if (data) data=NULL;}
    
    private:
    	BuffPool *Owner;
    };
    #endif
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    threadutils.h

    #ifndef __THREADUTILS__
    #define __THREADUTILS__
    
    #include 
    #include 
    #include 
    
    #define MAX_SEM_COUNT 200
    
    
    class /*BUFFERPOOL_API*/ BufferPoolMutex
    {
        public:
    
            inline void   Lock() {pthread_mutex_lock(&ghMutex);}
    
            inline void UnLock() {pthread_mutex_unlock(&ghMutex);}
    
            BufferPoolMutex() 
            {
                pthread_mutex_init(&ghMutex,NULL);
            }
            ~BufferPoolMutex() {pthread_mutex_destroy(&ghMutex);}
    
        private:
            pthread_mutex_t ghMutex;
    };
    
    class /*BUFFERPOOL_API*/ BufferPoolSemaphore
    {
        public:
    
            inline void Post() {sem_post(&ghSemaphore);}
    
            inline void Wait() {sem_wait(&ghSemaphore);}
    
            BufferPoolSemaphore(int semval=0) 
            {
                sem_init(&ghSemaphore,0,0);
            }
            ~BufferPoolSemaphore() {sem_destroy(&ghSemaphore);}
    
        private:
            sem_t ghSemaphore;
    };
    #endif
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    调用部分

      PLATFORM_BuffPoll_Init初始化buffpool,开辟内存,析构时释放
      通过信号量来通知,有了新的数据
      线程1里源源不断编码,然后将码流先SAMPLE_PUSH_HDENCFRAME_ToBuffer保存至buffpool,比如在线播放功能,那么在线播放的信号量+1,SAMPLE_PUSH_HDENCFRAME_ToBuffer的具体操作为新建个临时buff,判断大小有没有超出初始化时设置的,接着Buffer* tmpbuff=bufpool->getEmptyBuff();,若获取到空则释放bufpool->putEmptyBuff(tmpbuff);获取成功了则把要保存的内容 memcpy(tmpbuff->data,frmaddr,frmlen);到缓冲池临时buff里,然后把临时buff丢进缓冲池bufpool->putFullBuff(tmpbuff);
      线程2里while1执行在线播放的信号量sem_wait(&sem_rtspflg);自减。新建个临时buff指针指向getFullBuff();,若获取到为空则释放h264BuffPool_rtsp->putEmptyBuff(buff);,接着发送临时buff的内容,发送完毕后释放h264BuffPool_rtsp->putEmptyBuff(buff);
      抽象出来简单来说就是个大内存,化为两部分,一个方向上empty里面拿一块放在了full部分,另一个方向就从full部分copy走在放回empty部分,使用比较简单

    /******************************************************************************
    
      Copyright (C), 2022, Sunwise Space. Co., Ltd.
    
     ******************************************************************************
      File Name     : rtsp.c
      Version       : 
      Author        : xin.han
      Created       : 2022.06.16
      Description   :
    ******************************************************************************/
    extern "C" {
    #include "rtsp.h"
    }
    #include "BufferPool.h"
    #include "utils_buf.h"
    extern sem_t sem_saveflg;
    extern sem_t sem_rtspflg;
    
    // static int flag_run = 1;
    BuffPool *h264BuffPool_rtsp=NULL;
    BuffPool *h264BuffPool_save=NULL;
    
    rtsp_demo_handle demo;
    rtsp_session_handle session[MAX_SESSION_NUM] = {NULL};
    // static uint64_t ts = 0;
    extern HI_S64 rtsp_ts;
    extern HI_BOOL saveEnable;
    // static void sig_proc(int signo)
    // {
    //     flag_run = 0;
    // }
    /* 
     *描述  :buffpool的初始化
     *参数  :NULL
     *返回值:无
     *注意  :只开辟了空间,析构的时候自动释放
     */
    HI_S32 PLATFORM_BuffPoll_Init()
    {
    	sem_init(&sem_saveflg,0,0);
    	sem_init(&sem_rtspflg,0,0);
    	h264BuffPool_rtsp=new BuffPool("h264buffer");
    	h264BuffPool_rtsp->Allocate(10,512*1024);
        if(h264BuffPool_rtsp!=NULL)
        {
            //判断创建缓冲区是否成功
            printf("allocate h264BuffPool_rtsp success!\n");
    
        }
        else
        {
            printf("allocate h264BuffPool_rtsp failed!\n");
        }
    	h264BuffPool_save=new BuffPool("h264buffer");
    	h264BuffPool_save->Allocate(10,512*1024);
        if(h264BuffPool_save!=NULL)
        {
            //判断创建缓冲区是否成功
            printf("allocate h264BuffPool_save success!\n");
    
        }
        else
        {
            printf("allocate h264BuffPool_save failed!\n");
        }
    }
    
    // static uint64_t rtsp_get_abstime (void)
    // {
    // 	struct timeval tv;
    // 	gettimeofday(&tv, NULL);
    // 	return (tv.tv_sec * 1000000ULL + tv.tv_usec);
    // }
    /* 
     *描述  :用于rtsp实时播放的线程
     *参数  :NULL
     *返回值:无
     *注意  :加载文件platform.ini  rtsp://192.168.119.164:8554/mnt/sample/venc/rtsp.264
    		  2022-10-21 新增bufferpool和信号量
     */
    
    void *video_play_rtsp_task(void* arg)
    {
    	cpu_set_t mask;//cpu核的集合
        cpu_set_t get;//获取在集合中的cpu
    
        int num = sysconf(_SC_NPROCESSORS_CONF);
        printf("frame_check_task:system has %d processor(s)\n", num);
    
        CPU_ZERO(&mask);//置空
        CPU_SET(0, &mask);//设置亲和力值
         
        if (pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask) < 0)//设置线程CPU亲和力
        {
            fprintf(stderr, "set thread affinity failed\n");
        }
    
        if (pthread_getaffinity_np(pthread_self(), sizeof(get), &get) < 0)//获取线程CPU亲和力
        {
            fprintf(stderr, "get thread affinity failed\n");
        }
      	printf("rtsp ts is %lld\n", rtsp_ts);
    	demo = create_rtsp_demo(8554);//rtsp sever socket
    
    	if (NULL == demo) {
    		SAMPLE_PRT("rtsp new demo failed!\n");
    		// return 0;
    	}
    	// session[0] = rtsp_new_session(demo, "/mnt/sample/venc/rtsp.264");//对应rtsp session 
    	session[0] = create_rtsp_session(demo, "/mnt/sample/venc/RTSP/RTSP_chn0.h264");//对应rtsp session 
    		if (NULL == session[0]) {
    			printf("rtsp_new_session failed\n");  
    			// continue;
    		}
    	printf("==========> rtsp://192.168.0.164:8554/mnt/sample/venc/RTSP/RTSP_chn0.h264 <===========\n" );
    	// ts = rtsp_get_reltime();
    	// rtsp_do_event(demo);
    	// signal(SIGINT, sig_proc);
    	// sndh264flag==HI_TRUE
    	// while(sndh264flag==HI_TRUE)
    	uint64_t ts = 0;
    	Buffer* buff=NULL;
    	while(1)
    	{
    		sem_wait(&sem_rtspflg);
    		if(h264BuffPool_rtsp==NULL)
    		{
    			continue;
    		}
    		buff=h264BuffPool_rtsp->getFullBuff();
    		if(buff==NULL)
    		{
    			h264BuffPool_rtsp->putEmptyBuff(buff);
    			continue;
    		}
    		/* [DEBUG rtsp_demo.c:1145:rtsp_recv_msg] peer closed [INFO  rtsp_demo.c:428:rtsp_del_client_connection] delete client 29 from 192.168.0.73 */
    		if((buff->data[4]==0x06) )
    		{
    			h264BuffPool_rtsp->putEmptyBuff(buff);
    			
    			continue;
    		}
    
    		
    		ts = rtsp_get_reltime();//clock_gettime()
    		// ts = rtsp_get_abstime();//gettimeofday  可能大量丢画面
    		
    		rtsp_tx_video(session[0],(uint8_t*)buff->data,buff->dlen, ts);
    		rtsp_do_event(demo);
    		h264BuffPool_rtsp->putEmptyBuff(buff);
    		
    	}
        return 0;
    }
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    
    /****************************************************************************** 
     *描述  :保存码流到bufferpool
     ******************************************************************************/
    HI_VOID SAMPLE_PUSH_HDENCFRAME_ToBuffer(BuffPool	*bufpool,HI_U8* frmaddr ,HI_U32 frmlen)
    {
        
        
        //printf("hd recv frm len is %d\n",frmlen);
        
        if(bufpool==NULL)
        {
            
            printf("bufpool BuffPool is invalid && return !\n");
            return;
        }
        if(frmlen>=512*1024)
        {
            printf("frmlen %d is greater than bufferpool size\n",frmlen);
        }
        Buffer* tmpbuff=bufpool->getEmptyBuff();
        if(tmpbuff==NULL)
        {
            tmpbuff->dlen=0;
            bufpool->putEmptyBuff(tmpbuff);
        }
        else
        {
            memcpy(tmpbuff->data,frmaddr,frmlen);
            tmpbuff->dlen=frmlen;
            bufpool->putFullBuff(tmpbuff);
        }
        // if(rtspEnable)
        // {
        //     sem_post(&sem_rtspflg);
        // }
        // if(saveEnable)
        // {
        //     sem_post(&sem_saveflg);
        // }
        
        
    }
    /******************************************************************************
     *描述  :处理文件
     *参数  :pFd 文件描述符
              pstStream 帧码流类型结构体。
     *返回值:成功返回0
     *注意  :无
    ******************************************************************************/
    HI_S32 PLATFORM_VENC_HandleStream(FILE* pFd, VENC_STREAM_S* pstStream)
    {
        HI_U32 i;
    
        if(rtspEnable)
        {
            for (i = 0; i < pstStream->u32PackCount; i++)
            {
                SAMPLE_PUSH_HDENCFRAME_ToBuffer(h264BuffPool_rtsp,
                                                (uint8_t*)(pstStream->pstPack[i].pu8Addr + pstStream->pstPack[i].u32Offset),
                                                pstStream->pstPack[i].u32Len - pstStream->pstPack[i].u32Offset);
                sem_post(&sem_rtspflg);
            }
            
        }
        if(saveEnable)
        {
            for (i = 0; i < pstStream->u32PackCount; i++)
            {
                SAMPLE_PUSH_HDENCFRAME_ToBuffer(h264BuffPool_save,
                                                (uint8_t*)(pstStream->pstPack[i].pu8Addr + pstStream->pstPack[i].u32Offset),
                                                pstStream->pstPack[i].u32Len - pstStream->pstPack[i].u32Offset);
                sem_post(&sem_saveflg);
            }
            
        }
       
        
    
        return HI_SUCCESS;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
  • 相关阅读:
    C++智能指针
    简单代理模式
    Windows环境下的ELK——搭建环境(1)
    ModbusTCP服务端
    摩尔纹是什么?如何消除摩尔纹?
    Vue项目实战之电商后台管理系统(九) 项目优化部署上线
    (附源码)spring boot网上商品定制系统 毕业设计 180915
    理解Laravel中的pipeline
    ElasticSearch 批量查询
    SpringBoot学习(三)——yaml语法
  • 原文地址:https://blog.csdn.net/qq_42330920/article/details/127546279