• 2022-10-20 C++并发编程( 三十五 )



    前言

    并发编程相对于单线程编程, 在异常安全上需要注意更多问题.

    我学C++时, 只是了解了异常, 异常捕获等等, 没有进行过深的研究, 也就是 C++ primer 所涉猎的部分, 仅此而已, 但书上没有阐明什么是真正的异常安全, 需要补充些前置知识.

    所谓异常安全, 意味着无资源泄漏风险, 无数据破坏风险.

    C++ 用 RAII 的方式巧妙的应对资源泄漏, 无论异常是否引发, 都会析构, 通过这种设计, 保证不会有资源泄漏.

    而应对数据破坏, 则可以使用 swap 策略, 使得数据要不然成功修改, 要不然保持原样. 更深的理解则需要查看更多的资料, 本文作者也没有能力完全阐明清楚, 程序可使用, 和程序有极强的鲁棒性, 难度差异极大.


    一、并行算法中的异常安全

    从前期文章中提到的并发累加算法实现, 看看不涉及异常安全的代码是怎么样的:

    #include 
    #include 
    #include 
    #include 
    #include 
    
    // 累加仿函数
    template <typename Iterator, typename T>
    struct accumulateBlock
    {
        void operator()(Iterator first, Iterator last, T &result)
        {
            result = std::accumulate(first, last, result);
        }
    };
    
    // 并行累加
    template <typename Iterator, typename T>
    auto parallelAccumulate(Iterator first, Iterator last, T init) -> T
    {
        const unsigned long length = std::distance(first, last);
    
        if (!length)
        {
            return init;
        }
    
        const unsigned long minPerThread = 25;
    
        // 算法保证最大线程数为 25 的倍数, 如余数不为 0 则加一
        const unsigned long maxThreads = (length + minPerThread - 1) / minPerThread;
    
        const unsigned long hardwareThreads = std::thread::hardware_concurrency();
    
        const unsigned long numThreads =
            std::min(hardwareThreads != 0 ? hardwareThreads : 2, maxThreads);
    
        // 每个线程需处理的数据量, 余数由主线程处理
        const unsigned long blockSize = length / numThreads;
    
        // 结果数组, 个数为线程数
        std::vector<T> results(numThreads);
        std::vector<std::thread> threads(numThreads - 1);
    
        Iterator blockStart = first;
    
        for (unsigned long i = 0; i < (numThreads - 1); ++i)
        {
            Iterator blockEnd = blockStart;
    
            // 使迭代器 blockEnd 偏移 blockSize
            std::advance(blockEnd, blockSize);
            threads[i] = std::thread(accumulateBlock<Iterator, T>(), blockStart,
                                     blockEnd, std::ref(results[i]));
            blockStart = blockEnd;
        }
    
        // 其它线程累加剩下的元素由主线程收尾
        accumulateBlock<Iterator, T>()(blockStart, last, results[numThreads - 1]);
    
        std::for_each(threads.begin(), threads.end(),
                      std::mem_fn(&std::thread::join));
    
        return std::accumulate(results.begin(), results.end(), init);
    }
    
    auto main() -> int
    {
        std::vector<int> vi;
        vi.reserve(100);
        for (int i = 0; i != 100; ++i)
        {
            vi.push_back(i);
        }
    
        int result = parallelAccumulate(vi.begin(), vi.end(), 0);
    
        std::cout << result << std::endl;
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81

    以上代码中, accumulateBlock 的内部实现使用了算法库的函数, 而此函数未曾封装在 try catch 结构中, 所以所有对仿函数的调用都可能引发异常, 导致 std::terminate 结束程序.

    所以, 这个并发累加的算法函数实现是非异常安全的.

    要让程序有一定的异常安全性, 可以改用标准库 std::packaged_task 和 std::future 结构封装, 这两个结构是异常安全的. 为了配合这种结构, 可以将 accumulateBlock 仿函数更改为返回结果的实现, 用于将结果封装于 packaged_task 传递给 future.

    所有由其它线程返回的结果都保存在 future 中, 在线程执行运算时, 由 future 进行结果封装或异常抛出.

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    // 累加仿函数
    template <typename Iterator, typename T>
    struct accumulateBlock
    {
        auto operator()(Iterator first, Iterator last) -> T
        {
            return std::accumulate(first, last, T());
        }
    };
    
    // 并行累加
    template <typename Iterator, typename T>
    auto parallelAccumulate(Iterator first, Iterator last, T init) -> T
    {
        const unsigned long length = std::distance(first, last);
    
        if (!length)
        {
            return init;
        }
    
        const unsigned long minPerThread = 25;
    
        // 算法保证最大线程数为 25 的倍数, 如余数不为 0 则加一
        const unsigned long maxThreads = (length + minPerThread - 1) / minPerThread;
    
        const unsigned long hardwareThreads = std::thread::hardware_concurrency();
    
        const unsigned long numThreads =
            std::min(hardwareThreads != 0 ? hardwareThreads : 2, maxThreads);
    
        // 每个线程需处理的数据量, 余数由主线程处理
        const unsigned long blockSize = length / numThreads;
    
        // 结果数组, 个数为线程数
        std::vector<std::future<T>> futures(numThreads - 1);
        std::vector<std::thread> threads(numThreads - 1);
    
        Iterator blockStart = first;
    
        for (unsigned long i = 0; i < (numThreads - 1); ++i)
        {
            Iterator blockEnd = blockStart;
    
            // 使迭代器 blockEnd 偏移 blockSize
            std::advance(blockEnd, blockSize);
    
            std::packaged_task<T(Iterator, Iterator)> task(
                (accumulateBlock<Iterator, T>()));
    
            futures[i] = task.get_future();
    
            threads[i] = std::thread(std::move(task), blockStart, blockEnd);
    
            blockStart = blockEnd;
        }
    
        // 其它线程累加剩下的元素由主线程收尾
        T lastResult = accumulateBlock<Iterator, T>()(blockStart, last);
    
        std::for_each(threads.begin(), threads.end(),
                      std::mem_fn(&std::thread::join));
    
        T result = init;
    
        for (unsigned long i = 0; i < (numThreads - 1); ++i)
        {
            result += futures[i].get();
        }
    
        result += lastResult;
    
        return result;
    }
    
    auto main() -> int
    {
        std::vector<int> vi(100);
    
        for (int i = 0; i != 100; ++i)
        {
            vi[i] = i;
        }
    
        int result = parallelAccumulate(vi.begin(), vi.end(), 0);
    
        std::cout << result << std::endl;
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97

    以上代码制约了异常只能在最后结果汇合时抛出异常, 我们可以进一步, 将最后部分放入 tyr catch 结构中.

        T lastResult;
        try
        {
            for (unsigned long i = 0; i < (numThreads - 1); ++i)
            {
                Iterator blockEnd = blockStart;
    
                // 使迭代器 blockEnd 偏移 blockSize
                std::advance(blockEnd, blockSize);
    
                std::packaged_task<T(Iterator, Iterator)> task(
                    (accumulateBlock<Iterator, T>()));
    
                futures[i] = task.get_future();
    
                threads[i] = std::thread(std::move(task), blockStart, blockEnd);
    
                blockStart = blockEnd;
            }
    
            // 其它线程累加剩下的元素由主线程收尾
            lastResult = accumulateBlock<Iterator, T>()(blockStart, last);
    
            std::for_each(threads.begin(), threads.end(),
                          std::mem_fn(&std::thread::join));
        }
        catch (...)
        {
            for (unsigned long i = 0; i != (numThreads - 1); ++i)
            {
                if (threads[i].joinable())
                {
                    threads[i].join();
                }
                throw;
            }
        }
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    也可用 RAII 的方式, 用一个结构封装线程数组, 并在析构时保证 join, 可以省略 try catch 结构, 并且省去显示的 join 调用, 因为 future.get 会阻塞线程直至返回值.

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    // 累加仿函数
    template <typename Iterator, typename T>
    struct accumulateBlock
    {
        auto operator()(Iterator first, Iterator last) -> T
        {
            return std::accumulate(first, last, T());
        }
    };
    
    struct joinThreads
    {
        explicit joinThreads(std::vector<std::thread> &rhs)
            : threads(rhs)
        {}
    
        ~joinThreads()
        {
            for (auto &thread : threads)
            {
                if (thread.joinable())
                {
                    thread.join();
                }
            }
        }
    
      private:
        std::vector<std::thread> &threads;
    };
    
    // 并行累加
    template <typename Iterator, typename T>
    auto parallelAccumulate(Iterator first, Iterator last, T init) -> T
    {
        const unsigned long length = std::distance(first, last);
    
        if (!length)
        {
            return init;
        }
    
        const unsigned long minPerThread = 25;
    
        // 算法保证最大线程数为 25 的倍数, 如余数不为 0 则加一
        const unsigned long maxThreads = (length + minPerThread - 1) / minPerThread;
    
        const unsigned long hardwareThreads = std::thread::hardware_concurrency();
    
        const unsigned long numThreads =
            std::min(hardwareThreads != 0 ? hardwareThreads : 2, maxThreads);
    
        // 每个线程需处理的数据量, 余数由主线程处理
        const unsigned long blockSize = length / numThreads;
    
        // 结果数组, 个数为线程数
        std::vector<std::future<T>> futures(numThreads - 1);
        std::vector<std::thread> threads(numThreads - 1);
    
        // 函数结束自动将所有线程 join
        joinThreads const joiner(threads);
    
        Iterator blockStart = first;
    
        T lastResult;
    
        for (unsigned long i = 0; i < (numThreads - 1); ++i)
        {
            Iterator blockEnd = blockStart;
    
            // 使迭代器 blockEnd 偏移 blockSize
            std::advance(blockEnd, blockSize);
    
            std::packaged_task<T(Iterator, Iterator)> task(
                (accumulateBlock<Iterator, T>()));
    
            futures[i] = task.get_future();
    
            threads[i] = std::thread(std::move(task), blockStart, blockEnd);
    
            blockStart = blockEnd;
        }
    
        // 其它线程累加剩下的元素由主线程收尾
        lastResult = accumulateBlock<Iterator, T>()(blockStart, last);
    
        T result = init;
    
        for (unsigned long i = 0; i < (numThreads - 1); ++i)
        {
            result += futures[i].get();
        }
    
        result += lastResult;
    
        return result;
    }
    
    auto main() -> int
    {
        std::vector<int> vi(100);
    
        for (int i = 0; i != 100; ++i)
        {
            vi[i] = i;
        }
    
        int result = parallelAccumulate(vi.begin(), vi.end(), 0);
    
        std::cout << result << std::endl;
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120

    用 std::async() 加递归的方式实现并发累加, 通过 async 自动分配线程, 通过 future 捕获异常, 保证异常安全.

    #include 
    #include 
    #include 
    #include 
    #include 
    
    template <typename Iterator, typename T>
    auto parallelAccumulate(Iterator first, Iterator last, T init) -> T
    {
        const unsigned long length = std::distance(first, last);
    
        const unsigned long maxChunkSize = 25;
    
        if (length <= maxChunkSize)
        {
            return std::accumulate(first, last, init);
        }
    
        Iterator midPoint = first;
    
        std::advance(midPoint, length / 2);
    
        std::future<T> firstHalfResult =
            std::async(parallelAccumulate<Iterator, T>, first, midPoint, init);
    
        T secondHalfResult = parallelAccumulate(midPoint, last, T());
    
        return firstHalfResult.get() + secondHalfResult;
    }
    
    auto main() -> int
    {
        std::vector<int> vi(100);
    
        for (int i = 0; i != 100; ++i)
        {
            vi[i] = i;
        }
    
        const int result = parallelAccumulate(vi.begin(), vi.end(), 0);
    
        std::cout << result << std::endl;
    
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    总结

    对于真正可用的并发代码, 需要注意的比单线程要多, 也更难, 异常安全是个较大的话题, 需要更多的资料学习尝试.

    参考: C++中的异常安全性

  • 相关阅读:
    js中使用getElementsByClassName获取class对象
    No suitable driver found for jdbc:mysql://localhost:3306/BookManagement
    01背包面试题系列(一)
    docker-compose安装和使用(自启、redis、mysql、rabbitmq、activemq、es、nginx、java应用)
    mybatisplus-分页插件PageHelper
    【FPGA数学公式】使用FPGA实现常用数学公式
    微客云升级会员制度
    3d代理模型怎么转换成标准模型---模大狮模型网
    蓝桥杯打卡Day3
    当个 PM 式程序员「GitHub 热点速览」
  • 原文地址:https://blog.csdn.net/m0_54206076/article/details/127432383