• 【C++模块实现】| 【09】线程模块及线程池的实现


    ========》以下全部代码查看《========

    索引

    【C++模块实现】| 【01】日志系统实现
    【C++模块实现】| 【02】日志系统优化
    【C++模块实现】| 【03】文件管理模块
    【C++模块实现】| 【04】配置模块
    【C++模块实现】| 【05】日志模块增加配置模块的功能
    【C++模块实现】| 【06】日志模块添加循环覆盖写文件功能
    【C++模块实现】| 【07】对于互斥、自旋锁、条件变量、信号量简介及封装

    该模块是从sylar服务器框架中学习的,以下将会对其进行总结以加深对该框架的理解;
    
    • 1

    ========》视频地址《========

    一、简介

    ========》Linux中的线程、互斥量、信号量的基本使用《========
    =========》线程、线程同步、线程安全《========

    二、代码实现

    以下实现采用POSIX 线程(pthread),基于pthread封装的一个thread类,线程执行函数static void* run(void* arg),通过
    传入std::function,执行函数主体,可通过std::bind对函数的参数进行绑定;
    可设置线程名称,构造函数中提供名称参数,pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());
    使用该函数设置指定线程名称;
    
    • 1
    • 2
    • 3
    • 4
    
    /**
     * @brief 线程类
     */
    class Thread : Noncopyable {
    public:
        /// 线程智能指针类型
        typedef std::shared_ptr<Thread> ptr;
    
        /**
         * @brief 构造函数
         * @param[in] cb 线程执行函数
         * @param[in] name 线程名称
         */
        Thread(std::function<void()> cb, const std::string& name);
    
    //    Thread();
    
    //    void add_task(std::function cb);
    
        /**
         * @brief 析构函数
         */
        ~Thread();
    
        /**
         * @brief 线程ID
         */
        pid_t getId() const { return m_id;}
    
        /**
         * @brief 线程名称
         */
        const std::string& getName() const { return m_name;}
    
        /**
         * @brief 等待线程执行完成
         */
        void join();
    
        /**
         * @brief 获取当前的线程指针
         */
        static Thread* GetThis();
    
        /**
         * @brief 获取当前的线程名称
         */
        static const std::string& GetName();
    
        /**
         * @brief 设置当前线程名称
         * @param[in] name 线程名称
         */
        static void SetName(const std::string& name);
    private:
    
        /**
         * @brief 线程执行函数
         */
        static void* run(void* arg);
    
        /**
         * @brief 线程执行函数
         */
    //    static void* call_run(void* arg);
    
    private:
        /// 线程id
        pid_t m_id = -1;
        /// 线程结构
        pthread_t m_thread = 0;
        /// 线程执行函数
        std::function<void()> m_cb;
        /// 线程名称
        std::string m_name;
        /// 信号量
        Semaphore m_semaphore;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    提供两个线程局部变量,可通过Get、Set方法对其进行设置和获取;
    
    • 1
    static thread_local Thread* t_thread = nullptr;
    static thread_local std::string t_thread_name = "UNKNOW";
    
    static sylar::Logger::ptr g_logger = SYLAR_LOG_NAME("system");
    
    Thread* Thread::GetThis() {
        return t_thread;
    }
    
    const std::string& Thread::GetName() {
        return t_thread_name;
    }
    
    void Thread::SetName(const std::string& name) {
        if(name.empty()) {
            return;
        }
        if(t_thread) {
            t_thread->m_name = name;
        }
        t_thread_name = name;
    }
    
    Thread::Thread(std::function<void()> cb, const std::string& name)
        :m_cb(cb)
        ,m_name(name) {
        if(name.empty()) {
            m_name = "UNKNOW";
        }
        int rt = pthread_create(&m_thread, nullptr, &Thread::run, this);
        if(rt) {
            SYLAR_LOG_ERROR(g_logger) << "pthread_create thread fail, rt=" << rt
                << " name=" << name;
            throw std::logic_error("pthread_create error");
        }
        m_semaphore.wait();
    }
    
    Thread::~Thread() {
        if(m_thread) {
            pthread_detach(m_thread);
        }
    }
    
    void Thread::join() {
        if(m_thread) {
            int rt = pthread_join(m_thread, nullptr);
            if(rt) {
                SYLAR_LOG_ERROR(g_logger) << "pthread_join thread fail, rt=" << rt
                    << " name=" << m_name;
                throw std::logic_error("pthread_join error");
            }
            m_thread = 0;
        }
    }
    
    void* Thread::run(void* arg) {
        Thread* thread = (Thread*)arg;
        t_thread = thread;
        t_thread_name = thread->m_name;
        thread->m_id = sylar::GetThreadId();
        pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());
    
        std::function<void()> cb;
        cb.swap(thread->m_cb);
    
        thread->m_semaphore.notify();
    
        cb();
        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    三、测试

    void fun2() {
            SYLAR_LOG_INFO(g_logger) << "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
    }
    
    void fun3() {
            SYLAR_LOG_INFO(g_logger) << "========================================";
    }
    
    void test() {
        SYLAR_LOG_INFO(g_logger) << "thread test begin";
    
        std::vector<sylar::Thread::ptr> thrs;
        for(int i = 0; i < 100; ++i) {
            sylar::Thread::ptr thr(new sylar::Thread(&fun2, "name_" + std::to_string(i * 2)));
            sylar::Thread::ptr thr2(new sylar::Thread(&fun3, "name_" + std::to_string(i * 2 + 1)));
            thrs.push_back(thr);
            thrs.push_back(thr2);
        }
    
        for(size_t i = 0; i < thrs.size(); ++i) {
            thrs[i]->join();
        }
        SYLAR_LOG_INFO(g_logger) << "thread test end";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在这里插入图片描述

    四、线程池

    class ThreadPool {
    public:
        typedef std::function<void()> task_type;
        ThreadPool(int count=10);
        ~ThreadPool();
        void add_task(task_type task);
    
        bool IsTaskEmpty() const { return !m_tasks.empty(); }
        void run();
        void stop();
    
    private:
        std::queue<task_type> m_tasks;
        std::vector<Thread::ptr> m_threads;
        int m_threadNums;
        bool m_isRunning;
    
        OwnSemaphore* m_sem;
    };
    
    
    ThreadPool::ThreadPool(int count)
        :m_threadNums(count) {
        m_isRunning = true;
        m_sem = new OwnSemaphore(10);
        for(int i=0; i<count; ++i) {
            sylar::Thread::ptr thr(new Thread(std::bind(&ThreadPool::run, this), "test_" + std::to_string(i)));
            m_threads.push_back(thr);
        }
    }
    
    ThreadPool::~ThreadPool() {
        if(m_isRunning) {
            stop();
        }
    }
    
    void ThreadPool::add_task(task_type task) {
        m_tasks.push(task);
        m_sem->notify();
    }
    
    void ThreadPool::stop() {
        m_isRunning = false;
        for (int i = 0; i < m_threadNums; i++)
        {
            m_threads[i]->join();
        }
    }
    
    void ThreadPool::run() {
        task_type task = nullptr;
        while (m_isRunning)
        {
            while (m_tasks.empty())
            {
                if(!m_isRunning) {
                    SYLAR_LOG_INFO(g_logger) << "stop";
                    break;
                }
                m_sem->wait();
            }
    
            task_type task = m_tasks.front();
            m_tasks.pop();
    
            if (!task)
            {
                SYLAR_LOG_INFO(g_logger) << "thread " << pthread_self() << "will exit";
                return;
            }
            task();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    五、测试

    void test_pool() {
        sylar::ThreadPool threadPool(10);
        for(int i = 0; i < 20; i++)
        {
            threadPool.add_task([]{
                SYLAR_LOG_INFO(g_logger) << "xxxx" << pthread_self();
                sleep(2);
                return 0;
            });
        }
    
        while (1) {
    //        sleep(1);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    在这里插入图片描述

  • 相关阅读:
    【Nginx】Windows平台下配置Nginx服务实现负载均衡
    Golang | Leetcode Golang题解之第169题多数元素
    金仓数据库KingbaseES物理备份恢复最佳实践(数据恢复解决方案)
    496.下一个更大的元素I
    算法通关村第一关-链表白银经典问题笔记
    基苯乙烯微球支载L-脯氨酸磺酰胺/水滑石@磺化聚苯乙烯微球/石蜡聚苯乙烯微球的制备
    二叉搜索树(Binary Search Tree,BST)
    Java_接口使用实例
    爬虫实现自己的翻译服务器
    软件测试/测试开发丨接口自动化测试学习笔记,整体结构响应断言
  • 原文地址:https://blog.csdn.net/weixin_45926547/article/details/126178503