• 消息队列的实现


    8.8 消息队列

    队列是一种先进先出(FIFO)的数据结构。消息队列是进程(线程)之间常用的一种通信方式,实现消息队列常用的方法有:

    (1)阻塞队列 (2)无锁队列 (3)环形队列

    值得注意的是:在 pop 和 push 中等待条件变量时,一定要使用 while 循环重新检查条件,避免虚假唤醒(spurious wakeup)。

    8.8.1 阻塞队列

    (1)pthread

    实现阻塞队列使用生产者-消费者模式,使用步骤:

    #include <queue>
    #include <pthread.h>
    //第一步,添加成员变量
    std::queue queue_;
    int max_size_;
    pthread_mutex_t mutex_;
    pthread_cond_t condition_var_;
    //第二步,实现push函数
    // Appends a copy of `item` to the back of the queue, blocking while the
    // queue already holds max_size_ items.
    void Push(const T& item) {
        pthread_mutex_lock(&mutex_);

        // A loop (not an `if`): the condition must be re-checked after every
        // wakeup, because wakeups can be spurious or another thread may have
        // refilled the queue first.
        while (queue_.size() >= static_cast<std::size_t>(max_size_)) {
            pthread_cond_wait(&condition_var_, &mutex_);
        }

        queue_.push(item);

        // Producers and consumers wait on the SAME condition variable, so a
        // single pthread_cond_signal may wake another producer instead of a
        // consumer and leave every thread asleep. Broadcasting wakes all
        // waiters; each one re-checks its own condition in its loop.
        pthread_cond_broadcast(&condition_var_);

        pthread_mutex_unlock(&mutex_);
    }
    //第三步,实现pop函数
     // Removes and returns the item at the front of the queue, blocking while
     // the queue is empty.
     T Pop()
     {
         pthread_mutex_lock(&mutex_);

         // A loop (not an `if`): re-check after every wakeup, since wakeups can
         // be spurious or another consumer may have taken the item first.
         while (queue_.empty()) {
             pthread_cond_wait(&condition_var_, &mutex_);
         }

         T item = queue_.front();
         queue_.pop();

         // Producers and consumers share one condition variable, so a single
         // pthread_cond_signal may wake another consumer instead of a blocked
         // producer. Broadcasting wakes all waiters; each re-checks its own
         // condition in its loop.
         pthread_cond_broadcast(&condition_var_);

         pthread_mutex_unlock(&mutex_);

         return item;
     }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    示例代码:

    #include <queue>
    #include <pthread.h>
    
    /// Bounded, thread-safe FIFO queue built on a pthread mutex and two
    /// condition variables.
    ///
    /// Push blocks while the queue is full; Pop blocks while it is empty.
    template <typename T>
    class BlockingQueue {
    public:
        /// @param max_size Maximum number of queued items. Defaults to 100.
        ///        A value of 0 is treated as 1 so that Push cannot block forever.
        explicit BlockingQueue(std::size_t max_size = 100)
            : max_size_(max_size == 0 ? 1 : max_size) {
            pthread_mutex_init(&mutex_, nullptr);
            pthread_cond_init(&not_full_, nullptr);
            pthread_cond_init(&not_empty_, nullptr);
        }

        ~BlockingQueue() {
            pthread_cond_destroy(&not_empty_);
            pthread_cond_destroy(&not_full_);
            pthread_mutex_destroy(&mutex_);
        }

        // Copying would duplicate the raw pthread objects and destroy them
        // twice, so the queue is non-copyable.
        BlockingQueue(const BlockingQueue&) = delete;
        BlockingQueue& operator=(const BlockingQueue&) = delete;

        /// Appends a copy of `item`, blocking while the queue is full.
        void Push(const T& item) {
            ScopedLock lock(mutex_);

            // Loop, not `if`: the condition must be re-checked after every
            // wakeup to cope with spurious wakeups and competing producers.
            while (queue_.size() >= max_size_) {
                pthread_cond_wait(&not_full_, &mutex_);
            }

            queue_.push(item);

            // Only consumers wait on not_empty_, so one signal always reaches
            // a thread that can make progress.
            pthread_cond_signal(&not_empty_);
        }

        /// Removes and returns the front item, blocking while the queue is empty.
        T Pop() {
            ScopedLock lock(mutex_);

            while (queue_.empty()) {
                pthread_cond_wait(&not_empty_, &mutex_);
            }

            T item = queue_.front();
            queue_.pop();

            // Only producers wait on not_full_.
            pthread_cond_signal(&not_full_);

            return item;
        }

    private:
        // Locks the mutex for the lifetime of the object, so the mutex is
        // released even if copying a T throws.
        class ScopedLock {
        public:
            explicit ScopedLock(pthread_mutex_t& mutex) : mutex_(mutex) {
                pthread_mutex_lock(&mutex_);
            }
            ~ScopedLock() { pthread_mutex_unlock(&mutex_); }

            ScopedLock(const ScopedLock&) = delete;
            ScopedLock& operator=(const ScopedLock&) = delete;

        private:
            pthread_mutex_t& mutex_;
        };

        std::queue<T> queue_;        // pending items, oldest at the front
        std::size_t max_size_;       // capacity limit enforced by Push
        pthread_mutex_t mutex_;      // guards queue_
        pthread_cond_t not_full_;    // signalled by Pop, waited on by Push
        pthread_cond_t not_empty_;   // signalled by Push, waited on by Pop
    };
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    测试代码:

    #include <iostream>
    #include <vector>
    #include <pthread.h>
    #include <unistd.h>
    
    // Queue of ints shared by every producer and consumer thread in this test.
    BlockingQueue<int> queue;
    
    // Thread entry point: pushes five items (thread_id * 10 + 1 .. + 5) into
    // the shared queue, one per second.
    // `arg` must point to an int holding the thread id.
    void* ProducerThread(void* arg) {
        const int thread_id = *static_cast<int*>(arg);
        for (int i = 1; i <= 5; ++i) {
            const int item = thread_id * 10 + i;
            queue.Push(item);
            std::cout << "Producer " << thread_id << " produced: " << item << std::endl;
            sleep(1);
        }
        // Returning from the start routine ends the thread just like
        // pthread_exit(nullptr), while leaving the function through a normal
        // return path.
        return nullptr;
    }
    
    // Thread entry point: pops five items from the shared queue, pausing two
    // seconds after each one.
    // `arg` must point to an int holding the thread id.
    void* ConsumerThread(void* arg) {
        const int thread_id = *static_cast<int*>(arg);
        for (int i = 1; i <= 5; ++i) {
            const int item = queue.Pop();
            std::cout << "Consumer " << thread_id << " consumed: " << item << std::endl;
            sleep(2);
        }
        // Returning from the start routine ends the thread just like
        // pthread_exit(nullptr), while leaving the function through a normal
        // return path.
        return nullptr;
    }
    
    int main() {
        const int num_producers = 3;
        const int num_consumers = 2;
    
        std::vector producer_threads(num_producers);
        std::vector consumer_threads(num_consumers);
    
        for (int i = 0; i < num_producers; ++i) {
            int* thread_id = new int(i);
            pthread_create(&producer_threads[i], nullptr, ProducerThread, static_cast(thread_id));
        }
    
        for (int i = 0; i < num_consumers; ++i) {
            int* thread_id = new int(i);
            pthread_create(&consumer_threads[i], nullptr, ConsumerThread, static_cast(thread_id));
        }
    
        for (int i = 0; i < num_producers; ++i) {
            pthread_join(producer_threads[i], nullptr);
        }
    
        for (int i = 0; i < num_consumers; ++i) {
            pthread_join(consumer_threads[i], nullptr);
        }
    
        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    (2)STD

    示例代码:

    #include <queue>
    #include <mutex>
    #include <condition_variable>
    
    /// Bounded, thread-safe FIFO queue built on std::mutex and two
    /// std::condition_variable objects.
    ///
    /// Push blocks while the queue is full; Pop blocks while it is empty.
    template <typename T>
    class BlockingQueue {
    public:
        /// @param max_size Maximum number of queued items. Defaults to 100.
        ///        A value of 0 is treated as 1 so that Push cannot block forever.
        explicit BlockingQueue(std::size_t max_size = 100)
            : max_size_(max_size == 0 ? 1 : max_size) {}

        /// Appends a copy of `item`, blocking while the queue is full.
        void Push(const T& item) {
            std::unique_lock<std::mutex> lock(mutex_);

            // Loop, not `if`: re-check after every wakeup to cope with
            // spurious wakeups and competing producers.
            while (queue_.size() >= max_size_) {
                not_full_.wait(lock);
            }

            queue_.push(item);

            // Only consumers wait on not_empty_, so one notification always
            // reaches a thread that can make progress. With a single shared
            // condition variable, notify_one could wake another producer
            // instead and leave every thread asleep.
            not_empty_.notify_one();
        }

        /// Removes and returns the front item, blocking while the queue is empty.
        T Pop() {
            std::unique_lock<std::mutex> lock(mutex_);

            while (queue_.empty()) {
                not_empty_.wait(lock);
            }

            T item = queue_.front();
            queue_.pop();

            // Only producers wait on not_full_.
            not_full_.notify_one();

            return item;
        }

    private:
        std::queue<T> queue_;                // pending items, oldest at the front
        std::mutex mutex_;                   // guards queue_
        std::condition_variable not_full_;   // notified by Pop, waited on by Push
        std::condition_variable not_empty_;  // notified by Push, waited on by Pop
        const std::size_t max_size_;         // capacity limit enforced by Push
    };
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    8.8.2 环形队列

    环形队列的特点:(1)固定容量 (2)前后指针 (3)循环存储

    (1)pthread

    示例代码:

    #include <iostream>
    #include <vector>
    #include <pthread.h>
    #include <unistd.h>
    
    /// Fixed-capacity, thread-safe ring buffer built on a pthread mutex and
    /// two condition variables.
    ///
    /// Push blocks while the buffer is full; Pop blocks while it is empty.
    /// T must be default-constructible and copy-assignable, because the
    /// storage is pre-filled with default-constructed elements.
    template <typename T>
    class CircularQueue {
    public:
        /// @param capacity Number of slots in the ring. Values <= 0 are
        ///        treated as 1, so the storage size and the modulo arithmetic
        ///        below are always well-defined.
        explicit CircularQueue(int capacity)
            : queue_(static_cast<typename std::vector<T>::size_type>(capacity > 0 ? capacity : 1)),
              capacity_(capacity > 0 ? capacity : 1),
              front_(0),
              rear_(0),
              size_(0) {
            pthread_mutex_init(&mutex_, nullptr);
            pthread_cond_init(&not_full_, nullptr);
            pthread_cond_init(&not_empty_, nullptr);
        }

        ~CircularQueue() {
            pthread_cond_destroy(&not_empty_);
            pthread_cond_destroy(&not_full_);
            pthread_mutex_destroy(&mutex_);
        }

        // Copying would duplicate the raw pthread objects and destroy them
        // twice, so the queue is non-copyable.
        CircularQueue(const CircularQueue&) = delete;
        CircularQueue& operator=(const CircularQueue&) = delete;

        /// Stores a copy of `item` in the next free slot, blocking while the
        /// ring is full.
        void Push(const T& item) {
            ScopedLock lock(mutex_);

            // Loop, not `if`: re-check after every wakeup to cope with
            // spurious wakeups and competing producers.
            while (size_ >= capacity_) {
                pthread_cond_wait(&not_full_, &mutex_);
            }

            queue_[rear_] = item;
            rear_ = (rear_ + 1) % capacity_;  // wrap around to slot 0 after the last slot
            ++size_;

            // Only consumers wait on not_empty_, so one signal always reaches
            // a thread that can make progress.
            pthread_cond_signal(&not_empty_);
        }

        /// Removes and returns the oldest item, blocking while the ring is empty.
        T Pop() {
            ScopedLock lock(mutex_);

            while (size_ <= 0) {
                pthread_cond_wait(&not_empty_, &mutex_);
            }

            T item = queue_[front_];
            front_ = (front_ + 1) % capacity_;  // wrap around to slot 0 after the last slot
            --size_;

            // Only producers wait on not_full_.
            pthread_cond_signal(&not_full_);

            return item;
        }

    private:
        // Locks the mutex for the lifetime of the object, so the mutex is
        // released even if copying a T throws.
        class ScopedLock {
        public:
            explicit ScopedLock(pthread_mutex_t& mutex) : mutex_(mutex) {
                pthread_mutex_lock(&mutex_);
            }
            ~ScopedLock() { pthread_mutex_unlock(&mutex_); }

            ScopedLock(const ScopedLock&) = delete;
            ScopedLock& operator=(const ScopedLock&) = delete;

        private:
            pthread_mutex_t& mutex_;
        };

        std::vector<T> queue_;       // ring storage with capacity_ slots
        int capacity_;               // number of slots
        int front_;                  // index of the oldest item (next to pop)
        int rear_;                   // index of the next free slot (next to push)
        int size_;                   // number of items currently stored
        pthread_mutex_t mutex_;      // guards all of the fields above
        pthread_cond_t not_full_;    // signalled by Pop, waited on by Push
        pthread_cond_t not_empty_;   // signalled by Push, waited on by Pop
    };
    
    void* ProducerThread(void* arg) {
        CircularQueue* queue = static_cast*>(arg);
    
        for (int i = 1; i <= 10; ++i) {
            queue->Push(i);
            std::cout << "Produced: " << i << std::endl;
            sleep(1);
        }
    
        return nullptr;
    }
    
    void* ConsumerThread(void* arg) {
        CircularQueue* queue = static_cast*>(arg);
    
        for (int i = 1; i <= 10; ++i) {
            int item = queue->Pop();
            std::cout << "Consumed: " << item << std::endl;
            sleep(2);
        }
    
        return nullptr;
    }
    
    int main() {
        CircularQueue queue(5);
    
        pthread_t producer_thread;
        pthread_create(&producer_thread, nullptr, ProducerThread, static_cast(&queue));
    
        pthread_t consumer_thread;
        pthread_create(&consumer_thread, nullptr, ConsumerThread, static_cast(&queue));
    
        pthread_join(producer_thread, nullptr);
        pthread_join(consumer_thread, nullptr);
    
        return 0;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101

    (2)STD

    示例代码:

    #include <vector>
    #include <mutex>
    #include <condition_variable>
    
    /// Fixed-capacity, thread-safe ring buffer built on std::mutex and two
    /// std::condition_variable objects.
    ///
    /// Push blocks while the buffer is full; Pop blocks while it is empty.
    /// T must be default-constructible and copy-assignable, because the
    /// storage is pre-filled with default-constructed elements.
    template <typename T>
    class CircularQueue {
    public:
        /// @param capacity Number of slots in the ring. Values <= 0 are
        ///        treated as 1, so the storage size and the modulo arithmetic
        ///        below are always well-defined.
        explicit CircularQueue(int capacity)
            : capacity_(capacity > 0 ? capacity : 1),
              front_(0),
              rear_(0),
              size_(0),
              queue_(static_cast<typename std::vector<T>::size_type>(capacity_)) {}

        /// Stores a copy of `item` in the next free slot, blocking while the
        /// ring is full.
        void Push(const T& item) {
            std::unique_lock<std::mutex> lock(mutex_);

            // Loop, not `if`: re-check after every wakeup to cope with
            // spurious wakeups and competing producers. The check reads size_
            // directly because the public IsFull() takes the mutex itself.
            while (size_ == capacity_) {
                not_full_.wait(lock);
            }

            queue_[rear_] = item;
            rear_ = (rear_ + 1) % capacity_;  // wrap around to slot 0 after the last slot
            ++size_;

            // Only consumers wait on not_empty_, so one notification always
            // reaches a thread that can make progress.
            not_empty_.notify_one();
        }

        /// Removes and returns the oldest item, blocking while the ring is empty.
        T Pop() {
            std::unique_lock<std::mutex> lock(mutex_);

            while (size_ == 0) {
                not_empty_.wait(lock);
            }

            T item = queue_[front_];
            front_ = (front_ + 1) % capacity_;  // wrap around to slot 0 after the last slot
            --size_;

            // Only producers wait on not_full_.
            not_full_.notify_one();

            return item;
        }

        /// Returns true if the ring held no items at the moment of the call.
        /// The answer may be stale as soon as the lock is released.
        bool IsEmpty() const {
            std::lock_guard<std::mutex> lock(mutex_);
            return size_ == 0;
        }

        /// Returns true if every slot was occupied at the moment of the call.
        /// The answer may be stale as soon as the lock is released.
        bool IsFull() const {
            std::lock_guard<std::mutex> lock(mutex_);
            return size_ == capacity_;
        }

    private:
        int capacity_;                       // number of slots
        int front_;                          // index of the oldest item (next to pop)
        int rear_;                           // index of the next free slot (next to push)
        int size_;                           // number of items currently stored
        std::vector<T> queue_;               // ring storage with capacity_ slots
        mutable std::mutex mutex_;           // guards all fields above; mutable so const accessors can lock
        std::condition_variable not_full_;   // notified by Pop, waited on by Push
        std::condition_variable not_empty_;  // notified by Push, waited on by Pop
    };
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
  • 相关阅读:
    微服务拆分治理最佳实践
    DIV-CSS布局
    Delphi Enterprise具有强大视觉设计功能
    iPhone 15 或将换成 USB-C 接口?网友:Lightning 已经落后了
    GBASE 8A v953报错集锦22--在 redhat7.3 上安装集群安装包中 c3 rpm 包报错
    Git入门
    新零售O2O 电商模式解析
    修改一个MD5的VB源码,使用它支持UTF8编码
    【学术综述】-如何写出一篇好综述-写好综述要注意的问题
    springboot源码编译报错Unable to start the daemon process
  • 原文地址:https://blog.csdn.net/m0_47549429/article/details/136430871