• java使用线程池批量插入mysql数据


    首先我们使用最原始的for循环插入数据:

    1. for (int i = 0; i < 100000000; i++) {
    2. service.add(new LongTest().setStatus(1).
    3. setName(NumberUtil.getPwdRandom(5)));
    4. }

    通过上面的操作大概每3秒可以插入数据库1000条数据,这样效率太慢了。我们下面将换成线程池进行数据插入。

    定义线程池常量类

    1. /**
    2. * 线程池常量.
    3. */
    4. public class ThreadPoolConstant {
    5. /**
    6. * 核心线程数量
    7. */
    8. public static final int CORE_THREAD_NUM = 10;
    9. /**
    10. * 最大线程数量
    11. */
    12. public static final int MAX_THREAD_NUM = 15;
    13. /**
    14. * 非核心线程存活时间
    15. */
    16. public static final long KEEP_ALIVE_TIME_SECONDS = 10L;
    17. /**
    18. * 任务队列长度
    19. */
    20. public static final int QUEUE_LENGTH = 20;
    21. /**
    22. * 线程超时时间
    23. */
    24. public static final long TIME_OUT = 70;
    25. }

    创建线程池执行线程(当日创建线程池的发放有好几种,我们就不一一举例了,我们只列出最简单的一种演示即可)

    1. // 执行任务
    2. // 创建线程池(线程池大小、最大线程数、保持存活时间(线程空闲时间超过会退出线程)、时间单位)
    3. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
    4. ThreadPoolConstant.CORE_THREAD_NUM
    5. , ThreadPoolConstant.MAX_THREAD_NUM,
    6. ThreadPoolConstant.KEEP_ALIVE_TIME_SECONDS,
    7. TimeUnit.SECONDS,
    8. new LinkedBlockingQueue<>(ThreadPoolConstant.QUEUE_LENGTH),Executors.defaultThreadFactory()
    9. ,new ThreadPoolExecutor.CallerRunsPolicy());
    10. // 执行任务
    11. for (int i = 0; i < 100000000; i++) {
    12. final int index = i;
    13. threadPool.execute(() -> {
    14. Thread.currentThread().getName()+"\n");
    15. service.add(new LongTest().setStatus(1).
    16. setName(NumberUtil.getPwdRandom(5)));
    17. });
    18. }

    通过上面的线程池代码插入数据,相比传统的数据插入,使用线程池的效率提高了百分之90以上,当然我们还可以修改线程常量进行线程控制。

    下面我们将对上面线程参数进行一一讲解:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

    corePoolSize : 每秒需要多少个线程处理? 
    threadcount = tasks/(1/taskcost) =tasks*taskcout =  (100~1000)*0.1 = 10~100 个线程。corePoolSize设置应该大于10
    根据8020原则,如果80%的每秒任务数小于200,那么corePoolSize设置为20即可
     

    maximumPoolSize:最大线程数量,当前线程池最大接收线程数量,如果超出的话超出的线程将进行等待或者不执行销毁。

    keepAliveTime : 表示线程的存活时间,举例:一个工地设置了20个人(最大线程数),当工地活干完了有10个人处于空闲时间,当空闲时间达到了我们设定的时间就进行辞退(销毁)。

    unit : 线程存活时间单位。

    workQueue:工作队列(阻塞队列)选择,线程池中常用的阻塞队列有三种

    1. BlockingQueue<Runnable> workQueue = null;
    2. workQueue = new SynchronousQueue<>();//无缓冲的等待队列
    3. workQueue = new ArrayBlockingQueue<>(5);//基于数组的先进先出队列
    4. workQueue = new LinkedBlockingQueue<>();//基于链表的先进先出队列

    SynchronousQueue是一个不存储元素的阻塞队列,适合传递性场景,只是负责把父线程提交的任务直接交给线程池线程处理。也就是说提交的任务数超过最大线程数就会执行拒绝策略

    ArrayBlockingQueue底层是用数组实现的有界阻塞队列,因为需要传初始值(如果传Integer最大值,也类似于无界了)。队列按照先进先出的原则对元素进行排序

    LinkedBlockingQueue底层是用链表实现的有界阻塞队列,如果不传初始化值为Integer最大值,也是先进先出对元素进行排序

    一般选择建议选择有界队列,因为如果任务特别多,核心线程处理不过来,会将任务都放到工作队列中,此时最大线程数已经没有意义了。如果控制不好会导致OOM

    handler :最后一个就是拒绝策略选择,JDK提供了四种拒绝策略
    AbortPolicy:直接丢弃新任务,抛出异常,当有多个任务时,只要任务超出了设定任务的最大线程数加阻塞数时,就会抛出异常,没有超出的线程正常执行,超出报异常后面的不执行。

    1. new ThreadPoolExecutor(
    2. ThreadPoolConstant.CORE_THREAD_NUM
    3. , ThreadPoolConstant.MAX_THREAD_NUM,
    4. ThreadPoolConstant.KEEP_ALIVE_TIME_SECONDS,
    5. TimeUnit.SECONDS,
    6. new LinkedBlockingQueue<>(ThreadPoolConstant.QUEUE_LENGTH),Executors.defaultThreadFactory()
    7. ,new ThreadPoolExecutor.AbortPolicy());

    DiscardPolicy:直接丢弃掉,不会抛出异常,最大线程数加阻塞数如果只要10,那么前10个线程会正常执行,后面加入的线程会被丢弃。

    1. new ThreadPoolExecutor(
    2. ThreadPoolConstant.CORE_THREAD_NUM
    3. , ThreadPoolConstant.MAX_THREAD_NUM,
    4. ThreadPoolConstant.KEEP_ALIVE_TIME_SECONDS,
    5. TimeUnit.SECONDS,
    6. new LinkedBlockingQueue<>(ThreadPoolConstant.QUEUE_LENGTH),Executors.defaultThreadFactory()
    7. ,new ThreadPoolExecutor.DiscardPolicy());


    DiscardOldestPolicy:丢弃时间最久的任务。一般是队列最前面的任务,只要还有任务新增,一直会丢弃阻塞队列的最老的任务,并将新的任务加入到阻塞队列中

    1. new ThreadPoolExecutor(
    2. ThreadPoolConstant.CORE_THREAD_NUM
    3. , ThreadPoolConstant.MAX_THREAD_NUM,
    4. ThreadPoolConstant.KEEP_ALIVE_TIME_SECONDS,
    5. TimeUnit.SECONDS,
    6. new LinkedBlockingQueue<>(ThreadPoolConstant.QUEUE_LENGTH),Executors.defaultThreadFactory()
    7. ,new ThreadPoolExecutor.DiscardOldestPolicy());


    CallerRunsPolicy:交给主线程去执行,多余的任务会被放入队列中,最后的任务还是继续被执行。(一般系统都是默认使用此类策略的,不会造成线程丢失),他是什么意思呢?通俗一点讲就是如果我们添加了最大线程数是2,核心线程数是2,队列长度是5的,这个时候我们一次性加入了10个线程,其中2个线程在等待,队列里面有5个线程,还有3个线程怎么办呢?这3个线程不会被丢弃的,而是谁提交的线程谁去执行(就是主线程执行,不会在提交到线程池了)

    1. new ThreadPoolExecutor(
    2. ThreadPoolConstant.CORE_THREAD_NUM
    3. , ThreadPoolConstant.MAX_THREAD_NUM,
    4. ThreadPoolConstant.KEEP_ALIVE_TIME_SECONDS,
    5. TimeUnit.SECONDS,
    6. new LinkedBlockingQueue<>(ThreadPoolConstant.QUEUE_LENGTH),Executors.defaultThreadFactory()
    7. ,new ThreadPoolExecutor.CallerRunsPolicy());

    详细讲解 CallerRunsPolicy:

    拒绝策略 CallerRunsPolicy,相对而言它就比较完善了,当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。这样做主要有两点好处。

    • 第一点新提交的任务不会被丢弃,这样也就不会造成业务损失。
    • 第二点好处是,由于谁提交任务谁就要负责执行任务,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时的,在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度,相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定的空间,相当于是给了线程池一定的缓冲期。

    4种拒绝策略,给大家提供一个非常详细讲解的地址

    Java 线程池四种拒绝策略_小码code的博客-CSDN博客_线程池的拒绝策略

  • 相关阅读:
    TCP 和 UDP 可以同时绑定相同的端口吗?
    github 中关于Pyqt 的module view 操作练习
    MapReduce 排序三种实现方式
    PG::SunsetDecoy
    创建Vue3工程
    【数据结构】Java实现数据结构的前置知识,时间复杂度空间复杂度,泛型类的讲解
    可视化大屏的终极解决方案居然这么简单,autofit.js一行全搞定!
    基于安卓Android银行排队叫号系统设计与实现
    API商品数据接口:实现电子商务应用程序的核心功能
    clock gating check
  • 原文地址:https://blog.csdn.net/qq_38935605/article/details/126382734