• Dubbo线程池


    前言

        Dubbo使用Netty作为网络调用框架,Netty是一个Reactor模型的框架,线程模型分为boss线程池和worker线程池,boss线程池负责监听、分配事件,worker线程池负责处理事件,简单说就是boss线程池负责hold请求,并分发到worker池,worker线程池负责处理具体事件。

        dubbo在原本的netty中的线程(boss线程和worker)做了一些修改,将其定义为io线程,而后由实现了一套用于处理业务的业务线程池,这就和上一篇介绍的Dubbo协议下的服务端线程模型产生了关联,dubbo的io线程监听请求,业务处理由dubbo自定义的线程池处理,这里将请求分发到具体的业务线程池就是由Dispatcher实现的,默认是AllDispatcher,上一篇已经简单介绍了Dubbo协议的线程池的分发模型,这篇文章就介绍下Dubbo究竟自定义了哪几种线程池的实现,并且都是怎么实现的。

    • 注:Apache Dubbo版本为3.0.7

    Dubbo线程池接口ThreadPool

    在这里插入图片描述

        Dubbo自定义的线程池的核心接口是org.apache.dubbo.common.threadpool.ThreadPool,并且提供了四种实现分别是CachedThreadPoolFixedThreadPoolLimitedThreadPoolEagerThreadPoolThreadPool接口是SPI的,如果不指定线程池的具体实现默认是fixed,在项目中配置如下:配置线程池类型是fixed,线程数为100,线程模型是all

    <dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />
    
    • 1

    ThreadPool代码如下,接下来分别简单介绍一下四种线程池的具体实现

    @SPI(value = "fixed", scope = ExtensionScope.FRAMEWORK)
    public interface ThreadPool {
    
        /**
         * Thread pool
         *
         * @param url URL contains thread parameter
         * @return thread pool
         */
        @Adaptive({THREADPOOL_KEY})
        Executor getExecutor(URL url);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    CachedThreadPool缓存线程池

        该线程池是缓存类型的,当空闲到一定时间时会将线程删掉,使用时再创建,具体dubbo的实现如下,代码实现很简单,就是使用JUC的ThreadPoolExecutor创建了一个缓存类型的线程池,将maximumPoolSize设置成Integer.MAX_VALUE,keepAliveTime设置成60000毫秒,队列大小设置成0,当超过任务数超过corePoolSize就会直接创建worker线程,当线程空闲60s后就会被销毁。

    public class CachedThreadPool implements ThreadPool {
    
        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
            int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
            int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
            int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
            int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
            return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    FixedThreadPool固定线程数的线程池

        该线程池是固定线程数的线程池实现,具体实现也是使用JUC的ThreadPoolExecutor创建了一个固定线程数的线程池,通过url中配置的threads,将corePoolSize和maximumPoolSize都设置成threads的数量,并且keepAliveTime设置成0。

    public class FixedThreadPool implements ThreadPool {
    
        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
            int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
            int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
            return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    LimitedThreadPool可伸缩线程池

        虽然叫可伸缩线程池,但是实际上只能伸不能缩,官网上说是为了突然大量的流量引起性能问题,具体实现就是将keepAliveTime设置成无限大,这样当队列满了后就会创建线程达到maximumPoolSize,新创建的这些线程因为keepAliveTime设置成无限大所以也不会销毁了。

    public class LimitedThreadPool implements ThreadPool {
    
        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
            int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
            int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
            int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
            return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    EagerThreadPool

        Eager单词是渴望的,热切地的意思,这个线程池所实现的逻辑是,当任务数超过corePoolSize但小于maximumPoolSize时不是将新任务放到队列中,而是优先创建新的worker线程,当线程数已经达到maximumPoolSize,接下来新的任务才会放到阻塞队列中,阻塞队列满了会抛出RejectedExecutionException

        EagerThreadPool线程池就不是通过JUC的ThreadPoolExecutor实现的了,而是继承ThreadPoolExecutor自己实现一些逻辑,下面一步一步看。

    • EagerThreadPool

        Dubbo自己实现了阻塞队列TaskQueue和线程池EagerThreadPoolExecutor,从EagerThreadPool的代码中看不到该类型线程池的核心逻辑,核心逻辑是在TaskQueue代码中,这里跳过直接看TaskQueue代码。

    public class EagerThreadPool implements ThreadPool {
    
        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(THREAD_NAME_KEY, (String) url.getAttribute(THREAD_NAME_KEY, DEFAULT_THREAD_NAME));
            int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
            int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
            int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
            int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
    
            // init queue and executor
            TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
            EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                    threads,
                    alive,
                    TimeUnit.MILLISECONDS,
                    taskQueue,
                    new NamedInternalThreadFactory(name, true),
                    new AbortPolicyWithReport(name, url));
            taskQueue.setExecutor(executor);
            return executor;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • TaskQueue

        Dubbo的EagerThreadPool是通过TaskQueueoffer方法实现的,逻辑就是当提交到线程池任务时,如果任务数大于corePoolSize,会将任务offerTaskQueue中,这时如果活跃的线程数大于等于线程池大小,并且当前线程数小于maximumPoolSize时就会伪装成放入到队列失败,这时线程池就会创建线程,从而实现超过corePoolSize不超过maximumPoolSize时创建worker线程而不是将任务放入到队列中。

    public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
    
        private static final long serialVersionUID = -2635853580887179627L;
    
        private EagerThreadPoolExecutor executor;
    
        public TaskQueue(int capacity) {
            super(capacity);
        }
    
        public void setExecutor(EagerThreadPoolExecutor exec) {
            executor = exec;
        }
    
        @Override
        public boolean offer(Runnable runnable) {
            if (executor == null) {
                throw new RejectedExecutionException("The task queue does not have executor!");
            }
    
            int currentPoolThreadSize = executor.getPoolSize();
            // have free worker. put task into queue to let the worker deal with task.
            if (executor.getActiveCount() < currentPoolThreadSize) {
                return super.offer(runnable);
            }
    
            // 伪装放入队列失败,让线程池创建线程
            if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
                return false;
            }
    
            // currentPoolThreadSize >= max
            return super.offer(runnable);
        }
    
        /**
         * retry offer task
         *
         * @param o task
         * @return offer success or not
         * @throws RejectedExecutionException if executor is terminated.
         */
        public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("Executor is shutdown!");
            }
            return super.offer(o, timeout, unit);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • EagerThreadPoolExecutor

        当任务数大于maximumPoolSize时,线程池会抛出RejectedExecutionExceptionEagerThreadPoolExecutor捕获这个异常,并且调用TaskQueueretryOffer方法尝试放入队列,这样就实现了当线程数已经达到maximumPoolSize,接下来新的任务才会放到阻塞队列中,阻塞队列满了会抛出RejectedExecutionException,代码如下:

    public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
    
        public EagerThreadPoolExecutor(int corePoolSize,
                                       int maximumPoolSize,
                                       long keepAliveTime,
                                       TimeUnit unit, TaskQueue<Runnable> workQueue,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        @Override
        public void execute(Runnable command) {
            if (command == null) {
                throw new NullPointerException();
            }
    
            try {
                super.execute(command);
            } catch (RejectedExecutionException rx) {
                // 重新尝试将任务放到队列中.
                final TaskQueue queue = (TaskQueue) super.getQueue();
                try {
                    if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                        throw new RejectedExecutionException("Queue capacity is full.", rx);
                    }
                } catch (InterruptedException x) {
                    throw new RejectedExecutionException(x);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    总结

        Dubbo实现了自定义线程池,其核心接口是ThreadPool,该接口是SPI的默认的实现是fixed,Dubbo提供了四种实现,分别是CachedThreadPoolFixedThreadPoolLimitedThreadPoolEagerThreadPool

  • 相关阅读:
    白嫖游戏指南,Epic喜加二:《INDUSTRIA》《LISA: Definitive Edition》
    DP读书:《半导体物理学(第八版)》(四)半导体的导电性-玻尔兹曼-电导率
    Flutter——最详细(Scaffold)使用教程
    shell脚本踩坑,source中文properties。
    从阿里云“数字证书管理服务”申请免费的SSL证书
    图像处理黑科技—破解文档识别难题(PS检测、弯曲拉平、切边切片、摩尔纹)
    OJ练习第23题——Z字形变换
    beanstalkd 启动跟停止【经常使用 nohup 和 & 配合来启动程序,如: nohup ./test &同时免疫SIGINT和SIGHUP信号】
    Agile Development
    websocket详解
  • 原文地址:https://blog.csdn.net/qq_31279701/article/details/126666703