• JAVA用线程池模拟查询大批量数据


    JAVA用线程池模拟查询大批量数据

    1、前言背景

    1.1、线程

    进程中的一个执行任务(控制单元),负责当前进程中程序的执行。一个进程至少有一个线程,一个进程可以运行多个线程,多个线程可共享数据。

    与进程不同的是同类的多个线程共享进程的方法区资源,但每个线程有自己的程序计数器虚拟机栈本地方法栈,所以系统在产生一个线程,或是在各个线程之间作切换工作时,负担要比进程小得多,也正因为如此,线程也被称为轻量级进程。

    1.2、进程与线程的区别总结

    线程具有许多传统进程所具有的特征,故又称为轻型进程(Light—Weight Process)或进程元;而把传统的进程称为重型进程(Heavy—Weight Process),它相当于只有一个线程的任务。在引入了线程的操作系统中,通常一个进程都有若干个线程,至少包含一个线程。

    根本区别:进程是操作系统资源分配的基本单位,而线程是处理器任务调度和执行的基本单位

    1.3、使用多线程的好处

    在开发过程中可能会碰到某些独特的业务,比如查询全部表数据,数据量过多会导致查询变得十分缓慢。虽然在大多数情况下并不需要查询所有的数据,而是通过分页或缓存的形式去减少或者避免这个问题,但是仍然存在需要这样的场景,比如需要导出所有的数据到excel中,导出数据之前,肯定需要先查询表中数据,这个查询的过程中数据量一旦过大,单线程查询数据会严重影响程序性能,有可能过长的查询时间导致服务宕机。现在模拟使用多线程来查询一张数据量较大的表。

    2、多线程的实现

    多线程有好几种方式,今天说的方式比较好,实现Callable<> 这种方式能返回查询的数据,加上Future异步获取方式,查询效率大大加快

    2.1、操作流程

    1. 查询出表的数据总量。
    2. 数据的切分,根据本机CPU的核数配置合适数量的线程处理数,根据数据总量为不同的线程分配不同的查询数据量分段,即不同的线程查询不同分段的数据。
    3. 将各个查询数据的线程提交至线程池,这里使用的线程是带有返回结果的异步线程。(这样能把所有查询结果合并,才能进行下一步的操作。拿不到处理后的数据还怎么做下一步操作呢,所以一点要选择带有返回结果的线程)

    controller层

    @RestController
    @RequestMapping("/thread")
    public class ThreadController {
    
        @Resource
        private IBlogService blogService;
    
        @GetMapping("/testThredData")
        public List testThredData(){
            return blogService.getAllResult();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    service层

    public interface IBlogService extends IService<Blog> {
    
        //每个线程分页查询
        public List<Blog> getQueryData(Integer start, Integer end);
        //合并线程结果
        public List getAllResult();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Impl实现层

    @Service
    public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
        @Resource
        private BlogMapper blogMapper;
    
        @Autowired
        private MultiThreadQueryUtil multiThreadQueryUtil;
        //每个线程分页查询
        @Override
        public List<Blog> getQueryData(Integer start, Integer end) {
            return this.blogMapper.getQueryData(start,end);
        }
        //合并线程结果
        @Override
        public List getAllResult() {
            return multiThreadQueryUtil.getMultiCombineResult();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    MultiThreadQueryUtil工具类(多线程的配置)

    
    @Service
    public class MultiThreadQueryUtil {
    
        /**
         * 获取多线程结果并进行结果合并
         * @return
         */
        public List<List> getMultiCombineResult() {
            //开始时间
            long start = System.currentTimeMillis();
            //返回结果
            List<List> result = new ArrayList<>();
            //查询数据库总数量
    //        int count = workflowTaskMapper.selectCountAll();
    //        Map splitMap = ExcelLocalUtils.getSplitMap(count,5);
            //假定总数据4条
            //Callable用于产生结果
            List<Callable<List>> tasks = new ArrayList<>();
            for (int i = 1; i <= 4; i++) {
                //不同的线程用户处理不同分段的数据量,这样就达到了平均分摊查询数据的压力
                //这里让每个线程每次查询一条数据
                int startNum =i-1;//对应的数据要和i挂钩 ,否则数据不变
                int endNum =1;
                Callable<List> qfe = new ThredQuery(startNum, endNum);
                tasks.add(qfe);
            }
            try{
                //定义固定长度的线程池  防止线程过多,5就够用了
    //            ExecutorService executorService = Executors.newFixedThreadPool(5);
                //4条数据,分成4个线程来查询
                ExecutorService executorService = Executors.newFixedThreadPool(4);
                //Future用于获取结果
                List<Future<List>> futures=executorService.invokeAll(tasks);
                //处理线程返回结果
                if(futures!=null&&futures.size() > 0){
                    for (Future<List> future:futures){
                        result.addAll(future.get());
                    }
                }
                //关闭线程池,一定不能忘记
                executorService.shutdown();
            }catch (Exception e){
                e.printStackTrace();
            }
            long end = System.currentTimeMillis();
            System.out.println("线程查询数据用时:"+(end-start)+"ms");
            return result;
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    注意:每次启动线程调用查询时,因为sql中用的是limit,因为没有那么大的数据量,模拟每一个线程查询1条数据,最后将数据汇总

            for (int i = 1; i <= 4; i++) {
                //不同的线程用户处理不同分段的数据量,这样就达到了平均分摊查询数据的压力
                //这里让每个线程每次查询一条数据
                int startNum =i-1;//对应的数据要和i挂钩 ,否则数据不变
                int endNum =1;
                Callable<List> qfe = new ThredQuery(startNum, endNum);
                tasks.add(qfe);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这里插入图片描述

    对于并发的数据,可以使用Future类来接收多线程的执行结果

    Future类详细讲解:可以参考这个大神的博客

    https://blog.csdn.net/bobozai86/article/details/123978048

    ThredQuery线程执行类

    package com.hmdp.utils;
    
    import com.hmdp.service.IBlogService;
    
    import java.util.List;
    import java.util.concurrent.Callable;
    
    public class ThredQuery implements Callable<List> {
    
        public static SpringContextUtil springContextUtil = new SpringContextUtil();
    
        private int start;
    
        private int end;
    
        //每个线程查询出来的数据集合
        private List datas;
    
        public  ThredQuery(int start,int end) {
            this.start=start;
            this.end=end;
            //每个线程查询出来的数据集合
            IBlogService blogService = springContextUtil.getBean(IBlogService.class);
            List count = blogService.getQueryData(start,end);
            datas = count;
        }
    
        //返回数据给Future
        @Override
        public List call() throws Exception {
            return datas;
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    因为该线程执行类是通过构造器创建出来的对象,所以该执行类没有交给Spring来管理,所以该对象里面不能够自动注入想要其他对象,所以需要通过工具类(通过反射的机制)创建所需要调用的service层

    SpringContextUtil创建bean对象的工具类

    @Component
    public class SpringContextUtil implements ApplicationContextAware {
    
        /**
         * 上下文对象实例
         */
        private static ApplicationContext applicationContext;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    
        /**
         * 获取applicationContext
         *
         * @return
         */
        public static ApplicationContext getApplicationContext() {
            return applicationContext;
        }
    
        /**
         * 获取HttpServletRequest
         */
        public static HttpServletRequest getHttpServletRequest() {
            return ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        }
    
        public static String getDomain(){
            HttpServletRequest request = getHttpServletRequest();
            StringBuffer url = request.getRequestURL();
            return url.delete(url.length() - request.getRequestURI().length(), url.length()).toString();
        }
    
        public static String getOrigin(){
            HttpServletRequest request = getHttpServletRequest();
            return request.getHeader("Origin");
        }
    
        /**
         * 通过name获取 Bean.
         *
         * @param name
         * @return
         */
        public static Object getBean(String name) {
            return getApplicationContext().getBean(name);
        }
    
        /**
         * 通过class获取Bean.
         *
         * @param clazz
         * @param       
         * @return
         */
        public static <T> T getBean(Class<T> clazz) {
            return getApplicationContext().getBean(clazz);
        }
    
        /**
         * 通过name,以及Clazz返回指定的Bean
         *
         * @param name
         * @param clazz
         * @param       
         * @return
         */
        public static <T> T getBean(String name, Class<T> clazz) {
            return getApplicationContext().getBean(name, clazz);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    3、测试

    通过postman测试,会返回给前端多个线程查询到的合并的数据集
    在这里插入图片描述

  • 相关阅读:
    R语言获取data.table数据中指定数据列的最小值所在的数据行(minimum)
    JavaScript系列之数据类型
    《论文阅读》Commonsense Knowledge Aware Conversation Generation with Graph Attention
    CentOS7 k3s安装与配置
    前端食堂技术周刊第 99 期:Remix 2.0、v0、2023 组件库盘点、TS Config 备忘录、Hello 算法
    JAVA 同城服务同城货运搬家小程序系统开发优势
    Mindspore网络构建
    记录一次mysql死锁
    Shopee虾皮API接口:解锁商品买家评论数据的宝藏
    PD充电调试
  • 原文地址:https://blog.csdn.net/qq_45830276/article/details/126561411