• C++ 并发编程实战 第九章


    目录

    9.1 线程池 

    9.1.1 最简易可行的线程池

    9.1.2 等待提交给线程池的任务完成运行

    9.1.3等待其他任务完成的任务

    9.1.4 避免任务队列上的争夺

    9.1.5 任务窃取

    9.2 中断线程

    9.2.1 发起一个线程,以及把他中断

    9.2.2 检测线程是否被中断

    9.2.3 中断条件变量上的等待

    9.2.4 中断条件变量std::condition_variable_any上的等待

    9.2.5 中断其他阻塞型等待

    9.2.6 处理中断

    9.2.7 在应用程序推出时中断后台任务

    9.3 小结


    参考:https://github.com/xiaoweiChen/CPP-Concurrency-In-Action-2ed-2019/blob/master/content/chapter9/9.1-chinese.md

    9.1 线程池 

    大多数系统中,将每个任务指定给某个线程是不切实际的,不过可以利用并发性,进行并发执行。线程池提供了这样的功能,将提交到线程池中的任务并发执行,提交的任务将会挂在任务队列上。工作线程会从队列中的获取任务,当任务执行完成后,再从任务队列中获取下一个任务。

    创建一个线程池时,会遇到几个关键性的设计问题,比如:可使用的线程数量,高效的任务分配方式,以及是否需要等待一个任务完成。

    9.1.1 最简易可行的线程池

    代码9.1 简单的线程池

    1. class thread_pool
    2. {
    3. std::atomic_bool done;
    4. thread_safe_queuevoid()> > work_queue; // 1
    5. std::vector threads; // 2
    6. join_threads joiner; // 3
    7. void worker_thread()
    8. {
    9. while(!done) // 4
    10. {
    11. std::function<void()> task;
    12. if(work_queue.try_pop(task)) // 5
    13. {
    14. task(); // 6
    15. }
    16. else
    17. {
    18. std::this_thread::yield(); // 7
    19. }
    20. }
    21. }
    22. public:
    23. thread_pool():
    24. done(false),joiner(threads)
    25. {
    26. unsigned const thread_count=std::thread::hardware_concurrency(); // 8
    27. try
    28. {
    29. for(unsigned i=0;i
    30. {
    31. threads.push_back(
    32. std::thread(&thread_pool::worker_thread,this)); // 9
    33. }
    34. }
    35. catch(...)
    36. {
    37. done=true; // 10
    38. throw;
    39. }
    40. }
    41. ~thread_pool()
    42. {
    43. done=true; // 11
    44. }
    45. template<typename FunctionType>
    46. void submit(FunctionType f)
    47. {
    48. work_queue.push(std::function<void()>(f)); // 12
    49. }
    50. };

    这样简单的线程池就完成了,特别是任务没有返回值,或需要执行阻塞操作的任务。很多情况下,这样的线程池是不够用的,其他情况使用这样简单的线程池可能会出现问题,比如:死锁。同样,在简单例子中使用std::async能提供更好的功能。

    9.1.2 等待提交给线程池的任务完成运行

    第8章中的例子中,线程间的任务划分完成后,代码会显式生成新线程,主线程通常是等待新线程在返回调用之后结束,确保所有任务都完成。使用线程池就需要等待任务提交到线程池中,而非直接提交给单个线程。与基于std::async的方法类似,使用代码9.1中的简单线程池,使用第4章中提到的工具:条件变量和future。虽然会增加代码的复杂度,不过要比直接对任务进行等待好很多。

    通过增加线程池的复杂度,可以直接等待任务完成。使用submit()函数返回对任务描述的句柄,可用来等待任务的完成。任务句柄会用条件变量或future进行包装,从而简化线程池的实现。

    一种特殊的情况是,执行任务的线程需要返回结果到主线程上进行处理。本这种情况下,需要用future对最终的结果进行转移。代码9.2展示了对简单线程池的修改,通过修改就能等待任务完成,以及在工作线程完成后,返回一个结果到等待线程中去,不过std::packaged_task<>实例是不可拷贝的,仅可移动,所以不能再使用std::function<>来实现任务队列,因为std::function<>需要存储可复制构造的函数对象。包装一个自定义函数,用来处理可移动的类型,就是一个带有函数操作符的类型擦除类。只需要处理没有入参的函数和无返回的函数即可,所以这只是一个简单的虚函数调用。

    代码9.2 可等待任务的线程池

    1. class function_wrapper
    2. {
    3. struct impl_base {
    4. virtual void call()=0;
    5. virtual ~impl_base() {}
    6. };
    7. std::unique_ptr impl;
    8. template<typename F>
    9. struct impl_type: impl_base
    10. {
    11. F f;
    12. impl_type(F&& f_): f(std::move(f_)) {}
    13. void call() { f(); }
    14. };
    15. public:
    16. template<typename F>
    17. function_wrapper(F&& f):
    18. impl(new impl_type(std::move(f)))
    19. {}
    20. void operator()() { impl->call(); }
    21. function_wrapper() = default;
    22. function_wrapper(function_wrapper&& other):
    23. impl(std::move(other.impl))
    24. {}
    25. function_wrapper& operator=(function_wrapper&& other)
    26. {
    27. impl=std::move(other.impl);
    28. return *this;
    29. }
    30. function_wrapper(const function_wrapper&)=delete;
    31. function_wrapper(function_wrapper&)=delete;
    32. function_wrapper& operator=(const function_wrapper&)=delete;
    33. };
    34. class thread_pool
    35. {
    36. thread_safe_queue work_queue; // 使用function_wrapper,而非使用std::function
    37. void worker_thread()
    38. {
    39. while(!done)
    40. {
    41. function_wrapper task;
    42. if(work_queue.try_pop(task))
    43. {
    44. task();
    45. }
    46. else
    47. {
    48. std::this_thread::yield();
    49. }
    50. }
    51. }
    52. public:
    53. template<typename FunctionType>
    54. std::future<typename std::result_of<FunctionType()>::type> // 1
    55. submit(FunctionType f)
    56. {
    57. typedef typename std::result_of<FunctionType()>::type
    58. result_type; // 2
    59. std::packaged_task<result_type()> task(std::move(f)); // 3
    60. std::future res(task.get_future()); // 4
    61. work_queue.push(std::move(task)); // 5
    62. return res; // 6
    63. }
    64. // 和之前一样
    65. };

    9.1.3等待其他任务完成的任务

    最简单的方法就是在thread_pool中添加一个新函数,来执行任务队列上的任务,并对线程池进行管理。高级线程池的实现可能会在等待函数中添加逻辑,或等待其他函数来处理这个任务,优先的任务会让其他的任务进行等待。下面代码中的实现,就展示了一个新run_pending_task()函数,对于快速排序的修改将会在代码9.5中展示。

    代码9.4 run_pending_task()函数实现

    1. void thread_pool::run_pending_task()
    2. {
    3. function_wrapper task;
    4. if(work_queue.try_pop(task))
    5. {
    6. task();
    7. }
    8. else
    9. {
    10. std::this_thread::yield();
    11. }
    12. }

    下面快速排序算法的实现要比代码8.1中版本简单许多,因为所有线程管理逻辑都移到线程池中了。

    代码9.5 基于线程池的快速排序实现

    1. template<typename T>
    2. struct sorter // 1
    3. {
    4. thread_pool pool; // 2
    5. std::list do_sort(std::list& chunk_data)
    6. {
    7. if(chunk_data.empty())
    8. {
    9. return chunk_data;
    10. }
    11. std::list result;
    12. result.splice(result.begin(),chunk_data,chunk_data.begin());
    13. T const& partition_val=*result.begin();
    14. typename std::list::iterator divide_point=
    15. std::partition(chunk_data.begin(),chunk_data.end(),
    16. [&](T const& val){return val
    17. std::list new_lower_chunk;
    18. new_lower_chunk.splice(new_lower_chunk.end(),
    19. chunk_data,chunk_data.begin(),
    20. divide_point);
    21. std::future > new_lower= // 3
    22. pool.submit(std::bind(&sorter::do_sort,this,
    23. std::move(new_lower_chunk)));
    24. std::list new_higher(do_sort(chunk_data));
    25. result.splice(result.end(),new_higher);
    26. while(!new_lower.wait_for(std::chrono::seconds(0)) ==
    27. std::future_status::timeout)
    28. {
    29. pool.run_pending_task(); // 4
    30. }
    31. result.splice(result.begin(),new_lower.get());
    32. return result;
    33. }
    34. };
    35. template<typename T>
    36. std::list parallel_quick_sort(std::list input)
    37. {
    38. if(input.empty())
    39. {
    40. return input;
    41. }
    42. sorter s;
    43. return s.do_sort(input);
    44. }

    9.1.4 避免任务队列上的争夺

    为了避免乒乓缓存,每个线程建立独立的任务队列。这样,每个线程就会将新任务放在自己的任务队列上,并且当线程上的任务队列没有任务时,去全局的任务列表中取任务。下面列表中的实现,使用了一个thread_local变量,来保证每个线程都拥有自己的任务列表(如全局列表那样)。

    代码9.6 线程池——线程具有本地任务队列        

    1. class thread_pool
    2. {
    3. thread_safe_queue pool_work_queue;
    4. typedef std::queue local_queue_type; // 1
    5. static thread_local std::unique_ptr
    6. local_work_queue; // 2
    7. void worker_thread()
    8. {
    9. local_work_queue.reset(new local_queue_type); // 3
    10. while(!done)
    11. {
    12. run_pending_task();
    13. }
    14. }
    15. public:
    16. template<typename FunctionType>
    17. std::future<typename std::result_of<FunctionType()>::type>
    18. submit(FunctionType f)
    19. {
    20. typedef typename std::result_of<FunctionType()>::type result_type;
    21. std::packaged_task<result_type()> task(f);
    22. std::future res(task.get_future());
    23. if(local_work_queue) // 4
    24. {
    25. local_work_queue->push(std::move(task));
    26. }
    27. else
    28. {
    29. pool_work_queue.push(std::move(task)); // 5
    30. }
    31. return res;
    32. }
    33. void run_pending_task()
    34. {
    35. function_wrapper task;
    36. if(local_work_queue && !local_work_queue->empty()) // 6
    37. {
    38. task=std::move(local_work_queue->front());
    39. local_work_queue->pop();
    40. task();
    41. }
    42. else if(pool_work_queue.try_pop(task)) // 7
    43. {
    44. task();
    45. }
    46. else
    47. {
    48. std::this_thread::yield();
    49. }
    50. }
    51. // rest as before
    52. };

    9.1.5 任务窃取

    任务分配不均时,造成的结果就是:某个线程本地队列中有很多任务的同时,其他线程无所事事。例如:举一个快速排序的例子,一开始的数据块能在线程池上被处理,因为剩余部分会放在工作线程的本地队列上进行处理,这样的使用方式也违背使用线程池的初衷。

    幸好这个问题有解:本地工作队列和全局工作队列上没有任务时,可从别的线程队列中窃取任务。

    代码9.7 基于锁的任务窃取队列

    1. class work_stealing_queue
    2. {
    3. private:
    4. typedef function_wrapper data_type;
    5. std::deque the_queue; // 1
    6. mutable std::mutex the_mutex;
    7. public:
    8. work_stealing_queue()
    9. {}
    10. work_stealing_queue(const work_stealing_queue& other)=delete;
    11. work_stealing_queue& operator=(
    12. const work_stealing_queue& other)=delete;
    13. void push(data_type data) // 2
    14. {
    15. std::lock_guard lock(the_mutex);
    16. the_queue.push_front(std::move(data));
    17. }
    18. bool empty() const
    19. {
    20. std::lock_guard lock(the_mutex);
    21. return the_queue.empty();
    22. }
    23. bool try_pop(data_type& res) // 3
    24. {
    25. std::lock_guard lock(the_mutex);
    26. if(the_queue.empty())
    27. {
    28. return false;
    29. }
    30. res=std::move(the_queue.front());
    31. the_queue.pop_front();
    32. return true;
    33. }
    34. bool try_steal(data_type& res) // 4
    35. {
    36. std::lock_guard lock(the_mutex);
    37. if(the_queue.empty())
    38. {
    39. return false;
    40. }
    41. res=std::move(the_queue.back());
    42. the_queue.pop_back();
    43. return true;
    44. }
    45. };

    这就说明每个线程中的“队列”是一个后进先出的栈,最新推入的任务将会第一个执行。从缓存角度来看,这将对性能有所提升,因为任务相关的数据一直存于缓存中,要比提前将任务相关数据推送到栈上好。同样,这种方式很好的映射到某个算法上,例如:快速排序。之前的实现中,每次调用do_sort()都会推送一个任务到栈上,并且等待这个任务执行完毕。通过对最新推入任务的处理,就可以保证在将当前所需数据块处理完成前,其他任务是否需要这些数据块,从而可以减少活动任务的数量和栈的使用次数。try_steal()从队列末尾获取任务,为了减少与try_pop()之间的竞争。使用在第6、7章中的所讨论的技术来让try_pop()和try_steal()并发执行。

    现在拥有了一个很不错的任务队列,并且支持窃取。那如何在线程池中使用这个队列呢?这里简单的展示一下。

    代码9.8 使用任务窃取的线程池

    1. class thread_pool
    2. {
    3. typedef function_wrapper task_type;
    4. std::atomic_bool done;
    5. thread_safe_queue pool_work_queue;
    6. std::vector > queues; // 1
    7. std::vector threads;
    8. join_threads joiner;
    9. static thread_local work_stealing_queue* local_work_queue; // 2
    10. static thread_local unsigned my_index;
    11. void worker_thread(unsigned my_index_)
    12. {
    13. my_index=my_index_;
    14. local_work_queue=queues[my_index].get(); // 3
    15. while(!done)
    16. {
    17. run_pending_task();
    18. }
    19. }
    20. bool pop_task_from_local_queue(task_type& task)
    21. {
    22. return local_work_queue && local_work_queue->try_pop(task);
    23. }
    24. bool pop_task_from_pool_queue(task_type& task)
    25. {
    26. return pool_work_queue.try_pop(task);
    27. }
    28. bool pop_task_from_other_thread_queue(task_type& task) // 4
    29. {
    30. for(unsigned i=0;isize();++i)
    31. {
    32. unsigned const index=(my_index+i+1)%queues.size(); // 5
    33. if(queues[index]->try_steal(task))
    34. {
    35. return true;
    36. }
    37. }
    38. return false;
    39. }
    40. public:
    41. thread_pool():
    42. done(false),joiner(threads)
    43. {
    44. unsigned const thread_count=std::thread::hardware_concurrency();
    45. try
    46. {
    47. for(unsigned i=0;i
    48. {
    49. queues.push_back(std::unique_ptr( // 6
    50. new work_stealing_queue));
    51. threads.push_back(
    52. std::thread(&thread_pool::worker_thread,this,i));
    53. }
    54. }
    55. catch(...)
    56. {
    57. done=true;
    58. throw;
    59. }
    60. }
    61. ~thread_pool()
    62. {
    63. done=true;
    64. }
    65. template<typename FunctionType>
    66. std::future<typename std::result_of<FunctionType()>::type> submit(
    67. FunctionType f)
    68. {
    69. typedef typename std::result_of<FunctionType()>::type result_type;
    70. std::packaged_task<result_type()> task(f);
    71. std::future res(task.get_future());
    72. if(local_work_queue)
    73. {
    74. local_work_queue->push(std::move(task));
    75. }
    76. else
    77. {
    78. pool_work_queue.push(std::move(task));
    79. }
    80. return res;
    81. }
    82. void run_pending_task()
    83. {
    84. task_type task;
    85. if(pop_task_from_local_queue(task) || // 7
    86. pop_task_from_pool_queue(task) || // 8
    87. pop_task_from_other_thread_queue(task)) // 9
    88. {
    89. task();
    90. }
    91. else
    92. {
    93. std::this_thread::yield();
    94. }
    95. }
    96. };

    9.2 中断线程

    9.2.1 发起一个线程,以及把他中断

    代码9.9 interruptible_thread的基本实现

    1. class interrupt_flag
    2. {
    3. public:
    4. void set();
    5. bool is_set() const;
    6. };
    7. thread_local interrupt_flag this_thread_interrupt_flag; // 1
    8. class interruptible_thread
    9. {
    10. std::thread internal_thread;
    11. interrupt_flag* flag;
    12. public:
    13. template<typename FunctionType>
    14. interruptible_thread(FunctionType f)
    15. {
    16. std::promise p; // 2
    17. internal_thread=std::thread([f,&p]{ // 3
    18. p.set_value(&this_thread_interrupt_flag);
    19. f(); // 4
    20. });
    21. flag=p.get_future().get(); // 5
    22. }
    23. void interrupt()
    24. {
    25. if(flag)
    26. {
    27. flag->set(); // 6
    28. }
    29. }
    30. };

    9.2.2 检测线程是否被中断

    9.2.3 中断条件变量上的等待

    代码9.11 为std::condition_variable在interruptible_wait中使用超时

    1. class interrupt_flag
    2. {
    3. std::atomic<bool> flag;
    4. std::condition_variable* thread_cond;
    5. std::mutex set_clear_mutex;
    6. public:
    7. interrupt_flag():
    8. thread_cond(0)
    9. {}
    10. void set()
    11. {
    12. flag.store(true,std::memory_order_relaxed);
    13. std::lock_guard lk(set_clear_mutex);
    14. if(thread_cond)
    15. {
    16. thread_cond->notify_all();
    17. }
    18. }
    19. bool is_set() const
    20. {
    21. return flag.load(std::memory_order_relaxed);
    22. }
    23. void set_condition_variable(std::condition_variable& cv)
    24. {
    25. std::lock_guard lk(set_clear_mutex);
    26. thread_cond=&cv;
    27. }
    28. void clear_condition_variable()
    29. {
    30. std::lock_guard lk(set_clear_mutex);
    31. thread_cond=0;
    32. }
    33. struct clear_cv_on_destruct
    34. {
    35. ~clear_cv_on_destruct()
    36. {
    37. this_thread_interrupt_flag.clear_condition_variable();
    38. }
    39. };
    40. };
    41. void interruptible_wait(std::condition_variable& cv,
    42. std::unique_lock& lk)
    43. {
    44. interruption_point();
    45. this_thread_interrupt_flag.set_condition_variable(cv);
    46. interrupt_flag::clear_cv_on_destruct guard;
    47. interruption_point();
    48. cv.wait_for(lk,std::chrono::milliseconds(1));
    49. interruption_point();
    50. }

    9.2.4 中断条件变量std::condition_variable_any上的等待

    代码9.12 为std::condition_variable_any设计的interruptible_wait

    1. class interrupt_flag
    2. {
    3. std::atomic<bool> flag;
    4. std::condition_variable* thread_cond;
    5. std::condition_variable_any* thread_cond_any;
    6. std::mutex set_clear_mutex;
    7. public:
    8. interrupt_flag():
    9. thread_cond(0),thread_cond_any(0)
    10. {}
    11. void set()
    12. {
    13. flag.store(true,std::memory_order_relaxed);
    14. std::lock_guard lk(set_clear_mutex);
    15. if(thread_cond)
    16. {
    17. thread_cond->notify_all();
    18. }
    19. else if(thread_cond_any)
    20. {
    21. thread_cond_any->notify_all();
    22. }
    23. }
    24. template<typename Lockable>
    25. void wait(std::condition_variable_any& cv,Lockable& lk)
    26. {
    27. struct custom_lock
    28. {
    29. interrupt_flag* self;
    30. Lockable& lk;
    31. custom_lock(interrupt_flag* self_,
    32. std::condition_variable_any& cond,
    33. Lockable& lk_):
    34. self(self_),lk(lk_)
    35. {
    36. self->set_clear_mutex.lock(); // 1
    37. self->thread_cond_any=&cond; // 2
    38. }
    39. void unlock() // 3
    40. {
    41. lk.unlock();
    42. self->set_clear_mutex.unlock();
    43. }
    44. void lock()
    45. {
    46. std::lock(self->set_clear_mutex,lk); // 4
    47. }
    48. ~custom_lock()
    49. {
    50. self->thread_cond_any=0; // 5
    51. self->set_clear_mutex.unlock();
    52. }
    53. };
    54. custom_lock cl(this,cv,lk);
    55. interruption_point();
    56. cv.wait(cl);
    57. interruption_point();
    58. }
    59. // rest as before
    60. };
    61. template<typename Lockable>
    62. void interruptible_wait(std::condition_variable_any& cv,
    63. Lockable& lk)
    64. {
    65. this_thread_interrupt_flag.wait(cv,lk);
    66. }

    9.2.5 中断其他阻塞型等待

    9.2.6 处理中断

    9.2.7 在应用程序推出时中断后台任务

    试想在桌面上查找一个应用。这就需要与用户互动,应用的状态需要能在显示器上显示,就能看出应用有什么改变。为了避免影响GUI的响应时间,通常会将处理线程放在后台运行。后台进程需要一直执行,直到应用退出。后台线程会作为应用启动的一部分被启动,并且在应用终止的时候停止运行。通常这样的应用只有在机器关闭时才会退出,因为应用需要更新应用最新的状态,就需要全时间运行。在某些情况下,当应用关闭,需要使用有序的方式将后台线程关闭,其中一种方式就是中断。

    下面代码中为一个系统实现了简单的线程管理部分。

    代码9.13 后台监视文件系统

    1. std::mutex config_mutex;
    2. std::vector background_threads;
    3. void background_thread(int disk_id)
    4. {
    5. while(true)
    6. {
    7. interruption_point(); // 1
    8. fs_change fsc=get_fs_changes(disk_id); // 2
    9. if(fsc.has_changes())
    10. {
    11. update_index(fsc); // 3
    12. }
    13. }
    14. }
    15. void start_background_processing()
    16. {
    17. background_threads.push_back(
    18. interruptible_thread(background_thread,disk_1));
    19. background_threads.push_back(
    20. interruptible_thread(background_thread,disk_2));
    21. }
    22. int main()
    23. {
    24. start_background_processing(); // 4
    25. process_gui_until_exit(); // 5
    26. std::unique_lock lk(config_mutex);
    27. for(unsigned i=0;isize();++i)
    28. {
    29. background_threads[i].interrupt(); // 6
    30. }
    31. for(unsigned i=0;isize();++i)
    32. {
    33. background_threads[i].join(); // 7
    34. }
    35. }

    9.3 小结

    本章中了解各种线程管理的高级技术:线程池和中断线程。也了解了如何使用本地任务队列,使用任务窃取的方式减小同步开销,提高线程池的吞吐量,等待子任务完成的同时执行队列中其他任务,从而来避免死锁。

    还有,使用线程去中断另一个处理线程的各种方式,比如:使用特定的断点和函数执行中断,要不就是使用某种方法对阻塞等待进行中断。

  • 相关阅读:
    代码编辑快捷键使用说明
    Golang实现一个批量自动化执行树莓派指令的软件(2)指令
    leetCode 面试题 02.04. 分割链表
    十三、Vue CLI(2)
    Mybatis-plus的介绍与使用
    利用CNN进行手写数字识别
    如何编写优秀的测试用例,建议收藏和转发
    网络上怎么赚点零花钱
    jquery 事件和事件对象
    Docker浅尝
  • 原文地址:https://blog.csdn.net/qq_52758467/article/details/133468661