• 2022-11-08 C++并发编程( 四十 )



    前言

    C++ 17 开始, 算法部分引入了并行算法的重载, 形式上的区别就是多了一个执行策略:

    // 顺序执行, 非并行, 可自由使用同步机制
    std::execution::sequenced_policy
    
    // 并发执行, 不能有数据竞争, 元素间不能有特定顺序, 可使用同步机制
    std::execution::parallel_policy
    
    // 并发乱序执行, 无顺序执行, 不可使用同步机制
    std::execution::parallel_unsequenced_policy
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    并行算法可以提高效率, 但与普通算法的语义有所不同, 如果了解不深, 可能会得出你不想要的结果.

    比如浮点类型的运算, 哪怕是累加, 当顺序累加和并行累加时, 很可能结果是不同的.


    一、标准库并行算法

    看个最简单的例子, 在每个元素和其它元素无关的情况下, 下面两种并行代码是等价的

        std::vector<int> iVec(1000);
    
    #pragma omp parallel for
        for (int i = 0; i != 1000; ++i)
        {
            iVec[i] = 1;
        }
    
        std::for_each(std::execution::par, iVec.begin(), iVec.end(),
                      [](int &i) { i = 1; });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    openMP 本身的要求是元素无关, 这也是并发算法的限制, 此例中, 操作元素的函数也不存在竞争, 天然就适用于并发.

    向下面这种, 恐怕就比较有问题了, 你期待的是顺序增大, 但得到的结果是混乱的, 甚至是有重复值的, 因为数据竞争.

        std::vector<int> iVec(1000);
        // 非原子操作, 会有数据竞争导致的未定义行为
        int count = 0;
        std::for_each(std::execution::par, iVec.begin(), iVec.end(),
                      [&](int &x) { x = ++count; });
    
    • 1
    • 2
    • 3
    • 4
    • 5

    就算用原子变量引入同步, 消除重复值, 但顺序性仍旧无法保证, 所以对于有顺序性的计算, 并行算法可能不适用.

        std::atomic<int> count = 0;
    
    • 1

    关于同步, 如果算法在处理某个容器中的值的时候, 有其它函数也在处理容器数据, 就需要同步机制保护, 对于std::execution::par 策略, 可以使用每个元素的同步.

    下面示例是在操作每个元素时, 使用锁防止数据竞争. 如果有其它函数对 std::vector XVec 进行处理, 没有问题.

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    struct X
    {
        X() = default;
    
        auto getValue() const -> int
        {
            std::lock_guard const guard(m);
            return data;
        }
    
        void increment()
        {
            std::lock_guard const guard(m);
            ++data;
        }
    
      private:
        mutable std::mutex m;
        int data = 0;
    };
    
    void incrementAll(std::vector<X> &v)
    {
        std::for_each(std::execution::par_unseq, v.begin(), v.end(),
                      [](X &x) { x.increment(); });
    }
    
    auto main() -> int
    {
        std::vector<X> XVec(100);
    
        incrementAll(XVec);
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    对于 std::execution::par_unseq 策略, 以上的同步机制就可能发生未定义行为, 此时, 需要算法独占整个容器:

    
    #include 
    #include 
    #include 
    #include 
    
    struct Y
    {
        Y() = default;
        [[nodiscard]] auto getValue() const -> int
        {
            return data;
        }
    
        void increment()
        {
            ++data;
        }
    
      private:
        int data = 0;
    };
    
    struct ProtectedY
    {
        void lock()
        {
            m.lock();
        }
        void unlock()
        {
            m.unlock();
        }
        auto getVec() -> std::vector<Y> &
        {
            return v;
        }
    
      private:
        std::mutex m;
        std::vector<Y> v;
    };
    
    void incrementAll(ProtectedY &data)
    {
        std::lock_guard const guard(data);
        auto &v = data.getVec();
        std::for_each(std::execution::par_unseq, v.begin(), v.end(),
                      [](Y &y) { y.increment(); });
    }
    
    auto main() -> int
    {
        ProtectedY py;
    
        auto &pyVec = py.getVec();
    
        for (int i = 0; i != 10; ++i)
        {
            pyVec.emplace_back();
        }
    
        incrementAll(py);
    
        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    最后是一个算法示例: 模拟日志信息聚合. 对于每个元素都相互无关且聚合时元素计算顺序不会影响到最终结果, 天然适合并行计算.

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    // log信息类
    struct logInfo
    {
        std::string page;
        time_t visitTime;
        std::string browser;
    };
    
    // 通过 string 获取 log 信息
    extern auto parseLogLine(const std::string &line) -> logInfo;
    
    // 哈希表重命名
    using visitMapType = std::unordered_map<std::string, unsigned long long>;
    
    // 合并访问类
    struct combineVisits
    {
        auto operator()(visitMapType lhs, visitMapType rhs) const -> visitMapType
        {
            if (lhs.size() < rhs.size())
            {
                std::swap(lhs, rhs);
            }
            for (const auto &entry : rhs)
            {
                lhs[entry.first] += entry.second;
            }
            return lhs;
        }
    
        auto operator()(const logInfo &log, visitMapType map) const -> visitMapType
        {
            ++map[log.page];
            return map;
        }
    
        auto operator()(visitMapType map, const logInfo &log) const -> visitMapType
        {
            ++map[log.page];
            return map;
        }
    
        auto operator()(const logInfo &log1, const logInfo &log2) const
            -> visitMapType
        {
            visitMapType map;
            ++map[log1.page];
            ++map[log2.page];
            return map;
        }
    };
    
    // 访问计数
    auto countVisitsPerPage(const std::vector<std::string> &logLines)
        -> visitMapType
    {
        // 对容器中每个元素用 parseLogLine 转化处理, 交由 combineVisits 合并处理
        return std::transform_reduce(std::execution::par, logLines.begin(),
                                     logLines.end(), visitMapType(),
                                     combineVisits(), parseLogLine);
    }
    
    auto main() -> int
    {
        std::vector<std::string> const strVec{"a", "b", "c", "b", "d",
                                              "e", "f", "a", "c"};
    
        auto result = countVisitsPerPage(strVec);
    
        return 0;
    }
    
    auto parseLogLine(const std::string &line) -> logInfo
    {
        logInfo lgInfo;
        lgInfo.page = line;
        lgInfo.browser = line;
        lgInfo.visitTime = clock();
        return lgInfo;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87

    总结

    常用的并行算法, 可以直接使用标准库算法.

    策略的使用, 要注意是否可使用同步.

    并行算法依赖数据的分割, 对于不可交换, 及要求顺序性的操作, 需要小心.

    另外目前 Clang 还没有实现并行算法, 可使用 gcc 或 vs 工具链编译.

  • 相关阅读:
    Dive into TensorFlow系列(1)-静态图运行原理
    【面经&八股】大模型方向:面试记录(一)
    最全电脑固态硬盘SSD入门级白皮书
    面向任务对话系统(TOD)综述
    计算机网络体系结构——网络层
    C# Const、readonly、Static区别
    火柴排队.
    Git分支
    java8 实现递归查询
    Telent
  • 原文地址:https://blog.csdn.net/m0_54206076/article/details/127744066