• java批量消费队列(BlockingQueue)中的数据


    队列的消费模式

    在我们实际开发过程中经常会处理各种大批量数据入库,这个时候我们就会到队列,将数据先写入队列中,然后开启多个消费线程慢慢消费入库。从队列中消费数据有两种方式:

    • 单条消费
    • 批量消费
      我们分别来实现这两种消费方

    存数据到队列

    存数据相对比较简单,这里我推荐大家使用BlockingQueue,该队列为阻塞队列!

    //创建队列  
    BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1000000,true);  
      
     /**  
     * 向队列中存放数据  
      * @param message  
     */
     public void saveQueueData(String message){  
      //存放数据  
      blockingQueue.offer("test");  
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    ArrayBlockingQueue参数说明

    ArrayBlockingQueue一共有三个重载方法

    • int capacity
      该参数表示当前定义队列的大小,也就是能存放多少条数据
    • boolean fair
      该参数表示访问该队列的策略是否公平。true:按照 FIFO 顺序访问插入或移除时受阻塞线程的队列;false:访问顺序是不确定的
    • Collection c
      该参数是一个集合,表示将一个集合的数据存入该阻塞队列,相当于给该队列一个初始数据

    一、单条消费 :

        /**
         * 从队列中单条消费数据
         */
        public void consumerBySingle() {
            while (true) {
                try {
                    String take = blockingQueue.take();
                    log.info("消费到的数据是:{}", take);
                } catch (Exception e) {
                    log.error("缓存队列单条消费异常:{}", e.getMessage());
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    为什么用while(true),这样不是一个死循环么,那不是一直都在执行?其实并不是这样的,这就是为什么我推荐大家用BlockingQueue的原因,他是一个阻塞队列take()这个方法是阻塞的,一段队列中没有数据,那么就不会继续往下执行,而是阻塞到这个地方,等对队列中有数据的时候才会继续执行

    二、批量消费

        /**
         * 从队列中批量消费数据
         */
        public void consumerByBatch() {
            while (true) {
                try {
                    List<String> list = new ArrayList<>();
                    Queues.drain(blockingQueue, list, 100, 1, TimeUnit.MINUTES);
                    log.info("批量消费到的数据是:{}", list);
                } catch (Exception e) {
                    log.error("缓存队列批量消费异常:{}", e.getMessage());
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    这里面用到一个很重要的东西Guava的Queues
    需要导入如Guava的包,maven项目只需要在pom文件中添加:

    		<dependency>
                <groupId>com.google.guavagroupId>
                <artifactId>guavaartifactId>
                <version>30.1.1-jreversion>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    Queues.drain(blockingQueue, list, 100, 1, TimeUnit.MINUTES);这个方法一共有5个参数

    • 第一个:传入你需要批量消费的队列
    • 第二个:传入一个用来接收批量消费到的数据
    • 第三个:批量消费数据的大小,这里我们给100,即意味着每次消费100条数据
    • 第四个:批量消费的等待的最大间隔,什么意思呢?比如说,我先在队列中只有10条数据,它不到100条,那按道理就不会消费,但是这样显然不合理,所以需要指定当超多多长时间,即使当前队列中数据低于我们设定的阈值也会消费
    • 第五个,这个就很好理解,就是指定第四个参数的单位,是秒是分钟还是小时等等

    三、测试

    public static void main(String[] args) {
            //创建队列
            BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1000000,true);
            for (int i = 0 ;i < 789; i++){
                //存放数据
                blockingQueue.offer(i + "test");
            }
            while (true) {
                try {
                    List<String> list = new ArrayList<>();
                    Queues.drain(blockingQueue, list, 100, 1, TimeUnit.MINUTES);
                    log.info("批量消费到的数据量是:{}, 数据是: {}", list.size(), list);
                } catch (Exception e) {
                    log.error("缓存队列批量消费异常:{}", e.getMessage());
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    测试代码

    测试代码中可以看出:每次批量消费100条数据,如果队列当前数据不够100条,那么等待1分钟然后将数据全部消费


    总结

    如果此篇文章有帮助到您, 希望打大佬们能关注点赞收藏评论支持一波,非常感谢大家!
    如果有不对的地方请指正!!!
    参考1
    参考2
    参考3

  • 相关阅读:
    linux高级作业
    Android笔记(五):结合Compose组件利用ActivityResultLauncher解决多活动跳转返回数据
    (6)点云数据处理学习——RGBD图
    [附源码]SSM计算机毕业设计远程教育系统JAVA
    位、比特、字节、字、帧等概念关系的理解
    全量知识系统问题及SmartChat给出的答复 之8 三套工具之3语法解析器 之1
    为什么阿里人能够快速成长?看完他们 Java 架构进化笔记,我秒懂!
    score_inverse_problems运行环境,pycharm重新安装,jax,jaxlib的GPU版本安装-230831
    ubuntu如何开启22端口支持ssh访问
    webpack原理篇(六十一):更复杂的 loader 的开发场
  • 原文地址:https://blog.csdn.net/weixin_42326851/article/details/126336012