public class ThreadTest {
public static ExecutorService executorService = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1. 继承Thread方式
// System.out.println("main.....start....");
//
// Thread01 thread01 = new Thread01();
// thread01.start();
//
// System.out.println("main.......end....");
//实现Runable方式
// System.out.println("main---start");
// Runable01 runable01 = new Runable01();
// new Thread(runable01).start();
// System.out.println("main---end");
// System.out.println("main---start");
// FutureTask futureTask = new FutureTask<>(new Callable01());
// new Thread(futureTask).start();
// //阻塞等待整个线程执行完,获取返回结果
// Integer integer = futureTask.get();
// System.out.println(integer);
// System.out.println("main---end");
//以后的业务代码里,以上三种启动线程的方式都不用,将所有的一步任务交给线程池使用
//线程池 给线程直接提交任务
//当前系统中只有一两个池 ,提交线程池让他们自己去执行
executorService.submit(new Runable01());
}
//继承thread
public static class Thread01 extends Thread{
@Override
public void run() {
System.out.println("当前线程:"+Thread.currentThread().getId());
int i=10/2;
System.out.println("运算结果"+i);
}
}
//实现runable接口
public static class Runable01 implements Runnable{
@Override
public void run() {
System.out.println("当前线程:"+Thread.currentThread().getId());
int i=10/2;
System.out.println("运算结果"+i);
}
}
//实现 Callable 接口
public static class Callable01 implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("当前线程:"+Thread.currentThread().getId());
int i=10/2;
System.out.println("运算结果"+i);
return i;
}
}
}
区划:
/**
* 七大参数
* corePoolSize: 核心线程数(一直存在除非设置allowCoreThreadTimeOut):线程池创建好以后就准备就绪的线程的数量,就等待接受异步任务去执行
* maximumPoolSize:最大线程数 控制资源
*keepAliveTime:存活时间 如果当前的线程数量大于核心线程数的数量 ,只有线程的空闲时间大于keepAliveTime 就会释放空闲线程(corePoolSize-maximumPoolSize)
*unit:时间单位
*workQueue:阻塞队列 如果任务有很多,就会将目前多的任务放到队列里面,只要有线程空闲,就会去队列里面取出新任务继续执行
* threadFactory 线程工厂 创建线程的工厂
* handler:拒绝策略 如果队列满了,按照我们指定的拒绝策略拒绝执行任务
*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
200,
10,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(1000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
}
面试: 一个线程池 core 7; max 20 ,queue:50,100 并发进来怎么分配的?
先有 7 个能直接得到执行,接下来 50 个进入队列排队,在多开 13 个继续执行。现在 70 个 被安排上了。剩下30 个默认拒绝策略
public class Demo1 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
try {
for(int i=0;i<10;i++){
threadPool.execute(()->{
System.out.println("当前线程"+Thread.currentThread().getName());
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}
public class Demo1 {
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newFixedThreadPool(5);
ExecutorService threadPool = Executors.newSingleThreadExecutor();
try {
for(int i=0;i<10;i++){
threadPool.execute(()->{
System.out.println("当前线程"+Thread.currentThread().getName());
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}
public class Demo1 {
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newFixedThreadPool(5);
// ExecutorService threadPool = Executors.newSingleThreadExecutor();
ExecutorService threadPool = Executors.newCachedThreadPool();
try {
for(int i=0;i<20;i++){
threadPool.execute(()->{
System.out.println("当前线程"+Thread.currentThread().getName());
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}
public class Demo2 {
public static void main(String[] args) {
ThreadPoolExecutor threadPool= new ThreadPoolExecutor(2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 0; i < 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}
业务场景: 查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间
假如商品详情页的每个查询,需要如下标注的时间才能完成 那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。 如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应,但有一些是异步执行的,比如1必须执行完并返回结果,我们才可以拿1的结果去执行4,此时可以采用异步的方式
CompletableFuture 提供了四个静态方法来创建一个异步操作。
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main----start");
CompletableFuture.runAsync(()->{
System.out.println("当前线程:"+Thread.currentThread().getId());
int i=10/2;
System.out.println("运算结果"+i);
},executorService);
System.out.println("main----end");
System.out.println("main----start");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运算结果" + i);
return i;
}, executorService);
Integer integer = future.get();
System.out.println("main----end---"+integer);
}
和 complete 一样,可对结果做最后的处理(可处理异常),可改变返回值。
System.out.println("main---start");
//方法执行后的处理
CompletableFuture<Integer> handle = CompletableFuture.supplyAsync(() -> {
int i = 10 / 4;
System.out.println("执行的结果" + i);
return i;
}, executorService).handle((res,e)->{
if(res != null){
return res*2;
}
if (e != null){
return 0;
}
return -1;
});
System.out.println("main---end"+handle.get());
//线程串行话
System.out.println("main---start");
//方法执行后的处理
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 10 / 4;
System.out.println("当前线程"+Thread.currentThread().getId());
System.out.println("执行的结果" + i);
return i;
}, executorService).thenApplyAsync((res) -> {
System.out.println("当前线程"+Thread.currentThread().getId());
System.out.println("上一次返回的结果" + res);
return res;
}, executorService);
Integer integer = future.get();
System.out.println("返回的结果"+integer);
两个任务必须都完成,触发该任务。
/**
* 两个都要完成
*/
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("线程一" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("线程一结束");
return i;
}, executorService);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二开始");
return 10;
}, executorService);
future.runAfterBothAsync(future1,()->{
System.out.println("执行完毕");
},executorService);
System.out.println("main--end");
future.thenAcceptBothAsync(future1,(e1,e2)->{
System.out.println("线程一的结果:"+e1);
System.out.println("线程二的结果:"+e2);
},executorService);
CompletableFuture<Integer> future2 = future.thenCombineAsync(future1, (e1, e2) -> {
System.out.println("线程一的结果:" + e1);
System.out.println("线程二的结果:" + e2);
return e1 + e2;
}, executorService);
System.out.println("两个线程执行后的结果为"+future2.get());
当两个任务中,任意一个 future 任务完成的时候,执行任务。
/**
* 两个任务只要有一个完成
*/
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("线程一" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("线程一结束");
return i;
}, executorService);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务二开始");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
}, executorService);
future.runAfterEitherAsync(future1,()->{
System.out.println("ok");
},executorService);
future.acceptEitherAsync(future1,res->{
System.out.println("返回的结果"+res);
},executorService);
CompletableFuture<Object> future2 = future.applyToEitherAsync(future1, res -> {
System.out.println("返回结果一:" + res);
return res;
}, executorService);
System.out.println("最终结果"+future2.get());
allOf:等待所有任务完成
anyOf:只要有一个任务完成
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("111");
return "11";
}, executorService);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("222");
return "22";
}, executorService);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("333");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "3";
}, executorService);
CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3);
all.get();
System.out.println("main---end");