【C++模块实现】| 【01】日志系统实现
【C++模块实现】| 【02】日志系统优化
【C++模块实现】| 【03】文件管理模块
【C++模块实现】| 【04】配置模块
【C++模块实现】| 【05】日志模块增加配置模块的功能
【C++模块实现】| 【06】日志模块添加循环覆盖写文件功能
【C++模块实现】| 【07】对于互斥、自旋锁、条件变量、信号量简介及封装
该模块是从sylar服务器框架中学习的,以下将会对其进行总结以加深对该框架的理解;
========》Linux中的线程、互斥量、信号量的基本使用《========
=========》线程、线程同步、线程安全《========
以下实现采用POSIX 线程(pthread),基于pthread封装的一个thread类,线程执行函数static void* run(void* arg),通过
传入std::function,执行函数主体,可通过std::bind对函数的参数进行绑定;
可设置线程名称,构造函数中提供名称参数,pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());
使用该函数设置指定线程名称;
/**
* @brief 线程类
*/
class Thread : Noncopyable {
public:
/// 线程智能指针类型
typedef std::shared_ptr<Thread> ptr;
/**
* @brief 构造函数
* @param[in] cb 线程执行函数
* @param[in] name 线程名称
*/
Thread(std::function<void()> cb, const std::string& name);
// Thread();
// void add_task(std::function cb);
/**
* @brief 析构函数
*/
~Thread();
/**
* @brief 线程ID
*/
pid_t getId() const { return m_id;}
/**
* @brief 线程名称
*/
const std::string& getName() const { return m_name;}
/**
* @brief 等待线程执行完成
*/
void join();
/**
* @brief 获取当前的线程指针
*/
static Thread* GetThis();
/**
* @brief 获取当前的线程名称
*/
static const std::string& GetName();
/**
* @brief 设置当前线程名称
* @param[in] name 线程名称
*/
static void SetName(const std::string& name);
private:
/**
* @brief 线程执行函数
*/
static void* run(void* arg);
/**
* @brief 线程执行函数
*/
// static void* call_run(void* arg);
private:
/// 线程id
pid_t m_id = -1;
/// 线程结构
pthread_t m_thread = 0;
/// 线程执行函数
std::function<void()> m_cb;
/// 线程名称
std::string m_name;
/// 信号量
Semaphore m_semaphore;
};
提供两个线程局部变量,可通过Get、Set方法对其进行设置和获取;
static thread_local Thread* t_thread = nullptr;
static thread_local std::string t_thread_name = "UNKNOW";
static sylar::Logger::ptr g_logger = SYLAR_LOG_NAME("system");
Thread* Thread::GetThis() {
return t_thread;
}
const std::string& Thread::GetName() {
return t_thread_name;
}
void Thread::SetName(const std::string& name) {
if(name.empty()) {
return;
}
if(t_thread) {
t_thread->m_name = name;
}
t_thread_name = name;
}
Thread::Thread(std::function<void()> cb, const std::string& name)
:m_cb(cb)
,m_name(name) {
if(name.empty()) {
m_name = "UNKNOW";
}
int rt = pthread_create(&m_thread, nullptr, &Thread::run, this);
if(rt) {
SYLAR_LOG_ERROR(g_logger) << "pthread_create thread fail, rt=" << rt
<< " name=" << name;
throw std::logic_error("pthread_create error");
}
m_semaphore.wait();
}
Thread::~Thread() {
if(m_thread) {
pthread_detach(m_thread);
}
}
void Thread::join() {
if(m_thread) {
int rt = pthread_join(m_thread, nullptr);
if(rt) {
SYLAR_LOG_ERROR(g_logger) << "pthread_join thread fail, rt=" << rt
<< " name=" << m_name;
throw std::logic_error("pthread_join error");
}
m_thread = 0;
}
}
void* Thread::run(void* arg) {
Thread* thread = (Thread*)arg;
t_thread = thread;
t_thread_name = thread->m_name;
thread->m_id = sylar::GetThreadId();
pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());
std::function<void()> cb;
cb.swap(thread->m_cb);
thread->m_semaphore.notify();
cb();
return 0;
}
void fun2() {
SYLAR_LOG_INFO(g_logger) << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
}
void fun3() {
SYLAR_LOG_INFO(g_logger) << "========================================";
}
void test() {
SYLAR_LOG_INFO(g_logger) << "thread test begin";
std::vector<sylar::Thread::ptr> thrs;
for(int i = 0; i < 100; ++i) {
sylar::Thread::ptr thr(new sylar::Thread(&fun2, "name_" + std::to_string(i * 2)));
sylar::Thread::ptr thr2(new sylar::Thread(&fun3, "name_" + std::to_string(i * 2 + 1)));
thrs.push_back(thr);
thrs.push_back(thr2);
}
for(size_t i = 0; i < thrs.size(); ++i) {
thrs[i]->join();
}
SYLAR_LOG_INFO(g_logger) << "thread test end";
}

class ThreadPool {
public:
typedef std::function<void()> task_type;
ThreadPool(int count=10);
~ThreadPool();
void add_task(task_type task);
bool IsTaskEmpty() const { return !m_tasks.empty(); }
void run();
void stop();
private:
std::queue<task_type> m_tasks;
std::vector<Thread::ptr> m_threads;
int m_threadNums;
bool m_isRunning;
OwnSemaphore* m_sem;
};
ThreadPool::ThreadPool(int count)
:m_threadNums(count) {
m_isRunning = true;
m_sem = new OwnSemaphore(10);
for(int i=0; i<count; ++i) {
sylar::Thread::ptr thr(new Thread(std::bind(&ThreadPool::run, this), "test_" + std::to_string(i)));
m_threads.push_back(thr);
}
}
ThreadPool::~ThreadPool() {
if(m_isRunning) {
stop();
}
}
void ThreadPool::add_task(task_type task) {
m_tasks.push(task);
m_sem->notify();
}
void ThreadPool::stop() {
m_isRunning = false;
for (int i = 0; i < m_threadNums; i++)
{
m_threads[i]->join();
}
}
void ThreadPool::run() {
task_type task = nullptr;
while (m_isRunning)
{
while (m_tasks.empty())
{
if(!m_isRunning) {
SYLAR_LOG_INFO(g_logger) << "stop";
break;
}
m_sem->wait();
}
task_type task = m_tasks.front();
m_tasks.pop();
if (!task)
{
SYLAR_LOG_INFO(g_logger) << "thread " << pthread_self() << "will exit";
return;
}
task();
}
}
void test_pool() {
sylar::ThreadPool threadPool(10);
for(int i = 0; i < 20; i++)
{
threadPool.add_task([]{
SYLAR_LOG_INFO(g_logger) << "xxxx" << pthread_self();
sleep(2);
return 0;
});
}
while (1) {
// sleep(1);
}
}
