文章目录
1.4以页为单位的大内存管理Span的定义及Spanlist定义
九、多线程并发环境下,与malloc进行申请和释放内存效率对比
当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称
Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。
内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
内存池主要解决的就是效率的问题,它能够避免让程序频繁的向系统申请和释放内存。其次,内存池作为系统的内存分配器,还需要尝试解决内存碎片的问题。
内存碎片分为内部碎片和外部碎片:
注意: 内存池尝试解决的是外部碎片的问题,同时也尽可能的减少内部碎片的产生。
C/C++中我们要动态申请内存并不是直接去堆申请的,而是通过malloc函数去申请的,包括C++中的new实际上也是封装了malloc函数的。
我们申请内存块时是先调用malloc,malloc再去向操作系统申请内存。malloc实际就是一个内存池,malloc相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用,当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。malloc的实现方式有很多种,一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套,linux gcc用的glibc中的ptmalloc。
作为程序员(C/C++)我们知道申请内存使用的是malloc,malloc其实就是一个通用的大众货,什么场景下都可以用,但是什么场景下都可以用就意味着什么场景下都不会有很高的性能,下面我们就先来设计一个定长内存池做个开胃菜,当然这个定长内存池在我们后面的高并发内存池中也是有价值的,所以学习他目的有两层,先熟悉一下简单内存池是如何控制的,第二他会作为我们后面内存池的一个基础组件。
如何实现定长?
在实现定长内存池时要做到“定长”有很多种方法,比如我们可以使用非类型模板参数,使得在该内存池中申请到的对象的大小都是N。
- template<size_t N>
- class ObjectPool
- {};
此外,定长内存池也叫做对象池,在创建对象池时,对象池可以根据传入的对象类型的大小来实现“定长”,因此我们可以通过使用模板参数来实现“定长”,比如创建定长内存池时传入的对象类型是int,那么该内存池就只支持4字节大小内存的申请和释放。
- template<class T>
- class ObjectPool
- {};
如何直接向堆申请空间?
既然是内存池,那么我们首先得向系统申请一块内存空间,然后对其进行管理。要想直接向堆申请内存空间,在Windows下,可以调用VirtualAlloc函数;在Linux下,可以调用brk或mmap函数。
- #ifdef _WIN32
- #include
- #else
- //...
- #endif
-
- //直接去堆上申请按页申请空间
- inline static void* SystemAlloc(size_t kpage)
- {
- #ifdef _WIN32
- void* ptr = VirtualAlloc(0, kpage<<13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
- #else
- // linux下brk mmap等
- #endif
- if (ptr == nullptr)
- throw std::bad_alloc();
- return ptr;
- }
这里我们可以通过条件编译将对应平台下向堆申请内存的函数进行封装,此后我们就不必再关心当前所在平台,当我们需要直接向堆申请内存时直接调用我们封装后的SystemAlloc函数即可。
定长内存池中应该包含哪些成员变量?
对于向堆申请到的大块内存,我们可以用一个指针来对其进行管理,但仅用一个指针肯定是不够的,我们还需要用一个变量来记录这块内存的长度。
由于此后我们需要将这块内存进行切分,为了方便切分操作,指向这块内存的指针最好是字符指针,因为指针的类型决定了指针向前或向后走一步有多大距离,对于字符指针来说,当我们需要向后移动n个字节时,直接对字符指针进行加n操作即可。
其次,释放回来的定长内存块也需要被管理,我们可以将这些释放回来的定长内存块链接成一个链表,这里我们将管理释放回来的内存块的链表叫做自由链表,为了能找到这个自由链表,我们还需要一个指向自由链表的指针。
因此,定长内存池当中包含三个成员变量:
内存池如何管理释放的对象?
对于还回来的定长内存块,我们可以用自由链表将其链接起来,但我们并不需要为其专门定义链式结构,我们可以让内存块的前4个字节(32位平台)或8个字节(64位平台)作为指针,存储后面内存块的起始地址即可。
因此在向自由链表插入被释放的内存块时,先让该内存块的前4个字节或8个字节存储自由链表中第一个内存块的地址,然后再让 _freeList 指向该内存块即可,也就是一个简单的链表头插操作。
这里有一个有趣问题:如何让一个指针在32位平台下解引用后能向后访问4个字节,在64位平台下解引用后能向后访问8个字节?
首先我们得知道,32位平台下指针的大小是4个字节,64位平台下指针的大小是8个字节。而指针指向数据的类型,决定了指针解引用后能向后访问的空间大小,因此我们这里需要的是一个指向指针的指针,这里使用二级指针就行了。
当我们需要访问一个内存块的前4 or 8个字节时,我们可以将该内存块的地址先强转为二级指针,由于二级指针存储的是一级指针的地址,二级指针解引用能向后访问一个指针的大小,因此在32位平台下访问的就是4个字节,在64位平台下访问的就是8个字节,此时我们访问到了该内存块的前4 or 8个字节。
- // 获取内存对象中存储的头4 or 8字节值,即链接的下一个对象的地址
- void*& NextObj(void* ptr)
- {
- return (*(void**)ptr);
- }
需要注意的是,在释放对象时,我们应该显示调用该对象的析构函数清理该对象,因为该对象可能还管理着其他某些资源,如果不对其进行清理那么这些资源将无法被释放,就会导致内存泄漏。
- //释放对象
- void Delete(T* obj)
- {
- //显示调用T的析构函数清理对象
- obj->~T();
-
- //将释放的对象头插到自由链表
- NextObj(obj) = _freeList;
- _freeList = obj;
- }
内存池如何为我们申请对象?
当我们申请对象时,内存池应该优先把还回来的内存块对象再次重复利用,因此如果自由链表当中有内存块的话,就直接从自由链表头删一个内存块进行返回即可。
如果自由链表当中没有内存块,那么我们就在大块内存中切出定长的内存块进行返回,当内存块切出后及时更新_memory指针的指向,以及_remainBytes的值即可。
需要特别注意的是,由于当内存块释放时我们需要将内存块链接到自由链表当中,因此我们必须保证切出来的对象至少能够存储得下一个地址,所以当对象的大小小于当前所在平台指针的大小时,需要按指针的大小进行内存块的切分。
此外,当大块内存已经不足以切分出一个对象时,我们就应该调用我们封装的SystemAlloc函数,再次向堆申请一块内存空间,此时也要注意及时更新_memory指针的指向,以及_remainBytes的值。
- // 申请内存
- T* New()
- {
- T* obj = nullptr;
-
- // 优先把还回来内存块对象,再次重复利用
- if (_freeList != nullptr)
- {
- obj = (T*)_freeList;
- _freeList = NextObj(_freeList);
- }
- else
- {
- // 剩余内存不够一个对象大小时,则重新开一大块空间
- if (_remainBytes < sizeof(T))
- {
- _remainBytes = 128 * 1024;
- _memory = (char*)SystemAlloc(_remainBytes >> 13); // _memory = (char*)malloc(_remainBytes);
-
- if (_memory == nullptr)
- {
- // 如果申请失败,就抛异常
- throw std::bad_alloc();
- }
- }
-
- obj = (T*)_memory;
- size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); // 如果申请的空间小于一个指针的大小,就直接给一个指针的大小,这样当内存还回来的时候,就能存下一个指针了
- _memory += objSize;
- _remainBytes -= objSize;
- }
-
- // 定位new,显示调用T的构造函数的初始化
- new(obj)T;
-
- return obj;
- }
需要注意的是,与释放对象时需要显示调用该对象的析构函数一样,当内存块切分出来后,我们也应该使用定位new,显示调用该对象的构造函数对其进行初始化。
定长内存池整体代码如下:
- template<class T>
- class ObjectPool
- {
- public:
- // 申请内存
- T* New()
- {
- T* obj = nullptr;
-
- // 优先把还回来内存块对象,再次重复利用
- if (_freeList != nullptr)
- {
- obj = (T*)_freeList;
- _freeList = NextObj(_freeList);
- }
- else
- {
- // 剩余内存不够一个对象大小时,则重新开一大块空间
- if (_remainBytes < sizeof(T))
- {
- _remainBytes = 128 * 1024;
- _memory = (char*)SystemAlloc(_remainBytes >> 13); // _memory = (char*)malloc(_remainBytes);
-
- if (_memory == nullptr)
- {
- // 如果申请失败,就抛异常
- throw std::bad_alloc();
- }
- }
-
- obj = (T*)_memory;
- size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); // 如果申请的空间小于一个指针的大小,就直接给一个指针的大小,这样当内存还回来的时候,就能存下一个指针了
- _memory += objSize;
- _remainBytes -= objSize;
- }
-
- // 定位new,显示调用T的构造函数的初始化
- new(obj)T;
-
- return obj;
- }
-
- // 还回内存
- void Delete(T* obj)
- {
- // 显示调用析构函数
- obj->~T();
-
- // 头插(这里第一次和第n次都可以用头插的方法)
- NextObj(obj) = _freeList;
- _freeList = obj;
- }
- private:
- char* _memory = nullptr; // 指向大块内存的指针
- size_t _remainBytes = 0; // 大块内存在切分过程中剩余的字节数
-
- void* _freeList = nullptr; // 还回来过程中链接的自由链表的头指针
- };
性能对比
下面我们将实现的定长内存池和malloc/free进行性能对比,测试代码如下:
- struct TreeNode
- {
- int _val;
- TreeNode* _left;
- TreeNode* _right;
- TreeNode()
- :_val(0)
- , _left(nullptr)
- , _right(nullptr)
- {}
- };
-
- void TestObjectPool()
- {
- // 申请释放的轮次
- const size_t Rounds = 3;
- // 每轮申请释放多少次
- const size_t N = 1000000;
- std::vector
v1; - v1.reserve(N);
-
- //malloc和free
- size_t begin1 = clock();
- for (size_t j = 0; j < Rounds; ++j)
- {
- for (int i = 0; i < N; ++i)
- {
- v1.push_back(new TreeNode);
- }
- for (int i = 0; i < N; ++i)
- {
- delete v1[i];
- }
- v1.clear();
- }
- size_t end1 = clock();
-
- //定长内存池
- ObjectPool
TNPool; - std::vector
v2; - v2.reserve(N);
- size_t begin2 = clock();
- for (size_t j = 0; j < Rounds; ++j)
- {
- for (int i = 0; i < N; ++i)
- {
- v2.push_back(TNPool.New());
- }
- for (int i = 0; i < N; ++i)
- {
- TNPool.Delete(v2[i]);
- }
- v2.clear();
- }
- size_t end2 = clock();
-
- cout << "new cost time:" << end1 - begin1 << endl;
- cout << "object pool cost time:" << end2 - begin2 << endl;
- }
在代码中,我们先用new申请若干个TreeNode对象,然后再用delete将这些对象再释放,通过clock函数得到整个过程消耗的时间。(new和delete底层就是封装的malloc和free)
然后再重复该过程,只不过将其中的new和delete替换为定长内存池当中的New和Delete,此时再通过clock函数得到该过程消耗的时间。
可以看到在这个过程中,定长内存池消耗的时间比malloc/free消耗的时间要短。这就是因为malloc是一个通用的内存池,而定长内存池是专门针对申请定长对象而设计的,因此在这种特殊场景下定长内存池的效率更高,正所谓“尺有所短,寸有所长”。
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。
concurrent memory pool主要由以下3个部分构成:
thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
每个线程都有一个自己独享的thread cache,那应该如何创建这个thread cache呢?我们不能将这个thread cache创建为全局的,因为全局变量是所有线程共享的,这样就不可避免的需要锁来控制,增加了控制成本和代码复杂度。
要实现每个线程无锁的访问属于自己的thread cache,我们需要用到线程局部存储TLS(Thread Local Storage),这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。
- //TLS - Thread Local Storage
- static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
但不是每个线程被创建时就立马有了属于自己的thread cache,而是当该线程调用相关申请内存的接口时才会创建自己的thread cache,因此在申请内存的函数中会包含以下逻辑。
- //通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
- if (pTLSThreadCache == nullptr)
- {
- pTLSThreadCache = new ThreadCache;
- }
- class ThreadCache
- {
- public:
- // 申请内存对象
- void* Allocate(size_t size);
-
- // 释放内存对象
- void Deallocate(void* ptr, size_t size);
-
- // 从中心缓存获取对象
- void* FetchFromCentralCache(size_t index, size_t size);
-
- // 释放对象时,链表过长时,回收内存回到中心缓存
- void ListTooLong(FreeList& list, size_t size);
- private:
- FreeList _freeLists[NFREELIST]; // 存储自由链表的哈希表
- };
-
- // TLS thread local storage
- static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr; // 用TLS保证每个线程独享一个thread cache
-
-
-
- // 管理切分好的小对象的自由链表
- class FreeList
- {
- public:
- // 插入对象
- void Push(void* obj)
- {
- assert(obj);
-
- // 头插
- NextObj(obj) = _freeList; // 用obj头4个(或8个)字节存储下一个节点的地址
- _freeList = obj;
-
- ++_size;
- }
-
- // 给n个连续对象进行插入
- // start->node->....->node->end
- void PushRange(void* start, void* end, size_t n)
- {
- NextObj(end) = _freeList;
- _freeList = start;
-
- _size += n;
- }
-
- // 将n个连续对象进行弹出
- // start->node->....->node->end
- void PopRange(void*& start, void*& end, size_t n)
- {
- assert(n <= _size);
-
- start = _freeList;
- end = start;
-
- for (size_t i = 0; i < n - 1; ++i)
- {
- end = NextObj(end);
- }
-
- _freeList = NextObj(end);
- NextObj(end) = nullptr;
- _size -= n;
- }
-
- // 弹出对象
- void* Pop()
- {
- assert(_freeList);
-
- // 头删
- void* obj = _freeList;
- _freeList = NextObj(obj);
- --_size;
-
- return obj;
- }
-
- // 判断自由链表是否为空
- bool Empty()
- {
- return _freeList == nullptr;
- }
-
- size_t& MaxSize()
- {
- return _maxSize;
- }
-
- size_t Size()
- {
- return _size;
- }
-
- private:
- void* _freeList = nullptr;
- size_t _maxSize = 1; // 当前自由链表节点的个数上限
- size_t _size = 0; // 记录自由链表当前节点的个数
- };
thread cache支持小于等于256KB内存的申请,如果我们将每种字节数的内存块都用一个自由链表进行管理的话,那么此时我们就需要20多万个自由链表,光是存储这些自由链表的头指针就需要消耗大量内存,这显然是得不偿失的。
这时我们可以选择做一些平衡的牺牲,让这些字节数按照某种规则进行对齐。
如何进行对齐?
首先,这些内存块是会被链接到自由链表上的,因此一开始肯定是按8字节进行对齐是最合适的,因为我们必须保证这些内存块,无论是在32位平台下还是64位平台下,都至少能够存储得下一个指针。
但如果所有的字节数都按照8字节进行对齐的话,那么我们就需要建立256 × 1024 ÷ 8 = 32768个桶,这个数量还是比较多的,实际上我们可以让不同范围的字节数按照不同的对齐数进行对齐,具体对齐方式如下:
字节数 | 对齐数 | 哈希桶下标 |
[ 1,128 ] | 8 | [ 0,16 ) |
[ 128+1,1024 ] | 16 | [ 16,72 ) |
[ 1024+1,8×1024 ] | 128 | [ 72,128 ) |
[ 8×1024+1,64×1024 ] | 1024 | [ 128,184 ) |
[ 64×1024+1,256×1024 ] | 8×1024 | [ 184,208 ) |
但此时由于对齐的原因,就可能会产生一些碎片化的内存无法被利用,比如线程只申请了6字节的内存,而thread cache却直接给了8字节的内存,这多给出的2字节就无法被利用,导致了一定程度的空间浪费,这些因为某些对齐原因导致无法被利用的内存,就是内存碎片中的内部碎片。
计算空间浪费率:空间浪费率 = 浪费的字节数 ÷ 对齐后的字节数
虽然对齐产生的内碎片会引起一定程度的空间浪费,但按照上面的对齐规则,我们可以将浪费率控制到百分之十左右。需要说明的是,1~128这个区间我们不做讨论,因为1字节就算是对齐到2字节也有百分之五十的浪费率,这里我们就从第二个区间开始进行计算。
根据上面的公式,我们要得到某个区间的最大浪费率,就应该让分子取到最大,让分母取到最小。比如[ 1024+1,8×1024 ]这个区间,该区域的对齐数是128,那么最大浪费的字节数就是127,而最小对齐后的字节数就是这个区间内的前128个数所对齐到的字节数,也就是1152,那么该区间的最大浪费率也就是127 ÷ 1152 ≈ 11.02% 。同样的道理,后面两个区间的最大浪费率分别是1023 ÷ 9216 ≈ 11.10% 和 8191 ÷ 73728 ≈ 11.11%。
- // 小于等于MAX_BYTES,就找thread cache申请
- // 大于MAX_BYTES,就直接找page cache或者系统堆申请
- static const size_t MAX_BYTES = 256 * 1024; // ThreadCache最多可以分配的空间为256kb
- static const size_t NFREELIST = 208; // thread cache 和 central cache自由链表哈希桶的表大小
- static const size_t NPAGES = 129; // page cache 管理span list哈希表大小(一个page chache中一个span最大能有129页)
- static const size_t PAGE_SHIFT = 13; // 页大小转换偏移, 即一页定义为2^13,也就是8KB
-
- // 页编号类型,32位下是4byte类型,64位下是8byte类型
- #ifdef _WIN32
- typedef size_t PageID;
- #else
- typedef unsigned long long PageID;
- #endif // _WIN32
-
- // 获取内存对象中存储的头4 or 8字节值,即链接的下一个对象的地址
- inline void*& NextObj(void* obj)
- {
- return *((void**)obj);
- }
-
- // 计算对象大小的对齐映射规则
- class SizeClass
- {
- public:
- // 整体控制在最多10%左右的内碎片浪费
- // [1,128] 8byte对齐 freelist[0,16)
- // [128+1,1024] 16byte对齐 freelist[16,72)
- // [1024+1,8*1024] 128byte对齐 freelist[72,128)
- // [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
- // [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
-
- // 参数:对象大小bytes,对齐数AlignNum
- static inline size_t _RoundUp(size_t bytes, size_t AlignNum)
- {
- // 原理:对象为任何大小,都先加上他对应的(对齐数-1),然后再抹去多加出来的
- // 例如:bytes=7,AlignNum=8,7+(8-1)=15,然后抹去多加出来的6,也就是15-6=8
- return (bytes + AlignNum - 1) & ~(AlignNum - 1);
- }
-
- // 计算对齐后对象的大小
- static inline size_t RoundUp(size_t size)
- {
- if (size <= 128)
- {
- return _RoundUp(size, 8);
- }
- else if (size <= 1024)
- {
- return _RoundUp(size, 16);
- }
- else if (size <= 8 * 1024)
- {
- return _RoundUp(size, 128);
- }
- else if (size <= 64 * 1024)
- {
- return _RoundUp(size, 1024);
- }
- else if (size <= 256 * 1024)
- {
- return _RoundUp(size, 8 * 1024);
- }
- else
- {
- return _RoundUp(size, 1 << PAGE_SHIFT);
- }
- }
-
- // 参数:对象大小bytes,其对应的对齐数转化成2的align_shift次方
- static inline size_t _Index(size_t bytes, size_t align_shift)
- {
- return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
- }
-
- // 计算映射的哪一个自由链表桶
- static inline size_t Index(size_t bytes)
- {
- assert(bytes <= MAX_BYTES);
-
- // 每个区间有多少个链
- static int group_array[4] = { 16, 56, 56, 56 };
- if (bytes <= 128) {
- return _Index(bytes, 3);
- }
- else if (bytes <= 1024) {
- return _Index(bytes - 128, 4) + group_array[0];
- }
- else if (bytes <= 8 * 1024) {
- return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
- }
- else if (bytes <= 64 * 1024) {
- return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1]
- + group_array[0];
- }
- else if (bytes <= 256 * 1024) {
- return _Index(bytes - 64 * 1024, 13) + group_array[3] +
- group_array[2] + group_array[1] + group_array[0];
- }
- else {
- assert(false);
- }
- return -1;
- }
-
- // thread cache一次从中心缓存获取多少个size大小对象的上限数
- static size_t NumMoveSize(size_t size)
- {
- assert(size > 0);
-
- // [2, 512],一次批量移动多少个对象的(慢启动)上限值
- // 小对象一次批量上限高
- // 大对象一次批量上限低
-
- int num = MAX_BYTES / size;
- if (num < 2)
- num = 2;
-
- if (num > 512)
- num = 512;
- return num;
- }
-
- // 计算一次向系统获取几个页
- // 单个对象 8byte
- // ...
- // 单个对象 256KB
- static size_t NumMovePage(size_t size)
- {
- size_t num = NumMoveSize(size);
- size_t npage = num * size; // 先算出总的字节数
-
- npage >>= PAGE_SHIFT; // 再算出需要的页数
- if (npage == 0)
- npage = 1;
-
- return npage;
- }
- };
central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。
central cache在加锁时并不是将整个central cache全部锁上了,central cache在加锁时用的是桶锁,也就是说每个桶都有一个锁。此时只有当多个线程同时访问central cache的同一个桶时才会存在锁竞争,如果是多个线程同时访问central cache的不同桶就不会存在锁竞争。
每个线程都有一个属于自己的thread cache,我们是用TLS来实现每个线程无锁的访问属于自己的thread cache的。而central cache和page cache在整个进程中只有一个,对于这种只能创建一个对象的类,我们可以将其设置为单例模式。
单例模式可以保证系统中该类只有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。单例模式又分为饿汉模式和懒汉模式,懒汉模式相对较复杂,我们这里使用饿汉模式就足够了。
- // 设计成单例模式,因为全局只有这一个CentralCache
- class CentralCache
- {
- public:
- static CentralCache* GetInstance()
- {
- return &_sInst;
- }
-
- // 获取一个非空的span
- Span* GetOneSpan(SpanList& list, size_t size);
-
- // 从中心缓存获取一定数量的对象给thread cache
- size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
-
- // 将一定数量的对象释放到span跨度
- void ReleaseListToSpans(void* start, size_t size);
- private:
- SpanList _spanLists[NFREELIST]; // 结构与thread cache一样
- private:
- CentralCache()
- {}
-
- CentralCache(const CentralCache&) = delete;
-
- static CentralCache _sInst;
- };
Span的结构
- // Span管理一个跨度的大块内存
- // 管理以页为单位的大块内存
- // 管理多个连续页大块内存跨度结构
- struct Span
- {
- PAGE_ID _pageId = 0; // 大块内存起始页的页号(在32位下,4g内存被分成2^19个页,假如一个页8k)
- size_t _n = 0; // 页的数量
-
- Span* _next = nullptr; // 双向链表的结构
- Span* _prev = nullptr;
-
- size_t _objSize = 0; // 切好的小对象的大小
- size_t _useCount = 0; // 切好小块内存,被分配个thread cache的计数
- void* _freeList = nullptr; // 切好的小块内存的自由链表
-
- bool _isUse = false; // 当前span是否在被使用
- };
介绍Span的页号(_pageId)
对于span管理的以页为单位的大块内存,我们需要知道这块内存具体在哪一个位置,便于之后page cache进行前后页的合并,因此span结构当中会记录所管理大块内存起始页的页号。
至于每一个span管理的到底是多少个页,这并不是固定的,需要根据多方面的因素来控制,因此span结构当中有一个_n成员,该成员就代表着该span管理的页的数量。
此外,每个span管理的大块内存,都会被切成相应大小的内存块挂到当前span的自由链表中,比如8Byte哈希桶中的span,会被切成一个个8Byte大小的内存块挂到当前span的自由链表中,因此span结构中需要存储切好的小块内存的自由链表。
页号的类型是什么?
每个程序运行起来后都有自己的进程地址空间,在32位平台下,进程地址空间的大小是2^32;而在64位平台下,进程地址空间的大小就是2^64。
页的大小一般是4K或者8K,我们以8K为例。在32位平台下,进程地址空间就可以被分成2^32 ÷ 2^13 = 2^19个页;在64位平台下,进程地址空间就可以被分成2^64 ÷ 2^13 = 2^51个页。页号本质与地址是一样的,它们都是一个编号,只不过地址是以一个字节为一个单位,而页是以多个字节为一个单位。
由于页号在64位平台下的取值范围是 [0, 2^51],因此我们不能简单的用一个无符号整型来存储页号,这时我们需要借助条件编译来解决这个问题。
- #ifdef _WIN64
- typedef unsigned long long PAGE_ID;
- #elif _WIN32
- typedef size_t PAGE_ID;
- #else
- //linux
- #endif
注意:在32位下,_WIN32有定义,_WIN64没有定义;而在64位下,_WIN32和_WIN64都有定义。因此在条件编译时,我们应该先判断_WIN64是否有定义,再判断_WIN32是否有定义。
介绍Span的_useCount
span结构当中的_useCount成员记录的就是,当前span中切好的小块内存,被分配给thread cache的计数,当某个span的_useCount计数变为0时,代表当前span切出去的内存块对象全部还回来了,此时central cache就可以将这个span再还给page cache。
双链表结构的Spanlist
每个桶当中的span是以双链表的形式组织起来的,当我们需要将某个span归还给page cache时,就可以很方便的将该span从双链表结构中移出。如果用单链表结构的话就比较麻烦了,因为单链表在删除时,需要知道当前结点的前一个结点。
- // 带头双向循环链表
- class SpanList
- {
- public:
- SpanList()
- {
- _head = new Span;
- _head->_next = _head;
- _head->_prev = _head;
- }
-
- Span* Begin()
- {
- return _head->_next;
- }
-
- Span* End()
- {
- return _head;
- }
-
- bool Empty()
- {
- return _head->_next == _head;
- }
-
- void PushFront(Span* span)
- {
- Insert(Begin(), span);
- }
-
- Span* PopFront()
- {
- Span* front = _head->_next;
- Erase(front);
- return front;
- }
-
- // 在pos前面插入newSpan
- void Insert(Span* pos, Span* newSpan)
- {
- assert(pos);
- assert(newSpan);
-
- Span* prev = pos->_prev;
-
- // prev <-> newSpan <-> pos
- prev->_next = newSpan;
- newSpan->_prev = prev;
- newSpan->_next = pos;
- pos->_prev = newSpan;
- }
-
- void Erase(Span* pos)
- {
- assert(pos);
- assert(pos != _head);
-
- Span* prev = pos->_prev;
- Span* next = pos->_next;
-
- prev->_next = next;
- next->_prev = prev;
- }
- private:
- Span* _head;
- public:
- std::mutex _mtx; // 桶锁
- };
注意:从双链表删除的span会还给下一层的page cache,相当于只是把这个span从双链表中移除,因此不需要对删除的span进行delete操作。
当thread cache向central cache申请内存时,central cache应该给出多少个对象呢?这是一个值得思考的问题,如果central cache给的太少,那么thread cache在短时间内用完了又会来申请;但如果一次性给的太多了,可能thread cache用不完也就浪费了。
鉴于此,我们这里采用了一个慢开始反馈调节算法。当thread cache向central cache申请内存时,如果申请的是较小的对象,那么可以多给一点,但如果申请的是较大的对象,就可以少给一点。
通过下面这个函数,我们就可以根据所需申请的对象的大小计算出具体给出的对象个数,并且可以将给出的对象个数控制到2~512个之间。也就是说,就算thread cache要申请的对象再小,我最多一次性给出512个对象;就算thread cache要申请的对象再大,我至少一次性给出2个对象。
- //管理对齐和映射等关系
- class SizeClass
- {
- public:
- // thread cache一次从中心缓存获取多少个size大小对象的上限数
- static size_t NumMoveSize(size_t size)
- {
- assert(size > 0);
-
- // [2, 512],一次批量移动多少个对象的(慢启动)上限值
- // 小对象一次批量上限高
- // 大对象一次批量上限低
-
- int num = MAX_BYTES / size;
- if (num < 2)
- num = 2;
-
- if (num > 512)
- num = 512;
- return num;
- }
- };
但就算申请的是小对象,一次性给出512个也是比较多的,基于这个原因,我们可以在FreeList结构中增加一个叫做_maxSize的成员变量,该变量的初始值设置为1,并且提供一个公有成员函数用于获取这个变量。也就是说,现在thread cache中的每个自由链表都会有一个自己的_maxSize。
- //管理切分好的小对象的自由链表
- class FreeList
- {
- public:
- size_t& MaxSize()
- {
- return _maxSize;
- }
-
- private:
- void* _freeList = nullptr; //自由链表
- size_t _maxSize = 1;
- };
此时当thread cache申请对象时,我们会比较_maxSize和计算得出的值,取出其中的较小值作为本次申请对象的个数。此外,如果本次采用的是_maxSize的值,那么还会将thread cache中该自由链表的_maxSize的值进行加一。
因此,thread cache第一次向central cache申请某大小的对象时,申请到的都是一个,但下一次thread cache再向central cache申请同样大小的对象时,因为该自由链表中的_maxSize增加了,最终就会申请到两个。直到该自由链表中_maxSize的值,增长到超过计算出的值后就不会继续增长了,此后申请到的对象个数就是计算出的个数。
- // 1.page cache是一个以页为单位的span自由链表
- // 2.为了保证全局只有唯一的page cache,这个类被设计成了单例模式
- class PageCache
- {
- public:
- static PageCache* GetInstance()
- {
- return &_sInst;
- }
-
- // 获取从对象到span的映射
- Span* MapObjectToSpan(void* obj);
-
- // 释放空闲span回到Pagecache,并合并相邻的span
- void ReleaseSpanToPageCache(Span* span);
-
- // 获取一个k页的span
- Span* NewSpan(size_t k);
-
- std::mutex _pageMtx; // 全局大锁(这里与central cache不一样)
- private:
- SpanList _spanLists[NPAGES];
- ObjectPool _spanPool; // 用自己写的方法申请定长span
-
- std::unordered_map
_idSpanMap; // 页号和span的映射 -
- PageCache()
- {}
- PageCache(const PageCache&) = delete;
-
- static PageCache _sInst;
- };
ConcurrentAlloc函数
在将thread cache、central cache以及page cache的申请流程写通了之后,我们就可以向外提供一个ConcurrentAlloc函数,用于申请内存块。每个线程第一次调用该函数时会通过TLS获取到自己专属的thread cache对象,然后每个线程就可以通过自己对应的thread cache申请对象了。
- static void* ConcurrentAlloc(size_t size)
- {
- //通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
- if (pTLSThreadCache == nullptr)
- {
- pTLSThreadCache = new ThreadCache;
- }
- return pTLSThreadCache->Allocate(size);
- }
注意:编译时会出现的问题,在C++的algorithm头文件中有一个min函数,这是一个函数模板,而在Windows.h头文件中也有一个min,这是一个宏。由于调用函数模板时需要进行参数类型的推演,因此当我们调用min函数时,编译器会优先匹配Windows.h当中以宏的形式实现的min,此时当我们以std::min的形式调用min函数时就会产生报错,这就是没有用命名空间进行封装的坏处,这时我们只能选择将std::去掉,让编译器调用Windows.h当中的min。
大于256KB的大块内存申请问题
之前说到,每个线程的thread cache是用于申请小于等于256KB的内存的,而对于大于256KB的内存,我们可以考虑直接向page cache申请,但page cache中最大的页也就只有128页,因此如果是大于128页的内存申请,就只能直接向堆申请了。
申请内存的大小 | 申请方式 |
x <= 256KB(32页) | 向thread cache申请 |
32页 < x <= 128页 | 向page cache申请 |
x > 128页 | 向堆申请 |
当申请的内存大于256KB时,虽然不是从thread cache进行获取,但在分配内存时也是需要进行向上对齐的,对于大于256KB的内存我们可以直接按页进行对齐。 |
当申请对象的大小大于256KB时,就不用向thread cache申请了,这时先计算出按页对齐后实际需要申请的页数,然后通过调用NewSpan申请指定页数的span即可。
- static void* ConcurrentAlloc(size_t size)
- {
- if (size > MAX_BYTES) //大于256KB的内存申请
- {
- //计算出对齐后需要申请的页数
- size_t alignSize = SizeClass::RoundUp(size);
- size_t kPage = alignSize >> PAGE_SHIFT;
-
- //向page cache申请kPage页的span
- PageCache::GetInstance()->_pageMtx.lock();
- Span* span = PageCache::GetInstance()->NewSpan(kPage);
- PageCache::GetInstance()->_pageMtx.unlock();
-
- void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
- return ptr;
- }
- else
- {
- //通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
- if (pTLSThreadCache == nullptr)
- {
- pTLSThreadCache = new ThreadCache;
- }
- //cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;
-
- return pTLSThreadCache->Allocate(size);
- }
- }
当某个线程申请的对象不用了,可以将其释放给thread cache,然后thread cache将该对象插入到对应哈希桶的自由链表当中即可。
但是随着线程不断的释放,对应自由链表的长度也会越来越长,这些内存堆积在一个thread cache中就是一种浪费,我们应该将这些内存还给central cache,这样一来,这些内存对其他线程来说也是可申请的,因此当thread cache某个桶当中的自由链表太长时我们可以进行一些处理。
如果thread cache某个桶当中自由链表的长度超过它一次批量向central cache申请的对象个数,那么此时我们就要把该自由链表当中的这些对象还给central cache。
- void ThreadCache::Deallocate(void* ptr, size_t size)
- {
- assert(ptr);
- assert(size <= MAX_BYTES);
-
- // 找对应映射的自由链表桶,将对象插入进去
- size_t index = SizeClass::Index(size);
- _freeLists[index].Push(ptr);
-
- // 当链表长度大于一次批量申请的内存时就开始还一段list给central cache
- if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
- {
- ListTooLong(_freeLists[index], size);
- }
- }
当自由链表的长度大于一次批量申请的对象时,我们具体的做法就是,从该自由链表中取出一次批量个数的对象,然后将取出的这些对象还给central cache中对应的span即可。
- // 释放对象时,链表过长时,回收内存回到中心缓存
- void ThreadCache::ListTooLong(FreeList& list, size_t size)
- {
- void* start = nullptr;
- void* end = nullptr;
-
- //从list中取出一次批量个数的对象
- list.PopRange(start, end, list.MaxSize());
-
- //将取出的对象还给central cache中对应的span
- CentralCache::GetInstance()->ReleaseListToSpans(start, size);
- }
从上述代码可以看出,FreeList类需要支持用Size函数获取自由链表中对象的个数,还需要支持用PopRange函数从自由链表中取出指定个数的对象。因此我们需要给FreeList类增加一个对应的PopRange函数,然后再增加一个_size成员变量,该成员变量用于记录当前自由链表中对象的个数,当我们向自由链表插入或删除对象时,都应该更新_size的值。
- // 管理切分好的小对象的自由链表
- class FreeList
- {
- private:
- void* _freeList = nullptr;
- size_t _maxSize = 1; // 当前自由链表节点的个数上限
- size_t _size = 0; // 记录自由链表当前节点的个数
- };
当thread cache中某个自由链表太长时,会将自由链表当中的这些对象还给central cache中的span。
但是需要注意的是,还给central cache的这些对象不一定都是属于同一个span的。central cache中的每个哈希桶当中可能都不止一个span,因此当我们计算出还回来的对象应该还给central cache的哪一个桶后,还需要知道这些对象到底应该还给这个桶当中的哪一个span。
如何根据对象的地址得到对象所在的页号?
首先我们必须理解的是,某个页当中的所有地址除以页的大小都等该页的页号。比如我们这里假设一页的大小是100,那么地址0~99都属于第0页,它们除以100都等于0,而地址100~199都属于第1页,它们除以100都等于1。
如何找到一个对象对应的span?
虽然我们现在可以通过对象的地址得到其所在的页号,但是我们还是不能知道这个对象到底属于哪一个span。因为一个span管理的可能是多个页。
为了解决这个问题,我们可以建立页号和span之间的映射。由于这个映射关系在page cache进行span的合并时也需要用到,因此我们直接将其存放到page cache里面。
- //单例模式
- class PageCache
- {
- public:
- //获取从对象到span的映射
- Span* MapObjectToSpan(void* obj);
- private:
- std::unordered_map
_idSpanMap; // 页号和span的映射 - };
每当page cache分配span给central cache时,都需要记录一下页号和span之间的映射关系。此后当thread cache还对象给central cache时,才知道应该具体还给哪一个span。
因此当central cache在调用NewSpan接口向page cache申请k页的span时,page cache在返回这个k页的span给central cache之前,应该建立这k个页号与该span之间的映射关系。
- // 获取一个k页的span
- Span* PageCache::NewSpan(size_t k)
- {
- assert(k > 0);
-
- // 大于128页的直接向堆申请
- if (k > NPAGES - 1)
- {
- void* ptr = SystemAlloc(k);
- Span* span = _spanPool.New(); //Span* span = new Span;
- span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
- span->_n = k;
-
- _idSpanMap[span->_pageId] = span;
-
- return span;
- }
-
- // 先检查第k个桶里面有没有span
- if (!_spanLists[k].Empty())
- {
- Span* kSpan = _spanLists[k].PopFront();
-
- // 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
- for (PAGE_ID i = 0; i < kSpan->_n; ++i)
- {
- _idSpanMap[kSpan->_pageId + i] = kSpan;
- }
-
- return kSpan;
- }
-
- // 检查一下后面的桶里面有没有span,如果有,可以把它进行切分(将n页的span切分成1个k页的span和1个n-k页的span)
- for (size_t i = k + 1; i < NPAGES; ++i)
- {
- if (!_spanLists[i].Empty())
- {
- Span* nSpan = _spanLists[i].PopFront();
- Span* kSpan = _spanPool.New(); // Span* kSpan = new Span;
-
- // 在nSpan的头部切一个k页下来
- // k页的span返回
- // nSpan再挂到对应新的映射位置
- kSpan->_pageId = nSpan->_pageId;
- kSpan->_n = k;
-
- nSpan->_pageId += k;
- nSpan->_n -= k;
-
- _spanLists[nSpan->_n].PushFront(nSpan); // 最后将剩下的n-k页的span挂到它对应的位置
- // 存储nSpan的首尾页号跟nSpan映射,方便page cache回收内存时进行的合并查找
- //(这里不用存储所有的页号,因为nspan不用切分成小块的thread cache)
- _idSpanMap[nSpan->_pageId] = nSpan;
- _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
-
- // 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
- for (PAGE_ID i = 0; i < kSpan->_n; ++i)
- {
- _idSpanMap[kSpan->_pageId + i] = kSpan;
- }
-
- return kSpan;
- }
- }
-
- // 走到这个位置就说明后面没有大页的span了
- // 这时候就取找堆要一个128页的span
- Span* bigSpan = _spanPool.New(); // Span* bigSpan = new Span;
- void* ptr = SystemAlloc(NPAGES - 1);
- bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
- bigSpan->_n = NPAGES - 1;
-
- _spanLists[bigSpan->_n].PushFront(bigSpan);
-
- return NewSpan(k); // 申请完span后再重新调用一次这个函数进行切分,而且还避免了代码重复
- }
此时我们就可以通过对象的地址找到该对象对应的span了,直接将该对象的地址除以页的大小得到页号,然后在unordered_map当中找到其对应的span即可。
- //获取从对象到span的映射
- Span* PageCache::MapObjectToSpan(void* obj)
- {
- PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; // 先用对象地址算出该对象是第几页
- auto ret = _idSpanMap.find(id); // 再用这页的页号找到对应是哪一个span
- if (ret != _idSpanMap.end())
- {
- return ret->second;
- }
- else
- {
- assert(false);
- return nullptr;
- }
- }
注意:当我们要通过某个页号查找其对应的span时,该页号与其span之间的映射一定是建立过的,如果此时我们没有在unordered_map当中找到,则说明我们之前的代码逻辑有问题,因此当没有找到对应的span时可以直接用断言结束程序,以表明程序逻辑出错。
central cache回收内存
这时当thread cache还对象给central cache时,就可以依次遍历这些对象,将这些对象插入到其对应span的自由链表当中,并且及时更新该span的_usseCount计数即可。
在thread cache还对象给central cache的过程中,如果central cache中某个span的_useCount减到0时,说明这个span分配出去的对象全部都还回来了,那么此时就可以将这个span再进一步还给page cache。
- // 将一定数量的对象释放到span跨度
- void CentralCache::ReleaseListToSpans(void* start, size_t size)
- {
- size_t index = SizeClass::Index(size);
- _spanLists[index]._mtx.lock();
-
- while (start)
- {
- void* next = NextObj(start);
-
- Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
- NextObj(start) = span->_freeList;
- span->_freeList = start;
- span->_useCount--;
-
- if (span->_useCount == 0)
- {
- // 说明span切分出去的所有小内存都回来了
- // 这个span就可以再回收给page cache,page cache可以再尝试去做前后页的合并
- _spanLists[index].Erase(span);
- span->_freeList = nullptr;
- span->_next = nullptr;
- span->_prev = nullptr;
-
- // 释放span给page cache时,使用page cache的锁就可以了
- // 这时把桶锁解掉,因为这个span已经被erase了,其他线程就申请不到这个span了。这样就不会影响其他span的申请与释放
- _spanLists[index]._mtx.unlock();
-
- PageCache::GetInstance()->_pageMtx.lock();
- PageCache::GetInstance()->ReleaseSpanToPageCache(span);
- PageCache::GetInstance()->_pageMtx.unlock();
-
- _spanLists[index]._mtx.lock();
- }
-
- start = next;
- }
-
- _spanLists[index]._mtx.unlock();
- }
注意:如果要把某个span还给page cache,我们需要先将这个span从central cache对应的双链表中移除,然后再将该span的自由链表置空,因为page cache中的span是不需要切分成一个个的小对象的,以及该span的前后指针也都应该置空,因为之后要将其插入到page cache对应的双链表中。但span当中记录的起始页号以及它管理的页数是不能清除的,否则对应内存块就找不到了。
并且在central cache还span给page cache时也存在锁的问题,此时需要先将central cache中对应的桶锁解掉,然后再加上page cache的大锁之后才能进入page cache进行相关操作,当处理完毕回到central cache时,除了将page cache的大锁解掉,还需要立刻获得central cache对应的桶锁,然后将还未还完对象继续还给central cache中对应的span。
如果central cache中有某个span的_useCount减到0了,那么central cache就需要将这个span还给page cache了。
这个过程看似是非常简单的,page cache只需将还回来的span挂到对应的哈希桶上就行了。但实际为了缓解内存碎片的问题,page cache还需要尝试将还回来的span与其他空闲的span进行合并。
page cache进行前后页的合并
合并的过程可以分为向前合并和向后合并。如果还回来的span的起始页号是num,该span所管理的页数是n。那么在向前合并时,就需要判断第num-1页对应span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向前尝试进行合并,直到不能进行合并为止。而在向后合并时,就需要判断第num+n页对应的span是否空闲,如果空闲则可以将其进行合并,并且合并后还需要继续向后尝试进行合并,直到不能进行合并为止。
因此page cache在合并span时,是需要通过页号获取到对应的span的,这就是我们要把页号与span之间的映射关系存储到page cache的原因。
但需要注意的是,当我们通过页号找到其对应的span时,这个span此时可能挂在page cache,也可能挂在central cache。而在合并时我们只能合并挂在page cache的span,因为挂在central cache的span当中的对象正在被其他线程使用。
可是我们不能通过span结构当中的_useCount成员,来判断某个span到底是在central cache还是在page cache。因为当central cache刚向page cache申请到一个span时,这个span的_useCount就是等于0的,这时可能当我们正在对该span进行切分的时候,page cache就把这个span拿去进行合并了,这显然是不合理的。
鉴于此,我们可以在span结构中再增加一个_isUse成员,用于标记这个span是否正在被使用,而当一个span结构被创建时我们默认该span是没有被使用的。
- //管理以页为单位的大块内存
- struct Span
- {
- PAGE_ID _pageId = 0; //大块内存起始页的页号
- size_t _n = 0; //页的数量
-
- Span* _next = nullptr; //双链表结构
- Span* _prev = nullptr;
-
- size_t _useCount = 0; //切好的小块内存,被分配给thread cache的计数
- void* _freeList = nullptr; //切好的小块内存的自由链表
-
- bool _isUse = false; //是否在被使用
- };
因此当central cache向page cache申请到一个span时,需要立即将该span的_isUse改为true。
span->_isUse = true;
而当central cache将某个span还给page cache时,也就需要将该span的_isUse改成false。
span->_isUse = false;
由于在合并page cache当中的span时,需要通过页号找到其对应的span,而一个span是在被分配给central cache时,才建立的各个页号与span之间的映射关系,因此page cache当中的span也需要建立页号与span之间的映射关系。
与central cache中的span不同的是,在page cache中,只需建立一个span的首尾页号与该span之间的映射关系。因为当一个span在尝试进行合并时,如果是往前合并,那么只需要通过一个span的尾页找到这个span,如果是向后合并,那么只需要通过一个span的首页找到这个span。也就是说,在进行合并时我们只需要用到span与其首尾页之间的映射关系就够了。
因此当我们申请k页的span时,如果是将n页的span切成了一个k页的span和一个n-k页的span,我们除了需要建立k页span中每个页与该span之间的映射关系之外,还需要建立剩下的n-k页的span与其首尾页之间的映射关系。
- // 获取一个k页的span
- Span* PageCache::NewSpan(size_t k)
- {
- assert(k > 0);
-
- // 大于128页的直接向堆申请
- if (k > NPAGES - 1)
- {
- void* ptr = SystemAlloc(k);
- Span* span = _spanPool.New(); //Span* span = new Span;
- span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
- span->_n = k;
-
- _idSpanMap[span->_pageId] = span;
-
- return span;
- }
-
- // 先检查第k个桶里面有没有span
- if (!_spanLists[k].Empty())
- {
- Span* kSpan = _spanLists[k].PopFront();
-
- // 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
- for (PAGE_ID i = 0; i < kSpan->_n; ++i)
- {
- _idSpanMap[kSpan->_pageId + i] = kSpan;
- }
-
- return kSpan;
- }
-
- // 检查一下后面的桶里面有没有span,如果有,可以把它进行切分(将n页的span切分成1个k页的span和1个n-k页的span)
- for (size_t i = k + 1; i < NPAGES; ++i)
- {
- if (!_spanLists[i].Empty())
- {
- Span* nSpan = _spanLists[i].PopFront();
- Span* kSpan = _spanPool.New(); // Span* kSpan = new Span;
-
- // 在nSpan的头部切一个k页下来
- // k页的span返回
- // nSpan再挂到对应新的映射位置
- kSpan->_pageId = nSpan->_pageId;
- kSpan->_n = k;
-
- nSpan->_pageId += k;
- nSpan->_n -= k;
-
- _spanLists[nSpan->_n].PushFront(nSpan); // 最后将剩下的n-k页的span挂到它对应的位置
- // 存储nSpan的首尾页号跟nSpan映射,方便page cache回收内存时进行的合并查找
- //(这里不用存储所有的页号,因为nspan不用切分成小块的thread cache)
- _idSpanMap[nSpan->_pageId] = nSpan;
- _idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
-
- // 建立id和span的映射,方便central cache回收小块内存时,查找对应的span
- for (PAGE_ID i = 0; i < kSpan->_n; ++i)
- {
- _idSpanMap[kSpan->_pageId + i] = kSpan;
- }
-
- return kSpan;
- }
- }
-
- // 走到这个位置就说明后面没有大页的span了
- // 这时候就取找堆要一个128页的span
- Span* bigSpan = _spanPool.New(); // Span* bigSpan = new Span;
- void* ptr = SystemAlloc(NPAGES - 1);
- bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
- bigSpan->_n = NPAGES - 1;
-
- _spanLists[bigSpan->_n].PushFront(bigSpan);
-
- return NewSpan(k); // 申请完span后再重新调用一次这个函数进行切分,而且还避免了代码重复
- }
此时page cache当中的span就都与其首尾页之间建立了映射关系,现在我们就可以进行span的合并了,其合并逻辑如下:
- //释放空闲的span回到PageCache,并合并相邻的span
- void PageCache::ReleaseSpanToPageCache(Span* span)
- {
- //对span的前后页,尝试进行合并,缓解内存碎片问题
- //1、向前合并
- while (1)
- {
- PAGE_ID prevId = span->_pageId - 1;
- auto ret = _idSpanMap.find(prevId);
- //前面的页号没有(还未向系统申请),停止向前合并
- if (ret == _idSpanMap.end())
- {
- break;
- }
- //前面的页号对应的span正在被使用,停止向前合并
- Span* prevSpan = ret->second;
- if (prevSpan->_isUse == true)
- {
- break;
- }
- //合并出超过128页的span无法进行管理,停止向前合并
- if (prevSpan->_n + span->_n > NPAGES - 1)
- {
- break;
- }
- //进行向前合并
- span->_pageId = prevSpan->_pageId;
- span->_n += prevSpan->_n;
-
- //将prevSpan从对应的双链表中移除
- _spanLists[prevSpan->_n].Erase(prevSpan);
-
- delete prevSpan;
- }
- //2、向后合并
- while (1)
- {
- PAGE_ID nextId = span->_pageId + span->_n;
- auto ret = _idSpanMap.find(nextId);
- //后面的页号没有(还未向系统申请),停止向后合并
- if (ret == _idSpanMap.end())
- {
- break;
- }
- //后面的页号对应的span正在被使用,停止向后合并
- Span* nextSpan = ret->second;
- if (nextSpan->_isUse == true)
- {
- break;
- }
- //合并出超过128页的span无法进行管理,停止向后合并
- if (nextSpan->_n + span->_n > NPAGES - 1)
- {
- break;
- }
- //进行向后合并
- span->_n += nextSpan->_n;
-
- //将nextSpan从对应的双链表中移除
- _spanLists[nextSpan->_n].Erase(nextSpan);
-
- delete nextSpan;
- }
- //将合并后的span挂到对应的双链表当中
- _spanLists[span->_n].PushFront(span);
- //建立该span与其首尾页的映射
- _idSpanMap[span->_pageId] = span;
- _idSpanMap[span->_pageId + span->_n - 1] = span;
- //将该span设置为未被使用的状态
- span->_isUse = false;
- }
需要注意的是,在向前或向后进行合并的过程中:
在合并span时,由于这个span是在page cache的某个哈希桶的双链表当中的,因此在合并后需要将其从对应的双链表中移除,然后再将这个被合并了的span结构进行delete。
除此之外,在合并结束后,除了将合并后的span挂到page cache对应哈希桶的双链表当中,还需要建立该span与其首位页之间的映射关系,便于此后合并出更大的span。
ConcurrentFree函数
至此我们将thread cache、central cache以及page cache的释放流程也都写完了,此时我们就可以向外提供一个ConcurrentFree函数,用于释放内存块,释放内存块时每个线程通过自己的thread cache对象,调用thread cache中释放内存对象的接口即可。
当释放对象时,我们需要判断释放对象的大小:
申请内存的大小 | 申请方式 |
x <= 256KB(32页) | 释放给thread cache |
32页 < x <= 128页 | 释放给page cache |
x > 128页 | 释放给堆 |
因此当释放对象时,我们需要先找到该对象对应的span,但是在释放对象时我们只知道该对象的起始地址。这也就是我们在申请大于256KB的内存时,也要给申请到的内存建立span结构,并建立起始页号与该span之间的映射关系的原因。此时我们就可以通过释放对象的起始地址计算出起始页号,进而通过页号找到该对象对应的span。
- // 内存释放
- static void ConcurrentFree(void* ptr)
- {
- Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
- size_t size = span->_objSize;
-
- if (size > MAX_BYTES)
- {
- PageCache::GetInstance()->_pageMtx.lock();
- PageCache::GetInstance()->ReleaseSpanToPageCache(span);
- PageCache::GetInstance()->_pageMtx.unlock();
- }
- else
- {
- assert(pTLSThreadCache);
-
- pTLSThreadCache->Deallocate(ptr, size);
- }
- }
因此page cache在回收span时也需要进行判断,如果该span的大小是小于等于128页的,那么直接还给page cache进行了,page cache会尝试对其进行合并。而如果该span的大小是大于128页的,那么说明该span是直接向堆申请的,我们直接将这块内存释放给堆,然后将这个span结构进行delete就行了。
- //释放空闲的span回到PageCache,并合并相邻的span
- void PageCache::ReleaseSpanToPageCache(Span* span)
- {
- if (span->_n > NPAGES - 1) //大于128页直接释放给堆
- {
- void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
- SystemFree(ptr);
- delete span;
- return;
- }
- //对span的前后页,尝试进行合并,缓解内存碎片问题
- //1、向前合并
- while (1)
- {
- PAGE_ID prevId = span->_pageId - 1;
- auto ret = _idSpanMap.find(prevId);
- //前面的页号没有(还未向系统申请),停止向前合并
- if (ret == _idSpanMap.end())
- {
- break;
- }
- //前面的页号对应的span正在被使用,停止向前合并
- Span* prevSpan = ret->second;
- if (prevSpan->_isUse == true)
- {
- break;
- }
- //合并出超过128页的span无法进行管理,停止向前合并
- if (prevSpan->_n + span->_n > NPAGES - 1)
- {
- break;
- }
- //进行向前合并
- span->_pageId = prevSpan->_pageId;
- span->_n += prevSpan->_n;
-
- //将prevSpan从对应的双链表中移除
- _spanLists[prevSpan->_n].Erase(prevSpan);
-
- delete prevSpan;
- }
- //2、向后合并
- while (1)
- {
- PAGE_ID nextId = span->_pageId + span->_n;
- auto ret = _idSpanMap.find(nextId);
- //后面的页号没有(还未向系统申请),停止向后合并
- if (ret == _idSpanMap.end())
- {
- break;
- }
- //后面的页号对应的span正在被使用,停止向后合并
- Span* nextSpan = ret->second;
- if (nextSpan->_isUse == true)
- {
- break;
- }
- //合并出超过128页的span无法进行管理,停止向后合并
- if (nextSpan->_n + span->_n > NPAGES - 1)
- {
- break;
- }
- //进行向后合并
- span->_n += nextSpan->_n;
-
- //将nextSpan从对应的双链表中移除
- _spanLists[nextSpan->_n].Erase(nextSpan);
-
- delete nextSpan;
- }
- //将合并后的span挂到对应的双链表当中
- _spanLists[span->_n].PushFront(span);
- //建立该span与其首尾页的映射
- _idSpanMap[span->_pageId] = span;
- _idSpanMap[span->_pageId + span->_n - 1] = span;
- //将该span设置为未被使用的状态
- span->_isUse = false;
- }
tcmalloc是要在高并发场景下替代malloc进行内存申请的,因此tcmalloc在实现的时,其内部是不能调用malloc函数的,我们当前的代码中存在通过new获取到的内存,而new在底层实际上就是封装了malloc。
为了完全脱离掉malloc函数,此时我们之前实现的定长内存池就起作用了,代码中使用new时基本都是为Span结构的对象申请空间,而span对象基本都是在page cache层创建的,因此我们可以在PageCache类当中定义一个_spanPool,用于span对象的申请和释放。
- //单例模式
- class PageCache
- {
- public:
- //...
- private:
- ObjectPool _spanPool;
- };
然后将代码中使用new的地方替换为调用定长内存池当中的New函数,将代码中使用delete的地方替换为调用定长内存池当中的Delete函数。
- //申请span对象
- Span* span = _spanPool.New();
- //释放span对象
- _spanPool.Delete(span);
注意:当使用定长内存池当中的New函数申请Span对象时,New函数通过定位new也是对Span对象进行了初始化的。
此外,每个线程第一次申请内存时都会创建其专属的thread cache,而这个thread cache目前也是new出来的,我们也需要对其进行替换。
- //通过TLS,每个线程无锁的获取自己专属的ThreadCache对象
- if (pTLSThreadCache == nullptr)
- {
- static std::mutex tcMtx;
- static ObjectPool
tcPool; - tcMtx.lock();
- pTLSThreadCache = tcPool.New();
- tcMtx.unlock();
- }
这里我们将用于申请ThreadCache类对象的定长内存池定义为静态的,保持全局只有一个,让所有线程创建自己的thread cache时,都在个定长内存池中申请内存就行了。
但注意在从该定长内存池中申请内存时需要加锁,防止多个线程同时申请自己的ThreadCache对象而导致线程安全问题。
最后在SpanList的构造函数中也用到了new,因为SpanList是带头循环双向链表,所以在构造期间我们需要申请一个span对象作为双链表的头结点。
- //带头双向循环链表
- class SpanList
- {
- public:
- SpanList()
- {
- _head = _spanPool.New();
- _head->_next = _head;
- _head->_prev = _head;
- }
- private:
- Span* _head;
- static ObjectPool _spanPool;
- };
由于每个span双链表只需要一个头结点,因此将这个定长内存池定义为静态的,保持全局只有一个,让所有span双链表在申请头结点时,都在一个定长内存池中申请内存就行了。
- // 用自己写的ConcurrentAlloc与malloc进行性能对比
- // 参数:
- // ntimes:一轮申请和释放内存的次数
- // nworks:创建的线程数
- // rounds:轮次
- void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
- {
- std::vector
vthread(nworks) ; - std::atomic<size_t> malloc_costtime = 0;
- std::atomic<size_t> free_costtime = 0;
- for (size_t k = 0; k < nworks; ++k)
- {
- vthread[k] = std::thread([&, k]() {
- std::vector<void*> v;
- v.reserve(ntimes);
- for (size_t j = 0; j < rounds; ++j)
- {
- size_t begin1 = clock();
- for (size_t i = 0; i < ntimes; i++)
- {
- //v.push_back(malloc(16));
- v.push_back(malloc((16 + i) % 8192 + 1));
- }
- size_t end1 = clock();
- size_t begin2 = clock();
- for (size_t i = 0; i < ntimes; i++)
- {
- free(v[i]);
- }
- size_t end2 = clock();
- v.clear();
- malloc_costtime += (end1 - begin1);
- free_costtime += (end2 - begin2);
- }
- });
- }
- for (auto& t : vthread)
- {
- t.join();
- }
- printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
- nworks, rounds, ntimes, (const size_t)malloc_costtime);
- printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
- nworks, rounds, ntimes, (const size_t)free_costtime);
- printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
- nworks, nworks * rounds * ntimes, (const size_t)malloc_costtime + (const size_t)free_costtime);
- }
-
- // 单轮次申请释放次数 线程数 轮次
- void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
- {
- std::vector
vthread(nworks) ; - std::atomic<size_t> malloc_costtime = 0;
- std::atomic<size_t> free_costtime = 0;
- for (size_t k = 0; k < nworks; ++k)
- {
- vthread[k] = std::thread([&]() {
- std::vector<void*> v;
- v.reserve(ntimes);
- for (size_t j = 0; j < rounds; ++j)
- {
- size_t begin1 = clock();
- for (size_t i = 0; i < ntimes; i++)
- {
- //v.push_back(ConcurrentAlloc(16));
- v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
- }
- size_t end1 = clock();
- size_t begin2 = clock();
- for (size_t i = 0; i < ntimes; i++)
- {
- ConcurrentFree(v[i]);
- }
- size_t end2 = clock();
- v.clear();
- malloc_costtime += (end1 - begin1);
- free_costtime += (end2 - begin2);
- }
- });
- }
- for (auto& t : vthread)
- {
- t.join();
- }
- printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
- nworks, rounds, ntimes, (const size_t)malloc_costtime);
- printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
- nworks, rounds, ntimes, (const size_t)free_costtime);
- printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
- nworks, nworks * rounds * ntimes, (const size_t)malloc_costtime + (const size_t)free_costtime);
- }
- int main()
- {
- size_t n = 10000;
- cout << "==========================================================" <<
- endl;
- BenchmarkConcurrentMalloc(n, 4, 10);
- cout << endl << endl;
- BenchmarkMalloc(n, 4, 10);
- cout << "==========================================================" <<
- endl;
- return 0;
- }
其中测试函数各个参数的含义如下:
在测试函数中,我们通过clock函数分别获取到每轮次申请和释放所花费的时间,然后将其对应累加到malloc_costtime和free_costtime上。最后我们就得到了,nworks个线程跑rounds轮,每轮申请和释放ntimes次,这个过程申请所消耗的时间、释放所消耗的时间、申请和释放总共消耗的时间。
注意,我们创建线程时让线程执行的是lambda表达式,而我们这里在使用lambda表达式时,以值传递的方式捕捉了变量k,以引用传递的方式捕捉了其他父作用域中的变量,因此我们可以将各个线程消耗的时间累加到一起。
我们将所有线程申请内存消耗的时间都累加到malloc_costtime上, 将释放内存消耗的时间都累加到free_costtime上,此时malloc_costtime和free_costtime可能被多个线程同时进行累加操作的,所以存在线程安全的问题。鉴于此,我们在定义这两个变量时使用了atomic类模板,这时对它们的操作就是原子操作了。
固定大小内存的申请和释放
我们先来测试一下固定大小内存的申请和释放:
- v.push_back(malloc(16));
- v.push_back(ConcurrentAlloc(16));
温馨提示:这里测试时可以在Release下进行测试,会比在Debug下运行速度快很多。
此时4个线程执行10轮操作,每轮申请释放10000次,总共申请释放了40万次,运行后可以看到,malloc的效率还是更高的。
由于此时我们申请释放的都是固定大小的对象,每个线程申请释放时访问的都是各自thread cache的同一个桶,当thread cache的这个桶中没有对象或对象太多要归还时,也都会访问central cache的同一个桶。此时central cache中的桶锁就不起作用了,因为我们让central cache使用桶锁的目的就是为了,让多个thread cache可以同时访问central cache的不同桶,而此时每个thread cache访问的却都是central cache中的同一个桶。
不同大小内存的申请和释放
下面我们再来测试一下不同大小内存的申请和释放:
- v.push_back(malloc((16 + i) % 8192 + 1));
- v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
运行后可以看到,由于申请和释放内存的大小是不同的,此时central cache当中的桶锁就起作用了,ConcurrentAlloc的效率也有了较大增长,但相比malloc来说还是差一点点。
经过前面的测试可以看到,我们的代码此时与malloc之间还是有差距的,此时我们就应该分析分析我们当前项目的瓶颈在哪里,但这不能简单的凭感觉,我们应该用性能分析的工具来进行分析。
VS编译器下性能分析的操作步骤
VS编译器中就带有性能分析的工具的,我们可以依次点击“调试→性能和诊断”进行性能分析,注意该操作要在Debug模式下进行。
同时我们将代码中n的值由10000调成了1000,否则该分析过程可能会花费较多时间,并且将malloc的测试代码进行了屏蔽,因为我们要分析的是我们实现的高并发内存池。
- int main()
- {
- size_t n = 1000;
- cout << "==========================================================" <<
- endl;
- BenchmarkConcurrentMalloc(n, 4, 10);
- cout << endl << endl;
- //BenchmarkMalloc(n, 4, 10);
- cout << "==========================================================" <<
- endl;
- return 0;
- }
在点击了“调试→性能和诊断”→“检测”,因为我们要分析的是各个函数的用时时间,然后我们直接点击“开始”就可以进行了。
然后我们就只用等待分析结果就可以了。
分析性能瓶颈
通过分析结果可以看到,光是Deallocate和MapObjectToSpan这两个函数就占用了一半多的时间。
而在Deallocate函数中,调用ListTooLong函数时消耗的时间是最多的。
继续往下看,在ListTooLong函数中,调用ReleaseListToSpans函数时消耗的时间是最多的。
再进一步看,在ReleaseListToSpans函数中,调用MapObjectToSpan函数时消耗的时间是最多的。
也就是说,最终消耗时间最多的实际就是MapObjectToSpan函数,我们这时再来看看为什么调用MapObjectToSpan函数会消耗这么多时间。通过观察我们最终发现,调用该函数时会消耗这么多时间就是因为锁的原因。
因此当前项目的瓶颈点就在锁竞争上面,需要解决调用MapObjectToSpan函数访问映射关系时的加锁问题。tcmalloc当中针对这一点使用了基数树进行优化,使得在读取这个映射关系时可以做到不加锁。
为了解决PageCache访问页号与Span*的映射加锁耗费太多的时间问题,引出了基数树这种映射方法。
直接定址法的基数树:
一层基数树映射保存页号和Span*
- // Single-level array
- template <int BITS> // 这里的BITS表示存储页号需要多少位,在32位下为32-PAGE_SHIFT,64位下为64-PAGE_SHIFT
- class TCMalloc_PageMap1 {
- private:
- static const int LENGTH = 1 << BITS;
- void** array_; // 一个数组,用于存储span*,数组的下标就是页号
- };
假设一页8K(2 ^ 13byte),在32位平台下,2 ^ 32 / 2 ^ 13= 2 ^ 19,所以需要19位可以表示所有的页,BITS=19
如图数组大小仅为2MB所以对于32位可以直接将映射数组开辟出来。
注意:如果是64位机器,不能之一这种方式,因为要开辟的数组太大。这个时候需要高层映射(基数树)
二层基数树
假设在32位下,拿到一个页号,因为页号最大到19位,所以前面13位都是0可以不用管。
给一个页号,先分离出后19为[A],通过[A]前5位来找在第一层位置,再通过后14位来找第二层数组的位置。
- // Two-level radix tree
- template <int BITS>
- class TCMalloc_PageMap2 {
- private:
- // Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
- static const int ROOT_BITS = 5;
- static const int ROOT_LENGTH = 1 << ROOT_BITS; //第一层数组的大小
-
- static const int LEAF_BITS = BITS - ROOT_BITS;
- static const int LEAF_LENGTH = 1 << LEAF_BITS; //第二层数组的大小
-
- // Leaf node
- struct Leaf {
- void* values[LEAF_LENGTH];
- };
-
- Leaf* root_[ROOT_LENGTH]; //第一层数组的每个数组元素是一个数组
- };
三层基数树
64位采用为了减少开辟数组的大小,选用三层的基数树。需要访问那个位置,再开辟数组,可以避免开始就开辟特别大的数组。
同理三层模型与二层基数树类型,这里不在赘述。
- // Three-level radix tree
- template <int BITS>
- class TCMalloc_PageMap3 {
- private:
- // How many bits should we consume at each interior level
- static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
- static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
-
- // How many bits should we consume at leaf level
- static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
- static const int LEAF_LENGTH = 1 << LEAF_BITS;
-
- // Interior node
- struct Node {
- Node* ptrs[INTERIOR_LENGTH];
- };
-
- // Leaf node
- struct Leaf {
- void* values[LEAF_LENGTH];
- };
-
- Node* root_; // Root of radix tree
- };
一二三层基数树C++实现
- // Single-level array
- template <int BITS> // 这里的BITS表示存储页号需要多少位,在32位下为32-PAGE_SHIFT,64位下为64-PAGE_SHIFT
- class TCMalloc_PageMap1 {
- private:
- static const int LENGTH = 1 << BITS;
- void** array_; // 一个数组,用于存储span*,数组的下标就是页号
-
- public:
- typedef uintptr_t Number;
-
- //explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
- explicit TCMalloc_PageMap1() {
- //array_ = reinterpret_cast
((*allocator)(sizeof(void*) << BITS)); - size_t size = sizeof(void*) << BITS;
- size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
- array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
- memset(array_, 0, sizeof(void*) << BITS);
- }
-
- // Return the current value for KEY. Returns NULL if not yet set,
- // or if k is out of range.
- void* get(Number k) const {
- if ((k >> BITS) > 0) {
- return NULL;
- }
- return array_[k];
- }
-
- // REQUIRES "k" is in range "[0,2^BITS-1]".
- // REQUIRES "k" has been ensured before.
- //
- // Sets the value 'v' for key 'k'.
- void set(Number k, void* v) {
- array_[k] = v;
- }
- };
-
- // Two-level radix tree
- template <int BITS>
- class TCMalloc_PageMap2 {
- private:
- // Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
- static const int ROOT_BITS = 5;
- static const int ROOT_LENGTH = 1 << ROOT_BITS;
-
- static const int LEAF_BITS = BITS - ROOT_BITS;
- static const int LEAF_LENGTH = 1 << LEAF_BITS;
-
- // Leaf node
- struct Leaf {
- void* values[LEAF_LENGTH];
- };
-
- Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
- void* (*allocator_)(size_t); // Memory allocator
-
- public:
- typedef uintptr_t Number;
-
- //explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
- explicit TCMalloc_PageMap2() {
- //allocator_ = allocator;
- memset(root_, 0, sizeof(root_));
-
- PreallocateMoreMemory();
- }
-
- void* get(Number k) const {
- const Number i1 = k >> LEAF_BITS;
- const Number i2 = k & (LEAF_LENGTH - 1);
- if ((k >> BITS) > 0 || root_[i1] == NULL) {
- return NULL;
- }
- return root_[i1]->values[i2];
- }
-
- void set(Number k, void* v) {
- const Number i1 = k >> LEAF_BITS;
- const Number i2 = k & (LEAF_LENGTH - 1);
- ASSERT(i1 < ROOT_LENGTH);
- root_[i1]->values[i2] = v;
- }
-
- bool Ensure(Number start, size_t n) {
- for (Number key = start; key <= start + n - 1;) {
- const Number i1 = key >> LEAF_BITS;
-
- // Check for overflow
- if (i1 >= ROOT_LENGTH)
- return false;
-
- // Make 2nd level node if necessary
- if (root_[i1] == NULL) {
- //Leaf* leaf = reinterpret_cast
((*allocator_)(sizeof(Leaf))); - //if (leaf == NULL) return false;
- static ObjectPool
leafPool; - Leaf* leaf = (Leaf*)leafPool.New();
-
- memset(leaf, 0, sizeof(*leaf));
- root_[i1] = leaf;
- }
-
- // Advance key past whatever is covered by this leaf node
- key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
- }
- return true;
- }
-
- void PreallocateMoreMemory() {
- // Allocate enough to keep track of all possible pages
- Ensure(0, 1 << BITS);
- }
- };
-
- // Three-level radix tree
- template <int BITS>
- class TCMalloc_PageMap3 {
- private:
- // How many bits should we consume at each interior level
- static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
- static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
-
- // How many bits should we consume at leaf level
- static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
- static const int LEAF_LENGTH = 1 << LEAF_BITS;
-
- // Interior node
- struct Node {
- Node* ptrs[INTERIOR_LENGTH];
- };
-
- // Leaf node
- struct Leaf {
- void* values[LEAF_LENGTH];
- };
-
- Node* root_; // Root of radix tree
- void* (*allocator_)(size_t); // Memory allocator
-
- Node* NewNode() {
- Node* result = reinterpret_cast
((*allocator_)(sizeof(Node))); - if (result != NULL) {
- memset(result, 0, sizeof(*result));
- }
- return result;
- }
-
- public:
- typedef uintptr_t Number;
-
- explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
- allocator_ = allocator;
- root_ = NewNode();
- }
-
- void* get(Number k) const {
- const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
- const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
- const Number i3 = k & (LEAF_LENGTH - 1);
- if ((k >> BITS) > 0 ||
- root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
- return NULL;
- }
- return reinterpret_cast
(root_->ptrs[i1]->ptrs[i2])->values[i3]; - }
-
- void set(Number k, void* v) {
- ASSERT(k >> BITS == 0);
- const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
- const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
- const Number i3 = k & (LEAF_LENGTH - 1);
- reinterpret_cast
(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; - }
-
- bool Ensure(Number start, size_t n) {
- for (Number key = start; key <= start + n - 1;) {
- const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
- const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
-
- // Check for overflow
- if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
- return false;
-
- // Make 2nd level node if necessary
- if (root_->ptrs[i1] == NULL) {
- Node* n = NewNode();
- if (n == NULL) return false;
- root_->ptrs[i1] = n;
- }
-
- // Make leaf node if necessary
- if (root_->ptrs[i1]->ptrs[i2] == NULL) {
- Leaf* leaf = reinterpret_cast
((*allocator_)(sizeof(Leaf))); - if (leaf == NULL) return false;
- memset(leaf, 0, sizeof(*leaf));
- root_->ptrs[i1]->ptrs[i2] = reinterpret_cast
(leaf); - }
-
- // Advance key past whatever is covered by this leaf node
- key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
- }
- return true;
- }
-
- void PreallocateMoreMemory() {
- }
- };
注意:这里要使用一层基数树只能在32位下运行,64位需要三层基数树。
VS2022默认64位,如果你是VS2022下测试注意改为32位:
现在我们用基数树对代码进行优化,此时将PageCache类当中的unorder_map用基数树进行替换即可,由于当前是32位平台,因此这里随便用几层基数树都可以。
- //单例模式
- class PageCache
- {
- public:
- //...
- private:
- //std::unordered_map
_idSpanMap; - TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
- };
此时当我们需要建立页号与span的映射时,就调用基数树当中的set函数。
_idSpanMap.set(span->_pageId, span);
而当我们需要读取某一页号对应的span时,就调用基数树当中的get函数。
Span* ret = (Span*)_idSpanMap.get(id);
并且现在PageCache类向外提供的,用于读取映射关系的MapObjectToSpan函数内部就不需要加锁了。
- //获取从对象到span的映射
- Span* PageCache::MapObjectToSpan(void* obj)
- {
- PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //页号
- Span* ret = (Span*)_idSpanMap.get(id);
- assert(ret != nullptr);
- return ret;
- }
当某个线程在读取映射关系时,可能另外一个线程正在建立其他页号的映射关系,而此时无论我们用的是C++当中的map还是unordered_map,在读取映射关系时都是需要加锁的。
因为C++中map的底层数据结构是红黑树,unordered_map的底层数据结构是哈希表,而无论是红黑树还是哈希表,当我们在插入数据时其底层的结构都有可能会发生变化。比如红黑树在插入数据时可能会引起树的旋转,而哈希表在插入数据时可能会引起哈希表扩容。此时要避免出现数据不一致的问题,就不能让插入操作和读取操作同时进行,因此我们在读取映射关系的时候是需要加锁的。
而对于基数树来说就不一样了,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。并且我们不会同时对同一个页进行读取映射和建立映射的操作,因为我们只有在释放对象时才需要读取映射,而建立映射的操作都是在page cache进行的。也就是说,读取映射时读取的都是对应span的_useCount不等于0的页,而建立映射时建立的都是对应span的_useCount等于0的页,所以说我们不会同时对同一个页进行读取映射和建立映射的操作。
还是同样的代码,只不过我们用基数树对代码进行了优化,这时测试固定大小内存的申请和释放的结果如下:
可以看到,这时就算申请释放的是固定大小的对象,其效率都是malloc的两倍。下面在申请释放不同大小的对象时,由于central cache的桶锁起作用了,其效率更是变成了malloc的好几倍。
实际Google开源的tcmalloc是会直接用于替换malloc的,不同平台替换的方式不同。比如基于Unix的系统上的glibc,使用了weak alias的方式替换;而对于某些其他平台,需要使用hook的钩子技术来做。
对于我们当前实现的项目,可以考虑将其打包成静态库或动态库。我们先右击解决方案资源管理器当中的项目名称,然后选择属性。
此时会弹出该选项卡,按照以下图示就可以选择将其打包成静态库或动态库了。
项目到这里就已经圆满结束了,但是由于篇幅已经很长了,项目中有些地方只给了文字说明与框架,具体的完整实现代码我会放在Gitee上供大家参考。项目源码https://gitee.com/what-you-want-a/high-concurrency-memory-pool