目录
10.2.2 std::execution::sequenced_policy
10.2.3 std::execution::parallel_policy
10.2.4 std::execution::parallel_unsequenced_policy
参考《C++ 并发编程实战》 著:Anthony Williams 译: 吴天明
C++17为标准库添加并行算法。都是对之前已存在的一些标准算法的重载,例
如:std::find,std::transform
和std::reduce。并行版本的签名除了和单线程版本的函数签名相同之外,在参
数列表的中新添加了一个参数(第一个参数)——指定要使用的执行策略。例如:
- 1.std::vector<int> my_data;
- 2.std::sort(std::execution::par,my_data.begin(),my_data.end());
std::execution::par
的执行策略,表示允许使用多个线程调用此并行算法。这是一种权限,而不是一个申请——如果
需要,这个算法依旧可以以串行的方式运行。
三个标注执行策略:
std::execution::sequenced_policy
std::execution::parallel_policy
std::execution::parallel_unsequenced_policy
这些类都定义在
std::execution::seq
std::execution::par
std::execution::par_unseq
除了复制这三种类型的对象外,不能以自己的方式对执行策略对象进行构造,因为它们可能有一些特殊的初始化要
求。实现还会定义一些其他的执行策略,但开发者不能自定义执行策略。
将执行策略传递给标准算法库中的算法,那么算法的行为就由执行策略控制。这会有几方面的影响:
算法复杂化
抛出异常时的行为
算法执行的位置、方式和时间
向算法提供执行策略时,算法的复杂度就会发生变化:除了对并行的管理调度开销外,并行算法的核心操作将会被多次执行(交换,比较,以及提供的函数对象),目的是在总运行时间方面提供性能的整体改进。
顺序策略并不是并行策略:它使用强制的方式实现,在执行线程上函数的所有操作。但它仍然是一个执行策略,因此对算法的复杂性和异常影响与其他标准执行策略相同。
这不仅需要在同一线程上执行所有操作,而且必须按照一定的顺序进行执行,这样步骤间就不会有交错。具体的顺序是未指定的,并且对函数的不同调用也是不存在的。尤其是在没有执行策略的情况下,不能保证函数的执行顺序与相应的重载执行顺序相同。例如:下面对std::for_each
的调用,是将1~1000填充到vector中,这里没有指定填充的顺序。这就与没有执行策略的重载不同,执行策略就要按顺序对数字进行存储:
- 1.std::vector<int> v(1000);
- 2.int count=0;
- 3.std::for_each(std::execution::seq,v.begin(),v.end(),
- 4.
- [&](int& x){ x=++count; });
并行策略提供了在多个线程下运行的算法版本。操作可以在调用算法的线程上执行,也可以在库创建的线程上执行。在给定线程上执行需要按照一定的顺序,不能交错执行,但十分具体的顺序是不指定的;并且在不同的调用间,指定的顺序可能会不同。给定的操作将在整个持续时间内,在固定线程上执行。这就对算法所使用的迭代器、相关值和可调用对象有了额外的要求:想要并行调用,他们间就不能有数据竞争,也不能依赖于线程上运行的其他操作,或者依赖的操作不能在同一线程上。大多数情况下,可以使用并行执行策略,这样可能会使用到没有执行策略的标准库算法。只有在所需要的元素间有特定的顺序,或者对共享数据有非同步访问时,才会出现问题。将vector中的所有数都加上同一个值,就可以并行进行:
std::for_each(std::execution::par,v.begin(),v.end(),[](auto& x){++x;});
若使用并行策略填充一个vector中,那这个例子肯定有问题;具体的讲,这样会出现未定义行为:
-
- std::for_each(std::execution::par,v.begin(),v.end(),
- [&](int& x){ x=++count; });
每次调用Lambda表达式时,都会对计数器进行修改,如果有多个线程在执行Lambda表达式,那么这里就会出现数据竞争,从而导致未定义行为。std::execution::parallel_ policy要求优先考虑这一点:即使库没有使用多线程,之前的调用依旧会产生未定义行为。对象是否出现未定义是调用的静态属性,而不是依赖库实现的细节。不过,这里允许在函数调用间进行同步,因此可以通过将count设置为std::atomic
并行不排序策略提供了最大程度的并行化算法,用以得到对算法使用的迭代器、相关值和可调用对象的严格要求。
使用并行不排序策略调用的算法,可以在任意线程上执行,这些线程彼此间没有顺序。也就是,在单线程上也可以交叉运行,这样在第一个线程完成前,第二个操作会在同一个线程上启动,并且操作可以在线程间迁移,因此给定的操作可以在一个线程上启动,在另一个线程上运行,并在第三个线程上完成。使用并行不排序策略时,算法使用的迭代器、相关值和可调用对象不能使用任何形式的同步,也不能调用任何需要同步的函数。
也就是,必须对相关的元素或基于该元素可以访问的数据进行操作,并且不能修改线程之间或元素之前共享的状态。
标准库中的大多数被执行策略重载的算法都在
none_of,for_each,for_each_n,find,find_if,find_end,find_first_of,adjacent_find,
count,count_if,mismatch,equal,search,search_n,copy,copy_n,copy_if,move,
swap_ranges,transform,replace,replace_if,replace_copy,replace_copy_if,fill,
fill_n,generate,generate_n,remove,remove_if,remove_copy,remove_copy_if,unique,
unique_copy,reverse,reverse_copy,rotate,rotate_copy,is_partitioned,partition,
stable_partition,partition_copy,sort,stable_sort,partial_sort,partial_sort_copy,
is_sorted,is_sorted_until,nth_element,merge,inplace_merge,includes,set_union,
set_intersection,set_difference,set_symmetric_difference,is_heap,is_heap_until,
min_element,max_element,minmax_element,lexicographical_compare,reduce,
transform_reduce,exclusive_scan,inclusive_scan,transform_exclusive_scan,
transform_inclusive_scan和adjacent_difference 。
对于列表中的每一个算法,每个”普通”算法的重载都有一个新的参数(第一个参数),这个参数将传入执行策略——“普通”重载的相应参数在此执行策略参数之后。例如,std::sort有两个没有执行策略的“普通”重载:
- 1.template<class RandomAccessIterator>
- 2.void sort(RandomAccessIterator first, RandomAccessIterator last);
- 3.
- 4.template<class RandomAccessIterator, class Compare>
- 5.void sort(
- 6.
- RandomAccessIterator first, RandomAccessIterator last, Compare comp);
因此,它还具有两个有执行策略的重载:
- 1.template<class ExecutionPolicy, class RandomAccessIterator>
- 2.void sort(
- 3.ExecutionPolicy&& exec,
- 4.RandomAccessIterator first, RandomAccessIterator last);
- 5.
- 6.template<class ExecutionPolicy, class RandomAccessIterator, class Compare>
- 7.void sort(
- 8.ExecutionPolicy&& exec,
- 9.RandomAccessIterator first, RandomAccessIterator last, Compare comp);
有执行策略和没有执行策略的函数列表间有一个重要的区别,这只会影响到一些算法:如果“普通”算法允许输入迭代器或输出迭代器,那执行策略的重载则需要前向迭代器。因为输入迭代器是单向迭代的:只能访问当前元素,并且不能将迭代器存储到以前的元素。输出迭代器只允许写入当前元素:不能在写入后面的元素后,后退再写入前面的元素。
虽然,模板参数的命名没有从编译器的角度带来任何影响,但从C++标准的角度来看:标准库算法模板参数的名称表示语义约束的类型,并且算法的操作将依赖于这些约束,且具有特定的语义。对于输入迭代器与前向迭代器,前者对迭代器的引用返回允许取消代理类型,代理类型可转换为迭代器的值类型,而后者对迭代器的引用返回要求取消对值的实际引用,并且所有相同的迭代器都返回对相一值的引用。
这对于并行性很重要:这意味着迭代器可以自由地复制,并等价地使用。此外,增加正向迭代器不会使其他副本失效也很重要,因为这意味着单线程可以在迭代器的副本上操作,需要时增加副本,而不必担心使其他线程的迭代器失效。如果带有执行策略的重载允许使用输入迭代器,将强制线程序列化,对用于从源序列读取唯一迭代器的访问,显然限制了并行的可能性。
执行策略的选择
std::execution::par是最常使用的策略,除非实现提供了更适合的非标准策略。如果代码适合并行化,那应该与std::execution::par一起工作。某些情况下,可以使用std::execution::par_unseq进行代替。这可能根本没什么用(没有任何标准的执行策略可以保证能达到并行性的级别),但它可以给库额外的空间,通过重新排序和交错任务执行来提高代码的性能,以换取对代码更严格的要求。更严格的要求中最值得注意的是,访问元素或对元素执行操作时不使用同步。这意味着不能使用互斥量或原子变量,或前面章节中描述的任何其他同步机制,以确保多线程的访问是安全的;相反,必须依赖于算法本身,而不是使用多个线程访问同一个元素,并在调用并行算法之外使用外部
同步,从而避免其他线程访问数据。
清单10.1中的示例显示的代码中,可以使用std::execution::par,但不能使用std::execution::par_unseq。使用内部互斥量同步意味着使用std::execution::par_unseq将会导致未定义的行为。
清单10.1 具有内部同步并行算法的类
- 1.
- class X{
- 2.mutable std::mutex m;
- 3.int data;
- 4.
- public:
- 5.X():data(0){}
- 6.int get_value() const{
- 7.std::lock_guard guard(m);
- 8.return data;
- 9.}
- 10.void increment(){
- 11.std::lock_guard guard(m);
- 12.++data;
- }
- 13.
- 14.};
- 15.void increment_all(std::vector
& v) { - std::for_each(std::execution::par,v.begin(),v.end(),
- 16.
- 17.[](X& x){
- 18.x.increment();
- });
- 19.
- 20.
- }
假设有一个运作繁忙的网站,日志有数百万条条目,你希望对这些日志进行处理以便查看相关数据:每页访问多少次、访问来自何处、使用的是哪个浏览器,等等。分析日志有两个部分:处理每一行以提取相关信息,将结果聚合在一起。对于使用并行算法来说,这是一个理想的场景,因为处理每一条单独的行完全独立于其他所有行,并且如果最终的合计是正确的,可以逐个汇总结果。
这种类型的任务适合transform_reduce,下面的代码展示了如何将其用于该任务。
清单10.3 使用transform_reduce来记录网站的页面被访问的次数
- 1.#include
- 2.#include
- 3.#include
- 4.#include
- 5.
- 6.
- struct log_info {
- 7.std::string page;
- 8.time_t visit_time;
- 9.std::string browser;
- // any other fields
- 10.
- 11.
- };
- 12.
- 13.extern log_info parse_log_line(std::string const &line); // 1
- 14.using visit_map_type= std::unordered_map
unsigned long long>; - 15.visit_map_type
- 16.count_visits_per_page(std::vector
const &log_lines) { - struct combine_visits {
- 17.
- 18.visit_map_type
- 19.operator()(visit_map_type lhs, visit_map_type rhs) const { // 3
- if(lhs.size() < rhs.size())
- 20.
- std::swap(lhs, rhs);
- 21.
- for(auto const &entry : rhs) {
- 22.
- lhs[entry.first]+= entry.second;
- 23.
- }
- 24.
- return lhs;
- 25.
- 26.}
- 27.visit_map_type operator()(log_info log,visit_map_type map) const{ // 4
- 28.++map[log.page];
- 29.return map;
- 30.}
- 31.visit_map_type operator()(visit_map_type map,log_info log) const{ // 5
- ++map[log.page];
- 32.
- return map;
- 33.
- 34.}
- 35.visit_map_type operator()(log_info log1,log_info log2) const{ // 6
- 36.visit_map_type map;
- 37.++map[log1.page];
- 38.++map[log2.page];
- 39.return map;
- }
- 40.
- 41.};
- 42.return std::transform_reduce( // 2
- 43.std::execution::par, log_lines.begin(), log_lines.end(),
- 44.visit_map_type(), combine_visits(), parse_log_line);
- 45.
- }