简易线程池实现
ThreadPool.hpp(线程池)
#ifndef THREADPOOL_HPP
#define THREADPOOL_HPP
#include
#include
#include
#include
#include
#include "sem.hpp"
#include "Mutex.hpp"
#include "Cond.hpp"
#include "thread.hpp"
#include "Task.hpp"
using namespace std;
const int g_thread_num = 3;
template
class ThreadPool
{
public:
ThreadPool(int num = g_thread_num)
:num_(num)
{
for(int i=1;i<=num;++i)
{
threads_.push_back(new Thread(i,routine,(void*)this));
}
}
static void* routine(void* args)
{
ThreadData* td = (ThreadData*)args;
ThreadPool* tp = (ThreadPool*)td->args_;
while(true)
{
T task;
{
lock_Guard lg(tp->getMutex());
while(tp->isEmpty()) tp->waitcond(lg);
task = tp->getTask();
//tp->cv_.notify_one();
}
task();
}
}
void run()
{
for(auto iter : threads_)
{
iter->start();
}
}
void pushTask(const T& task)
{
sleep(2);
lock_Guard lg(mtx_);
// while(task_queue_.size() >= num_)
// {
// cv_.Wait(lg);
// }
task_queue_.push(task);
cout<<"push: "<join();
delete iter;
}
}
Mutex& getMutex() {return mtx_;}
bool isEmpty() {return task_queue_.empty();}
void waitcond(lock_Guard& lg) {cv_.Wait(lg);}
private:
vector threads_;
int num_;
queue task_queue_;
Mutex mtx_;
Condition_variable cv_;
};
#endif

- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
thread.hpp(封装线程)
#ifndef THREAD_HPP
#define THREAD_HPP
#include
#include
#include
#include"ThreadPool.hpp"
using namespace std;
typedef void*(*fun_t)(void*);
class ThreadData
{
public:
void* args_;
string name_;
};
class Thread
{
public:
Thread(int num,fun_t callback,void* args)
:func_(callback)
{
char namebuffer[64];
snprintf(namebuffer,sizeof namebuffer,"thread-%d",num);
tdata_.name_ = namebuffer;
tdata_.args_ = args;
}
void start()
{
pthread_create(&tid_,nullptr,func_,(void*)&tdata_);
}
void join()
{
pthread_join(tid_,nullptr);
}
~Thread()
{
}
private:
pthread_t tid_;
fun_t func_;
ThreadData tdata_;
};
#endif
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
Task.hpp(任务)
#ifndef TASK_HPP
#define TASK_HPP
#include
class Task
{
public:
Task() {}
Task(int x, int y)
: x_(x), y_(y)
{
}
int operator()()
{
std::cout<<"recive and do: "<

- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
Mutex.hpp(封装锁)
#include
#include
#ifndef MUTEX_HPP
#define MUTEX_HPP
class Mutex
{
public:
//Mutex(pthread_mutex_t* mutex)
//:mutex_(mutex)
Mutex()
{
pthread_mutex_init(&mutex_, nullptr);
}
void lock()
{
pthread_mutex_lock(&mutex_);
}
void unlock()
{
pthread_mutex_unlock(&mutex_);
}
pthread_mutex_t* getMutex()
{
return &mutex_;
}
~Mutex()
{
pthread_mutex_destroy(&mutex_);
}
Mutex(const Mutex& mtx) = delete;
Mutex& operator=(const Mutex& mtx) = delete;
private:
pthread_mutex_t mutex_;
};
class lock_Guard
{
public:
lock_Guard(Mutex& mutex)
:mutex_(mutex)
{
mutex_.lock();
}
~lock_Guard()
{
mutex_.unlock();
}
pthread_mutex_t* getMutex()
{
return mutex_.getMutex();
}
lock_Guard(const lock_Guard& lg) = delete;
lock_Guard& operator=(const lock_Guard& lg) = delete;
private:
Mutex& mutex_;
};
#endif
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
Cond.hpp(封装条件变量)
#include
#include
#include "Mutex.hpp"
class Condition_variable
{
public:
Condition_variable()
{
pthread_cond_init(&cond_,nullptr);
}
void Wait(lock_Guard& lg)
{
pthread_cond_wait(&cond_,lg.getMutex());
}
void notify_one()
{
pthread_cond_signal(&cond_);
}
void notify_all()
{
pthread_cond_broadcast(&cond_);
}
~Condition_variable()
{
pthread_cond_destroy(&cond_);
}
private:
pthread_cond_t cond_;
};
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
sem.hpp(封装信号量)
#include
#include
using namespace std;
class Sem
{
public:
Sem(int value)
{
sem_init(&sem_,0,value);
}
void P()
{
sem_wait(&sem_);
}
void V()
{
sem_post(&sem_);
}
~Sem()
{
sem_destroy(&sem_);
}
private:
sem_t sem_;
};
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
main.cc
#include "ThreadPool.hpp"
#include
#define CUSTOMERSIZE 2
#define PRODUCTORSIZE 1
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
ThreadPool *tp = new ThreadPool();
tp->run();
while(true)
{
int x = rand()%100+1;
int y = rand()%100+1;
Task t(x,y);
tp->pushTask(t);
}
return 0;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
makefile
threadpool:main.cc
g++ -o $@ $^ -std=c++11 -l pthread
.PHONY:clean
clean:
rm -f threadpool