• C++实现一个通用线程池


    C++实现一个通用线程池

    简介

    本文主要记录一个自己实现的简单的通用线程池对象。主要包含以下几个类:

    • Platforms.h: 屏蔽平台差异,提供跨平台使用的函数, 有需要可以未来进行扩展
    • SingleTon.h: 线程安全的单例类
    • ThreadPool.h: 线程池的实现
    • main.cpp: 测试代码

    Platforms.h

    #pragma once
    
    #ifdef WIN32
    #include 
    #endif // WIN32
    
    class Platforms
    {
    public:
    	unsigned long GetCurrentThreadId()
    	{
    		unsigned long id = 0;
    #ifdef WIN32
    		id = ::GetCurrentThreadId();
    #endif // WIN32
    		return id;
    	}
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    SingleTon.h

    #pragma once
    
    /*
    * Date: 2022-06-21
    * Author: nicklaus
    */
    template<typename T>
    class SingleTon {
    private:
    	SingleTon() {}
    public:
    	/*
    	* 使用单例包装并返回原生类型T
    	*/
    	static T* GetInstance()
    	{
    		static T instance;
    		return &instance;
    	}
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    ThreadPool.h

    #pragma once
    
    #include
    #include
    #include
    #include
    #include
    
    #include
    #include
    
    #include
    #include"SingleTon.h"
    #include"Platforms.h"
    
    class ThreadPool
    {
    public:
    	using Task = std::function<void()>;
    
    	ThreadPool(int threadNum) 
    	{
    		createThreads(threadNum);
    	}
    	~ThreadPool() 
    	{
    		Stop();
    	}
    
    	ThreadPool(const ThreadPool& pool) = delete;
    	ThreadPool& operator=(const ThreadPool& pool) = delete;
    	
    	void AddTask(const Task& task)
    	{
    		std::lock_guard<std::mutex> locker(m_taskQueueMutex);
    		m_taskQueue.push(task);
    
    		m_notEmpty.notify_one();
    	}
    
    	void AddTask(Task&& task)
    	{
    		std::lock_guard<std::mutex> locker(m_taskQueueMutex);
    		m_taskQueue.push(std::forward<Task>(task));
    
    		m_notEmpty.notify_one();
    	}
    
    private:
    	void createThreads(int num)
    	{
    		for (auto i = 0; i < num; i++)
    		{
    			m_threadGroup.push_back(std::make_shared<std::thread>(&ThreadPool::threadproc, this));
    		}
    	}
    
    	void Stop()
    	{
    		m_bForceStoped = true;
    
    		m_notEmpty.notify_all();
    
    		for (auto i = 0; i < m_threadGroup.size(); i++)
    		{
    			if (m_threadGroup[i] != nullptr)
    			{
    				m_threadGroup[i]->join();
    			}
    		}
    
    		m_threadGroup.clear();
    	}
    
    	bool IsForceStoped()
    	{
    		return m_bForceStoped;
    	}
    
    	bool IsTaskQueueEmpty()
    	{
    		return m_taskQueue.empty();
    	}
    
    	void threadproc()
    	{
    		while (true)
    		{
    			if (!IsForceStoped())// 正常情况,线程池还处于运行状态,池中的线程们需要互斥取任务执行
    			{
    				std::unique_lock<std::mutex> locker(m_taskQueueMutex);
    				// 如果是按下面这行代码不检测,是否强制停止会有下面这种现象:
    				// 主线程通过AddTask向m_taskQueue中添加任务,之后通过OS的调度,使得线程池中的某一个线程取到所有任务,并执行,
    				// 此时外界不再添加任何任务进来,那么如果按照下面的写法,那么线程中的所有子线程会阻塞在wait函数处,导致ThreadPool
    				// 的析构永远不能终结,所以wait的等待条件需要加上IsForceStoped()来避免这种情况
    				//m_notEmpty.wait(locker, [this] {return !IsTaskQueueEmpty(); });
    				m_notEmpty.wait(locker, [this] {return IsForceStoped() || !IsTaskQueueEmpty(); });
    
    				std::queue<Task> tasks = std::move(m_taskQueue);
    				while (!tasks.empty())
    				{
    					auto task = tasks.front();
    					tasks.pop();
    					printf("thread[%d] run task: %s\n",
    						SingleTon<Platforms>::GetInstance()->GetCurrentThreadId(),
    						typeid(task).name());
    					task();
    				}
    			}
    			else // 强制停止线程池运行,需要保证任务队列中的任务能够运行结束
    			{
    				std::unique_lock<std::mutex> locker(m_taskQueueMutex);
    				if (!IsTaskQueueEmpty())
    				{
    					std::queue<Task> tasks = std::move(m_taskQueue);
    					while (!tasks.empty())
    					{
    						auto task = tasks.front();
    						tasks.pop();
    						task();
    					}
    				}
    
    				printf("打印次数如果等于线程数量说明没有无限循环\n"); // 可以取消注释看看
    				break; // 必须break,不然会导致无限循环(都强制停止了,必须结束线程体的运行)
    			}
    		}
    	}
    
    private:
    	std::mutex	m_taskQueueMutex;
    	std::queue<Task> m_taskQueue;
    	std::condition_variable	m_notEmpty;
    
    	bool m_bForceStoped;					// 设计初衷:线程池中的线程不允许写,只允许读(特别重要)
    	std::vector<std::shared_ptr<std::thread>> m_threadGroup;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137

    main.cpp

    #include"ThreadPool.h"
    #include"SingleTon.h"
    #include"Platforms.h"
    
    void printA_Z()
    {
    	printf("%s: ", __FUNCTION__);
    	for (char c = 'A'; c <= 'Z'; c++)
    	{
    		printf("%c ", c);
    	}
    	printf("\n\n");
    }
    
    void print1_100()
    {
    	printf("%s: ", __FUNCTION__);
    	for (int c = 1; c <= 100; c++)
    	{
    		printf("%d ", c);
    	}
    	printf("\n\n");
    }
    
    class A 
    {
    public:
    	void f() 
    	{
    		printf("%s: ", __FUNCTION__);
    		for (int c = 1; c <= 100; c++)
    		{
    			printf("%d ", c);
    		}
    		printf("\n\n");
    	}
    };
    
    
    int main()
    {
    	printf("main thread id:%d\n", SingleTon<Platforms>::GetInstance()->GetCurrentThreadId());
    
    	{
    		A a;
    
    		ThreadPool pool(10);
    		pool.AddTask(printA_Z);
    		pool.AddTask(print1_100);
    		pool.AddTask(printA_Z);
    		pool.AddTask(print1_100);
    		pool.AddTask(printA_Z);
    		pool.AddTask(print1_100);
    		pool.AddTask(std::bind(&A::f, a));
    		Sleep(1000);
    	}
    
    	printf("main thread not block\n");
    	return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    测试结果

    main thread id:236676
    thread[232416] run task: class std::function<void __cdecl(void)>
    printA_Z: A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
    
    thread[232416] run task: class std::function<void __cdecl(void)>
    print1_100: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
    
    thread[232416] run task: class std::function<void __cdecl(void)>
    printA_Z: A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
    
    thread[232416] run task: class std::function<void __cdecl(void)>
    print1_100: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
    
    thread[232416] run task: class std::function<void __cdecl(void)>
    printA_Z: A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
    
    thread[232416] run task: class std::function<void __cdecl(void)>
    print1_100: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
    
    thread[232416] run task: class std::function<void __cdecl(void)>
    A::f: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
    
    打印次数如果等于线程数量说明没有无限循环
    打印次数如果等于线程数量说明没有无限循环
    打印次数如果等于线程数量说明没有无限循环
    打印次数如果等于线程数量说明没有无限循环
    打印次数如果等于线程数量说明没有无限循环
    打印次数如果等于线程数量说明没有无限循环
    打印次数如果等于线程数量说明没有无限循环
    打印次数如果等于线程数量说明没有无限循环
    打印次数如果等于线程数量说明没有无限循环
    打印次数如果等于线程数量说明没有无限循环
    main thread not block
    
    F:\HappyCoding\threadpool\Debug\threadpool.exe (process 236836) exited with code 0.
    Press any key to close this window . . .
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
  • 相关阅读:
    Win11 Edge浏览器进入朔日考试系统(无纸化测评系统)的方法
    2023年中国机动车拍卖网络化趋势加速,网络拍卖专场数量大幅上升至47489场[图]
    交换机对PCIE进行降速
    体育场馆预约小程序,体育馆预约小程序,体育馆预约系统小程序
    在外出差资料调阅 只需简单三步搞定
    java中静态属性和静态方法的使用
    深入了解==和equals的区别
    (附源码)spring boot智能养老系统平台 毕业设计 170900
    Android APT
    怎么下载 jar 包
  • 原文地址:https://blog.csdn.net/handsomeasme/article/details/126585222