• 【深入浅出Java并发编程指南】「实战篇」教你如何使用AbstractQueuedSynchronizer实现自己的同步器组件


    前提概要

    之前的文章中会涉及到了相关AQS的原理和相关源码的分析,所谓实践是检验真理的唯一标准!接下来就让我们活化一下AQS技术,主要针对于自己动手实现一个AQS同步器。

    定义MyLock实现Lock

    Doug Lea大神在JDK1.5编写了一个Lock接口,里面定义了实现一个锁的基本方法,我们只需编写一个MyLock类实现这个接口就好。

    下面就是针对于Lock实现类的基本基础架构机制,从而实现对应的各个核心的方法内容。

    class MyLock implements Lock {
    }
    
    • 1
    • 2

    对应的Lock接口的方法实现

    加锁

    如果加锁竞争锁资源不成功则进入等待队列。

     @Override
    public void lock() {}
    
    • 1
    • 2
    加锁

    如果加锁竞争锁资源不成功则进入等待队列,在处于等待过程中是允许外界调用interrupt进行中断等待状态。

    @Override
    public void lockInterruptibly() throws InterruptedException {}
    
    • 1
    • 2
    尝试加锁

    与try的区别就是try会因为没有获取到锁资源而进入等待队列。而对于tryLock,如果出现失败后,则会立刻返回不会进入等待队列,而是返回给调用端false,如果获取到资源则进行返回true,有一点乐观锁的感觉。

    @Override
    public boolean tryLock() {}
    
    • 1
    • 2
    尝试加锁 带超时的

    他会尝试阻塞和等待对应的时间time和对应的TimeUnit时间戳单位,进行等待加锁。

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {}
    
    • 1
    • 2
    释放锁
    @Override
    public void unlock() {}
    
    • 1
    • 2
    返回一个条件变量(不在本案例谈论)
    @Override
    public Condition newCondition() {}
    
    • 1
    • 2

    定义好MyLock后,接下来就是实现各个方法的逻辑,达到真正的用于线程间sync互斥的需求。

    自定义一个同步器

    我们定义MySync然后继承自AbstractQueuedSynchronizer的Sync对象类。实现自定义的MySync前,先了解AQS内部的一些基本概念。在AQS中主要的一些成员属性如下:

    • state:用于标记资源状态,如果为0表示资源没有被占用,可以加锁成功。如果大于0表示资源已经被占用,然后根据自己的定义去实现是否允许对共享资源进行操作

      • 比如:ReentrantLock的实现方式是当state大于0,那么表示已经有线程获得锁了,我们都知道ReentrantLock是可重入的,其原理就是当有线程次进入同一个lock标记的临界区时。先判断这个线程是否是获得锁的那个线程,如果是,state会+1,此时state会等于2。
      • 当unlock时,会一层一层的减1,直到state等于0则表示完全释放锁成功。
    • head、tail:用于存放获得锁失败的线程。在AQS中,每一个线程会被封装成一个Node节点,这些节点如果获得锁资源失败会链在head、tail中,成为一个双向链表结构。

    • exclusiveOwnerThread用于存放当前获得锁的线程,正如在state说明的那样。ReentrantLock判断可重入的条件就是用这个exclusiveOwnerThread线程跟申请获得锁的线程做比较,如果是同一个线程,则state+1,并重入加锁成功

    知道这些概念后我们就可以自定义一个AQS:

    public final class MySync extends AbstractQueuedSynchronizer {
        /**
        * 尝试加锁
        */
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                // 修改state状态成功后设置当前线程为占有锁资源线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        /**
        * 释放锁
        */
        @Override
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            // state有volatile修饰,为了保证解锁后其他的一些变量对其他线程可见,把setExclusiveOwnerThread(null)放到上面 happens-before中定义的 volatile规则
            setState(0);
            return true;
        }
        /**
        * 判断是否是独占锁
        */
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    将MySync组合进MyLock

    最后一步就是将第一步中的所有方法逻辑完成

    class MyLock implements Lock {
    
        // 组合自定义sync器
        private MySync sync = new MySync();
    
        /**
         * 加锁。如果不成功则进入等待队列
         */
        public void lock() {
            sync.acquire(1);
        }
        /**
        * 加锁(可被interrupt)
        */
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
        /**
         * 尝试加锁
         */
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
        /**
         * 加锁 带超时的
         */
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toMillis(time));
        }
        /**
        * 释放锁
        */
        public void unlock() {
            sync.release(0);
        }
        /**
        * 返回一个条件变量(不在本案例谈论)
        */
        @Override
        public Condition newCondition() {
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    完成整个MyLock的逻辑后,发现在lock()、unlock()中调用的自定义sync的方法tryAcquire()和tryRelease()方法。我们就以在lock()方法中调用acquire()方法说明模板设计模式在AQS中的应用。

    点进.acquire()方法后,发现改该方法是来自AbstractQueuedSynchronizer中:

    • 在这里面可以看到tryAcquire方法,继续点进去看看tryAcquire(),发现该方法是一个必须被重写的方法,否则抛出一个运行时异常。

    • 模板方法设计模式在这里得以体现,再回到我们第二部中自定义的MySync中,就是重写了AQS中的tryAcquire()方法。

    因此整个自定义加锁的流程如下:

    • 调用MyLock的lock(),lock()方法调用AQS的acquire()方法
    • 在acquire()方法中调用了tryAcquire()方法进行加锁
    • 而tryAcquire()方法在AQS中是一个必须让子类自定义重写的方法,否则会抛出一个异常
    • 因此调用tryAcquire()时实际上是调用了我们自定义的MySync类中tryAcquire()方法

    总结

    AQS作为Java并发体系下的关键类,在各种并发工具中都有它的身影,如ReentrantLock、Semaphore等。这些并发工具用于控制sync互斥的手段都是采用AQS,外加Cas机制。AQS采用了模板方法设计模式让子类们自定义sync互斥的条件,比如本案例中MySync类重写了tryAcquire方法。

    下面实现一个自定义的sync:

    public class SelfSynchronizer {
    
        private final Sync sync = new Sync();
    
        public void lock() {
            sync.acquire(1);
        }
    
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    
        public boolean unLock() {
            return sync.release(1);
        }
    
        static class Sync extends AbstractQueuedSynchronizer {
            //是否处于占用状态
            @Override
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
    
            /**
             * 获取sync资源
             * @param acquires
             * @return
             */
            @Override
            public boolean tryAcquire(int acquires) {
                if(compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                //这里没有考虑可重入锁
                /*else if (Thread.currentThread() == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }*/
                return false;
            }
    
            /**
             * 释放sync资源
             * @param releases
             * @return
             */
            @Override
            protected boolean tryRelease(int releases) {
                int c = getState() - releases;
                boolean free = false;
                if (c == 0) {
                    free = true;
                }
                setState(c);
                return free;
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    ReentrantLock源码和上面自定义的sync很相似,测试下该sync,i++在多线程下执行情况:

    public class TestSelfSynchronizer {
        private static int a = 0;
        private static int b = 0;
        private static SelfSynchronizer selfSynchronizer = new SelfSynchronizer();
        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 50, 1, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        private static ExecutorService ec = Executors.newFixedThreadPool(20);
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 20 ; i++) {
                executor.submit(new Task());
            }
            for (int j = 0; j < 20 ; j++) {
                ec.submit(new TaskSync());
            }
            Thread.sleep(10000);
            System.out.println("a的值:"+ a);
            System.out.println("b的值" + b);
            executor.shutdown();
            ec.shutdown();
        }
        static class Task implements Runnable {
            @Override
            public void run() {
                for(int i=0;i<10000;i++) {
                    a++;
                }
            }
        }
        static class TaskSync implements Runnable {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                	//使用sync器加锁
                    selfSynchronizer.lock();
                    b++;
                    selfSynchronizer.unLock();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    开启两个线程池,对int型变量自增10000次,如果不加sync器,最后值小于200000,使用了自定义sync器则最后值正常等于200000,这是因为每次自增操作加锁

  • 相关阅读:
    分享一下怎么做陪诊小程序
    Talk|加州大学圣地亚哥分校程旭欣:视觉反馈下足式机器人的全身操作与运动
    电子学会python三级笔记
    JSD-2204-Knife4j框架-处理响应结果-Day07
    MySQL【基本select语句】
    35 _ Trie树:如何实现搜索引擎的搜索关键词提示功能?
    Elasticsearch深入理解(七) —— Index Setting一览
    GIT合并任意两个指定分支
    Springboot毕设项目办公用品管理系统c1139(java+VUE+Mybatis+Maven+Mysql)
    Flutter:环境搭建、项目创建
  • 原文地址:https://blog.csdn.net/l569590478/article/details/127998152