模拟了查询耗时操作,并使用FutureTask和CompletableFuture分别获取计算结果,统计执行时长
- package org.alllearn.futurtask;
-
- import com.google.common.base.Stopwatch;
- import com.google.common.collect.Lists;
- import lombok.AllArgsConstructor;
- import lombok.Getter;
- import lombok.Setter;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ForkJoinPool;
- import java.util.concurrent.FutureTask;
- import java.util.stream.Collectors;
-
- public class CompletableFutureTest {
- public static void main(String[] args) {
-
- CompletableFutureTest test = new CompletableFutureTest();
- List
users = test.getUsers(); - //流计算的简单使用
- Stopwatch started = Stopwatch.createStarted();
- double average = users.stream().mapToInt(User::getAge).average().getAsDouble();
- System.out.println(average + " " + started.elapsed());
-
- //模拟通过id集合去其他表批量查询数据
- //单线程任务分割,数据库承受不了大量id的in查询
- List
> partition1 = Lists.partition(users, users.size() / 200);
- started.reset();
- started.start();
- double average1 = partition1.stream().mapToDouble(test::select).average().getAsDouble();
- System.out.println(average1 + " " + started.elapsed());
-
-
- //多线程任务两种方案FutureTask、CompletableFuture
- int cpu = Runtime.getRuntime().availableProcessors();
- //计算密集型cpu+1,io密集型cpu*2
- ForkJoinPool forkJoinPool = new ForkJoinPool(cpu + 1);
- List
> partition2 = Lists.partition(users, users.size() / 200);
-
-
- //FutureTask
- started.reset();
- started.start();
- double average2 = partition2.stream()
- .map(u -> {
- FutureTask
futureTask = new FutureTask<>(() -> test.select(u)); - forkJoinPool.submit(futureTask);
- return futureTask;
- })
- .collect(Collectors.toList())
- .stream().
- mapToDouble(f -> {
- try {
- return f.get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- })
- .average()
- .getAsDouble();
- System.out.println(average2 + " " + started.elapsed());
-
- //CompletableFuture
- started.reset();
- started.start();
- double average3 = partition2.stream()
- .map(u -> CompletableFuture.supplyAsync(() -> test.select(u), forkJoinPool))
- .collect(Collectors.toList())
- .stream().
- mapToDouble(CompletableFuture::join)
- .average()
- .getAsDouble();
- forkJoinPool.shutdown();
- System.out.println(average3 + " " + started.elapsed());
- }
-
- //模拟查询耗时
- public Double select(List
user) { - try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- return 1D;
- //return Math.random();
- }
-
- //模拟获取用户
- public List
getUsers() { - List
users = new ArrayList<>(); - for (int i = 0; i < 20000; i++) {
- users.add(new User(i, "user" + i, 1));
- }
- return users;
- }
-
- @Getter
- @Setter
- @AllArgsConstructor
- static class User {
- private int id;
- private String name;
- private int age;
- }
-
- }
-
-
运行结果
