• c++千万数据级别正确使用无锁队列,避免内存撕碎(二)


    第一篇文章
    这里逐渐深入,先讲一下协议,很多概念必须讲清楚

    七层协议

    我们使用的是网络传输,千万级别的数据实际上并不多,对于队列来说,每个队列的数据块大小定义为MTU大小,如1400字节,实际上传输层定义在操作系统里面,我们无法更改,无论是linux还是windows,都需要经过内核态和用户态,假定我们有100路视频, 每路视频为2M 码流, 一分钟有多少数据呢, 2 * 1024*1024 / 8 * 60 * 100 = 1572864000 bytes = 1500 G
    对于MTU 包个数来说, 是 1500G / 1400 = 1G 个队列包, 那每秒钟就是 1024 * 1024 / 60 = 17476 个队列包, 每路有 17476 / 100 = 175 个 MTU包,对于带宽来说,上传就需要200Mbit的带宽,所以这里千万数据级别,说的就是1分钟MTU的包个数。

    会话层来说,如果传输层协议使用tcp, 100个上传就需要100个会话,到应用层的时候需要发送的RTP协议实际上每个包都会加上至少12个字节的包头。这些都是开销,选择RTMP协议和RTSP,SIP等等,各自都有开销,但是RTSP和SIP 传输层都可以是 RTP协议 ,也都可以选择UDP和TCP,选择UDP意味着更低的开销,TCP 意味着丢包会少。
    在这里插入图片描述

    应用方

    主要应用在视频传输方面,下面画出主要的视频传输,无论使用什么协议,我们先关注在RTP协议上面,RTP协议首先为UDP,RTP over UDP协议我们主要关注在他的包数量和缓冲的内存数量上,我们这样来计算,对于2Mbit 的视频传输来说, 我们有如下算法
    MTU = 1400
    2M 约为 210241024 /8 /1400 = 187

    也就是如果需要缓存1秒数据,至少需要187 个缓存单元,有了如上计算,我们就可以假定一个视频为2M的单元来说,我们缓存200 个单元,每个单元大小为1400字节。
    在这里插入图片描述

    数据量极大

    大数据量莫过于视频,尤其是RTP协议中的包,1是多,每个RTP协议包数据量不会超过MTU,最大传输单元,

    #include 
    #include 
    #include 
    enum
    {
        en_emptying = 0,
        en_writing,
        en_canreadings
    };
    struct smem
    {
        char v_status;
        char v_i_flag;
        char r1;
        char r2;
        int v_len = 0;
        uint32_t v_pts = 0;
        uint8_t* v_data = NULL;
    };
    
    typedef std::shared_ptr<smem> ptr_smem;
    
    struct s_mem_pool
    {
        //qianbo :just one thread read,one thread write ,otherwise error occur
        uint8_t* v_data = NULL;
        uint8_t* v_end = NULL;
        int v_len = 0;;
        uint8_t* v_write = NULL;
        uint8_t* v_read = NULL;
    
        std::mutex v_mux;
        std::atomic_int v_framenum;
        std::queue<ptr_smem>v_i;
        void init_mem(int memlen)
        {
            if (v_data != NULL)
            {
                if (v_len < memlen)
                {
                    free(v_data);
                    v_data = NULL;
                }
            }
    
    
            v_len = memlen;
            if (v_data == NULL)
                v_data = (uint8_t*)malloc(memlen);
    
            v_end = v_data + memlen;
            v_write = v_data;
            v_read = v_data;
            v_framenum = 0;
            v_mux.lock();
            while (!v_i.empty())
            {
                v_i.pop();
            }
            v_mux.unlock();
    
        }
    
        ~s_mem_pool()
        {
            clearqueue();
            if (v_data != NULL)
                free(v_data);
        }
    
    
        bool push(uint8_t* data, int len, uint32_t pts, bool i_flag)
        {
    #define EX_LEN (sizeof(char)*4 +sizeof(int)+sizeof(uint32_t))
    #define NEEDLEN (len)
    
    #define WRITE_INFO \
            ptr_smem mem = std::make_shared<smem>();\
            mem->v_data = v_write;\
            mem->v_len = len;\
            mem->v_pts = pts;\
            mem->v_i_flag = i_flag; \
            mem->v_status = en_writing;\
            v_mux.lock();\
            v_i.emplace(mem);\
            v_mux.unlock();
    
    #define WRITE_INFO2 \
        uint8_t *m = v_write; \
        *(char*)(m) = (char)en_writing; \
        m +=1; \
        *(char*)(m) = (char)i_flag;\
        m +=3; \
        *(int*)m = len; \
        m +=4; \
        *(uint32_t*)m = pts; \
        m+= 4;\
        v_write = m;
    
    
            if (v_write >= v_read)
            {
                if ((v_end - v_write) > (long)(NEEDLEN))
                {
    
                    memcpy(v_write, data, len);
                    WRITE_INFO
                        v_write += (NEEDLEN);
                    v_framenum++;
                    return true;
                }
                else // the left mem is not enough
                {
                    //qInfo()<<"left mem is not enough";
                    if ((v_read - v_data) > (long)(NEEDLEN))
                    {
                        v_write = v_data;
                        memcpy(v_write, data, len);
                        WRITE_INFO
                            v_write += NEEDLEN;
                        v_framenum++;
                        return true; //from the head
                    }
                    else
                    {
                        std::cout << "not enough memory 0 :num" << v_framenum;
                        return false;
                    }
                }
            }
            else //read>write
            {
                if ((v_read - v_write) > (long)(NEEDLEN))
                {
                    memcpy(v_write, data, len);
                    WRITE_INFO
                        v_write += (NEEDLEN);
                    v_framenum++;
                    //IncNumber();
                    return true;
                }
                std::cout << "not enough memory 1 clear the buffer the len is " << NEEDLEN;
    
                return false;
            }
        }
        ptr_smem get()
        {
            ptr_smem sm = nullptr;
            v_mux.lock();
            if (!v_i.empty())
            {
                sm = v_i.front();
                v_i.pop();
                v_read = sm->v_data;
            }
            v_mux.unlock();
            v_framenum--;
            return sm;
        }
            int size()
            {
                return v_framenum;
            }
            void clearqueue()
            {
                v_mux.lock();
                if (!v_i.empty())
                {
                    v_i.pop();
                }
                v_mux.unlock();
            }
        };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174

    去除拷贝,去除锁

    还是要去除拷贝的,否则,随着数据量越来越大,拷贝消耗的cpu也就客观了,所以修改成为以下方式:

    struct s_mem_pool_fix
    {
        ~s_mem_pool_fix()
        {
            if (v_data != NULL)
                free(v_data);
        }
        uint8_t* v_data = NULL;
        uint8_t* v_end = NULL;
        uint8_t* v_write = NULL;
        uint8_t* v_preparewrite = NULL;
        uint8_t* v_read = NULL;
        void init_mem(int memlen, int totalnum)
        {
            v_data = (uint8_t*)malloc(1500 * 1000);
            //v_write = v_data;
            v_preparewrite = v_data;
            v_end = v_data + 1500 * 1000;
        }
        uint8_t* get_mem()
        {
            v_write = v_preparewrite;
            v_preparewrite += 1500;
            if (v_preparewrite == v_end)
                v_preparewrite = v_data;
            return v_write;
        }
    
    
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    去除锁,使用无锁队列,意味着用一个线程写,一个线程读,使用状态码来防止越界,这个将会在下一篇文章里面写,上一篇文章有着简单的测试。

    main 测试

    这个后面再讲了
    int main()
    {
    std::cout << “Hello World!\n”;
    }

  • 相关阅读:
    shell 入门第5课 函数
    1 FPGA ZYBO Xilinx 按键控制LED灯 key_led
    TorchScript学习使用
    小白快速自建博客--halo博客
    【IoT-卫朋】智能硬件 | 产品按键设计
    jvm调优 和实际案例
    【计算机视觉 | 目标检测】目标检测常用数据集及其介绍(十五)
    uniapp:谷歌地图,实现地图展示,搜索功能,H5导航
    新营销模式之分享购营销模式~你见过这种营销模式吗?
    人工智能生成内容AIGC:AIGC for Various Data Modalities: A Survey
  • 原文地址:https://blog.csdn.net/qianbo042311/article/details/126496773