• 一种用于多线程中间状态同步的屏障机制


    一种用于多线程中间状态同步的屏障机制

    为了解决在多线程环境中,需要一个内置的计数屏障对于多个线程中的某一个部分进行检查,确保所有线程均到达该点后才能继续执行。

    该屏障常被用于多线程流水线中的中间检查,适用于阶段分割,是一种有效的同步机制。

    此处构建了一个barrier类,其中arrive_and_wait()函数是对应的屏障方法,work是测试线程。

    此处代码注释掉的是使用busy-wait进行循环的忙等版本,保留了使用条件变量和unique_lock的阻塞wait_for版本,可以对比两者之间的性能差距。

    一般来说,线程规模较小,任务量较少时busy-wait效率较高,是由于sleep-awake过程中有系统调用。当任务规模达到一定程度时,wait_for通常性能较好。

    代码如下:

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include "barrier.hpp"
    
    class barrier {
    public:
    
        barrier(size_t expected)
        :_expected(expected)
        ,_arrived(0)
        ,_passed(0)
        {}
    
        void arrive_and_wait();
    
    private:
        const size_t _expected;
        std::atomic<size_t> _arrived;
        std::atomic<size_t> _passed;
        std::mutex mtx;
        std::condition_variable cv;
    };
    
    /*void barrier::arrive_and_wait()
    {
        auto passed = _passed.load();
        if (_arrived.fetch_add(1) == _expected - 1) {
            // NOTE: reset *before* incrementing, otherwise we might reset to zero a
            // thread already waiting on the next wave
            _arrived = 0;
            _passed++;
        } else {
            while (_passed.load() == passed) {
            // busy-wait
            }
        }
    }*/
    
    void barrier::arrive_and_wait()
    {
        auto passed = _passed.load();
        if (_arrived.fetch_add(1) == _expected - 1) {
            // NOTE: reset *before* incrementing, otherwise we might reset to zero a
            // thread already waiting on the next wave
            _arrived = 0;
            _passed++;
            cv.notify_all();
        } else { //block
            std::unique_lock lck(mtx);
            cv.wait(lck, [&] {
                return _passed.load() != passed;
            });
            lck.unlock();
            cv.notify_all();
        }
    }
    
    void work(size_t id, barrier& b)
    {
        printf("+%ld\n", id); fflush(stdout);
        b.arrive_and_wait();
        printf(".%ld\n", id); fflush(stdout);
        b.arrive_and_wait();
        printf("-%ld\n", id); fflush(stdout);
    }
    
    int main(int argc, char** argv)
    {
        auto nthreads = atoi(argv[1]);
        barrier b(nthreads);
        std::vector threads;
    
        for (auto i = nthreads; i; i--) {
            threads.emplace_back(work, i, std::ref(b));
        }
    
        for (auto& thread : threads) {
            thread.join();
        }
    }
    
  • 相关阅读:
    配置nacos组件
    Object.prototype.toString.call() 和 instanceOf 和 Array.isArray() 详解
    机器视觉-相机选型-20220819-持续修改
    Unity 渲染YUV数据 ---- 以Unity渲染Android Camera数据为例子
    kernel 定时数据机构和API
    TDengine 3.0 重磅发布,首届开发者大会圆满结束
    ssm基于Java web的校园滴滴代驾管理系统毕业设计源码260839
    206页上海BIM技术应用与发展报告2021
    分布式中间件(五):RocketMQ 源码
    flac转换成mp3,flac转mp3方法
  • 原文地址:https://www.cnblogs.com/kazusarua/p/18028539