• AQS 组件



    AQS 是一个构造同步器的框架,用来构造同步器,如 ReentrantLock、倒计时器、以及自定义同步器

    使用方法(模板模式)

    继承 AbstractQueuedSynchronizer 并重写指定的方法,定义获取与释放state的流程

    有关等待队列的步骤不能也不用重写(因为被final定义,而且进队列出队列的方式以及被AQS写好了)

    源码分析以及原理

    使用volatile关键字来修饰int类型的状态state,这个值实现了可重入锁与信号量等功能

        private volatile int state;
    
    • 1

    如果某个线程可以修改state,标记该线程为可用线程,如果不可获取,则加入等待队列,使用CAS实现对state的修改(导入了Unsafe类),获取资源的流程则是由用户继承后写入的

    	protected final int getState() {
    	        return state;
    	}
    	protected final void setState(int newState) {
    	        state = newState;
    	}
        protected final boolean compareAndSetState(int expect, int update) {
            return U.compareAndSetInt(this, STATE, expect, update);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    有一个FIFO的队列CLH来完成获取资源线程的排队工作,这个队列中装着内部类Node,AQS有一套完整的线程等待与唤醒机制

        static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;
            ...
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    同样,还有一些这个节点进行操作的对象

        private transient volatile Node head;
        private transient volatile Node tail;
        ...
    
    • 1
    • 2
    • 3

    AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中

    公平锁和非公平锁

    ReentrantLock 默认采用非公平锁,因为考虑获得更好的性能,通过 boolean 来决定是否用公平锁(传入true则使用公平锁)

    对公平锁而言,首先判断state是否为0,如果为0,直接判断CLH队列中有没有在等待的线程,如果有,它会在后面排队;如果没有则CAS拿锁;如果state不为0,后面排队

    	// 这是公平锁的acquire方法
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
        // 公平锁特有的tryAcquire方法
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 和非公平锁相比,这里多了一个判断:是否有线程在等待
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 这里和非公平锁一样,都是去排队
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    对非公平锁而言,在调用lock函数的时候它会直接CAS试试能不能拿锁,然后进入和公平锁差不多的acquire方法,如果发现锁这个时候被释放了(state==0),非公平锁会直接CAS抢锁,其他的步骤与公平锁相似

    static final class NonfairSync extends Sync {
        final void lock() {
            // 和公平锁相比,这里会直接先进行一次CAS,成功就返回了
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        // AbstractQueuedSynchronizer.acquire(int arg)
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
        // 非公平锁的tryAcquire,主要执行nonfairTryAcquire方法
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 这里没有对阻塞队列进行判断,直接尝试去抢锁
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果抢不到,那就算了
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    对资源共享的方式

    独占

    只有一个线程可以获取状态,如 ReentrantLock

    在独占状态下可以实现两种模式,公平锁(排队获取锁)与非公平锁(抢锁)

    ReentrantLock

    锁的使用非常简单,但是需要注意一点,lock 方法下必须使用 try 环绕

    // 方式一:Oracle 官方推荐的写法
    private val look = ReentrantLock()
    fun printNumber() {
        look.lock()
        try {
            // TODO
        } finally {
            look.unlock()
        }
    }
    
    // 方式二:错误的写法
    private val look = ReentrantLock()
    fun printNumber() {
        try {
            look.lock()
        } finally {
            look.unlock()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    方法一是 Oracle 推荐的方式, 并且在 「阿里巴巴JAVA开发手册」 明确规定了不建议使用 方式二, 即不建议将 lock.lock() 写在 try…finally 代码块内部。这么做是为了避免线程还未加锁就抛出异常,解锁时对没有没有被上锁的对象解锁,此时会 unlock 方法会抛出异常,覆盖之前的异常信息

    有 tryLock 尝试加锁操作,如果失败了,则会立即返回 false。lockInterruptibly 打断其他线程操作

    private val look = ReentrantLock()
    fun printNumber() {
        val isLocked = look.tryLock()
        if (isLocked) {
            try {
                // TODO
            } finally {
                look.unlock()
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在构造方法中加入 true 定义此锁为公平锁

    ReadWriteLock

    读写锁

    和数据库的读写锁定义一样

            ReadWriteLock lock = new ReentrantReadWriteLock();
            Lock lock1 = lock.readLock();
            Lock lock2 = lock.writeLock();
    
    • 1
    • 2
    • 3

    共享

    多个资源都可以获得状态,如信号量、倒计时器

    CountDownLatch(减少计数)

    计算减少锁,中文翻译为倒计时器,基于AQS

    作用是调用 await 方法让一个或者多个线程阻塞,直至一些线程调用 countDown 方法将减少计数内部的 state 减少为0,被阻塞的方法才会继续执行

    以下是一些常用方法

    CountDownLatch(int count); //构造方法,创建一个值为count 的计数器。await();//阻塞当前线程,将当前线程加入阻塞队列。await(long timeout, TimeUnit unit);//在timeout的时间之内阻塞当前线程,时间一过则当前线程可以执行,countDown();//对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    比如如果多个线程执行完毕之后,才可以打印日志,此时可以使用这个类,注意,这个倒计时器只能使用一次。以下是一些使用示例

            CountDownLatch latch = new CountDownLatch(priceKeyList.size());
            List<String> hotel = priceKeyList.stream().map(t -> {
            			try {
                            return intToList.apply(hotelPriceQueryParam);
                        } finally {
                            latch.countDown();
                        }
                    })
            ).collect(Collectors.toList());
    
            try {
                latch.await(timeout, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
            	e.sout;
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    CyclicBarrier(循环栅栏)

    它的作用是调用await方法让线程等待并且将栅栏中的state加1,直到屏障满了才会让它们继续执行并且调用栅栏中的方法,和人满发车一个道理,以下是它的使用方法:

    //循环屏障的定义
    CyclicBarrier cyclicBarrier = new CyclicBarrier(20, () -> {System.out.println("ready");});
    //在线程中使用这个方法让线程等待
    cyclicBarrier.await();
    
    • 1
    • 2
    • 3
    • 4

    它与CountDownLatch的最大区别在与它是让多个线程同时等待,而CountDownLatch是让一个线程等待多个线程完成

    Semphore(信号量)

    信号量在Linux中也是一个比较重要的进程间通信方式

    它定义最多有几个线程同时执行,只有抢到state的线程才会运行,抢到state的线程可能有多个,其他的线程会在CLH队列中等待,如果其中某一个线程运行完毕调用release方法,信号量会自动唤醒等待队列中的线程

    信号量最大的作用就是限制一定数目的线程同时访问某个资源,因此,在某些业务资源需要被更改的情况需要特别注意

    //定义最多有两个线程同时运行
    Semaphore semaphore = new Semaphore(2);
    //得到运行许可
    semaphore.acquire();
    //释放运行许可
    semaphore.release();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    除了以上这些之外,还有BlockingQueue族(用来解决生产者消费者问题)等一些其他API,都在JUC包中

  • 相关阅读:
    OpenCV DNN C++ 使用 YOLO 模型推理
    如何设计高性能架构
    基于Go语言GoFrame+Vue+ElementUI的OA办公系统
    智能运维管理系统平台:数字化转型时代的运维先锋
    MySQL高级9-锁
    Xpath注入学习记录
    GBase8s数据库INTO EXTERNAL 子句
    音频裁剪软件有哪些?来看看这几个实用软件
    zabbix监控部署keepalived高可用
    Android studio添加aidl文件时,添加按钮为黑色不可点击添加解决办法
  • 原文地址:https://blog.csdn.net/sekever/article/details/126591905