如果你在创建线程时,使用的是 Runnable 接口,那么此时你是无法获取线程执行结果的,如果想要获取线程的执行结果,需要实现 Callable 接口,示例如下:
- public class J0_Callable {
-
- static class Task implements Callable
{ -
- @Override
- public Integer call() throws InterruptedException {
- Thread.sleep(3000);
- return 100;
- }
- }
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- ExecutorService executors = Executors.newSingleThreadExecutor();
- Future
submit = executors.submit(new Task()); - System.out.println("计算结果为:" + submit.get());
- executors.shutdown();
- }
- }
此时通过 ExecutorService.submit() 进行提交,得到的是一个 Future 对象,它包含了线程的执行结果,当你调用其 get() 方法时,它会阻塞直至获取到线程的返回结果。
使用 Callable 接口的限制是:其只能使用线程池提交,而不能使用单独的线程进行提交。如果想要使用单独的线程提交,可以使用 FutureTask 对其进行包装,FutureTask 是 Runnable 接口的实现类,可以用于任何场景下的提交,示例如下:
- static class Task implements Callable<Integer> {
- @Override
- public Integer call() throws InterruptedException {
- Thread.sleep(3000);
- return 100;
- }
- }
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- FutureTask<Integer> futureTask01 = new FutureTask<>(new Task());
- FutureTask<Integer> futureTask02 = new FutureTask<>(new Task());
- // 使用独立的线程执行
- new Thread(futureTask01).start();
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- // 使用线程池提交
- executorService.submit(futureTask02);
- System.out.println("futureTask01 计算结果为:" + futureTask01.get());
- System.out.println("futureTask02 计算结果为:" + futureTask01.get());
- executorService.shutdown();
- }
CompletableFuture 是 JDK 8 提供的增强后 Future ,它支持流式调用,等待唤醒等一系列新的功能:
- public class J2_CompletableFuture {
-
- static class Compute implements Runnable {
-
- private CompletableFuture<Integer> future;
-
- Compute(CompletableFuture<Integer> future) {
- this.future = future;
- }
-
- @Override
- public void run() {
- try {
- System.out.println("子线程等待主线程运算完成····");
- Integer integer = future.get();
- System.out.println("子线程完成后续运算:" + integer * integer);
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
- }
- }
-
- public static void main(String[] args) throws InterruptedException {
- int intermediateResult;
- CompletableFuture<Integer> future = new CompletableFuture<>();
- // 启动子线程
- new Thread(new Compute(future)).start();
- System.out.println("启动主线程");
- Thread.sleep(2000);
- System.out.println("主线程计算完成");
- // 假设主线程计算结果为 100
- intermediateResult = 100;
- // 传递主线程的计算结果给子线程
- future.complete(intermediateResult);
- }
- }
-
- // 输出
- 启动主线程
- 子线程等待主线程运算完成····
- 主线程计算完成
- 子线程完成后续运算:10000
CompletableFuture 的 supplyAsync 可以将一个正常的方法以异步的方式来执行:
- public class J3_SupplyAsync {
-
- private static Integer compute() {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("子线程计算完成");
- return 100;
- }
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(J3_SupplyAsync::compute);
- System.out.println("主线程等待子线程计算完成");
- Integer integer = supplyAsync.get();
- System.out.println("主线程计算完成:" + integer * integer);
- }
- }
CompletableFuture 支持大部分流式处理的特性,示例如下:
- public class J4_StreamingCall {
-
- private static Integer compute() {
- System.out.println("compute所在线程:" + Thread.currentThread().getId());
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return 100;
- }
-
- private static Integer multi(Integer integer) {
- try {
- System.out.println("multi所在线程:" + Thread.currentThread().getId());
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return integer * integer;
- }
-
- private static void accept(Integer integer) {
- System.out.println("accept所在线程:" + Thread.currentThread().getId());
- System.out.println("accept方法消费掉计算结果:" + integer);
- }
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- CompletableFuture<Void> future = CompletableFuture.supplyAsync(J4_StreamingCall::compute)
- .thenApply(J4_StreamingCall::multi)
- .thenAccept(J4_StreamingCall::accept) //值在这一步被消费掉了
- .thenAccept(x -> System.out.println("运算结果:" + x));
- future.get(); //类似于流式计算的惰性求值,如果缺少这一步,不会有任何输出
- }
- }
除了使用单个的 CompletableFuture,还可以通过 thenCompose 或 thenCombineAsync 来组合多个 CompletableFuture:
- public class J6_Combination {
-
- private static Integer compute() {
- System.out.println("compute 所在线程:" + Thread.currentThread().getId());
- return 100;
- }
-
- private static Integer multi(Integer integer) {
- System.out.println("epr 所在线程:" + Thread.currentThread().getId());
- return integer * integer;
- }
-
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- // 组合实现方式1 thenCompose 一个计算的输入依赖另外一个计算的结果
- CompletableFuture<Void> future01 = CompletableFuture.supplyAsync(J6_Combination::compute)
- .thenCompose(x -> CompletableFuture.supplyAsync(() -> multi(x)))
- .thenAccept(x -> System.out.println("运算结果:" + x)); // 运算结果:10000
- future01.get();
-
- System.out.println();
-
- // 组合实现方式2 thenCombineAsync 两个计算之间不依赖
- CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(J6_Combination::compute);
- CompletableFuture<Integer> future03 = CompletableFuture.supplyAsync(() -> J6_Combination.multi(100));
- CompletableFuture<Integer> futureAll = future02.thenCombineAsync(future03, (x, y) -> x + y);
- System.out.println("运算结果:" + futureAll.get()); // 运算结果:10100
-
- }
- }
ThreadLocal 是以增加资源的方式来避免竞态,它会为每一个线程创建一份私有的资源,从而避免对公共资源的竞争。实例如下:
- /**
- * 线程不安全的SimpleDateFormat
- */
- public class J1_ThreadUnsafe {
-
- private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- private static int sum = 1000;
- private static CountDownLatch countDownLatch = new CountDownLatch(sum);
- private static AtomicInteger atomicInteger = new AtomicInteger(0);
-
- static class Task implements Runnable {
-
- @Override
- public void run() {
- try {
- Date parse = sdf.parse("2018-08-08 08:08:08");
- System.out.println(parse);
- atomicInteger.incrementAndGet();
- } catch (ParseException e) {
- e.printStackTrace();
- } finally {
- countDownLatch.countDown();
- }
- }
- }
-
- public static void main(String[] args) throws InterruptedException {
- ExecutorService executorService = Executors.newFixedThreadPool(10);
- for (int i = 0; i < sum; i++) {
- executorService.execute(new Task());
- }
- countDownLatch.await();
- System.out.println("格式化成功次数为:" + atomicInteger.get());
- }
- }
因为 SimpleDateFormat 是线程不安全的,因此其格式化成功的次数总是小于 100 次,此时可以使用 ThreadLocal 进行改写,让每个线程都持有自己独立的格式化器,具体如下:
- public class J2_ThreadSafe {
-
- private static ThreadLocal
threadLocal = new ThreadLocal<>(); - private static int sum = 1000;
- private static CountDownLatch countDownLatch = new CountDownLatch(sum);
- private static AtomicInteger atomicInteger = new AtomicInteger(0);
-
- static class Task implements Runnable {
-
- @Override
- public void run() {
- try {
- // 如果当前线程中不存在该值,则创建一个
- if (threadLocal.get() == null) {
- threadLocal.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
- }
- // 使用线程私有的SimpleDateFormat
- Date parse = threadLocal.get().parse("2018-08-08 08:08:08");
- System.out.println(parse);
- atomicInteger.incrementAndGet();
- } catch (ParseException e) {
- e.printStackTrace();
- } finally {
- countDownLatch.countDown();
- }
- }
- }
-
- public static void main(String[] args) throws InterruptedException {
- ExecutorService executorService = Executors.newFixedThreadPool(10);
- for (int i = 0; i < sum; i++) {
- executorService.execute(new Task());
- }
- countDownLatch.await();
- System.out.println("格式化成功次数为:" + atomicInteger.get());
- executorService.shutdown();
- }
- }