• Android多线程学习:线程池(一)


    一、概念

    线程池:创建并维护一定数量的空闲线程,当有需要执行的任务,就交付给线程池中的一个线程,任务执行结束后,该线程也不会死亡,而是回到线程池中重新变为空闲状态。

    线程池优点
    1、重用线程池中的线程,避免频繁创建和销毁线程所带来的内存开销。
    2、有效控制线程的最大并发数,避免因线程之间抢占资源而导致的阻塞现象。
    3、能够对线程进行简单的管理,提供定时执行以及指定时间间隔循环执行等功能。

    二、线程池使用

    ThreadPoolExecutor是线程池的实现,先来个小demo,核心代码如下:

    private void threadPoolTest() {
        //创建线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4 , 3, TimeUnit.SECONDS, 
                    new LinkedBlockingDeque<>(3), new ThreadPoolExecutor.DiscardOldestPolicy());
        for (int i = 0; i < 5; i++) {
            String data = "task@" + i;
            Log.i(TAG, data);
             //提交任务
            threadPoolExecutor.execute(new ThreadPoolRunnable(data));
        }
        Log.i(TAG, "start test ---------------");
    }
    
    class ThreadPoolRunnable implements Runnable {
        private Object taskData;
        public ThreadPoolRunnable(Object taskData) {
            this.taskData = taskData;
        }
    
        @Override
        public void run() {
            Log.i(TAG, "run - runnableData: " + taskData + " | " + Thread.currentThread());
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            taskData = null;
        }
    }
             
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    三、线程池创建参数说明

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • corePoolSize (int ):核心线程的个数。线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会一直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会被销毁(临时工)。如果调用了线程池的prestartAllCoreThreads()prestartCoreThread()方法,线程池会提前创建并启动所有核心线程。

    • maximumPoolSize (int ):线程池所能容纳的最大线程数。当活动线程数达到这个数值后,后续的新任务将会被阻塞。该值等于核心线程数量 + 非核心线程数量。如果任务队列使用无界的阻塞队列,该参数没有什么效果。

    有界阻塞队列和无界阻塞队列含义和区别:
    阻塞队列有一个非常重要的属性:容量的大小,分为有界和无界两种。

    无界阻塞队列:队列容量很大,近似无上界,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,约为 2 的 31 次方,是非常大的一个数,可以近似认为是无限容量。
    有界阻塞队列:队列容量有上界,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。

    • keepAliveTime (long):非核心线程闲置时的超时时长,超过这个时长,非核心线程就会被回收。如果设置allowCoreThreadTimeOut(true),则会也作用于核心线程。

    • unit (TimeUnit )keepAliveTime的时间单位。

    • workQueue(BlockingQueue):线程池中的任务队列,通过线程池的execute方法提交的Runnable对象存储在这个参数中,遵循先进先出原则。

    • threadFactory(ThreadFactory ):创建线程的工厂 ,用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,默认使用Executors.defaultThreadFactory() 来创建线程,线程具有相同的NORM_PRIORITY优先级并且是非守护线程。

    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • handler(RejectedExecutionHandler ):拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略。

    四种拒绝处理的策略如下:

    • ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。
    • ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。

    使用总结:
    demo中的构造代码如下,声明了2个核心进程,最大进程数为4,非核心进程空闲超时时长是3s。阻塞队列使用LinkedBlockingDeque,并指定大小为3,防止队列无限膨胀,拒绝处理策略为DiscardOldestPolicy,丢弃队列头部(最旧的)的任务。

     ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4 , 3, TimeUnit.SECONDS, 
                        new LinkedBlockingDeque<>(3), new ThreadPoolExecutor.DiscardOldestPolicy());
    
    • 1
    • 2

    四、线程池常用阻塞队列(BlockingQueue)

    常用阻塞队列类图.jpg

    1、ArrayBlockingQueue(常用)

    ArrayBlockingQueue.jpg

    • 基于数组的阻塞队列。ArrayBlockingQueue 内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列。
    • ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,没有实现读写分离。
    • 在创建 ArrayBlockingQueue 时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

    ArrayBlockingQueue 使用 ReentrantLock 的公平锁和非公平锁来实现该功能。简单理解就是,ReentrantLock 内部会维护一个有先后顺序的等待队列,假如有五个任务一起过来,都被阻塞了。如果是公平的,则等待队列中等待最久的任务就会先进入阻塞队列。如果是非公平的,那么这五个线程就需要抢锁,谁先抢到,谁就先进入阻塞队列。

    2、LinkedBlockingQueue(常用)
    LinkedBlockingQueue.jpg

    • 由链表结构组成的有界阻塞队列。LinkedBlockingQueue 不指定队列的大小时,默认值是 Integer.MAX_VALUE 。但是,建议指定一个固定大小。因为,如果生产者的速度比消费者的速度大的多的情况下,这会导致阻塞队列一直膨胀,直到系统内存被耗尽。
    • LinkedBlockingQueue 实现了读写分离,可以实现数据的读和写互不影响,这在高并发的场景下,对于效率的提高无疑是非常巨大的。

    3、SynchronousQueue
    SynchronousQueue.jpg

    • 这是一个没有缓冲的无界队列,size0。当执行插入元素的操作时,必须等待一个取出操作。也就是说,put元素的时候,必须等待 take 操作。
    • 适用于并发任务不大,而且生产者和消费者的速度相差不多的场景下,直接把生产者和消费者对接,不用经过队列的入队出队这一系列操作,效率上会高一些。
    • Excutors.newCachedThreadPool 方法用的就是这种队列,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

    4、PriorityBlockingQueue
    PriorityBlockingQueue.jpg

    • 这是一个支持优先级排序的无界队列。
    • 可以指定初始容量大小(注意初始容量并不代表最大容量),或者不指定,默认大小为 11。也可以传入一个比较器,把元素按一定的规则排序,不指定比较器的话,默认是自然顺序。
    • PriorityBlockingQueue 是基于二叉树最小堆实现的,每当取元素的时候,就会把优先级最高的元素取出来。

    5、DelayQueue

    • 一个带有延迟时间的无界阻塞队列,队列中的元素,只有等延时时间到了,才能取出来。此队列一般用于过期数据的删除,或任务调度。

    五、线程池常用方法

    • execute(Runnable run):提交任务,交由线程池调度;
    • shutdown():关闭线程池,等待任务执行完成;
    • shutdownNow():关闭线程池,不等待任务执行完成;
    • getTaskCount():返回线程池找中所有任务的数量 (已完成的任务+阻塞队列中的任务);
    • getCompletedTaskCount():返回线程池中已执行完成的任务数量 (已完成的任务);
    • getPoolSize():返回线程池中已创建线程数量;
    • getActiveCount():返回当前正在运行的线程数量;
    • terminated():线程池终止时执行的策略。

    六、线程池状态

    ThreadPoolExecutor类中使用了一些final int常量变量来表示线程池的状态 :

    private static final int RUNNING = -1 << COUNT_BITS;
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    private static final int STOP = 1 << COUNT_BITS;
    private static final int TIDYING = 2 << COUNT_BITS;
    private static final int TERMINATED = 3 << COUNT_BITS;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    1、RUNNING

    • 线程池创建后处于RUNNING状态。
    • 线程池能够接收新任务,也能够对已经添加的任务进行处理。

    2、SHUTDOWN

    • 线程池已经被关闭了,不再接收新任务;但是,其还是会处理队列中的剩余的任务。
    • 调用线程池的shutdown()方法后,线程池的状态就会由RUNNING转为SHUTDOWN

    3、STOP

    • 线程池处于STOP状态,此时线程池不再接收新任务,不处理已经添加进来的任务,并且会中断正在处理的任务。
    • 调用线程池的shutdownNow()方法后,线程池的状态就会由RUNNINGSHUTDOWN转为STOP

    4、TIDYING

    • workCount(有效线程数)为0
    • 线程池被下达关闭命令后,如果当前所有的任务都已经终止了(这个终止可以表示执行结束,也可以表示强制中断,也可以表示被丢弃) ,那么线程就会进入TIDYING状态;当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时进行相应的处理,可以通过重载terminated()函数来实现。
    • 如果线程状态已经是SHUTDOWN了,并且线程中以及队列中都没有任务时,线程池就会由SHUTDOWN转为TIDYING;如果线程池状态为STOP,那么当线程池把所有的任务都给清理干净时,线程池就会由STOP转为TIDYING

    5、TERMINATED

    • 线程池就结束了;线程池就不能重新启动了。
    • 如果线程池处于TIDYING状态,那么当线程池执行完terminated()方法后,线程池状态就会由TIDYING转为TERMINTED

    线程池状态转换图.png

    七、线程池分类

    1、newFixedThreadPool()

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 线程数量固定且都是核心线程:核心线程数量和最大线程数量都是nThreads
    • keepAliveTime = 0L,即使线程池中的线程空闲,也不会被回收。除非调用shutDown()shutDownNow()去关闭线程池。
    • 可以更快响应外界的请求,且任务阻塞队列为无边界队列(LinkedBlockingQueue()链表结构的阻塞队列),意味着任务可以无限加入线程池。
    • 可控制线程最大并发数。

    2、newScheduledThreadPool()

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    //默认闲置超时回收时常
    private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 核心线程数量固定,非核心线程数量无限制。
    • 非核心线程闲置超过10s会被回收。
    • 主要用于执行定时任务和具有固定周期的重复任务。

    3、newSingleThreadExecutor()

    public static ExecutorService newSingleThreadExecutor() {
    	return new FinalizableDelegatedExecutorService
         			(new ThreadPoolExecutor(1, 1,
                                 0L, TimeUnit.MILLISECONDS,
                                 new LinkedBlockingQueue<Runnable>()));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 核心线程数为1的线程池。确保所有任务在同一线程执行,因此不用考虑线程同步问题,从命名也可以看出来,其他为ThreadPool线程池,当前类型为ThreadExecutor,执行线程。(相当于newFixedThreadPool传入线程个数是1)。
    • 阻塞队列是无界的,可以一直接收任务 。
    • 常用于不适合并发但可能引起IO阻塞性及影响UI线程响应的操作,如数据库操作、文件操作等。

    4、newCachedThreadPool()

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 没有核心线程,非核心线程无限,但执行完任务后,空闲超过60s则回收空闲线程。
    • 使用SynchronousQueue() 队列,有新任务时使用空闲线程执行,没有空闲线程则创建新的线程来处理。
    • 适合执行大量的耗时较少的任务,当所有线程闲置超过60s都会被停止,所以这时几乎不占用系统资源。

    八、各类线程池使用

    基础知识差不多了,可以撸代码了,以下为核心代码:

    public void fixedThreadPoolTest() {
         Log.i(TAG, "----fixedThreadPoolTest----");
         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
         for (int i = 0; i < 5; i++) {
             String data = "task@" + i;
             Log.i(TAG, data);
             fixedThreadPool.execute(new ThreadPoolRunnable(data));
         }
    }
    
    public void cachedThreadPoolTest() {
         Log.i(TAG, "----cachedThreadPoolTest----");
         ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    
         for (int i = 0; i < 5; i++) {
             String data = "task@" + i;
             Log.i(TAG, data);
             cachedThreadPool.execute(new ThreadPoolRunnable(data));
         } 
    }
    
    public void singleThreadExecutorTest() {
         Log.i(TAG, "----singleThreadExecutorTest----");
         ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
         for (int i = 0; i < 5; i++) {
             String data = "task@" + i;
             Log.i(TAG, data);
             singleThreadExecutor .execute(new ThreadPoolRunnable(data));
         } 
    }
    
    public void scheduledThreadPoolTest() {
         Log.i(TAG, "----scheduledThreadPoolTest----");
         //这里创建的是ScheduledExecutorService对象,ScheduledExecutorService是ExecutorService的子类
         mScheduledThreadPool = Executors.newScheduledThreadPool(2);
         for (int i = 0; i < 5; i++) {
             String data = "task@" + i;
             Log.i(TAG, data);
             // 1000ms后执行runnable
             mScheduledThreadPool.schedule(new ThreadPoolRunnable(data), 1000, TimeUnit.MILLISECONDS);
         } 
    }
    
    public void scheduledAtFixedRateTest() {
         Log.i(TAG, "----scheduledAtFixedRateTest----");
         //这里创建的是ScheduledExecutorService对象,ScheduledExecutorService是ExecutorService的子类
         mScheduledThreadPool = Executors.newScheduledThreadPool(2);
         // 1s后,每2s执行一次runnable
         mScheduledThreadPool.scheduleAtFixedRate(new ThreadPoolRunnable(666), 1000, 2000, TimeUnit.MILLISECONDS);
    }
    
    public void scheduledCancelTest() {
         Log.i(TAG, "----scheduledCancelTest----");
         if (mScheduledThreadPool != null) {
             mScheduledThreadPool.shutdown();
         }
    }
    
    class ThreadPoolRunnable implements Runnable {
         private Object taskData;
         public ThreadPoolRunnable(Object taskData) {
             this.taskData = taskData;
         }
    
         @Override
         public void run() {
             Log.i(TAG, "run - runnableData: " + taskData + " | " + Thread.currentThread());
             try {
                 Thread.sleep(500);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             taskData = null;
         }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75

    运行结果:

    fixedThreadPoolTest.png

    cachedThreadPoolTest函数,看下图结果可知,一共新建了5个线程来执行这5个任务,所以newCachedThreadPoo()只适合耗时较短的任务,否则会一直创建新线程。
    cachedThreadPool.png

    singleThreadExecutorTest.png

    scheduledThreadPoolTest.png

    scheduleAtFixedRateTest.png

    参考文章:
    线程池10:线程池的5种状态;
    常用阻塞队列 BlockingQueue 有哪些?
    Android多线程:理解和简单使用总结
    Android 线程池
    全方位解析-Android中的线程池

  • 相关阅读:
    rust 迭代器
    大数据Presto(四):Presto自定义函数和JDBC连接
    多线程-- 原子访问和atomic原子操作类实现原理
    代码随想录算法训练营第四十六天|动态规划|139.单词拆分、关于多重背包,你该了解这些! 、背包问题总结篇!
    SQL之开窗函数
    使用Java实现汉诺塔问题~
    【机器学习】21天挑战赛学习/论文总结(第二周)
    qt——窗口置灰不可操作
    【postgresql 数据库运维文档】
    Oracle11g或19c imp/impdp导入记录
  • 原文地址:https://blog.csdn.net/kongqwesd12/article/details/133701778