• 【系统编程】线程池以及API接口简介



            线程池(Thread Pool)是一种并发编程的设计模式,它用于管理和复用线程,以便更有效地处理并发任务。线程池的主要目标是降低线程的创建和销毁成本,提高系统的性能和资源利用率

    一、API接口

            API(Application Programming Interface)接口是一组定义了软件组件之间如何互相通信和交互的规范和协议。API允许不同的软件模块、应用程序或系统之间共享功能和数据,从而实现各种复杂的任务和功能。

            我们在学习线程池之前要明白,线程池本身不是一个API接口,而是一种用于管理和执行任务的并发编程模型。然而,线程池通常会被包装在API接口中,以便其他开发人员可以更容易地使用它来执行并发任务。

            API接口通常定义了一组可用于与软件组件、服务或库进行交互的方法和函数。在这种情况下,如果创建了一个包含线程池的库,并提供了一组方法或函数来操作和管理线程池,那么这些方法或函数可以被视为API接口的一部分。

            例如,可以设计一个具有以下功能的线程池API接口:

    init_pool():初始化线程池
    add_task():向线程池添加任务
    add_thread():添加新的工作线程
    remove_thread():从线程池中删除工作线程
    destroy_pool():销毁线程池

          
            这些方法将构成线程池API接口的一部分,其他开发人员可以使用这些方法来实现并发任务执行,而无需了解线程池的内部工作原理。

            所以,线程池可以成为一个可用于构建API接口的组件,以简化并发编程任务的处理,这就是我们了解AIP接口的目的,当然知道这些还不足以写出一个线程池应用程序,我们还需先了解一下有关线程池的结构体。

    二、线程池相关的结构体

            线程池相关的结构体在线程池的设计中扮演着重要角色,用于管理线程池的状态、任务队列等信息。

    1.线程池结构体(Thread Pool)

            线程池结构体通常包含了线程池的各种属性和信息,用于管理线程池的整体状态。以下是一般线程池结构体的示例:

    1. typedef struct ThreadPool {
    2. pthread_mutex_t lock; // 互斥锁,保护线程池内部数据
    3. pthread_cond_t cond; // 条件变量,用于线程之间的同步
    4. bool shutdown; // 线程池销毁标志
    5. struct Task *task_list; // 任务链队列
    6. pthread_t *tids; // 存储线程ID的数组
    7. unsigned int waiting_tasks; // 等待执行的任务数量
    8. unsigned int active_threads; // 活跃线程数量
    9. } ThreadPool;
    • lockcond是用于线程同步的互斥锁和条件变量;
    • shutdown标志用于指示线程池是否正在销毁;
    • task_list是一个任务链队列,存储待执行的任务;
    • tids是存储线程ID的数组;
    • waiting_tasks记录等待执行的任务数量;
    • active_threads表示当前活跃线程数量。

    2.任务结构体(Task)

            任务结构体表示线程池中的任务,包含了任务函数指针和参数。以下是一个示例:

    1. //任务结构体
    2. struct task
    3. {
    4. void *(*task)(void *arg); //返回值为void *的函数指针,参数列表void *arg,表示任务的地址
    5. void *arg; //表示任务需要的参数
    6. struct task *next; //表示下一个任务的地址
    7. };
    • task 是指向任务函数的指针,任务函数接受一个void *参数;
    • arg 是传递给任务函数的参数;
    • next 是一个指向下一个任务的指针,用于构建任务队列。


            这些结构体协同工作,帮助线程池管理任务的执行和线程的管理。线程池结构体用于维护线程池的状态,任务结构体用于表示具体的任务。线程池中的线程会不断从任务队列中取出任务并执行,同时线程池负责管理线程的生命周期。

    三、线程池

            线程池(Thread Pool是一种并发编程的设计模式,它用于管理和复用线程,以便更有效地处理并发任务。线程池的主要目标是降低线程的创建和销毁成本,提高系统的性能和资源利用率

    1.基本原理


            (1)线程复用: 线程池在启动时创建一组线程,这些线程一直保持活动状态,可用于处理任务。线程复用消除了频繁创建和销毁线程的开销,这也是为什么我们要学习线程池的原因。

            (2)任务队列: 线程池通常包括一个任务队列,用于存储等待执行的任务(条件变量)。当任务到达时,线程池将任务放入队列,并从池中的空闲线程中选择一个来执行任务。

            (3)线程调度: 线程池负责调度任务并分配给空闲线程。一旦任务完成,线程将返回池中等待下一个任务。

            (4)线程池大小控制: 线程池的大小通常是有限的,可以根据系统资源和性能需求进行配置。这有助于避免创建过多线程,从而导致资源耗尽和性能下降,也就是做到合理利用资源。

    2.组成部分


            (1)线程池管理器(Thread Pool Manager): 负责创建、管理和监控线程池的核心组件。它维护线程池的状态,包括活动线程数、等待任务数等。

            (2)任务队列(Task Queue): 用于存储等待执行的任务。任务可以是函数、方法或对象,线程池从队列中取出任务并将其分配给可用线程。

            (3)工作线程(Worker Threads): 线程池中的线程,用于执行任务。这些线程在初始化时启动,并在完成任务后返回池中以供重用。

            (4)任务接口(Task Interface): 描述任务的接口或抽象类,通常包括一个run方法或函数,线程池根据此接口来执行任务。

    四、C语言实现线程池

    1. 初始化线程池

              用于初始化线程池,设置线程池的初始状态

    原型:

            bool init_pool(thread_pool * pool, unsigned int threads_number);

    参数:
            pool        线程池结构体指针,用于表示要初始化的线程池
            threads_number        指定线程池中的初始线程数量

    1. // 初始化线程池结构体里面的成员,根据传入的线程个数,创建线程
    2. bool init_pool(thread_pool *pool, unsigned int threads_number)
    3. {
    4. //初始化互斥锁
    5. pthread_mutex_init(&pool->lock, NULL);
    6. //初始化条件变量
    7. pthread_cond_init(&pool->cond, NULL);
    8. // 关闭销毁线程池标识
    9. pool->shutdown = false;
    10. // 任务队列头结点
    11. pool->task_list = malloc(sizeof(struct task));
    12. // 线程ID的指针申请空间
    13. pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);
    14. // 判断任务队列头结点指针跟线程ID的指针是否申请成功
    15. if(pool->task_list == NULL || pool->tids == NULL)
    16. {
    17. perror("allocate memory error");
    18. return false;
    19. }
    20. //将任务链式队列的下一个节点的地址初始化
    21. pool->task_list->next = NULL;
    22. //初始化任务个数0
    23. pool->waiting_tasks = 0;
    24. //初始化活跃线程个数为传入的threads_number
    25. pool->active_threads = threads_number;
    26. int i;
    27. // 循环创建指定数目线程
    28. for(i = 0; i<pool->active_threads; i++)
    29. {
    30. //调用pthread_create函数创建线程,线程ID存放在pool->tids的数组里面
    31. if(pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0)
    32. {
    33. perror("create threads error");
    34. return false;
    35. }
    36. }
    37. return true;
    38. }

    2. 添加任务

            将一个任务添加到线程池的任务队列中,等待线程池的线程执行。

    原型:

            bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void * arg);

    参数:
            pool        线程池结构体指针,表示要添加任务的线程池
            do_task        任务函数的指针,表示要执行的任务
            arg        任务函数需要的参数,可以是任何类型的数据

    1. // 往链式任务队列添加任务,单个唤醒线程去执行任务
    2. bool add_task(thread_pool *pool, void *(*task)(void *arg), void *arg)
    3. {
    4. // 新任务节点
    5. struct task *new_task = malloc(sizeof(struct task));
    6. //新任务节点创建失败
    7. if(new_task == NULL)
    8. {
    9. perror("allocate memory error");
    10. return false;
    11. }
    12. //新任务节点的函数指针做初始化
    13. new_task->task = task;
    14. //新任务节点的函数指针需要的参数做初始化
    15. new_task->arg = arg;
    16. //新任务节点的下一个节点的地址初始化为NULL
    17. new_task->next = NULL;
    18. // 访问任务队列前获取互斥锁,此处无需注册取消处理例程
    19. pthread_mutex_lock(&pool->lock);
    20. //如果 任务链队列里面任务个数 大于等于 最大任务个数
    21. if(pool->waiting_tasks >= MAX_WAITING_TASKS)
    22. {
    23. //解锁
    24. pthread_mutex_unlock(&pool->lock);
    25. fprintf(stderr, "too many tasks.\n");
    26. free(new_task);
    27. return false;
    28. }
    29. //拿链式队列的头节点指针
    30. struct task *tmp = pool->task_list;
    31. //循环遍历找到最后一个节点的地址
    32. while(tmp->next != NULL)
    33. {
    34. tmp = tmp->next;
    35. }
    36. // 添加新的任务节点
    37. tmp->next = new_task;
    38. // 等待任务个数+1
    39. pool->waiting_tasks++;
    40. // 释放互斥锁
    41. pthread_mutex_unlock(&pool->lock);
    42. // 并唤醒其中一个阻塞在条件变量上的线程
    43. pthread_cond_signal(&pool->cond);
    44. return true;
    45. }

    3. 添加活跃线程

            向线程池中添加额外的活跃线程,增加线程池的处理能力。

    原型:

            int add_thread(thread_pool *pool, unsigned int additional_threads);

    参数:
            pool        线程池结构体指针,表示要添加线程的线程池        
            additional_threads        指定要添加的额外线程数量        

    1. // 根据传入的数量创建线程
    2. int add_thread(thread_pool *pool, unsigned additional_threads)
    3. {
    4. //添加活跃线程个数为0,不需要往后执行了
    5. if(additional_threads == 0)
    6. {
    7. return 0;
    8. }
    9. // 定义一个变量total_threads = 当前活跃线程个数 + 添加活跃线程个数
    10. unsigned total_threads = pool->active_threads + additional_threads;
    11. int i, actual_increment = 0;
    12. // 循环地创建若干指定数目的线程
    13. for(i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++)
    14. {
    15. if(pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0)
    16. {
    17. perror("add threads error");
    18. //添加活跃线程个数如果一开始就为0,表示一个都没有创建成功
    19. if(actual_increment == 0)
    20. {
    21. return -1;
    22. }
    23. break;
    24. }
    25. actual_increment++;
    26. }
    27. //更新活跃线程个数,用创建成功的个数加上之前的个数
    28. pool->active_threads += actual_increment;
    29. return actual_increment;
    30. }

    4. 删除活跃线程

            从线程池中删除指定数量的活跃线程,减少线程池的处理能力。

    原型:

            int remove_thread(thread_pool *pool, unsigned int removing_threads);

    参数:
            pool        线程池结构体指针,表示要删除线程的线程池        
            removing_threads        指定要删除的线程数量        

    1. // 根据传入的个数去删除,返回删除之后的剩下的活跃线程个数
    2. int remove_thread(thread_pool *pool, unsigned int removing_threads)
    3. {
    4. //如果删除的线程个数为0,直接返回当前的活跃线程个数
    5. if(removing_threads == 0)
    6. {
    7. return pool->active_threads;
    8. }
    9. //定义一个变量删除之后剩下的线程个数 = 当前活跃线程个数-删除活跃线程个数
    10. int remain_threads = pool->active_threads - removing_threads;
    11. //如果删除之后的线程个数小于0,保留一个,如果不小于0,就有多少保留多少
    12. remain_threads = remain_threads > 0 ? remain_threads : 1;
    13. int i;
    14. // 循环地取消掉指定数目的线程
    15. for(i = pool->active_threads-1; i>remain_threads-1; i--)
    16. {
    17. errno = pthread_cancel(pool->tids[i]);
    18. if(errno != 0)
    19. {
    20. break;
    21. }
    22. }
    23. //如果一个都没有删,返回-1
    24. if(i == pool->active_threads-1)
    25. {
    26. return -1;
    27. }
    28. else
    29. {
    30. //更新最大活跃线程个数
    31. pool->active_threads = i+1;
    32. return i+1;
    33. }
    34. }

    5. 销毁线程池

            销毁线程池,释放线程池占用的资源,并停止线程池的运行。

    原型:

            bool destroy_pool(thread_pool *pool);

    参数:

      pool        线程池结构体指针,表示要销毁的线程池

    1. // 释放资源
    2. bool destroy_pool(thread_pool *pool)
    3. {
    4. //线程池销毁标志为真
    5. pool->shutdown = true;
    6. //广播唤醒条件变量等待队列里面的线程
    7. pthread_cond_broadcast(&pool->cond);
    8. int i;
    9. for(i=0; i<pool->active_threads; i++)
    10. {
    11. //pthread_join( )指定的线程如果尚在运行,那么他将会阻塞等待
    12. errno = pthread_join(pool->tids[i], NULL);
    13. if(errno != 0)
    14. {
    15. printf("join tids[%d] error: %s\n", i, strerror(errno));
    16. }
    17. else
    18. {
    19. printf("[%u] is joined\n", (unsigned)pool->tids[i]);
    20. }
    21. }
    22. free(pool->task_list);
    23. free(pool->tids);
    24. free(pool);
    25. return true;
    26. }

    6.线程任务函数

            routine( )函数是线程池中线程的主要工作函数,它执行以下任务:

            ①不断地从任务队列中取出任务;
            ②执行取出的任务;
            ③如果任务队列为空且线程池没有被销毁,则进入等待状态,等待新任务的到来。

    1. //创建线程之后,开始去执行任务,有任务就执行,没有任务,线程进入条件变量等待队列等待唤醒执行任务
    2. void *routine(void *arg)
    3. {
    4. //接收传入进来的参数pool
    5. thread_pool *pool = (thread_pool *)arg;
    6. //定义任务结构体指针变量
    7. struct task *p;
    8. while(1)
    9. {
    10. // 访问任务队列前加锁,为防止取消后死锁,注册处理例程 handler
    11. pthread_cleanup_push(handler, (void *)&pool->lock);
    12. //加锁访问任务队列
    13. pthread_mutex_lock(&pool->lock);
    14. // 若当前没有任务,且线程池关闭标志未关闭,则进入条件变量等待队列睡眠,等待新任务的到来或线程池关闭
    15. while(pool->waiting_tasks == 0 && !pool->shutdown)
    16. {
    17. //解锁 进入条件变量等待队列睡眠 收到通知的时候,加锁
    18. pthread_cond_wait(&pool->cond, &pool->lock);
    19. }
    20. // 若当前没有任务,且线程池关闭标识为真,则立即释放互斥锁并退出
    21. if(pool->waiting_tasks == 0 && pool->shutdown == true)
    22. {
    23. //解锁
    24. pthread_mutex_unlock(&pool->lock);
    25. //线程退出
    26. pthread_exit(NULL);
    27. }
    28. // 若当前有任务,则消费任务队列中的任务
    29. // 拿链式队列的下一个节点的地址
    30. p = pool->task_list->next;
    31. //将链式队列当前指向,指向下一个
    32. pool->task_list->next = p->next;
    33. //任务个数-1
    34. pool->waiting_tasks--;
    35. // 释放互斥锁,并弹栈 handler(但不执行他)
    36. pthread_mutex_unlock(&pool->lock);
    37. pthread_cleanup_pop(0);
    38. // 执行任务,并且在此期间禁止响应取消请求,执行期间用pthread_cancel发送取消请求
    39. pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
    40. //调用任务结构体里面的函数,传参数
    41. (p->task)(p->arg); // task(arg)
    42. //执行任务完成,并且在此期间使能响应取消请求
    43. pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    44. //释放p指向的地址空间
    45. free(p);
    46. }
    47. //线程退出
    48. pthread_exit(NULL);
    49. }

    7.任务函数

    void *mytask(void *arg);

            该函数接受一个 void 指针类型的参数 arg,并返回一个 void 指针。函数主要用于你想要完成的任务操作,这里举例了一个文件复制函数:

    1. //任务函数(想要完成的操作)
    2. void *mytask(void *arg)
    3. {
    4. //文件复制
    5. int fd1 = open("./1.txt", O_RDWR);//int fd1 = open(arg->first, O_RDWR);
    6. if (fd1 == -1)
    7. {
    8. perror("open fd1 error");
    9. return NULL;
    10. }
    11. int fd2 = open("./2.txt", O_RDWR|O_CREAT, 0777);//int fd2 = open(arg->second, O_RDWR|O_CREAT, 0777);
    12. if (fd2 == -1)
    13. {
    14. perror("open fd2 error");
    15. return NULL;
    16. }
    17. char buf[1024];
    18. int size;
    19. while(1)
    20. {
    21. bzero(buf, 1024);
    22. size = read(fd1, buf, 1024);
    23. if (size==0)
    24. {
    25. break;
    26. }
    27. write(fd2, buf, size);
    28. }
    29. close(fd1);
    30. close(fd2);
    31. return NULL;
    32. }

            这里我们创建了五条线程的一个线程池,来复制一个拥有50个文件大小为45MB的大文件,可以看到用时4.433s,然而如果采用单线程处理的话,至少时间在8s左右。当然具体情况因个人电脑配置不同而异。

    五、总结

            让我们用一个类比来解释一下线程池,每辆出租车就是一个线程它的工作是执行各种任务,就像司机可以运送乘客到不同的目的地一样。而线程池就像是一家出租公司这家公司拥有多组出租车(线程),并根据需要分配任务给这些出租车。公司负责管理、维护和监控这些出租车,以确保它们随时可以为乘客提供服务。任务(工作)就像是乘客需要到达的目的地。这可以是任何需要执行的工作,例如计算、数据处理、文件上传等等。线程池接受这些任务,并将它们分配给可用的出租车(线程),从而高效的完成他们。

    1.优点

    • 资源控制: 线程池可以限制同时运行的线程数量,有效控制系统资源的使用。
    • 性能提升: 通过减少线程的创建和销毁,线程池可以提高应用程序的性能,减少了线程切换的开销。
    • 任务管理: 线程池可以管理任务的排队和执行,确保任务按照顺序或优先级执行。
    • 可伸缩性: 线程池可以根据系统负载动态调整线程数量,以适应不同的工作负载。

    2.适用场景

            线程池适用于需要并发执行多个任务的情况,特别是在以下情况下使用效果更佳:

    • Web服务器处理多个并发请求。
    • 数据库连接池管理多个数据库连接。
    • 后台任务处理,如日志处理、邮件发送、文件处理等。
    • 任何需要并发执行的计算密集型或I/O密集型任务。

    3.注意事项

    • 线程池的大小应该根据系统资源和负载需求进行调整,过大的线程池可能会消耗大量内存,而过小的线程池可能会导致任务排队和性能下降。

    • 线程池中的任务应该是独立的,不应该有共享状态或依赖关系,以避免竞态条件和死锁。

    • 异常处理对于线程池非常重要,确保捕获和处理任务中的异常,以避免线程终止和资源泄漏。

            更多C/C++语言Linux系统数据结构ARM板实战相关文章,关注专栏:

       手撕C语言

                玩转linux

                        脚踢数据结构

                                系统、网络编程

                                         探索C++

                                                 6818(ARM)开发板实战

    📢写在最后

    • 今天的分享就到这啦~
    • 觉得博主写的还不错的烦劳 一键三连喔~
    • 🎉🎉🎉感谢关注🎉🎉🎉
  • 相关阅读:
    【数据结构】详解链表(一)——单链表(动图讲解)
    锐捷SuperVlan实验配置
    沃尔玛Walmart EDI 850订单详解
    (C++版本)ros2发布者节点publisher示例源代码(改良版)
    Docker实战:docker compose 搭建Rocketmq
    麒麟系统开发笔记(十四):在国产麒麟系统上编译libmodbus库、搭建基础开发环境和移植测试Demo
    [Django开源学习 1]django-vue-admin
    JUnit单元测试
    Redis 哨兵模式的实现详解
    Vue项目实战之电商后台管理系统(四) 权限角色管理模块
  • 原文地址:https://blog.csdn.net/qq_64928278/article/details/132666155