• Java 进阶多线程(二)


    一、线程通信(了解)

    • 概念

    线程通信就是线程间相互发送数据,线程间共享一个资源即可实现线程通信

    • 常见方式

    通过共享一个数据的方式实现
    根据共享数据的情况决定自己该怎么做,以及通知其他线程怎么做

    • 场景

    生产者与消费者模型:生产者线程负责生产数据,消费者线程负责消费生产者产生的数据

    • 要求

    生产者线程生产完数据后唤醒消费者,然后等待自己,消费者消费完该数据后唤醒生产者,然后等待自己

    方法名解释
    void wait​()当前线程等待,直到另一个线程调用notify() 或 notifyAll()唤醒自己
    void notify()唤醒正在等待对象监视器(锁对象)的单个线程
    void notifyAll()唤醒正在等待对象监视器(锁对象)的所有线程

    二、线程池(重点)

    • 概述

    可以复用线程的技术

    1、获得线程池对象

    用ExecutorService实现类ThreadPoolExecutor自创建一个线程池对象
    用Executors(线程池的工具类)调用方法返回不同特点的线程池对象

    public ThreadPoolExecutor(
                int corePoolSize,//核心线程数
                int maximumPoolSize,//最大线程数
                long keepAliveTime,//线程空闲时间
                TimeUnit unit,//时间单位
                BlockingQueue<Runnable> workQueue,//任务队列
                ThreadFactory threadFactory,//线程工厂
                RejectedExecutionHandler handler//拒绝策略
        )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    参数名解释
    corePoolSize 指定线程池的线程数量(核心线程),不能小于0
    maximumPoolSize 指定线程池可支持的最大线程数,最大数量 >= 核心线程数量
    keepAliveTime 指定临时线程的最大存活时间 ,不能小于0
    unit 指定存活时间的单位(秒、分、时、天)
    workQueue 指定任务队列 ,不能为null
    threadFactory 指定用哪个线程工厂创建线程,不能为null
    handler 指定线程忙,任务满的时候,新任务来了怎么办,不能为null
    TimeUnit.DAYS;               //天
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //秒
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2、线程池处理Runnable

    ExecutorService方法解释
    void execute(Runnable command) 执行任务/命令,没有返回值,一般用来执行 Runnable 任务
    Future submit(Callable task) 执行任务,返回未来任务对象获取线程结果,一般拿来执行 Callable 任务
    void shutdown() 等任务执行完毕后关闭线程池
    List shutdownNow() 立刻关闭,停止正在执行的任务,并返回队列中未执行的任务
    4种策略解释
    ThreadPoolExecutor.AbortPolicy丢弃任务并抛出RejectedExecutionException异常。是默认的策略
    ThreadPoolExecutor.DiscardPolicy丢弃任务,但是不抛出异常 这是不推荐的做法
    ThreadPoolExecutor.DiscardOldestPolicy抛弃队列中等待最久的任务 然后把当前任务加入队列中
    ThreadPoolExecutor.CallerRunsPolicy由主线程负责调用任务的run()方法从而绕过线程池直接执行

    1)AbortPolicy实战

    当运行任务数超过核心数时,会报RejectedExecutionException错误
    在这里插入图片描述

    import java.util.concurrent.*;
    
    public class ThreadPoolExecutorTest implements Runnable {
        private String name;
    
        public ThreadPoolExecutorTest(String name) {
            this.name = name;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            //线程池
            ExecutorService pools = new ThreadPoolExecutor(
                    1,
                    1,
                    1,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
    
            ThreadPoolExecutorTest run = null;
            // 循环创建线程
            for (int i = 0; i < 5; i++) {
                run = new ThreadPoolExecutorTest("" + i);
                // 将任务添加到线程池中
                pools.execute(run);
            }
    
            //关闭线程池
            pools.shutdown();
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    2)DiscardPolicy实战

    在这里插入图片描述

    import java.util.concurrent.*;
    
    public class ThreadPoolExecutorTest implements Runnable {
        private String name;
    
        public ThreadPoolExecutorTest(String name) {
            this.name = name;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            //线程池
            ExecutorService pools = new ThreadPoolExecutor(
                    1,
                    1,
                    1,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.DiscardPolicy());
    
            ThreadPoolExecutorTest run = null;
            // 循环创建线程
            for (int i = 0; i < 5; i++) {
                run = new ThreadPoolExecutorTest("" + i);
                // 将任务添加到线程池中
                pools.execute(run);
            }
    
            //关闭线程池
            pools.shutdown();
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    3)DiscardOldestPolicy实战

    在这里插入图片描述

    import java.util.concurrent.*;
    
    public class ThreadPoolExecutorTest implements Runnable {
        private String name;
    
        public ThreadPoolExecutorTest(String name) {
            this.name = name;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            //线程池
            ExecutorService pools = new ThreadPoolExecutor(
                    1,
                    1,
                    1,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.DiscardOldestPolicy());
    
            ThreadPoolExecutorTest run = null;
            // 循环创建线程
            for (int i = 0; i < 5; i++) {
                run = new ThreadPoolExecutorTest("" + i);
                // 将任务添加到线程池中
                pools.execute(run);
            }
    
            //关闭线程池
            pools.shutdown();
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    4)CallerRunsPolicy实战

    在这里插入图片描述

    import java.util.concurrent.*;
    
    public class ThreadPoolExecutorTest implements Runnable {
        private String name;
    
        public ThreadPoolExecutorTest(String name) {
            this.name = name;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("当前线程名: " + Thread.currentThread().getName() + ", 任务 " + name + " is running!");
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            //线程池
            ExecutorService pools = new ThreadPoolExecutor(
                    1,
                    1,
                    1,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(1),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.CallerRunsPolicy());
    
            ThreadPoolExecutorTest run = null;
            // 循环创建线程
            for (int i = 0; i < 5; i++) {
                run = new ThreadPoolExecutorTest("" + i);
                // 将任务添加到线程池中
                pools.execute(run);
            }
    
            //关闭线程池
            pools.shutdown();
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    3、线程池处理Callable

    ExecutorService方法解释
    void execute(Runnable command) 执行任务/命令,没有返回值,一般用来执行 Runnable 任务
    Future submit(Callable task) 执行任务,返回未来任务对象获取线程结果,一般拿来执行 Callable 任务
    void shutdown() 等任务执行完毕后关闭线程池
    List shutdownNow() 立刻关闭,停止正在执行的任务,并返回队列中未执行的任务

    在这里插入图片描述

    import java.util.concurrent.Callable;
    
    public class MyCallable implements Callable {
        private String name;
    
        public MyCallable(String name) {
            this.name = name;
        }
    
        @Override
        public Object call() throws Exception {
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName() + name + " " + i);
            }
            return true;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    import java.util.concurrent.*;
    
    public class ThreadPoolExecutorTest {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //创建线程池
            ExecutorService pools = Executors.newFixedThreadPool(3);
    
            MyCallable myCallable1 = new MyCallable(" hadoop");
            MyCallable myCallable2 = new MyCallable(" flink");
            MyCallable myCallable3 = new MyCallable(" spark");
    
            //提交执行
            Future<Boolean> sb1 = pools.submit(myCallable1);
            Future<Boolean> sb2 = pools.submit(myCallable2);
            Future<Boolean> sb3 = pools.submit(myCallable3);
    
            //获取结果
            boolean b1 = sb1.get();
            boolean b2 = sb2.get();
            boolean b3 = sb3.get();
    
            System.out.println(b1);
            System.out.println(b2);
            System.out.println(b3);
    
            // 关闭线程池,如不关闭,线程池会一直运行
            pools.shutdown();
        }
    
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    4、Callable和Runnable接口的区别

    Callable接口中的Call方法有返回值,Runnable接口中的Run方法没有返回值
    Callable接口中的Call方法有声明异常,Runnable接口中的Run方法没有异常

  • 相关阅读:
    mfc 浮动窗口
    Python实现一个简单的http服务,Url传参输出html页面
    读后感-雷军2022年度演讲全文
    编程获取图像中的圆半径
    Oracle中实现获取指定表名称批量修改表中的字段类型
    Trino 391 安装配置
    已解决javax.transaction.InvalidTransactionException:事务无效的正确解决方法,亲测有效!!!
    【飞控开发高级教程7】疯壳·开源编队无人机-编队飞行
    华为笔试面试题
    6.前端·新建子模块与开发(常规开发)
  • 原文地址:https://blog.csdn.net/walykyy/article/details/126880582