• 2022-11-03 C++并发编程( 三十九 )



    前言

    上篇文章我们介绍了线程池, 比较复杂, 本文会在线程池的基础上增加中断线程, 完善线程的较高级操作.

    线程的中断需要让线程主动而安全的停止, 通过信号的传入结束线程, 将其停止运行.


    一、可中断线程类的线程池

    可中断的线程可以通过将 thread 包装到一个有中断标识的类中, 通过另一个中断检查或等待中断检查函数判断中断标识是否设置, 如果设置, 则线程弹出中断异常, 中断此线程.

    
    #ifndef THREADPOOL
    #define THREADPOOL
    
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    namespace TS
    {
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    这是线程中断异常, 继承自异常类, 会在后续的可中断线程类中用于异常抛出和捕获.

    // 线程中断异常
    struct threadInterrupted : std::exception
    {
        [[nodiscard]] auto what() const noexcept -> const char * override
        {
            return "thread interrupted.";
        }
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    以下是用于中断标识类的可锁类, 用于中断标识类的 wait 函数

    // 中断标识
    struct interruptFlag;
    
    // 自定义锁
    template <typename Lockable>
    struct customLock
    {
        customLock(interruptFlag *self_, std::condition_variable_any &cond,
                   Lockable &lock_);
    
        ~customLock();
    
        void unlock();
    
        void lock();
    
      private:
        // 中断标识指针
        interruptFlag *self;
    
        // 成员锁
        Lockable &theLock;
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    以下是中断标识类的具体实现, 中断标识是一个原子布尔类对象, 当中断标识被设置, 线程中一旦触发 interruptionPoint() 函数则会立即抛出异常并中断.

    或者线程中有等待逻辑, 在中断设置时发出通知, 被 interruptibleWait() 等待中断函数接受, 抛出异常并中断.

    // 中断函数,
    // 若线程局部中断标识被设置,
    // 则抛出线程中断异常
    void interruptionPoint();
    
    // 中断标识
    struct interruptFlag
    {
        // 默认构造
        interruptFlag() = default;
    
        // 设置中断标识
        void set()
        {
            // 中断标识设置为 true
            atomicFlag.store(true, std::memory_order_relaxed);
    
            // 设置清除锁上锁
            const std::lock_guard<std::mutex> lock(setClearMutex);
    
            // 如果线程条件变量指针不为空, 通知所有线程
            if (threadCond != nullptr)
            {
                threadCond->notify_all();
            }
            // 否则如果线程任意条件变量指针不为空, 通知所有线程
            else if (threadCondAny != nullptr)
            {
                threadCondAny->notify_all();
            }
        }
    
        // 等待
        template <typename Lockable>
        void wait(std::condition_variable_any &condVarAny, Lockable &lock)
        {
            // 初始化自定义锁
            // 将条件变量 condVarAny 传递给本中断标识
            // 本中断标识的设置锁上锁 setClearMutex.lock()
            // 析构时清空本中断标识的条件变量指针
            // 并解锁设置锁 setClearMutex.unlock()
            customLock<Lockable> cusLock(this, condVarAny, lock);
    
            // 若线程局部中断标识被设置, 则抛出线程中断异常
            interruptionPoint();
    
            // 条件变量等待
            // 未通知时解锁参数中的 lock 和本中断标识的设置锁
            // 通知后将以上两锁加锁
            // 这是一个巧妙的结构,
            // 等待唤醒和上边检查中断异常之间是可以用 set() 设置中断标识的,
            // 但是 set() 想通知所有线程, 则是阻塞的, 因为 setClearMutex 已经锁定
            // 只有当下面 wait() 执行释放 cusLock 时, setClearMutex 解锁
            // 然后 notify_all 才会进行, 唤醒不会被遗漏
            condVarAny.wait(cusLock);
    
            // 若线程局部中断标识被设置, 则抛出线程中断异常
            interruptionPoint();
        }
    
        // 中断标识是否设置
        // 返回 atomicFlag 值
        [[nodiscard]] auto isSet() const -> bool
        {
            return atomicFlag.load(std::memory_order_relaxed);
        }
    
        // 设置条件变量
        // 给线程条件变量指针赋值
        void setConditionVariable(std::condition_variable &condVar)
        {
            std::lock_guard<std::mutex> const lock(setClearMutex);
            threadCond = &condVar;
        }
    
        // 将线程条件变量指针置空
        void clearConditionVariable()
        {
            std::lock_guard<std::mutex> const lock(setClearMutex);
            threadCond = nullptr;
        }
    
      private:
        // 原子标识
        std::atomic<bool> atomicFlag;
    
        // 线程条件变量指针
        std::condition_variable *threadCond = nullptr;
    
        // 线程任意条件变量指针
        std::condition_variable_any *threadCondAny = nullptr;
    
        // 互斥锁设置清除锁
        std::mutex setClearMutex;
    
        // 友元类
        template <typename Lockable>
        friend struct customLock;
    };
    
    // 自定义锁初始化,
    // 获取中断标识指针, 条件变量,
    // 中断标识的清理锁加锁,
    // 获取中断标识的线程任意条件变量指针
    template <typename Lockable>
    customLock<Lockable>::customLock(interruptFlag *self_,
                                     std::condition_variable_any &cond,
                                     Lockable &lock_)
        : self(self_)
        , theLock(lock_)
    {
        // 中断标识的清理锁加锁
        self->setClearMutex.lock();
        // 获取中断标识的线程任意条件变量指针
        self->threadCondAny = &cond;
    }
    
    // 自定义锁析构,
    // 中断标识的线程任意条件变量指针置空,
    // 中断标识的清理锁释放锁
    template <typename Lockable>
    customLock<Lockable>::~customLock()
    {
        self->threadCondAny = nullptr;
        self->setClearMutex.unlock();
    }
    
    // 自定义锁解锁,
    // 锁成员解锁,
    // 中断标识的清理锁解锁,
    template <typename Lockable>
    void customLock<Lockable>::unlock()
    {
        theLock.unlock();
        self->setClearMutex.unlock();
    }
    
    // 自定义锁加锁,
    // 锁成员加锁, 中断标识清理锁加锁
    template <typename Lockable>
    void customLock<Lockable>::lock()
    {
        std::lock(self->setClearMutex, theLock);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144

    以下是可中断线程类的具体实现, 依靠一个线程局部的中断标识确保本线程的中断操作.

    除了中断函数, 其余的都与普通线程类无异.

    构造可中断线程类时, 需要传入一个状态函数 explicit interruptibleThread(FunctionType func), 通常使用 lambda 进行封装, 因为没有放参数的地方. 对于 lambda 不熟悉的同学, 一定要好好学学.

    作为参数的状态函数, 通常会是一个循环结构, 包含 interruptionPoint() 检查中断函数或某个 interruptibleWait() 函数, 当线程设置中断标识后, 抛出异常, 中断线程.

    // 线程局部中断标识
    thread_local interruptFlag thisThreadInterruptFlag;
    
    // 清除局部线程中断标识的条件变量
    // 析构函数的一种运用
    struct clearCondVarOnDestruct
    {
        ~clearCondVarOnDestruct()
        {
            thisThreadInterruptFlag.clearConditionVariable();
        }
    };
    
    // 可中断线程类
    struct interruptibleThread
    {
        // 默认构造函数
        interruptibleThread() = default;
    
        // 构造函数
        template <typename FunctionType>
        explicit interruptibleThread(FunctionType func)
        {
            // 初始化中断标识指针 promise
            std::promise<interruptFlag *> promFlagPtr;
    
            // 初始化内部线程
            internalThread = std::thread([func, &promFlagPtr]() {
                // 给 通过 lambda 捕获的 promFlagPtr 赋值局部线程中断标识的指针
                promFlagPtr.set_value(&thisThreadInterruptFlag);
                // 试图运行 func
                try
                {
                    func();
                }
                // 捕获线程中断异常
                catch (const threadInterrupted &thrIntrpt)
                {}
            });
    
            // 通过 future 传递局部线程中断标识指针给 intrptFlag
            intrptFlag = promFlagPtr.get_future().get();
        }
    
        // 不可拷贝构造
        interruptibleThread(const interruptibleThread &) = delete;
    
        // 不可拷贝赋值
        auto operator=(const interruptibleThread &)
            -> interruptibleThread & = delete;
    
        // 移动构造
        interruptibleThread(interruptibleThread &&rhs) noexcept
            : internalThread(std::move(rhs.internalThread))
            , intrptFlag(rhs.intrptFlag)
        {
            rhs.intrptFlag = nullptr;
        }
    
        // 移动赋值
        auto operator=(interruptibleThread &&rhs) noexcept -> interruptibleThread &
        {
            internalThread = std::move(rhs.internalThread);
            intrptFlag = rhs.intrptFlag;
            rhs.intrptFlag = nullptr;
            return *this;
        }
    
        void join()
        {
            internalThread.join();
        }
    
        void detatch()
        {
            internalThread.detach();
        }
    
        [[nodiscard]] auto joinable() const -> bool
        {
            return internalThread.joinable();
        }
    
        // 中断
        void interrupt()
        {
            // 如果中断标识指针不为空
            // 设置中断标识
            if (intrptFlag != nullptr)
            {
                intrptFlag->set();
            }
        }
    
      private:
        // 内部线程
        std::thread internalThread;
    
        // 中断标识指针
        interruptFlag *intrptFlag = nullptr;
    };
    
    // 中断函数,
    // 若线程局部中断标识被设置,
    // 则抛出线程中断异常
    inline void interruptionPoint()
    {
        // 若线程局部中断标识被设置,
        // 则抛出线程中断异常
        if (thisThreadInterruptFlag.isSet())
        {
            throw threadInterrupted();
        }
    }
    
    // 中断等待
    // 封装条件变量的等待, 同时兼有中断功能
    // 不足: 过多的伪唤醒
    inline void interruptibleWait(std::condition_variable &condVar,
                                  std::unique_lock<std::mutex> &lock)
    {
        // 判断是否中断
        interruptionPoint();
    
        // 设置局部线程中断标识的条件变量指针
        // 这样, 当中断标识设置 set() 时, 会通知所有阻塞的条件变量
        thisThreadInterruptFlag.setConditionVariable(condVar);
    
        // 析构时清理上面设置的变量指针
        // 为了防止下面 wait_for 抛异常, 无法清空中断标识的条件变量指针
        const clearCondVarOnDestruct guard;
    
        // 判断是否中断
        interruptionPoint();
    
        // 等待唤醒, 如未通知, 1毫秒后唤醒
        // 为了防止线程中断标识在判断是否中断和 wait 之间设置 set(),
        // 导致通知错过, 无法唤醒
        // 此处用 wait_for
        condVar.wait_for(lock, std::chrono::milliseconds(1));
    
        // 判断是否中断
        // 无中断则继续
        interruptionPoint();
    }
    
    // 有前置条件的中断等待
    // 不足: 前置条件的检查次数过多
    template <typename Predicate>
    void interruptibleWait(std::condition_variable &condVar,
                           std::unique_lock<std::mutex> &lock, Predicate pred)
    {
        // 判断是否中断
        interruptionPoint();
    
        // 局部线程中断标识设置条件变量
        thisThreadInterruptFlag.setConditionVariable(condVar);
    
        // 析构时清除上面设置的条件变量指针
        clearCondVarOnDestruct const guard;
    
        // 当未设置局部线程中断标识, 且未达到前置条件, 进行循环
        while (!thisThreadInterruptFlag.isSet() && !pred())
        {
            // 等待唤醒, 如未通知, 1毫秒后唤醒
            condVar.wait_for(lock, std::chrono::milliseconds(1));
        }
    
        // 判断是否中断
        interruptionPoint();
    }
    
    // 通过局部线程中断标识的 wait() 函数实现的中断等待
    template <typename Lockable>
    void interruptibleWait(std::condition_variable_any &conVar, Lockable &lock)
    {
    
        thisThreadInterruptFlag.wait(conVar, lock);
    }
    
    // 通过等待 future 完成实现中断等待
    template <typename T>
    void interruptibleWait(std::future<T> &theFuture)
    {
        // 如果线程中断标识未设置, 则循环
        while (!thisThreadInterruptFlag.isSet())
        {
            // 如果 theFuture 等待 1 毫秒后 ready 则中断循环
            if (theFuture.wait_for(std::chrono::milliseconds(1)) ==
                std::future_status::ready)
            {
                break;
            }
        }
    
        // 检查是否中断
        interruptionPoint();
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199

    以上是可中断线程及其相关中断函数, 以下是在线程池中使用可中断线程类, 可以根据线程编号中断某个具体线程, 在完成所有任务后, 线程进入等待, 然后线程中断.

    另外注意, 线程池中任务队列没有阻塞机制, 也就是说由于任务是一个一个弹出的, 线程池只能被当前执行的任务所阻塞, 一旦线程池析构, 将抛弃所有任务队列中未完成任务, 所以使用线程池, 你需要有一个等待机制, 比如任务数是否为0, 这个应该不难实现.

    目前的线程池以经非常复杂了, 但依然有可改善空间, 比如引入无锁队列, 至于是否能提高效率, 还需要具体测试.

    
    template <typename T>
    struct threadSafeQueue
    {
        threadSafeQueue() = default;
    
        threadSafeQueue(const threadSafeQueue &rhs)
        {
            std::lock_guard<std::mutex> makeLock(rhs.mutx);
            dataQueue = rhs.dataQueue;
        }
    
        auto operator=(const threadSafeQueue &rhs) -> threadSafeQueue & = delete;
    
        void push(const T &newVal)
        {
            std::lock_guard<std::mutex> makeLock(mutx);
            dataQueue.push(newVal);
            dataCond.notify_one();
        }
    
        void push(T &&newVal)
        {
            std::lock_guard<std::mutex> makeLock(mutx);
            dataQueue.push(std::move(newVal));
            dataCond.notify_one();
        }
    
        // 通过返回 true false 判断赋值是否正确
        auto tryPop(T &val) -> bool
        {
            std::lock_guard<std::mutex> const makeLock(mutx);
            if (dataQueue.empty())
            {
                return false;
            }
            val = std::move(dataQueue.front());
            dataQueue.pop();
            return true;
        }
    
        // 通过返回 nullptr 或有效 shared_ptr 判定是否正确赋值
        auto tryPop() -> std::shared_ptr<T>
        {
            std::lock_guard<std::mutex> makeLock(mutx);
            if (dataQueue.empty())
            {
                // 返回 nullptr
                return std::shared_ptr<T>(nullptr);
            }
            std::shared_ptr<T> res(std::make_shared<T>(dataQueue.front()));
            dataQueue.pop();
            return res;
        }
    
        void waitAndPop(T &val)
        {
            std::unique_lock<std::mutex> makeLock(mutx);
            // 如果不为空,加锁,执行下面的程序
            // 如果为空,等待传来消息,再判断是否为空,为空则继续等
            // 不为空则则锁住,继续下面的程序
            // 就是没有通知,也会判断是否为空,为空等,不空锁,向下执行
            dataCond.wait(makeLock, [this] { return !dataQueue.empty(); });
            val = dataQueue.front();
            dataQueue.pop();
        }
    
        // 如果一直为空,没有程序产生队列数据,会一直阻塞,可能会让程序不可结束
        auto waitAndPop() -> std::shared_ptr<T>
        {
            std::unique_lock<std::mutex> makeLock(mutx);
            dataCond.wait(makeLock, [this] { return !dataQueue.empty(); });
            std::shared_ptr<T> res(std::make_shared<T>(dataQueue.front()));
            dataQueue.pop();
            return res;
        }
    
        auto empty() const -> bool
        {
            std::lock_guard<std::mutex> makeLock(mutx);
            return dataQueue.empty();
        }
    
      private:
        mutable std::mutex mutx;
        std::queue<T> dataQueue;
        std::condition_variable dataCond;
    };
    
    // 函数类型擦除类基类
    struct implBase
    {
        virtual void call() = 0;
        virtual ~implBase() = default;
    };
    
    // 函数类型擦除类
    template <typename FuncType>
    struct implType : implBase
    {
    
        explicit implType(FuncType &&rhs)
            : func(std::move(rhs))
        {}
    
        void call() override
        {
            func();
        }
    
      private:
        FuncType func;
    };
    
    // 函数包装类
    struct functionWrapper
    {
        functionWrapper() = default;
    
        // 禁用拷贝构造
        functionWrapper(const functionWrapper &) = delete;
    
        // 禁用引用拷贝构造
        functionWrapper(functionWrapper &) = delete;
    
        // 禁用拷贝赋值
        auto operator=(const functionWrapper &) -> functionWrapper & = delete;
    
        // 移动构造
        functionWrapper(functionWrapper &&rhs) noexcept
            : impl(std::move(rhs.impl))
        {}
    
        // 移动拷贝
        auto operator=(functionWrapper &&rhs) noexcept -> functionWrapper &
        {
            impl = std::move(rhs.impl);
            return *this;
        }
    
        // 通过可执行类对象构造
        template <typename FuncType>
        explicit functionWrapper(FuncType &&func)
            : impl(new implType<FuncType>(std::forward<FuncType>(func)))
        {}
    
        // 执行
        void operator()()
        {
            try
            {
                impl->call();
            }
            catch (...)
            {
                throw;
            }
        }
    
      private:
        // 通过智能指针封装, 无需担心资源回收
        std::unique_ptr<implBase> impl;
    };
    
    // 工作转移队列
    struct workStealQueue
    {
        using dataType = functionWrapper;
    
        workStealQueue() = default;
    
        // 不可拷贝构造
        workStealQueue(const workStealQueue &rhs) = delete;
    
        // 不可拷贝赋值
        auto operator=(const workStealQueue &rhs) -> workStealQueue & = delete;
    
        // 推入队列
        void push(dataType data)
        {
            const std::lock_guard<std::mutex> lock(theMutex);
            theQueue.push_front(std::move(data));
        }
    
        // 队列是否为空
        auto empty() const -> bool
        {
            const std::lock_guard<std::mutex> lock(theMutex);
            return theQueue.empty();
        }
    
        // 试图弹出数据
        auto tryPop(dataType &result) -> bool
        {
            const std::lock_guard<std::mutex> lock(theMutex);
    
            if (theQueue.empty())
            {
                return false;
            }
    
            result = std::move(theQueue.front());
            theQueue.pop_front();
    
            return true;
        }
    
        // 试图转移数据
        auto trySteal(dataType &result) -> bool
        {
            const std::lock_guard<std::mutex> lock(theMutex);
            if (theQueue.empty())
            {
                return false;
            }
    
            result = std::move(theQueue.back());
            theQueue.pop_back();
            return true;
        }
    
      private:
        // 双向队列
        std::deque<dataType> theQueue;
        // 互斥锁, 为了用于 const 函数, 要加 mutable
        mutable std::mutex theMutex;
    };
    
    // 线程池
    struct threadPool
    {
        // 构造函数, 线程异常安全
        threadPool()
            : done(false)
        {
            const unsigned threadCount = std::thread::hardware_concurrency();
            poolSize = threadCount;
            try
            {
                queues.reserve(threadCount);
    
                for (unsigned i = 0; i < threadCount; ++i)
                {
                    // 初始化工作转移队列并推入 queues
                    queues.push_back(std::make_unique<workStealQueue>());
                }
    
                threads.reserve(threadCount);
    
                for (unsigned i = 0; i < threadCount; ++i)
                {
                    // 初始化工作线程并推入线程数组
                    // threads.emplace_back(&threadPool::workerThread, this, i);
                    threads.emplace_back([this, i]() { workerThread(i); });
                }
            }
            catch (...)
            {
                done = true;
                throw;
            }
        }
    
        // 析构函数
        ~threadPool()
        {
            done = true;
    
            // 当所有任务结束, 通知所有线程, 以便顺利解除阻塞
            condVar.notify_all();
    
            for (auto &thread : threads)
            {
                if (thread.joinable())
                {
                    thread.join();
                }
            }
        }
    
        // 将函数提交到工作队列, 返回 future
        template <typename FunctionType>
        auto submit(FunctionType func)
            -> std::future<typename std::invoke_result<FunctionType>::type>
        {
            // invoke_result 获取结果类型, 其类型是函数类型而非函数本身,
            // 区别是 FunctionType 不带括号, 如有参数用逗号分隔写在后面
            using resultType = typename std::invoke_result<FunctionType>::type;
    
            // 封装函数
            std::packaged_task<resultType()> task(func);
    
            // 获取 future
            std::future<resultType> result(task.get_future());
    
            if (localWorkQueue != nullptr)
            {
                // 封装 task, 推入工作队列
                // packaged_task 类型只可移动不可拷贝
                localWorkQueue->push(functionWrapper(std::move(task)));
    
                // 通知以解除阻塞
                condVar.notify_one();
            }
            else
            {
                // 当没有局部工作队列, 则将任务放入总队列
                poolWorkQueue.push(functionWrapper(std::move(task)));
                // 通知以解除阻塞
                condVar.notify_one();
            }
    
            return result;
        }
    
        // 将函数均匀的提交到工作队列, 返回 future
        template <typename FunctionType>
        auto submitAll(FunctionType func)
            -> std::future<typename std::invoke_result<FunctionType>::type>
        {
            // invoke_result 获取结果类型, 其类型是函数类型而非函数本身,
            // 区别是 FunctionType 不带括号, 如有参数用逗号分隔写在后面
            using resultType = typename std::invoke_result<FunctionType>::type;
    
            // 封装函数
            std::packaged_task<resultType()> task(func);
    
            // 获取 future
            std::future<resultType> result(task.get_future());
    
            // 封装 task, 推入工作队列
            // packaged_task 类型只可移动不可拷贝
            queues[taskCnt++ % poolSize]->push(functionWrapper(std::move(task)));
    
            // 通知以解除阻塞
            condVar.notify_one();
    
            return result;
        }
    
        // 任务运行
        auto runPendingTask() -> bool
        {
            functionWrapper task;
    
            // 从局部队列, 或全局队列, 或其它线程队列获取任务
            if (popTaskFromLocalQueue(task) || popTaskFromPoolQueue(task) ||
                popTaskFromOtherThreadQueue(task))
            {
                // 成功则执行任务返回 true
                task();
                return true;
            }
    
            // 如有其它程序需要cpu资源, 让渡
            std::this_thread::yield();
    
            // 无任务返回 false
            return false;
        }
    
        // 中断线程
        void interrupt(unsigned threadId)
        {
            if (threadId < poolSize)
            {
                threads[threadId].interrupt();
            }
        }
    
        // 获取线程池中线程数
        auto size() const -> unsigned
        {
            return poolSize;
        }
    
      private:
        // 工作线程
        void workerThread(unsigned rhsMyIndex)
        {
            // 初始化局部索引
            myIndex = rhsMyIndex;
    
            // 初始化局部工作队列
            localWorkQueue = queues[myIndex].get();
    
            std::mutex mtx;
            std::unique_lock<std::mutex> uLock(mtx);
    
            while (!done)
            {
                // 线程中断点
                //  interruptionPoint();
    
                // 如果没有任务执行则等待, 否则继续循环
                if (!runPendingTask())
                {
                    // 等待
                    // condVar.wait_for(uLock, std::chrono::seconds(1));
                    //   condVar.wait(uLock);
                    interruptibleWait(condVar, uLock);
                }
            }
        }
    
        // 从局部队列弹出任务
        static auto popTaskFromLocalQueue(functionWrapper &task) -> bool
        {
            return (localWorkQueue != nullptr) && localWorkQueue->tryPop(task);
        }
    
        // 从全局队列弹出任务
        auto popTaskFromPoolQueue(functionWrapper &task) -> bool
        {
            return poolWorkQueue.tryPop(task);
        }
    
        // 从其它线程的局部队列弹出任务
        auto popTaskFromOtherThreadQueue(functionWrapper &task) -> bool
        {
            // 遍历线程池中其它线程的局部队列
            for (unsigned i = 0; i < queues.size(); ++i)
            {
                const unsigned index = (myIndex + i + 1) % queues.size();
    
                // 从其它线程的局部队列弹出任务
                if (queues[index]->trySteal(task))
                {
                    return true;
                }
            }
    
            return false;
        }
    
        // 所有任务完成标识
        std::atomic<bool> done;
    
        std::atomic<size_t> taskCnt = 0;
    
        // 工作队列
        TS::threadSafeQueue<functionWrapper> poolWorkQueue;
    
        // 转移队列数组
        std::vector<std::unique_ptr<workStealQueue>> queues;
    
        // 局部静态任务转移队列指针
        static thread_local workStealQueue *localWorkQueue;
    
        // 局部静态索引
        static thread_local unsigned myIndex;
    
        // 线程数组
        std::vector<interruptibleThread> threads;
    
        unsigned poolSize = 0;
    
        // 条件变量
        std::condition_variable_any condVar;
    };
    
    // 对于静态成员, 必须在类内声明, 类外定义, 一般放在 .cpp 文件中, 防止二次定义
    thread_local workStealQueue *threadPool::localWorkQueue;
    
    thread_local unsigned threadPool::myIndex;
    
    } // namespace TS
    
    #endif
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
    • 323
    • 324
    • 325
    • 326
    • 327
    • 328
    • 329
    • 330
    • 331
    • 332
    • 333
    • 334
    • 335
    • 336
    • 337
    • 338
    • 339
    • 340
    • 341
    • 342
    • 343
    • 344
    • 345
    • 346
    • 347
    • 348
    • 349
    • 350
    • 351
    • 352
    • 353
    • 354
    • 355
    • 356
    • 357
    • 358
    • 359
    • 360
    • 361
    • 362
    • 363
    • 364
    • 365
    • 366
    • 367
    • 368
    • 369
    • 370
    • 371
    • 372
    • 373
    • 374
    • 375
    • 376
    • 377
    • 378
    • 379
    • 380
    • 381
    • 382
    • 383
    • 384
    • 385
    • 386
    • 387
    • 388
    • 389
    • 390
    • 391
    • 392
    • 393
    • 394
    • 395
    • 396
    • 397
    • 398
    • 399
    • 400
    • 401
    • 402
    • 403
    • 404
    • 405
    • 406
    • 407
    • 408
    • 409
    • 410
    • 411
    • 412
    • 413
    • 414
    • 415
    • 416
    • 417
    • 418
    • 419
    • 420
    • 421
    • 422
    • 423
    • 424
    • 425
    • 426
    • 427
    • 428
    • 429
    • 430
    • 431
    • 432
    • 433
    • 434
    • 435
    • 436
    • 437
    • 438
    • 439
    • 440
    • 441
    • 442
    • 443
    • 444
    • 445
    • 446
    • 447
    • 448
    • 449
    • 450
    • 451
    • 452
    • 453
    • 454
    • 455
    • 456
    • 457
    • 458
    • 459
    • 460
    • 461
    • 462
    • 463
    • 464
    • 465
    • 466
    • 467
    • 468
    • 469

    以下是一个运行示例, 向线程池压入任务, 并中断线程池中的大部分线程.

    你可能会发现, 在任务未结束时, 线程是无法中断的, 当当前任务完成, 再次压入任务, 则只有1个线程运行, 也就是其它线程已经中断, 无法复用.

    #include 
    #include 
    #include 
    
    auto main() -> int
    {
        TS::threadPool tpl;
    
        int const num = 100;
    
        for (int i = 0; i != num; ++i)
        {
            tpl.submitAll([i]() {
                std::this_thread::sleep_for(std::chrono::milliseconds(10 * i));
                std::cout << i << ' ';
            });
        }
    
        for (int i = 0; i != 15; ++i)
        {
            tpl.interrupt(i);
        }
    
        char chr;
        std::cout << "input: ";
        std::cin >> chr;
    
        for (int i = 0; i != num; ++i)
        {
            tpl.submitAll([i]() {
                std::this_thread::sleep_for(std::chrono::milliseconds(10 * i));
                std::cout << i << ' ';
            });
        }
    
        std::cin >> chr;
    
        return 0;
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    总结

    线程中断部分已经讲解完成, 代码稍显复杂, 可中断线程类是可以不必配合线程池使用的, 但基本的运用方法和线程池差不多.

  • 相关阅读:
    C. Card Game(dp&组合数)
    ENSP中用OSPF协议在MGRE结构中实现全网可达
    如何搭建公众号 含举例详情
    如何快速编辑图片?轻量级图片在线处理工具使用教程
    C++/Python/Qt编码规范大总结
    边缘计算(一):EdgeNeXt,面向移动开发的CNN-Transformer混合架构
    【小黑嵌入式系统第五课】嵌入式系统开发流程——开发工具、交叉开发环境、开发过程(生成&调试&测试)、发展趋势
    《有效学习》
    [前端基础] 浏览器篇
    组合优于继承:什么情况下可以使用继承?
  • 原文地址:https://blog.csdn.net/m0_54206076/article/details/127595379