1.配置线程池
- /**
- * int corePoolSize,
- * int maximumPoolSize,
- * long keepAliveTime,
- * TimeUnit unit,
- * BlockingQueue
workQueue, - * ThreadFactory threadFactory,
- * RejectedExecutionHandler handler
- *
- * @return
- */
- @Bean
- public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool) {
- return new ThreadPoolExecutor(pool.getCoreSize(),
- pool.getMaxSize(),
- pool.getKeepAliveTime(),
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(100000),
- Executors.defaultThreadFactory(),
- new ThreadPoolExecutor.AbortPolicy()
- );
- }
2.线程池参数配置类
- import lombok.Data;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.stereotype.Component;
- /**
- my.thread.core-size=20
- my.thread.max-size=200
- my.thread.keep-alive-time=10
- */
- @ConfigurationProperties(prefix = "my.thread")
- @Component
- @Data
- public class ThreadPoolConfigProperties {
- //核心线程数
- private Integer coreSize;
- //最大线程数
- private Integer maxSize;
- //空余线程的存活时间
- private Integer keepAliveTime;
- }
3.测试异步任务
- @Autowired
- private ThreadPoolExecutor executor;
-
- //在这里开启一个异步任务,提交给线程池,runAsync()方法没有返回值,需要有返回值的可使用supplyAsync()方法
- @Test
- void testCompletableFuture() {
- CompletableFuture
runAsync = CompletableFuture.runAsync(() -> { - int result = 0;
- for (int i = 0; i <= 100; i++) {
- result += i;
- }
- System.out.println(result);
- }, executor);
- }
4.关于CompletableFuture的其他相关用法
4.1 CompletableFuture的**get()**方法可以获取异步的结果,get方法是一个阻塞式等待的方法,也即get方法会等待异步任务的完成
- CompletableFuture
completableFuture2 = CompletableFuture.supplyAsync(() -> { - for (int i = 0; i <= 100; i++) {
- sum2.addAndGet(i);
- }
- return sum2;
- }, executor);
- //获取异步结果
- AtomicInteger integer = completableFuture2.get();
4.2 allOf : 等待所有任务完成完成
- AtomicInteger sum = new AtomicInteger();
- AtomicInteger sum2 = new AtomicInteger();
- CompletableFuture
completableFuture1 = CompletableFuture.supplyAsync(() -> { - for (int i = 0; i <= 100; i++) {
- sum.addAndGet(i);
- }
- return sum;
- }, executor);
-
-
- CompletableFuture
completableFuture2 = CompletableFuture.supplyAsync(() -> { - for (int i = 0; i <= 100; i++) {
- sum2.addAndGet(i);
- }
- return sum2;
- }, executor);
- AtomicInteger integer = completableFuture2.get();
-
- //allOf : 等待所有任务完成完成,注意get方法,是阻塞式等待,等待上面的异步任务都完成
- CompletableFuture.allOf(completableFuture1,completableFuture2).get();
-
- //获取异步结果
- AtomicInteger atomicInteger1 = completableFuture1.get();
- AtomicInteger atomicInteger2 = completableFuture2.get();
-
- System.out.println("结果是--->"+atomicInteger1.addAndGet(atomicInteger2.intValue()));
4.3 异步任务完成时,whenComplete,exceptionally
- CompletableFuture
completableFuture3 = CompletableFuture.supplyAsync(() -> { - for (int i = 0; i <= 10; i++) {
- sum2.addAndGet(i);
- }
- return sum2;
- }, executor).whenComplete((res, exception) -> {
- //当出现异常,可以拿到异常信息,但是无法修改返回数据
- System.out.println("结果是:" + res + ",异常:" + exception);
- }).exceptionally(throwable -> {
- //可以感知异常,同时返回默认值
- return new AtomicInteger(10);
- });
4.4 handle,方法完成后的后续处理
- CompletableFuture
completableFuture4 = CompletableFuture.supplyAsync(() -> { - int i = 10 / 2;
- return i;
- }, executor).handle((res, throwable) -> {
- //res 为结果,throwable 为异常
- if (res != null) {
- return res * 2;
- }
- if (throwable != null) {
- return -1;
- }
- return 0;
- });
- System.out.println("completableFuture4--结果是:"+completableFuture4.get());
4.5 异步任务串行化
- /**
- * 异步任务串行化
- * thenAcceptAsync 可以接收上一步获取的结果,但是无返回值
- * thenApplyAsync 可以接收上一步获取的结果,有返回值
- */
- CompletableFuture
future = CompletableFuture.supplyAsync(() -> { - int i = 10 / 2;
- return i;
- }, executor).thenApplyAsync(res -> {
- //res为上一步的结果
- return res * 2;
- }, executor).thenAcceptAsync((res) -> {
- System.out.println("hello ...thenAcceptAsync");
- }, executor);