• quartz框架(六)-ThreadPool


    ThreadPool

    本篇博文,博主将介绍Quartz框架中ThreadPool线程池相关的内容。线程池顾名思义,就是一个可以帮助我们来进行线程资源管理的对象。在web开发中,常见的就有数据库连接池,http连接池,redis连接池等。在看这篇文章之前,读者需要先具备一定的多线程和锁的知识,如使用wait和notify方法,实现生产者和消费者功能。

    为什么要用线程池?

    1. 线程池可以复用线程,减少线程的创建和销毁次数。
    2. 可以提高程序的响应速度。
    3. 可以对线程资源进行统一管理,比如监控。
    4. ...

    接口定义

    quartz框架中的ThreadPool接口定义如下,博主在相应的接口方法上进行了注释。

    public interface ThreadPool {
    
        //将runnable接口放入到Thread中执行
        boolean runInThread(Runnable runnable);
       
       //阻塞获取可用线程数
        int blockForAvailableThreads();
       
       //线程初始化方法
        void initialize() throws SchedulerConfigException;
       
       //关闭线程
        void shutdown(boolean waitForJobsToComplete);
        
    	//获取线程池大小
        int getPoolSize();
       
       //设置实例id
        void setInstanceId(String schedInstId);
        
    	//设置实例名称
        void setInstanceName(String schedName);
    }
    

    WorkThread

    在介绍SimpleThreadPool之前,博主先讲解一下WorkThread。WorkThread继承了Thread类,因此它是一个可以被运行的线程。它会阻塞式的接收线程池分配的任务,然后执行对应的任务。

    工作者线程属性

    1. lock,任务执行锁。
    2. tp,工作者线程所在的线程池。
    3. runnable,可以运行的任务。
    4. runOnce,是否只执行一次。通过构造函数传runnable实例,此时runOnce为true。
    5. run,是否需要循环执行。

    Runnable的run方法

    工作者线程启动之后,在一个while循环里面执行业务逻辑。while循环的退出条件是线程是否关闭(run == false)。先去获取任务锁(lock对象),如果没有关闭且当前runnale方法为空,说明此时没有需要执行的任务。线程会在这里等待500毫秒(wait 500)。当有任务投递给当前线程时,才会唤醒工作者线程,继续执行当前方法。

    如果任务不为空的话,执行runnable接口。执行完之后获取lock锁,防止并发冲突。获取到锁之后,设置runnable接口为null。如果只是只执行一次的任务,通过持有的threadPool引用,去获取下一次任务允许的锁,获取到下一次任务允许的锁之后,那么就将自己从busyWorks中移除。否则就从busyWorkers中移除,然后添加到avaliableWorkers。

    自定义的run方法

    内部的run(Runnable newRunnable)方法:先去获取内部对象lock的锁,获取成功后先判断内部的runnable是否为空。如果不为空的话说明该线程还是处于繁忙状态,抛出异常。如果为空的话,则设置为传递进来的newRunnable,并且唤醒所有lock等待队列中的对象(提前让这些对象结束等待,提高工作线程的响应速度)。

    shutdown方法

    设置内部成员变量run为false,也就是任务执行完之后不再循环等待下一个可以运行的任务,结束线程的run方法后,线程会进入terminate状态。

    SimpleThreadPool

    在quartz默认的配置文件中,使用的是SimpleThreadPool这个线程池,并且指定了线程池的个数为10。从源码中,我们也可以看到SimpleThreadPool是一个线程大小固定的线程池。代码如下所示:

    public SimpleThreadPool(int threadCount, int threadPriority) {
            setThreadCount(threadCount);
            setThreadPriority(threadPriority);
     }
    

    线程池属性

    1. count,线程池个数
    2. isShutdown,是否处于关闭状态
    3. handoffPending,是否处于切换状态
    4. makeThreadsDaemons,创建的线程是否为后台线程
    5. threadGroup,线程组
    6. nextRunnableLock,下一个可以运行的任务锁
    7. workers,总共线程集合
    8. availWorkers,空闲线程集合
    9. busyWorkers,繁忙线程集合
    10. threadNamePrefix,线程名称前缀
    11. schedulerInstanceName,调度器实例名称

    initialize方法

    在QuartzScheduler对象进行初始化的时候,就会创建对应的线程池,并且调用对应的initialize方法。initialize方法主要就是预先创建对应个数的工作者线程,并且将它们添加到workers和availWorkers集合中,并且循环调用每个workThread的start方法。

    blockForAvailableThreads方法

    先获取到下一次运行的任务锁(nextRunnableLock),防止这时候的空闲线程集合availWorkers发生变化。循环判断如果此时的availWorkers小于1,或者此时有任务进行等待分配(handoffPending为true),并且此时线程池没有关闭(isShutdown为false),那么就进行等待(wait 500),直到前面的条件都不满足为止才退出循环。

    如果此时获取到的空闲线程数大于等于1,则说明现在可以把对应个数的任务交给线程池进行分配执行。

    runInThread方法

    先获取到可以允许下一个任务的锁(nextRunnableLock),设置handoffPending为true,handoffPending表示当前有任务在等待线程池分配任务。接着阻塞判断是否存在空闲线程可以获取(一般情况下,繁忙线程执行完后会将自己添加到空闲线程集合中)或者 线程池是否将要被关闭(isShutdown为false)。

    如果没有关闭线程池的情况下,直接从空闲线程集合中拿到第一个线程,并把它从空闲集合中移除。然后将这个空闲线程添加到繁忙线程集合中,接着执行workThread的投递方法(run方法)。

    如果关闭线程池将要被关闭的情况下,直接new出一个线程(此时这个WorkThread的runOnce属性为true)去执行这个任务,不再等待有空闲线程去执行,这样可以减少线程池的shutdown时间。并将此线程添加到繁忙线程集合中,添加到工作者集合中。

    最后通知等待下一次允许任务锁的线程,设置handoffPending为false。

    shutdown

    整体来说,我们关闭线程池时,一个需要停止上游线程(quartzScheduleThread)给他(thradPool)分配任务,另一个需要关闭掉工作池中现有的任务。关闭线程池的方法有一个waitForJobsToComplete的属性,waitForJobsToComplete表示线程池关闭是否需要等到运行中的任务执行完毕。

    先获取nextRunnableLock,设置线程池为关闭状态(shutdown),并循环调用workers集合中thread的shutdown方法(不让工作者线程再循环执行),然后移除availWorkers中的线程。通知阻塞在nextRunnableLock锁上的所有线程。

    如果waitForJobsToComplete为true,那么会循环判断busyWorkers的元素个数是否大于0(一般情况下,繁忙集合中的线程结束任务后,会将自己从繁忙集合中移除),如果busyWorkers的元素个数大于0的,调用nextRunnableLock的阻塞方法,让其它方法有时间处理(比如如果此时有任务需要进行分配,可以让线程池把任务分配好)。最后循环调用每个workers中thread的join方法,等待thread死亡。

    如果waitForJobsToComplete为false,那么会直接返回。

    博主微信公众号

  • 相关阅读:
    iframe安全问题
    怎样在应用中实现自助报表功能?
    MacOS怎么安装Nacos(附带:Windows系统)
    你写过的最蠢的代码是?
    LeetCode 88. 合并两个有序数组(JavaScript 简单)
    Spring BeanUtils copyProperties list 带来的问题
    C++实现std::bind
    高级测试:如何使用Flink对Strom任务的逻辑功能进行复现测试?
    JDK8升级JDK11最全实践干货来了
    Linux开发工具之调试器gdb
  • 原文地址:https://www.cnblogs.com/chenhaoblog/p/15999212.html