• Java多线程(6):锁与AQS(下)


    您好,我是湘王,这是我的CSDN博客,欢迎您来,欢迎您再来~

    之前说过,AQS(抽象队列同步器)是Java锁机制的底层实现。既然它这么优秀,是骡子是马,就拉出来溜溜吧。

    首先用重入锁来实现简单的累加,就像这样:

    1. /**
    2. * 用重入锁实现累加
    3. *
    4. * @author 湘王
    5. */
    6. public class MyLockTest {
    7. private final Lock lock = new ReentrantLock();
    8. private int value;
    9. public int getNext() {
    10. lock.lock();
    11. try {
    12. value++;
    13. } finally {
    14. lock.unlock();
    15. }
    16. return value;
    17. }
    18. public static void main(String[] args) {
    19. MyLockTest myLock = new MyLockTest();
    20. for (int i = 0; i < 5; i++) {
    21. new Thread(new Runnable() {
    22. @Override
    23. public void run() {
    24. for (int i = 0; i < 5; i++) {
    25. System.out.println(myLock.getNext());
    26. }
    27. }
    28. }).start();
    29. }
    30. }
    31. }

    运行结果显示数据有重复:

    这么简单的计算都能出现重复,这肯定是无法接受的。

    再用独占锁来试试看:

    1. public class MyExclusiveLock implements Lock {
    2. @Override
    3. public void lock() {
    4. }
    5. @Override
    6. public void lockInterruptibly() throws InterruptedException {
    7. }
    8. @Override
    9. public boolean tryLock() {
    10. return false;
    11. }
    12. @Override
    13. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    14. return false;
    15. }
    16. @Override
    17. public void unlock() {
    18. }
    19. @Override
    20. public Condition newCondition() {
    21. return null;
    22. }
    23. }

    可以看到,实现lock接口,就需要实现若干自定义的接口。然后以内部类继承AQS的方式,实现排他锁,昨天也说过,AQS中tryAcquire()和tryRelease()是一一对应的,也就是也管获取,一个管释放,所以代码是:

    1. /**
    2. * 内部类继承AQS的方式,实现排他锁
    3. */
    4. private static class SyncHelper extends AbstractQueuedSynchronizer {
    5. private static final long serialVersionUID = -7666580981453962426L;
    6. /**
    7. * 第一个线程进来,拿到锁就返回true;后面的线程进来,拿不到锁就返回false
    8. */
    9. @Override
    10. protected boolean tryAcquire(int arg) {
    11. // 获取资源状态
    12. int state = getState();
    13. if (0 == state) {// 如果没有线程拿到资源的锁
    14. if (compareAndSetState(0, arg)) {
    15. // 保存当前持有同步锁的线程
    16. setExclusiveOwnerThread(Thread.currentThread());
    17. return true;
    18. }
    19. } else if (Thread.currentThread() == getExclusiveOwnerThread()) {
    20. // 如果当前线程再次进来,state + 1,可重入
    21. // 如果这里没有这个判断,那么程序会卡死
    22. setState(state + arg);
    23. return true;
    24. }
    25. return false;
    26. }
    27. /**
    28. * 锁的获取和释放需要一一对应
    29. */
    30. @Override
    31. protected boolean tryRelease(int arg) {
    32. // 获取资源状态
    33. int state = getState();
    34. // 返回最后一个通过setExclusiveOwnerThread()方法设置过的线程,或者null
    35. if (Thread.currentThread() != getExclusiveOwnerThread()) {
    36. throw new RuntimeException();
    37. }
    38. setState(state - arg);
    39. if (0 == state) {
    40. setExclusiveOwnerThread(null);
    41. return true;
    42. }
    43. return false;
    44. }
    45. protected Condition newCondition() {
    46. return new ConditionObject();
    47. }
    48. }

    然后再用AQS实现lock接口的方法:

    1. /**
    2. * 利用AQS实现自定义独占锁
    3. *
    4. * @author 湘王
    5. */
    6. public class MyExclusiveLock implements Lock {
    7. private final SyncHelper synchepler = new SyncHelper();
    8. @Override
    9. public void lock() {
    10. synchepler.acquire(1);
    11. }
    12. @Override
    13. public void lockInterruptibly() throws InterruptedException {
    14. synchepler.acquireInterruptibly(1);
    15. }
    16. @Override
    17. public boolean tryLock() {
    18. return synchepler.tryAcquire(1);
    19. }
    20. @Override
    21. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    22. return synchepler.tryAcquireNanos(1, unit.toNanos(time));
    23. }
    24. @Override
    25. public void unlock() {
    26. synchepler.release(1);
    27. }
    28. @Override
    29. public Condition newCondition() {
    30. return synchepler.newCondition();
    31. }
    32. /**
    33. * 内部类继承AQS的方式,实现排他锁
    34. */
    35. private static class SyncHelper extends AbstractQueuedSynchronizer {
    36. private static final long serialVersionUID = -7666580981453962426L;
    37. /**
    38. * 第一个线程进来,拿到锁就返回true;后面的线程进来,拿不到锁就返回false
    39. */
    40. @Override
    41. protected boolean tryAcquire(int arg) {
    42. // 获取资源状态
    43. int state = getState();
    44. if (0 == state) {// 如果没有线程拿到资源的锁
    45. if (compareAndSetState(0, arg)) {
    46. // 保存当前持有同步锁的线程
    47. setExclusiveOwnerThread(Thread.currentThread());
    48. return true;
    49. }
    50. } else if (Thread.currentThread() == getExclusiveOwnerThread()) {
    51. // 如果当前线程再次进来,state + 1,可重入
    52. // 如果这里没有这个判断,那么程序会卡死
    53. setState(state + arg);
    54. return true;
    55. }
    56. return false;
    57. }
    58. /**
    59. * 锁的获取和释放需要一一对应
    60. */
    61. @Override
    62. protected boolean tryRelease(int arg) {
    63. // 获取资源状态
    64. int state = getState();
    65. // 返回最后一个通过setExclusiveOwnerThread()方法设置过的线程,或者null
    66. if (Thread.currentThread() != getExclusiveOwnerThread()) {
    67. throw new RuntimeException();
    68. }
    69. setState(state - arg);
    70. if (0 == state) {
    71. setExclusiveOwnerThread(null);
    72. return true;
    73. }
    74. return false;
    75. }
    76. protected Condition newCondition() {
    77. return new ConditionObject();
    78. }
    79. }
    80. }

    然后再运行测试:

    1. /**
    2. * 实现Lock接口方法并运行排他锁测试
    3. *
    4. * @author 湘王
    5. */
    6. public class MyExclusiveLockTester {
    7. // 用自定义AQS独占锁实现
    8. private Lock lock = new MyExclusiveLock();
    9. private int value;
    10. public int accmulator() {
    11. lock.lock();
    12. try {
    13. ++value;
    14. } finally {
    15. lock.unlock();
    16. }
    17. return value;
    18. }
    19. public static void main(String[] args) throws InterruptedException {
    20. MyExclusiveLockTester test = new MyExclusiveLockTester();
    21. for (int i = 0; i < 5; i++) {
    22. new Thread(new Runnable() {
    23. @Override
    24. public void run() {
    25. for (int i = 0; i < 5; i++) {
    26. System.out.println(test.accmulator());
    27. }
    28. }
    29. }).start();
    30. }
    31. }
    32. }

    可以看到,结果无论怎么样都不会再重复了。

    这个只是简单的累加,接下来用AQS来实现一个实际的生活场景。比如周末带女票或男票去步行街吃饭,这时候人特别多,需要摇号,而且一次只能进去三张号(不按人头算,按叫到的号来算),该怎么实现呢?

    可以顺着这个思路:摇号机虽有很多号,但它本质上是个共享资源,很多人可以共享,但是每次共享的数量有限。这其实就是个可以指定数量的共享锁而已。

    既然有了思路,那接下来就好办了。

    1. /**
    2. * 利用AQS实现自定义共享锁
    3. *
    4. * @author 湘王
    5. */
    6. public class MyShareLock implements Lock {
    7. @Override
    8. public void lock() {
    9. }
    10. @Override
    11. public void lockInterruptibly() throws InterruptedException {
    12. }
    13. @Override
    14. public boolean tryLock() {
    15. return false;
    16. }
    17. @Override
    18. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    19. return false;
    20. }
    21. @Override
    22. public void unlock() {
    23. }
    24. @Override
    25. public Condition newCondition() {
    26. return null;
    27. }
    28. }

    还是一样实现Lock接口,但这次是用AQS实现共享锁。

    1. /**
    2. * 内部类继承AQS实现共享锁
    3. *
    4. */
    5. private static class SyncHelper extends AbstractQueuedSynchronizer {
    6. private static final long serialVersionUID = -7357716912664213942L;
    7. /**
    8. * count表示允许几个线程能同时获得锁
    9. */
    10. public SyncHelper(int count) {
    11. if (count <= 0) {
    12. throw new IllegalArgumentException("锁资源数量必须大于0");
    13. }
    14. // 设置资源总数
    15. setState(count);
    16. }
    17. /**
    18. * 一次允许多少个线程进来,允许数量的线程都能拿到锁,其他的线程进入队列
    19. */
    20. @Override
    21. protected int tryAcquireShared(int acquires) {
    22. // 自旋
    23. for (;;) {
    24. int state = getState();
    25. int remain = state - acquires;
    26. // 判断剩余锁资源是否已小于0或者CAS执行是否成功
    27. if (remain < 0 || compareAndSetState(state, remain)) {
    28. return remain;
    29. }
    30. }
    31. }
    32. /**
    33. * 锁资源的获取和释放要一一对应
    34. */
    35. @Override
    36. protected boolean tryReleaseShared(int releases) {
    37. // 自旋
    38. for (;;) {
    39. // 获取当前state
    40. int current = getState();
    41. // 释放状态state增加releases
    42. int next = current + releases;
    43. if (next < current) {// 溢出
    44. throw new Error("Maximum permit count exceeded");
    45. }
    46. // 通过CAS更新state的值
    47. // 这里不能用setState()
    48. if (compareAndSetState(current, next)) {
    49. return true;
    50. }
    51. }
    52. }
    53. protected Condition newCondition() {
    54. return new ConditionObject();
    55. }
    56. }

    然后再来改造之前实现的接口:

    1. /**
    2. * 利用AQS实现自定义共享锁
    3. *
    4. * @author 湘王
    5. */
    6. public class MyShareLock implements Lock {
    7. public static int count;
    8. private final SyncHelper synchepler = new SyncHelper(count);
    9. @Override
    10. public void lock() {
    11. synchepler.acquireShared(1);
    12. }
    13. @Override
    14. public void lockInterruptibly() throws InterruptedException {
    15. synchepler.acquireSharedInterruptibly(1);
    16. }
    17. @Override
    18. public boolean tryLock() {
    19. return synchepler.tryAcquireShared(1) > 0;
    20. }
    21. @Override
    22. public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    23. return synchepler.tryAcquireSharedNanos(1, unit.toNanos(time));
    24. }
    25. @Override
    26. public void unlock() {
    27. synchepler.releaseShared(1);
    28. }
    29. @Override
    30. public Condition newCondition() {
    31. return synchepler.newCondition();
    32. }
    33. /**
    34. * 内部类继承AQS实现共享锁
    35. *
    36. */
    37. private static class SyncHelper extends AbstractQueuedSynchronizer {
    38. private static final long serialVersionUID = -7357716912664213942L;
    39. /**
    40. * count表示允许几个线程能同时获得锁
    41. */
    42. public SyncHelper(int count) {
    43. if (count <= 0) {
    44. throw new IllegalArgumentException("锁资源数量必须大于0");
    45. }
    46. // 设置资源总数
    47. setState(count);
    48. }
    49. /**
    50. * 一次允许多少个线程进来,允许数量的线程都能拿到锁,其他的线程进入队列
    51. */
    52. @Override
    53. protected int tryAcquireShared(int acquires) {
    54. // 自旋
    55. for (;;) {
    56. int state = getState();
    57. int remain = state - acquires;
    58. // 判断剩余锁资源是否已小于0或者CAS执行是否成功
    59. if (remain < 0 || compareAndSetState(state, remain)) {
    60. return remain;
    61. }
    62. }
    63. }
    64. /**
    65. * 锁资源的获取和释放要一一对应
    66. */
    67. @Override
    68. protected boolean tryReleaseShared(int releases) {
    69. // 自旋
    70. for (;;) {
    71. // 获取当前state
    72. int current = getState();
    73. // 释放状态state增加releases
    74. int next = current + releases;
    75. if (next < current) {// 溢出
    76. throw new Error("Maximum permit count exceeded");
    77. }
    78. // 通过CAS更新state的值
    79. // 这里不能用setState()
    80. if (compareAndSetState(current, next)) {
    81. return true;
    82. }
    83. }
    84. }
    85. protected Condition newCondition() {
    86. return new ConditionObject();
    87. }
    88. }
    89. }

    接下来就该测试咱们需要的效果是否能实现了:

    1. public class MyShareLockTester {
    2. public static void main(String[] args) throws InterruptedException {
    3. // 用自定义AQS共享锁实现
    4. // 一次允许发放三把锁
    5. MyShareLock.count = 3;
    6. final Lock lock = new MyShareLock();
    7. // 模拟20个客户端访问
    8. for (int i = 0; i < 20; i++) {
    9. new Thread(new Runnable() {
    10. @Override
    11. public void run() {
    12. try {
    13. lock.lock();
    14. System.out.println("持有 " + Thread.currentThread().getName() + " 的客人可以进餐厅就餐");
    15. // 每两次叫号之间间隔一段时间,模拟真实场景
    16. Thread.sleep(3000);
    17. } catch (InterruptedException e) {
    18. e.printStackTrace();
    19. } finally {
    20. // 使用完成释放锁
    21. lock.unlock();
    22. }
    23. }
    24. }).start();
    25. }
    26. }
    27. }

    这里有20个号,每次只能发放3张,运行之后就可以看到确实如此。

    AQS是个很神奇也很好玩的东西,就像它的作者(也是除了高司令就是对Java影响最大的那个人,整个Java的多线程juc包代码就是他编写的)Doug Lea在AbstractQueuedSynchronizer的注释中所说:AQS只是一个框架,至于怎么玩,就是你的事了!


    感谢您的大驾光临!咨询技术、产品、运营和管理相关问题,请关注后留言。欢迎骚扰,不胜荣幸~

  • 相关阅读:
    视频抽帧转图片,opencv和ffmpeg效果测评
    杭州和智高薪招聘云计算/存储/网络工程师!
    【Pytorch】深度学习之优化器
    UG\NX二次开发 取消抑制特征 UF_MODL_unsuppress_feature
    Python点云处理(十九)点云地面点提取——CSF布料模拟算法
    七牛qshell 批量上传 mac 本地目录
    gitlub 加载慢问题处理
    docker入门配置
    【web应用系统实践】第四章作业
    股票程序化交易如何把交易分析简单化?
  • 原文地址:https://blog.csdn.net/lostrex/article/details/127604946