• SynchronousQueue源码分析_第一讲:引子



    1、SynchronousQueue简介

    同步队列:
    虽然它叫队列,但是它没有容量,只能存一个数据,这个数据出来了,其它数据才能加进去,如果还没有数据就去拿,那么就会阻塞,直到加进去数据。

    非公平模式 TransferStack (以栈结构实现LIFO 后进先出,非公平的)
    公平模式 TransferQueue (以队列结构实现FIFO 先进先出,公平)
    一种是通过栈实现的,一种是通过队列实现的


    首先明确一下:0,1 分别 代表两种不同的请求类型(只需要知道请求的类型是不一样的就可以了)
    我们先来看看怎么通过栈实现:

    在这里插入图片描述
    栈中已经有一个0类型的请求的,那么再来一个0类型的请求,就还会放进去(两个请求的类型一样)

    在这里插入图片描述栈中已经有一个 1类型的请求的,那么再来一个 1类型的请求,就还会放进去(两个请求的类型一样)。和上面一样

    在这里插入图片描述
    栈里先是有一个类型为1的请求,然后又来一个类型为0的请求,两种请求类型不一样时,在类型为0的请求入栈之前会进行一项操作 把这个请求类型 和 10(二进制) 按位或 得到 2(十进制),再把请求压入栈中,就是我们看到的红色的部分


    以上是基于栈来实现的,下面讲讲基于队列实现的
    首先声明 true 和 false 只是代表不同的类型,把请求的类型区分开,在请求中没有任何含义

    在这里插入图片描述
    上面这幅图的意思就是 因为队列里面是true,又来一个请求也是true,那么这个请求就会直接加入到队列

    在这里插入图片描述
    上这种请求还是请求都一样的情况,直接放到队列中

    在这里插入图片描述
    上面就是两种不同类型的请求(true和false),这两请求互补,此时true请求发现和队尾的请求是互补的之后,队头此时出列,就会和队头的请求配对(因为队尾请求和队头的请求类型一定一样)

    在这里插入图片描述
    这个配对就离谱!!!此时中间的false类型的请求 内心:明明我和你互补,你却和我旁边的人携手,在这里插入图片描述

    在这里插入图片描述

    至此栈类型和队列类型的实现就讲完了,后面的文章会分析源码,请持续关注哦!

    异步队列:BlockingQueue

    2、入门案例

    Demo

    public class SynchronousQueueDemo {
    
        public static void main(String[] args) {
            SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "\t 入队列 1");
                    synchronousQueue.put("1");
                    System.out.println(Thread.currentThread().getName() + "\t 入队列 2");
                    synchronousQueue.put("2");
                    System.out.println(Thread.currentThread().getName() + "\t 入队列 3");
                    synchronousQueue.put("3");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "AAAAA").start();
    
            new Thread(() -> {
                try {
                    TimeUnit.SECONDS.sleep(5);
                    System.out.println(Thread.currentThread().getName() + "\t 出队列 " + synchronousQueue.take());
                    TimeUnit.SECONDS.sleep(5);
                    System.out.println(Thread.currentThread().getName() + "\t 出队列 " + synchronousQueue.take());
                    TimeUnit.SECONDS.sleep(5);
                    System.out.println(Thread.currentThread().getName() + "\t 出队列 " + synchronousQueue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "BBBBB").start();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    执行结果:

    AAAAA	 入队列 1
    BBBBB	 出队列 1
    AAAAA	 入队列 2
    BBBBB	 出队列 2
    AAAAA	 入队列 3
    BBBBB	 出队列 3
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3、部分源码分析

    重要内部类 Transferer

      abstract static class Transferer<E> {
            /**
             * @param e 可以为null,null时表示这个请求是一个request类型的请求
             *      如果不是null,说明这个请求是一个data类型的请求
             *          request请求就是来拿数据的请求,data请求就是来放数据的请求
             *
             * @param timed  如果为true,表示指定了超时时间,如果为false 表示不支持超时,表示当前请求一直等待到匹配为止,或者被中断
             * @param nanos  超时时间限制 单位 纳秒
             *
             * @return E:   1.如果当前请求是一个request 类型的请求,返回值如果不为null,表示 匹配成功,如果返回null,表示request类型的请求超时 或 被中断
             *
             *              2.如果当前请求是一个data 类型的请求,返回值如果不为null 表示 匹配成功,返回当前线程put的数据 (就是当前放进去的数据)
             *              如果返回值为null 表示:data类型的请求超时 或者 被中断  都会返回null
             */
            abstract E transfer(E e, boolean timed, long nanos);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    成员属性

       //为什么需要自旋这个操作?
        //因为线程 挂起 唤醒操作站在cpu角度去看的话,是非常消耗资源的,涉及到用户态和内核态的切换
        //自旋的好处,自旋期间线程会一直检查自己的状态是否被匹配到,如果自旋期间被匹配到,那么直接就返回了
        //如果自旋期间为被匹配到,自旋次数达到某个指标后,还是会将当前线程挂起的
        // 思考:NCPUS 等于 1 时,也就是CPU的数量为1时,还需要自旋吗
        //      不需要,因为 一个cpu 同一时刻只能执行一个线程自旋没有意义,
        //      (自旋时要有其它线程来对队列进行操作,这样我们的自旋才会匹配上它,只有一个自旋线程,没有其它线程就没有自旋的意义了)
        //      栈的状态更不会改变了,当只有一个cpu时 会直接选择 LockSupport.park() 挂起等待者线程
    
    
        // 表示运行当前程序的平台,所拥有的CPU数量
        static final int NCPUS = Runtime.getRuntime().availableProcessors();
    
        /**
         * 为什么要自旋 而不是直接把线程挂起呢?
         *
         * 先自旋一会的原因是 挂起的成本比较高,先自旋一会可能就匹配上了
         * 如果自旋了一会没有匹配上就挂起
         */
        //表示指定超时时间的话,当前线程最大自旋次数。
        //只有一个cpu,自旋次数为0
        //当cpu大于1时,说明当前平台是多核平台,那么指定超时时间的请求的最大自旋次数是32次
        //32是经验值
        static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
    
        // 表示为指定超时限制的话,线程等待匹配时,自旋次数
        //是指定超时限制的请求的自旋次数的16倍
        static final int maxUntimedSpins = maxTimedSpins * 16;
    
        //如果请求是指定超时限制的话,如果超时nanos参数是< 1000 纳秒时,
        //禁止挂起(时间太短,挂起再唤醒的成本太高,还不如空转)
        static final long spinForTimeoutThreshold = 1000L;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
  • 相关阅读:
    服务断路器_Resilience4j超时降级
    理解Gumbel softmax trick
    KubeEdge 边缘计算存储
    韩顺平0基础学java——第15天
    墨子web3实时周报
    计算机网络 | I/O模型、网络模型(OSI七层及TCP/IP四层)
    虹科方案|用Western Digital和ATTO技术优化SMR存储解决方案的负载
    wpscan专门针对wordpress的安全扫描工具
    前端不使用 il8n,如何优雅的实现多语言?
    11.用户信息——sql语句的使用、Tornado更改用户信息、Tornado上传图片头像
  • 原文地址:https://blog.csdn.net/m0_51809035/article/details/127708406