• java基础-并发编程-CyclicBarrier(JDK1.8)源码学习


    CyclicBarrier源码学习

    Java并发编程(十六):CyclicBarrier源码分析

    CyclicBarrier执行流程:

    最后一个就位线程负责把所有条件队列中的线程添加到同步队列,然后在finally中执行ReentrantLock的unlock方法唤醒同步队列中的head.next,本节点成为新的head节点,然后被唤醒线程同样会到finally的unlock方法中唤醒下一个线程,这样传递唤醒

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    一、初始化

        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            // 需要多少数量的线程到达才可以打碎屏障
            this.parties = parties;
            // 到达屏障线程的计数器
            this.count = parties;
            // 线程到达屏障时的任务方法
            this.barrierCommand = barrierAction;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    二、设置屏障:public int await()

        public int await() throws InterruptedException, BrokenBarrierException {
            try {
            	// 调用为设置超时时间的等待
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            // 设置ReentrantLock 非公平锁 lock 
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
            	// 每次使用屏障的都表示为一个生成实例,当屏障跳闸时generation会重置
                final Generation g = generation;
    			// 检测当前屏障是否被打碎,下方调用breakBarrier()地方都会打碎
                if (g.broken)
                    throw new BrokenBarrierException();
    			// 当前线程被哦中断,则调用breakBarrier(),中断屏障
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    			// 每次有一个线程到达屏障,则将屏障等待线程计数器-1
                int index = --count;
                // 到达屏障的线程数已满足条件
                if (index == 0) {  // tripped
                	// 标识打碎屏障后的后续动作是否执行完成
                    boolean ranAction = false;
                    try {
                    	// 执行到达屏障后的汇总任务
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        // 标识打碎屏障后的后续动作已经执行完成
                        ranAction = true;
                        // 屏障跳闸时generation会重置
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // 到达屏障的前n-1个线程会走到
                for (;;) {
                    try {
                    	// 是否超时等待
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    				// 异常情况处理
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
        private void nextGeneration() {
            // 此处是将条件队列中的所有节点转移到同步队列,当此线程(到达屏障的第n个线程)执行到finally 中调用 lock.unlock();,唤醒下一个节点,被唤醒节点成为新的head节点,同样会到finally的unlock方法中唤醒下一个线程,这样传递唤醒
            trip.signalAll();
            // 重置计数器
            count = parties;
            // 重新生成屏障实例
            generation = new Generation();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    	public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 将当前线程生成node添加到条件队列,被lastWaiter引用
            Node node = addConditionWaiter();
            // 释放全部锁资源
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 判断是否在等待队列上
            // 前n-1个线程都会执行到次数,此时都不在等待队列上,因此进入循环进行阻塞
            // 第n个线程到达屏障后会将所有条件队列中的节点放到等待队列,详细参考方法nextGeneration()注解
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                // 如果线程被打断,则break
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // ReentrantLock入队阻塞逻辑,正常情况因为唤醒是在 finally 中调用 lock.unlock();,唤醒下一个节点的情况
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // 此节点已被取消,从条件队列中移除
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                // 构建条件节点,添加到条件队列末尾
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                while (t != null) {
                    Node next = t.nextWaiter;
                    // 移除条件队列中的非条件节点
                    if (t.waitStatus != Node.CONDITION) {
                        t.nextWaiter = null;
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail;
                    }
                    else
                        trail = t;
                    t = next;
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                // 释放AQS中的state,此处其实是当前线程已经加入条件队列,那么可以"释放锁",但是没有执行释放锁的逻辑
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
            	// 此时head为null,因为第n个线程到达屏障时才会将条件队列中的节点转移到同步队列,此时同步队列为空
                Node h = head;
                if (h != null && h.waitStatus != 0)
                	// 执行不到
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    
    
    • 1
            public final long awaitNanos(long nanosTimeout)
                    throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                // 线程挂起到的时间点
                final long deadline = System.nanoTime() + nanosTimeout;
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                	// 挂起超时则从条件队列中移除,添加到同步队列
                    if (nanosTimeout <= 0L) {
                        transferAfterCancelledWait(node);
                        break;
                    }
                    if (nanosTimeout >= spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                    nanosTimeout = deadline - System.nanoTime();
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return deadline - System.nanoTime();
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
  • 相关阅读:
    【秋季热身赛】No.2.数字朋友 -- Java Version
    会计制度设计试题及答案
    Vue3为何使用Proxy实现数据监听
    springcloudalibaba架构(23):RocketMQ普通消息和顺序消息
    关于#r语言#的问题:有没有随机多属性可接受性分析(SMAA)的R语言代码
    字符串String倒序输出的四种方法
    基于php+thinkphp的网上书店购物商城系统
    尚品汇项目2
    没有弹性盒,如何玩转移动端?
    jmeter-9-断言之JsonSchema(超推荐)
  • 原文地址:https://blog.csdn.net/Semanteme/article/details/132981820