10.3 C++标准库中的并行算法

    所有C++并发化的算法都在这个列表中了。像std::accumlate这样的算法,严格来说是一个连续的积累,但在其广义的std::reduce也出现了在这个列表中——标准中适当的警告,即如果约简运算不同时具有关联性和交换性时,由于未指定的运算顺序,结果可能不确定。

    对于列表中的每一个算法,每个”普通”算法的重载都有一个新的参数(第一个参数),这个参数将传入执行策略——“普通”重载的相应参数在此执行策略参数之后。例如,std::sort有两个没有执行策略的“普通”重载:

    因此,它还具有两个有执行策略的重载:

    1. template<class ExecutionPolicy, class RandomAccessIterator>
    2. void sort(
    3. ExecutionPolicy&& exec,
    4. RandomAccessIterator first, RandomAccessIterator last);
    5. template<class ExecutionPolicy, class RandomAccessIterator, class Compare>
    6. void sort(
    7. ExecutionPolicy&& exec,
    8. RandomAccessIterator first, RandomAccessIterator last, Compare comp);

    有执行策略和没有执行策略的函数列表间有一个重要的区别,这只会影响到一些算法:如果“普通”算法允许输入迭代器或输出迭代器,那执行策略的重载则需要前向迭代器。因为输入迭代器是单向迭代的:只能访问当前元素,并且不能将迭代器存储到以前的元素。输出迭代器只允许写入当前元素:不能在写入后面的元素后,后退再写入前面的元素。

    展示一下std::copy的“普通”函数签名:

    1. template<class InputIterator, class OutputIterator>
    2. OutputIterator copy(
    3. InputIterator first, InputIterator last, OutputIterator result);

    带有执行策略的版本:

    虽然,模板参数的命名没有从编译器的角度带来任何影响,但从C++标准的角度来看:标准库算法模板参数的名称表示语义约束的类型,并且算法的操作将依赖于这些约束,且具有特定的语义。对于输入迭代器与前向迭代器,前者对迭代器的引用返回允许取消代理类型,代理类型可转换为迭代器的值类型,而后者对迭代器的引用返回要求取消对值的实际引用,并且所有相同的迭代器都返回对相一值的引用。

    这对于并行性很重要:这意味着迭代器可以自由地复制,并等价地使用。此外,增加正向迭代器不会使其他副本失效也很重要,因为这意味着单线程可以在迭代器的副本上操作,需要时增加副本,而不必担心使其他线程的迭代器失效。如果带有执行策略的重载允许使用输入迭代器,将强制线程序列化,对用于从源序列读取唯一迭代器的访问,显然限制了并行的可能性。

    来看一下具体例子。

    最简单的例子就是并行循环:对容器的每个元素进行处理。这是令人尴尬的并行场景:每个项目都是独立的,所以有最大的并行性。使用支持OpenMP的编译器,代码就可以写成:

    1. #pragma omp parallel for
    2. for(unsigned i=0;i<v.size();++i){
    3. do_stuff(v[i]);
    4. }

    使用C++标准库算法,可以改写成:

    1. std::for_each(std::execution::par,v.begin(),v.end(),do_stuff);

    标准库将创建内部线程,并对数据进行划分,且对每个元素x调用do_stuff(x)。其中在线程间划分元素是一个实现细节。

    执行策略的选择

    std::execution::par是最常使用的策略,除非实现提供了更适合的非标准策略。如果代码适合并行化,那应该与std::execution::par一起工作。某些情况下,可以使用std::execution::par_unseq进行代替。这可能根本没什么用(没有任何标准的执行策略可以保证能达到并行性的级别),但它可以给库额外的空间,通过重新排序和交错任务执行来提高代码的性能,以换取对代码更严格的要求。更严格的要求中最值得注意的是,访问元素或对元素执行操作时不使用同步。这意味着不能使用互斥量或原子变量,或前面章节中描述的任何其他同步机制,以确保多线程的访问是安全的;相反,必须依赖于算法本身,而不是使用多个线程访问同一个元素,并在调用并行算法之外使用外部同步,从而避免其他线程访问数据。

    清单10.1 具有内部同步并行算法的类

    下个清单展示了可使用std::execution::par_unseq的代码段。这种情况下,内部元素互斥量替换为整个容器的互斥量。

    清单10.2 无内部同步并行算法的类

    1. class Y{
    2. int data;
    3. public:
    4. Y():data(0){}
    5. int get_value() const{
    6. return data;
    7. }
    8. void increment(){
    9. ++data;
    10. }
    11. class ProtectedY{
    12. std::vector<Y> v;
    13. public:
    14. void lock(){
    15. m.lock();
    16. }
    17. void unlock(){
    18. m.unlock();
    19. }
    20. std::vector<Y>& get_vec(){
    21. return v;
    22. }
    23. };
    24. void increment_all(ProtectedY& data){
    25. std::lock_guard guard(data);
    26. auto& v=data.get_vec();
    27. std::for_each(std::execution::par_unseq,v.begin(),v.end(),
    28. [](Y& y){
    29. y.increment();
    30. });
    31. }

    清单10.2中的元素访问目前没有同步,使用std::execution::par_unseq是安全的。缺点是并行算法调用之外,其他线程的并发访问现在也必须等待整个操作完成,互斥锁的粒度与清单10.1中不同。

    现在让我们来看一个现实的例子,详述如何使用并行算法:记录访问网站的次数。

    10.3.2 计数访问

    假设有一个运作繁忙的网站,日志有数百万条条目,你希望对这些日志进行处理以便查看相关数据:每页访问多少次、访问来自何处、使用的是哪个浏览器,等等。分析日志有两个部分:处理每一行以提取相关信息,将结果聚合在一起。对于使用并行算法来说,这是一个理想的场景,因为处理每一条单独的行完全独立于其他所有行,并且如果最终的合计是正确的,可以逐个汇总结果。

    这种类型的任务适合transform_reduce,下面的代码展示了如何将其用于该任务。

    清单10.3 使用transform_reduce来记录网站的页面被访问的次数

    1. #include <vector>
    2. #include <string>
    3. #include <unordered_map>
    4. #include <numeric>
    5. struct log_info {
    6. std::string page;
    7. time_t visit_time;
    8. std::string browser;
    9. // any other fields
    10. extern log_info parse_log_line(std::string const &line); // 1
    11. using visit_map_type= std::unordered_map<std::string, unsigned long long>;
    12. visit_map_type
    13. count_visits_per_page(std::vector<std::string> const &log_lines) {
    14. struct combine_visits {
    15. visit_map_type
    16. operator()(visit_map_type lhs, visit_map_type rhs) const { // 3
    17. if(lhs.size() < rhs.size())
    18. std::swap(lhs, rhs);
    19. for(auto const &entry : rhs) {
    20. lhs[entry.first]+= entry.second;
    21. }
    22. return lhs;
    23. }
    24. visit_map_type operator()(log_info log,visit_map_type map) const{ // 4
    25. ++map[log.page];
    26. return map;
    27. }
    28. visit_map_type operator()(visit_map_type map,log_info log) const{ // 5
    29. ++map[log.page];
    30. return map;
    31. }
    32. visit_map_type operator()(log_info log1,log_info log2) const{ // 6
    33. visit_map_type map;
    34. ++map[log1.page];
    35. ++map[log2.page];
    36. return map;
    37. }
    38. };
    39. return std::transform_reduce( // 2
    40. std::execution::par, log_lines.begin(), log_lines.end(),
    41. }

    假设函数parse_log_line的功能是从日志条目中提取相关信息①,count_visits_per_page 函数是一个简单的包装器,将对std::transform_reduce的调用进行包装②。复杂度来源于规约操作:需要组合两个log_info结构体来生成一个映射,一个log_info结构体和一个映射(无论是哪种方式),或两个映射。这意味着combine_visits函数对象需要重载4个函数运算符,③④⑤和⑥;虽然这些重载很简单,但这里没有用Lambda表达式来实现。

    因此,std::transform_reduce将使用硬件并行执行此计算(因为传了std::execution::par)。人工编写这个算法是非常重要的,正如在上一章中看到的,因此这允许将实现并行性的艰苦工作委托给标准库实现者,这样开发者就可以专注于期望的结果了。