• 一种用于多线程中间状态同步的屏障机制


    一种用于多线程中间状态同步的屏障机制

    为了解决在多线程环境中,需要一个内置的计数屏障对于多个线程中的某一个部分进行检查,确保所有线程均到达该点后才能继续执行。

    该屏障常被用于多线程流水线中的中间检查,适用于阶段分割,是一种有效的同步机制。

    此处构建了一个barrier类,其中arrive_and_wait()函数是对应的屏障方法,work是测试线程。

    此处代码注释掉的是使用busy-wait进行循环的忙等版本,保留了使用条件变量和unique_lock的阻塞wait_for版本,可以对比两者之间的性能差距。

    一般来说,线程规模较小,任务量较少时busy-wait效率较高,是由于sleep-awake过程中有系统调用。当任务规模达到一定程度时,wait_for通常性能较好。

    代码如下:

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include "barrier.hpp"
    
    class barrier {
    public:
    
        barrier(size_t expected)
        :_expected(expected)
        ,_arrived(0)
        ,_passed(0)
        {}
    
        void arrive_and_wait();
    
    private:
        const size_t _expected;
        std::atomic<size_t> _arrived;
        std::atomic<size_t> _passed;
        std::mutex mtx;
        std::condition_variable cv;
    };
    
    /*void barrier::arrive_and_wait()
    {
        auto passed = _passed.load();
        if (_arrived.fetch_add(1) == _expected - 1) {
            // NOTE: reset *before* incrementing, otherwise we might reset to zero a
            // thread already waiting on the next wave
            _arrived = 0;
            _passed++;
        } else {
            while (_passed.load() == passed) {
            // busy-wait
            }
        }
    }*/
    
    void barrier::arrive_and_wait()
    {
        auto passed = _passed.load();
        if (_arrived.fetch_add(1) == _expected - 1) {
            // NOTE: reset *before* incrementing, otherwise we might reset to zero a
            // thread already waiting on the next wave
            _arrived = 0;
            _passed++;
            cv.notify_all();
        } else { //block
            std::unique_lock lck(mtx);
            cv.wait(lck, [&] {
                return _passed.load() != passed;
            });
            lck.unlock();
            cv.notify_all();
        }
    }
    
    void work(size_t id, barrier& b)
    {
        printf("+%ld\n", id); fflush(stdout);
        b.arrive_and_wait();
        printf(".%ld\n", id); fflush(stdout);
        b.arrive_and_wait();
        printf("-%ld\n", id); fflush(stdout);
    }
    
    int main(int argc, char** argv)
    {
        auto nthreads = atoi(argv[1]);
        barrier b(nthreads);
        std::vector threads;
    
        for (auto i = nthreads; i; i--) {
            threads.emplace_back(work, i, std::ref(b));
        }
    
        for (auto& thread : threads) {
            thread.join();
        }
    }
    
  • 相关阅读:
    Spark 内核 (二) --------- Spark 部署模式
    LuatOS-SOC接口文档(air780E)--lora2 - lora2驱动模块(支持多挂)
    Hadoop HA高可用环境搭建
    ESP8266-Arduino编程实例-PIR(被动红外)传感器驱动
    高斯锁表导致sql报错处理
    mars3d调用高亮openHighlight,实现点击按钮高亮小车
    支持笔记本电脑直插直充,TOWE 65W智能快充PDU超级插座
    【YOLO格式的数据标签,目标检测】
    2024 CKA 最新 | 基础操作教程(十六)
    docker保存镜像出错
  • 原文地址:https://www.cnblogs.com/kazusarua/p/18028539