• (十二) 共享模型之无锁【原子整数、原子引用、原子数组】


    一、原子整数

    J.U.C 并发包提供了:

    AtomicBoolean

    AtomicInteger

    AtomicLong

    1. public class Test34 {
    2. public static void main(String[] args) {
    3. AtomicInteger i = new AtomicInteger(0);
    4. // 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
    5. System.out.println(i.getAndIncrement());
    6. // 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
    7. System.out.println(i.incrementAndGet());
    8. // 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
    9. System.out.println(i.decrementAndGet());
    10. // 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
    11. System.out.println(i.getAndDecrement());
    12. // 获取并加值(i = 0, 结果 i = 5, 返回 0)
    13. System.out.println(i.getAndAdd(5));
    14. // 加值并获取(i = 5, 结果 i = 0, 返回 0)
    15. System.out.println(i.addAndGet(-5));
    16. // 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
    17. // 其中函数中的操作能保证原子,但函数需要无副作用
    18. System.out.println(i.getAndUpdate(p -> p - 2));
    19. // 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
    20. // 其中函数中的操作能保证原子,但函数需要无副作用
    21. System.out.println(i.updateAndGet(p -> p + 2));
    22. // 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
    23. // 其中函数中的操作能保证原子,但函数需要无副作用
    24. // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
    25. // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
    26. System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
    27. // 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
    28. // 其中函数中的操作能保证原子,但函数需要无副作用
    29. System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
    30. }
    31. }

    二、原子引用

    J.U.C 并发包提供了:

    AtomicReference

    AtomicMarkableReference

    AtomicStampedReference

    1. 安全实现-使用 CAS

    1. @Slf4j(topic = "c.Test35")
    2. public class Test35 {
    3. public static void main(String[] args) {
    4. DecimalAccount.demo(new DecimalAccountCas(new BigDecimal("10000")));
    5. }
    6. }
    7. class DecimalAccountCas implements DecimalAccount {
    8. private AtomicReference balance;
    9. public DecimalAccountCas(BigDecimal balance) {
    10. // this.balance = balance;
    11. this.balance = new AtomicReference<>(balance);
    12. }
    13. @Override
    14. public BigDecimal getBalance() {
    15. return balance.get();
    16. }
    17. @Override
    18. public void withdraw(BigDecimal amount) {
    19. while(true) {
    20. BigDecimal prev = balance.get();
    21. BigDecimal next = prev.subtract(amount);
    22. if (balance.compareAndSet(prev, next)) {
    23. break;
    24. }
    25. }
    26. }
    27. }
    28. interface DecimalAccount {
    29. // 获取余额
    30. BigDecimal getBalance();
    31. // 取款
    32. void withdraw(BigDecimal amount);
    33. /**
    34. * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
    35. * 如果初始余额为 10000 那么正确的结果应当是 0
    36. */
    37. static void demo(DecimalAccount account) {
    38. List ts = new ArrayList<>();
    39. for (int i = 0; i < 1000; i++) {
    40. ts.add(new Thread(() -> {
    41. account.withdraw(BigDecimal.TEN);
    42. }));
    43. }
    44. ts.forEach(Thread::start);
    45. ts.forEach(t -> {
    46. try {
    47. t.join();
    48. } catch (InterruptedException e) {
    49. e.printStackTrace();
    50. }
    51. });
    52. System.out.println(account.getBalance());
    53. }
    54. }

    2. ABA 问题及解决

    1. @Slf4j(topic = "c.Test36")
    2. public class Test36 {
    3. static AtomicReference ref = new AtomicReference<>("A");
    4. public static void main(String[] args) throws InterruptedException {
    5. log.debug("main start...");
    6. // 获取值 A
    7. // 这个共享变量被其他线程修改过
    8. String prev = ref.get();
    9. other();
    10. sleep(5);
    11. // 尝试改为 C
    12. log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
    13. }
    14. private static void other() throws InterruptedException {
    15. new Thread(() -> {
    16. log.debug("change A->B", ref.compareAndSet(ref.get() ,"B"));
    17. }, "t1").start();
    18. sleep( 1);
    19. new Thread(() -> {
    20. log.debug("change B->A", ref.compareAndSet(ref.get() ,"A"));
    21. }, "t2").start();
    22. }
    23. }
    主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,如果主线程希望:
    只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号

    3. AtomicStampedReference

    1. @Slf4j(topic = "c.Test36")
    2. public class Test36 {
    3. static AtomicStampedReference ref = new AtomicStampedReference<>("A", 0);
    4. public static void main(String[] args) throws InterruptedException {
    5. log.debug("main start...");
    6. // 获取值 A
    7. String prev = ref.getReference();
    8. // 获取版本号
    9. int stamp = ref.getStamp();
    10. log.debug("版本 {}", stamp);
    11. // 如果中间有其它线程干扰,发生了 ABA 现象
    12. other();
    13. sleep(1);
    14. // 尝试改为 C
    15. log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
    16. }
    17. private static void other() {
    18. new Thread(() -> {
    19. log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", ref.getStamp(), ref.getStamp() + 1));
    20. log.debug("更新版本为 {}", ref.getStamp());
    21. }, "t1").start();
    22. sleep(0.5);
    23. new Thread(() -> {
    24. log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", ref.getStamp(), ref.getStamp() + 1));
    25. log.debug("更新版本为 {}", ref.getStamp());
    26. }, "t2").start();
    27. }
    28. }

    4. AtomicMarkableReference

    但是有时候,并不关心引用变量更改了几次,只是单纯的关心 是否更改过 ,所以就有了
    AtomicMarkableReference
    1. @Slf4j(topic = "c.Test38")
    2. public class Test38 {
    3. public static void main(String[] args) throws InterruptedException {
    4. GarbageBag bag = new GarbageBag("装满了垃圾");
    5. // 参数2 mark 可以看作一个标记,表示垃圾袋满了
    6. AtomicMarkableReference ref = new AtomicMarkableReference<>(bag, true);
    7. log.debug("start...");
    8. GarbageBag prev = ref.getReference();
    9. log.debug(prev.toString());
    10. new Thread(() -> {
    11. log.debug("start...");
    12. bag.setDesc("空垃圾袋");
    13. ref.compareAndSet(bag, bag, true, false);
    14. log.debug(bag.toString());
    15. },"保洁阿姨").start();
    16. sleep(1);
    17. log.debug("想换一只新垃圾袋?");
    18. boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
    19. log.debug("换了么?" + success);
    20. log.debug(ref.getReference().toString());
    21. }
    22. }
    23. class GarbageBag {
    24. String desc;
    25. public GarbageBag(String desc) {
    26. this.desc = desc;
    27. }
    28. public void setDesc(String desc) {
    29. this.desc = desc;
    30. }
    31. @Override
    32. public String toString() {
    33. return super.toString() + " " + desc;
    34. }
    35. }

    三、原子数组

    J.U.C 并发包提供了:

    AtomicIntegerArray

    AtomicLongArray

    AtomicReferenceArray

    1. public class Test39 {
    2. public static void main(String[] args) {
    3. demo(
    4. ()->new int[10],
    5. (array)->array.length,
    6. (array, index) -> array[index]++,
    7. array-> System.out.println(Arrays.toString(array))
    8. );
    9. demo(
    10. ()-> new AtomicIntegerArray(10),
    11. (array) -> array.length(),
    12. (array, index) -> array.getAndIncrement(index),
    13. array -> System.out.println(array)
    14. );
    15. }
    16. /**
    17. 参数1,提供数组、可以是线程不安全数组或线程安全数组
    18. 参数2,获取数组长度的方法
    19. 参数3,自增方法,回传 array, index
    20. 参数4,打印数组的方法
    21. */
    22. // supplier 提供者 无中生有 ()->结果
    23. // function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
    24. // consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->
    25. private static void demo(
    26. Supplier arraySupplier,
    27. Function lengthFun,
    28. BiConsumer putConsumer,
    29. Consumer printConsumer ) {
    30. List ts = new ArrayList<>();
    31. T array = arraySupplier.get();
    32. int length = lengthFun.apply(array);
    33. for (int i = 0; i < length; i++) {
    34. // 每个线程对数组作 10000 次操作
    35. ts.add(new Thread(() -> {
    36. for (int j = 0; j < 10000; j++) {
    37. putConsumer.accept(array, j%length);
    38. }
    39. }));
    40. }
    41. ts.forEach(t -> t.start()); // 启动所有线程
    42. ts.forEach(t -> {
    43. try {
    44. t.join();
    45. } catch (InterruptedException e) {
    46. e.printStackTrace();
    47. }
    48. }); // 等所有线程结束
    49. printConsumer.accept(array);
    50. }
    51. }

  • 相关阅读:
    什么是 x10 开发工具?「GitHub 热点速览」
    Kubernetes中Pod容器的使用
    将ppt里的视频导出来
    安卓sqlite数据库问题请教
    5、nerf++(pytorch)
    Openharmony的usb从框架到hdf驱动流程梳理
    内存泄漏,内存溢出,抽象类和接口,netstat、ping、ifconfig的区别
    纷享销客对话真趣科技丨撬动一家物联网企业的增长极限
    Kubeadm搭建kubernetes集群
    图书管理系统的分析与设计
  • 原文地址:https://blog.csdn.net/yirenyuan/article/details/128174951