• Tomcat线程池与Fork/Join使用


    1 Tomcat线程池

    1 简介

    在这里插入图片描述

    Tomcat使用线程池:

    • LimitLatch 用来限流,可以控制最大连接个数
    • Acceptor 只负责 接收新的 socket 连接
    • Poller 只负责监听 socket channel 是否有 可读的 I/O 事件
    • 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
    • Executor 线程池中的工作线程最终负责进行处理请求

    Tomcat 线程池扩展了 ThreadPoolExecutor, 但是总线程数达到 maximumPoolSize, 此时不会立即抛出 RejectedExecutionException 异常, 而是再次尝试将任务放入队列,如果再次失败, 才抛出RejectedExecutionException 异常.

    部分源码

    public void execute(Runnable command, long timeout, TimeUnit unit) {
         submittedCount.incrementAndGet();
         try {
             super.execute(command);
         } catch (RejectedExecutionException rx) {
             if (super.getQueue() instanceof TaskQueue) {
                 final TaskQueue queue = (TaskQueue)super.getQueue();
                 try {
                     if (!queue.force(command, timeout, unit)) {
                         submittedCount.decrementAndGet();
                         throw new RejectedExecutionException("Queue capacity is full.");
                     }
                 } catch (InterruptedException x) {
                     submittedCount.decrementAndGet();
                     Thread.interrupted();
                     throw new RejectedExecutionException(x);
                 }
             } else {
                 submittedCount.decrementAndGet();
                 throw rx;
             }
         }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    任务队列

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
     if ( parent.isShutdown() ) 
         throw new RejectedExecutionException(
         "Executor not running, can't force a command into the queue"
         );
         //forces the item onto the queue, to be used if the task is rejected
         return super.offer(o,timeout,unit); 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2 Connector 配置

    配置项默认值说明
    acceptorThreadCount1acceptor 线程数量
    pollerThreadCount1poller 线程数量
    minSpareThreads10核心线程数,即 corePoolSize
    maxThreads200最大线程数,即 maximumPoolSize
    executor最大线程数,即 maximumPoolSize

    3 Executor 线程配置

    配置项默认值说明
    threadPriority5线程优先级
    daemontrue是否守护线程
    minSpareThreads25核心线程数,即 corePoolSize
    maxThreads200最大线程数,即 maximumPoolSize
    maxIdleTime60000线程生存时间,单位是毫秒,默认值即 1 分钟
    maxQueueSizeInteger.MAX_VALUE队列长度
    prestartminSpareThreadsfalse核心线程是否在服务器启动时启动

    任务执行流程图:

    在这里插入图片描述

    2 Fork/Join

    1 说明

    Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算.

    任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计 算,如归并排序、斐波那契数列、都可以用分治思想进行求解.

    Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,可提升了运算效率, 默认会创建与 cpu 核心数大小相同的线程池.

    2 案例

    提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值)

    创建了一个对 1~n 之间的整数求和的任务.

    @Slf4j(topic = "c.AddTask")
    class AddTask1 extends RecursiveTask<Integer> {
        
         int n;
         public AddTask1(int n) {
             this.n = n;
         }
         @Override
         public String toString() {
             return "{" + n + '}';
         }
        
         @Override
         protected Integer compute() {
             // 如果 n 已经为 1,可以求得结果了
             if (n == 1) {
             log.debug("join() {}", n);
             return n;
             }
    
             // 将任务进行拆分(fork)
             AddTask1 t1 = new AddTask1(n - 1);
             t1.fork();
             log.debug("fork() {} + {}", n, t1);
    
             // 合并(join)结果
             int result = n + t1.join();
             log.debug("join() {} + {} = {}", n, t1, result);
             return result;
         }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    测试

    public static void main(String[] args) {
         ForkJoinPool pool = new ForkJoinPool(4);
         System.out.println(pool.invoke(new AddTask1(5)));
    }
    
    • 1
    • 2
    • 3
    • 4

    运行结果

    [ForkJoinPool-1-worker-0] - fork() 2 + {1} 
    [ForkJoinPool-1-worker-1] - fork() 5 + {4} 
    [ForkJoinPool-1-worker-0] - join() 1 
    [ForkJoinPool-1-worker-0] - join() 2 + {1} = 3 
    [ForkJoinPool-1-worker-2] - fork() 4 + {3} 
    [ForkJoinPool-1-worker-3] - fork() 3 + {2} 
    [ForkJoinPool-1-worker-3] - join() 3 + {2} = 6 
    [ForkJoinPool-1-worker-2] - join() 4 + {3} = 10 
    [ForkJoinPool-1-worker-1] - join() 5 + {4} = 15 
    15
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    执行流程图:

    在这里插入图片描述

    方案优化

    class AddTask3 extends RecursiveTask<Integer> {
     
         int begin;
         int end;
         public AddTask3(int begin, int end) {
             this.begin = begin;
             this.end = end;
         }
        
         @Override
         public String toString() {
             return "{" + begin + "," + end + '}';
         }
        
         @Override
         protected Integer compute() {
             // 5, 5
             if (begin == end) {
                 log.debug("join() {}", begin);
                 return begin;
             }
             // 4, 5
    
             if (end - begin == 1) {
             log.debug("join() {} + {} = {}", begin, end, end + begin);
             return end + begin;
             }
     
             // 目标 1 5
             // 3
             int mid = (end + begin) / 2; 
             // 1,3
             AddTask3 t1 = new AddTask3(begin, mid); 
             t1.fork();
             // 4,5
             AddTask3 t2 = new AddTask3(mid + 1, end); 
             t2.fork();
             log.debug("fork() {} + {} = ?", t1, t2);
             int result = t1.join() + t2.join();
             log.debug("join() {} + {} = {}", t1, t2, result);
             return result;
         }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    测试

    public static void main(String[] args) {
     ForkJoinPool pool = new ForkJoinPool(4);
     System.out.println(pool.invoke(new AddTask3(1, 10)));
    }
    
    • 1
    • 2
    • 3
    • 4

    运行结果

    [ForkJoinPool-1-worker-0] - join() 1 + 2 = 3 
    [ForkJoinPool-1-worker-3] - join() 4 + 5 = 9 
    [ForkJoinPool-1-worker-0] - join() 3 
    [ForkJoinPool-1-worker-1] - fork() {1,3} + {4,5} = ? 
    [ForkJoinPool-1-worker-2] - fork() {1,2} + {3,3} = ? 
    [ForkJoinPool-1-worker-2] - join() {1,2} + {3,3} = 6 
    [ForkJoinPool-1-worker-1] - join() {1,3} + {4,5} = 15 
    15 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    流程执行图:

    在这里插入图片描述

  • 相关阅读:
    飞行动力学 - 第31节-荷兰滚模态机理 之 基础点摘要
    从 Hackathon 战队到创业公司,和开发者们聊聊真实世界 AI Apps 的基础设施丨活动预告
    C++ 编译运行opencv4代码踩坑及解决
    shell脚本中的运算符和条件判断
    电脑基础知识—————— 删除文件
    Kotlin基础学习 16
    C++ 模拟实现 STL 中 unordered 系列的容器
    zookeeper+kafka群集
    学习Hutool工具类库
    有没有免费的视频剪辑软件?快来看看这些视频裁剪软件
  • 原文地址:https://blog.csdn.net/ABestRookie/article/details/126331578