• 【Linux】线程池&读写锁


    线程池

    应用场景

    线程池不仅要提高程序运行效率,还要提高程序处理业务的种类,提高程序运行效率自然要创建多个线程

    说到提高业务的种类,应该不难想到switch case语句结合if语句来实现,示例如下:

    在这里插入图片描述

    这种实现不同业务的方式比较简单,但是相对也很有局限性,如果业务种类比较多,这种分支语句就不适用了。

    一个线程被创建之后,只能执行一个线程入口函数,后续是没有办法更改的,基于这种场景,线程可能执行的代码也就是固定了。换句话说即使线程入口函数当中有很多分支,但是相对来说线程执行的路线都是固定的,要么时A分支,要么时B分支,要么是C分支。这里的分支是指类似if,else语句。这里如果我们要是在后续再想新增加新的业务判断逻辑,那就只能在原有线程入口函数进行增加写代码,这样就会导致一个线程入口函数的代码愈来愈多。那么如果代码的耦合性过高,只要一个地方出现错误,我们查找bug时就会十分头疼。
    所以为了能让线程执行不同的业务代码,就要考虑线程从队列中获取元素的身上下功夫。让线程可以通过线程元素来执行不同的代码。

    线程池原理

    线程池原理 = 一堆线程 + 线程安全的队列

    在这里插入图片描述

    构造线程池

    创建固定数量的线程池,循环从任务队列中获取任务对象
    获取到任务对象之后执行任务对象的任务接口

    代码实现

    #include
    #include
    #include
    #include
    using namespace std;
    
    //定义队列元素的类型
    //   数据
    //   处理数据的方法
    
    typedef void(*Handler)(int data);
    //创建一个新类型,Handler是一个指向没有返回值,接收一个整形参数的函数指针
    
    class QueueData{
    public:
      QueueData(){
    
      }
      QueueData(int data, Handler handler){
        data_ = data;
        handler_ = handler;
      }
    
      //如何通过函数处理数据
      void Run(){
        handler_(data_);
      }
    private:
      int data_;//要处理的数据
      Handler handler_;
      //处理数据的函数(要保存一个函数的地址)
      //Handler 函数指针
      //handler 函数指针变量,保存函数地址
    };
    
    //线程安全队列
    //互斥 + 同步
    //条件变量
    class SafeQueue{
    public:
      SafeQueue(){
        capacity_ = 1;
        pthread_mutex_init(&lock_, NULL);
        pthread_cond_init(&prod_cond_, NULL);
        pthread_cond_init(&cons_cond_, NULL);
      }
      ~SafeQueue(){
        pthread_mutex_destroy(&lock_);
        pthread_cond_destroy(&prod_cond_);
        pthread_cond_destroy(&cons_cond_);
      }
      //入队函数
      void Push(QueueData& data){
        pthread_mutex_lock(&lock_);
        while(que_.size() >= capacity_){
          pthread_cond_wait(&prod_cond_, &lock_);
        }
        que_.push(data);
        pthread_mutex_unlock(&lock_);
        pthread_cond_signal(&cons_cond_);
      }
    
      //出队函数
      void Pop(QueueData* data, int flag_exit){
        pthread_mutex_lock(&lock_);
        while(que_.empty()){
          if(flag_exit == 1){
            pthread_mutex_unlock(&lock_);
            pthread_exit(NULL);
          }
          pthread_cond_wait(&cons_cond_, &lock_);
          //条件变量等待函数,将调用该函数的线程放到PCB等待队列中
          //lock_:该线程等待的互斥锁
        }
        *data = que_.front();
        que_.pop();
        pthread_mutex_unlock(&lock_);
        pthread_cond_signal(&prod_cond_);
      }
    
      void BroadcaseAllConsume(){
        pthread_cond_broadcast(&cons_cond_);
      }
    private:
      queue<QueueData> que_;
      size_t capacity_;
      //互斥锁
      pthread_mutex_t lock_;
    
      //同步
      pthread_cond_t prod_cond_;
      //生产者的条件变量
      pthread_cond_t cons_cond_;
      //消费者的条件变量
      //线程池中的一堆线程,在逻辑上可以认为是消费者
    };
    
    //线程池:一堆线程 + 线程安全的队列
    class ThreadPool{
    public:
      ThreadPool(){
      }
      ~ThreadPool(){
        if(sq_ != NULL){
          delete sq_;
        }
      }
      int InitThreadPool(int thread_count){
        //0:线程继续运行
        //1:线程退出
        flag_exit_ = 0;
        sq_ = new SafeQueue;
        if(sq_ == NULL){
          printf("Init thread pool failed\n");
          return -1;
        }
        thread_count_ = thread_count;
        
        for(int i=0; i<thread_count_; ++i){
          pthread_t tid;
          int ret = pthread_create(&tid, NULL, worker_start, (void*)this);
          if(ret < 0){
            thread_count--;
            continue;
          }
        }
        //根据目前线程的数量来判断创建线程是否成功
        if(thread_count_ <= 0){
          printf("create thread all failed\n");
          return -1;
        } 
        return 0;//初始化成功
      }
    
      //线程池的使用接口
      //只需要给使用者提供Push接口,让使用者可以将线程push到队列中就好
      //而pop接口不需要提供,因为线程池中的线程可以自己调用到
      void Push(QueueData& qd){
        sq_->Push(qd);
      }
    
      //线程入口函数
      static void* worker_start(void* arg){
        pthread_detach(pthread_self());
        //从队列当中拿元素
        //处理元素
        ThreadPool* tp = (ThreadPool*)arg;
        while(1){
          QueueData qd;
          tp->sq_->Pop(&qd, tp->flag_exit_);
          qd.Run();
        }
      }
      void thread_pool_exit(){
        flag_exit_ = 1;
        sq_->BroadcaseAllConsume();
      }
    private:
      //线程安全的队列
      SafeQueue* sq_;
      //线程池中线程的数量
      int thread_count_;
      //标志线程是否退出的标志位
      int flag_exit_;
    };
    
    void Deal1(int data){
      printf("I am Deal1, i deal %d\n", data);
    }
    
    void Deal2(int data){
      printf("hello! I am Deal2, i deal %d\n", data);
    }
    
    int main(){
      //创建线程池
      //往线程池中放数据
      ThreadPool tp;
      int ret = tp.InitThreadPool(2);
      if(ret < 0) return 0;
      for(int i=0; i<100; ++i){
        QueueData qd(i, Deal2);
        tp.Push(qd);
      }
      tp.thread_pool_exit();
      while(1){
        sleep(1);
      }
      return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190

    读写锁

    应用场景

    • 大量读取,少量写的场景
    • 允许多个线程并行读,多个线程互斥写

    读写锁的三种状态

    • 以读模式加锁的状态
    • 以写模式加锁的状态
    • 不加锁的状态

    读写锁的接口

    初始化接口

    int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t *restrict attr)
    
    • 1

    参数:

    • pthread_rwlock_t:读写锁的类型,rwlock:传递读写锁
    • attr:NULL,默认属性

    销毁接口

    int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
    
    • 1

    以读模式加锁

    int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
    //阻塞接口
    
    • 1
    • 2

    允许多个线程以并行已读模式获取读写锁

    引用计数:用来记录当前读写锁有多少个线程以读模式获取了读写锁

    1.每当有线程以读模式进行加锁,引用计数++;
    2.每当读模式的线程释放锁,引用计数–;

    引用计数的作用,当引用计数为0时,那么证明当前没有线程在进行读取操作,那么写的线程就可以获取到这把读写锁进行写。

    int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
    //非阻塞接口
    
    • 1
    • 2

    以写模式加锁

    int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);//非阻塞接口
    int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);//阻塞接口
    
    • 1
    • 2

    解锁接口

    int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);//解锁接口
    
    • 1

    常见问题

    现在如果有线程A和线程B在对一个资源进行读取,此时来了一个线程C要以写的方式获取这把读写锁,然后又来了一个线程D要进行读取,这里问线程C要不要等待线程D也读取完之后再对资源进行写?

    这里读写锁的内部是有个机制的:如果读写锁已经在读模式打开了,有一个线程A想要以写模式打开获取读写锁,则需要等待,如果在等待期间,又来了读模式加锁的线程,那读模式的线程要等待写线程先执行完再说,因为如果这时读取的线程可以获取这把锁,读写锁本来就是大量读少量写的使用场景,那么就会导致写的线程一直拿不到这把锁,这是不合理的。线程就会饥饿。

    乐观锁/悲观锁

    乐观锁

    针对某个线程访问临界区修改数据的时候,乐观锁认为只有该线程在修改,大概率不会存在并行的情况,所以修改数据不加锁,但是,在修改完毕,进行更新的时候,进行判断,例如:版本号控制、CAS无锁编程

    悲观锁

    针对某个线程访问临界区修改数据的时候,都会认为可能有其他线程并行修改的情况发生,所以在线程修改数据之前就进行加锁,让多个线程互斥访问。悲观锁有:互斥锁、读写锁、自旋锁等

    自旋锁

    自旋锁 (busy-waiting类型) 和互斥锁 (sleep-waiting类型)的区别:

    1.自旋锁加锁时,加不到锁,线程不会切换 (时间片没有到的情况,时间片到了,也会线程切换),会持续的尝试拿锁, 直到拿到自旋锁
    2.互斥锁加锁时, 加不到锁,线程会切换(时间片没有到,也会切换),进入睡眠状态,当其他线程释放互斥锁(解锁)之后, 被唤醒。在切换回来,进行抢锁

    3.自旋锁的优点:因为自旋锁不会引起调用者睡眠,所以自旋锁的效率远高于互斥锁。
    4.自旋锁的缺点:自旋锁一直占用着CPU,他在未获得锁的情况下,一直运行(自旋),所以占用着CPU,如果不能在很短的时间内获得锁,这无疑会使CPU效率降低
    5.适用于临界区代码较短时(直白的说: 临界区代码执行时间短)的情况, 使用自旋锁效率比较高。因为线程不用来回切换
    6.当临界区当中执行时间较长, 自旋锁就不适用了, 因为拿不到锁会占用CPU一直抢占锁。

    自旋锁API:

    pthread_spin_init:初始化自旋锁

    int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
    
    • 1

    pthread_spin_destroy:销毁自旋锁

    int pthread_spin_destroy(pthread_spinlock_t *lock);
    
    • 1

    pthread_spin_lock:尝试获取自旋锁,如果自旋锁已经被锁定,线程将自旋等待。

    int pthread_spin_lock(pthread_spinlock_t *lock);
    
    • 1

    pthread_spin_trylock:尝试获取自旋锁,如果自旋锁已经被锁定,它不会等待,而是立即返回

    int pthread_spin_trylock(pthread_spinlock_t *lock);
    
    • 1

    pthread_spin_unlock:释放自旋锁,允许其他线程获取它

    int pthread_spin_unlock(pthread_spinlock_t *lock);
    
    • 1
  • 相关阅读:
    leetcode91-解码方法
    Docker容器数据卷
    Python+Selenium做自动化测试
    Base64编码知识记录
    面试题:手动实现一个sizeof
    Python自动化测试selenium指定截图文件名方法
    基于JAVA医院患者管理系统计算机毕业设计源码+系统+数据库+lw文档+部署
    直播软件开发技巧:7个实时视频传输和弹幕功能的关键步骤
    AD20~PCB的板层设计和布线
    国产etl工具BeeDI 产品 之“数据联邦“ 经典功能组件
  • 原文地址:https://blog.csdn.net/weixin_56916549/article/details/132699818