• 【Linux】线程池


    1.线程池概念

    线程池是一种线程使用模式。

    线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线
    程,等待着监督管理者分配可并发执行的任务。

    2.线程池的优点

    • 线程池避免了在处理短时间任务时创建与销毁线程的代价。
    • 程池不仅能够保证内核的充分利用,还能防止过分调度。

    可用线程的数量取决于可用的并发处理器、处理器内核、内存、网络、sockets等。

    3.线程池的应用场景

    (1).需要大量的线程来完成任务,且完成任务的时间比较短。

    比如:WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大 。

    (2). 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。

    (3).接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。

    突发性大量客户请求,在没有线程池情况下,将产生大量线程,短时间内产生大量线程可能使内存到达极限。

    4.线程池的实现

    线程池对外暴露一个接口push接口,用于任务的加入。

    图示:

    image-20221123154359756

    threadpool.hpp

    实现线程模板:

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    using namespace std;
    
    
    const int Thread_num=10;
    template <class T>
    class threadpool{
    private:
        threadpool(const int num=Thread_num):threadnum(num)
        {
            assert(threadnum>0);
            isrunning=false;
            pthread_cond_init(&cond_,nullptr);
            pthread_mutex_init(&mutex_,nullptr);
        }
        threadpool(const threadpool<T> &)=delete;    //拷贝构造
        threadpool<T>& operator=(const threadpool<T>&)=delete;
    public:
        ~threadpool(){
            pthread_mutex_destroy(&mutex_);
            pthread_cond_destroy(&cond_);
        }
        static threadpool<T>*getInstance(){
            static pthread_mutex_t mutex_static;
            if(instance==nullptr){
                //判断是否需要创建线程池
                pthread_mutex_lock(&mutex_static);
                if(instance==nullptr){
                    instance=new threadpool<T>();
                }
                pthread_mutex_unlock(&mutex_static);
            }
            pthread_mutex_destroy(&mutex_static);
            return instance;
        }
        //类内函数默认第一个参数是this指针,而由于线程的执行函数只有一个参数,所以设置为静态函数,否则第一个参数被this占用
        static void* threadroutine(void* arg){
            pthread_detach(pthread_self()); //分离线程
            threadpool<T>* pool=static_cast<threadpool<T>* >(arg);
            prctl(PR_SET_NAME, "follower");
            //线程不断的获取任务,并执行
            while (true)
            {
                pool->lockQueue();
                //判断是否为空
                //为空则等待唤醒
                //不为空,取任务执行
                while (!pool->haveTask())
                {
                    pool->waitForTask();
                }
                T t=pool->pop();
                int em1,em2;
                char op;
                t.get(&em1,&em2,&op);
                cout << "consumer[" << pthread_self() << "] " << (unsigned long)time(nullptr) 
                << " 消费了一个任务: " << em1 << op << em2 << "=" << t() << endl;
                pool->unlockQueue();
            }
            return nullptr;
        }
        void start(){
            assert(!isrunning);
            for(int i=0;i<threadnum;i++){
                pthread_t tid;
                pthread_create(&tid,nullptr,threadroutine,(void*)instance);
            }
        }
    
        void push(const T& t)
        {
            lockQueue();
            workqueue_.push(t);
            choiceThreadForHandler();
            unlockQueue();
        }
    private:
        void lockQueue() { pthread_mutex_lock(&mutex_); }
        void unlockQueue() { pthread_mutex_unlock(&mutex_); }
        bool haveTask() { return !workqueue_.empty(); }
        void waitForTask() { pthread_cond_wait(&cond_, &mutex_); }
        void choiceThreadForHandler() { pthread_cond_signal(&cond_); }
        T pop()
        {
            T temp =workqueue_.front();
            workqueue_.pop();
            return temp;
        }
        queue<T> workqueue_;    //工作队列
        int threadnum;  //线程数量
        pthread_mutex_t mutex_; 
        pthread_cond_t cond_;
        bool isrunning; //判断线程池是否允许
        static threadpool<T>* instance;
    };
    
    template<class T>
    threadpool<T>* threadpool<T>::instance=nullptr;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105

    注意点:

    • 当某线程被唤醒时,其可能是被异常或是伪唤醒,或者是一些广播类的唤醒线程操作而导致所有线程被唤醒,使得在被唤醒的若干线程中,只有个别线程能拿到任务。此时应该让被唤醒的线程再次判断是否满足被唤醒条件,所以在判断任务队列是否为空时,应该使用while进行判断,而不是if。
    • pthread_cond_broadcast函数的作用是唤醒条件变量下的所有线程,而外部可能只Push了一个任务,我们却把全部在等待的线程都唤醒了,此时这些线程就都会去任务队列获取任务,但最终只有一个线程能得到任务。一瞬间唤醒大量的线程可能会导致系统震荡。这个现象也叫做惊群效应

    为什么线程函数要设置为静态类型?

    • 类内函数默认第一个参数是this指针,而由于线程的执行函数只有一个参数,所以设置为静态函数,否则第一个参数被this占用
    • 静态成员函数属于类,而不属于某个对象,也就是说静态成员函数是没有隐藏的this指针的,因此我们需要将Routine设置为静态方法,此时Routine函数才真正只有一个参数类型为void*的参数。

    测试文件

    #include "threadpool.hpp"
    #include "task.hpp"
    #include 
    #include 
    
    const std::string ops = "+-*/%";
    int main()
    {
    
        prctl(PR_SET_NAME, "main");
        //使用智能指针
        unique_ptr<threadpool<Task>>pool(threadpool<Task>::getInstance());
        pool->start();
        //生产任务
        srand((unsigned long)time(nullptr) ^ getpid() ^ pthread_self());
        while (true)
        {
            int em1=rand()%100,em2=rand()%30;
            char op=ops[rand()%4];
            Task t(em1,em2,op);
            pool->push(t);
            cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) 
            << " 生产了一个任务: " << em1 << op << em2 << "=?" << endl;
            sleep(1);
        }
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    使用下面的指令对轻量级线程进行监控:

    while :; ps -aL|grep -1&&ps -aL|grep threadpool_test|grep -v grep;echo "#######";sleep 1;done
    
    • 1

    执行结果:

    image-20221123172208398

    5.STL和智能指针和线程安全

    STL中的容器不是线程安全的

    • STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响。
    • 而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶).
    • 因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全

    智能指针是否是线程安全的?

    • 对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题.
    • 对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数
    5.1其他常见锁
    • 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,
      行锁等),当其他线程想要访问数据时,被阻塞挂起。
    • 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
    • CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试

    自旋锁和挂起等待锁

    • 自旋锁:轮询检测锁是否锁就绪

    挂起等待锁适合在临界区长时间允许占有锁的情况。而挂起等待锁适合在临界区运行时间短,等待锁时间短的情况。

    自旋锁的接口

    pthread_spin_init();
    与互斥锁的接口一样,只需要将mutex修改为spin即可
    
    • 1
    • 2
    5.2读写锁

    在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读
    的机会反而高的多。

    通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。而读写锁就可以解决读多写少的情况。

    image-20221123173603762

    • **注意:写独占,读共享,读锁优先级高 **

    接口

    int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
    /*
    pref 共有 3 种选择
    PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
    PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和
    PTHREAD_RWLOCK_PREFER_READER_NP 一致
    PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    初始化

    int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t
    *restrict attr);
    
    • 1
    • 2

    销毁

    int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
    
    • 1

    **加锁和解锁 **

    int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
    int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
    int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
    
    • 1
    • 2
    • 3
  • 相关阅读:
    day17正则表达式作业
    大数据-之LibrA数据库系统告警处理(ALM-12054 证书文件失效)
    ORA-00904: “EQUIPMENTS0_“.“CREATETIME“: 标识符无效
    【夜读】自我提升的8个好习惯,迷茫时看一看
    java计算机毕业设计HTML5历史车轮—汴京网站源码+mysql数据库+系统+lw文档+部署
    javaScript贪吃蛇
    GitHub上狂揽62Kstars的程序员做饭指南
    7-8 HashSet存入自定义类对象
    WPS前骨干历时10年打造新型软件,Excel用户:我为此改用WPS
    AOP结合注解实现项目中接口调用情况监控
  • 原文地址:https://blog.csdn.net/qq_53893431/article/details/128005493