• Java多线程(2)


    目录

    1、多线程带来的风险 ------ 线程安全

    1、线程安全的概念

    2、线程不安全的原因

    3、synchronized -线程同步

    4、synchronized 使用示例

    1、同步代码块

    2、修饰普通方法

    3、修饰静态方法 

    5、常见的线程安全类

     6、volatile 关键字

    1、volatile 修饰的变量能够保证“内存可见性”

    7、wait 和 notify 

     1、wait()方法

    2、notify()方法

     3、notifyall()方法

    8、多线程案例

    1、单例模式

    2、饿汉模式

    3、懒汉模式-单线程版

    4、懒汉模式-多线程版

    5、懒汉模式-多线程版(改进)

    以下代码在加锁的基础上,做出了进一步改动:

     9、阻塞式队列

    1、阻塞队列是什么?

    2、生产者消费者模型

    3、标准库中的阻塞队列

     ​编辑

    4、阻塞队列实现

    10、定时器

    1、什么是定时器

    2、标准库中的定时器

    3、实现定时器


    1、多线程带来的风险 ------ 线程安全

    什么是线程不安全,请看以下代码:

    1. public class ThreadDemo1 {
    2. static int N = 10_0000;
    3. static int res = 0;
    4. public static class MyRunnable implements Runnable {
    5. @Override
    6. public void run() {
    7. for (int i = 0; i < N; i ++) {
    8. res++;
    9. }
    10. }
    11. }
    12. public static void main(String[] args) throws InterruptedException {
    13. Thread t1 = new Thread(new MyRunnable());
    14. t1.start();
    15. for (int i = 0; i < N; i ++) {
    16. res--;
    17. }
    18. t1.join();
    19. System.out.println(res);
    20. }
    21. }

    我们进行了等量的操作(对res ++ 和 res -- ) 观察结果:

    为什么不是0 而是一个奇怪的数字,当我们再次执行观察:

     

    这就是线程不安全导致的。

    1、线程安全的概念

    通俗的讲:如果在多线程环境下代码运行的结果是符合我们预期的,即在单线程环境应该的结果,则说明这个程序是线程安全的

    2、线程不安全的原因

    修改共享数据

    上面的线程不安全的代码中,涉及到多个线程对 res 变量进行修改

    由于这个变量在方法区上,因此多个线程都能进行访问和修改, 

     我们直到计算机根据指令工作,思考以下情景:

    我们对res++ 在从寄存器中拿出 res ,++完成之后准备放回,而此时恰好发生了线程调度,又拿出res  -- 完成之后 我们的++操作被调度回来 我们放回 ,但此时++的操作已经被 -- 覆盖了。

    什么是原子性

    我们把代码想象成一个房间,每个线程就是要进入这个房间的人,如果没有任何机制,A进入房间以后还没出来,B此时也进入房间,打断了A的隐私,这个就没有具备原子性

    因此我们需要一把锁,当A进入房间后,加上锁后其他人不允许进入房间,这样就保证了代码的原子性,有时我们也把它叫做同步互斥

    可见性

    可见性是指,一个线程对共享数据的修改,能够及时被其他线程看到

    java内存模型(JMM):java虚拟机中规范定义了java内存模型。

    目的是屏蔽掉各种硬件和操作系统的内存访问差异,以实现让java程序在各种平台下能达到一致的并发性。

    因为每个线程有自己独立的工作内存,因此修改其中一个工作内存的值 ,另一个工作内存不一定及时变化。

    代码顺序性

    什么是代码重排序?

    假设一段代码:

    1、去前台取U盘

    2、去教室写10分钟作业

    3、去前台取快递

    如果是在单线程环境下:JVM,CPU会对其优化 1->3->2  可以少跑一次前台

    什么情况下数据会存在安全问题呢?

    得满足以下三个条件

    1、多线程并发 

    2、有共享数据

    3、有修改行为

    怎么解决线程不安全的问题?

    1. public class ThreadDemo1 {
    2. static int N = 10_0000;
    3. static int res = 0;
    4. static Object sys = new Object();
    5. public static class MyRunnable implements Runnable {
    6. @Override
    7. public void run() {
    8. synchronized (sys) {
    9. for (int i = 0; i < N; i ++) {
    10. res++;
    11. }
    12. }
    13. }
    14. }
    15. public static void main(String[] args) throws InterruptedException {
    16. Thread t1 = new Thread(new MyRunnable());
    17. t1.start();
    18. synchronized (sys) {
    19. for (int i = 0; i < N; i ++) {
    20. res--;
    21. }
    22. }
    23. t1.join();
    24. System.out.println(res);
    25. }
    26. }

    我们对于res++ 和 res--的操作都加上了sys这把锁,因此我们可以得到准确的结果

    3、synchronized -线程同步

    互斥

    synchronized 会起到互斥效果 ,某个线程执行到某个对象的synchronized 中时,其他线程如果也执行到同一个对象synchronized 就会阻塞等待

     可以粗略的理解成:每个对象在内存中存储的时候,都存有一块区域表示当前的“锁定”状态,如果当前是“无人”状态,那么就可以使用,使用时都需要设为“有人”状态。

    当前是“有人”状态,那就只能排队

     

     刷新内存

    synchronized 的工作过程:

    1、获得互斥锁

    2、从主内存中拷贝变量的最新副本到工作内存

    3、执行代码

    4、将更改后的共享变量的值刷新到主内存

    5、释放互斥锁

    可重入

    synchronized 同步块对同一条线程来说是可重入的,不会出现把自己锁死的问题。

    4、synchronized 使用示例

    1、同步代码块

    2、修饰普通方法

    3、修饰静态方法 

    5、常见的线程安全类

     java标准库中很多线程都是不安全的,这些类可能会涉及到多线程修改共享数据,但又没有加锁措施

    一些线程安全的类:

     6、volatile 关键字

    1、volatile 修饰的变量能够保证“内存可见性”

    代码在写入volatile 修饰的变量的时候:

    改变线程工作内存中变量的值

    将改变后的值从工作内存刷新到主内存

     代码在读取volatile 修饰的变量的时候:

    从主内存中读取最新值到工作内存

    从工作内存中读取volatile变量的副本

    1. public class ThreadDemo1 {
    2. static class Counter {
    3. int flag = 0;
    4. }
    5. public static void main(String[] args) {
    6. Counter c = new Counter();
    7. Thread t1 = new Thread(new Runnable() {
    8. @Override
    9. public void run() {
    10. while (c.flag == 0) {
    11. }
    12. System.out.println("循环结束!");
    13. }
    14. });
    15. Thread t2 = new Thread(new Runnable() {
    16. @Override
    17. public void run() {
    18. System.out.println("输入一个整数以改变flag的值");
    19. Scanner sc = new Scanner(System.in);
    20. c.flag = sc.nextInt();
    21. }
    22. });
    23. t1.start();
    24. t2.start();
    25. }
    26. }

     我们本设想 t2 改变了 flag 的值 t1 应该停止循环,但由于 t1 读的是自己工作内存中的内容,因此即使 t2 对 flag 变量修改,t1 感知不到 flag 的变化

    因此我们要给 flag 加上volatile 

     

    2、volatile 不保证原子性

    volatile 和 synchronized 有着本质的区别,synchronized 能够保证原子性, valatile 保证的是内存可见性

    1. public class ThreadDemo1 {
    2. static volatile int res = 0;
    3. Object o = new Object();
    4. public static void main(String[] args) throws InterruptedException {
    5. Thread t1 = new Thread(new Runnable() {
    6. @Override
    7. public void run() {
    8. for (int i = 0; i < 10000000; i ++) res++;
    9. }
    10. });
    11. t1.start();
    12. for (int i = 0; i < 10000000; i ++) res--;
    13. t1.join();
    14. System.out.println(res);
    15. }
    16. }

    即使我们加上volatile 但结果仍然不是0 

    因为我们 volatile 保证的是如果其他线程更改res ,我们拿到的res是最新的

    但不能保证 res++ 的原子性 ,因此依然存在线程不安全

    3、synchronized 也能保证内存可见性

     

     

    7、wait 和 notify 

    由于线程间是抢占执行的,因此线程之间的执行先后顺序难以预知

    但是实际开发中有时候我们希望合理协调多个线程之间的执行先后顺序

     1、wait()方法

    wait 做的事情:

    使当前执行的代码的线程进行等待(把线程放到等待队列中)

    释放当前的锁

    满足一定条件时唤醒,重新尝试获取这个锁

    wait 要搭配 sychronized 来使用,脱离synchronized 使用 wait 会直接抛出异常

    wait 结束等待的条件:

    其他线程调用该对象的 notify 方法

    wait 等待时间超时 (wait 方法提供一个带有 timeout 参数的版本,来指定等待时间)

    其他线程调用该线程的 interrupted 方法,导致 wait 抛出 InterruptedException 异常

    2、notify()方法

    notify 方法是唤醒等待的线程

    接下来我们做一个线程唤醒地小栗子:

    1. public class ThreadDemo1 {
    2. static class WaitTask implements Runnable {
    3. public Object lock;
    4. public WaitTask(Object lock) {
    5. this.lock = lock;
    6. }
    7. @Override
    8. public void run() {
    9. synchronized (lock) {
    10. while (true) {
    11. try {
    12. System.out.println("wait开始");
    13. lock.wait();
    14. System.out.println("wait结束");
    15. } catch (InterruptedException e) {
    16. e.printStackTrace();
    17. }
    18. }
    19. }
    20. }
    21. }
    22. static class NotifyTask implements Runnable {
    23. public Object lock;
    24. public NotifyTask (Object lock) {
    25. this.lock = lock;
    26. }
    27. @Override
    28. public void run() {
    29. synchronized (lock) {
    30. System.out.println("notify开始");
    31. lock.notify();
    32. System.out.println("notify结束");
    33. }
    34. }
    35. }
    36. public static void main(String[] args) throws InterruptedException {
    37. Object lock = new Object();
    38. Thread t1 = new Thread(new WaitTask(lock));
    39. Thread t2 = new Thread(new NotifyTask(lock));
    40. t1.start();
    41. Thread.sleep(1000);
    42. t2.start();
    43. }
    44. }

    首先wait线程执行到wait时释放锁并且等待唤醒,而notify在执行完代码块中所有内容后才会释放锁去唤醒waittask

     3、notifyall()方法

    notifyall方法相比 notify 的不同就是它可以唤醒锁上的所有线程,但线程执行依然有先后顺序,当一个线程占有锁后,其他线程是无法进入的

    wait() 和 sleep() 对比?(面试题常考)

    首先这两个方法似乎八竿子打不着北,wait()是实现进程间通信的方法,sleep()是让线程阻塞的,唯一的相同点是都能让线程阻塞一段时间

    总结:

    1、wait()是针对对象(锁)而言的,一定一定需要搭配synchronized使用,sleep不需要

    2、wait是Object下的方法,而sleep是Thread下的静态方法

    8、多线程案例

    1、单例模式

    单例模式是最经典的设计模式之一

    什么是设计模式?

    单例模式就是保证一个类在程序中只存在一份实例,而不会创建多个实例

    比如 JDBC 中的 DataSource 实例就只需要一个

    单例模式的具体实现可以分为 “饿汉” 和 “懒汉”

    2、饿汉模式

    类加载的同时,创建实例

    1. public class ThreadDemo1 {
    2. public static void main(String[] args) {
    3. Singleton s = Singleton.getInstance();
    4. }
    5. }
    6. class Singleton {
    7. private static Singleton instance = new Singleton();
    8. private Singleton() {}
    9. public static Singleton getInstance() {
    10. return instance;
    11. }
    12. }

     这样如果我们要获得SIngleton的实例只能通过getInstance方法,因为无论构造方法还是属性都是private的,

    3、懒汉模式-单线程版

    首次调用getInstance时才创建对象(延时加载)

    1. class Singleton {
    2. private static Singleton instance = null;
    3. private Singleton() {}
    4. public static Singleton getInstance() {
    5. if (instance == null) instance = new Singleton();
    6. return instance;
    7. }
    8. }

    4、懒汉模式-多线程版

    上面的懒汉模式实现其实是不安全的

    因此我们加锁改善线程安全问题

     

    5、懒汉模式-多线程版(改进)

    以下代码在加锁的基础上,做出了进一步改动:

     使用双重 if 判定,降低锁竞争的频率。

    给 instance 加上了 volatile

    1. class Singleton {
    2. private volatile static Singleton instance = null;
    3. private Singleton() {}
    4. public static Singleton getInstance() {
    5. if (instance == null) {
    6. synchronized (Singleton.class) {
    7. if (instance == null) {
    8. instance = new Singleton();
    9. }
    10. }
    11. }
    12. return instance;
    13. }
    14. }

    如何理解 双重if  / volatile

    加锁/解锁 本身是一件开销比较高的事情,懒汉模式的线程不安全只是发生在首次创建实例的时候,因此后序没必要加锁

    外层的if就是判断是不是 instance 实例已经创建出来了

    volatile 是为了避免“内存可见性”导致读取的 instance 出现偏差

     

     

     

     

     9、阻塞式队列

    1、阻塞队列是什么?

    阻塞队列是一种特殊的队列,遵守“先进先出”的原则

    阻塞队列能是一种线程安全的数据结构,并且具有以下特性:

    当队列满的时候,继续入队列就会阻塞,直到有其他线程从队列中取走元素

    当队列空的时候,继续出队列也会阻塞,直到有其他线程往队列中插入元素

    阻塞队列的一个典型应用就是:生产者消费者模型。

    2、生产者消费者模型

    生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题

    生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列中取

     

    3、标准库中的阻塞队列

     

     

    1. public static void main(String[] args) throws InterruptedException {
    2. BlockingDeque<String> queue = new LinkedBlockingDeque<>();
    3. queue.push("hello");
    4. String res = queue.take();
    5. System.out.println(res);
    6. }

    生产者消费者模型:

    1. public static void main(String[] args) throws InterruptedException {
    2. BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();
    3. Thread customer = new Thread(new Runnable() {
    4. @Override
    5. public void run() {
    6. while (true) {
    7. try {
    8. int val = queue.take();
    9. System.out.println("消费者消耗:" + val);
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. }
    13. }
    14. }
    15. },"消费者");
    16. customer.start();
    17. Thread producer = new Thread(new Runnable() {
    18. @Override
    19. public void run() {
    20. Random random = new Random();
    21. while (true) {
    22. try {
    23. int num = random.nextInt(1000);
    24. System.out.println("生产者生产:" + num);
    25. queue.put(num);
    26. Thread.sleep(1000);
    27. } catch (InterruptedException e) {
    28. e.printStackTrace();
    29. }
    30. }
    31. }
    32. },"生产者");
    33. producer.start();
    34. customer.join();
    35. producer.join();
    36. }

    4、阻塞队列实现

    通过“循环队列”的方式来实现

    使用 synchronized 进行加锁控制

    put 插入元素的时候,判定如果队列满了,就进行 wait (要在循环中wait,被唤醒时不一定队列就不满,因为同时可能唤醒了多个进程)

    take 取出元素时,判定如果队列为空,就进行 wait 

    1. public static void main(String[] args) throws InterruptedException {
    2. BlockingQueue queue = new BlockingQueue();
    3. Thread customer = new Thread(new Runnable() {
    4. @Override
    5. public void run() {
    6. while (true) {
    7. try {
    8. int val = queue.take();
    9. System.out.println("消费者消耗:" + val);
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. }
    13. }
    14. }
    15. },"消费者");
    16. customer.start();
    17. Thread producer = new Thread(new Runnable() {
    18. @Override
    19. public void run() {
    20. Random random = new Random();
    21. while (true) {
    22. try {
    23. int num = random.nextInt(1000);
    24. System.out.println("生产者生产:" + num);
    25. queue.put(num);
    26. Thread.sleep(1000);
    27. } catch (InterruptedException e) {
    28. e.printStackTrace();
    29. }
    30. }
    31. }
    32. },"生产者");
    33. producer.start();
    34. customer.join();
    35. producer.join();
    36. }
    37. static class BlockingQueue {
    38. private int[] items = new int[1000];
    39. private volatile int size;
    40. private int head;
    41. private int tail;
    42. public void put(int value) throws InterruptedException {
    43. synchronized (this) {
    44. while (size == items.length) {
    45. wait();
    46. }
    47. items[tail] = value;
    48. tail = (tail + 1) % items.length;
    49. size++;
    50. notifyAll();
    51. }
    52. }
    53. public int take() throws InterruptedException {
    54. synchronized (this) {
    55. while (size == 0) {
    56. wait();
    57. }
    58. int res = items[head];
    59. head = (head + 1) % items.length;
    60. size--;
    61. notifyAll();
    62. return res;
    63. }
    64. }
    65. }

     测试:

    10、定时器

    1、什么是定时器

     定时器也是软件开发中的一个重要组件,类似于一个闹钟,达到一个设定的时间后,就执行某个指定好的代码

    2、标准库中的定时器

     标准库中的提供了一个 Timer 类。Timer 类的核心方法为 schedule 

    schedule 包含了两个参数

    第一个参数指定即将要执行的任务代码

    第二个参数指定多长时间之后执行(单位是毫秒)

    1. public static void main(String[] args) {
    2. Timer timer = new Timer();
    3. timer.schedule(new TimerTask() {
    4. @Override
    5. public void run() {
    6. System.out.println("hello");
    7. }
    8. },2000);
    9. }

    3、实现定时器

    定时器的构成:

    一个带优先级的阻塞队列

    为啥要带优先级?

    因为阻塞队列中的任务都有各自执行的时刻(delay),最先执行的任务一定是 delay 最小的,使用带优先级队列就可以高效地把这个 delay 最小的任务找出来

    队列中地每个元素是一个 task 对象

    task 中带有一个属性,队首元素就是即将执行地task

    同时带有一个worker 线程一直扫描队首元素,看队首元素是否需要执行

     

    1. public class Timer {
    2. public void schedule(Runnable command,long after) {
    3. //TODO
    4. }
    5. }

    1. static class Task implements Comparable<Task> {
    2. private Runnable command;
    3. private long time;
    4. public Task(Runnable command,long time) {
    5. this.command = command;
    6. this.time = System.currentTimeMillis() + time;
    7. }
    8. public void run() {
    9. command.run();
    10. }
    11. @Override
    12. public int compareTo(Task o) {
    13. return (int) (time - o.time);
    14. }
    15. }

     

    1. PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
    2. public void schedule(Runnable command,long after) {
    3. Task task = new Task(command,after);
    4. queue.offer(task);
    5. }

    1. class Worker extends Thread{
    2. @Override
    3. public void run() {
    4. while (true) {
    5. try {
    6. Task task = queue.take();
    7. long cur = System.currentTimeMillis();
    8. if (task.time > cur) {
    9. queue.put(task);
    10. } else {
    11. task.run();
    12. }
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. break;
    16. }
    17. }
    18. }
    19. }

      

     

      

    完整代码:

    1. public class Timer {
    2. private Object mailBox = new Object();
    3. PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
    4. public void schedule(Runnable command,long after) {
    5. Task task = new Task(command,after);
    6. queue.offer(task);
    7. synchronized (mailBox) {
    8. notify();
    9. }
    10. }
    11. class Worker extends Thread{
    12. @Override
    13. public void run() {
    14. try {
    15. Task task = queue.take();
    16. long cur = System.currentTimeMillis();
    17. if (task.time > cur) {
    18. queue.put(task);
    19. synchronized (mailBox) {
    20. mailBox.wait(task.time - cur);
    21. }
    22. } else {
    23. task.run();
    24. }
    25. } catch (InterruptedException e) {
    26. e.printStackTrace();
    27. }
    28. }
    29. }
    30. static class Task implements Comparable<Task> {
    31. private Runnable command;
    32. private long time;
    33. public Task(Runnable command,long time) {
    34. this.command = command;
    35. this.time = System.currentTimeMillis() + time;
    36. }
    37. public void run() {
    38. command.run();
    39. }
    40. @Override
    41. public int compareTo(Task o) {
    42. return (int) (time - o.time);
    43. }
    44. }
    45. }

  • 相关阅读:
    嵌入式开发:嵌入式基础——代码和数据空间揭秘
    Ae 动态图形模板
    SpringMVC 04 RestFul风格
    【翻译】Seastar 教程(二)
    DSP2335的按键输入key工程笔记
    从零开始搭建仿抖音短视频APP-构建后端项目
    基础知识java
    kafka 消费者
    File类、IO分类、InputStream、OutputStream、Reader、Writer
    iOS 关于 SocketRocket 报错 _utf8_nextCharSafeBody
  • 原文地址:https://blog.csdn.net/qq_59539549/article/details/124900475