• Java线程池


    自定义线程池

    1. 简介

    1.1 引入原因

    1. 一个任务过来,一个线程去做。如果每次过来都创建新线程,性能低且比较耗费内存
    2. 线程数多于cpu核心,线程切换,要保存原来线程的状态,运行现在的线程,势必会更加耗费资源
       线程数少于cpu核心,不能很好的利用多线程的性能
       
    3. 充分利用已有线程,去处理原来的任务
    
    • 1
    • 2
    • 3
    • 4
    • 5

    1.2. 线程池组件

    1. 消费者(线程池):                 保存一定数量线程来处理任务
    2. 生产者:                        客户端源源不断产生的新任务
    3. 阻塞队列(blocking queue):      平衡消费者和生产者之间,用来保存任务 的一个等待队列
    
    - 生产任务速度较快,多余的任务要等
    - 生产任务速度慢,那么线程池中存活的线程等
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    image-20221012105847884

    2. 自定义线程池

    2.1 不带超时

    阻塞队列
    package com.erick.multithread.d6;
    
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class BlockingQueue<T> {
    
        /*阻塞队列的上限*/
        private int capacity;
        /*保存具体任务: 也就是Runnable的池子*/
        private Deque<T> blockingQueue = new ArrayDeque<>();
    
        /*锁:从池子中拿或取时需要*/
        private ReentrantLock lock = new ReentrantLock(true);
    
        /*池子满时,生产者线程等待*/
        private Condition producerRoom = lock.newCondition();
    
        /*池子空时,消费者线程等待*/
        private Condition consumerRoom = lock.newCondition();
    
        public BlockingQueue(int capacity) {
            this.capacity = capacity;
        }
    
        public T getTask() {
            try {
                lock.lock();
    
                while (blockingQueue.isEmpty()) {
                    System.out.println("阻塞队列为空,消费者等待");
                    try {
                        consumerRoom.await();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
    
                T task = blockingQueue.removeLast();
                producerRoom.signal();
                return task;
            } finally {
                lock.unlock();
            }
        }
    
        /*生产任务:一直等待*/
        public void addTask(T t) {
            try {
                lock.lock();
    
                while (blockingQueue.size() == capacity) {
                    System.out.println("阻塞队列已满,生产者等待");
                    try {
                        producerRoom.await();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                blockingQueue.addFirst(t);
                consumerRoom.signal();
            } finally {
                lock.unlock();
            }
        }
    
        /*获取队列大小*/
        public int getSize() {
            try {
                lock.lock();
                return blockingQueue.size();
            } finally {
                lock.unlock();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    线程池
    package com.erick.multithread.d6;
    
    import java.util.HashSet;
    import java.util.Set;
    
    /*自定义线程池*/
    public class ErickThreadPool<T> {
        /*阻塞队列*/
        private BlockingQueue<T> blockingQueue;
    
        /*装工作线程的池子*/
        private final Set<Worker> pool = new HashSet<>();
        /*核心线程数*/
        private int coreThreadSize;
    
        public ErickThreadPool(int blockQueueCapacity, int coreThreadSize) {
            blockingQueue = new BlockingQueue<>(blockQueueCapacity);
            this.coreThreadSize = coreThreadSize;
        }
    
        /**
         * 任务具体执行流程: 外界接口的任务(Thread) 来了
         * 1. 当前池子没满,则新建一个线程并加入到池子中
         * 2. 如果池子已经满了,当前任务进入到阻塞队列中等待
         */
        public synchronized void executeTask(Runnable task) {
            if (pool.size() < coreThreadSize) {
                Worker worker = new Worker(task);
                pool.add(worker);
                System.out.println("创建新的线程来执行任务");
                worker.start();
            } else {
                System.out.println("线程池已满,生产者暂时等待");
                blockingQueue.addTask((T) task);
            }
        }
    
        /*线程池中具体干活的线程*/
        class Worker extends Thread {
            private Runnable task;
    
            public Worker(Runnable task) {
                this.task = task;
            }
    
            /*阻塞获取,一直等待*/
            @Override
            public void run() {
              // 获取任务的时候,是一直等待,死等,因此线程一直不会结束
                while (task != null || (task = (Runnable) blockingQueue.getTask()) != null) {
                    try {
                        task.run();
                    } catch (Exception e) {
                        System.out.println("线程执行任务出错");
                    } finally {
                        task = null;
                    }
                }
                /*任务执行完毕后,将该线程从池子中移除*/
                synchronized (pool) {
                    System.out.println("线程销毁");
                    pool.remove(this);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    测试代码
    package com.erick.multithread.d6;
    
    import java.util.Date;
    
    public class Test {
        public static void main(String[] args) {
            ErickThreadPool pool = new ErickThreadPool(10, 3);
    
            for (int i = 0; i < 5; i++) {
                pool.executeTask(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName() + ":" + new Date());
                    }
                });
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    2.2 超时等待

    • 上面线程池中的worker线程获取blockingqueue的时候,即使阻塞队列中没有任务,也会一直死等,并不会结束
    阻塞队列
    package com.erick.multithread.d6;
    
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class BlockingQueue<T> {
    
        /*阻塞队列的上限*/
        private int capacity;
        /*保存具体任务: 也就是Runnable的池子*/
        private Deque<T> blockingQueue = new ArrayDeque<>();
    
        /*锁:从池子中拿或取时需要*/
        private ReentrantLock lock = new ReentrantLock(true);
    
        /*池子满时,生产者线程等待*/
        private Condition producerRoom = lock.newCondition();
    
        /*池子空时,消费者线程等待*/
        private Condition consumerRoom = lock.newCondition();
    
        public BlockingQueue(int capacity) {
            this.capacity = capacity;
        }
    
        /*获取任务:一直等待*/
        public T getTask() {
            try {
                lock.lock();
    
                while (blockingQueue.isEmpty()) {
                    System.out.println("阻塞队列为空,消费者等待");
                    try {
                        consumerRoom.await();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
    
                T task = blockingQueue.removeLast();
                producerRoom.signal();
                return task;
            } finally {
                lock.unlock();
            }
        }
    
        /*获取任务: 超时不侯*/
        public T getTask(long timeout, TimeUnit timeUnit) {
            try {
                lock.lock();
                long nanos = timeUnit.toNanos(timeout);
    
                while (blockingQueue.isEmpty()) {
                    if (nanos < 0) {
                        return null;
                    }
    
                    try {
                        nanos = consumerRoom.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                T task = blockingQueue.removeLast();
                producerRoom.signal();
                return task;
            } finally {
                lock.unlock();
            }
        }
    
    
        /*生产任务:一直等待*/
        public void addTask(T t) {
            try {
                lock.lock();
    
                while (blockingQueue.size() == capacity) {
                    System.out.println("阻塞队列已满,生产者等待");
                    try {
                        producerRoom.await();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                blockingQueue.addFirst(t);
                consumerRoom.signal();
            } finally {
                lock.unlock();
            }
        }
    
        /*获取队列大小*/
        public int getSize() {
            try {
                lock.lock();
                return blockingQueue.size();
            } finally {
                lock.unlock();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    线程池
    package com.erick.multithread.d6;
    
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.TimeUnit;
    
    /*自定义线程池*/
    public class ErickThreadPool<T> {
        /*阻塞队列*/
        private BlockingQueue<T> blockingQueue;
    
        /*装工作线程的池子*/
        private final Set<Worker> pool = new HashSet<>();
        /*核心线程数*/
        private int coreThreadSize;
    
        /*线程池子中的线程,等待获取的任务的时候,如果超时,则线程kill掉*/
        private long timeout;
        private TimeUnit timeUnit;
    
        public ErickThreadPool(int blockQueueCapacity, int coreThreadSize) {
            blockingQueue = new BlockingQueue<>(blockQueueCapacity);
            this.coreThreadSize = coreThreadSize;
        }
    
        public ErickThreadPool(int blockQueueCapacity, int coreThreadSize, long timeout, TimeUnit timeUnit) {
            this(blockQueueCapacity, coreThreadSize);
            this.timeUnit = timeUnit;
            this.timeout = timeout;
        }
    
        /**
         * 任务具体执行流程: 外界接口的任务(Thread) 来了
         * 1. 当前池子没满,则新建一个线程并加入到池子中
         * 2. 如果池子已经满了,当前任务进入到阻塞队列中等待
         */
        public synchronized void executeTask(Runnable task) {
            if (pool.size() < coreThreadSize) {
                Worker worker = new Worker(task);
                pool.add(worker);
                System.out.println("创建新的线程来执行任务");
                worker.start();
            } else {
                System.out.println("线程池已满,生产者暂时等待");
                blockingQueue.addTask((T) task);
            }
        }
    
        /*线程池中具体干活的线程*/
        class Worker extends Thread {
            private Runnable task;
    
            public Worker(Runnable task) {
                this.task = task;
            }
    
            /*阻塞获取,一直等待*/
            @Override
            public void run() {
                while (task != null || (task = (Runnable) blockingQueue.getTask(timeout, timeUnit)) != null) {
                    try {
                        task.run();
                    } catch (Exception e) {
                        System.out.println("线程执行任务出错");
                    } finally {
                        task = null;
                    }
                }
                /*任务执行完毕后,将该线程从池子中移除*/
                synchronized (pool) {
                    System.out.println("线程销毁");
                    pool.remove(this);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    测试代码
    package com.erick.multithread.d6;
    
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    public class Test {
        public static void main(String[] args) {
            ErickThreadPool pool = new ErickThreadPool(10, 3, 5, TimeUnit.SECONDS);
    
            for (int i = 0; i < 5; i++) {
                pool.executeTask(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName() + ":" + new Date());
                    }
                });
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    2.3 生产者-超时设置

    • 当阻塞队列中已满,并且核心线程都在工作的时候,生产者线程提供的任务就会进行等待
    • 让任务生产者自己决定该如何执行
    # 拒绝策略
    - 死等
    - 带超时等待
    - 让调用者放弃执行任务
    - 让调用者抛出异常
    - 让调用者自己执行任务
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    阻塞队列
    package com.erick.multithread.d6;
    
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class BlockingQueue<T> {
    
        /*阻塞队列的上限*/
        private int capacity;
        /*保存具体任务: 也就是Runnable的池子*/
        private Deque<T> blockingQueue = new ArrayDeque<>();
    
        /*锁:从池子中拿或取时需要*/
        private ReentrantLock lock = new ReentrantLock(true);
    
        /*池子满时,生产者线程等待*/
        private Condition producerRoom = lock.newCondition();
    
        /*池子空时,消费者线程等待*/
        private Condition consumerRoom = lock.newCondition();
    
        public BlockingQueue(int capacity) {
            this.capacity = capacity;
        }
    
        /*获取任务:一直等待*/
        public T getTask() {
            try {
                lock.lock();
    
                while (blockingQueue.isEmpty()) {
                    System.out.println("阻塞队列为空,消费者等待");
                    try {
                        consumerRoom.await();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
    
                T task = blockingQueue.removeLast();
                producerRoom.signal();
                return task;
            } finally {
                lock.unlock();
            }
        }
    
        /*获取任务: 超时不侯*/
        public T getTask(long timeout, TimeUnit timeUnit) {
            try {
                lock.lock();
                long nanos = timeUnit.toNanos(timeout);
    
                while (blockingQueue.isEmpty()) {
                    if (nanos < 0) {
                        return null;
                    }
    
                    try {
                        nanos = consumerRoom.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                T task = blockingQueue.removeLast();
                producerRoom.signal();
                return task;
            } finally {
                lock.unlock();
            }
        }
    
    
        /*生产任务:一直等待*/
        public void addTask(T t) {
            try {
                lock.lock();
    
                while (blockingQueue.size() == capacity) {
                    System.out.println("阻塞队列已满,生产者等待");
                    try {
                        producerRoom.await();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                blockingQueue.addFirst(t);
                consumerRoom.signal();
            } finally {
                lock.unlock();
            }
        }
    
        /*带超时的添加*/
        public boolean addTask(T t, long timeout, TimeUnit timeUnit) {
            try {
                lock.lock();
                long nanos = timeUnit.toNanos(timeout);
    
                while (blockingQueue.size() == capacity) {
                    if (nanos < 0) {
                        return false;
                    }
                    try {
                        nanos = producerRoom.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
    
                blockingQueue.addFirst(t);
                consumerRoom.signal();
                return true;
            } finally {
                lock.unlock();
            }
        }
    
        public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
            try {
                lock.lock();
                if (blockingQueue.size() == capacity) {
                    /*具体操作的权利:下放给对应的consumer*/
                    rejectPolicy.reject(this, (Runnable) task);
                    return;
                }
                System.out.println("加入阻塞队列");
                blockingQueue.addFirst(task);
                consumerRoom.signal();
            } finally {
                lock.unlock();
            }
        }
    
        /*获取队列大小*/
        public int getSize() {
            try {
                lock.lock();
                return blockingQueue.size();
            } finally {
                lock.unlock();
            }
        }
    }
    
    interface RejectPolicy<T> {
        void reject(BlockingQueue<T> blockingQueue, Runnable task);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    线程池
    package com.erick.multithread.d6;
    
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.TimeUnit;
    
    /*自定义线程池*/
    public class ErickThreadPool<T> {
        /*阻塞队列*/
        private BlockingQueue<T> blockingQueue;
    
        /*装工作线程的池子*/
        private final Set<Worker> pool = new HashSet<>();
        /*核心线程数*/
        private int coreThreadSize;
    
        /*线程池子中的线程,等待获取的任务的时候,如果超时,则线程kill掉*/
        private long timeout;
        private TimeUnit timeUnit;
    
        /*拒绝策略*/
        private RejectPolicy<T> rejectPolicy;
    
        public ErickThreadPool(int blockQueueCapacity, int coreThreadSize) {
            blockingQueue = new BlockingQueue<>(blockQueueCapacity);
            this.coreThreadSize = coreThreadSize;
        }
    
        public ErickThreadPool(int blockQueueCapacity, int coreThreadSize, long timeout, TimeUnit timeUnit, RejectPolicy<T> rejectPolicy) {
            this(blockQueueCapacity, coreThreadSize);
            this.timeUnit = timeUnit;
            this.timeout = timeout;
            this.rejectPolicy = rejectPolicy;
        }
    
        /**
         * 任务具体执行流程: 外界接口的任务(Thread) 来了
         * 1. 当前池子没满,则新建一个线程并加入到池子中
         * 2. 如果池子已经满了,当前任务进入到阻塞队列中等待
         */
        public synchronized void executeTask(Runnable task) {
            if (pool.size() < coreThreadSize) {
                Worker worker = new Worker(task);
                pool.add(worker);
                System.out.println("创建新的线程来执行任务");
                worker.start();
            } else {
                System.out.println("线程池已满,生产者???");
                blockingQueue.tryPut(rejectPolicy, (T) task);
            }
        }
    
        /*线程池中具体干活的线程*/
        class Worker extends Thread {
            private Runnable task;
    
            public Worker(Runnable task) {
                this.task = task;
            }
    
            /*阻塞获取,一直等待*/
            @Override
            public void run() {
                while (task != null || (task = (Runnable) blockingQueue.getTask(timeout, timeUnit)) != null) {
                    try {
                        task.run();
                    } catch (Exception e) {
                        System.out.println("线程执行任务出错");
                    } finally {
                        task = null;
                    }
                }
                /*任务执行完毕后,将该线程从池子中移除*/
                synchronized (pool) {
                    System.out.println("线程销毁");
                    pool.remove(this);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    测试代码
    package com.erick.multithread.d6;
    
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    public class Test {
        public static void main(String[] args) {
            ErickThreadPool pool = new ErickThreadPool(1, 2, 5, TimeUnit.SECONDS, new ProducerException());
    
            for (int i = 0; i < 10; i++) {
                pool.executeTask(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName() + ":" + new Date());
                        try {
                            TimeUnit.SECONDS.sleep(5);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                });
            }
        }
    }
    
    /*死等的逻辑*/
    class StillWait implements RejectPolicy {
        @Override
        public void reject(BlockingQueue blockingQueue, Runnable task) {
    
            blockingQueue.addTask(task);
        }
    }
    
    /*超时等待的逻辑*/
    class WaitWithTimeOut implements RejectPolicy {
    
        @Override
        public void reject(BlockingQueue blockingQueue, Runnable task) {
            blockingQueue.addTask(task, 1, TimeUnit.SECONDS);
        }
    }
    
    /*调用者放弃任务*/
    class ProducerGiveUp implements RejectPolicy {
    
        @Override
        public void reject(BlockingQueue blockingQueue, Runnable task) {
            System.out.println("调用者抛弃任务");
        }
    }
    
    class ProducerExecute implements RejectPolicy {
    
        @Override
        public void reject(BlockingQueue blockingQueue, Runnable task) {
            System.out.println("调用者自己执行任务");
            new Thread(task).start();
        }
    }
    
    class ProducerException implements RejectPolicy {
    
        /*后续其他任务就不会进来执行*/
        @Override
        public void reject(BlockingQueue blockingQueue, Runnable task) {
            throw new RuntimeException("核心线程已在工作,阻塞队列已满");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    JDK线程池

    1. 类图

    image-20221018074250288

    2. 线程状态

    • ThreadPoolExecutor 使用int的高3位来表示线程池状态,低29位表示线程数量

    image-20221018074517365

    3. ThreadPoolExecutor

    3.1 构造方法

    int corePoolSize:                     // 核心线程数
    int maximumPoolSize:                 // 最大线程数
    long keepAliveTime:                  // 救急线程数执行任务完后存活时间
    TimeUnit unit:                       // 救急线程数执行任务完后存活时间
    BlockingQueue<Runnable> workQueue:   // 阻塞队列
    ThreadFactory threadFactory:         // 线程生产工厂,为线程起名字
    RejectedExecutionHandler handler:    // 拒绝策略 
    
     public ThreadPoolExecutor(int corePoolSize,
                               int maximumPoolSize,
                               long keepAliveTime,
                               TimeUnit unit,
                               BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory,
                               RejectedExecutionHandler handler)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    3.2 核心线程和救急线程

    1. 核心线程: 执行完任务后,会继续保留在线程池中
    
    2. 救急线程:如果阻塞队列已满,并且没有空余的核心线程。那么会创建救急线程来执行任务
      2.1 任务执行完毕后,这个线程就会被销毁(临时工)
      2.2 必须是有界阻塞,如果是无界队列,则不需要创建救急线程
    
    3. 拒绝策略: 有界队列,核心线程满负荷,阻塞队列已满,无空余救急线程,才会执行拒绝
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3.3 JDK拒绝策略

    • 如果线程达到最大线程数,救急线程也满负荷,且有界队列也满了,JDK 提供了4种拒绝策略
    AbortPolicy:           调用者抛出RejectedExecutionException,  默认策略
    CallerRunsPolicy:      调用者运行任务
    DiscardPolicy:         放弃本次任务
    DiscardOldestPolicy:   放弃阻塞队列中最早的任务,本任务取而代之
    
    # 第三方框架的技术实现
    - Dubbo: 在抛出异常之前,记录日志,并dump线程栈信息,方便定位问题
    - Netty: 创建一个新的线程来执行任务
    - ActiveMQ: 带超时等待(60s), 尝试放入阻塞队列
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    image-20221018075430425

    4. Executors类工厂方法

    • 默认的构造方法来创建线程池,参数过多,JDK提供了工厂方法,来创建线程池

    4.1 固定大小

    • 核心线程数 = 最大线程数,救急线程数为0
    • 阻塞队列:无界,可以存放任意数量的任务
    # 应用场景
    任务量已知,但是线程执行时间较长
    执行任务后,线程并不会结束
    
    • 1
    • 2
    • 3
    public static ExecutorService newFixedThreadPool(int nThreads) {
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    package com.erick.multithread.d7;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Demo01 {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
                private AtomicInteger num = new AtomicInteger(0);
    
                @Override
                public Thread newThread(Runnable r) {
                    // 给线程起一个名字
                    return new Thread(r, "erick-pool" + num.getAndIncrement());
                }
            });
    
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " running"));
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " running"));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    4.2 带缓冲

    • 核心线程数为0, 最大线程数为Integer的无限大
    • 全部是救急线程,等待时间是60s,60s后就会消亡
    • SynchronousQueue: 没有容量,没有线程来取的时候是放不进去的
    • 整个线程池数会随着任务数目增长,1分钟后没有其他活动会消亡
    # 应用场景
    1. 时间较短的线程
    2. 数量大,任务执行时间长,会造成  OutOfMmeory问题
    
    • 1
    • 2
    • 3
     public static ExecutorService newCachedThreadPool() {
         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                       60L, TimeUnit.SECONDS,
                                       new SynchronousQueue<Runnable>());
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    4.3. 单线程

    • 线程池大小始终为1个,不能改变线程数
    • 相比自定义一个线程来执行,线程池可以保证前面任务的失败,不会影响到后续任务
    # 1. 和自定义线程的区别
    自定义线程:  执行多个任务时,一个出错,后续都能不能执行了
    单线程池:    一个任务失败后,会结束出错线程。重新new一个线程来执行下面的任务
    
    # 2. 执行顺序
    单线程池: 保证所有任务都是串行
    
    # 3. 和newFixedThreadPool的区别
    newFixedThreadPool:          初始化后,还可以修改线程大小
    newSingleThreadExecutor:     不可以修改
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    package com.nike.erick.d07;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Demo01 {
        public static void main(String[] args) {
            method03();
        }
    
        private static void method01() {
            ExecutorService pool = Executors.newFixedThreadPool(2);
            /*pool-1-thread-1   pool-1-thread-2  pool-1-thread-1*/
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
        }
    
        private static void method02() {
            ExecutorService pool = Executors.newCachedThreadPool();
            /*pool-1-thread-1  pool-1-thread-2  pool-1-thread-3*/
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
        }
    
        private static void method03() {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            /*第一个任务执行失败后,后续任务会继续执行*/
            pool.execute(() -> {
                int i = 1 / 0;
                System.out.println(Thread.currentThread().getName() + " running");
            });
    
            pool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " running");
            });
    
            pool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " running");
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    5. 提交任务

    5.1. execute

    void execute(Runnable command)
    
    • 1

    5.2. submit

    Future<?> submit(Runnable task);
    
    // 可以从 Future 对象中获取一些执行任务的最终结果
    <T> Future<T> submit(Runnable task, T result);
    
    <T> Future<T> submit(Callable<T> task);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3. invokeAll

    • 执行集合中的所有的任务
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    
    • 1
    • 2

    4. invokeAny

    • 集合中之要有一个任务执行完毕,其他任务就不再执行
    package com.nike.erick.d07;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    public class Demo02 {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            ExecutorService pool = Executors.newFixedThreadPool(10);
            method05(pool);
        }
    
        /* void execute(Runnable command) */
        public static void method01(ExecutorService pool) {
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " running"));
        }
    
        /*   Future submit(Runnable task, T result)
         * Future submit(Runnable task) */
        public static void method02(ExecutorService pool) throws InterruptedException {
            Future<?> result = pool.submit(new Thread(() -> System.out.println(Thread.currentThread().getName() + " running")));
            TimeUnit.SECONDS.sleep(1);
            System.out.println(result.isDone());
            System.out.println(result.isCancelled());
        }
    
        /*
         *  Future submit(Callable task)*/
        public static void method03(ExecutorService pool) throws InterruptedException, ExecutionException {
            Future<String> submit = pool.submit(() -> "success");
            TimeUnit.SECONDS.sleep(1);
            System.out.println(submit.isDone());
            System.out.println(submit.isCancelled());
            System.out.println(submit.get()); // 返回结果是success
        }
    
        /*  List> invokeAll(Collection> tasks) throws InterruptedException;*/
        public static void method04(ExecutorService pool) throws InterruptedException {
            Collection tasks = new ArrayList();
            for (int i = 0; i < 10; i++) {
                int round = i;
                tasks.add((Callable) () -> {
                    System.out.println(Thread.currentThread().getName() + " running");
                    return "success:" + round;
                });
            }
            List results = pool.invokeAll(tasks);
    
            TimeUnit.SECONDS.sleep(1);
            System.out.println(results);
        }
    
        /*
         *      T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException;
         * */
        public static void method05(ExecutorService pool) throws InterruptedException, ExecutionException {
            ExecutorService service = Executors.newFixedThreadPool(1);
            Collection<Callable<String>> tasks = new ArrayList<>();
    
            tasks.add(() -> {
                System.out.println("first task");
                TimeUnit.SECONDS.sleep(1);
                return "success";
            });
    
            tasks.add(() -> {
                System.out.println("second task");
                TimeUnit.SECONDS.sleep(2);
                return "success";
            });
    
    
            tasks.add(() -> {
                System.out.println("third task");
                TimeUnit.SECONDS.sleep(3);
                return "success";
            });
            // 任何一个任务执行完后,就会返回结果
            String result = pool.invokeAny(tasks);
            System.out.println(result);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    6. 关闭线程池

    6.1 shutdown

    • 将线程池的状态改变为SHUTDOWN状态
    • 不会接受新任务,已经提交的任务不会停止
    • 不会阻塞调用线程的执行
    void shutdown();
    
    • 1
    package com.dreamer.multithread.day09;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class Demo04 {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(1);
            pool.submit(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + " first running");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            pool.submit(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + " second running");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            pool.submit(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + " third running");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            // 不会阻塞主线程的执行
            pool.shutdown();
            System.out.println("main thread ending");
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    6.2. shutdownNow

    • 不会接受新任务
    • 没执行的任务会打断
    • 将等待队列中的任务返回
    List<Runnable> shutdownNow();
    
    • 1
    package com.dreamer.multithread.day09;
    
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class Demo04 {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(1);
            pool.submit(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + " first running");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            pool.submit(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + " second running");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            pool.submit(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(Thread.currentThread().getName() + " third running");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            // 不会阻塞主线程的执行
            List<Runnable> leftOver = pool.shutdownNow();
            System.out.println(leftOver.size()); // 2
            System.out.println("main thread ending"); 
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    线程池拓展

    1. 异步模式之工作线程

    1.1 Worker Thread

    • 让有限的工作线程来轮流异步处理无限多的任务
    • 分类:不同的任务类型应该使用不同的线程池

    1.2 饥饿现象

    • 固定大小线程池会有饥饿现象
    - 两个工人是同一个线程池中的两个线程, 为客人点餐和后厨做菜,这是两个阶段的工作
    - 客人点餐:必须先点餐,等菜做好,上菜,在此期间,处理点餐的工人必须等待
    - A工人处理了点餐任务,B工人把菜做好,然后上菜,配合正常
    - 同时来了两个客人,A和B工人都去处理点餐了,没人做饭了,出现线程数不足导致的资源饥饿
    
    • 1
    • 2
    • 3
    • 4
    正常
    package com.erick.multithread.d7;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.*;
    
    public class Demo02 {
        private static List<String> MENU = Arrays.asList("宫保鸡丁", "地三鲜", "辣子鸡丁", "红烧肉");
    
        private static Random random = new Random();
    
        private static String cooking() {
            return MENU.get(random.nextInt(MENU.size()));
        }
    
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(2);
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("开始处理点餐");
                    Future<String> cook = pool.submit(new Callable<String>() {
                        @Override
                        public String call() throws Exception {
                            System.out.println("开始做菜");
                            return cooking();
                        }
                    });
    
                    try {
                        String result = cook.get();
                        System.out.println("上菜:" + result);
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    线程池饥饿
    package com.erick.multithread.d7;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.*;
    
    public class Demo02 {
        private static List<String> MENU = Arrays.asList("宫保鸡丁", "地三鲜", "辣子鸡丁", "红烧肉");
    
        private static Random random = new Random();
    
        private static String cooking() {
            return MENU.get(random.nextInt(MENU.size()));
        }
    
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(2);
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("开始处理点餐");
                    Future<String> cook = pool.submit(new Callable<String>() {
                        @Override
                        public String call() throws Exception {
                            System.out.println("开始做菜");
                            return cooking();
                        }
                    });
    
                    try {
                        String result = cook.get();
                        System.out.println("上菜:" + result);
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
    
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("开始处理点餐");
                    Future<String> cook = pool.submit(new Callable<String>() {
                        @Override
                        public String call() throws Exception {
                            System.out.println("开始做菜");
                            return cooking();
                        }
                    });
    
                    try {
                        String result = cook.get();
                        System.out.println("上菜:" + result);
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    解决方法
    • 最简单的方法: 增加线程池的线程数量,但是不能从根本解决问题
    • 解决方法:不同的任务类型,使用不同的线程池

    2. 线程数量

    • 过小,导致cpu资源不能充分利用,浪费性能
    • 过大,线程上下文切换浪费性能,每个线程也要占用内存导致占用内存过多

    2.1 CPU密集型

    • 如果线程的任务主要是和cpu资源打交道,比如大数据运算,称为CPU密集型
    • 线程数量: 核心数+1
    • +1: 保证某线程由于某些原因(操作系统方面)导致暂停时,额外线程可以启动,不浪费CPU资源

    2.2. IO密集型

    • IO操作,RPC调用,数据库访问时,CPU是空闲的,称为IO密集型
    • 更加常见: IO操作,远程RPC调用,数据库操作
    • 线程数 = 核数 * 期望cpu利用率 * (CPU计算时间 + CPU等待时间) / CPU 计算时间

    image-20221018104629282

    3. 调度功能

    3.1 延时执行

    • 如果希望线程延时执行任务
    package com.dreamer.multithread.day09;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class Demo05 {
        public static void main(String[] args) {
            // 两个线程,分别可以延时执行不同的任务
            ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
    
            // 延时1s后执行
            pool.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " first running");
                }
            }, 1, TimeUnit.SECONDS);
    
            // 延时5s后执行
            pool.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " second running");
                }
            }, 5, TimeUnit.SECONDS);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    package com.dreamer.multithread.day09;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class Demo06 {
        public static void main(String[] args) {
            // 如果是单个线程,则延时的任务是串行执行的
            ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    
            // 如果一个线程出错,则会再次创建一个线程来执行任务
            pool.schedule(new Runnable() {
                @Override
                public void run() {
                    int i = 1 / 0;
                    System.out.println(Thread.currentThread().getName() + " first running");
                }
            }, 1, TimeUnit.SECONDS);
    
            pool.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " second running");
                }
            }, 2, TimeUnit.SECONDS);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    3.2 定时执行

    # scheduleAtFixedRate
    - 如果任务的执行时间大于时间间隔,就会紧接着立刻执行
    
    # scheduleWithFixedDelay
    - 上一个任务执行完毕后,再延迟一定的时间才会执行
    
    • 1
    • 2
    • 3
    • 4
    • 5
    package com.dreamer.multithread.day09;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class Demo07 {
        public static void main(String[] args) {
            ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
    
            // 定时执行任务
            pool.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("task is running");
                }
            }, 3, 2, TimeUnit.SECONDS);
            //  初始延时,   任务间隔时间,    任务间隔时间单位
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    4. 异常处理

    4.1 不处理异常

    • 任务执行过程中,业务中的异常并不会抛出
    package com.dreamer.multithread.day09;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Demo08 {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(1);
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    int i = 1 / 0;
                    System.out.println(Thread.currentThread().getName() + " task running");
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    4.2 任务执行者处理

    package com.dreamer.multithread.day09;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Demo08 {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(1);
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        int i = 1 / 0;
                        System.out.println(Thread.currentThread().getName() + " task running");
                    } catch (Exception e) {
                        e.printStackTrace();
                        return;
                    }
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    4.3 线程池处理

    package com.dreamer.multithread.day09;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    public class Demo08 {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            ExecutorService pool = Executors.newFixedThreadPool(1);
            Future<?> result = pool.submit(new Runnable() {
                @Override
                public void run() {
                    int i = 1 / 0;
                    System.out.println(Thread.currentThread().getName() + " task running");
                }
            });
    
            TimeUnit.SECONDS.sleep(1);
            // 获取结果的时候,就可以把线程执行任务过程中的异常报出来
            System.out.println(result.get());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
  • 相关阅读:
    verilog移位寄存器实现序列检测
    力扣刷题之爬楼梯(7/30)
    【离散数学】集合与关系
    PMP大家都是怎么备考的?使用什么工具可以分享一下吗?
    Java 11会成为下一个Java 8吗,2022年Java发展趋势
    域名服务:域名迁移
    python+flask计算机毕业设计热点推荐个性化新闻系统(程序+开题+论文)
    ELK安装、部署、调试(四)KAFKA消息队列的安装和部署
    【EAI 026】RoboGen: 通过自动数据生成管线实现机器人技能学习
    vue3上传文件组件方法封装
  • 原文地址:https://blog.csdn.net/weixin_43374578/article/details/127945762