• JAVA如何处理各种批量数据入库(BlockingQueue)


    JAVA如何处理各种批量数据入库(BlockingQueue)

    这里我推荐大家使用BlockingQueue,该队列为阻塞队列

    将数据先写入队列中,然后开启多个消费线程慢慢消费入库。从队列中消费数据有两种方式:

     单条消费
    
     批量消费
    
    • 1
    • 2
    • 3
    一、案例:人脸识别设备—开门记录(批量消费)
    1.创建队列
    @Component
    public class RequestQueue<T> {
       
    	/**
    	* 获取开门记录队列
    	*/
        BlockingQueue<JSONObject> blockingQueue = new ArrayBlockingQueue<>(1000000,true);
        
         
        /**
         * 获取请求队列数据
         */
        public BlockingQueue<JSONObject> getBlockingQueue(){
            return blockingQueue;
        }
       
        
        
        /**
         * 保存数据进入队列
         */
        public void putJsonDataQueue(JSONObject jsonObject){
            try {
                getBlockingQueue().put(jsonObject);
    
            }catch (Exception e){
                e.printStackTrace();
            }
    
        }
        
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    2.创建消费队列任务
    @Component
    public class DoorOpeningRecordTask extends Thread{
        @Autowired
        private RequestQueue requestQueue;
    
        @Autowired
        private DoorOpeningRecordService doorOpeningRecordService;
    
        @Override
        public void run(){
            while(true){
                try {
                    //取出信息列表
                    List<JSONObject> list = new ArrayList<>();
                    //参数列表: 获取队列  数据  批量处理一百条  500毫秒
                    Queues.drain(requestQueue.getBlockingQueue(), list, 100, 500, TimeUnit.MILLISECONDS);
                    //对数据进行保存
                    doorOpeningRecordService.save(list);
    
                }catch (Exception e){
                    e.printStackTrace();
                }
    
            }
    
        }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    3.监听队列信息
    @Component
    public class QueueListener {
        
        @Autowired
        private DoorOpeningRecordTask doorOpeningRecordTask;
    
        /**
         * 初始化时启动监听请求队列
         */
        @PostConstruct
        public void init() {
            doorOpeningRecordTask.start();
        }
    
        /**
         * 销毁容器时停止监听任务
         */
        @PreDestroy
        public void destory() {
           
            doorOpeningRecordTask.interrupt();
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
  • 相关阅读:
    maven
    C++ - git 命令行
    快速掌握Golang单元测试与断言教程
    47 VM.maxDirectMemory() 来自于哪里
    [Servlet/Tomcat] HttpServletRequest#getHeader(headerNameWithIgnoreCase)(获取header时不区分大小写)
    算法练习1——合并两个有序数组
    使用Mac的Applescript定制sublime执行C/C++程序弹出终端
    Python实现WOA智能鲸鱼优化算法优化循环神经网络分类模型(LSTM分类算法)项目实战
    Vue2自定义插件的写法-Vue.use()
    CAN报文:数据帧详解
  • 原文地址:https://blog.csdn.net/qq_41128049/article/details/134442487