MultiThreadEx.h:
- #pragma once
-
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
-
/// Callable type accepted by CMultiThreadEx::post(): no arguments, no result.
using TaskFun = std::function<void(void)>;

/// A pool of worker threads that execute queued TaskFun callables.
///
/// Workers are created by start() and on demand by post(); each worker runs
/// exec(), which pops tasks from `tasks_` under `task_mutex_`. A worker that
/// decides to retire registers its id in `threads_need_del_` (del_thread())
/// and is later joined by another worker (del_thread_oper()).
class CMultiThreadEx {
public:
    CMultiThreadEx();
    ~CMultiThreadEx();

    // The pool owns mutexes and running threads; copying it is meaningless.
    CMultiThreadEx(const CMultiThreadEx&) = delete;
    CMultiThreadEx& operator=(const CMultiThreadEx&) = delete;

    /// Marks the pool as running and spawns the minimum number of workers.
    /// @return false if the pool was already running, true otherwise.
    bool start();

    /// Marks the pool as stopped, wakes every worker and joins them.
    void stop();

    /// Queues a task for execution.
    /// @param task_fun callable to run on a worker thread.
    /// @return false if the pool is not running, true once the task is queued.
    bool post(const TaskFun& task_fun);

private:
    /// Worker thread main loop.
    void exec();
    /// Spawns one worker and records it in `threads_`.
    void add_thread();
    /// Registers the calling worker for removal.
    void del_thread();
    /// Joins and forgets workers previously registered for removal.
    void del_thread_oper();

private:
    std::atomic_bool exist_;                 ///< true while the pool is running
    std::mutex thread_mutex_;                ///< guards threads_ and threads_need_del_
    std::map<std::thread::id, std::shared_ptr<std::thread>> threads_;
    std::list<std::thread::id> threads_need_del_;
    // Explicit zero: a default-constructed std::atomic is uninitialized
    // before C++20, and both counters are read before they are ever written.
    std::atomic<int32_t> thread_num_{0};     ///< number of live workers
    std::atomic<int32_t> task_num_{0};       ///< number of queued tasks
    std::queue<TaskFun> tasks_;
    std::mutex task_mutex_;                  ///< guards tasks_
    std::condition_variable condition_;      ///< signalled on post() and stop()

    int32_t min_thead_num_;                  ///< workers spawned by start()
    int32_t max_thead_num_;                  ///< intended upper bound on workers
    int32_t task_avg_num_;                   ///< queued tasks per worker before growing
};
MultiThreadEx.cpp:
- #include "MultiThreadEx.h"
- #include
-
- CMultiThreadEx::CMultiThreadEx()
- : exist_(false)
- , min_thead_num_(1)
- , max_thead_num_(1024)
- , task_avg_num_(10)
- {
- }
-
- CMultiThreadEx::~CMultiThreadEx() {}
-
- bool CMultiThreadEx::start() {
- if (exist_) {
- return false;
- }
-
- exist_ = true;
-
- for (size_t i = 0; i < min_thead_num_; i++)
- {
- add_thread();
- }
-
- return true;
- }
-
- void CMultiThreadEx::stop() {
- if (!exist_) {
- return;
- }
-
- exist_ = false;
- condition_.notify_all();
-
- for (auto& iter_thread : threads_)
- {
- if (iter_thread.second->joinable()) {
- iter_thread.second->join();
- }
- }
-
- return;
- }
-
- bool CMultiThreadEx::post(const TaskFun& task_fun) {
- if (!exist_) {
- return false;
- }
-
- {
- std::lock_guard
lock(task_mutex_) ; - tasks_.push(task_fun);
- ++task_num_;
- }
-
- if (thread_num_ < min_thead_num_
- || task_num_ > thread_num_ * task_avg_num_)
- {
- add_thread();
- }
-
- condition_.notify_one();
- return true;
- }
-
- void CMultiThreadEx::exec() {
- while (exist_)
- {
- TaskFun task = nullptr;
- del_thread_oper();
-
- {
- std::unique_lock
lock(task_mutex_) ; - if (tasks_.empty()) {
-
- if (thread_num_ > min_thead_num_) {
-
- auto ret =condition_.wait_for(lock, std::chrono::milliseconds(1000));
- if (std::cv_status::timeout == ret)
- {
- // 退出线程
- del_thread();
- break;
- }
-
- if (!exist_) {
- break;
- }
-
- }
-
- condition_.wait(lock);
- }
-
- if (!exist_) {
- break;
- }
-
- if (!tasks_.empty())
- {
- task = tasks_.front();
- tasks_.pop();
- --task_num_;
- }
- }
-
- if (nullptr != task) {
- task();
- }
- }
-
- return;
- }
-
- void CMultiThreadEx::add_thread() {
- std::lock_guard
lock(thread_mutex_) ; -
- std::shared_ptr
sptr_thread = - std::make_shared
(std::thread(std::bind(&CMultiThreadEx::exec, this))); - threads_.insert(std::make_pair(sptr_thread->get_id(), sptr_thread));
- ++thread_num_;
- }
-
- void CMultiThreadEx::del_thread(){
- std::lock_guard
lock(thread_mutex_) ; -
- auto id = std::this_thread::get_id();
- auto iter_thread = threads_.find(id);
-
- if (threads_.end() != iter_thread) {
- threads_need_del_.push_back(id);
- --thread_num_;
- }
- }
-
- void CMultiThreadEx::del_thread_oper(){
- std::lock_guard
lock(thread_mutex_) ; -
- if (threads_need_del_.empty())
- {
- return;
- }
-
- for (auto& id : threads_need_del_)
- {
- auto iter_thread = threads_.find(id);
-
- if (threads_.end() != iter_thread){
- if (iter_thread->second->joinable()) {
- iter_thread->second->join();
- }
- threads_.erase(id);
- }
- }
- }
main:
- #include
- #include "MultiThreadEx.h"
-
- int main() {
-
- std::mutex m;
-
- CMultiThreadEx t;
- t.start();
-
- auto fun = [&]()
- {
- std::lock_guard
lock(m); -
- static int count = 0;
- std::cout << "Count: " << ++count << "thread-id: " << std::this_thread::get_id() << std::endl;
- std::this_thread::sleep_for(std::chrono::seconds(1));
- };
-
- for (int i = 0; i < 20; ++i)
- {
- t.post(fun);
- //std::this_thread::sleep_for(std::chrono::microseconds(500));
- }
-
- getchar();
- t.post(fun);
- getchar();
- t.stop();
-
- return 0;
- }