• Java开源工具库使用之高性能内存数据查找库CQengine


    前言

    CQEngine(Collection Query Engine) 集合查询引擎,是一个高性能的Java集合,可以使用类似 SQL 的查询语句进行搜索,且延迟极低。

    • 每秒实现数百万个查询,查询延迟以微秒为单位
    • 从数据库中卸载查询流量,扩展应用程序层
    • 即使是在低端硬件上,其性能也比数据库高出数千倍

    支持堆内持久化、堆外持久化、磁盘持久化,并支持 MVCC 事务隔离。

    CQEngine 通过以空间换时间的策略,对存储在集合中的对象的字段上建立索引,并应用基于集理论规则的算法来降低访问它们的时间复杂度,解决了迭代的可伸缩性和延迟问题。

    github 地址:https://github.com/npgall/cqengine

    pom 依赖:

    <dependency>
    	  <groupId>com.googlecode.cqenginegroupId>
    	  <artifactId>cqengineartifactId>
    	  <version>3.6.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    一、简单使用

    1.1 POJO

    CQEngine 需要为 POJO 类里字段建立索引加快查询速度,需要给字段建立 Attribute

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public class User {
        public enum Role {ADMIN, NORMAL, SUPER}
        private int userId;
        private String name;
        private Role role;
        private double height;
        private List<String> features;
    
        public static final Attribute<User, Integer> USER_ID = attribute("userId", User::getUserId);
    
        public static final Attribute<User, String> NAME = attribute("name", User::getName);
    
        public static final Attribute<User, Role> ROLE = attribute("role", User::getRole);
    
        public static final Attribute<User, Double> HEIGHT = attribute("height", User::getHeight);
    
        public static final Attribute<User, String> FEATURES = attribute(String.class, "features", User::getFeatures);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    1.2 索引

    一般会给 Attribute 建立索引,不同的索引有不同的作用,具体可以查看文档

    常用的有:

    • HashIndex 底层是 ConcurrentHashMap
    • NavigableIndex 底层是 ConcurrentSkipListMap
    IndexedCollection<User> users = new ConcurrentIndexedCollection<>();
    
    users.addIndex(NavigableIndex.onAttribute(User.USER_ID));
    users.addIndex(InvertedRadixTreeIndex.onAttribute(User.NAME));
    users.addIndex(HashIndex.onAttribute(User.NAME));
    users.addIndex(NavigableIndex.onAttribute(User.HEIGHT));
    users.addIndex(HashIndex.onAttribute(User.FEATURES));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    1.3 普通查询

    位于import static com.googlecode.cqengine.query.QueryFactory.*; 有很多静态方法,能够用来辅助查询

    public static void main(String[] args) {
    	IndexedCollection<User> users = new ConcurrentIndexedCollection<>();
    
        User user1 = User.builder().userId(1).name("张三").role(User.Role.SUPER).height(180.0)
            .features(Lists.newArrayList("脸大", "近视")).build();
        User user2 = User.builder().userId(2).name("李四").role(User.Role.ADMIN).height(171.1)
            .features(Lists.newArrayList("脸小", "正常")).build();
        User user3 = User.builder().userId(3).name("王五").role(User.Role.NORMAL).height(160.5)
            .features(Lists.newArrayList("脸小", "近视")).build();
        User user4 = User.builder().userId(4).name("张二").role(User.Role.SUPER).height(190.9)
            .features(Lists.newArrayList("脸大", "正常")).build();
        users.addAll(Lists.newArrayList(user1, user2, user3, user4));
    
        users.addIndex(NavigableIndex.onAttribute(User.USER_ID));
        users.addIndex(InvertedRadixTreeIndex.onAttribute(User.NAME));
        users.addIndex(HashIndex.onAttribute(User.NAME));
    
        users.addIndex(NavigableIndex.onAttribute(User.HEIGHT));
        users.addIndex(HashIndex.onAttribute(User.FEATURES));
    
    
        Query<User> nameQuery = contains(User.NAME, "张");
        display(users, nameQuery);
    
        Query<User> heightQuery = between(User.HEIGHT, 150.0, 180.0);
        display(users, heightQuery);
    
        Query<User> roleQuery = equal(User.ROLE, User.Role.SUPER);
        display(users, roleQuery);
    
        Query<User> andQuery = and(roleQuery, heightQuery);
        display(users, andQuery);
    
        Query<User> userQuery = in(User.FEATURES, "近视", "脸大");
        display(users, userQuery);
    
        Query<User> andUserQuery = and (in(User.FEATURES, "近视"),
                                        in(User.FEATURES, "脸大"));
        display(users, andUserQuery);
    
    }
    
    private static void display(IndexedCollection<User> users, Query<User> query) {
        ResultSet<User> r = users.retrieve(query);
        for (User user : r) {
            System.out.println(user);
        }
        System.out.println("---------");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    二、高级特性

    2.1 Null值

    上述属性在查询时,遇到 null 值会报空指针,可在 POJO 内将 attribute 方法换成 nullableAttribute

    public static final Attribute<User, String> FEATURES = nullableAttribute(String.class, "features", User::getFeatures);
    
    • 1

    添加一个null 数据测试

    User user5 = User.builder().userId(5).name("李七").role(User.Role.NORMAL).height(170.0)
                            .features(null).build();
    
    • 1
    • 2

    2.2 类SQL 查询

    CQEngine 提供类SQL语句解析器对集合进行查询,使用的是 antlr 解析器

    SQLParser<User> parser = new SQLParser(User.class){{
        registerAttribute(User.USER_ID);
        registerAttribute(User.NAME);
        registerAttribute(User.ROLE);
        registerAttribute(User.HEIGHT);
        registerAttribute(User.FEATURES);
    }};
    
    ResultSet<User> results = parser.retrieve(users, "SELECT * FROM users WHERE " +
            "(height <= 180.0 " +
            "AND role IN ('SUPER', 'NORMAL')) " +
            "ORDER BY height ASC, userId DESC");
    results.forEach(System.out::println);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2.3 持久化

    • 对象集合持久化

      CQEngine 的IndexedCollection可通过配置到堆内、堆外、磁盘

      • 堆内

        // 默认堆内
        IndexedCollection<User> users = new ConcurrentIndexedCollection<>();
        
        • 1
        • 2
      • 堆外

        // 堆外
        IndexedCollection<User> users = new ConcurrentIndexedCollection<>(OffHeapPersistence.onPrimaryKey((SimpleAttribute<User, Integer>)User.USER_ID));
        
        • 1
        • 2
      • 磁盘

        IndexedCollection<User> users1 = new ConcurrentIndexedCollection<>(                    
                                DiskPersistence.onPrimaryKeyInFile((SimpleAttribute<User, Integer>) User.USER_ID, new File("cars.dat"))
                );
        
        • 1
        • 2
        • 3
      • 混合

        IndexedCollection<User> users = new ConcurrentIndexedCollection<>(
                        CompositePersistence.of(
                                OnHeapPersistence.onPrimaryKey((SimpleAttribute<User, Integer>) User.USER_ID),
                                DiskPersistence.onPrimaryKeyInFile((SimpleAttribute<User, Integer>) User.USER_ID, new File("cars.dat"))
                        )
                );
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
    • 索引持久化

      大部分索引继承了OnHeapTypeIndex,属于堆内索引

      • 堆内

        users.addIndex(NavigableIndex.onAttribute(User.USER_ID));
        
        • 1
      • 堆外

        users.addIndex(OffHeapIndex.onAttribute(User.NAME));
        
        • 1
      • 磁盘

        users.addIndex(DiskIndex.onAttribute(User.FEATURES));
        
        • 1

    注:如果持久化在磁盘,那么默认就是 sqlite 文件格式

    2.4 统计

    CQEngine 支持统计 IndexedCollection 的一些数据

    • 频率分布(在索引中存储的每个属性值的计数)
    • 不同的键(索引中的不同属性值,可选在x和y之间的范围内)
    • 存储在索引中的属性值和关联对象的流(升序/降序,可选在x和y之间的范围内)
    • 不同键的计数(索引中有多少个不同的属性值)
    • 计算特定键(与特定属性值匹配的对象数)
     MetadataEngine<User> engine = users.getMetadataEngine();
     AttributeMetadata<String, User> attributeMetadata = engine.getAttributeMetadata(User.NAME);
     System.out.println(attributeMetadata.getFrequencyDistribution().collect(Collectors.toList()));
     System.out.println(attributeMetadata.getCountOfDistinctKeys());
    
    • 1
    • 2
    • 3
    • 4

    三、性能对比

    官方性能测试,见:https://github.com/npgall/cqengine/blob/master/documentation/Benchmark.md

    3.1 HashIndex

    使用datafaker 库构造假数据进行测试,分别对普通for迭代,stream迭代,cqengine加哈希索引,cqengine不加索引四种情况进行测试

    @Warmup(iterations = 2, time = 5)
    @Measurement(iterations = 5, time = 5)
    @Fork(1)
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    @State(Scope.Benchmark)
    @BenchmarkMode(value = Mode.Throughput)
    @Timeout(time = 60)
    public class JmhDemo {
        private List<User> users = new ArrayList<>();
    
        private String find = "";
    
        private IndexedCollection<User> indexedCollection;
    
        private IndexedCollection<User> noIndexedCollection;
    
        Query<User> query;
    
        @Param({"100", "1000", "10000"})
        int originLen;
    
        @Setup
        public void setUp() {
            Faker faker = new Faker(Locale.CHINA);
            List<String> n = faker.collection(() ->faker.name().name()).len(originLen / 10).generate();
            users = faker.collection(() -> User.builder().name(n.get(faker.random().nextInt(0, originLen / 10 - 1)))
                            .height(faker.random().nextDouble(1.50, 1.90))
                            .features(faker.collection(() -> faker.food().fruit()).len(1, 5).generate())
                            .build())
                    .len(originLen).generate();
            find = n.get(faker.random().nextInt(0, n.size() - 1));
    
            query = equal(User.NAME, find);
    
            indexedCollection = new ConcurrentIndexedCollection<>();
            indexedCollection.addAll(users);
            indexedCollection.addIndex(HashIndex.onAttribute(User.NAME));
    
            noIndexedCollection = new ConcurrentIndexedCollection<>();
            noIndexedCollection.addAll(users);
    
        }
    
        @Benchmark
        public void iterationNaiveQuery(Blackhole bh) {
            int count = 0;
            for (User user : users) {
                if (user.getName().equals(find)) count++;
            }
            bh.consume(count);
        }
    
        @Benchmark
        public void streamQuery(Blackhole bh) {
            long count = users.stream().filter(u -> u.getName().equals(find)).count();
            bh.consume(count);
        }
    
    
        @Benchmark
        public void cqengineHashIndexQuery(Blackhole bh) {
            ResultSet<User> retrieve = indexedCollection.retrieve(query);
            bh.consume(retrieve.size());
        }
    
        @Benchmark
        public void cqengineNoIndexQuery(Blackhole bh) {
            ResultSet<User> retrieve = noIndexedCollection.retrieve(query);
            bh.consume(retrieve.size());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72

    得到结果如下图所示,可以发现

    • cqengine 加了索引查询,吞吐量随着数据量增大没有太大变化,而普通迭代和流的吞吐量都会随着数据量增加而正比减小
    • cqengine 没加索引是最慢的,比普通迭代还慢
    • 普通迭代和流两者吞吐量相差不大

    hash index

    3.2 NavigableIndex

    @Warmup(iterations = 2, time = 5)
    @Measurement(iterations = 5, time = 5)
    @Fork(1)
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    @State(Scope.Benchmark)
    @BenchmarkMode(value = Mode.Throughput)
    @Timeout(time = 60)
    public class JmhBetweenDemo {
        private List<User> users = new ArrayList<>();
    
        private double min = 1.60;
        private double max = 1.70;
    
        private IndexedCollection<User> indexedCollection;
    
        private IndexedCollection<User> noIndexedCollection;
    
        Query<User> query;
    
        @Param({"100", "1000", "10000"})
        int originLen;
    
        @Setup
        public void setUp() {
            Faker faker = new Faker(Locale.CHINA);
            users = faker.collection(() -> User.builder().name(faker.name().name())
                            .height(faker.random().nextInt(150, 200) / 100.0)
                            .features(faker.collection(() -> faker.food().fruit()).len(1, 5).generate())
                            .build())
                    .len(originLen).generate();
    
            query = between(User.HEIGHT, min, max);
    
            indexedCollection = new ConcurrentIndexedCollection<>();
            indexedCollection.addAll(users);
            indexedCollection.addIndex(NavigableIndex.onAttribute(User.HEIGHT));
    
            noIndexedCollection = new ConcurrentIndexedCollection<>();
            noIndexedCollection.addAll(users);
    
        }
    
        @Benchmark
        public void iterationNaiveQuery(Blackhole bh) {
            int count = 0;
            for (User user : users) {
                double height = user.getHeight();
                if (height >= min && height <= max) count++;
            }
            bh.consume(count);
        }
    
        @Benchmark
        public void streamQuery(Blackhole bh) {
            long count = users.stream().filter(u -> u.getHeight() >= min && u.getHeight() <= max).count();
            bh.consume(count);
        }
    
        @Benchmark
        public void cqengineWithoutIndexedCollectionQuery(Blackhole bh) {
            ResultSet<User> retrieve = noIndexedCollection.retrieve(query);
            bh.consume(retrieve.size());
        }
    
        @Benchmark
        public void cqengineNavigableIndexedCollectionQuery(Blackhole bh) {
            ResultSet<User> retrieve = indexedCollection.retrieve(query);
            bh.consume(retrieve.size());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    得到结果如下图所示,可以发现

    • 当数据量小的时候,普通迭代和流查找都很快,但当数据量增大时,普通迭代和流的吞吐量快速下降
    • cqengine 加了索引查询,虽然在数据量小的时候比普通迭代慢,但它的吞吐量随着数据量增大而保持不变
    • cqengine 没加索引是最慢的,比普通迭代还慢
    • 普通迭代和流两者吞吐量相差不大

    cqengine_navigable_index

    参考

  • 相关阅读:
    Python函数递归
    格芯斥资80亿欧元,提高德国德累斯顿的产能,并寻求与台积电相当的政府补贴
    函数总章C11
    为什么浏览器控制台(Console)运行JavaScript代码有时会出现“undefined”?
    element中file-upload组件的提示‘按delete键可删除’,怎么去掉?
    【ElasticSearch系列-07】ES的开发场景和索引分片的设置及优化
    ROS2 库包设置和使用 Catch2 进行单元测试
    阶段性测试完成后,你进行缺陷分析了么?
    Shiro授权&&Shiro的注解式开发
    极简opencv操作xml文件
  • 原文地址:https://blog.csdn.net/qq_23091073/article/details/127651179