STL容器不是线程安全的。比如对于vector,即使写方(生产者)是单线程写入,但是并发读的时候,由于潜在的内存重新申请和对象复制问题,会导致读方(消费者)的迭代器失效,实际表现往往就是程序崩溃(core dump)。另外一种情况,如果是多个写方并发地调用push_back(),同样会导致core dump。但可以通过固定vector的大小(预先调用resize())、避免动态扩容(不使用push_back),并且让各个线程只读写各自不同的元素,来做到无锁(lock-free)访问。
C++的map的并发操作也是不安全的。C++里有红黑树实现的std::map和哈希表实现的std::unordered_map。在《C++并发编程实战》一书的162页提供了一个细粒度锁的map数据结构,使用了boost的shared_mutex(标准库中,C++14提供了std::shared_timed_mutex,C++17提供了std::shared_mutex,C++11没有)。书上的实现代码挺长的,这里给出OpenHarmony源码中实现的safe_map,代码精简,值得学习。
上一篇赏析了OpenHarmony源码中ThreadPool的实现,链接在这里:
c++的ThreadPool,OpenHarmony源码实现版赏析和使用
源码位置:code-v3.0-LTS\OpenHarmony\utils\native\base\include\safe_map.h
- /*
- * Copyright (c) 2021 Huawei Device Co., Ltd.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
#ifndef UTILS_BASE_SAFE_MAP_H
#define UTILS_BASE_SAFE_MAP_H

#include <map>
#include <mutex>
#include <utility>

namespace OHOS {

/**
 * A std::map<K, V> whose member functions each run under a single internal
 * std::mutex.
 *
 * Every member function below takes the mutex for the duration of the call.
 * The one thing the mutex cannot cover is the reference returned by
 * operator[]: reads and writes made through that reference happen after the
 * lock has been released and must be synchronized by the caller.
 */
template <typename K, typename V>
class SafeMap {
public:
    SafeMap() {}

    ~SafeMap() {}

    /** Copies the content of rhs while holding rhs's mutex. */
    SafeMap(const SafeMap& rhs)
    {
        std::lock_guard<std::mutex> lock(rhs.mutex_);
        map_ = rhs.map_;
    }

    /**
     * Replaces the content of this map with a copy of rhs.
     *
     * The two mutexes are never held at the same time (snapshot rhs first,
     * then swap the snapshot in), so "a = b" and "b = a" running on two
     * threads cannot deadlock.
     */
    SafeMap& operator=(const SafeMap& rhs)
    {
        if (&rhs != this) {
            std::map<K, V> snapshot;
            {
                std::lock_guard<std::mutex> rhsLock(rhs.mutex_);
                snapshot = rhs.map_;
            }
            // snapshot is declared before the lock, so the previous content
            // (now inside snapshot) is destroyed after the lock is released.
            std::lock_guard<std::mutex> lock(mutex_);
            map_.swap(snapshot);
        }

        return *this;
    }

    /**
     * Returns a reference to the value stored under key, default-inserting
     * the value first when the key is absent.
     *
     * Only the lookup/insertion is done under the mutex. The returned
     * reference stays valid until the element is removed by Erase() or
     * Clear(), but access through it is NOT synchronized.
     */
    V& operator[](const K& key)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return map_[key];
    }

    // when multithread calling size() return a tmp status, some threads may insert just after size() call
    int Size() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return static_cast<int>(map_.size());
    }

    // when multithread calling Empty() return a tmp status, some threads may insert just after Empty() call
    bool IsEmpty() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return map_.empty();
    }

    /**
     * Inserts (key, value) only when key is not present yet.
     * @return true when the pair was inserted, false when key already existed
     *         (the stored value is left untouched in that case).
     */
    bool Insert(const K& key, const V& value)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return map_.insert(std::pair<K, V>(key, value)).second;
    }

    /**
     * Inserts (key, value), overwriting the stored value when key already
     * exists. The existing element is assigned in place rather than erased
     * and re-inserted, so references obtained from operator[] stay valid.
     */
    void EnsureInsert(const K& key, const V& value)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        auto ret = map_.insert(std::pair<K, V>(key, value));
        if (!ret.second) {
            // key was already present: overwrite its value
            ret.first->second = value;
        }
    }

    /**
     * Looks up key and copies the stored value into value.
     * @return true when key was found; on false, value is not modified.
     */
    bool Find(const K& key, V& value) const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        auto iter = map_.find(key);
        if (iter == map_.end()) {
            return false;
        }

        value = iter->second;
        return true;
    }

    /**
     * When key exists, copies the stored value into oldValue and replaces it
     * with newValue, both under one lock acquisition.
     * @return true when key was found and updated; on false, nothing is
     *         modified (in particular, newValue is not inserted).
     */
    bool FindOldAndSetNew(const K& key, V& oldValue, const V& newValue)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        auto iter = map_.find(key);
        if (iter == map_.end()) {
            return false;
        }

        oldValue = iter->second;
        iter->second = newValue;
        return true;
    }

    /** Removes key when present; does nothing otherwise. */
    void Erase(const K& key)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        map_.erase(key);
    }

    /** Removes every element. */
    void Clear()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        map_.clear();
    }

private:
    // mutable so that const members and the copy operations (which receive a
    // const SafeMap&) can still lock.
    mutable std::mutex mutex_;
    std::map<K, V> map_;
};

} // namespace OHOS
#endif
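SafeMap使用模板语法template <typename K, typename V>定义,自定义实现了几个常用的操作方法,如Find、Erase和Clear,每个方法内部的操作都相应地加了锁。此外还重载了[]运算符和赋值运算符=。
注意:在OpenHarmony 3.0 LTS的原始实现中,这两处没有加锁,你知道为什么吗?一种解释是:如果多个线程只读取容器而不更改其结构,则不需要对容器进行同步;另外,对于map,如果只是通过[]修改已存在的元素而不是插入新的key,红黑树的结构不会发生变化,多线程下一般也不会core dump。但要注意,这并不意味着这两个操作是线程安全的:当key不存在时,operator[]会插入新元素,此时若与其他线程的插入、删除并发执行,就是数据竞争;多个线程通过[]同时读写同一个元素,或者在其他线程修改map的同时进行拷贝、赋值,同样需要调用方自行加锁同步。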
源码中同样有safe_map的单元测试,单元测试框架使用的是google的gtest。看来gtest还是很强大的,华为也选择使用了它。以下给出源码,可以熟悉下gtest单元测试的用法。
- /*
- * Copyright (c) 2021 Huawei Device Co., Ltd.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include "safe_map.h"
-
#include <algorithm> // std::sort
#include <chrono> // std::chrono::seconds
#include <ctime> // std::time_t
#include <functional> // std::ref
#include <future> // std::async, std::future
#include <iostream> // std::cout
#include <string> // std::string, std::to_string
#include <thread> // std::thread, std::this_thread::sleep_for
#include <vector> // std::vector

#include <gtest/gtest.h>

- using namespace testing::ext;
- using namespace OHOS;
- using namespace std;
-
- class UtilsSafeMap : public testing::Test {
- };
-
- /*
- * @tc.name: testUtilsCopyAndAssign001
- * @tc.desc: single thread test the normal feature insert and erase and EnsureInsert
- */
- HWTEST_F(UtilsSafeMap, testUtilsCopyAndAssign001, TestSize.Level1)
- {
- SafeMap
int> demoData; - // insert new
- demoData.Insert("A", 1);
- ASSERT_FALSE(demoData.IsEmpty());
- ASSERT_EQ(demoData.Size(), 1);
-
- SafeMap
int> newdemo = demoData; - int tar = -1;
- ASSERT_TRUE(newdemo.Find("A", tar));
- ASSERT_EQ(1, tar);
-
- tar = -1;
- SafeMap
int> newdemo2; - newdemo2 = demoData;
- ASSERT_TRUE(newdemo2.Find("A", tar));
- ASSERT_EQ(1, tar);
- }
-
- /*
- * @tc.name: testUtilsoperator001
- * @tc.desc: SafeMap
- */
- HWTEST_F(UtilsSafeMap, testUtilsoperator001, TestSize.Level1)
- {
- SafeMap
int> demoData; - // insert new
- demoData.Insert("A", 1);
- ASSERT_FALSE(demoData.IsEmpty());
- ASSERT_EQ(demoData.Size(), 1);
- ASSERT_EQ(demoData["A"], 1);
-
- SafeMap
int> newdemo = demoData; - ASSERT_EQ(newdemo["A"], 1);
-
- int tar = -1;
- newdemo["B"] = 6;
- ASSERT_TRUE(newdemo.Find("B", tar));
- ASSERT_EQ(6, tar);
-
- SafeMap
int> newdemo2; - newdemo2 = newdemo;
- ASSERT_EQ(newdemo2["A"], 1);
- }
-
- /*
- * @tc.name: testUtilsNormalFeatureInsert001
- * @tc.desc: SafeMap
- */
- HWTEST_F(UtilsSafeMap, testUtilsNormalFeatureInsert001, TestSize.Level1)
- {
- SafeMap
int> demoData; - ASSERT_TRUE(demoData.IsEmpty());
- // insert new
- demoData.Insert("A", 1);
- ASSERT_FALSE(demoData.IsEmpty());
- ASSERT_EQ(demoData.Size(), 1);
-
- // insert copy one should fail
- ASSERT_FALSE(demoData.Insert("A", 2));
- ASSERT_EQ(demoData.Size(), 1);
- }
-
- /*
- * @tc.name: testUtilsNormalFeatureEnsureInsert001
- * @tc.desc: SafeMap
- */
- HWTEST_F(UtilsSafeMap, testUtilsNormalFeatureEnsureInsert001, TestSize.Level1)
- {
- SafeMap
int> demoData; - ASSERT_TRUE(demoData.IsEmpty());
-
- demoData.Insert("A", 1);
- demoData.EnsureInsert("B", 2);
-
- ASSERT_FALSE(demoData.IsEmpty());
- ASSERT_EQ(demoData.Size(), 2);
- // insert copy one and new one
- demoData.EnsureInsert("B", 5);
- demoData.EnsureInsert("C", 6);
-
- ASSERT_EQ(demoData.Size(), 3);
- }
-
- /*
- * @tc.name: testUtilsNormalFeatureFind001
- * @tc.desc: SafeMap
- */
- HWTEST_F(UtilsSafeMap, testUtilsNormalFeatureFind001, TestSize.Level1)
- {
- SafeMap
int> demoData; - ASSERT_TRUE(demoData.IsEmpty());
-
- demoData.Insert("A", 1);
- demoData.Insert("B", 10000);
- demoData.EnsureInsert("B", 2);
- demoData.EnsureInsert("C", 6);
-
- ASSERT_FALSE(demoData.IsEmpty());
- ASSERT_EQ(demoData.Size(), 3);
-
- int i = 0;
- ASSERT_TRUE(demoData.Find("A", i));
- ASSERT_EQ(i, 1);
- ASSERT_TRUE(demoData.Find("B", i));
- ASSERT_EQ(i, 2);
-
- ASSERT_TRUE(demoData.Find("C", i));
- ASSERT_EQ(i, 6);
- }
-
- /*
- * @tc.name: testUtilsNormalFeatureFindAndSet001
- * @tc.desc: SafeMap
- */
- HWTEST_F(UtilsSafeMap, testUtilsNormalFeatureFindAndSet001, TestSize.Level1)
- {
- SafeMap
int> demoData; - ASSERT_TRUE(demoData.IsEmpty());
-
- demoData.Insert("A", 1);
- demoData.EnsureInsert("B", 2);
-
- int oldvalue = 0;
- int newvalue = 3;
- ASSERT_TRUE(demoData.FindOldAndSetNew("A", oldvalue, newvalue));
- // old value
- ASSERT_EQ(oldvalue, 1);
-
- newvalue = 4;
- ASSERT_TRUE(demoData.FindOldAndSetNew("B", oldvalue, newvalue));
- // old value
- ASSERT_EQ(oldvalue, 2);
- int i = -1;
- ASSERT_TRUE(demoData.Find("A", i));
- // new value
- ASSERT_EQ(i, 3);
- ASSERT_TRUE(demoData.Find("B", i));
- // new value
- ASSERT_EQ(i, 4);
- }
-
- /*
- * @tc.name: testUtilsNormalFeatureEraseAndClear001
- * @tc.desc: SafeMap
- */
- HWTEST_F(UtilsSafeMap, testUtilsNormalFeatureEraseAndClear001, TestSize.Level1)
- {
- SafeMap
int> demoData; - ASSERT_TRUE(demoData.IsEmpty());
-
- demoData.Insert("A", 1);
- demoData.EnsureInsert("B", 2);
-
- ASSERT_EQ(demoData.Size(), 2);
- demoData.Erase("A");
- ASSERT_EQ(demoData.Size(), 1);
-
- demoData.Clear();
- ASSERT_EQ(demoData.Size(), 0);
- }
-
- /*
- * @tc.name: testUtilsConcurrentWriteAndRead001
- * @tc.desc: 100 threads test in writein to the same key of the map, while read at same time and no throw
- */
- const int THREAD_NUM = 100;
- HWTEST_F(UtilsSafeMap, testUtilsConcurrentWriteAndRead001, TestSize.Level1)
- {
- SafeMap
int> demoData; - std::thread threads[THREAD_NUM];
- std::thread checkThread[THREAD_NUM];
- ASSERT_NO_THROW({
- auto lamfuncInsert = [](SafeMap
int>& data, const string& key, - const int& value, const std::chrono::system_clock::time_point& absTime) {
- std::this_thread::sleep_until(absTime);
- data.EnsureInsert(key, value);
- };
-
- auto lamfuncCheck = [](SafeMap
int>& data, const string& key, - std::chrono::system_clock::time_point absTime) {
- std::this_thread::sleep_until(absTime);
- thread_local int i = -1;
- data.Find(key, i);
- };
-
- using std::chrono::system_clock;
-
- std::time_t timeT = system_clock::to_time_t(system_clock::now());
- timeT += 2;
- string key("A");
-
- for (int i = 0; i < THREAD_NUM; ++i) {
- threads[i] = std::thread(lamfuncInsert, std::ref(demoData), key, i, system_clock::from_time_t(timeT));
- checkThread[i] = std::thread(lamfuncCheck, std::ref(demoData), key, system_clock::from_time_t(timeT));
- }
-
- std::this_thread::sleep_for(std::chrono::seconds(3));
- for (auto& t : threads) {
- t.join();
- }
-
- for (auto& t : checkThread) {
- t.join();
- }
- });
- }
-
- /*
- * @tc.name: testUtilsConcurrentWriteAndFind001
- * @tc.desc: 100 threads test in writein to the corresponding key of the map,
- * while read at same time and check the results
- */
- HWTEST_F(UtilsSafeMap, testUtilsConcurrentWriteAndFind001, TestSize.Level1)
- {
- SafeMap
int> demoData; - std::thread threads[THREAD_NUM];
- std::vector
int>> vcfi; -
- ASSERT_NO_THROW({
- auto lamfuncInsert = [](SafeMap
int>& data, const string& key, - const int& value, const std::chrono::system_clock::time_point& absTime) {
- std::this_thread::sleep_until(absTime);
- data.EnsureInsert(key, value);
- };
-
- auto lamfuncCheckLoop = [](SafeMap
int>& data, const string& key, - std::chrono::system_clock::time_point absTime) {
- std::this_thread::sleep_until(absTime);
- thread_local int i = -1;
- while (!data.Find(key, i)) {
- std::this_thread::sleep_for(std::chrono::microseconds(10));
- }
- return i;
- };
-
- using std::chrono::system_clock;
-
- std::time_t timeT = system_clock::to_time_t(system_clock::now());
- timeT += 2;
- string key("A");
-
- for (int i = 0; i < THREAD_NUM; ++i) {
- threads[i] = std::thread(lamfuncInsert, std::ref(demoData),
- key + std::to_string(i), i, system_clock::from_time_t(timeT));
- vcfi.push_back(std::async(std::launch::async, lamfuncCheckLoop,
- std::ref(demoData), key + std::to_string(i), system_clock::from_time_t(timeT)));
- }
-
- std::this_thread::sleep_for(std::chrono::seconds(4));
- for (auto& t : threads) {
- t.join();
- }
-
- vector<int> result;
-
- for (auto& t : vcfi) {
- result.push_back(t.get());
- }
-
- std::sort(result.begin(), result.end());
-
- for (int i = 0; i < THREAD_NUM; ++i) {
- ASSERT_EQ(i, result[i]);
- }
- });
- }
-
- /*
- * @tc.name: testUtilsConcurrentWriteAndFindAndSet001
- * @tc.desc: 100 threads test in writein to the corresponding key of the map,
- * while findandfix at same time and check the results
- */
- HWTEST_F(UtilsSafeMap, testUtilsConcurrentWriteAndFindAndSet001, TestSize.Level1)
- {
- SafeMap
int> demoData; - std::thread threads[THREAD_NUM];
- std::vector
int>> vcfi; -
- ASSERT_NO_THROW({
- auto lamfuncInsert = [](SafeMap
int>& data, const string& key, - const int& value, const std::chrono::system_clock::time_point& absTime) {
- std::this_thread::sleep_until(absTime);
- data.EnsureInsert(key, value);
- };
-
- auto lamfuncCheckLoop = [](SafeMap
int>& data, const string& key, - const int& newvalue, std::chrono::system_clock::time_point absTime) {
- std::this_thread::sleep_until(absTime);
- thread_local int i = -1;
- while (!data.FindOldAndSetNew(key, i, newvalue)) {
- std::this_thread::sleep_for(std::chrono::microseconds(10));
- }
- return i;
- };
-
- using std::chrono::system_clock;
-
- std::time_t timeT = system_clock::to_time_t(system_clock::now());
- timeT += 2;
- string key("A");
-
- for (int i = 0; i < THREAD_NUM; ++i) {
- threads[i] = std::thread(lamfuncInsert, std::ref(demoData),
- key + std::to_string(i), i, system_clock::from_time_t(timeT));
- vcfi.push_back(std::async(std::launch::async, lamfuncCheckLoop,
- std::ref(demoData), key + std::to_string(i), i + 1, system_clock::from_time_t(timeT)));
- }
-
- std::this_thread::sleep_for(std::chrono::seconds(4));
- for (auto& t : threads) {
- t.join();
- }
-
- vector<int> result;
- for (auto& t : vcfi) {
- result.push_back(t.get());
- }
-
- std::sort(result.begin(), result.end());
-
- for (int i = 0; i < THREAD_NUM; ++i) {
- ASSERT_EQ(i, result[i]);
- }
-
- int t = 0;
- result.clear();
- for (int i = 0; i < THREAD_NUM; ++i) {
- t = -1;
- ASSERT_TRUE(demoData.Find("A" + std::to_string(i), t));
- result.push_back(t);
- }
-
- std::sort(result.begin(), result.end());
-
- for (int i = 0; i < THREAD_NUM; ++i) {
- ASSERT_EQ(i + 1, result[i]);
- }
- });
- }
C++11:基于std::queue和std::mutex构建一个线程安全的队列_10km的博客-CSDN博客_std::queue 线程安全
C++11:基于std::unordered_map和共享锁构建线程安全的map_10km的博客-CSDN博客_c++ map 无锁
如何设计并实现一个线程安全的 Map ?(上篇) - JavaShuo
c++线程安全的map_clh01s的博客-CSDN博客_c++ 线程安全的map
C++ 20 线程安全的Map_学习好烦啊的博客-CSDN博客_c++ map 线程安全
c++ map 多线程_线程安全原理简析及HashMap多线程并发5种场景异常分析_sumilao的博客-CSDN博客