在前文中,我们用多线程编写了简单的线程安全的栈和队列,因为这两种经典的数据结构接口足够简单,实现起来相对容易。
接下来,我们试图设计一个基于有限桶的哈希表,并将其设计为适应并发环境的数据结构。
其实C++ 是有自己的 hashmap 的,也就是 std::unordered_map 只不过并非适用于并发,当然如果改造只是粗暴的加一个锁,那并不是真正的并发数据结构,所以,需要重新设计。
hashmap 是一个经典的数据结构,当 hash 函数和桶的数量足够多的情况,可以期许 O(1) 效率的查找,如此恐怖的性能如果加之以并发的支持,近乎可以碾压一切无需顺序查找的查询表。
下面我们就看看具体实现细节。
hash 函数,可以用标准库函数,当然,涉及标准库不可支持的 key 类型时,我们还需自己设计一个分散性足够好并且运算性能同样好的 hash 函数。
由除留余法的 hash 函数进行数据分散,我们用 std::list 结构去实现 “桶”,处置可能出现的不同 key 值的相同 hash 定位问题。
而并发的关键,是桶的设计,也就是我们的并发颗粒度,可以到达桶的水平,当桶的数量和 hash 函数足够的合适,也就是可以保证作为桶的拉链(list)足够小,我们的并发颗粒度就越能趋近于单个元素(key - value 对)的水平。
查找表一般读多写少,所以我们需要用到 std::shared_lock
#include
#include
#include
#include
#include
namespace threadSafe
{
template <typename Key, typename Value, typename Hash>
struct bucketType;
template <typename Key, typename Value, typename Hash = std::hash<Key>>
struct threadSafeLookupTable
{
using keyType = Key;
using mappedType = Value;
using hashType = Hash;
// 构造函数,需选用足够大的桶数量,以及合适的 hash 函数
explicit threadSafeLookupTable(const unsigned numBuckets = 19,
const Hash &Hasher = Hash())
: buckets(numBuckets)
, hasher(Hasher)
{
for (unsigned i = 0; i < numBuckets; ++i)
{
buckets[i].reset(new bucketType<Key, Value, Hash>);
}
}
// 不可拷贝构造
threadSafeLookupTable(const threadSafeLookupTable &other) = delete;
// 不可拷贝赋值
auto operator=(const threadSafeLookupTable &other)
-> threadSafeLookupTable & = delete;
// 查找 key 对应的 value 值,如果没有,返回 defaultValue
[[nodiscard]] auto valueFor(const Key &key,
const Value &defaultValue = Value()) const
-> Value
{
return getBucket(key).valueFor(key, defaultValue);
}
// 无 key 则增加 key value 键值对,有 key 更新其对应的 value
void addOrUpdateMapping(const Key &key, const Value &value)
{
getBucket(key).addOrUpdateMapping(key, value);
}
// 删除 key 对应的键值对
void removeMapping(const Key &key)
{
getBucket(key).removeMapping(key);
}
private:
// 数组,其元素为智能指针,智能指针封装着哈希表的桶
std::vector<std::unique_ptr<bucketType<Key, Value, Hash>>> buckets;
// 哈希类,计算哈希值的仿函数类
Hash hasher;
// 根据 key 值,获取哈希表的桶
[[nodiscard]] auto getBucket(const Key &key) const
-> bucketType<Key, Value, Hash> &
{
const std::size_t bucketIndex = hasher(key) % buckets.size();
return *buckets[bucketIndex];
}
};
template <typename Key, typename Value, typename Hash = std::hash<Key>>
struct bucketType
{
// 若键值对链表中找到了 key 则返回其所对应的 value
// 若没有找到 key 则返回默认值 defaultValue
auto valueFor(const Key &key, const Value &defaultValue) const -> Value
{
std::shared_lock<std::shared_mutex> const lock(sharedMutex);
auto foundEntry = findEntryFor(key);
return (foundEntry == data.cend()) ? defaultValue : foundEntry->second;
}
// 若键值对链表中没有 key value 键值对,则增加此节点,
// 若键值对链表中找到了 key,则更新 key 对应的 value 值
void addOrUpdateMapping(const Key &key, const Value &value)
{
std::unique_lock<std::shared_mutex> const lock(sharedMutex);
auto foundEntry = findEntryFor(key);
if (foundEntry == data.end())
{
data.push_back(bucketValue(key, value));
}
else
{
foundEntry->second = value;
}
}
// 删除键值对链表中对应 key 的节点
void removeMapping(const Key &key)
{
std::unique_lock<std::shared_mutex> const lock(sharedMutex);
auto foundEntry = findEntryFor(key);
if (foundEntry != data.end())
{
data.erase(foundEntry);
}
}
private:
// 键值对
using bucketValue = std::pair<Key, Value>;
// 键值对链表
using bucketData = std::list<bucketValue>;
// 键值对链表迭代器
using bucketIterator = typename bucketData::iterator;
// 键值对链表迭代器 const 版
using bucketConstIterator = typename bucketData::const_iterator;
// 查找键值对链表中与 key 值相等的节点的迭代器
auto findEntryFor(const Key &key) -> bucketIterator
{
return std::find_if(
data.begin(), data.end(),
[&key](const bucketValue &item) { return item.first == key; });
}
// 查找键值对链表中与 key 值相等的节点的迭代器 const 版
auto findEntryFor(const Key &key) const -> bucketConstIterator
{
return std::find_if(
data.begin(), data.end(),
[&key](const bucketValue &item) { return item.first == key; });
}
// 键值对链表数据
bucketData data;
// 共享锁
mutable std::shared_mutex sharedMutex;
};
} // namespace threadSafe
auto main() -> int
{
threadSafe::threadSafeLookupTable<int, char> tslt;
tslt.addOrUpdateMapping(5, 'a');
tslt.addOrUpdateMapping(5, 'b');
char ch = tslt.valueFor(5);
ch = tslt.valueFor(6);
tslt.removeMapping(5);
return 0;
}
本文的 hashmap 在桶的层次设计了锁的结构,辅助并发逻辑,我们需要注意,桶的封装是基于一个链表,对于读,是天然的并发安全的,但对于写,则是要将整个链表上锁,如果 hash 函数不够好,或桶的数量太小(为了保证 hashmap 的性能,桶的数量必须是质数),则整个 hashmap 的并发性能会下降,因为无法分散于各个不相干的桶,所有线程都争抢一个桶时,就退化为单线程逻辑了。