• JUC学习笔记——共享模型之无锁


    在本系列内容中我们会对JUC做一个系统的学习,本片将会介绍JUC的无锁

    我们会分为以下几部分进行介绍:

    • 无锁操作
    • CAS与Volatile
    • 原子类型
    • 原理篇
    • Unsafe

    并发无锁操作

    这一小节我们将讲解如何用无锁操作完成并发操作

    问题展现

    我们给出一段之前并发展示代码:

    1. /*并发代码*/
    2. package cn.itcast;
    3. import java.util.ArrayList;
    4. import java.util.List;
    5. interface Account {
    6. // 获取余额
    7. Integer getBalance();
    8. // 取款
    9. void withdraw(Integer amount);
    10. /**
    11. * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
    12. * 如果初始余额为 10000 那么正确的结果应当是 0
    13. */
    14. static void demo(Account account) {
    15. List<Thread> ts = new ArrayList<>();
    16. long start = System.nanoTime();
    17. for (int i = 0; i < 1000; i++) {
    18. ts.add(new Thread(() -> {
    19. account.withdraw(10);
    20. }));
    21. }
    22. ts.forEach(Thread::start);
    23. ts.forEach(t -> {
    24. try {
    25. t.join();
    26. } catch (InterruptedException e) {
    27. e.printStackTrace();
    28. }
    29. });
    30. long end = System.nanoTime();
    31. System.out.println(account.getBalance()
    32. + " cost: " + (end-start)/1000_000 + " ms");
    33. }
    34. }
    35. /*主代码*/
    36. public static void main(String[] args) {
    37. Account.demo(new AccountUnsafe(10000));
    38. }
    39. /*输出结果*/
    40. 330 cost: 306 ms

    解决并发问题(锁)

    我们在之前已经学习过了锁的基本操作,并且可以解决并发问题:

    1. /*并发代码*/
    2. // 给 Account 对象加锁
    3. class AccountUnsafe implements Account {
    4. private Integer balance;
    5. public AccountUnsafe(Integer balance) {
    6. this.balance = balance;
    7. }
    8. @Override
    9. public synchronized Integer getBalance() {
    10. return balance;
    11. }
    12. @Override
    13. public synchronized void withdraw(Integer amount) {
    14. balance -= amount;
    15. }
    16. }
    17. /*主代码*/
    18. public static void main(String[] args) {
    19. Account.demo(new AccountUnsafe(10000));
    20. }
    21. /*输出结果*/
    22. 0 cost: 399 ms

    解决并发问题(无锁)

    JDK为我们提供了几种乐观锁的无锁并发问题解决类型:

    1. /*解释*/
    2. AtomicInteger:原子int类型,属于实现类,传入一个integer类型的参数,可以调用其内部方法对其改变
    3. AtomicInteger内部有一个value值,该值会存放你传入的Integer参数,其所有方法都是对该值进行改变或获得!
    4. /*并发代码*/
    5. class AccountSafe implements Account {
    6. // 定义共享数据为乐观锁AtomicInteger类型
    7. private AtomicInteger balance;
    8. // 构造方法,在创建类时,将传入的参数创建为AtomicInteger类型并赋值
    9. public AccountSafe(Integer balance) {
    10. this.balance = new AtomicInteger(balance);
    11. }
    12. // 获得:调用AtomicInteger类型的get方法
    13. @Override
    14. public Integer getBalance() {
    15. return balance.get();
    16. }
    17. // 改变:amount为值
    18. @Override
    19. public void withdraw(Integer amount) {
    20. // 一直进行直到完成操作
    21. while (true) {
    22. // 记录修改前的值和修改后的值
    23. int prev = balance.get();
    24. int next = prev - amount;
    25. // 采用compareAndSet,首先对比当前值是否为prev,如果是将该值修改为next,并返回true
    26. if (balance.compareAndSet(prev, next)) {
    27. // 执行成功,退出循环!
    28. break;
    29. }
    30. }
    31. // 可以简化为下面的方法
    32. // balance.addAndGet(-1 * amount);
    33. }
    34. }
    35. /*主代码*/
    36. public static void main(String[] args) {
    37. Account.demo(new AccountSafe(10000));
    38. }
    39. /*运行结果*/
    40. 0 cost: 302 ms
    41. /*补充说明*/
    42. 乐观锁并没有运用锁,它采用的是不断运行,如果可以执行就执行,如果不可以执行就一直运行直到执行成功!

    CAS 与 volatile

    这一小节我们将讲解无锁操作中的CAS和Volatile相关内容

    CAS简述

    首先我们介绍一下CAS:

    • compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。

    此外我们还需要知道CAS本身就是原子操作

    • 其实 CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证【比较-交换】的原子性。

    • 在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再 开启总线。

    • 这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。

    我们给出一张CAS操作的展示图:

    Volatile

    我们在之前已经详细的介绍了Volatile的内容:

    • Volatile可以保证该数据元素的可见性以及有序性
    • 获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。
    • 他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。
    • 即一个线程对 volatile 变量的修改,对另一个线程可见。

    CAS必须搭配Volatile共同使用:

    • CAS 必须借助volatile才能读取到共享变量的最新值来实现【比较并交换】的效果。

    CAS特点

    我们来简单介绍一下CAS的特点:

    • 结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
    • CAS 体现的是无锁并发、无阻塞并发
    • 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
    • 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响

    我们反观Synchronized的特点:

    • synchronized 是基于悲观锁的思想:
    • 最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。

    因而我们其实可以很清楚的明白无锁操作是要比锁操作速度要快的:

    • 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,类似于自旋。而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。线程的上下文切换是费时的,在重试次数不是太多时,无锁的效率高于有锁。
    • 线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火, 等被唤醒又得重新打火、启动、加速... 恢复到高速运行,代价比较大

    但是也有特殊状况:

    • 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。
    • 所以总的来说,当线程数小于等于cpu核心数时,使用无锁方案是很合适的,因为有足够多的cpu让线程运行。
    • 当线程数远多于cpu核心数时,无锁效率相比于有锁就没有太大优势,因为依旧会发生上下文切换。

    原子类型

    这一小节我们将讲解无锁操作中的各种原子类型

    原子整数

    首先我们来介绍一下原子整数,大致分为三类:

    • AtomicBoolean
    • AtomicInteger
    • AtomicLong

    由于三种原子整数相似,我们仅给出一种实例:

    1. /*原子整数型介绍*/
    2. - AtomicBoolean:布尔类型的原子整数
    3. - AtomicInteger:int类型的原子整数
    4. - AtomicLong:Long类型的原子整数
    5. /*相关方法展示*/
    6. // 首先创建一个AtomicInteger类型
    7. AtomicInteger i = new AtomicInteger(0);
    8. // 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
    9. System.out.println(i.getAndIncrement());
    10. // 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
    11. System.out.println(i.incrementAndGet());
    12. // 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
    13. System.out.println(i.decrementAndGet());
    14. // 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
    15. System.out.println(i.getAndDecrement());
    16. // 获取并加值(i = 0, 结果 i = 5, 返回 0
    17. System.out.println(i.getAndAdd(5));
    18. // 加值并获取(i = 5, 结果 i = 0, 返回 0
    19. System.out.println(i.addAndGet(-5));
    20. // 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0
    21. // 其中函数中的操作能保证原子,但函数需要无副作用
    22. System.out.println(i.getAndUpdate(p -> p - 2));
    23. // 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0
    24. // 其中函数中的操作能保证原子,但函数需要无副作用
    25. System.out.println(i.updateAndGet(p -> p + 2));
    26. // 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0
    27. // 其中函数中的操作能保证原子,但函数需要无副作用
    28. // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final
    29. // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
    30. System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
    31. // 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0
    32. // 其中函数中的操作能保证原子,但函数需要无副作用
    33. System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
    34. /*相关知识补充*/
    35. getAndUpdate,updateAndGet,getAndAccumulate,accumulateAndGet所需的参数都是IntUnaryOperator类型
    36. 该类型只有一个抽象方法,其需要输入int类型,会返回类型,我们只需要采用Lambda表达式重新构造即可使用!
    37. /*源码展示*/
    38. // 以上方法都是以CAS为基础进行了封装,保证了方法的原子性和变量的可见性。
    39. // 我们调其中一个方法进行底层代码的剖析:
    40. public static int updateAndGet(AtomicInteger i, IntUnaryOperator operator){
    41. while (true){
    42. int prev = i.get();
    43. int next = operator.applyAsInt(prev);
    44. if(i.compareAndSet(prev,next)){
    45. return next;
    46. }
    47. }
    48. }

    原子引用

    我们的原子引用主要分为三类:

    • AtomicReference
    • AtomicMarkableReference
    • AtomicStampedReference

    我们针对三种不同的原子引用类型展开讲解:

    1. /*基本解释*/
    2. - AtomicReference :最基本的原子引用,引用对象后,针对其方法进行改变即可
    3. - AtomicMarkableReference :在AtomicReference的基础上,多了一个版本号,用来检测当前版本与当时未修改前的版本的差距
    4. - AtomicStampedReference :在AtomicReference的基础上,多一个判断,用来判断该对象是否被修改
    5. /*不安全版本*/
    6. class DecimalAccountUnsafe implements DecimalAccount {
    7. BigDecimal balance;
    8. public DecimalAccountUnsafe(BigDecimal balance) {
    9. this.balance = balance;
    10. }
    11. @Override
    12. public BigDecimal getBalance() {
    13. return balance;
    14. }
    15. @Override
    16. public void withdraw(BigDecimal amount) {
    17. BigDecimal balance = this.getBalance();
    18. this.balance = balance.subtract(amount);
    19. }
    20. }
    21. /**/
    22. class DecimalAccountSafeLock implements DecimalAccount {
    23. private final Object lock = new Object();
    24. BigDecimal balance;
    25. public DecimalAccountSafeLock(BigDecimal balance) {
    26. this.balance = balance;
    27. }
    28. @Override
    29. public BigDecimal getBalance() {
    30. return balance;
    31. }
    32. @Override
    33. public void withdraw(BigDecimal amount) {
    34. synchronized (lock) {
    35. BigDecimal balance = this.getBalance();
    36. this.balance = balance.subtract(amount);
    37. }
    38. }
    39. }
    40. /*AtomicReference*/
    41. class DecimalAccountSafeCas implements DecimalAccount {
    42. // 我们并不是直接获得对象本身,而是采用一个AtomicReference类进行包装,里面装的是BigDecimal类型的value
    43. // 同样我们后续的操作都是针对这个value值操作
    44. AtomicReference<BigDecimal> ref;
    45. // 初始化进行赋值
    46. public DecimalAccountSafeCas(BigDecimal balance) {
    47. ref = new AtomicReference<>(balance);
    48. }
    49. // 得到value
    50. @Override
    51. public BigDecimal getBalance() {
    52. return ref.get();
    53. }
    54. // 采用compareAndSet方法对value值进行判断并修改
    55. // 但这时我们只能根据其prev来进行判断,prev和当前值相同,改为next;若不同不修改;无法判断内部是否经过改变
    56. // 比如,最开始为a,后面有一个线程a->b,又出现一个线程b->a,当然我们当前线程就会默认没有发生变化而直接改变为next目标值
    57. @Override
    58. public void withdraw(BigDecimal amount) {
    59. while (true) {
    60. BigDecimal prev = ref.get();
    61. BigDecimal next = prev.subtract(amount);
    62. if (ref.compareAndSet(prev, next)) {
    63. break;
    64. }
    65. }
    66. }
    67. }
    68. /*AtomicMarkableReference */
    69. // 模拟操作
    70. @Slf4j
    71. public class TestABAAtomicMarkableReference {
    72. public static void main(String[] args) throws InterruptedException {
    73. // 首先创建一个垃圾袋装满垃圾
    74. GarbageBag bag = new GarbageBag("装满了垃圾");
    75. // 我们采用AtomicMarkableReference封装对象,对象为垃圾袋;参数2 mark 可以看作一个标记,表示垃圾袋满了
    76. AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
    77. // 开始操作(打印当前状况)
    78. log.debug("主线程 start...");
    79. GarbageBag prev = ref.getReference();
    80. log.debug(prev.toString());
    81. // 该线程负责打扫垃圾,这时我们会调用AtomicMarkableReference的compareAndSet方法
    82. // 里面不仅包含了value的prev和next还包含了mark的prev和next
    83. new Thread(() -> {
    84. log.debug("打扫卫生的线程 start...");
    85. bag.setDesc("空垃圾袋");
    86. while (!ref.compareAndSet(bag, bag, true, false)) {}
    87. log.debug(bag.toString());
    88. }).start();
    89. // 在前面的线程修改之后,我们如果还想判断修改,这时需要prev为true,但已经改变为false,所以不会执行
    90. Thread.sleep(1000);
    91. log.debug("主线程想换一只新垃圾袋?");
    92. boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
    93. log.debug("换了么?" + success);
    94. log.debug(ref.getReference().toString());
    95. }
    96. }
    97. // 模拟垃圾袋
    98. class GarbageBag {
    99. String desc;
    100. public GarbageBag(String desc) {
    101. this.desc = desc;
    102. }
    103. public void setDesc(String desc) {
    104. this.desc = desc;
    105. }
    106. @Override
    107. public String toString() {
    108. return super.toString() + " " + desc;
    109. }
    110. }
    111. // 结果展示
    112. 2019-10-13 15:30:09.264 [main] 主线程 start...
    113. 2019-10-13 15:30:09.270 [main] cn.itcast.GarbageBag@5f0fd5a0 装满了垃圾
    114. 2019-10-13 15:30:09.293 [Thread-1] 打扫卫生的线程 start...
    115. 2019-10-13 15:30:09.294 [Thread-1] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋
    116. 2019-10-13 15:30:10.294 [main] 主线程想换一只新垃圾袋?
    117. 2019-10-13 15:30:10.294 [main] 换了么?false
    118. 2019-10-13 15:30:10.294 [main] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋
    119. /*AtomicStampedReference */
    120. // 代码展示
    121. // 我们同样采用AtomicStampedReference来装载对象,但是AtomicStampedReference会多一个版本号
    122. // 该版本号可以进行++修改,这样我们就可以得知我们进行了几次修改
    123. static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
    124. public static void main(String[] args) throws InterruptedException {
    125. log.debug("main start...");
    126. // 获取值 A
    127. String prev = ref.getReference();
    128. // 获取版本号
    129. int stamp = ref.getStamp();
    130. log.debug("版本 {}", stamp);
    131. // 如果中间有其它线程干扰,发生了 ABA 现象
    132. other();
    133. sleep(1);
    134. // 尝试改为 C
    135. log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
    136. }
    137. private static void other() {
    138. new Thread(() -> {
    139. log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",
    140. ref.getStamp(), ref.getStamp() + 1));
    141. log.debug("更新版本为 {}", ref.getStamp());
    142. }, "t1").start();
    143. sleep(0.5);
    144. new Thread(() -> {
    145. log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",
    146. ref.getStamp(), ref.getStamp() + 1));
    147. log.debug("更新版本为 {}", ref.getStamp());
    148. }, "t2").start();
    149. }
    150. // 结果展示
    151. 15:41:34.891 c.Test36 [main] - main start...
    152. 15:41:34.894 c.Test36 [main] - 版本 0
    153. 15:41:34.956 c.Test36 [t1] - change A->B true
    154. 15:41:34.956 c.Test36 [t1] - 更新版本为 1
    155. 15:41:35.457 c.Test36 [t2] - change B->A true
    156. 15:41:35.457 c.Test36 [t2] - 更新版本为 2
    157. 15:41:36.457 c.Test36 [main] - change A->C false

    原子数组

    我们的原子引用主要分为三类:

    • AtomicIntegerArray
    • AtomicLongArray
    • AtomicReferenceArray

    这三种数组除了内部包含的元素不同外基本相同,所以我们仅介绍一种:

    1. /*lambda知识点补充*/
    2. 我们将Lambda里面的类型分为三种类型:
    3. supplier 提供者 无中生有 ()->结果
    4. function 函数 一个参数一个结果 (参数)->结果;BiFunction (参数1,参数2)->结果
    5. consumer 消费者 一个参数没结果 (参数)->void;BiConsumer (参数1,参数2)->void
    6. /*统一数组处理机制*/
    7. // 我们给出一套Lambda方法处理数组的Demo函数来进行检测
    8. // 参数1,提供数组、可以是线程不安全数组或线程安全数组
    9. // 参数2,获取数组长度的方法
    10. // 参数3,自增方法,回传 array, index
    11. // 参数4,打印数组的方法
    12. private static <T> void demo(
    13. Supplier<T> arraySupplier,
    14. Function<T, Integer> lengthFun,
    15. BiConsumer<T, Integer> putConsumer,
    16. Consumer<T> printConsumer ) {
    17. List<Thread> ts = new ArrayList<>();
    18. T array = arraySupplier.get();
    19. int length = lengthFun.apply(array);
    20. for (int i = 0; i < length; i++) {
    21. // 每个线程对数组作 10000 次操作
    22. ts.add(new Thread(() -> {
    23. for (int j = 0; j < 10000; j++) {
    24. putConsumer.accept(array, j%length);
    25. }
    26. }));
    27. }
    28. ts.forEach(t -> t.start()); // 启动所有线程
    29. ts.forEach(t -> {
    30. try {
    31. t.join();
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. }); // 等所有线程结束
    36. printConsumer.accept(array);
    37. }
    38. /*不安全数组*/
    39. // 代码
    40. demo(
    41. ()->new int[10],
    42. (array)->array.length,
    43. (array, index) -> array[index]++,
    44. array-> System.out.println(Arrays.toString(array))
    45. );
    46. // 结果
    47. [9870, 9862, 9774, 9697, 9683, 9678, 9679, 9668, 9680, 9698]
    48. /*安全数组:大部分方法和原子整数以及原子引用完全相同*/
    49. // 代码
    50. demo(
    51. ()-> new AtomicIntegerArray(10),
    52. (array) -> array.length(),
    53. (array, index) -> array.getAndIncrement(index),
    54. array -> System.out.println(array)
    55. );
    56. // 结果
    57. [10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

    字段更新器

    我们首先来介绍一下字段更新器:

    • 利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常

    我们的字段更新器通常分为三种:

    • AtomicIntegerFieldUpdater
    • AtomicLongFieldUpdater
    • AtomicReferenceFieldUpdater

    我们来进行代码展示:

    1. /*代码展示*/
    2. public class Test5 {
    3. // 在该类中设置字段
    4. private volatile int field;
    5. public static void main(String[] args) {
    6. // 创建字段更新器,前面为类名.class,后面为属性名
    7. AtomicIntegerFieldUpdater fieldUpdater =
    8. AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field");
    9. // 我们需创建一个对象,作为字段更新器的类对象
    10. Test5 test5 = new Test5();
    11. // 修改0->10
    12. fieldUpdater.compareAndSet(test5, 0, 10);
    13. System.out.println(test5.field);
    14. // 修改10->20
    15. fieldUpdater.compareAndSet(test5, 10, 20);
    16. System.out.println(test5.field);
    17. // 修改10->30,修改失败!
    18. fieldUpdater.compareAndSet(test5, 10, 30);
    19. System.out.println(test5.field);
    20. }
    21. }
    22. /*结果展示*/
    23. 10
    24. 20
    25. 20

    原子累加器

    CAS专门创建了一种原子累加器,其由于性能远高于正常CAS,固被留下使用:

    • LongAdder
    • 性能提升的原因很简单,在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]... 最后将结果汇总。
    • 这样它们在累加时操作的不同的 Cell 变量,因此减少了CAS 重试失败,从而提高性能。

    我们采用代码进行比对:

    1. /*主代码*/
    2. // 原子累加器
    3. for (int i = 0; i < 5; i++) {
    4. demo(() -> new LongAdder(), adder -> adder.increment());
    5. }
    6. // 正常CAS操作
    7. for (int i = 0; i < 5; i++) {
    8. demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
    9. }
    10. /*测试函数*/
    11. private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
    12. T adder = adderSupplier.get();
    13. long start = System.nanoTime();
    14. List<Thread> ts = new ArrayList<>();
    15. // 4 个线程,每人累加 50
    16. for (int i = 0; i < 40; i++) {
    17. ts.add(new Thread(() -> {
    18. for (int j = 0; j < 500000; j++) {
    19. action.accept(adder);
    20. }
    21. }));
    22. }
    23. ts.forEach(t -> t.start());
    24. ts.forEach(t -> {
    25. try {
    26. t.join();
    27. } catch (InterruptedException e) {
    28. e.printStackTrace();
    29. }
    30. });
    31. long end = System.nanoTime();
    32. System.out.println(adder + " cost:" + (end - start)/1000_000);
    33. }
    34. /*结果对比*/
    35. 1000000 cost:43
    36. 1000000 cost:9
    37. 1000000 cost:7
    38. 1000000 cost:7
    39. 1000000 cost:7
    40. 1000000 cost:31
    41. 1000000 cost:27
    42. 1000000 cost:28
    43. 1000000 cost:24
    44. 1000000 cost:22

    原理篇

    这一小节我们将讲解无锁操作中的一些原理内容

    LongAdder组成

    首先我们给出LongAdder组成部分:

    1. /*LongAdder组成*/
    2. // 累加单元数组, 懒惰初始化
    3. transient volatile Cell[] cells;
    4. // 基础值, 如果没有竞争, 则用 cas 累加这个域
    5. transient volatile long base;
    6. // 在 cells 创建或扩容时, 置为 1, 表示加锁
    7. transient volatile int cellsBusy;
    8. /*Cell组成*/
    9. // 防止缓存行伪共享
    10. @sun.misc.Contended
    11. static final class Cell {
    12. volatile long value;
    13. Cell(long x) { value = x; }
    14. // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
    15. final boolean cas(long prev, long next) {
    16. return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
    17. }
    18. // 省略不重要代码
    19. }

    LongAdder加锁设置

    我们可以看到这里的是否创建cell采用的是一种CAS锁的机制,我们这里简单介绍一下:

    1. package cn.itcast.test;
    2. import lombok.extern.slf4j.Slf4j;
    3. import java.util.concurrent.atomic.AtomicInteger;
    4. import static cn.itcast.n2.util.Sleeper.sleep;
    5. @Slf4j(topic = "c.Test42")
    6. public class LockCas {
    7. // 0 没加锁
    8. // 1 加锁
    9. private AtomicInteger state = new AtomicInteger(0);
    10. public void lock() {
    11. while (true) {
    12. if (state.compareAndSet(0, 1)) {
    13. break;
    14. }
    15. }
    16. }
    17. public void unlock() {
    18. log.debug("unlock...");
    19. state.set(0);
    20. }
    21. public static void main(String[] args) {
    22. LockCas lock = new LockCas();
    23. new Thread(() -> {
    24. log.debug("begin...");
    25. lock.lock();
    26. try {
    27. log.debug("lock...");
    28. sleep(1);
    29. } finally {
    30. lock.unlock();
    31. }
    32. }).start();
    33. new Thread(() -> {
    34. log.debug("begin...");
    35. lock.lock();
    36. try {
    37. log.debug("lock...");
    38. } finally {
    39. lock.unlock();
    40. }
    41. }).start();
    42. }
    43. }

    LongAdder伪共享

    我们在Cell的类代码中可以看到这个注解:

    • @sun.misc.Contended:防止缓存行伪共享

    首先我们知道内存之上还有缓存,其速度是具有极大差距的:

    从 cpu 到大约需要的时钟周期
    寄存器1 cycle (4GHz 的 CPU 约为0.25ns)
    L13~4 cycle
    L210~20 cycle
    L340~45 cycle
    内存120~240 cycle

    缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中

    同时CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效

    我们给出简单示例图:

    所以如果我们想要实现CAS的多处理器直接操作最后相加的想法就需要使缓存的这个特性变化:

    因而我们就采用注解的方法:

    • @sun.misc.Contended 用来解决这个问题
    • 它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,
    • 从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样不会造成对方缓存行的失效

    Unsafe

    这一小节我们将讲解Unsafe类

    Unsafe概述

    我们首先来简单介绍一下Unsafe:

    • Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得
    • jdk8直接调用Unsafe.getUnsafe()获得的unsafe不能用。

    我们给出简单例子展示:

    1. /*获得Unsafe*/
    2. public class UnsafeAccessor {
    3. static Unsafe unsafe;
    4. static {
    5. try {
    6. Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
    7. theUnsafe.setAccessible(true);
    8. unsafe = (Unsafe) theUnsafe.get(null);
    9. } catch (NoSuchFieldException | IllegalAccessException e) {
    10. throw new Error(e);
    11. }
    12. }
    13. static Unsafe getUnsafe() {
    14. return unsafe;
    15. }
    16. }
    17. /*unsafe使用(会操作底层内存等数据,尽量不要使用)*/
    18. //以下三个方法只执行一次,成功返回true,不成功返回false
    19. public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
    20. public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
    21. public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
    22. //以下方法都是在以上三个方法的基础上进行封装,会循环直到成功为止。
    23. public final int getAndAddInt(Object var1, long var2, int var4) {
    24. int var5;
    25. do {
    26. var5 = this.getIntVolatile(var1, var2);
    27. } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    28. return var5;
    29. }
    30. public final long getAndAddLong(Object var1, long var2, long var4) {
    31. long var6;
    32. do {
    33. var6 = this.getLongVolatile(var1, var2);
    34. } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
    35. return var6;
    36. }
    37. public final int getAndSetInt(Object var1, long var2, int var4) {
    38. int var5;
    39. do {
    40. var5 = this.getIntVolatile(var1, var2);
    41. } while(!this.compareAndSwapInt(var1, var2, var5, var4));
    42. return var5;
    43. }
    44. public final long getAndSetLong(Object var1, long var2, long var4) {
    45. long var6;
    46. do {
    47. var6 = this.getLongVolatile(var1, var2);
    48. } while(!this.compareAndSwapLong(var1, var2, var6, var4));
    49. return var6;
    50. }
    51. public final Object getAndSetObject(Object var1, long var2, Object var4) {
    52. Object var5;
    53. do {
    54. var5 = this.getObjectVolatile(var1, var2);
    55. } while(!this.compareAndSwapObject(var1, var2, var5, var4));

    Unsafe CAS 操作

    下面我们讲解Unsafe来进行CAS操作的具体代码:

    1. /*unsafe实现字段更新*/
    2. // 主函数
    3. public class unsafeOperator{
    4. Unsafe unsafe = UnsafeAccessor.getUnsafe();
    5. Field id = Student.class.getDeclaredField("id");
    6. Field name = Student.class.getDeclaredField("name");
    7. // 获得成员变量的偏移量
    8. long idOffset = UnsafeAccessor.unsafe.objectFieldOffset(id);
    9. long nameOffset = UnsafeAccessor.unsafe.objectFieldOffset(name);
    10. Student student = new Student();
    11. // 使用 cas 方法替换成员变量的值
    12. UnsafeAccessor.unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 true
    13. UnsafeAccessor.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 true
    14. System.out.println(student);
    15. }
    16. // 学生类
    17. @Data
    18. class Student {
    19. volatile int id;
    20. volatile String name;
    21. }
    22. // 输出
    23. Student(id=20, name=张三)
    24. /*unsafe实现原子整数*/
    25. // 主函数
    26. class AtomicData {
    27. private volatile int data;
    28. static final Unsafe unsafe;
    29. static final long DATA_OFFSET;
    30. static {
    31. unsafe = UnsafeAccessor.getUnsafe();
    32. try {
    33. // data 属性在 DataContainer 对象中的偏移量,用于 Unsafe 直接访问该属性
    34. DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("data"));
    35. } catch (NoSuchFieldException e) {
    36. throw new Error(e);
    37. }
    38. }
    39. public AtomicData(int data) {
    40. this.data = data;
    41. }
    42. public void decrease(int amount) {
    43. int oldValue;
    44. while(true) {
    45. // 获取共享变量旧值,可以在这一行加入断点,修改 data 调试来加深理解
    46. oldValue = data;
    47. // cas 尝试修改 data 为 旧值 + amount,如果期间旧值被别的线程改了,返回 false
    48. if (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) {
    49. return;
    50. }
    51. }
    52. }
    53. public int getData() {
    54. return data;
    55. }
    56. }
    57. // Account
    58. Account.demo(new Account() {
    59. AtomicData atomicData = new AtomicData(10000);
    60. @Override
    61. public Integer getBalance() {
    62. return atomicData.getData();
    63. }
    64. @Override
    65. public void withdraw(Integer amount) {
    66. atomicData.decrease(amount);
    67. }
    68. });
    69. /*手动实现原子整数完整版+测试*/
    70. public class UnsafeAtomicTest{
    71. public static void main(String[] args) {
    72. //赋初始值10000,调用demo后正确的输出结果为0
    73. AccountImpl account = new AccountImpl(10000);
    74. //结果正确地输出0
    75. account.demo();
    76. }
    77. }
    78. interface Account{
    79. //获取balance的方法
    80. int getBalance();
    81. //取款的方法
    82. void decrease(int amount);
    83. //演示多线程取款,检查安全性。
    84. default void demo(){
    85. ArrayList<Thread> ts = new ArrayList<>(1000);
    86. for (int i = 0; i < 1000; i++) {
    87. ts.add(new Thread(() -> {
    88. decrease(10);
    89. }));
    90. }
    91. for (Thread t:ts) {
    92. t.start();
    93. }
    94. for (Thread t:ts) {
    95. try {
    96. t.join();
    97. } catch (InterruptedException e) {
    98. e.printStackTrace();
    99. }
    100. }
    101. System.out.println(getBalance());
    102. }
    103. }
    104. //实现账户类,使用手动实现的原子整数作为余额类型
    105. class AccountImpl implements Account{
    106. UnsafeAtomicInteger balance;
    107. public AccountImpl(int balance){
    108. this.balance = new UnsafeAtomicInteger(balance);
    109. }
    110. @Override
    111. public int getBalance() {
    112. return balance.get();
    113. }
    114. @Override
    115. public void decrease(int amount) {
    116. balance.getAndAccumulate(amount,(x,y) -> y - x);
    117. }
    118. }
    119. //手动实现原子整数类
    120. class UnsafeAtomicInteger {
    121. //value声明为volatile,因为乐观锁需要可见性。
    122. private volatile int value;
    123. //需要Unsafe的cas本地方法实现操作。
    124. private static final Unsafe unsafe;
    125. //偏移量,这两个变量很重要且通用、不可变,所以均声明为private static final
    126. private static final long offset;
    127. static{
    128. //静态代码块初始化unsafe
    129. unsafe = UnsafeAccessor.getUnsafe();
    130. try {
    131. //获取value在当前类中的偏移量
    132. offset = unsafe.objectFieldOffset(UnsafeAtomicInteger.class.getDeclaredField("value"));
    133. } catch (NoSuchFieldException e) {
    134. e.printStackTrace();
    135. //待研究
    136. throw new Error(e);
    137. }
    138. }
    139. public UnsafeAtomicInteger(){
    140. }
    141. public UnsafeAtomicInteger(int value){
    142. this.value = value;
    143. }
    144. public final int get(){
    145. return value;
    146. }
    147. public final boolean compareAndSet(int expext,int update){
    148. return unsafe.compareAndSwapInt(this, offset, expext, update);
    149. }
    150. public final int getAndIncrement(){
    151. //局部变量是必须的,因为多次从主存中读取value的值不可靠。
    152. int oldValue;
    153. while (true){
    154. oldValue = value;
    155. if(unsafe.compareAndSwapInt(this,offset,oldValue,oldValue + 1)){
    156. return oldValue;
    157. }
    158. }
    159. }
    160. public final int incrementAndGet(){
    161. int oldValue;
    162. while (true){
    163. oldValue = value;
    164. if (unsafe.compareAndSwapInt(this, offset, oldValue, oldValue + 1)) {
    165. return oldValue + 1;
    166. }
    167. }
    168. }
    169. public final int getAndDecrement(){
    170. int oldValue;
    171. while (true){
    172. oldValue = value;
    173. if (unsafe.compareAndSwapInt(this, offset, oldValue, oldValue - 1)) {
    174. return oldValue;
    175. }
    176. }
    177. }
    178. public final int decrementAndGet(){
    179. int oldValue;
    180. while (true){
    181. oldValue = value;
    182. if (unsafe.compareAndSwapInt(this, offset, oldValue, oldValue - 1)) {
    183. return oldValue - 1;
    184. }
    185. }
    186. }
    187. public final int getAndUpdate(IntUnaryOperator operator){
    188. int oldValue;
    189. int newValue;
    190. while (true){
    191. oldValue = value;
    192. newValue = operator.applyAsInt(oldValue);
    193. if (unsafe.compareAndSwapInt(this, offset, oldValue, newValue)) {
    194. return oldValue;
    195. }
    196. }
    197. }
    198. public final int updateAndGet(IntUnaryOperator operator){
    199. int oldValue;
    200. int newValue;
    201. while (true){
    202. oldValue = value;
    203. newValue = operator.applyAsInt(oldValue);
    204. if (unsafe.compareAndSwapInt(this, offset, oldValue, newValue)) {
    205. return newValue;
    206. }
    207. }
    208. }
    209. public final int getAndAccumulate(int x, IntBinaryOperator operator){
    210. int oldValue;
    211. int newValue;
    212. while (true){
    213. oldValue = value;
    214. newValue = operator.applyAsInt(x,oldValue);
    215. if (unsafe.compareAndSwapInt(this, offset, oldValue, newValue)) {
    216. return newValue;
    217. }
    218. }
    219. }
    220. public final int accumulateAndGet(int x, IntBinaryOperator operator){
    221. int oldValue;
    222. int newValue;
    223. while (true){
    224. oldValue = value;
    225. newValue = operator.applyAsInt(x,oldValue);
    226. if (unsafe.compareAndSwapInt(this, offset, oldValue, newValue)) {
    227. return oldValue;
    228. }
    229. }
    230. }
    231. }
    232. class UnsafeAccessor{
    233. public static Unsafe getUnsafe(){
    234. Field field;
    235. Unsafe unsafe = null;
    236. try {
    237. field = Unsafe.class.getDeclaredField("theUnsafe");
    238. field.setAccessible(true);
    239. unsafe = (Unsafe)field.get(null);
    240. } catch (Exception e) {
    241. e.printStackTrace();
    242. }
    243. return unsafe;
    244. }
    245. }

    总结

    我们在这里做一下该篇文章核心内容总结:

    • CAS 与 volatile
    • API
      • 原子整数
      • 原子引用
      • 原子数组
      • 字段更新器
      • 原子累加器
    • Unsafe
    • 伪共享
  • 相关阅读:
    Vue中父子组件通信方式
    SA实战 ·《SpringCloud Alibaba实战》第03章-微服务介绍
    C#中的事件聚合器实现方法
    鼠标拖拽拖动盒子时,与盒子内某些点击事件冲突问题解决
    如何全面提升架构设计的质量
    56资源网系统源码搭建知识付费-含源码
    SDRAM与DRAM
    java毕业设计旅游攻略开发系统mybatis+源码+调试部署+系统+数据库+lw
    Go 使用 RabbitMQ---------------之一
    WPF 分组
  • 原文地址:https://blog.csdn.net/sinat_40572875/article/details/127932635