• Java并发编程学习一:线程基础


    一、线程的创建

    线程是并发编程中基础的基础,只有先了解如何创建线程,才能进一步学习并发相关的知识。

    常见的实现线程的方法有以下几种:

    1. Runnable接口

    可以通过实现Runnable接口,并重写run()方法,之后将实现Runnable接口的类传入Thread中即可创建线程。

    public class RunnableThread implements Runnable {
    
        @Override
        public void run() {
            System.out.println('用实现Runnable接口实现线程');
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2. Thread类

    继承Thread类,重写其中的run()方法创建线程:

    public class ExtendsThread extends Thread {
    
        @Override
        public void run() {
            System.out.println('用Thread类实现线程');
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3. 线程池

    通过线程池,可以指定线程数量,由线程池创建线程。

    对于线程池而言,本质上是通过线程工厂创建线程的,默认采用 DefaultThreadFactory ,它会给线程池创建的线程设置一些默认值,比如:线程的名字、是否是守护线程,以及线程的优先级等。

    /** 
    * 描述: 用固定线程数的线程池执行10000个任务 
    */ 
    public class ThreadPoolDemo { 
     
        public static void main(String[] args) { 
            ExecutorService service = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 10000; i++) { 
                service.execute(new Task());
            } 
        System.out.println(Thread.currentThread().getName());
        } 
     
        static class Task implements Runnable { 
     
            public void run() { 
                System.out.println("Thread Name: " + Thread.currentThread().getName());
            } 
        } 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    4. 带返回值的Callable接口

    实现Callable接口,并重写call()方法,之后将实现Callable接口的类传入Thread中创建线程。

    这种方式与Runnable接口的方式相类似,但是Callable接口通过Future将线程执行的结果作为返回值返回。

    class CallableTask implements Callable<Integer> {
    
        @Override
        public Integer call() throws Exception {
            return new Random().nextInt();
        }
    }
    
    //创建线程池
    ExecutorService service = Executors.newFixedThreadPool(10);
    //提交任务,并用 Future提交返回结果
    Future<Integer> future = service.submit(new CallableTask());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    5. 其他

    a. 定时器
    新建一个 Timer,令其每隔一段时间之后,执行一些任务,这时它也会创建线程并执行任务。

    public class Run {
        private static Timer timer=new Timer();
    
        public static void main(String[] args) throws ParseException
        {
            timer.schedule(new Mytask(), TimeUtil.df.get().parse("2017-09-14 09:19:30"));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    b. 匿名内部类或 lambda 表达式

    /**
     *描述:匿名内部类创建线程
     */
    new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName());
        }
    }).start();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    /**
     *描述:lambda表达式创建线程
     */
    new Thread(() -> System.out.println(Thread.currentThread().getName())).start();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    6. 到底有几种创建线程的方法?

    前面介绍了5中创建线程的方法,按理来说,创建线程的方法应该是很多的,然而实际上,创建线程的方法只有一种而已

    对于线程池而言,跟进DefaultThreadFactory 的源码查看具体创建线程的实现:

    static class DefaultThreadFactory implements ThreadFactory {
     
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                poolNumber.getAndIncrement() +
                "-thread-";
        }
     
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                        namePrefix + threadNumber.getAndIncrement(),
    0);
    
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    可以看到,线程池底层还是通过Thread类创建线程的,只是在创建时传入的参数更多一些。

    对于Callable接口,其底层实现依赖FutureTask,而FutureTask实际上实现了Runnable接口,并进行了一系列封装后能够返回返回值。因此Callable接口还是离不开Runnable接口。

    对于定时器Timer,其内部实现,本质上还是会有一个继承自 Thread 类的 TimerThread,因此还是通过Thread类创建线程的。

    置于匿名内部类和lambda表达式,它们仅仅是一种语法实现,本质上还是基于Thread类

    那么归根到底,其实只有Thread类和Runnable接口两种方式咯?其实这两种方式本质上也是一种。

    线程的启动需要调用Thread.start()方法,而start()方法最终会调用run()方法:

    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这里有两个run()方法,一个是外层的run(),一个是内层的run()。

    上述代码中的target其实就是一个 Runnable,实现Runnable并重写run()方法,实际上就是重写这里内层的run()的内部逻辑。

    而实现Thread类,重写run()方法,实际上是重写上述代码的外层run()方法。

    因此可以这样说,本质上,实现线程只有一种方式,而要想实现线程执行的内容,却有两种方式,也就是可以通过 实现 Runnable 接口的方式,或是继承 Thread 类重写 run() 方法的方式

    二、正确停止线程

    通常情况下,我们不会手动停止一个线程,而是允许线程运行到结束,然后让它自然停止。但是如果遇到特殊情况比如:用户突然关闭程序,或程序运行出错重启等,这种情况下,如何安全停止线程就很重要了。

    对于 Java 而言,最正确的停止线程的方式是使用 interrupt。但 interrupt 仅仅起到通知被停止线程的作用。而对于被停止的线程而言,它拥有完全的自主权,它既可以选择立即停止,也可以选择一段时间后停止,也可以选择压根不停止。

    那么为什么 Java 不提供强制停止线程的能力呢?因为如果不了解对方正在做的工作,贸然强制停止线程就可能会造成一些安全的问题,为了避免造成问题就需要给对方一定的时间来整理收尾工作。比如:线程正在写入一个文件,这时收到终止信号,它就需要根据自身业务判断,是选择立即停止,还是将整个文件写入成功后停止,而如果选择立即停止就可能造成数据不完整,不管是中断命令发起者,还是接收者都不希望数据出现问题。

    1. interrupt正确停止线程

    一旦调用某个线程的 interrupt() 之后,这个线程的中断标记位就会被设置成 true。每个线程都有这样的标记位,当线程执行时,应该定期检查这个标记位,如果标记位被设置成 true,就说明有程序想终止该线程。

    要想使用interrupt正确停止线程,应当在线程中做如下设计,在 while 循环体判断语句中,首先通过 Thread.currentThread().isInterrupt() 判断线程是否被中断,随后检查是否还有工作要做。&& 逻辑表示只有当两个判断条件同时满足的情况下,才会去执行下面的工作。

    while (!Thread.currentThread().isInterrupted() && more work to do) {
        do more work
    }
    
    • 1
    • 2
    • 3

    以下是interrupt的使用示例:

    public class StopThread implements Runnable {
     
        @Override
        public void run() {
            int count = 0;
            while (!Thread.currentThread().isInterrupted() && count < 1000) {
                System.out.println("count = " + count++);
            }
        }
     
        public static void main(String[] args) throws InterruptedException {
            Thread thread = new Thread(new StopThread());
            thread.start();
            Thread.sleep(5);
            thread.interrupt();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    run() 方法中,首先判断线程是否被中断,然后判断 count 值是否小于 1000。线程中打印 0~999 的数字,每打印一个数字 count 值加 1。线程会在每次循环开始之前,检查是否被中断了。接下来在 main 函数中会启动该线程,然后休眠 5 毫秒后立刻中断线程,该线程会检测到中断信号,于是在还没打印完1000个数的时候就会停下来,这种就属于通过 interrupt 正确停止线程的情况。

    2. sleep 期间能否感受到中断?

    现在考虑一种特殊情况,如果线程在执行任务期间有休眠需求,也就是每打印一个数字,就进入一次 sleep ,而此时将 Thread.sleep() 的休眠时间设置为 1000 秒钟

    Runnable runnable = () -> {
        int num = 0;
        try {
            while (!Thread.currentThread().isInterrupted() && 
            num <= 1000) {
                System.out.println(num);
                num++;
                Thread.sleep(1000000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    在主线程中,休眠 5 毫秒后,通知子线程中断,此时子线程仍在执行 sleep 语句,处于休眠中。那么就需要考虑一点,在休眠中的线程是否能够感受到中断通知呢?是否需要等到休眠结束后才能中断线程呢?如果是这样,就会带来严重的问题,因为响应中断太不及时了。

    public class StopDuringSleep {
     
        public static void main(String[] args) throws InterruptedException {
            Runnable runnable = () -> {
                int num = 0;
                try {
                    while (!Thread.currentThread().isInterrupted() && num <= 1000) {
                        System.out.println(num);
                        num++;
                        Thread.sleep(1000000);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            Thread thread = new Thread(runnable);
            thread.start();
            Thread.sleep(5);
            thread.interrupt();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    Java 设计者在设计之初就考虑到以上的问题,如果 sleep、wait 等可以让线程进入阻塞的方法使线程休眠了,而处于休眠中的线程被中断,那么线程是可以感受到中断信号的,并且会抛出一个 InterruptedException 异常,同时清除中断信号,将中断标记位设置成 false。这样一来就不用担心长时间休眠中线程感受不到中断了,因为即便线程还在休眠,仍然能够响应中断通知,并抛出异常。

    3. sleep / wait 时的解决方案

    上面提到, sleep、wait 等方法可以使线程休眠,而处于休眠中的线程被中断,将会抛出一个 InterruptedException 异常,并清除中断信号,将中断标记位设置成 false。

    在实际的开发者中,不同的人负责编写不同的方法,然后相互调用来实现整个业务的逻辑。那么如果我们负责编写的方法需要被别人调用,同时我们的方法内调用了 sleep 或者 wait 等能响应中断的方法时,要怎么处理?

    a. 方法签名抛异常 / run() 强制 try/catch

    如果只是捕捉到异常,没有进行任何处理逻辑,相当于把中断信号给隐藏了,这样做非常不合理。

    调用方要不使用 try/catch 并在 catch 中正确处理异常,要不将异常声明到方法签名中。如果每层逻辑都遵守规范,便可以将中断信号层层传递到顶层,最终让 run() 方法可以捕获到异常。而对于 run() 方法而言,它本身没有抛出 checkedException 的能力,只能通过 try/catch 来处理异常。层层传递异常的逻辑保障了异常不会被遗漏,而对 run() 方法而言,就可以根据不同的业务逻辑来进行相应的处理。

    void subTask2() throws InterruptedException {
        Thread.sleep(1000);
    }
    
    • 1
    • 2
    • 3

    b. 再次中断
    在 catch 语句中再次中断线程。如代码所示,需要在 catch 语句块中调用 Thread.currentThread().interrupt() 函数。因为如果线程在休眠期间被中断,那么会自动清除中断信号。如果这时手动添加中断信号,中断信号依然可以被捕捉到。这样后续执行的方法依然可以检测到这里发生过中断,可以做出相应的处理,整个线程可以正常退出。

    private void reInterrupt() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    4. volatile标记位停止线程(场景有限)

    除了interrupt之外, volatile标记位也能起到停止线程的目的,但是这种方法有局限性,适用的场景有限。

    a. 适用的场景

    public class VolatileCanStop implements Runnable {
     
        private volatile boolean canceled = false;
     
        @Override
        public void run() {
            int num = 0;
            try {
                while (!canceled && num <= 1000000) {
                    if (num % 10 == 0) {
                        System.out.println(num + "是10的倍数。");
                    }
                    num++;
                    Thread.sleep(1);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
     
        public static void main(String[] args) throws InterruptedException {
            VolatileCanStop r = new VolatileCanStop();
            Thread thread = new Thread(r);
            thread.start();
            Thread.sleep(3000);
            r.canceled = true;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    声明一个叫作 VolatileStopThread 的类, 它实现了 Runnable 接口,然后在 run() 中进行 while 循环,在循环体中又进行了两层判断,首先判断 canceled 变量的值,canceled 变量是一个被 volatile 修饰的初始值为 false 的布尔值,当该值变为 true 时,while 跳出循环,while 的第二个判断条件是 num 值小于1000000(一百万),在while 循环体里,只要是 10 的倍数就打印出来,然后 num++。

    首先启动线程,然后经过 3 秒钟的时间,把用 volatile 修饰的布尔值的标记位设置成 true,这样,正在运行的线程就会在下一次 while 循环中判断出 canceled 的值已经变成 true 了,这样就不再满足 while 的判断条件,跳出整个 while 循环,线程就停止了,这种情况是演示 volatile 修饰的标记位可以正常工作的情况

    b. 不适用的场景

    class Producer implements Runnable {
        public volatile boolean canceled = false;
        BlockingQueue storage;
        public Producer(BlockingQueue storage) {
            this.storage = storage;
        }
     
        @Override
        public void run() {
            int num = 0;
            try {
                while (num <= 100000 && !canceled) {
                    if (num % 50 == 0) {
                        storage.put(num);
                        System.out.println(num + "是50的倍数,被放到仓库中了。");
                    }
                    num++;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("生产者结束运行");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    声明了一个生产者 Producer,通过 volatile 标记的初始值为 false 的布尔值 canceled 来停止线程。而在 run() 方法中,while 的判断语句是 num 是否小于 100000 及 canceled 是否被标记。while 循环体中判断 num 如果是 50 的倍数就放到 storage 仓库中,storage 是生产者与消费者之间进行通信的存储器,当 num 大于 100000 或被通知停止时,会跳出 while 循环并执行 finally 语句块,告诉大家“生产者结束运行”。

    class Consumer {
        BlockingQueue storage;
        public Consumer(BlockingQueue storage) {
            this.storage = storage;
        }
        public boolean needMoreNums() {
            if (Math.random() > 0.97) {
                return false;
            }
            return true;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    对于消费者 Consumer,它与生产者共用同一个仓库 storage,并且在方法内通过 needMoreNums() 方法判断是否需要继续使用更多的数字,刚才生产者生产了一些 50 的倍数供消费者使用,消费者是否继续使用数字的判断条件是产生一个随机数并与 0.97 进行比较,大于 0.97 就不再继续使用数字。

    public static void main(String[] args) throws InterruptedException {
            ArrayBlockingQueue storage = new ArrayBlockingQueue(8);
    
            Producer producer = new Producer(storage);
            Thread producerThread = new Thread(producer);
            producerThread.start();
            Thread.sleep(500);
    
            Consumer consumer = new Consumer(storage);
            while (consumer.needMoreNums()) {
                System.out.println(consumer.storage.take() + "被消费了");
                Thread.sleep(100);
            }
            System.out.println("消费者不需要更多数据了。");
    
            //一旦消费不需要更多数据了,我们应该让生产者也停下来,但是实际情况却停不下来
            producer.canceled = true;
            System.out.println(producer.canceled);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    main 函数中,首先创建了生产者/消费者共用的仓库 BlockingQueue storage,仓库容量是 8,并且建立生产者并将生产者放入线程后启动线程,启动后进行 500 毫秒的休眠,休眠时间保障生产者有足够的时间把仓库塞满,而仓库达到容量后就不会再继续往里塞,这时生产者会阻塞,500 毫秒后消费者也被创建出来,并判断是否需要使用更多的数字,然后每次消费后休眠 100 毫秒。

    当消费者不再需要数据,就会将 canceled 的标记位设置为 true,理论上此时生产者会跳出 while 循环,并打印输出“生产者运行结束”。

    但实际结果是,尽管已经把 canceled 设置成 true,但生产者仍然没有停止,这是因为在这种情况下,生产者在执行 storage.put(num) 时发生阻塞,在它被叫醒之前是没有办法进入下一次循环判断 canceled 的值的,所以在这种情况下用 volatile 是没有办法让生产者停下来的

    总结以上两种场景,volatile标记位停止线程的方法会在存在阻塞的时候失效。这种情况下,使用interrupt 语句来中断,即使生产者处于阻塞状态,仍然能够感受到中断信号,并做响应处理。因此相比interrupt ,volatile的使用还是有局限性的。

    5. 错误停止线程的方法

    常见停止线程的方法有: stop(),suspend() 和 resume(),这些方法已经被 Java 直接标记为 @Deprecated。

    对于stop()方法,它会直接把线程停止,这样就没有给线程足够的时间来处理想要在停止前保存数据的逻辑,任务戛然而止,会导致出现数据完整性等问题。

    对于 suspend() 和 resume() 而言,它们的问题在于如果线程调用 suspend(),它并不会释放锁,就开始进入休眠,但此时有可能仍持有锁,这样就容易导致死锁问题,因为这把锁在线程被 resume() 之前,是不会被释放的。

    三、线程状态切换

    在Java中,线程的生命周期一共有6种状态:

    • 新建 – New
    • 可运行 – Runnable
    • 阻塞 – Blocked
    • 等待 – Waiting
    • 计时等待 – Timed Waiting
    • 终止 – Terminated

    可以通过getState() 方法获取当前线程所处的状态,线程在运行期间的任何时刻只能处于一种状态。

    1. 新建 – New

    线程被创建但尚未启动的状态,当我们用 new Thread() 新建一个线程时,如果线程没有开始运行 start() 方法,那么此时它的状态就是 New。而一旦线程调用了 start(),它的状态就会从 New 变成 Runnable。

    2. 可运行 – Runnable

    该状态对应操作系统线程状态中的两种状态,分别是 Running 和 Ready。也就是说,Java 中处于 Runnable 状态的线程有可能正在执行,也有可能没有正在执行,正在等待被分配 CPU 资源。

    如果一个正在运行的线程是 Runnable 状态,当它运行到任务的一半时,执行该线程的 CPU 被调度去做其他事情,导致该线程暂时不运行,它的状态依然不变,还是 Runnable,因为它有可能随时被调度回来继续执行任务。

    3. 阻塞 – Blocked / 等待 – Waiting / 计时等待 – Timed Waiting

    这三种状态被统称为阻塞状态

    阻塞 – Blocked
    从 Runnable 状态进入 Blocked 状态只有一种可能,就是进入 synchronized 保护的代码时没有抢到 monitor 锁,无论是进入 synchronized 代码块,还是 synchronized 方法,都是一样。

    当处于 Blocked 的线程抢到 monitor 锁,就会从 Blocked 状态回到Runnable 状态。

    等待 – Waiting
    线程从 Runnable 状态进入 Waiting 状态有三种可能性:

    • 没有设置 Timeout 参数的 Object.wait() 方法。
    • 没有设置 Timeout 参数的 Thread.join() 方法。
    • LockSupport.park() 方法。

    Blocked 与 Waiting 的区别是 Blocked 在等待其他线程释放 monitor 锁,而 Waiting 则是在等待某个条件,比如 join 的线程执行完毕,或者是 notify()/notifyAll() 。进入 waiting 状态是线程主动的,而进入 blocked 状态是被动的。

    计时等待 – Timed Waiting
    计时等待与等待相似,区别仅在于有没有时间限制,Timed Waiting 会等待超时,由系统自动唤醒,或者在超时前被唤醒信号唤醒。

    线程从 Runnable 状态进入 Timed Waiting 状态有四种可能性:

    • 设置了时间参数的 Thread.sleep(long millis) 方法;
    • 设置了时间参数的 Object.wait(long timeout) 方法;
    • 设置了时间参数的 Thread.join(long millis) 方法;
    • 设置了时间参数的 LockSupport.parkNanos(long nanos) 方法和 LockSupport.parkUntil(long deadline) 方法。

    三种状态的流转

    Blocked -> Runnable
    Blocked 状态进入 Runnable 状态,要求线程获取 monitor 锁

    Waiting -> Runnable
    从 Waiting 状态流转到其他状态则比较特殊,因为首先 Waiting 是不限时的,也就是说无论过了多长时间它都不会主动恢复。只有当执行了 LockSupport.unpark(),或者 join 的线程运行结束,或者被中断时才可以进入 Runnable 状态。

    Waiting -> Blocked
    如果其他线程调用 notify() 或 notifyAll()来唤醒它,当前Waiting的线程会直接进入 Blocked 状态。因为唤醒 Waiting 线程的线程如果调用 notify() 或 notifyAll(),要求必须首先持有该 monitor 锁,所以处于 Waiting 状态的线程被唤醒时拿不到该锁,就会进入 Blocked 状态,直到执行了 notify()/notifyAll() 的唤醒它的线程执行完毕并释放 monitor 锁,才可能轮到它去抢夺这把锁,如果它能抢到,就会从 Blocked 状态回到 Runnable 状态。

    Timed Waiting -> Blocked
    Timed Waiting 中执行 notify() 和 notifyAll() 也是一样的道理,它们会先进入 Blocked 状态,然后抢夺锁成功后,再回到 Runnable 状态。

    Timed Waiting -> Runnable
    Timed Waiting状态的线程,当超时时间到了且能直接获取到锁/join的线程运行结束/被中断/调用了LockSupport.unpark(),会直接恢复到 Runnable 状态,而无需经历 Blocked 状态。

    3. 终止 – Terminated

    进入Terminated状态有两种可能:

    • run() 方法执行完毕,线程正常退出。
    • 出现一个没有捕获的异常,终止了 run() 方法,最终导致意外终止

    四、wait / notify / notifyAll / sleep

    线程的状态切换中,wait / notify / notifyAll / sleep等方法都起到了很重要的作用,而这些方法在实际使用中有不少注意事项。

    1. wait 必须在 synchronized 保护的同步代码中

    在使用 wait 方法时,必须把 wait 方法写在 synchronized 保护的 while 代码块中,并始终判断执行条件是否满足,如果满足就往下继续执行,如果不满足就执行 wait 方法,而在执行 wait 方法之前,必须先持有对象的 monitor 锁,也就是通常所说的 synchronized 锁

    假如不按照以上的方法使用wait ,会出现什么问题?下面使用案例进行分析

    class BlockingQueue {
        Queue<String> buffer = new LinkedList<String>();
        
        public void give(String data) {
            buffer.add(data);
            notify();  // Since someone may be waiting in take
        }
        
        public String take() throws InterruptedException {
            while (buffer.isEmpty()) {
                wait();
            }
            return buffer.remove();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    give 方法负责往 buffer 中添加数据,添加完之后执行 notify 方法来唤醒之前等待的线程,而 take 方法负责检查整个 buffer 是否为空,如果为空就进入等待,如果不为空就取出一个数据。

    如果这段代码并没有受 synchronized 保护,会有以下场景:

    • 首先,消费者线程调用 take 方法并判断 buffer.isEmpty 方法是否返回 true,若为 true 代表buffer是空的,则线程希望进入等待,但是在线程调用 wait 方法之前,就被调度器暂停了,所以此时还没来得及执行 wait 方法。
    • 生产者开始运行,执行了整个 give 方法,往 buffer 中添加了数据,并执行了 notify 方法,但 notify 并没有任何效果,因为消费者线程的 wait 方法没来得及执行,所以没有线程在等待被唤醒。
    • 此时,刚才被调度器暂停的消费者线程回来继续执行 wait 方法并进入了等待。

    虽然消费者判断了 buffer.isEmpty 条件,但真正执行 wait 方法时,之前的 buffer.isEmpty 的结果已经过期了。原因在于这里的“判断-执行”不是一个原子操作,它在中间被打断了,是线程不安全的

    如果这时没有更多的生产者进行生产,消费者便有可能陷入无穷无尽的等待,因为它错过了刚才 give 方法内的 notify 的唤醒。

    为了解决上述问题,需要用synchronized方法,确保 notify 方法永远不会在 buffer.isEmpty 和 wait 方法之间被调用,提升了程序的安全性。另外,wait 方法会释放 monitor 锁,这也要求我们必须首先进入到 synchronized 内持有这把锁。

    public void give(String data) {
       synchronized (this) {
          buffer.add(data);
          notify();
      }
    }
     
    public String take() throws InterruptedException {
       synchronized (this) {
        while (buffer.isEmpty()) {
             wait();
           }
         return buffer.remove();
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    另外,实际上还会发生“虚假唤醒”(spurious wakeup)的问题,线程可能在既没有被notify/notifyAll,也没有被中断或者超时的情况下被唤醒。虽然虚假唤醒发生的概率很小,但是程序依然需要保证在发生虚假唤醒的时候的正确性,所以就需要采用while循环的结构。即便被虚假唤醒了,也会再次检查while里面的条件,如果不满足条件,就会继续wait,也就消除了虚假唤醒的风险。

    while (condition does not hold)
        obj.wait();
    
    
    • 1
    • 2
    • 3

    2. wait/notify/notifyAll 被定义在 Object 类中,而 sleep 定义在 Thread 类中

    • Java 中每个对象都有一把称之为 monitor 监视器的锁,由于每个对象都可以上锁,这就要求在对象头中有一个用来保存锁信息的位置。这个锁是对象级别的,而非线程级别的,wait/notify/notifyAll 也都是锁级别的操作,它们的锁属于对象,所以把它们定义在 Object 类中是最合适
    • 如果把 wait/notify/notifyAll 方法定义在 Thread 类中,会带来很大的局限性,比如一个线程可能持有多把锁,以便实现相互配合的复杂逻辑,假设此时 wait 方法定义在 Thread 类中,如何实现让一个线程持有多把锁呢?又如何明确线程等待的是哪把锁呢?既然是让当前线程去等待某个对象的锁,自然应该通过操作对象来实现,而不是操作线程

    3. wait/notify vs sleep

    这两种方式的相同点在于:

    • 都可以让线程阻塞
    • 都可以响应 interrupt 中断:在等待的过程中如果收到中断信号,都可以进行响应,并抛出 InterruptedException 异常

    这两种方式的区别在于:

    • wait 方法必须在 synchronized 保护的代码中使用,而 sleep 方法并没有这个要求
    • 在同步代码中执行 sleep 方法时,并不会释放 monitor 锁,但执行 wait 方法时会主动释放 monitor 锁
    • sleep 方法中会要求必须定义一个时间,时间到期后会主动恢复,而对于没有参数的 wait 方法而言,意味着永久等待,直到被中断或被唤醒才能恢复,它并不会主动恢复
    • wait/notify 是 Object 类的方法,而 sleep 是 Thread 类的方法

    五、生产者消费者模式

    生产者消费者模式是程序设计中非常常见的一种设计模式,被广泛运用在解耦、消息队列等场景。把生产商品的一方称为生产者,把消费商品的一方称为消费者,有时生产者的生产速度特别快,但消费者的消费速度跟不上,俗称“产能过剩”,有时是多个生产者对应多个消费者。这时在生产者和消费者之间就需要一个中介来进行调度,于是便诞生了生产者消费者模式。

    消费者模式通常需要在两者之间增加一个阻塞队列作为媒介,生产者在生产数据后将数据存放在阻塞队列中,消费者获取阻塞队列中的数据。而当阻塞队列已满或是已空都可能会产生线程阻塞

    那么什么时候阻塞线程需要被唤醒?第一种情况是当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。另一种情况是如果生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据之后就相当于队列空了一个位置,这时消费者就会通知所有正在阻塞的生产者进行生产。

    那么如何实现生产者消费者模式呢?这里介绍三种方法:

    1. BlockingQueue 实现生产者消费者模式

    public static void main(String[] args) {
     
    	BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
     
    	Runnable producer = () -> {
    	   while (true) {
    	         queue.put(new Object());
    	 	}
    	};
     
    	new Thread(producer).start();
    	new Thread(producer).start();
     
    	Runnable consumer = () -> {
    	      while (true) {
    	           queue.take();
    		}
    	};
    
    	 new Thread(consumer).start();
    	 new Thread(consumer).start();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    首先,创建了一个 ArrayBlockingQueue 类型的 BlockingQueue,命名为 queue 并将它的容量设置为 10;其次,创建一个简单的生产者,while(true) 循环体中的queue.put() 负责往队列添加数据;然后,创建两个生产者线程并启动;同样消费者也非常简单,while(true) 循环体中的 queue.take() 负责消费数据,同时创建两个消费者线程并启动。为了简介设计,代码里省略了 try/catch 检测。

    BlockingQueue实现生产者消费者模式看上去很简单,但实际上 ArrayBlockingQueue 已经在背后完成了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。

    2. Condition 实现生产者消费者模式

    public class MyBlockingQueue {
     
       private Queue queue;
       private int max = 16;
       private ReentrantLock lock = new ReentrantLock();
       private Condition notEmpty = lock.newCondition();
       private Condition notFull = lock.newCondition();
     
     
       public MyBlockingQueue (int size) {
           this.max = size;
           queue = new LinkedList();
       }
     
       public void put(Object o) throws InterruptedException {
           lock.lock();
           try {
               while (queue.size() == max) {
                   notFull.await();
               }
               queue.add(o);
               notEmpty.signalAll();
           } finally {
               lock.unlock();
           }
       }
     
       public Object take() throws InterruptedException {
           lock.lock();
           try {
               while (queue.size() == 0) {
                   notEmpty.await();
               }
               Object item = queue.remove();
               notFull.signalAll();
               return item;
           } finally {
               lock.unlock();
           }
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    利用Condition自己实现BlockingQueue:

    首先,定义了一个队列变量 queue 并设置最大容量为 16;其次,定义了一个 ReentrantLock 类型的 Lock 锁,并在 Lock 锁的基础上创建两个 Condition,一个是 notEmpty,另一个是 notFull,分别代表队列没有空和没有满的条件;最后,声明了 put 和 take 这两个核心方法。

    生产者消费者模式通常是面对多线程的场景,因此需要一定的同步措施保障线程安全。

    对于put 方法,先将 Lock 锁上,然后,在 while 的条件里检测 queue 是不是已经满了,如果已经满了,则调用 notFull 的 await() 阻塞生产者线程并释放 Lock,如果没有满,则往队列放入数据并利用 notEmpty.signalAll() 通知正在等待的所有消费者并唤醒它们。最后在 finally 中利用 lock.unlock() 方法解锁,把 unlock 方法放在 finally 中是一个基本原则,否则可能会产生无法释放锁的情况。

    对于take方法,实际上是与 put 方法相互对应的,同样是通过 while 检查队列是否为空,如果为空,消费者开始等待,如果不为空则从队列中获取数据并通知生产者队列有空余位置,最后在 finally 中解锁。

    上述代码中,生产者和消费者为什么要用while循环检查队列情况,而不用if判断呢?以消费者为例,生产者消费者往往是多线程的,如果将while( queue.size() == 0 ) 改为 if( queue.size() == 0 ),第一个消费者线程获取数据时,发现队列为空,便进入等待状态;因为第一个线程在等待时会释放 Lock 锁,所以第二个消费者可以进入并执行 if( queue.size() == 0 ),也发现队列为空,于是第二个线程也进入等待;而此时,如果生产者生产了一个数据,便会唤醒两个消费者线程,而两个线程中只有一个线程可以拿到锁,并执行 queue.remove 操作,另外一个线程因为没有拿到锁而卡在被唤醒的地方,而第一个线程执行完操作后会在 finally 中通过 unlock 解锁,而此时第二个线程便可以拿到被第一个线程释放的锁,继续执行操作,也会去调用 queue.remove 操作,然而这个时候队列已经为空了,所以会抛出 NoSuchElementException 异常。而如果用 while 做检查,当第一个消费者被唤醒得到锁并移除数据之后,第二个线程在执行 remove 前仍会进行 while 检查,发现此时依然满足 queue.size() == 0 的条件,就会继续执行 await 方法,避免了获取的数据为 null 或抛出异常的情况。

    3. wait/notify 实现生产者消费者模式

    利用wait/notify自己实现BlockingQueue:

    class MyBlockingQueue {
     
       private int maxSize;
       private LinkedList<Object> storage;
     
       public MyBlockingQueue(int size) {
           this.maxSize = size;
           storage = new LinkedList<>();
       }
     
       public synchronized void put() throws InterruptedException {
           while (storage.size() == maxSize) {
               wait();
           }
           storage.add(new Object());
           notifyAll();
       }
     
       public synchronized void take() throws InterruptedException {
           while (storage.size() == 0) {
               wait();
           }
           System.out.println(storage.remove());
           notifyAll();
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    仍然是take 与 put 方法。对于put 方法,put 方法被 synchronized 保护,while 检查队列是否为满,如果不满就往里放入数据并通过 notifyAll() 唤醒其他线程。同样,take 方法也被 synchronized 修饰,while 检查队列是否为空,如果不为空就获取数据并唤醒其他线程。

    基于Condition和wait/notify实现BlockingQueue之后,以下为生产者消费者代码:

    public class Test {
     
       public static void main(String[] args) {
           MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
           Producer producer = new Producer(myBlockingQueue);
           Consumer consumer = new Consumer(myBlockingQueue);
           new Thread(producer).start();
           new Thread(consumer).start();
       }
    }
     
    class Producer implements Runnable {
     
       private MyBlockingQueue storage;
     
       public Producer(MyBlockingQueue storage) {
           this.storage = storage;
       }
     
       @Override
       public void run() {
           for (int i = 0; i < 100; i++) {
               try {
                   storage.put();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       }
    }
     
    class Consumer implements Runnable {
     
       private MyBlockingQueue storage;
     
       public Consumer(MyBlockingQueue storage) {
           this.storage = storage;
       }
     
       @Override
       public void run() {
           for (int i = 0; i < 100; i++) {
               try {
                   storage.take();
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
  • 相关阅读:
    Java转Android:第4天 用Layout布局实现罗盘和三叉戟
    Day10—SQL那些事(特殊场景的查询)
    短视频不火怎么办?加上配音试试看|教你制作最近超火的配音旁白
    LR学习笔记——初识lightroom
    Kotlin(十) 空指针检查、字符串内嵌表达式以及函数默认值
    《分析模式》漫谈06-实例不是“一种”隔壁老王
    《HelloGitHub》第 72 期
    面试必问的ConcurrentHashMap实现原理:数据结构、get与put操作
    美容院怎么实现无线wifi短信认证?
    IntelliJ IDEA 常用快捷键-个人查阅
  • 原文地址:https://blog.csdn.net/weixin_41402069/article/details/125976626