• J.U.C并发工具【CountDownLatch,Semaphore,CyclicBarrier】底层源码


    CountDownLatch的实现原理

    • 它可以让一个线程阻塞或者让多个线程阻塞,共享锁实现
    • 可以允许多个线程同时抢占到锁,然后等到计数器归零的时候,同时唤醒
    • state记录计算器
    • countDown的时候,实际上就是state–
    public abstract class BaseHealthChecker implements Runnable {
    
        private String serviceName; //服务名称
    
        private boolean serviceUp;
    
        public BaseHealthChecker(String serviceName) {
            this.serviceName = serviceName;
        }
    
        @Override
        public void run() {
            try {
                verifyService();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public abstract void verifyService() throws Exception;
    
        public String getServiceName() {
            return serviceName;
        }
    
        public boolean isServiceUp() {
            return serviceUp;
        }
    }
    
    public class CacheHealthChecker extends BaseHealthChecker{
    
        private CountDownLatch countDownLatch;
    
        public CacheHealthChecker(CountDownLatch countDownLatch) {
            super("CacheHealthChecker");
            this.countDownLatch=countDownLatch;
        }
    
        @Override
        public void verifyService() throws Exception {
            System.out.println("Checking:"+this.getServiceName());
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                throw e;
            }
            countDownLatch.countDown();
            System.out.println(this.getServiceName()+" 健康状态正常");
        }
    }
    
    public class DatabaseHealthChecker extends BaseHealthChecker {
        private CountDownLatch countDownLatch;
    
    
        public DatabaseHealthChecker(CountDownLatch countDownLatch) {
            super("DatabaseHealthChecker");
            this.countDownLatch = countDownLatch;
        }
    
        @Override
        public void verifyService() throws Exception {
            System.out.println("Checking:" + this.getServiceName());
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                throw e;
            }
            countDownLatch.countDown();
            System.out.println(this.getServiceName() + " 健康状态正常");
        }
    
    }
    
    public class ApplicationStartup {
    
        private static List<BaseHealthChecker> services;
        private static CountDownLatch countDownLatch=new CountDownLatch(2);
    
        static{
            services=new ArrayList<>();
            services.add(new CacheHealthChecker(countDownLatch));
            services.add(new DatabaseHealthChecker(countDownLatch));
        }
    
        public static boolean checkExternalServices() throws InterruptedException {
            for(BaseHealthChecker bh:services){
                new Thread(bh).start(); //针对每个服务采用线程来执行
            }
            countDownLatch.await();
            System.out.println("111");
            countDownLatch.await();
            System.out.println("222");
            return true;
        }
    }
    
    public class StartupMain {
        public static void main(String[] args) {
            try {
                ApplicationStartup.checkExternalServices();
            } catch (InterruptedException e) {
            }
            System.out.println("服务启动成功");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107

    CountDownLatch底层源码

    1. CountDownLatch.wait方法
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)    // 判断记录计算器是否等于0后表示可以直接通行
            doAcquireSharedInterruptibly(arg);    // 
    }
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);    // 添加进等待队列当中
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();    // 当前node节点是第一个着尝试抢占资源
                if (p == head) {
                    int r = tryAcquireShared(arg);    // 抢占资源
                    if (r >= 0) {    // 抢占成功
                        setHeadAndPropagate(node, r);    // 将队列当中所有节点唤醒
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&    // 修改waitStatus状态标示为当前node需要等待
                    parkAndCheckInterrupt())    // 休眠等待
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    // 将队列当中所有节点唤醒
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);// 将队列的头部设置为node,从而退出队列
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();    // 为共享模式释放动作——向后继发出信号并确保传播
        }
    }
    // 将队列的头部设置为node,从而退出队列
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    // 为共享模式释放动作——向后继发出信号并确保传播
        }
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {    // 当前队列头节点node不等于尾节点node着继续唤醒
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {    // 如果当前node等于-1 着唤醒
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);    // 唤醒并移除当前node节点
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // 如果当前h节点等于头节点表示全部唤醒,并且结束
                break;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    1. countDownLatch.countDown方法
    public void countDown() {
        sync.releaseShared(1);    // state--
    }
    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 通过cas操作修改state--
            doReleaseShared();    // 唤醒等待队列所有对象
            return true;
        }
        return false;
    }
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))    //cas 修改state状态
                return nextc == 0;
        }
    }
    private void doReleaseShared() {
        for (;;) {    
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {    //判断node节点是否等于-1 标示等待
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))    // cas修改唤醒状态
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);    // 唤醒,并重新指向下一个节点
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) //当前node的waitStatus如果为-1表示需要修改为0
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {   // 如果当前node的下一个节点为空或者是需要抛弃的节点着进入其中修改
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)    // 查询到小于等于0的那个节点,就是队列当中第一个线程A
                    s = t;
        }
        if (s != null)    
            LockSupport.unpark(s.thread);// 当前node节点唤醒
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    Semaphore的实现原理

    • 信号量,限流器,限制资源访问
    • 本质上就是:抢占同一个令牌,如果抢占成功着同行,未抢占着阻塞
    1. acquire() 抢占一个令牌
      acquire = 10-1 如:为0时着阻塞,也有可能同时阻塞多个线程
    2. release() 释放一个令牌
      release = 令牌 + 1 如:有令牌了,唤醒,从阻塞队列当中唤醒
    3. Semaphore采用的是共享锁机制,因为同时可以释放多个令牌,那么意味着可以同时多个线程可以抢占到锁
    public class SemaphoreTest {
    
        public static void main(String[] args) {
            //限制资源的并发数量.
            Semaphore semaphore=new Semaphore(10);
            for (int i = 0; i < 20; i++) {
                new Car(i,semaphore).start();
            }
        }
        static class Car extends  Thread{
            private int num;
            private Semaphore semaphore;
    
            public Car(int num, Semaphore semaphore) {
                this.num = num;
                this.semaphore = semaphore;
            }
            @Override
            public void run(){
                try {
                    semaphore.acquire(); //获得一个令牌
                    System.out.println("第 "+num+"俩车抢到一个车位");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println("第 "+num+"走喽~");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release(); //释放一个令牌
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    1. acquire() 抢占一个令牌
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)    // 获取令牌
            doAcquireSharedInterruptibly(arg);    // 未抢到令牌
    }
    // 抢占令牌
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();    // 获取当前共享锁状态
            int remaining = available - acquires;    // 获取令牌
            if (remaining < 0 ||
                compareAndSetState(available, remaining))    // cas操作修改令牌状态
                return remaining;
        }
    }
    // 未获取令牌
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);    // 将当前线程放进等待队列当中
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {    // 校验当前线程是否时队列当中第一个节点
                    int r = tryAcquireShared(arg);    // 抢占令牌
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);    // 抢占成功,将当前线程noe节点移除队列当中
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&    // node节点修改状态并且阻塞
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    1. release() 释放一个令牌
    public void release() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    // 释放一个令牌
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))   // 通过cas 释放一个令牌
                return true;
        }
    }
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {    // 修改node状态并且唤醒
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    CyclicBarrier原理
    可重复的栅栏,实现,相当于 , 多个线程通过CountDownLatch的await 。然后另外一个线程使用countDown方法来唤醒。

    public class CyclicBarrierTest {
    
        public static void main(String[] args) {
            int n=4;
            CyclicBarrier barrier=new CyclicBarrier(4,()->{
                System.out.println("所有线程都写入完成,继续处理其他任务");
            });  // 4
            for (int i = 0; i < n; i++) {
                new Writer(barrier).start();
            }
        }
        static class Writer extends Thread{
            private CyclicBarrier cyclicBarrier;
            public Writer(CyclicBarrier barrier){
                this.cyclicBarrier=barrier;
            }
            @Override
            public void run(){
                try {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName()+"写入数据完毕,等待其他线程");
                    cyclicBarrier.await();  //-1的动作
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
    
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    1. 创建CyclicBarrier对象
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;    // 当前栅栏数量
        this.count = parties;    // 仍在等待人数,每当归0后会重置为栅栏数量
        this.barrierCommand = barrierAction;    // 数量
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. await()方法
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();    // 通过lock保证每个线程调用wait访问都是唯一性的
        try {
            // Generation每次使用都表示为一个生成实例。每当障碍被触发或重置时,生成就会改变。使用generation的线程可以关联很多代——由于锁分配给等待线程的方式不确定——但一次只能有一个是活动的(应用计数的那个),其余的要么是中断的,要么是触发的。
            // 如果有一个中断,但没有后续重置,则不需要有一个活动的生成
            final Generation g = generation;
    
            if (g.broken)
                throw new BrokenBarrierException();
    
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
    
            int index = --count;
            if (index == 0) {  // 栅栏为0表示需要唤醒等待队列对象继续执行代码
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();   // 通过栅栏执行任务
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
    
            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();    //当前线程等待
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (g != generation)
                    return index;
    
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    // 唤醒队列当中所有对象
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();    // 唤醒队列当中所有对象
        // set up next generation
        count = parties;    // 重置栅栏数量
        generation = new Generation();    // 重新创建新的generation对象
    }
    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);    // 唤醒
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
  • 相关阅读:
    基于机器学习之模型树短期负荷预测(Matlab代码实现)
    全文解读 | 五部委印发:元宇宙产业创新发展三年行动计划(2023-2025年)
    跨域和验证码的实现
    操作系统课程设计:新增Linux驱动程序(重制版)
    数据库原理(数据库设计)——(3)
    Kotlin,解决调用了函数但是函数体内没有执行的问题,什么时候使用invoke
    Stacked Attention Networks for Image Question Answering(用于图像问答的堆叠注意力网络)
    LeetCode——字符串(Java)
    Devops 开发运维高级篇之Jenkins+Docker+SpringCloud微服务持续集成
    D-Desthiobiotin|D-脱硫生物素|CAS:533-48-2用于蛋白质和细胞的标记
  • 原文地址:https://blog.csdn.net/weixin_46919552/article/details/125604199