• java并发编程 SynchronousQueue详解



    java 并发编程系列文章目录

    1 SynchronousQueue是什么

    java的注释上写着:一种阻塞队列,其中每个插入操作都必须等待另一个线程执行相应的移除操作,反之亦然。元素的size()方法返回一定是0,就是内部元素不可见,你要么阻塞添加元素,要么阻塞获取元素,获取到元素,返回,提供元素的线程也返回。如果非阻塞,那么只有当前队列里有等待transfer的线程和你的mode不同才会成功。

    2 核心属性详解

    通过内部类实现数据传输功能,有两个实现,即公平和非公平,对应的数据结构是queue 和 stack

    	abstract static class Transferer<E> {
           	//对于生成者 E是提交的元素,不为null,消费者是null, 剩下两个参数表示是否是有阻塞,和超时时间
            abstract E transfer(E e, boolean timed, long nanos);
        }
        //...省略其他
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3 核心方法详解

    3.1 transfer(E e, boolean timed, long nanos)

    3.1.1 TransferStack实现

    基于stack栈的数据结构,每次都是设置head,然后按照特性,如果不带时间的情况,transfer数据如果head没有,说明没有其他线程在等待transfer,所以直接失败
    如果带有时间的,会park住,等待其他transfer的来匹配唤醒。整体流程就是这样

        E transfer(E e, boolean timed, long nanos) {
            SNode s = null;
            //数据为null就表示来拿数据的,否则就是put数据的
            int mode = (e == null) ? REQUEST : DATA;
    
            for (;;) {
                SNode h = head;
                //如果此时是空,或者当前头结点的mode和自己相同
                if (h == null || h.mode == mode) {
                	//1. 需要timed 但是nanos 是非正数,那就不需要等待,直接返回,其实就是必定不成功,因为h.mode == mode,不一样的时候h == null 没法匹配其他线程的数据
                	//此时会清理match == this的head,就是匹配失败的node
                    if (timed && nanos <= 0) {
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                            return null;
                    } 
                    //此时是非timed 或者 timed 且 nanos > 0的 因为此时是stack结构,所以设置head为自己
                    else if (casHead(h, s = snode(s, e, h, mode))) {
                    	//设置成功 获取Node 如果返回的是自己,说明获取失败 下面会描述该方法
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {
                        	//这个方法就是重新设置head
                            clean(s);
                            return null;
                        }
                        //获取数据成功了,重新设置head
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);
                        //返回对应的数据
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    }
                } 
                //此时h != null 或者 mode不相同
                //此时head 没有被占用
                else if (!isFulfilling(h.mode)) { 
                	//对match == this去除掉
                    if (h.isCancelled())           
                        casHead(h, h.next);  
                    //设置head       
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                    	//设置成功
                        for (;;) {
                            SNode m = s.next;
                            if (m == null) {
                                casHead(s, null);
                                s = null;
                                break;
                            }
                            SNode mn = m.next;
                            //尝试匹配 把m.match设置成s 此时会唤醒对应的阻塞线程
                            if (m.tryMatch(s)) {
                                casHead(s, mn); 
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                            } else                  
                                s.casNext(m, mn);
                        }
                    }
                } else {
                	//此时head是FULFILLING状态 那就拿head.next去尝试匹配,相同的原理                      
                    SNode m = h.next;               
                    if (m == null)                 
                        casHead(h, null);  
                    else {
                        SNode mn = m.next;
                        if (m.tryMatch(h)) 
                            casHead(h, mn); 
                        else                   
                            h.casNext(m, mn);    
                    }
                }
            }
        }
        //这个方法就是线程到这匹配数据 匹配不到就阻塞。整体逻辑是这个
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
          final long deadline = timed ? System.nanoTime() + nanos : 0L;
          Thread w = Thread.currentThread();
          //
          int spins = (shouldSpin(s) ?
                       (timed ? maxTimedSpins : maxUntimedSpins) : 0);
          for (;;) {
              if (w.isInterrupted())
                  s.tryCancel();
              SNode m = s.match;
              if (m != null)
                  return m;
              if (timed) {
                  nanos = deadline - System.nanoTime();
                  if (nanos <= 0L) {
                      s.tryCancel();
                      continue;
                  }
              }
              if (spins > 0)
                  spins = shouldSpin(s) ? (spins-1) : 0;
              //此处以上代码就是判断超时 循环次数 节点是否匹配过,在一个cas + for循环每次都要验证数据的
              //设置等待的线程,之后会被其他线程来匹配的时候唤醒。唤醒是在tryMatch
              else if (s.waiter == null)
                  s.waiter = w; // establish waiter so can park next iter
              else if (!timed)
              		//阻塞当前线程
                  LockSupport.park(this);
              else if (nanos > spinForTimeoutThreshold)
              		//阻塞带有超时时间的
                  LockSupport.parkNanos(this, nanos);
          }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107

    3.1.2 TransferQueue实现

    只是数据结构改变而已,实现逻辑几乎不变

      3.2 外部方法

      3.2.1 put(E e)

      放入一个元素,如果返回的是Null,按照transfer原理,是走到LockSupport.park(this); 即没匹配带数据就会阻塞,如果返回Null就是中断的情况所以会重置中断位且抛出中断异常

          public void put(E e) throws InterruptedException {
              if (e == null) throw new NullPointerException();
              if (transferer.transfer(e, false, 0) == null) {
                  Thread.interrupted();
                  throw new InterruptedException();
              }
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7

      3.2.2 offer(E e)

      因为timed == true 且nanos = 0 所以如果此时没有head 会立马返回

          public boolean offer(E e) {
              if (e == null) throw new NullPointerException();
              //只是返回成功或者失败
              return transferer.transfer(e, true, 0) != null;
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5

      3.2.3 take()

      timed == false 其实和put一样会进入LockSupport.park(this) 阻塞自己,此时会出现中断情况,所以会判断返回结果,是null重置中断位,抛出异常。

          public E take() throws InterruptedException {
              E e = transferer.transfer(null, false, 0);
              if (e != null)
                  return e;
              Thread.interrupted();
              throw new InterruptedException();
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7

      3.2.4 poll()

      poll方法 timed == true nanos == 0 所以如果head == null 会直接返回,这个方法就是直接获取数据,返回结果 可能为null

          public E poll() {
              return transferer.transfer(null, true, 0);
          }
      
      • 1
      • 2
      • 3

      4 总结

      利用transfer方法提供了两种实现,让多个线程之间可以去交换数据,它与其他队列的区别在于,不是用一个数据结构去永久的存储数据,这里你想把数据一定能给别人使用,只有阻塞等待别人来匹配,用offer的方式,此时head == null 就会立马失败返回null

    • 相关阅读:
      个人博客系列-后端项目-RBAC角色管理(6)
      SQL编程 Task05.SQL高级处理
      C#8.0本质论第七章--继承
      Hbuilder中微信小程序上传多图的案例分享
      (杭州中科微)全星座定位导航模块GM36的应用推荐及性能指标解析
      C++:获取文件末的50个字符(附完整源码)
      3天带你走向实战,阿里顶配版Spring全家桶面试进阶笔记有多强?
      hypervisor相关的知识点
      springboot项目获取真实用户ip(不是虚拟ip)
      Axios 封装
    • 原文地址:https://blog.csdn.net/weixin_46082526/article/details/132792728