万万没想到,在生产环境中竟然翻车了,因为没有考虑到一些场景,导致了CountDownLatch出现了问题,接下来来分享一下由于CountDownLatch导致的问题。
先简单介绍下业务场景,针对用户批量下载的文件进行修改上传
为了提高执行的速度,所以在采用线程池去执行 下载-修改-上传 的操作,并在全部执行完之后统一提交保存文件地址到数据库,于是加入了CountDownLatch来进行控制。
根据服务本身情况,自定义一个线程池
- public static ExecutorService testExtcutor() {
- return new ThreadPoolExecutor(
- 2,
- 2,
- 0L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(1));
- }
- 复制代码
模拟执行
- public static void main(String[] args) {
- // 下载文件总数
- List<Integer> resultList = new ArrayList<>(100);
- IntStream.range(0,100).forEach(resultList::add);
- // 下载文件分段
- List<List<Integer>> split = CollUtil.split(resultList, 10);
-
- ExecutorService executorService = BaseThreadPoolExector.testExtcutor();
- CountDownLatch countDownLatch = new CountDownLatch(100);
- for (List<Integer> list : split) {
- executorService.execute(() -> {
- list.forEach(i ->{
- try {
- // 模拟业务操作
- Thread.sleep(500);
- System.out.println("任务进入");
- } catch (InterruptedException e) {
- e.printStackTrace();
- System.out.println(e.getMessage());
- } finally {
- System.out.println(countDownLatch.getCount());
- countDownLatch.countDown();
- }
- });
- });
- }
- try {
- countDownLatch.await();
- System.out.println("countDownLatch.await()");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- 复制代码
一开始我个人感觉没有什么问题,反正finally都能够做减一的操作,到最后调用await方法,进行主线程任务
- Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@300ffa5d rejected from java.util.concurrent.ThreadPoolExecutor@1f17ae12[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
- at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
- at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
- at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
- at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
- at Thread.executor.executorTestBlock.main(executorTestBlock.java:28)
- 任务进入
- countDownLatch.countDown
- 任务进入
- countDownLatch.countDown
- 任务进入
- countDownLatch.countDown
- 任务进入
- countDownLatch.countDown
- 任务进入
- countDownLatch.countDown
- 任务进入
- countDownLatch.countDown
- 任务进入
- countDownLatch.countDown
- 任务进入
- countDownLatch.countDown
- 任务进入
- countDownLatch.countDown
- 任务进入
- countDownLatch.countDown
- 任务进入
- countDownLatch.countDown
- 任务进入
- countDownLatch.countDown
- 任务进入
- countDownLatch.countDown
- 任务进入
- countDownLatch.countDown
- 任务进入
- countDownLatch.countDown
- 复制代码
由于任务数量较多,阻塞队列中已经塞满了,所以默认的拒绝策略,当队列满时,处理策略报错异常,
要注意这个异常是线程池,自己抛出的,不是我们循环里面打印出来的,
这也造成了,线上这个线程池被阻塞了,他永远也调用不到await方法,
利用jstack,我们就能够看到有问题
调大阻塞队列,但是问题来了,到底多少阻塞队列才是大呢,如果太大了会不由又造成内存溢出等其他的问题
在第一个的基础上,我们修改了拒绝策略,当触发拒绝策略的时候,用调用者所在的线程来执行任务
- public static ThreadPoolExecutor queueExecutor(BlockingQueue
workQueue ){ - return new ThreadPoolExecutor(
- size,
- size,
- 0L,
- TimeUnit.SECONDS,
- workQueue,
- new ThreadPoolExecutor.CallerRunsPolicy());
- }
- 复制代码
你可能又会想说,会不会任务数量太多,导致调用者所在的线程执行不过来,任务提交的性能急剧下降
那我们就应该自定义拒绝策略,将这下排队的消息记录下来,采用补偿机制的方式去执行
同时也要注意上面的那个异常是线程池抛出来的,我们自己也需要将线程池进行try catch,记录问题数据,并且在finally中执行countDownLatch.countDown来避免,线程池的使用
目前根据业务部门的反馈,业务实际中任务数不很特别多的情况,所以暂时先采用了第二种方式去解决这个线上问题
在这里我们也可以看到,如果没有正确的关闭countDownLatch,可能会导致一直等待,这也是我们需要注意的。
工具虽然好,但是依然要注意他带来的问题,没有正确的去处理好,引发的一系列连锁反应。