• java数据结构之LinkedBlockingQueue


    这篇文章介绍java的数据结构之链表阻塞队列LinkedBlockingQueue

    1、LinkedBlockingQueue

    LinkedBlockingQueue是一个链表形式的阻塞队列,遵循FIFO的原则,是线程安全的。

    队列的添加数据和获取数据都比较简单,在LinkedBlockingQueue队列中,最重要的是线程安全性,这篇文章不讨论数据的插入和获取,主要讨论LinkedBlockingQueue是如何保证线程安全的。

    读这篇文章的前提要对ReentrantLock锁机制有所了解,可以参考以下文章:

    ReentrantLock及Condition加锁解锁机制

    2、LinkedBlockingQueue的锁

    LinkedBlockingQueue中有两个锁及锁的状态,如下:

    1. /** Lock held by take, poll, etc */
    2. private final ReentrantLock takeLock = new ReentrantLock();
    3. /** Wait queue for waiting takes */
    4. private final Condition notEmpty = takeLock.newCondition();
    5. /** Lock held by put, offer, etc */
    6. private final ReentrantLock putLock = new ReentrantLock();
    7. /** Wait queue for waiting puts */
    8. private final Condition notFull = putLock.newCondition();

    -- takeLock:在获取数据时进行线程加锁

    -- notEmpty:在获取数据时,如果队列为空时,使用notEmpty进行线程的阻塞,直到队列不为空时,被唤起,获取数据。

    -- putLock:在添加数据时进行线程加锁

    -- notFull:在添加数据时,如果队列数据已满,使用notFull进行线程阻塞,直到队列不满时被唤起,插入数据。

    3、线程安全--数据插入

    有两个数据插入的方法:

    put(E e): 如果队列已满,阻塞线程,直到队列不满时被唤起,插入数据。

    offer(E e):如果队列已满,直接返回false,不插入数据。

    put(E e)

    1. public void put(E e) throws InterruptedException {
    2. if (e == null) throw new NullPointerException();
    3. // Note: convention in all put/take/etc is to preset local var
    4. // holding count negative to indicate failure unless set.
    5. int c = -1;
    6. //作者注:构造一个包含插入数据的Node节点
    7. Node node = new Node(e);
    8. final ReentrantLock putLock = this.putLock;
    9. final AtomicInteger count = this.count;
    10. //作者注:获取putLock的锁
    11. putLock.lockInterruptibly();
    12. try {
    13. //作者注:如果队列已满,调用await进行线程挂起(阻塞)
    14. // 直到队列不满时,被唤起(参看take函数中的notFull.signal())
    15. while (count.get() == capacity) {
    16. notFull.await();
    17. }
    18. /**
    19. * 作者注:如果队列不满或者线程被唤起(线程被唤起时会重新获取putLock锁)
    20. * 将node节点插入到链表队列的末尾
    21. */
    22. enqueue(node);
    23. c = count.getAndIncrement();
    24. //作者注:此时notFull的阻塞队列里面有可能被阻塞了很多线程,所以每一次插入一条数据时,都尝试进行一次唤起。
    25. if (c + 1 < capacity)
    26. notFull.signal();
    27. } finally {
    28. //作者注:释放putLock锁
    29. putLock.unlock();
    30. }
    31. //作者注:如果此时c == 0,说明之前队列时空的,有可能有线程在插入数据时被挂起了,此时将这些线程进行唤起,告诉他们队列里有数据了。
    32. if (c == 0)
    33. signalNotEmpty();
    34. }

    offer(E e)

    offer方法和put方法唯一的区别是,如果队列已满,不进行阻塞等待,直接返回false,不再插入数据。

    1. public boolean offer(E e) {
    2. if (e == null) throw new NullPointerException();
    3. final AtomicInteger count = this.count;
    4. //如果队列已满,直接返回
    5. if (count.get() == capacity)
    6. return false;
    7. int c = -1;
    8. Node node = new Node(e);
    9. final ReentrantLock putLock = this.putLock;
    10. putLock.lock();
    11. try {
    12. if (count.get() < capacity) {
    13. enqueue(node);
    14. c = count.getAndIncrement();
    15. if (c + 1 < capacity)
    16. notFull.signal();
    17. }
    18. } finally {
    19. putLock.unlock();
    20. }
    21. if (c == 0)
    22. signalNotEmpty();
    23. return c >= 0;
    24. }

    4、线程安全--数据获取

    获取数据的方法比较多,有以下几个:

    -- take(): 如果队列为空,阻塞线程,直到队列不空时,获取数据,并删除数据。

    -- peek():如果队列为空,不阻塞线程,直接返回null。如果队列不为空,获取数据,但不从队列删除数据

    -- poll():如果队列为空,不阻塞线程,直接返回null。如果队列不为空,获取数据,同时从队列中将数据删除

    take():

    1. public E take() throws InterruptedException {
    2. E x;
    3. int c = -1;
    4. final AtomicInteger count = this.count;
    5. final ReentrantLock takeLock = this.takeLock;
    6. //作者注:获取takeLock的锁,获取成功继续执行,获取失败进行等待
    7. // 等待和阻塞的区别:
    8. // 等待:有权利获取锁
    9. // 阻塞:没有权利获取锁,只有被唤醒,才有权利获取锁
    10. takeLock.lockInterruptibly();
    11. try {
    12. //作者注:如果队列未空,调用await进行线程挂起(阻塞),并释放takeLock锁
    13. // 直到队列不为空时,被唤起(参看put函数中的notEmpty.signal())
    14. while (count.get() == 0) {
    15. notEmpty.await();
    16. }
    17. /**
    18. * 作者注:如果队列不空或者线程被唤起(线程被唤起时会重新获取takeLock锁)
    19. * 获取head头部数据,并删除
    20. */
    21. x = dequeue();
    22. c = count.getAndDecrement();
    23. //作者注:此时notEmpty的阻塞队列里面有可能被阻塞了很多线程,所以每一次插入一条数据时,都尝试进行一次唤起。
    24. if (c > 1)
    25. notEmpty.signal();
    26. } finally {
    27. //作者注: 释放takeLock锁
    28. takeLock.unlock();
    29. }
    30. //作者注:如果此时c == capacity,说明之前队列满时,有可能有线程在插入数据时被挂起了,此时将这些线程进行唤起,告诉他们队列已经不满了,可以插入数据了。
    31. // NotFull.signal()就在此方法中。
    32. if (c == capacity)
    33. signalNotFull();
    34. return x;
    35. }

    peek():

    与take的不同点:

    (1)、take在队列为空时阻塞等待,peek在队列为空时直接返回,不阻塞。

    (2)、take在获取数据后,将数据从队列删除;peek只获取数据,不删除数据。

    1. public E peek() {
    2. //作者注:如果队列为空,直接返回,不进行阻塞等待。
    3. if (count.get() == 0)
    4. return null;
    5. final ReentrantLock takeLock = this.takeLock;
    6. takeLock.lock();
    7. try {
    8. Node first = head.next;
    9. if (first == null)
    10. return null;
    11. else //作者注:只返回数据,但不移动head指针,代表不删除数据。
    12. return first.item;
    13. } finally {
    14. takeLock.unlock();
    15. }
    16. }

    pool():

    与take的不同点:take在队列为空时阻塞等待,pool在队列为空时直接返回,不阻塞

    与take的相同点:如果获取到数据,将数据从队列里删除。

    1. public E poll() {
    2. final AtomicInteger count = this.count;
    3. //作者注:如果队列为空,直接返回,不进行阻塞等待。
    4. if (count.get() == 0)
    5. return null;
    6. E x = null;
    7. int c = -1;
    8. final ReentrantLock takeLock = this.takeLock;
    9. takeLock.lock();
    10. try {
    11. if (count.get() > 0) {
    12. //作者注:获取数据,并且将获取的数据从队列中删除。
    13. x = dequeue();
    14. c = count.getAndDecrement();
    15. if (c > 1)
    16. notEmpty.signal();
    17. }
    18. } finally {
    19. takeLock.unlock();
    20. }
    21. if (c == capacity)
    22. signalNotFull();
    23. return x;
    24. }

  • 相关阅读:
    卷积神经网络卷积层公式,卷积神经网络层数计算
    Net6Configuration & Options 源码分析 Part3 IOptionsMonitor 是如何接收到配置文件变更并同步数据源的
    【历史上的今天】7 月 5 日:Google 之母出生;同一天诞生的两位图灵奖先驱
    pyspark常用功能记录
    LeetCode 1758. 生成交替二进制字符串的最少操作数
    monaco-editor 简单使用
    基于多尺度卷积神经网络特征融合的植株叶片检测技术
    本地笔记同步到博客
    Linux下的系统编程——共享存储映射(十)
    美团滑块验证
  • 原文地址:https://blog.csdn.net/chenzhiang1/article/details/126725618