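CQEngine (Collection Query Engine) is a high-performance Java collection that can be searched with SQL-like queries at very low latency.
It supports on-heap, off-heap, and disk persistence, as well as MVCC transaction isolation.
CQEngine trades space for time: it builds indexes on the fields of the objects stored in the collection and applies algorithms based on the rules of set theory to reduce the time complexity of accessing them, which addresses the scalability and latency problems of plain iteration.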
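To make the space-for-time idea concrete, here is a minimal sketch in plain Java. It is not CQEngine's implementation; the class HashIndexSketch, the Person type, and all method names are hypothetical and exist only for this illustration. It builds a hash map from a field value to the objects holding that value, so an equality lookup becomes a map access instead of a full scan, and it answers an AND of two conditions by intersecting the two result sets.

```java
import java.util.*;
import java.util.function.Function;

public class HashIndexSketch {
    // Hypothetical value type used only for this illustration.
    static final class Person {
        final int id;
        final String name;
        final String role;
        Person(int id, String name, String role) {
            this.id = id;
            this.name = name;
            this.role = role;
        }
    }

    static List<Person> sample() {
        return Arrays.asList(
                new Person(1, "Ann", "SUPER"),
                new Person(2, "Bob", "ADMIN"),
                new Person(3, "Ann", "NORMAL"),
                new Person(4, "Cid", "SUPER"));
    }

    // Index: field value -> objects with that value (extra memory, faster lookups).
    static <K> Map<K, Set<Person>> buildIndex(Collection<Person> people, Function<Person, K> key) {
        Map<K, Set<Person>> index = new HashMap<>();
        for (Person p : people) {
            index.computeIfAbsent(key.apply(p), k -> new HashSet<>()).add(p);
        }
        return index;
    }

    // Equality lookup: one map access instead of scanning the whole collection.
    static <K> Set<Person> lookup(Map<K, Set<Person>> index, K value) {
        return index.getOrDefault(value, Collections.<Person>emptySet());
    }

    // AND of two conditions expressed as a set intersection.
    static Set<Person> and(Set<Person> a, Set<Person> b) {
        Set<Person> result = new HashSet<>(a);
        result.retainAll(b);
        return result;
    }

    public static void main(String[] args) {
        List<Person> people = sample();
        Map<String, Set<Person>> byName = buildIndex(people, p -> p.name);
        Map<String, Set<Person>> byRole = buildIndex(people, p -> p.role);
        Set<Person> hits = and(lookup(byName, "Ann"), lookup(byRole, "SUPER"));
        for (Person p : hits) {
            System.out.println(p.id); // prints 1
        }
    }
}
```

The cost is the memory held by the two maps, which must also be kept up to date whenever the collection changes.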
GitHub: https://github.com/npgall/cqengine
Maven (pom) dependency:
<dependency>
    <groupId>com.googlecode.cqengine</groupId>
    <artifactId>cqengine</artifactId>
    <version>3.6.0</version>
</dependency>
CQEngine speeds up queries by indexing the fields of a POJO class. Each field to be queried must first be exposed as an Attribute:
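import com.googlecode.cqengine.attribute.Attribute;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

import static com.googlecode.cqengine.query.QueryFactory.attribute;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {
    public enum Role {ADMIN, NORMAL, SUPER}

    private int userId;
    private String name;
    private Role role;
    private double height;
    private List<String> features;

    // One Attribute per queryable field; the String argument is the attribute name.
    public static final Attribute<User, Integer> USER_ID = attribute("userId", User::getUserId);
    public static final Attribute<User, String> NAME = attribute("name", User::getName);
    public static final Attribute<User, Role> ROLE = attribute("role", User::getRole);
    public static final Attribute<User, Double> HEIGHT = attribute("height", User::getHeight);
    // Multi-valued attribute: each element of the features list counts as a separate value.
    public static final Attribute<User, String> FEATURES = attribute(String.class, "features", User::getFeatures);
}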
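Indexes are usually built on these Attributes. Different index types serve different purposes; see the documentation for details.
Commonly used ones: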
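IndexedCollection<User> users = new ConcurrentIndexedCollection<>();
// NavigableIndex: sorted index, suited to range queries (lessThan, greaterThan, between)
users.addIndex(NavigableIndex.onAttribute(User.USER_ID));
// InvertedRadixTreeIndex: radix-tree index on String attributes (equal, in, isContainedIn)
users.addIndex(InvertedRadixTreeIndex.onAttribute(User.NAME));
// HashIndex: hash-based index, suited to equality queries (equal, in)
users.addIndex(HashIndex.onAttribute(User.NAME));
users.addIndex(NavigableIndex.onAttribute(User.HEIGHT));
users.addIndex(HashIndex.onAttribute(User.FEATURES));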
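As a rough illustration of why a sorted index suits range queries such as between, here is a plain-Java sketch built on TreeMap. It is not how NavigableIndex is implemented; the class NavigableIndexSketch and its methods are hypothetical names, and ids stand in for the stored objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class NavigableIndexSketch {
    // Sorted index: height -> ids of the objects with that height.
    private final NavigableMap<Double, List<Integer>> byHeight = new TreeMap<>();

    void add(int id, double height) {
        byHeight.computeIfAbsent(height, h -> new ArrayList<>()).add(id);
    }

    // Inclusive range lookup; only the keys inside [min, max] are visited.
    List<Integer> between(double min, double max) {
        List<Integer> ids = new ArrayList<>();
        for (List<Integer> bucket : byHeight.subMap(min, true, max, true).values()) {
            ids.addAll(bucket);
        }
        return ids;
    }

    public static void main(String[] args) {
        NavigableIndexSketch index = new NavigableIndexSketch();
        index.add(1, 180.0);
        index.add(2, 171.1);
        index.add(3, 160.5);
        index.add(4, 190.9);
        // Results come back in ascending height order.
        System.out.println(index.between(150.0, 180.0)); // prints [3, 2, 1]
    }
}
```

A hash-based structure cannot answer this kind of query without visiting every key, which is why equality and range lookups typically use different index types.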
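The class com.googlecode.cqengine.query.QueryFactory provides many static methods that help build queries; bring them in with import static com.googlecode.cqengine.query.QueryFactory.*;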
public static void main(String[] args) {
    IndexedCollection<User> users = new ConcurrentIndexedCollection<>();
    // Sample data. Features: 脸大 = big face, 脸小 = small face, 近视 = nearsighted, 正常 = normal vision
    User user1 = User.builder().userId(1).name("张三").role(User.Role.SUPER).height(180.0)
            .features(Lists.newArrayList("脸大", "近视")).build();
    User user2 = User.builder().userId(2).name("李四").role(User.Role.ADMIN).height(171.1)
            .features(Lists.newArrayList("脸小", "正常")).build();
    User user3 = User.builder().userId(3).name("王五").role(User.Role.NORMAL).height(160.5)
            .features(Lists.newArrayList("脸小", "近视")).build();
    User user4 = User.builder().userId(4).name("张二").role(User.Role.SUPER).height(190.9)
            .features(Lists.newArrayList("脸大", "正常")).build();
    users.addAll(Lists.newArrayList(user1, user2, user3, user4));

    users.addIndex(NavigableIndex.onAttribute(User.USER_ID));
    users.addIndex(InvertedRadixTreeIndex.onAttribute(User.NAME));
    users.addIndex(HashIndex.onAttribute(User.NAME));
    users.addIndex(NavigableIndex.onAttribute(User.HEIGHT));
    users.addIndex(HashIndex.onAttribute(User.FEATURES));

    // Name contains "张"
    Query<User> nameQuery = contains(User.NAME, "张");
    display(users, nameQuery);
    // Height between 150.0 and 180.0 (inclusive)
    Query<User> heightQuery = between(User.HEIGHT, 150.0, 180.0);
    display(users, heightQuery);
    // Role is SUPER
    Query<User> roleQuery = equal(User.ROLE, User.Role.SUPER);
    display(users, roleQuery);
    // Role is SUPER and height is in range
    Query<User> andQuery = and(roleQuery, heightQuery);
    display(users, andQuery);
    // Has at least one of the features "近视" or "脸大"
    Query<User> userQuery = in(User.FEATURES, "近视", "脸大");
    display(users, userQuery);
    // Has both features "近视" and "脸大"
    Query<User> andUserQuery = and(in(User.FEATURES, "近视"),
            in(User.FEATURES, "脸大"));
    display(users, andUserQuery);
}

// Runs the query and prints every matching user.
private static void display(IndexedCollection<User> users, Query<User> query) {
    ResultSet<User> r = users.retrieve(query);
    for (User user : r) {
        System.out.println(user);
    }
    System.out.println("---------");
}
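The last two queries differ only in how the feature values are combined: a single in with several values matches a user who has any of them, while an and of two in queries requires both. The plain-Java sketch below reproduces that difference on the same four users, without CQEngine; the class MultiValueInSketch and its methods are hypothetical names, and user ids stand in for the User objects.

```java
import java.util.*;
import java.util.stream.Collectors;

public class MultiValueInSketch {
    // userId -> features, mirroring the four sample users above.
    static Map<Integer, List<String>> sample() {
        Map<Integer, List<String>> m = new LinkedHashMap<>();
        m.put(1, Arrays.asList("脸大", "近视"));
        m.put(2, Arrays.asList("脸小", "正常"));
        m.put(3, Arrays.asList("脸小", "近视"));
        m.put(4, Arrays.asList("脸大", "正常"));
        return m;
    }

    // Users having at least one of the given values (OR semantics).
    static List<Integer> anyOf(Map<Integer, List<String>> users, String... values) {
        Set<String> wanted = new HashSet<>(Arrays.asList(values));
        return users.entrySet().stream()
                .filter(e -> e.getValue().stream().anyMatch(wanted::contains))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    // Users having every one of the given values (AND semantics).
    static List<Integer> allOf(Map<Integer, List<String>> users, String... values) {
        List<String> wanted = Arrays.asList(values);
        return users.entrySet().stream()
                .filter(e -> e.getValue().containsAll(wanted))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> users = sample();
        System.out.println(anyOf(users, "近视", "脸大")); // prints [1, 3, 4]
        System.out.println(allOf(users, "近视", "脸大")); // prints [1]
    }
}
```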
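With the attributes defined above, a null field value causes a NullPointerException at query time. To avoid this, replace the attribute method with nullableAttribute in the POJO: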
public static final Attribute<User, String> FEATURES = nullableAttribute(String.class, "features", User::getFeatures);
Add a record with a null value to test it:
User user5 = User.builder().userId(5).name("李七").role(User.Role.NORMAL).height(170.0)
.features(null).build();
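Conceptually, a null-tolerant attribute treats a null field as "no value" instead of dereferencing it. The sketch below shows that idea in plain Java; it is an illustration of the principle, not CQEngine's code, and the class NullableValuesSketch and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class NullableValuesSketch {
    // Treat a null list as "no values" instead of dereferencing it.
    static List<String> valuesOrEmpty(List<String> features) {
        return features == null ? Collections.<String>emptyList() : features;
    }

    // Null-safe membership test built on the helper above.
    static boolean hasFeature(List<String> features, String wanted) {
        return valuesOrEmpty(features).contains(wanted);
    }

    public static void main(String[] args) {
        System.out.println(hasFeature(Arrays.asList("a", "b"), "a")); // prints true
        System.out.println(hasFeature(null, "a"));                    // prints false
    }
}
```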
CQEngine also provides a parser for SQL-like statements, built on ANTLR, for querying a collection:
// Register every attribute that the SQL text may reference by name.
SQLParser<User> parser = new SQLParser<User>(User.class) {{
    registerAttribute(User.USER_ID);
    registerAttribute(User.NAME);
    registerAttribute(User.ROLE);
    registerAttribute(User.HEIGHT);
    registerAttribute(User.FEATURES);
}};
ResultSet<User> results = parser.retrieve(users, "SELECT * FROM users WHERE " +
        "(height <= 180.0 " +
        "AND role IN ('SUPER', 'NORMAL')) " +
        "ORDER BY height ASC, userId DESC");
results.forEach(System.out::println);
Object collection persistence
A CQEngine IndexedCollection can be configured to store its objects on-heap, off-heap, or on disk.
On-heap
// On-heap is the default
IndexedCollection<User> users = new ConcurrentIndexedCollection<>();
Off-heap
// Off-heap
IndexedCollection<User> users = new ConcurrentIndexedCollection<>(
        OffHeapPersistence.onPrimaryKey((SimpleAttribute<User, Integer>) User.USER_ID));
Disk
IndexedCollection<User> users1 = new ConcurrentIndexedCollection<>(
        DiskPersistence.onPrimaryKeyInFile((SimpleAttribute<User, Integer>) User.USER_ID, new File("cars.dat"))
);
Composite
IndexedCollection<User> users = new ConcurrentIndexedCollection<>(
        CompositePersistence.of(
                OnHeapPersistence.onPrimaryKey((SimpleAttribute<User, Integer>) User.USER_ID),
                DiskPersistence.onPrimaryKeyInFile((SimpleAttribute<User, Integer>) User.USER_ID, new File("cars.dat"))
        )
);
Index persistence
Most indexes implement OnHeapTypeIndex and are therefore on-heap indexes.
On-heap
users.addIndex(NavigableIndex.onAttribute(User.USER_ID));
Off-heap
users.addIndex(OffHeapIndex.onAttribute(User.NAME));
Disk
users.addIndex(DiskIndex.onAttribute(User.FEATURES));
Note: data persisted to disk is stored in SQLite file format by default.
CQEngine can also report statistics about an IndexedCollection:
MetadataEngine<User> engine = users.getMetadataEngine();
AttributeMetadata<String, User> attributeMetadata = engine.getAttributeMetadata(User.NAME);
System.out.println(attributeMetadata.getFrequencyDistribution().collect(Collectors.toList()));
System.out.println(attributeMetadata.getCountOfDistinctKeys());
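For readers unfamiliar with the two statistics printed above, the plain-Java sketch below computes the same kind of figures (how often each key occurs, and how many distinct keys there are) over a list of names. It does not use CQEngine; the class FrequencySketch, its method, and the sample names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class FrequencySketch {
    // Counts how many times each name occurs; TreeMap keeps the keys sorted.
    static Map<String, Long> frequencies(List<String> names) {
        return names.stream()
                .collect(Collectors.groupingBy(n -> n, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("alice", "bob", "alice");
        Map<String, Long> freq = frequencies(names);
        System.out.println(freq);        // prints {alice=2, bob=1}
        System.out.println(freq.size()); // prints 2 (number of distinct keys)
    }
}
```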
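Official benchmark: https://github.com/npgall/cqengine/blob/master/documentation/Benchmark.md
The tests below use the datafaker library to generate fake data and compare four approaches: a plain for loop, stream iteration, CQEngine with a hash index, and CQEngine without an index.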
import com.googlecode.cqengine.ConcurrentIndexedCollection;
import com.googlecode.cqengine.IndexedCollection;
import com.googlecode.cqengine.index.hash.HashIndex;
import com.googlecode.cqengine.query.Query;
import com.googlecode.cqengine.resultset.ResultSet;
import net.datafaker.Faker;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

import static com.googlecode.cqengine.query.QueryFactory.equal;

@Warmup(iterations = 2, time = 5)
@Measurement(iterations = 5, time = 5)
@Fork(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@BenchmarkMode(value = Mode.Throughput)
@Timeout(time = 60)
public class JmhDemo {
    private List<User> users = new ArrayList<>();
    private String find = "";
    private IndexedCollection<User> indexedCollection;
    private IndexedCollection<User> noIndexedCollection;
    Query<User> query;

    // Collection sizes to benchmark
    @Param({"100", "1000", "10000"})
    int originLen;

    @Setup
    public void setUp() {
        Faker faker = new Faker(Locale.CHINA);
        // Pool of originLen / 10 names, so each name is shared by about ten users
        List<String> n = faker.collection(() -> faker.name().name()).len(originLen / 10).generate();
        users = faker.collection(() -> User.builder().name(n.get(faker.random().nextInt(0, originLen / 10 - 1)))
                        .height(faker.random().nextDouble(1.50, 1.90))
                        .features(faker.collection(() -> faker.food().fruit()).len(1, 5).generate())
                        .build())
                .len(originLen).generate();
        // Name to search for in every benchmark
        find = n.get(faker.random().nextInt(0, n.size() - 1));
        query = equal(User.NAME, find);
        indexedCollection = new ConcurrentIndexedCollection<>();
        indexedCollection.addAll(users);
        indexedCollection.addIndex(HashIndex.onAttribute(User.NAME));
        noIndexedCollection = new ConcurrentIndexedCollection<>();
        noIndexedCollection.addAll(users);
    }

    @Benchmark
    public void iterationNaiveQuery(Blackhole bh) {
        int count = 0;
        for (User user : users) {
            if (user.getName().equals(find)) count++;
        }
        bh.consume(count);
    }

    @Benchmark
    public void streamQuery(Blackhole bh) {
        long count = users.stream().filter(u -> u.getName().equals(find)).count();
        bh.consume(count);
    }

    @Benchmark
    public void cqengineHashIndexQuery(Blackhole bh) {
        ResultSet<User> retrieve = indexedCollection.retrieve(query);
        bh.consume(retrieve.size());
    }

    @Benchmark
    public void cqengineNoIndexQuery(Blackhole bh) {
        ResultSet<User> retrieve = noIndexedCollection.retrieve(query);
        bh.consume(retrieve.size());
    }
}
The results are shown in the figure below.

import com.googlecode.cqengine.ConcurrentIndexedCollection;
import com.googlecode.cqengine.IndexedCollection;
import com.googlecode.cqengine.index.navigable.NavigableIndex;
import com.googlecode.cqengine.query.Query;
import com.googlecode.cqengine.resultset.ResultSet;
import net.datafaker.Faker;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

import static com.googlecode.cqengine.query.QueryFactory.between;

@Warmup(iterations = 2, time = 5)
@Measurement(iterations = 5, time = 5)
@Fork(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@BenchmarkMode(value = Mode.Throughput)
@Timeout(time = 60)
public class JmhBetweenDemo {
    private List<User> users = new ArrayList<>();
    // Inclusive height range to search for
    private double min = 1.60;
    private double max = 1.70;
    private IndexedCollection<User> indexedCollection;
    private IndexedCollection<User> noIndexedCollection;
    Query<User> query;

    // Collection sizes to benchmark
    @Param({"100", "1000", "10000"})
    int originLen;

    @Setup
    public void setUp() {
        Faker faker = new Faker(Locale.CHINA);
        users = faker.collection(() -> User.builder().name(faker.name().name())
                        .height(faker.random().nextInt(150, 200) / 100.0)
                        .features(faker.collection(() -> faker.food().fruit()).len(1, 5).generate())
                        .build())
                .len(originLen).generate();
        query = between(User.HEIGHT, min, max);
        indexedCollection = new ConcurrentIndexedCollection<>();
        indexedCollection.addAll(users);
        indexedCollection.addIndex(NavigableIndex.onAttribute(User.HEIGHT));
        noIndexedCollection = new ConcurrentIndexedCollection<>();
        noIndexedCollection.addAll(users);
    }

    @Benchmark
    public void iterationNaiveQuery(Blackhole bh) {
        int count = 0;
        for (User user : users) {
            double height = user.getHeight();
            if (height >= min && height <= max) count++;
        }
        bh.consume(count);
    }

    @Benchmark
    public void streamQuery(Blackhole bh) {
        long count = users.stream().filter(u -> u.getHeight() >= min && u.getHeight() <= max).count();
        bh.consume(count);
    }

    @Benchmark
    public void cqengineWithoutIndexedCollectionQuery(Blackhole bh) {
        ResultSet<User> retrieve = noIndexedCollection.retrieve(query);
        bh.consume(retrieve.size());
    }

    @Benchmark
    public void cqengineNavigableIndexedCollectionQuery(Blackhole bh) {
        ResultSet<User> retrieve = indexedCollection.retrieve(query);
        bh.consume(retrieve.size());
    }
}
The results are shown in the figure below.
