一些小Tips:
g++ 7.thread_pool.cpp -lpthread
time ./a.out
- #include
// 引入线程库 - #include
// 加入锁机制需要引入库函数mutex - #include
// 引入信号量机制
this_thread::get_id()
- condition_variable m_cond // 信号量
- std::mutex m_mutex; // 互斥锁,在非std命名空间的情况下
ulimit -a
多线程概念:
代码样例:
- #include
- #include
- using namespace std;
-
- #define BEGINS(x) namespace x{
- #define ENDS(x) }
-
- BEGINS(thread_usage) // 自定义命名空间
- void func() {
- cout << "hello wolrd" << endl;
- return ;
- }
- int main() {
- thread t1(func); // t1已经开始运行了
- t1.join(); // 等待t1线程结束
- return 0;
- }
- ENDS(thread_usage)
-
- int main() {
- thread_usage::main(); // 调用该命名空间下的main函数
-
- return 0;
- }
- void print(int a, int b) {
- cout << a << " " << b << endl;
- return ;
- }
- int main() {
- thread t2(print, 3, 4);
- t2.join();
- return 0;
- }
多线程设计理念:
- #include
- #include
- #include
- #include
- using namespace std;
-
- #define BEGINS(x) namespace x{
- #define ENDS(x) }
-
- BEGINS(is_prime)
- bool is_prime(int x) {
- for (int i = 2, I = sqrt(x); i <= I; i++) {
- if (x % i == 0) return false;
- }
- return true;
- }
- // 多线程——功能函数
- int prime_count(int l, int r) {
- // 从l到r范围内素数的数量
- int ans = 0;
- for (int i = l; i <= r; i++) {
- ans += is_prime(i);
- }
- return ans;
- }
- // 多线程——入口函数
- void worker(int l, int r, int &ret) {
- cout << this_thread::get_id() << "begin" << endl;
- ret = prime_count(l, r);
- cout << this_thread::get_id() << "dnoe" << endl;
- }
- int main() {
- #define batch 500000
- thread *t[10];
- int ret[10];
- for (int i = 0, j = 1; i < 10; i++, j += batch) {
- t[i] = new thread(worker, j, j + batch - 1, ref(ret[i]));
- }
- for (auto x : t) x->join();
- int ans = 0;
- for (auto x : ret) ans += x;
- for (auto x : t) delete x;
- cout << ans << endl;
- #undef batch
- return 0;
- }
- ENDS(is_prime)
-
- int main() {
- // thread_usage::main();
- is_prime::main();
- return 0;
- }
![]()
- #include
- #include
- #include
- #include
- using namespace std;
-
- #define BEGINS(x) namespace x{
- #define ENDS(x) }
- BEGINS(prime_count2)
- int ans = 0;
- std::mutex m_mutex;
- bool is_prime(int x) {
- for (int i = 2, I = sqrt(x); i <= I; i++) {
- if (x % i == 0) return false;
- }
- return true;
- }
- void prime_count(int l, int r) {
- cout << this_thread::get_id() << " begin\n";
- for (int i = l; i <= r; i++) {
- std::unique_lock
lock(m_mutex) ; // 临界区 - ans += is_prime(i);
- lock.unlock();
- }
- cout << this_thread::get_id() << " done\n";
- return ;
- }
- int main() {
- #define batch 500000
- thread *t[10];
- for (int i = 0, j = 1; i < 10; i++, j += batch) {
- t[i] = new thread(prime_count, j, j + batch - 1);
- }
- for (auto x : t) x->join();
- for (auto x : t) delete x;
- cout << ans << endl;
- #undef batch
- return 0;
- }
- ENDS(prime_count2)
-
- int main() {
- prime_count2::main();
- return 0;
- }
![]()
显然能够看出,加了锁要比不加锁耗时得多。所以加锁的行为要慎重。
- #include
- #include
- #include
- #include
- using namespace std;
-
- #define BEGINS(x) namespace x{
- #define ENDS(x) }
-
- BEGINS(thread3)
- int ans = 0;
- bool is_prime(int x) {
- for (int i = 2, I = sqrt(x); i <= I; i++) {
- if (x % i == 0) return false;
- }
- return true;
- }
- void prime_count(int l, int r) {
- cout << this_thread::get_id() << "begin\n";
- for (int i = l; i <= r; i++) {
- int ret = is_prime(i);
- __sync_fetch_and_add(&ans, ret);
- }
- cout << this_thread::get_id() << "done\n";
- }
- int main() {
- #define batch 500000
- thread *t[10];
- for (int i = 0, j = 1; i < 10; i++, j += batch) {
- t[i] = new thread(prime_count, j, j + batch - 1);
- }
- for (auto x : t) x->join();
- for (auto x : t) delete x;
- cout << ans << endl;
- #undef batch
- return 0;
- }
- ENDS(thread3)
-
- int main() {
- thread3::main();
- return 0;
- }
![]()
代码实现:
- /*************************************************************************
- > File Name: threadpool.cpp
- > Author: jby
- > Mail:
- > Created Time: Wed 13 Sep 2023 08:48:23 AM CST
- ************************************************************************/
-
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- #include
- using namespace std;
-
- #define BEGINS(x) namespace x {
- #define ENDS(x) }
-
- BEGINS(thread_pool_test)
- class Task {
- public:
- template<typename FUNC_T, typename ...ARGS>
- Task(FUNC_T func, ARGS ...args) {
- this->func = bind(func, forward
(args)...); - }
- void run() {
- func();
- return ;
- }
- private:
- function<void()> func; // 任意函数
- };
- class ThreadPool {
- public:
- ThreadPool(int n = 1) : thread_size(n), threads(n), starting(false) {
- this->start();
- return ;
- }
- void worker() {
- auto id = this_thread::get_id(); // 获得本进程号
- running[id] = true;
- while (running[id]) {
- // 取任务
- Task *t = get_task();
- t->run();
- delete t;
- }
- return ;
- }
- void start() {
- if (starting == true) return ; // 如果已经开始了就不用启动了
- for (int i = 0; i < thread_size; i++) {
- threads[i] = new thread(&ThreadPool::worker, this);
- }
- starting = true;
- return ;
- }
- template<typename FUNC_T, typename ...ARGS>
- void add_task(FUNC_T func, ARGS ...args) {
- unique_lock
lock(m_mutex) ; - tasks.push(new Task(func, forward
(args)...)); // 任务池相当于临界资源 - m_cond.notify_one(); // 生产者消费者模型
- return ;
- }
- void stop() {
- if (starting == false) return ; // 如果已经关了就不用再关了
- for (int i = 0; i < threads.size(); i++) { // 往队列末尾投递毒药任务
- this->add_task(&ThreadPool::stop_runnnig, this);
- }
- for (int i = 0; i < threads.size(); i++) {
- threads[i]->join();
- }
- for (int i = 0; i < threads.size(); i++) {
- delete threads[i]; // 释放那片进程剩余的空间
- threads[i] = nullptr; // 进程指针指向空
- }
- starting = false;
- return ;
- }
- ~ThreadPool() {
- this->stop();
- while (!tasks.empty()) { // 如果任务队列里还有任务没执行完,全部丢弃
- delete tasks.front();
- tasks.pop();
- }
- return ;
- }
- private:
- bool starting;
- int thread_size;
- Task *get_task() {
- unique_lock
lock(m_mutex) ; - while (tasks.empty()) m_cond.wait(lock); // 生产者消费者模型 //子进程的中wait函数对互斥量进行解锁,同时线程进入阻塞或者等待状态。
- Task *t = tasks.front();
- tasks.pop();
- return t;
- }
- std::mutex m_mutex;
- std::condition_variable m_cond;
- vector
threads; // 线程池子 - unordered_map<decltype(this_thread::get_id()), bool> running; // 进程号到运行状态的哈希映射
- queue
tasks; // 任务队列 - void stop_runnnig() { // 毒药任务
- auto id = this_thread::get_id();
- running[id] = false;
- return ;
- }
- };
- bool is_prime(int x) {
- for (int i = 2, I = sqrt(x); i <= I; i++) {
- if (x % i == 0) return false;
- }
- return true;
- }
- int prime_count(int l, int r) {
- int ans = 0;
- for (int i = l; i <= r; i++) {
- ans += is_prime(i);
- }
- return ans;
- }
- void worker(int l, int r, int &ret) {
- cout << this_thread::get_id() << "begin\n";
- ret = prime_count(l, r);
- cout << this_thread::get_id() << "done\n";
- return ;
- }
- int main() {
- #define batch 500000
- ThreadPool tp(5); // 五个任务的窗口队列
- int ret[10];
- for (int i = 0, j = 1; i < 10; i++, j += batch) {
- tp.add_task(worker, j, j + batch - 1, ref(ret[i]));
- }
- tp.stop();
- int ans = 0;
- for (auto x : ret) ans += x;
- cout << ans << endl;
- #undef batch
- return 0;
- }
- ENDS(thread_pool_test)
-
- int main() {
- thread_pool_test::main();
- return 0;
- }