1. 简介
1.1 引入原因
1 . 一个任务过来,一个线程去做。如果每次过来都创建新线程,性能低且比较耗费内存
2 . 线程数多于cpu核心,线程切换,要保存原来线程的状态,运行现在的线程,势必会更加耗费资源
线程数少于cpu核心,不能很好的利用多线程的性能
3 . 充分利用已有线程,去处理原来的任务
1.2. 线程池组件
1 . 消费者( 线程池) : 保存一定数量线程来处理任务
2 . 生产者: 客户端源源不断产生的新任务
3 . 阻塞队列( blocking queue) : 平衡消费者和生产者之间,用来保存任务 的一个等待队列
- 生产任务速度较快,多余的任务要等
- 生产任务速度慢,那么线程池中存活的线程等
2. 自定义线程池
2.1 不带超时
package com. erick. multithread. d6 ;
import java. util. ArrayDeque ;
import java. util. Deque ;
import java. util. concurrent. locks. Condition ;
import java. util. concurrent. locks. ReentrantLock ;
/**
 * A bounded FIFO queue that blocks producers while it is full and consumers
 * while it is empty.
 *
 * <p>All state is guarded by one fair {@link ReentrantLock}; producers and
 * consumers wait on separate conditions of that lock. Elements are stored in an
 * {@link ArrayDeque}, so {@code null} elements are rejected by the deque.
 *
 * @param <T> type of the queued elements
 */
public class BlockingQueue<T> {
    private final int capacity;
    private final Deque<T> blockingQueue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Producers wait here until a slot is freed. */
    private final Condition producerRoom = lock.newCondition();
    /** Consumers wait here until an element arrives. */
    private final Condition consumerRoom = lock.newCondition();

    /**
     * @param capacity maximum number of queued elements
     * @throws IllegalArgumentException if {@code capacity} is negative
     */
    public BlockingQueue(int capacity) {
        if (capacity < 0) {
            // A negative bound could never be reached, silently making the queue unbounded.
            throw new IllegalArgumentException("capacity must not be negative: " + capacity);
        }
        this.capacity = capacity;
    }

    /**
     * Removes and returns the oldest element, waiting as long as necessary.
     *
     * @return the oldest queued element
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         wait is interrupted; the interrupt flag is restored first
     */
    public T getTask() {
        // Acquire outside the try block so a failed acquire never reaches unlock().
        lock.lock();
        try {
            while (blockingQueue.isEmpty()) {
                System.out.println("阻塞队列为空,消费者等待");
                try {
                    consumerRoom.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            T task = blockingQueue.removeLast();
            producerRoom.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting as long as necessary for a free slot.
     *
     * @param t element to enqueue
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         wait is interrupted; the interrupt flag is restored first
     */
    public void addTask(T t) {
        lock.lock();
        try {
            while (blockingQueue.size() >= capacity) {
                System.out.println("阻塞队列已满,生产者等待");
                try {
                    producerRoom.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            // addFirst + removeLast yields FIFO order.
            blockingQueue.addFirst(t);
            consumerRoom.signal();
        } finally {
            lock.unlock();
        }
    }

    /** @return the number of elements currently queued */
    public int getSize() {
        lock.lock();
        try {
            return blockingQueue.size();
        } finally {
            lock.unlock();
        }
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
线程池
package com. erick. multithread. d6 ;
import java. util. HashSet ;
import java. util. Set ;
/**
 * A minimal thread pool: up to {@code coreThreadSize} worker threads are created
 * on demand, and further tasks are handed to a bounded {@link BlockingQueue}.
 *
 * <p>Workers in this version wait on the queue without a timeout, so they never
 * terminate once started.
 *
 * @param <T> element type of the backing queue; tasks are stored through an
 *            unchecked cast, so in practice this is {@link Runnable}
 */
public class ErickThreadPool<T> {
    private final BlockingQueue<T> blockingQueue;
    /** Live workers; every access is guarded by synchronizing on this set. */
    private final Set<Worker> pool = new HashSet<>();
    private final int coreThreadSize;

    /**
     * @param blockQueueCapacity capacity of the task queue
     * @param coreThreadSize     maximum number of worker threads
     */
    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize) {
        this.blockingQueue = new BlockingQueue<>(blockQueueCapacity);
        this.coreThreadSize = coreThreadSize;
    }

    /**
     * Runs the task on a new worker while fewer than {@code coreThreadSize}
     * workers exist; otherwise enqueues it, blocking while the queue is full.
     *
     * @param task the work to execute
     */
    @SuppressWarnings("unchecked") // T is only ever used to carry Runnable tasks
    public void executeTask(Runnable task) {
        // Use the same monitor as Worker.run() so the set is never touched under two different locks.
        synchronized (pool) {
            if (pool.size() < coreThreadSize) {
                Worker worker = new Worker(task);
                pool.add(worker);
                System.out.println("创建新的线程来执行任务");
                worker.start();
                return;
            }
        }
        // Enqueue outside the monitor: addTask may block and the queue has its own lock.
        System.out.println("线程池已满,生产者暂时等待");
        blockingQueue.addTask((T) task);
    }

    /** Worker thread: runs its initial task, then keeps pulling tasks from the queue. */
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            while (task != null || (task = (Runnable) blockingQueue.getTask()) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    // Keep the worker alive, but report what actually failed.
                    System.out.println("线程执行任务出错: " + e);
                } finally {
                    task = null;
                }
            }
            synchronized (pool) {
                System.out.println("线程销毁");
                pool.remove(this);
            }
        }
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
测试代码
package com. erick. multithread. d6 ;
import java. util. Date ;
/** Submits five tasks to a pool with three workers and a queue of ten. */
public class Test {
    public static void main(String[] args) {
        // Parameterized instead of raw, so the compiler checks the pool's generic usage.
        ErickThreadPool<Runnable> pool = new ErickThreadPool<>(10, 3);
        for (int i = 0; i < 5; i++) {
            pool.executeTask(() ->
                    System.out.println(Thread.currentThread().getName() + ":" + new Date()));
        }
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
2.2 超时等待
上面线程池中的worker线程获取blockingqueue的时候,即使阻塞队列中没有任务,也会一直死等,并不会结束
阻塞队列
package com. erick. multithread. d6 ;
import java. util. ArrayDeque ;
import java. util. Deque ;
import java. util. concurrent. TimeUnit ;
import java. util. concurrent. locks. Condition ;
import java. util. concurrent. locks. ReentrantLock ;
/**
 * A bounded FIFO queue that blocks producers while it is full and consumers
 * while it is empty; consumers may also wait with a timeout.
 *
 * <p>All state is guarded by one fair {@link ReentrantLock}; producers and
 * consumers wait on separate conditions of that lock. Elements are stored in an
 * {@link ArrayDeque}, so {@code null} elements are rejected by the deque.
 *
 * @param <T> type of the queued elements
 */
public class BlockingQueue<T> {
    private final int capacity;
    private final Deque<T> blockingQueue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Producers wait here until a slot is freed. */
    private final Condition producerRoom = lock.newCondition();
    /** Consumers wait here until an element arrives. */
    private final Condition consumerRoom = lock.newCondition();

    /**
     * @param capacity maximum number of queued elements
     * @throws IllegalArgumentException if {@code capacity} is negative
     */
    public BlockingQueue(int capacity) {
        if (capacity < 0) {
            // A negative bound could never be reached, silently making the queue unbounded.
            throw new IllegalArgumentException("capacity must not be negative: " + capacity);
        }
        this.capacity = capacity;
    }

    /**
     * Removes and returns the oldest element, waiting as long as necessary.
     *
     * @return the oldest queued element
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         wait is interrupted; the interrupt flag is restored first
     */
    public T getTask() {
        // Acquire outside the try block so a failed acquire never reaches unlock().
        lock.lock();
        try {
            while (blockingQueue.isEmpty()) {
                System.out.println("阻塞队列为空,消费者等待");
                try {
                    consumerRoom.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            T task = blockingQueue.removeLast();
            producerRoom.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, waiting at most the given time.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return the oldest queued element, or {@code null} if the queue is still
     *         empty when the timeout elapses
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         wait is interrupted; the interrupt flag is restored first
     */
    public T getTask(long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (blockingQueue.isEmpty()) {
                // awaitNanos reports "timed out" as a value <= 0, so zero must end the wait too.
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = consumerRoom.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            T task = blockingQueue.removeLast();
            producerRoom.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting as long as necessary for a free slot.
     *
     * @param t element to enqueue
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         wait is interrupted; the interrupt flag is restored first
     */
    public void addTask(T t) {
        lock.lock();
        try {
            while (blockingQueue.size() >= capacity) {
                System.out.println("阻塞队列已满,生产者等待");
                try {
                    producerRoom.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
            // addFirst + removeLast yields FIFO order.
            blockingQueue.addFirst(t);
            consumerRoom.signal();
        } finally {
            lock.unlock();
        }
    }

    /** @return the number of elements currently queued */
    public int getSize() {
        lock.lock();
        try {
            return blockingQueue.size();
        } finally {
            lock.unlock();
        }
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
线程池
package com. erick. multithread. d6 ;
import java. util. HashSet ;
import java. util. Set ;
import java. util. concurrent. TimeUnit ;
/**
 * A minimal thread pool: up to {@code coreThreadSize} worker threads are created
 * on demand, and further tasks are handed to a bounded {@link BlockingQueue}.
 *
 * <p>When a timeout is configured, an idle worker terminates once the queue has
 * stayed empty for that long. Without a timeout (two-argument constructor)
 * workers wait for tasks indefinitely.
 *
 * @param <T> element type of the backing queue; tasks are stored through an
 *            unchecked cast, so in practice this is {@link Runnable}
 */
public class ErickThreadPool<T> {
    private final BlockingQueue<T> blockingQueue;
    /** Live workers; every access is guarded by synchronizing on this set. */
    private final Set<Worker> pool = new HashSet<>();
    private final int coreThreadSize;
    private final long timeout;
    /** {@code null} means "no idle timeout". */
    private final TimeUnit timeUnit;

    /**
     * Creates a pool whose idle workers wait for tasks indefinitely.
     *
     * @param blockQueueCapacity capacity of the task queue
     * @param coreThreadSize     maximum number of worker threads
     */
    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize) {
        this(blockQueueCapacity, coreThreadSize, 0, null);
    }

    /**
     * @param blockQueueCapacity capacity of the task queue
     * @param coreThreadSize     maximum number of worker threads
     * @param timeout            how long an idle worker waits for a task before terminating
     * @param timeUnit           unit of {@code timeout}; {@code null} disables the timeout
     */
    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize, long timeout, TimeUnit timeUnit) {
        this.blockingQueue = new BlockingQueue<>(blockQueueCapacity);
        this.coreThreadSize = coreThreadSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
    }

    /**
     * Runs the task on a new worker while fewer than {@code coreThreadSize}
     * workers exist; otherwise enqueues it, blocking while the queue is full.
     *
     * @param task the work to execute
     */
    @SuppressWarnings("unchecked") // T is only ever used to carry Runnable tasks
    public void executeTask(Runnable task) {
        // Use the same monitor as Worker.run() so the set is never touched under two different locks.
        synchronized (pool) {
            if (pool.size() < coreThreadSize) {
                Worker worker = new Worker(task);
                pool.add(worker);
                System.out.println("创建新的线程来执行任务");
                worker.start();
                return;
            }
        }
        // NOTE(review): a task enqueued just as the last idle worker times out can
        // remain queued until the next executeTask call creates a worker.
        System.out.println("线程池已满,生产者暂时等待");
        blockingQueue.addTask((T) task);
    }

    /** Fetches the next queued task, honouring the idle timeout when one is configured. */
    private Runnable nextTask() {
        // Previously the two-argument constructor left timeUnit null and workers hit an NPE here.
        T next = (timeUnit == null)
                ? blockingQueue.getTask()
                : blockingQueue.getTask(timeout, timeUnit);
        return (Runnable) next;
    }

    /** Worker thread: runs its initial task, then pulls tasks until the queue wait times out. */
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            while (task != null || (task = nextTask()) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    // Keep the worker alive, but report what actually failed.
                    System.out.println("线程执行任务出错: " + e);
                } finally {
                    task = null;
                }
            }
            synchronized (pool) {
                System.out.println("线程销毁");
                pool.remove(this);
            }
        }
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
测试代码
package com. erick. multithread. d6 ;
import java. util. Date ;
import java. util. concurrent. TimeUnit ;
/** Submits five tasks to a pool whose idle workers terminate after five seconds. */
public class Test {
    public static void main(String[] args) {
        // Parameterized instead of raw, so the compiler checks the pool's generic usage.
        ErickThreadPool<Runnable> pool = new ErickThreadPool<>(10, 3, 5, TimeUnit.SECONDS);
        for (int i = 0; i < 5; i++) {
            pool.executeTask(() ->
                    System.out.println(Thread.currentThread().getName() + ":" + new Date()));
        }
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
2.3 生产者-超时设置
当阻塞队列中已满,并且核心线程都在工作的时候,生产者线程提供的任务就会进行等待 让任务生产者自己决定该如何执行
- 死等
- 带超时等待
- 让调用者放弃执行任务
- 让调用者抛出异常
- 让调用者自己执行任务
阻塞队列
package com. erick. multithread. d6 ;
import java. util. ArrayDeque ;
import java. util. Deque ;
import java. util. concurrent. TimeUnit ;
import java. util. concurrent. locks. Condition ;
import java. util. concurrent. locks. ReentrantLock ;
/**
 * A bounded FIFO queue that blocks producers while it is full and consumers
 * while it is empty. Both sides may wait with a timeout, and a producer can
 * delegate the "queue is full" decision to a {@link RejectPolicy}.
 *
 * <p>All state is guarded by one fair {@link ReentrantLock}; producers and
 * consumers wait on separate conditions of that lock. Elements are stored in an
 * {@link ArrayDeque}, so {@code null} elements are rejected by the deque.
 *
 * @param <T> type of the queued elements
 */
public class BlockingQueue<T> {
    private final int capacity;
    private final Deque<T> blockingQueue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Producers wait here until a slot is freed. */
    private final Condition producerRoom = lock.newCondition();
    /** Consumers wait here until an element arrives. */
    private final Condition consumerRoom = lock.newCondition();

    /**
     * @param capacity maximum number of queued elements
     * @throws IllegalArgumentException if {@code capacity} is negative
     */
    public BlockingQueue(int capacity) {
        if (capacity < 0) {
            // A negative bound could never be reached, silently making the queue unbounded.
            throw new IllegalArgumentException("capacity must not be negative: " + capacity);
        }
        this.capacity = capacity;
    }

    /**
     * Removes and returns the oldest element, waiting as long as necessary.
     *
     * @return the oldest queued element
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         wait is interrupted; the interrupt flag is restored first
     */
    public T getTask() {
        // Acquire outside the try block so a failed acquire never reaches unlock().
        lock.lock();
        try {
            while (blockingQueue.isEmpty()) {
                System.out.println("阻塞队列为空,消费者等待");
                awaitUninterrupted(consumerRoom);
            }
            return takeOldest();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, waiting at most the given time.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return the oldest queued element, or {@code null} if the queue is still
     *         empty when the timeout elapses
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         wait is interrupted; the interrupt flag is restored first
     */
    public T getTask(long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (blockingQueue.isEmpty()) {
                // awaitNanos reports "timed out" as a value <= 0, so zero must end the wait too.
                if (nanos <= 0) {
                    return null;
                }
                nanos = awaitNanosOrFail(consumerRoom, nanos);
            }
            return takeOldest();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting as long as necessary for a free slot.
     *
     * @param t element to enqueue
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         wait is interrupted; the interrupt flag is restored first
     */
    public void addTask(T t) {
        lock.lock();
        try {
            while (blockingQueue.size() >= capacity) {
                System.out.println("阻塞队列已满,生产者等待");
                awaitUninterrupted(producerRoom);
            }
            append(t);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting at most the given time for a free slot.
     *
     * @param t        element to enqueue
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return {@code true} if the element was enqueued, {@code false} if the
     *         queue was still full when the timeout elapsed
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         wait is interrupted; the interrupt flag is restored first
     */
    public boolean addTask(T t, long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (blockingQueue.size() >= capacity) {
                if (nanos <= 0) {
                    return false;
                }
                nanos = awaitNanosOrFail(producerRoom, nanos);
            }
            append(t);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Enqueues the task if a slot is free; otherwise hands it to the policy.
     *
     * <p>The policy is invoked after the queue lock has been released, so a slow
     * or blocking policy cannot stall consumers and other producers.
     *
     * @param rejectPolicy decides what happens to the task when the queue is full
     * @param task         element to enqueue; cast to {@link Runnable} for the policy
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if (blockingQueue.size() < capacity) {
                System.out.println("加入阻塞队列");
                append(task);
                return;
            }
        } finally {
            lock.unlock();
        }
        rejectPolicy.reject(this, (Runnable) task);
    }

    /** @return the number of elements currently queued */
    public int getSize() {
        lock.lock();
        try {
            return blockingQueue.size();
        } finally {
            lock.unlock();
        }
    }

    /** Adds at the head and wakes one consumer; the caller must hold the lock. */
    private void append(T element) {
        // addFirst + removeLast yields FIFO order.
        blockingQueue.addFirst(element);
        consumerRoom.signal();
    }

    /** Removes from the tail and wakes one producer; the caller must hold the lock. */
    private T takeOldest() {
        T task = blockingQueue.removeLast();
        producerRoom.signal();
        return task;
    }

    /** Waits on the condition, converting an interrupt into a RuntimeException with the flag restored. */
    private static void awaitUninterrupted(Condition condition) {
        try {
            condition.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Timed wait returning the remaining nanoseconds; interrupts are converted as above. */
    private static long awaitNanosOrFail(Condition condition, long nanos) {
        try {
            return condition.awaitNanos(nanos);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}

/**
 * Strategy invoked by {@link BlockingQueue#tryPut} when the queue is full.
 *
 * @param <T> element type of the queue
 */
@FunctionalInterface
interface RejectPolicy<T> {
    /**
     * @param blockingQueue the full queue the task could not be added to
     * @param task          the task that was not enqueued
     */
    void reject(BlockingQueue<T> blockingQueue, Runnable task);
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
线程池
package com. erick. multithread. d6 ;
import java. util. HashSet ;
import java. util. Set ;
import java. util. concurrent. TimeUnit ;
/**
 * A minimal thread pool: up to {@code coreThreadSize} worker threads are created
 * on demand, further tasks go to a bounded {@link BlockingQueue}, and a
 * {@link RejectPolicy} decides what happens when that queue is full.
 *
 * <p>When a timeout is configured, an idle worker terminates once the queue has
 * stayed empty for that long. The two-argument constructor configures neither a
 * timeout nor a policy: workers then wait indefinitely and producers block
 * while the queue is full.
 *
 * @param <T> element type of the backing queue; tasks are stored through an
 *            unchecked cast, so in practice this is {@link Runnable}
 */
public class ErickThreadPool<T> {
    private final BlockingQueue<T> blockingQueue;
    /** Live workers; every access is guarded by synchronizing on this set. */
    private final Set<Worker> pool = new HashSet<>();
    private final int coreThreadSize;
    private final long timeout;
    /** {@code null} means "no idle timeout". */
    private final TimeUnit timeUnit;
    /** {@code null} means "block the producer until the queue has room". */
    private final RejectPolicy<T> rejectPolicy;

    /**
     * Creates a pool without idle timeout and without reject policy.
     *
     * @param blockQueueCapacity capacity of the task queue
     * @param coreThreadSize     maximum number of worker threads
     */
    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize) {
        this(blockQueueCapacity, coreThreadSize, 0, null, null);
    }

    /**
     * @param blockQueueCapacity capacity of the task queue
     * @param coreThreadSize     maximum number of worker threads
     * @param timeout            how long an idle worker waits for a task before terminating
     * @param timeUnit           unit of {@code timeout}; {@code null} disables the timeout
     * @param rejectPolicy       invoked when the queue is full; {@code null} makes producers block
     */
    public ErickThreadPool(int blockQueueCapacity, int coreThreadSize, long timeout, TimeUnit timeUnit, RejectPolicy<T> rejectPolicy) {
        this.blockingQueue = new BlockingQueue<>(blockQueueCapacity);
        this.coreThreadSize = coreThreadSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * Runs the task on a new worker while fewer than {@code coreThreadSize}
     * workers exist; otherwise offers it to the queue, applying the reject
     * policy if the queue is full.
     *
     * @param task the work to execute
     */
    @SuppressWarnings("unchecked") // T is only ever used to carry Runnable tasks
    public void executeTask(Runnable task) {
        // Use the same monitor as Worker.run() so the set is never touched under two different locks.
        synchronized (pool) {
            if (pool.size() < coreThreadSize) {
                Worker worker = new Worker(task);
                pool.add(worker);
                System.out.println("创建新的线程来执行任务");
                worker.start();
                return;
            }
        }
        // NOTE(review): a task enqueued just as the last idle worker times out can
        // remain queued until the next executeTask call creates a worker.
        System.out.println("线程池已满,生产者???");
        if (rejectPolicy == null) {
            // No policy configured (two-argument constructor): fall back to a blocking put
            // instead of failing with a NullPointerException once the queue fills up.
            blockingQueue.addTask((T) task);
        } else {
            blockingQueue.tryPut(rejectPolicy, (T) task);
        }
    }

    /** Fetches the next queued task, honouring the idle timeout when one is configured. */
    private Runnable nextTask() {
        // Previously the two-argument constructor left timeUnit null and workers hit an NPE here.
        T next = (timeUnit == null)
                ? blockingQueue.getTask()
                : blockingQueue.getTask(timeout, timeUnit);
        return (Runnable) next;
    }

    /** Worker thread: runs its initial task, then pulls tasks until the queue wait times out. */
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            while (task != null || (task = nextTask()) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    // Keep the worker alive, but report what actually failed.
                    System.out.println("线程执行任务出错: " + e);
                } finally {
                    task = null;
                }
            }
            synchronized (pool) {
                System.out.println("线程销毁");
                pool.remove(this);
            }
        }
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
测试代码
package com. erick. multithread. d6 ;
import java. util. Date ;
import java. util. concurrent. TimeUnit ;
/**
 * Overloads a pool of two workers and a one-slot queue with ten slow tasks so
 * that the configured reject policy is triggered.
 */
public class Test {
    public static void main(String[] args) {
        // Parameterized instead of raw, so the compiler checks the pool's generic usage.
        ErickThreadPool<Runnable> pool =
                new ErickThreadPool<Runnable>(1, 2, 5, TimeUnit.SECONDS, new ProducerException());
        for (int i = 0; i < 10; i++) {
            pool.executeTask(() -> {
                System.out.println(Thread.currentThread().getName() + ":" + new Date());
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    // Restore the flag so the worker can still observe the interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            });
        }
    }
}
/** Reject policy that blocks the producer until the queue has room. */
class StillWait implements RejectPolicy<Runnable> {
    @Override
    public void reject(BlockingQueue<Runnable> blockingQueue, Runnable task) {
        blockingQueue.addTask(task);
    }
}
/** Reject policy that waits up to one second for room and then gives the task up. */
class WaitWithTimeOut implements RejectPolicy<Runnable> {
    @Override
    public void reject(BlockingQueue<Runnable> blockingQueue, Runnable task) {
        boolean queued = blockingQueue.addTask(task, 1, TimeUnit.SECONDS);
        if (!queued) {
            // The result used to be ignored, so a timed-out task vanished without a trace.
            System.out.println("等待超时,任务被放弃");
        }
    }
}
/** Reject policy that drops the task and only logs the fact. */
class ProducerGiveUp implements RejectPolicy<Runnable> {
    @Override
    public void reject(BlockingQueue<Runnable> blockingQueue, Runnable task) {
        System.out.println("调用者抛弃任务");
    }
}
/** Reject policy that makes the submitting thread run the task itself. */
class ProducerExecute implements RejectPolicy<Runnable> {
    @Override
    public void reject(BlockingQueue<Runnable> blockingQueue, Runnable task) {
        System.out.println("调用者自己执行任务");
        // Run synchronously on the caller. The previous `new Thread(task).start()`
        // spawned an unbounded extra thread, which contradicts the message above.
        task.run();
    }
}
/** Reject policy that fails the submission with an unchecked exception. */
class ProducerException implements RejectPolicy<Runnable> {
    @Override
    public void reject(BlockingQueue<Runnable> blockingQueue, Runnable task) {
        throw new RuntimeException("核心线程已在工作,阻塞队列已满");
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
JDK线程池
1. 类图
2. 线程池状态
ThreadPoolExecutor 使用int的高3位来表示线程池状态,低29位表示线程数量
3. ThreadPoolExecutor
3.1 构造方法
int corePoolSize:
int maximumPoolSize:
long keepAliveTime:
TimeUnit unit:
BlockingQueue < Runnable > workQueue:
ThreadFactory threadFactory:
RejectedExecutionHandler handler:
public ThreadPoolExecutor ( int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue < Runnable > workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
3.2 核心线程和救急线程
1 . 核心线程: 执行完任务后,会继续保留在线程池中
2 . 救急线程:如果阻塞队列已满,并且没有空余的核心线程。那么会创建救急线程来执行任务
2.1 任务执行完毕后,这个线程就会被销毁( 临时工)
2.2 必须是有界阻塞队列,如果是无界队列,则不需要创建救急线程
3 . 拒绝策略: 有界队列,核心线程满负荷,阻塞队列已满,无空余救急线程,才会执行拒绝
3.3 JDK拒绝策略
如果线程达到最大线程数,救急线程也满负荷,且有界队列也满了,JDK 提供了4种拒绝策略
AbortPolicy: 调用者抛出RejectedExecutionException, 默认策略
CallerRunsPolicy: 调用者运行任务
DiscardPolicy: 放弃本次任务
DiscardOldestPolicy: 放弃阻塞队列中最早的任务,本任务取而代之
- Dubbo: 在抛出异常之前,记录日志,并dump线程栈信息,方便定位问题
- Netty: 创建一个新的线程来执行任务
- ActiveMQ: 带超时等待( 60s) , 尝试放入阻塞队列
4. Executors类工厂方法
默认的构造方法来创建线程池,参数过多,JDK提供了工厂方法,来创建线程池
4.1 固定大小
核心线程数 = 最大线程数,救急线程数为0 阻塞队列:无界,可以存放任意数量的任务
任务量已知,但是线程执行时间较长
执行任务后,线程并不会结束
public static ExecutorService newFixedThreadPool ( int nThreads) {
public static ExecutorService newFixedThreadPool ( int nThreads) {
return new ThreadPoolExecutor ( nThreads, nThreads,
0L , TimeUnit . MILLISECONDS ,
new LinkedBlockingQueue < Runnable > ( ) ) ;
}
package com. erick. multithread. d7 ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
import java. util. concurrent. ThreadFactory ;
import java. util. concurrent. atomic. AtomicInteger ;
/** Runs two tasks on a fixed pool whose threads are named by a custom factory. */
public class Demo01 {
    public static void main(String[] args) {
        // Counter feeding the numeric suffix of each thread name.
        AtomicInteger sequence = new AtomicInteger(0);
        ThreadFactory namedThreads =
                job -> new Thread(job, "erick-pool" + sequence.getAndIncrement());
        ExecutorService pool = Executors.newFixedThreadPool(2, namedThreads);

        for (int submitted = 0; submitted < 2; submitted++) {
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " running"));
        }
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
4.2 带缓冲
核心线程数为0, 最大线程数为Integer的无限大 全部是救急线程,等待时间是60s,60s后就会消亡 SynchronousQueue: 没有容量,没有线程来取的时候是放不进去的 整个线程池数会随着任务数目增长,1分钟后没有其他活动会消亡
1 . 时间较短的线程
2 . 数量大,任务执行时间长,会造成 OutOfMemory 问题
// Excerpt from java.util.concurrent.Executors: core size 0, maximum size
// Integer.MAX_VALUE, 60-second keep-alive, tasks handed over through a SynchronousQueue.
public static ExecutorService newCachedThreadPool ( ) {
return new ThreadPoolExecutor ( 0 , Integer . MAX_VALUE ,
60L , TimeUnit . SECONDS ,
new SynchronousQueue < Runnable > ( ) ) ;
}
4.3. 单线程
线程池大小始终为1个,不能改变线程数 相比自定义一个线程来执行,线程池可以保证前面任务的失败,不会影响到后续任务
自定义线程: 执行多个任务时,一个出错,后续都不能执行了
单线程池: 一个任务失败后,会结束出错线程。重新new一个线程来执行下面的任务
单线程池: 保证所有任务都是串行
newFixedThreadPool: 初始化后,还可以修改线程大小
newSingleThreadExecutor: 不可以修改
// Excerpt from java.util.concurrent.Executors: a ThreadPoolExecutor with core and
// maximum size 1 and a LinkedBlockingQueue, wrapped in FinalizableDelegatedExecutorService.
public static ExecutorService newSingleThreadExecutor ( ) {
return new FinalizableDelegatedExecutorService
( new ThreadPoolExecutor ( 1 , 1 ,
0L , TimeUnit . MILLISECONDS ,
new LinkedBlockingQueue < Runnable > ( ) ) ) ;
}
package com. nike. erick. d07 ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
/** Compares the fixed, cached and single-thread executors created by {@code Executors}. */
public class Demo01 {
    public static void main(String[] args) {
        method03();
    }

    /** Three tasks on a fixed pool of two threads. */
    private static void method01() {
        submitThreeWorkingTasks(Executors.newFixedThreadPool(2));
    }

    /** Three tasks on a cached pool. */
    private static void method02() {
        submitThreeWorkingTasks(Executors.newCachedThreadPool());
    }

    /** A failing task followed by two healthy ones on a single-thread executor. */
    private static void method03() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Runnable failing = () -> {
            int i = 1 / 0;
            System.out.println(Thread.currentThread().getName() + " running");
        };
        Runnable healthy = () -> {
            System.out.println(Thread.currentThread().getName() + " running");
        };
        pool.execute(failing);
        pool.execute(healthy);
        pool.execute(healthy);
    }

    /** Submits three tasks that each print the executing thread's name. */
    private static void submitThreeWorkingTasks(ExecutorService pool) {
        for (int submitted = 0; submitted < 3; submitted++) {
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
        }
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
5. 提交任务
5.1. execute
void execute( Runnable command )
5.2. submit
Future < ? > submit ( Runnable task) ;
< T > Future < T > submit ( Runnable task, T result) ;
< T > Future < T > submit ( Callable < T > task) ;
5.3. invokeAll
< T > List < Future < T > > invokeAll ( Collection < ? extends Callable < T > > tasks)
throws InterruptedException ;
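5.4. invokeAny
< T > T invokeAny ( Collection < ? extends Callable < T > > tasks)
throws InterruptedException , ExecutionException ;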
package com. nike. erick. d07 ;
import java. util. ArrayList ;
import java. util. Collection ;
import java. util. List ;
import java. util. concurrent. Callable ;
import java. util. concurrent. ExecutionException ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
import java. util. concurrent. Future ;
import java. util. concurrent. TimeUnit ;
/** Demonstrates the task-submission methods of {@link ExecutorService}. */
public class Demo02 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        method05(pool);
    }

    /** {@code execute}: fire-and-forget, no result handle. */
    public static void method01(ExecutorService pool) {
        pool.execute(() -> System.out.println(Thread.currentThread().getName() + " running"));
    }

    /** {@code submit(Runnable)}: returns a Future that only reports completion. */
    public static void method02(ExecutorService pool) throws InterruptedException {
        // Submit the Runnable directly; wrapping it in a Thread object that is never started adds nothing.
        Runnable job = () -> System.out.println(Thread.currentThread().getName() + " running");
        Future<?> result = pool.submit(job);
        TimeUnit.SECONDS.sleep(1);
        System.out.println(result.isDone());
        System.out.println(result.isCancelled());
    }

    /** {@code submit(Callable)}: the Future carries the computed value. */
    public static void method03(ExecutorService pool) throws InterruptedException, ExecutionException {
        Future<String> submit = pool.submit(() -> "success");
        TimeUnit.SECONDS.sleep(1);
        System.out.println(submit.isDone());
        System.out.println(submit.isCancelled());
        System.out.println(submit.get());
    }

    /** {@code invokeAll}: runs every task and returns one Future per task. */
    public static void method04(ExecutorService pool) throws InterruptedException {
        Collection<Callable<String>> tasks = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            int round = i;
            tasks.add(() -> {
                System.out.println(Thread.currentThread().getName() + " running");
                return "success:" + round;
            });
        }
        List<Future<String>> results = pool.invokeAll(tasks);
        TimeUnit.SECONDS.sleep(1);
        System.out.println(results);
    }

    /** {@code invokeAny}: returns the result of one task that completed successfully. */
    public static void method05(ExecutorService pool) throws InterruptedException, ExecutionException {
        // The unused single-thread executor that was created here has been removed.
        Collection<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> {
            System.out.println("first task");
            TimeUnit.SECONDS.sleep(1);
            return "success";
        });
        tasks.add(() -> {
            System.out.println("second task");
            TimeUnit.SECONDS.sleep(2);
            return "success";
        });
        tasks.add(() -> {
            System.out.println("third task");
            TimeUnit.SECONDS.sleep(3);
            return "success";
        });
        String result = pool.invokeAny(tasks);
        System.out.println(result);
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
6. 关闭线程池
6.1 shutdown
将线程池的状态改变为SHUTDOWN状态 不会接受新任务,已经提交的任务不会停止 不会阻塞调用线程的执行
void shutdown ( ) ;
package com. dreamer. multithread. day09 ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
import java. util. concurrent. TimeUnit ;
/** Shows that {@code shutdown()} lets already-submitted tasks finish without blocking the caller. */
public class Demo04 {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        String[] labels = {" first running", " second running", " third running"};
        for (String label : labels) {
            pool.submit(delayedReport(label));
        }
        pool.shutdown();
        System.out.println("main thread ending");
    }

    /** Builds a task that sleeps two seconds and then prints the thread name plus the label. */
    private static Runnable delayedReport(String label) {
        return () -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + label);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
6.2. shutdownNow
不会接受新任务 正在执行的任务会被打断(interrupt) 将等待队列中尚未执行的任务返回
List < Runnable > shutdownNow ( ) ;
package com. dreamer. multithread. day09 ;
import java. util. List ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
import java. util. concurrent. TimeUnit ;
/** Shows that {@code shutdownNow()} interrupts the running task and returns the queued ones. */
public class Demo04 {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        String[] labels = {" first running", " second running", " third running"};
        for (String label : labels) {
            pool.submit(delayedReport(label));
        }
        List<Runnable> leftOver = pool.shutdownNow();
        System.out.println(leftOver.size());
        System.out.println("main thread ending");
    }

    /** Builds a task that sleeps two seconds and then prints the thread name plus the label. */
    private static Runnable delayedReport(String label) {
        return () -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName() + label);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
线程池拓展
1. 异步模式之工作线程
1.1 Worker Thread
让有限的工作线程来轮流异步处理无限多的任务 分类:不同的任务类型应该使用不同的线程池
1.2 饥饿现象
- 两个工人是同一个线程池中的两个线程, 为客人点餐和后厨做菜,这是两个阶段的工作
- 客人点餐:必须先点餐,等菜做好,上菜,在此期间,处理点餐的工人必须等待
- A工人处理了点餐任务,B工人把菜做好,然后上菜,配合正常
- 同时来了两个客人,A和B工人都去处理点餐了,没人做饭了,出现线程数不足导致的资源饥饿
正常
package com. erick. multithread. d7 ;
import java. util. Arrays ;
import java. util. List ;
import java. util. Random ;
import java. util. concurrent. * ;
/**
 * Restaurant example, normal case: one order task submits a cooking task to the
 * same two-thread pool and waits for the dish.
 */
public class Demo02 {
    private static List<String> MENU = Arrays.asList("宫保鸡丁", "地三鲜", "辣子鸡丁", "红烧肉");
    private static Random random = new Random();

    /** Picks a random dish from the menu. */
    private static String cooking() {
        int choice = random.nextInt(MENU.size());
        return MENU.get(choice);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() -> takeOrderAndServe(pool));
    }

    /** Takes an order, delegates the cooking to the pool and blocks until the dish is ready. */
    private static void takeOrderAndServe(ExecutorService pool) {
        System.out.println("开始处理点餐");
        Callable<String> cookingJob = () -> {
            System.out.println("开始做菜");
            return cooking();
        };
        Future<String> cook = pool.submit(cookingJob);
        try {
            String result = cook.get();
            System.out.println("上菜:" + result);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
线程池饥饿
package com. erick. multithread. d7 ;
import java. util. Arrays ;
import java. util. List ;
import java. util. Random ;
import java. util. concurrent. * ;
/**
 * Restaurant example, starvation case: two order tasks occupy both pool threads
 * and each waits for a cooking task that no thread is left to run.
 */
public class Demo02 {
    private static List<String> MENU = Arrays.asList("宫保鸡丁", "地三鲜", "辣子鸡丁", "红烧肉");
    private static Random random = new Random();

    /** Picks a random dish from the menu. */
    private static String cooking() {
        int choice = random.nextInt(MENU.size());
        return MENU.get(choice);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Two customers arrive at once; both workers end up taking orders.
        pool.execute(() -> takeOrderAndServe(pool));
        pool.execute(() -> takeOrderAndServe(pool));
    }

    /** Takes an order, delegates the cooking to the pool and blocks until the dish is ready. */
    private static void takeOrderAndServe(ExecutorService pool) {
        System.out.println("开始处理点餐");
        Callable<String> cookingJob = () -> {
            System.out.println("开始做菜");
            return cooking();
        };
        Future<String> cook = pool.submit(cookingJob);
        try {
            String result = cook.get();
            System.out.println("上菜:" + result);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
解决方法
最简单的方法: 增加线程池的线程数量,但是不能从根本解决问题 解决方法:不同的任务类型,使用不同的线程池
2. 线程数量
过小,导致cpu资源不能充分利用,浪费性能 过大,线程上下文切换浪费性能,每个线程也要占用内存导致占用内存过多
2.1 CPU密集型
如果线程的任务主要是和cpu资源打交道,比如大数据运算,称为CPU密集型 线程数量: 核心数+1 +1: 保证某线程由于某些原因(操作系统方面)导致暂停时,额外线程可以启动,不浪费CPU资源
2.2. IO密集型
IO操作,RPC调用,数据库访问时,CPU是空闲的,称为IO密集型 更加常见: IO操作,远程RPC调用,数据库操作 线程数 = 核数 * 期望cpu利用率 * (CPU计算时间 + CPU等待时间) / CPU 计算时间
3. 调度功能
3.1 延时执行
package com. dreamer. multithread. day09 ;
import java. util. concurrent. Executors ;
import java. util. concurrent. ScheduledExecutorService ;
import java. util. concurrent. TimeUnit ;
/** Schedules two one-shot tasks with delays of one and five seconds. */
public class Demo05 {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        Runnable first = () ->
                System.out.println(Thread.currentThread().getName() + " first running");
        Runnable second = () ->
                System.out.println(Thread.currentThread().getName() + " second running");

        pool.schedule(first, 1, TimeUnit.SECONDS);
        pool.schedule(second, 5, TimeUnit.SECONDS);
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
package com. dreamer. multithread. day09 ;
import java. util. concurrent. Executors ;
import java. util. concurrent. ScheduledExecutorService ;
import java. util. concurrent. TimeUnit ;
/** Schedules a failing task and a healthy task on a single scheduler thread. */
public class Demo06 {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

        // Fails with ArithmeticException before it can print anything.
        Runnable failing = () -> {
            int i = 1 / 0;
            System.out.println(Thread.currentThread().getName() + " first running");
        };
        Runnable healthy = () ->
                System.out.println(Thread.currentThread().getName() + " second running");

        pool.schedule(failing, 1, TimeUnit.SECONDS);
        pool.schedule(healthy, 2, TimeUnit.SECONDS);
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
3.2 定时执行
- scheduleAtFixedRate: 按固定频率执行;如果任务的执行时间大于时间间隔,上一次执行结束后就会紧接着立刻执行下一次
- scheduleWithFixedDelay: 上一个任务执行完毕后,再延迟一定的时间才会执行下一次
package com. dreamer. multithread. day09 ;
import java. util. concurrent. Executors ;
import java. util. concurrent. ScheduledExecutorService ;
import java. util. concurrent. TimeUnit ;
/** Repeats a two-second task with a fixed two-second delay after an initial three seconds. */
public class Demo07 {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        Runnable slowTask = () -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task is running");
        };
        long initialDelay = 3;
        long delayBetweenRuns = 2;
        pool.scheduleWithFixedDelay(slowTask, initialDelay, delayBetweenRuns, TimeUnit.SECONDS);
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
4. 异常处理
4.1 不处理异常
package com. dreamer. multithread. day09 ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
/** Submits a task that fails and never inspects the outcome. */
public class Demo08 {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Runnable failing = () -> {
            int i = 1 / 0;
            System.out.println(Thread.currentThread().getName() + " task running");
        };
        pool.submit(failing);
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
4.2 任务执行者处理
package com. dreamer. multithread. day09 ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
/** Submits a task that catches and prints its own failure. */
public class Demo08 {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Runnable selfHandling = () -> {
            try {
                int i = 1 / 0;
                System.out.println(Thread.currentThread().getName() + " task running");
            } catch (Exception e) {
                // The task deals with its own error.
                e.printStackTrace();
            }
        };
        pool.submit(selfHandling);
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
4.3 线程池处理
package com. dreamer. multithread. day09 ;
import java. util. concurrent. ExecutionException ;
import java. util. concurrent. ExecutorService ;
import java. util. concurrent. Executors ;
import java. util. concurrent. Future ;
import java. util. concurrent. TimeUnit ;
/** Submits a failing task and reads its Future, so the failure reaches the caller through {@code get()}. */
public class Demo08 {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Runnable failing = () -> {
            int i = 1 / 0;
            System.out.println(Thread.currentThread().getName() + " task running");
        };
        Future<?> result = pool.submit(failing);
        TimeUnit.SECONDS.sleep(1);
        Object outcome = result.get();
        System.out.println(outcome);
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24