- #include
- #include
- #include
- #include
-
- // 模拟一个从数据库查询的操作
- std::string fetchDataFromDB(std::string query)
- {
- // 模拟一个耗时操作
- std::this_thread::sleep_for(std::chrono::seconds(5));
- return "Data is: " + query;
- }
-
- void use_async()
- {
- // async异步调用fetchDataFromDB
- std::future
result_from_DB = std::async(std::launch::async, fetchDataFromDB, "Test Query"); -
- // 主线程做其他事情
- std::cout << "Do something else... \n";
-
- // 从future中获取数据
- std::string DB_Data = result_from_DB.get();
- std::cout << DB_Data << std::endl;
-
- std::cout << "Async test finished! \n";
- }
-
- int main()
- {
- // 1. 测试异步执行
- use_async();
-
- std::cout << "Finished!\n";
- }
异步执行时,数据如果没有准备好,取数据时会阻塞。
- int my_task()
- {
- std::this_thread::sleep_for(std::chrono::seconds(5));
- std::cout << "My Task run 5 seconds \n";
- return 42;
- }
-
- void use_package()
- {
- // 创建一个包装了任务的 std::packaged_task 对象
- // package_task和future一起,可以实现一个线程池操作
- std::packaged_task<int()> task(my_task);
-
- // 获取与任务有关的std::future对象, 代表一个异步操作结果
- std::future<int> result = task.get_future();
-
- // 在另一个线程上执行
- // package_task只支持移动构造
- std::thread t(std::move(task));
- t.detach(); // 后台执行,和变量t解耦,但是主线程依然是守护线程
-
- // 等待任务执行完毕,获取结果
- int value = result.get();
- std::cout << "The result is: " << value << std::endl;
- }
-
- int main()
- {
- // 2. 使用std::package_task打包任务
- use_package();
-
- std::cout << "Finished!\n";
- }
-
- void set_value(std::promise<int> prom)
- {
- std::this_thread::sleep_for(std::chrono::seconds(5));
- prom.set_value(10);
- std::cout << "promise set value success. \n";
- }
-
- void use_promise()
- {
- std::promise<int> prom;
-
- // promise与future对象关联
- std::future<int> fut = prom.get_future();
-
- // 新线程中使用promise
- std::thread t(set_value, std::move(prom));
-
- // 在主线程中等待future的值
- std::cout << "Waiting for the thread to set the value...\n";
- std::cout << "Value set by the thread: " << fut.get() << '\n';
- t.join();
- }
-
- int main()
- {
- // 3. 使用promise
- /*
- promise类似package,package需要等待函数全部执行完毕后,才可以通过future获取值;
- promise:绑定函数执行过程中如果设置了value后,可以在中途获取到.
- */
- use_promise();
-
- std::cout << "Finished!\n";
- }
- void myFunction(std::promise<int>&& promise) {
- // 模拟一些工作
- std::this_thread::sleep_for(std::chrono::seconds(1));
- promise.set_value(42); // 设置 promise 的值
- }
-
- void threadFunction(std::shared_future<int> future)
- {
- try {
- int result = future.get();
- std::cout << "Result: " << result << std::endl;
- }
- catch (const std::future_error& e) {
- std::cout << "Future error: " << e.what() << std::endl;
- }
- }
-
- void use_shared_future()
- {
- std::promise<int> promise;
- std::shared_future<int> future = promise.get_future();
-
- // 移动promise到线程中,执行类似计算的操作
- std::thread my_thread1(myFunction, std::move(promise));
-
- // 定义取结果的线程
- std::thread get_result1(threadFunction, future);
- std::thread get_result2(threadFunction, future);
-
- my_thread1.join();
- get_result1.join();
- get_result2.join();
- }
-
- void use_shared_future_error()
- {
- std::promise<int> promise;
-
- // 隐式转换,默认转换为shared_future
- std::shared_future<int> future = promise.get_future();
-
- // 移动promise到线程中,执行类似计算的操作
- std::thread my_thread1(myFunction, std::move(promise));
-
- // 定义取结果的线程
- std::thread get_result1(threadFunction, std::move(future));
- std::thread get_result2(threadFunction, std::move(future));
-
- my_thread1.join();
- get_result1.join();
- get_result2.join();
- }
-
-
- int main()
- {
- // 3. shared_future使用方法:供多个线程使用
- use_shared_future();
-
- /*
- 在shared_future中,定义的future不可以被移动构造,否则另一个线程无法使用
- */
- // use_shared_future_error(); // 错误用法
-
- std::cout << "Finished!\n";
- }
- // 定义异常,通过子线程抛出
- void set_exception(std::promise<void> prom)
- {
- try
- {
- throw std::runtime_error("An error occurred! \n");
- }
- catch (const std::exception&)
- {
- prom.set_exception(std::current_exception());
- }
- }
-
- void use_promise_exception()
- {
- std::promise<void> prom;
- std::future<void> fut = prom.get_future();
-
- // 在新线程中设置promise异常
- std::thread t(set_exception, std::move(prom));
-
- // 主线程中获取异常
- try
- {
- std::cout << "Waiting for the thread to set the exception...\n";
- fut.get();
- }
- catch (const std::exception& e)
- {
- std::cout << "Exception set by the thread: " << e.what() << '\n';
- }
- t.join();
- }
-
- void use_promise_destruct()
- {
- std::thread t;
- std::future<int> fut;
- {
- std::promise<int> prom;
- fut = prom.get_future();
- t = std::thread(set_value, std::move(prom));
- }
- // 作用域结束后,prom可能因为release或者linux系统原因马上释放,导致错误.
-
- // 主线程中获取future值
- std::cout << "Waiting for the thread to set the value...\n";
- std::cout << "Value set by the thread: " << fut.get() << '\n';
- t.join();
- }
-
- int main()
- {
- // 4. 使用promise,在主线程中获取异常
- // 子线程抛出异常,主线程一定需要try捕获,否则会导致主线程崩溃。
- use_promise_exception();
-
- // 作用域原因的错误用法
- // 可能出错,也可能不会出错; 解决方法就是将prom按照智能指针方式传递.
- // use_promise_destruct();
-
- std::cout << "Finished!\n";
- }
首先在头文件中定义ThreadPool.h:
- #ifndef __THREAD_POOL_H__
- #define __THREAD_POOL_H__
-
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
-
- class ThreadPool
- {
- public:
- ThreadPool& operator=(const ThreadPool&) = delete;
-
- // 实现一个单例模式
- static ThreadPool& instance()
- {
- static ThreadPool instance;
- return instance;
- }
-
- using Task = std::packaged_task<void()>;
-
- ~ThreadPool()
- {
- stop();
- }
-
- // 一个返回类型后置语法
- template<class F, class ...Args>
- auto commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
- {
- using RetType = decltype(f(args...));
-
- // 如果线程池已经结束了,那么直接返回空的future.
- if (stop_.load()) {
- return std::future
{}; - }
-
- // 将回调函数f以及可变参数绑定一起,生成一个无参函数。
- auto task = std::make_shared
RetType()>>( - std::bind(std::forward
(f), std::forward(args)...)); -
- // 通过std::future获取运行结果
- std::future
ret = task->get_future(); - {
- // 将需要执行的任务放入任务队列, 等待线程领取.
- std::lock_guard
cv_mt(cv_mt_) ; -
- // 传递一个回调函数, lambda表达式,在弹出时执行
- // 通过lambda表达式捕获task,保证task在回调之前避免被销毁,类似伪闭包.
- tasks_.emplace([task] { (*task)(); });
- }
-
- // 通知挂起的线程执行
- cv_lock_.notify_one();
- return ret;
- }
-
- int idleThreadCount()
- {
- return thread_num_;
- }
-
- private:
- ThreadPool(unsigned int num = 5): stop_(false)
- {
- {
- if (num < 1) thread_num_ = 1;
- else thread_num_ = num;
- }
- start();
- }
-
- void start()
- {
- for (int i = 0; i < thread_num_; i++) {
- pool_.emplace_back([this]() {
- while (!this->stop_.load()) {
- Task task;
- {
- std::unique_lock
cv_mt(cv_mt_); -
- // 通过谓词判断,如果线程池已经停止,或者任务队列为空
- // 通过条件变量让线程挂起
- this->cv_lock_.wait(cv_mt, [this] {
- return this->stop_.load() || !this->tasks_.empty();
- });
-
- if (this->tasks_.empty())
- return;
-
- task = std::move(this->tasks_.front());
- this->tasks_.pop();
- }
- this->thread_num_--;
- task();
- this->thread_num_++;
- }
- });
- }
- }
-
- void stop()
- {
- // 原子操作,将停止信号设置为true.
- stop_.store(true);
-
- // 通知所有线程,停止前避免卡死.
- cv_lock_.notify_all();
- for (auto& td : pool_) {
- if (td.joinable()) {
- std::cout << "join thread " << td.get_id() << std::endl;
- td.join();
- }
- }
- }
-
- private:
- std::mutex cv_mt_;
- std::condition_variable cv_lock_;
- std::atomic_bool stop_;
- std::atomic_int thread_num_;
- std::queue
tasks_; // 任务队列 - std::vector
pool_; // 线程池 - };
-
- #endif // __THREAD_POOL_H__
然后在主函数中测试调用:
- #include
- #include
- #include
- #include
- #include "ThreadPool.h"
-
- int main()
- {
- // 5. 使用线程池, 以及使用例子
- int m = 0;
- ThreadPool::instance().commit([](int& m) {
- m = 1024;
- std::cout << "Inner set m is " << m << std::endl;
- }, m);
- std::this_thread::sleep_for(std::chrono::seconds(3));
- std::cout << "Main: m is " << m << std::endl;
-
- ThreadPool::instance().commit([](int& m) {
- m = 1024;
- std::cout << "Inner set m is " << m << std::endl;
- }, std::ref(m));
- std::this_thread::sleep_for(std::chrono::seconds(3));
- std::cout << "Main: m is " << m << std::endl;
-
- /*
- 1. 如果任务有顺序要求,不可以用线程池.
- 2. 任务是有顺序要求的.
- */
-
- std::cout << "Finished!\n";
- }
最后的测试结果:
