• (02)Cartographer源码无死角解析-(25) 阻塞队列BlockingQueue,与OrderedMultiQueue成员函数


    讲解关于slam一系列文章汇总链接:史上最全slam从零开始,针对于本栏目讲解(02)Cartographer源码无死角解析-链接如下:
    (02)Cartographer源码无死角解析- (00)目录_最新无死角讲解:https://blog.csdn.net/weixin_43013761/article/details/127350885
     
    文 末 正 下 方 中 心 提 供 了 本 人 联 系 方 式 , 点 击 本 人 照 片 即 可 显 示 W X → 官 方 认 证 {\color{blue}{文末正下方中心}提供了本人 \color{red} 联系方式,\color{blue}点击本人照片即可显示WX→官方认证} WX
     

    一、前言

    在上一篇文章中,对 src/cartographer/cartographer/sensor/internal/ordered_multi_queue.cc 中的 class OrderedMultiQueue 还有好些函数没有进行讲解。回顾一下 ordered_multi_queue.h 这个头文件,可以看到之前提及到的结构体

      struct Queue {
        common::BlockingQueue<std::unique_ptr<Data>> queue;   // 存储数据的队列
        Callback callback;                                    // 本数据队列对应的回调函数
        bool finished = false;                                // 这个queue是否finished
      };
    
    • 1
    • 2
    • 3
    • 4
    • 5

    那么先来看看 BlockingQueue 的实现,位于 src/cartographer/cartographer/common/internal/blocking_queue.h 文件中。在 src 文件中存在 src/cartographer 与 src/cartographer_ros 两个文件夹,其实 src/cartographer 个独立的工程, src/cartographer_ros 是数据格式进行转换,然后加载到 src/cartographer 的数据队列中即可。也就是说 src/cartographer_ros 类似一个中转站,把传感器数据整理好之后交给 src/cartographer,src/cartographer 就可以正常运转了。

    BlockingQueue 其就是一个阻塞队列,也就是生产者与消费者的红线,通过他把 生产者(src/cartographer_ros) 与 消费者( src/cartographer) 联系到了一起,下面简单描述一下:

    // A thread-safe blocking queue that is useful for producer/consumer patterns.
    // 'T' must be movable.
    // 一个线程安全的阻塞队列, 对 生产者/消费者模式 很有用.
    
    /**
      为什么要使用生产者消费者模式, 顺序执行不就可以了吗?生产者消费者到底有什么意义?
    
      解耦
      生产者和消费者之间不直接依赖, 通过缓冲区通讯, 将两个类之间的耦合度降到最低。
    
      并发 (异步)
      生产者直接调用消费者, 两者是同步(阻塞)的, 如果消费者吞吐数据很慢, 这时候生产者白白浪费大好时光。
      而使用这种模式之后, 生产者将数据丢到缓冲区, 继续生产, 完全不依赖消费者, 程序执行效率会大大提高。
    
      复用:通过将生产者类和消费者类独立开来, 可以对生产者类和消费者类进行独立的复用与扩展
     */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

     

    二、class BlockingQueue 函数分析

    ( 1 ) \color{blue}(1) (1) BlockingQueue 是一个模板类,模板参数就是 std::deque 队列存储的数据类类型,其存在成员变量 std::deque deque_ GUARDED_BY(mutex_),表示该队列存储的数据类型为T,且如果 mutex_ 上锁则会阻塞,也就是众多消费者与生产者之中,在一个时刻有且只能有一个线程进行队列操作,从而达到数据保护的目的。

    ( 2 ) \color{blue}(2) (2) BlockingQueue::Push(T t)的功能是往队列中添加一个数据。当然在添加之前会调用一个 lambda 表示式函数 predicate,判断一下当前队列是否已满,如果满了则会进入阻塞状态。该函数是在 OrderedMultiQueue::Add() 中被调用的。

    ( 3 ) \color{blue}(3) (3) BlockingQueue::QueueNotFullCondition() 的功能就是判断判断队列的数据是否满了,需要注意的是,如果创建 BlockingQueue 实例时,不传入参数,则 queue_size_ = kInfiniteQueueSize = 0。那么 BlockingQueue::QueueNotFullCondition() 永远都返回 true,也就是说队列永远没有满,无限制大小,默许默认就是这样设计的。

    ( 4 ) \color{blue}(4) (4)BlockingQueue::Pop() 的功能是取出数据,其先上锁,然后调用 lambda 表达式函数判断数据是否为空,为空则阻塞等待,否则取出数据进行消费。该函数在 OrderedMultiQueue::Dispatch() 中被调用,下面会进行详细的讲解。

    ( 5 ) \color{blue}(5) (5) BlockingQueue::PopWithTimeout 其功能与 BlockingQueue::Pop() 比较类似,只是他的阻塞是有时间限定的,如果超过了限定的时间,队列中依旧没有数据可以取出,则放回一个 nullptr 指针。
     

    三、class BlockingQueue 代码注释

    代码于 src/cartographer/cartographer/common/internal/blocking_queue.h 中实现

    template <typename T>
    class BlockingQueue {
     public:
      static constexpr size_t kInfiniteQueueSize = 0;
    
      // Constructs a blocking queue with infinite queue size.
      // 构造一个具有无限队列大小的阻塞队列
      //注意这里的默认构造函数调用了带一个参数的构造函数
      BlockingQueue() : BlockingQueue(kInfiniteQueueSize) {}
    
      BlockingQueue(const BlockingQueue&) = delete; //禁用拷贝构造函数
      BlockingQueue& operator=(const BlockingQueue&) = delete; //禁用赋值符重载函数
    
      // Constructs a blocking queue with a size of 'queue_size'.
      // 构造一个大小为 queue_size 的阻塞队列,
      explicit BlockingQueue(const size_t queue_size) : queue_size_(queue_size) {}
    
      // Pushes a value onto the queue. Blocks if the queue is full.
      // 将值压入队列. 如果队列已满, 则阻塞
      void Push(T t) {
        // 首先定义判断函数
        // 因为QueueNotFullCondition()函数会判断队列大小
        //所以执行该函数时需要上锁,使用 EXCLUSIVE_LOCKS_REQUIRED 声明
        //如果调用该函数时没有上锁,则会报错
        const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
          return QueueNotFullCondition();
        };
    
        // absl::Mutex的更多信息可看: https://www.jianshu.com/p/d2834abd6796
        // absl官网: https://abseil.io/about/
    
        // 如果数据满了, 就进行等待
        absl::MutexLock lock(&mutex_); //上锁
        mutex_.Await(absl::Condition(&predicate)); //阻塞等待
    
        // 将数据加入队列, 移动而非拷贝
        deque_.push_back(std::move(t));
      }
    
      // Like push, but returns false if 'timeout' is reached.
      // 与Push()类似, 但是超时后返回false
      bool PushWithTimeout(T t, const common::Duration timeout) {
        const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
          return QueueNotFullCondition();
        };
        absl::MutexLock lock(&mutex_);
        if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
                                     absl::FromChrono(timeout))) {
          return false;
        }
        deque_.push_back(std::move(t));
        return true;
      }
    
      // Pops the next value from the queue. Blocks until a value is available.
      // 取出数据, 如果数据队列为空则进行等待
      T Pop() {
        const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
          return !QueueEmptyCondition();
        };
        // 等待直到数据队列中至少有一个数据
        absl::MutexLock lock(&mutex_);
        mutex_.Await(absl::Condition(&predicate));
    
        T t = std::move(deque_.front());
        deque_.pop_front();
        return t;
      }
    
      // Like Pop, but can timeout. Returns nullptr in this case.
      // 与Pop()类似, 但是超时后返回nullptr
      T PopWithTimeout(const common::Duration timeout) {
        const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
          return !QueueEmptyCondition();
        };
        absl::MutexLock lock(&mutex_);
        if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
                                     absl::FromChrono(timeout))) {
          return nullptr;
        }
        T t = std::move(deque_.front());
        deque_.pop_front();
        return t;
      }
    
      // Like Peek, but can timeout. Returns nullptr in this case.
      // 与Peek()类似, 但是超时后返回nullptr
      template <typename R>
      R* PeekWithTimeout(const common::Duration timeout) {
        const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
          return !QueueEmptyCondition();
        };
        absl::MutexLock lock(&mutex_);
        if (!mutex_.AwaitWithTimeout(absl::Condition(&predicate),
                                     absl::FromChrono(timeout))) {
          return nullptr;
        }
        return deque_.front().get();
      }
    
      // Returns the next value in the queue or nullptr if the queue is empty.
      // Maintains ownership. This assumes a member function get() that returns
      // a pointer to the given type R.
      // 返回第一个数据的指针, 如果队列为空则返回nullptr
      template <typename R>
      const R* Peek() {
        absl::MutexLock lock(&mutex_);
        if (deque_.empty()) {
          return nullptr;
        }
        return deque_.front().get();
      }
    
      // Returns the number of items currently in the queue.
      // 返回当前队列中的项目数
      size_t Size() {
        absl::MutexLock lock(&mutex_);
        return deque_.size();
      }
    
      // Blocks until the queue is empty.
      // 等待直到队列为空
      void WaitUntilEmpty() {
        const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
          return QueueEmptyCondition();
        };
        absl::MutexLock lock(&mutex_);
        mutex_.Await(absl::Condition(&predicate));
      }
    
     private:
      // Returns true iff the queue is empty.
      // 如果队列为空, 则返回true
      bool QueueEmptyCondition() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
        return deque_.empty();
      }
    
      // Returns true iff the queue is not full.
      // 如果队列未满, 则返回true
      bool QueueNotFullCondition() EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
        return queue_size_ == kInfiniteQueueSize || deque_.size() < queue_size_;
      }
    
      absl::Mutex mutex_;
      const size_t queue_size_ GUARDED_BY(mutex_);
      std::deque<T> deque_ GUARDED_BY(mutex_);
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147

     

    四、OrderedMultiQueue 函数分析

    了解了 BlockingQueue() 之后,在继续上一篇的博客,对 OrderedMultiQueue 进行分析。在 OrderedMultiQueue::Add() 函数中,其会调用 it->second.queue.Push(std::move(data)) ,把数据 data 添加到与之对应的 trajectory_id topic 队列之中,然后调用 OrderedMultiQueue::Dispatch() 进行数据的分发。在讲解该函数之前,先来了解如下几个函数:

    1、CannotMakeProgress()
    // 标记queue_key为阻塞者,并按条件发布log,等等这个数据
    void OrderedMultiQueue::CannotMakeProgress(const QueueKey& queue_key) {
      // 标记queue_key为阻塞者
      blocker_ = queue_key;
      for (auto& entry : queues_) {
        // queue_key对应的数据队列为空,而某一个传感器数据队列的数据已经大于kMaxQueueSize了
        // 有问题, 进行报错
        if (entry.second.queue.Size() > kMaxQueueSize) {
          // 在该语句第1、61、121……次被执行的时候, 记录日志信息
          LOG_EVERY_N(WARNING, 60) << "Queue waiting for data: " << queue_key;
    
          // [ WARN] [1628516438.493835120, 1606808659.273453929]: W0809 21:40:38.000000 10662 ordered_multi_queue.cc:230] Queue waiting for data: (0, points2)
          // [ WARN] [1628516439.089736487, 1606808659.869309184]: W0809 21:40:39.000000 10662 ordered_multi_queue.cc:230] Queue waiting for data: (0, points2)
          return;
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    该函数一般是在队列为空的时候对用,假若队列 queue_key 为空,则会对其他所有队列进行遍历,查看一下有没有是否存在数据超过 kMaxQueueSize=500,如果有,则会信息打印进行警告,类似如下:

    [ WARN] [1628516438.493835120, 1606808659.273453929]: W0809 21:40:38.000000 10662 ordered_multi_queue.cc:230] Queue waiting for data: (0, points2)
    
    • 1

    一般情况下,如果订阅的话题错了,或者话题发布者没有正常发送消息,则会出现该类打印。

    2、CannotMakeProgress()
    // 返回阻塞的队列的QueueKey
    QueueKey OrderedMultiQueue::GetBlocker() const {
      CHECK(!queues_.empty());
      return blocker_;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    如果目前有数据队列阻塞了,则该队列的 QueueKey 就会被保存在 OrderedMultiQueue::blocker_ 之中,调用该函数即可查看。

     

    五、结语

    通过该篇博客,了解到了 BlockingQueue 与 OrderedMultiQueue 的成员函数,但是他们是怎么串联起来的,如何被调用的,或许不是很清楚。不过,没有关系,下一篇博客就会进行详细讲解,其主要设计的函数就是 OrderedMultiQueue::Dispatch()。

     
     
     

  • 相关阅读:
    C/C++学生综合测评系统
    JSP基于Javaweb学籍管理系
    关于IDEA配置maven环境
    RocketMQ源码分析(十三)之ConsumeQueue
    Go语言的跨文件调包
    文件上传—WAF拦截的绕过方式
    [C++][算法基础]求组合数(III)
    vben admin 当中使用 mock 模拟数据以及对响应数据进行处理
    【工业设计】产品的第一印象是什么?
    2022-05-14前端周报 巴厘岛项目交接完成
  • 原文地址:https://blog.csdn.net/weixin_43013761/article/details/127948693