• Java中如何创建TB大小的低延迟队列


    队列通常是软件设计模式中的基本组件。

    但是,如果每秒收到数百万条消息,并且多进程消费者需要能够读取所有消息的完整分类账,那该怎么办呢?

    Java只能保存这么多信息,否则堆就会成为一个限制因素,产生影响很大的垃圾收集,可能会阻止我们实现目标的SLA,甚至使JVM暂停几秒钟甚至几分钟。

    本文介绍如何使用开源的Chronicle Queue创建巨大的持久队列,同时保持可预测和一致的低延迟。

    应用程序

    在本文中,目标是维护来自市场数据馈送的对象队列(例如,在交易所交易的证券的最新价格)。 也可以选择其他业务领域,例如物联网设备的传感输入或读取汽车行业的碰撞记录信息。 原理是一样的。

    首先,定义一个持有市场数据的类:

    public class MarketData extends SelfDescribingMarshallable {
        int securityId;
        long time;
        float last;
        float high;
        float low;
    
        // Getters and setters not shown for brevity
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    📝注意: 在现实世界中,使用 float 和 double 来保存货币值时必须非常小心,否则可能会导致舍入问题 [Bloch18,第 60 项]。 但是,在这篇介绍性文章中,我想保持简单。

    还有一个小的实用函数 MarketDataUtil::create 将在调用时创建并返回一个新的随机 MarketData 对象:

    static MarketData create() {
        MarketData marketData = new MarketData();
        int id = ThreadLocalRandom.current().nextInt(1000);
        marketData.setSecurityId(id);
        float nextFloat = ThreadLocalRandom.current().nextFloat();
        float last = 20 + 100 * nextFloat;
    
        marketData.setLast(last);
        marketData.setHigh(last * 1.1f);
        marketData.setLow(last * 0.9f);
        marketData.setTime(System.currentTimeMillis());
    
        return marketData;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    现在,目标是创建一个持久、并发、低延迟、可从多个进程访问并且可以容纳数十亿个对象的队列。

    幼稚的方法

    有了这些类,就可以探索使用 ConcurrentLinkedQueue 的简单方法:

    public static void main(String[] args) {
        final Queue queue = new ConcurrentLinkedQueue();
        for (long i = 0; i < 1e9; i++) {
            queue.add(MarketDataUtil.create());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这将失败,原因有以下几个:

    1. ConcurrentLinkedQueue 将为添加到队列中的每个元素创建一个包装节点。 这将显著地使创建的对象数量增加一倍。
    2. 对象放置在 Java 堆上,导致堆内存压力和垃圾收集问题。 在我的机器上,这导致我的整个 JVM 变得无响应,唯一的方法是使用“kill -9”强行将其杀死。
    3. 无法从其他进程(即其他 JVM)读取队列。
    4. 一旦 JVM 终止,队列的内容就会丢失。 因此,队列不是持久的。

    查看各种其他标准 Java 类,可以得出结论,不支持大型持久化队列。

    使用 Chronicle Queue

    Chronicle Queue 是一个开源库,旨在满足上述要求。 这是设置和使用它的一种方法:

    public static void main(String[] args) {
        final MarketData marketData = new MarketData();
        final ChronicleQueue q = ChronicleQueue
                .single("market-data");
        final ExcerptAppender appender = q.acquireAppender();
    
        for (long i = 0; i < 1e9; i++) {
            try (final DocumentContext document =
                         appender.acquireWritingDocument(false)) {
                 document
                        .wire()
                        .bytes()
                        .writeObject(MarketData.class, 
                                MarketDataUtil.recycle(marketData));
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    使用配备 2.3 GHz 8 核英特尔酷睿 i9 的 MacBook Pro 2019 时,仅使用一个线程即可插入每秒超过 3,000,000(3百万) 条消息。

    该队列通过给定目录“market-data”中的内存映射文件进行持久化。 人们会期望 MarketData 对象至少占用 4 (int securityId) + 8 (long time) + 4*3 (float last, high 和 low) = 24 字节。

    在上面的示例中,生成了 10 亿个对象,导致映射文件占用 30,148,657,152个字节(30G) ,这意味着每条消息大约 30 个字节。 在我看来,这确实非常有效。

    可以看出,单个 MarketData 实例可以反复重用,因为 Chronicle Queue 会将当前对象的内容展平到内存映射文件上,从而允许对象重用。 这进一步降低了内存压力。 这是循环方法的工作原理:

    static MarketData recycle(MarketData marketData) {
        final int id = ThreadLocalRandom.current().nextInt(1000);
        marketData.setSecurityId(id);
        final float nextFloat = ThreadLocalRandom.current().nextFloat();
        final float last = 20 + 100 * nextFloat;
    
        marketData.setLast(last);
        marketData.setHigh(last * 1.1f);
        marketData.setLow(last * 0.9f);
        marketData.setTime(System.currentTimeMillis());
    
        return marketData;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    从 Chronicle Queue 中读取

    从 Chronicle Queue 中读取很简单。 继续上面的示例,下面显示了如何从队列中读取前两个 MarketData 对象:

    public static void main(String[] args) {
        final ChronicleQueue q = ChronicleQueue
                .single("market-data");
        final ExcerptTailer tailer = q.createTailer();
    
        for (long i = 0; i < 2; i++) {
            try (final DocumentContext document =
                         tailer.readingDocument()) {
                MarketData marketData = document
                        .wire()
                        .bytes()
                        .readObject(MarketData.class);
                System.out.println(marketData);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    这可能会产生以下输出:

    !software.chronicle.sandbox.queuedemo.MarketData {
      securityId: 202,
      time: 1634646488837,
      last: 45.8673,
      high: 50.454,
      low: 41.2806
    }
    
    !software.chronicle.sandbox.queuedemo.MarketData {
      securityId: 117,
      time: 1634646488842,
      last: 34.7567,
      high: 38.2323,
      low: 31.281
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    有一些规定可以有效地寻找尾部的位置,例如,到队列的末尾或到某个索引。

    下一步是什么?

    还有许多其他功能超出了本文的范围。

    例如,可以将队列文件设置为以特定间隔(例如每天、每小时或每分钟)滚动,从而有效地创建信息分解,以便随着时间的推移清理旧数据。

    还提供了隔离cpu的功能,并将Java线程锁定在这些隔离的cpu上,从而大大减少了应用程序的抖动。

    最后,还有一个企业版,可以跨服务器集群复制队列,为分布式架构的高可用性和改进性能铺平了道路。

    企业版还包括各种其他功能,例如加密、时区滚动和异步附加程序。


    <<<<<<<<<<<< [完] >>>>>>>>>>>>

  • 相关阅读:
    【逆向】Base64编码解码及逆向识别
    动态类型语言和静态类型语言的区别
    Zookeeper重要概念
    华为云云耀云服务器L实例评测|Ubuntu云锁防火墙安装搭建使用
    计算机毕业设计django基于python精品课程在线学习系统(源码+系统+mysql数据库+Lw文档)
    10 个很棒的 Python 3.9 特性
    低代码+流程管理,打造信息工程项目动态管理系统
    盘点7种JavaScript常用设计模式
    【SpringSecurity】九、Base64与JWT
    第06章 第06章 查找
  • 原文地址:https://blog.csdn.net/wjw465150/article/details/127824432