本文沿用我之前文章中使用 ES 命令进行查询的示例,测试索引中的文档数据在前面的文章中已经介绍过。
下面我们就通过 ES8(Java API Client)来进行查询。
参考官方 API 文档:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/usage.html
Elasticsearch高级查询Query DSL:https://blog.csdn.net/qq_42402854/article/details/125357445
打印 ES 语句:主要是对 SearchRequest 对象进行序列化。
获取 SearchRequest 对象,可以通过 fn.apply(...).build() 或者 searchRequestBuilder.build()。这里使用 fn 对象,并用同一个 SearchRequest 对象执行查询,保证打印的语句就是实际执行的语句。
// Build the SearchRequest exactly once from the builder lambda `fn`, so the object that is
// printed is the same object that is executed. Passing `fn` to search() again would apply the
// lambda a second time and build a separate request.
// NOTE(review): assumes fn is a Function<SearchRequest.Builder, ObjectBuilder<SearchRequest>>,
// i.e. the same type that ElasticsearchClient.search(fn, ...) accepts.
SearchRequest searchRequest = fn.apply(new co.elastic.clients.elasticsearch.core.SearchRequest.Builder()).build();
// index() is the list of target indices; toString() renders it in list form, e.g. "[db_idx4]".
String esIndex = searchRequest.index().toString();
String printEsSearchRequest = printEsBySearchRequest(searchRequest);
// Execute the very request that was just serialized.
SearchResponse<Map> searchResponse = elasticsearchClient.search(searchRequest, Map.class);
/**
 * Serializes an ES8 {@link SearchRequest} into its JSON request body, e.g. for logging the
 * statement that is about to be executed.
 *
 * @param searchRequest the request to serialize
 * @return the JSON text of the request body
 */
public String printEsBySearchRequest(SearchRequest searchRequest) {
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    JsonpMapper mapper = new JacksonJsonpMapper();
    // try-with-resources flushes and closes the generator even when serialize() throws.
    try (JsonGenerator generator = mapper.jsonProvider().createGenerator(byteArrayOutputStream)) {
        mapper.serialize(searchRequest, generator);
    }
    // The byte-stream generator emits UTF-8 (Jackson's default encoding). Decode explicitly:
    // the no-arg toString() uses the platform charset and garbles non-ASCII query text
    // (e.g. Chinese) on JVMs whose default charset is not UTF-8.
    return new String(byteArrayOutputStream.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
}
@Test
public void testMatchAll() throws IOException {
    // Search the whole "db_idx4" index: no query is set, so every document matches.
    // search(builderLambda, documentClass): the lambda configures SearchRequest.Builder,
    // and Map.class is the type each hit's _source is deserialized into.
    SearchResponse<Map> searchResponse = elasticsearchClient.search(
            request -> request.index("db_idx4"),
            Map.class);

    // Dump the raw response, then the timing and total-hit information.
    System.out.println(searchResponse);
    System.out.println("花费的时长:" + searchResponse.took());

    HitsMetadata<Map> metadata = searchResponse.hits();
    System.out.println(metadata.total());
    System.out.println("符合条件的总文档数量:" + metadata.total().value());

    // response.hits() is the metadata wrapper; wrapper.hits() is the actual list of hits.
    for (Hit<Map> hit : searchResponse.hits().hits()) {
        System.out.println("文档原生信息:" + hit.source().toString());
    }
}
@Test
@DisplayName("分页查询")
public void testMatchAllPage() throws IOException {
    // Paged search: `from` is the zero-based offset of the first hit, `size` is the page length.
    // NOTE(review): from(1) skips the very first hit; use from(0) if the first page was intended.
    SearchResponse<Map> searchResponse = elasticsearchClient.search(
            request -> request.index("db_idx4").from(1).size(5),
            Map.class);

    System.out.println(searchResponse);
    System.out.println("花费的时长:" + searchResponse.took());

    HitsMetadata<Map> metadata = searchResponse.hits();
    System.out.println(metadata.total());
    System.out.println("符合条件的总文档数量:" + metadata.total().value());

    // Print the _source of every hit on this page, in response order.
    List<Hit<Map>> page = searchResponse.hits().hits();
    page.forEach(hit -> System.out.println("文档原生信息:" + hit.source().toString()));
}
// Sorted search: order by age descending, then by id descending to break ties.
// Each sort(...) call appends one sort criterion. A single FieldSort holds exactly one field,
// so chaining field("age")...field("id") on the same builder would keep only "id".
SearchResponse<Map> searchResponse = elasticsearchClient.search(srBuilder -> srBuilder
        .index("db_idx4")
        .sort(sortOptionsBuilder -> sortOptionsBuilder
                .field(fieldSortBuilder -> fieldSortBuilder.field("age").order(SortOrder.Desc)))
        .sort(sortOptionsBuilder -> sortOptionsBuilder
                .field(fieldSortBuilder -> fieldSortBuilder.field("id").order(SortOrder.Desc))),
        Map.class);
// Print the raw response.
System.out.println(searchResponse);
数据过滤(_source 过滤)就是只返回我们指定的字段。
// Source filtering: each hit returns only the selected _source fields.
// "age" appears in both includes and excludes. Excludes are applied to the included subset,
// so hits should end up with only id and name.
SearchResponse<Map> searchResponse = elasticsearchClient.search(
        request -> request
                .index("db_idx4")
                .source(source -> source
                        .filter(filter -> filter
                                .includes("id", "name", "age")
                                .excludes("age"))),
        Map.class);
System.out.println(searchResponse);
match在匹配时会对所查找的关键词进行分词,然后按分词匹配查找。
// match query on "address": the input text is analyzed (tokenized) first,
// and documents are matched against the resulting terms.
SearchResponse<Map> searchResponse = elasticsearchClient.search(
        request -> request
                .index("db_idx4")
                .query(q -> q.match(m -> m.field("address").query("王者打野"))),
        Map.class);
System.out.println(searchResponse);
multi_match 查询:在多个字段上执行 match 查询,是否分词取决于各字段的类型,得分最高的文档排在前面。
// multi_match query: analyze "王者打野" and search it in both "address" and "desc".
// Operator.Or: a document needs only one of the analyzed terms to match.
SearchResponse<Map> searchResponse = elasticsearchClient.search(
        request -> request
                .index("db_idx4")
                .query(q -> q.multiMatch(mm -> mm
                        .fields("address", "desc")
                        .query("王者打野")
                        .operator(Operator.Or))),
        Map.class);
System.out.println(searchResponse);
// term query (a single value, not "terms"): the value is NOT analyzed, so it must equal the
// indexed token exactly. That is why the exact-value field "address.keyword" is used
// (presumably the keyword sub-field created by dynamic mapping; confirm against the index mapping).
SearchResponse<Map> searchResponse = elasticsearchClient.search(
        request -> request
                .index("db_idx4")
                .query(q -> q.term(t -> t
                        .field("address.keyword")
                        .value("三国演义小乔"))),
        Map.class);
System.out.println(searchResponse);
// range query on "age": 18 <= age < 23 (gte is inclusive, lt is exclusive).
SearchResponse<Map> searchResponse = elasticsearchClient.search(
        request -> request
                .index("db_idx4")
                .query(q -> q.range(r -> r
                        .field("age")
                        .gte(JsonData.of(18))
                        .lt(JsonData.of(23)))),
        Map.class);
System.out.println(searchResponse);
// Date range query on the "product" index: documents whose "date" is earlier than two years ago.
// "now-2y" is Elasticsearch date math and must use an ASCII hyphen-minus. The previous value
// used a typographic hyphen (U+2010), which is not valid date math and fails to parse.
SearchResponse<Map> searchResponse = elasticsearchClient.search(srBuilder -> srBuilder
        .index("product")
        .query(queryBuilder -> queryBuilder
                .range(rangeQueryBuilder -> rangeQueryBuilder
                        .field("date")
                        .lt(JsonData.of("now-2y"))))
        , Map.class);
// Print the raw response.
System.out.println(searchResponse);
// ids query: fetch documents by _id. Ids that do not exist (e.g. "1111") simply produce no hit.
SearchResponse<Map> searchResponse = elasticsearchClient.search(
        request -> request
                .index("db_idx4")
                .query(q -> q.ids(ids -> ids.values("2", "5", "1111"))),
        Map.class);
System.out.println(searchResponse);
@Test
@DisplayName("高亮查询")
public void testHighlight() throws IOException {
    // multi_match over "address" and "desc", with highlighting enabled on both fields.
    SearchResponse<Map> searchResponse = elasticsearchClient.search(srBuilder -> srBuilder
            .index("db_idx4")
            // MultiMatch: the query text is analyzed first, then matched against each field.
            .query(queryBuilder -> queryBuilder
                    .multiMatch(multiMatchQueryBuilder -> multiMatchQueryBuilder
                            .fields("address", "desc")
                            .query("王者打野")
                            .operator(Operator.Or)))
            // Highlighting wraps matched terms in preTags/postTags. Empty tags made the highlighted
            // fragments indistinguishable from plain text, so use ES's default <em> markup.
            .highlight(highlightBuilder -> highlightBuilder
                    .preTags("<em>")
                    .postTags("</em>")
                    // false: highlight each listed field even if the query match came from another field.
                    .requireFieldMatch(false)
                    .fields("address", highlightFieldBuilder -> highlightFieldBuilder)
                    .fields("desc", highlightFieldBuilder -> highlightFieldBuilder))
            , Map.class);

    // Print the raw response, timing and totals.
    System.out.println(searchResponse);
    System.out.println("花费的时长:" + searchResponse.took());
    HitsMetadata<Map> hits = searchResponse.hits();
    System.out.println(hits.total());
    System.out.println("符合条件的总文档数量:" + hits.total().value());

    // response.hits() is the metadata wrapper; wrapper.hits() is the list of individual hits.
    List<Hit<Map>> hitList = searchResponse.hits().hits();
    for (Hit<Map> mapHit : hitList) {
        String source = mapHit.source().toString();
        System.out.println("文档原生信息:" + source);
        // Highlighted fragments per field for this hit.
        System.out.println("高亮信息:" + mapHit.highlight());
    }
}
// bool query. With only should clauses (no must/filter), a document matches when at least one
// should clause matches, i.e. OR semantics. Swap in the commented must clauses for AND semantics.
SearchResponse<Map> searchResponse = elasticsearchClient.search(
        request -> request
                .index("db_idx4")
                .query(q -> q.bool(b -> b
                        // AND variant (age >= 20 and sex matches 0):
                        //.must(m -> m.range(r -> r.field("age").gte(JsonData.of(20))))
                        //.must(m -> m.match(mt -> mt.field("sex").query(0)))
                        // OR: age >= 20, or sex matches "0"
                        .should(s -> s.range(r -> r.field("age").gte(JsonData.of("20"))))
                        .should(s -> s.match(mt -> mt.field("sex").query("0"))))),
        Map.class);
System.out.println(searchResponse);
// fuzzy query: match terms in "name" that are within an edit distance of 1 from "张".
SearchResponse<Map> searchResponse = elasticsearchClient.search(
        request -> request
                .index("db_idx4")
                .query(q -> q.fuzzy(f -> f
                        .field("name")
                        .value("张")
                        .fuzziness("1"))),
        Map.class);
System.out.println(searchResponse);
ElasticSearch聚合操作:https://blog.csdn.net/qq_42402854/article/details/125377293
Metric Aggregation 一些数学运算,可以对文档字段进行统计分析。
@Test
public void testAgg1() throws IOException {
    // Metric aggregations over all employees: highest, lowest and average salary.
    SearchResponse<Map> searchResponse = elasticsearchClient.search(
            request -> request
                    .index("employees")
                    .aggregations("maxSalary", agg -> agg.max(max -> max.field("salary")))
                    .aggregations("minSalary", agg -> agg.min(min -> min.field("salary")))
                    .aggregations("avgSalary", agg -> agg.avg(avg -> avg.field("salary"))),
            Map.class);

    System.out.println(searchResponse);
    System.out.println("花费的时长:" + searchResponse.took());

    HitsMetadata<Map> metadata = searchResponse.hits();
    System.out.println(metadata.total());
    System.out.println("符合条件的总文档数量:" + metadata.total().value());
    for (Hit<Map> hit : searchResponse.hits().hits()) {
        System.out.println("文档原生信息:" + hit.source().toString());
    }

    // Read each named aggregation back through its metric-specific accessor.
    Map<String, Aggregate> aggs = searchResponse.aggregations();
    System.out.println("aggregations:" + aggs);
    Aggregate maxSalary = aggs.get("maxSalary");
    Aggregate minSalary = aggs.get("minSalary");
    Aggregate avgSalary = aggs.get("avgSalary");
    System.out.println("maxSalary:" + maxSalary);
    System.out.println("最高工资:" + maxSalary.max().value());
    System.out.println("最低工资:" + minSalary.min().value());
    System.out.println("平均工资:" + avgSalary.avg().value());
}
@Test
public void testAgg2() throws IOException {
    // stats aggregation: count/min/max/avg/sum of salary in one request.
    SearchResponse<Map> searchResponse = elasticsearchClient.search(
            request -> request
                    .index("employees")
                    .aggregations("statSalary", agg -> agg.stats(stats -> stats.field("salary"))),
            Map.class);

    System.out.println(searchResponse);
    System.out.println("花费的时长:" + searchResponse.took());

    HitsMetadata<Map> metadata = searchResponse.hits();
    System.out.println(metadata.total());
    System.out.println("符合条件的总文档数量:" + metadata.total().value());
    for (Hit<Map> hit : searchResponse.hits().hits()) {
        System.out.println("文档原生信息:" + hit.source().toString());
    }

    Map<String, Aggregate> aggs = searchResponse.aggregations();
    System.out.println("aggregations:" + aggs);
    Aggregate statSalary = aggs.get("statSalary");
    System.out.println("statSalary:" + statSalary);
    // All five figures come from the same stats() view of the aggregate.
    System.out.println("统计个数:" + statSalary.stats().count());
    System.out.println("最高工资:" + statSalary.stats().max());
    System.out.println("最低工资:" + statSalary.stats().min());
    System.out.println("平均工资:" + statSalary.stats().avg());
    System.out.println("工资之和:" + statSalary.stats().sum());
}
@Test
public void testAgg3() throws IOException {
    // cardinality aggregation: number of distinct job values (exact-value keyword sub-field).
    SearchResponse<Map> searchResponse = elasticsearchClient.search(
            request -> request
                    .index("employees")
                    .aggregations("jobCardinate", agg -> agg
                            .cardinality(card -> card.field("job.keyword"))),
            Map.class);

    System.out.println(searchResponse);
    System.out.println("花费的时长:" + searchResponse.took());

    Map<String, Aggregate> aggs = searchResponse.aggregations();
    System.out.println("aggregations:" + aggs);
    CardinalityAggregate distinctJobs = aggs.get("jobCardinate").cardinality();
    System.out.println("jobCardinate:" + distinctJobs.toString());
    System.out.println("不重复的个数" + distinctJobs.value());
}
Bucket Aggregation:按照一定的规则,将文档分配到不同的桶中,每一个桶关联一个 key,从而达到分类的目的。类比Mysql中的group by操作。
@Test
public void testAgg4() throws IOException {
    // Bucket aggregation: group employees by job (terms aggregation on the keyword sub-field).
    SearchResponse<Map> searchResponse = elasticsearchClient.search(
            request -> request
                    .index("employees")
                    .aggregations("jobGroup", agg -> agg.terms(terms -> terms.field("job.keyword"))),
            Map.class);

    System.out.println(searchResponse);
    System.out.println("花费的时长:" + searchResponse.took());

    Map<String, Aggregate> aggs = searchResponse.aggregations();
    System.out.println("aggregations:" + aggs);
    // Terms over a keyword field come back as string terms, hence sterms().
    StringTermsAggregate jobGroup = aggs.get("jobGroup").sterms();
    for (StringTermsBucket bucket : jobGroup.buckets().array()) {
        System.out.println("key:" + bucket.key());
        System.out.println("docCount:" + bucket.docCount());
    }
}
// Query + aggregation: only employees with salary >= 10000 are grouped by job.
SearchResponse<Map> searchResponse = elasticsearchClient.search(
        request -> request
                .index("employees")
                .query(q -> q.range(r -> r.field("salary").gte(JsonData.of(10000))))
                .aggregations("jobGroup", agg -> agg.terms(terms -> terms.field("job.keyword"))),
        Map.class);
System.out.println(searchResponse);
@Test
public void testAgg6() throws IOException {
    // range aggregation on salary with three buckets: below 10000, 10000 up to 20000, and
    // 20000 and above. Only the last bucket gets an explicit key (">20000").
    SearchResponse<Map> searchResponse = elasticsearchClient.search(
            request -> request
                    .index("employees")
                    .aggregations("salary_range", agg -> agg
                            .range(range -> range
                                    .field("salary")
                                    .ranges(r -> r.to("10000"))
                                    .ranges(r -> r.from("10000").to("20000"))
                                    .ranges(r -> r.key(">20000").from("20000")))),
            Map.class);

    System.out.println(searchResponse);
    System.out.println("花费的时长:" + searchResponse.took());

    Map<String, Aggregate> aggs = searchResponse.aggregations();
    System.out.println("aggregations:" + aggs);
    // A range aggregation is read back via range().
    RangeAggregate salaryRange = aggs.get("salary_range").range();
    for (RangeBucket bucket : salaryRange.buckets().array()) {
        System.out.println("key:" + bucket.key());
        System.out.println("docCount:" + bucket.docCount());
        System.out.println("from:" + bucket.from());
        System.out.println("to:" + bucket.to());
    }
}
// histogram aggregation: bucket salaries in steps of 5000. extendedBounds makes the buckets
// span 0..100000 even where no documents fall.
SearchResponse<Map> searchResponse = elasticsearchClient.search(
        request -> request
                .index("employees")
                .aggregations("salary_histrogram", agg -> agg
                        .histogram(histogram -> histogram
                                .field("salary")
                                .interval(5000D)
                                .extendedBounds(bounds -> bounds.min(0D).max(100000D)))),
        Map.class);
System.out.println(searchResponse);
top_hits 应用场景:分桶之后,获取每个桶内最匹配的顶部文档列表。
@Test
public void testAgg8() throws IOException {
    // top_hits nested under a terms aggregation: for each job, the 3 oldest employees.
    SearchResponse<Map> searchResponse = elasticsearchClient.search(
            request -> request
                    .index("employees")
                    .aggregations("jobs", agg -> agg
                            .terms(terms -> terms.field("job.keyword"))
                            // Sub-aggregation evaluated inside each job bucket.
                            .aggregations("old_employee", sub -> sub
                                    .topHits(top -> top
                                            .size(3)
                                            .sort(sort -> sort
                                                    .field(f -> f.field("age").order(SortOrder.Desc)))))),
            Map.class);

    System.out.println(searchResponse);
    System.out.println("花费的时长:" + searchResponse.took());

    Map<String, Aggregate> aggs = searchResponse.aggregations();
    System.out.println("aggregations:" + aggs);
    StringTermsAggregate jobs = aggs.get("jobs").sterms();
    for (StringTermsBucket bucket : jobs.buckets().array()) {
        System.out.println("key:" + bucket.key());
        System.out.println("docCount:" + bucket.docCount());
        // Per-bucket top_hits result; its hits carry untyped JsonData sources.
        TopHitsAggregate oldEmployee = bucket.aggregations().get("old_employee").topHits();
        List<Hit<JsonData>> topHits = oldEmployee.hits().hits();
        System.out.println(" old_employee.hits().total().value():" + oldEmployee.hits().total().value());
        for (Hit<JsonData> hit : topHits) {
            System.out.println(" hit.source():" + hit.source().toString());
        }
    }
}
Pipeline Aggregation:支持对聚合分析的结果,再次进行聚合分析。
示例:在员工数最多的若干工种里(terms 聚合默认只返回文档数最多的前 10 个桶),找出平均工资最低的工种。
@Test
public void testAgg9() throws IOException {
    // Pipeline aggregation: group employees by job, average each group's salary, then let the
    // sibling min_bucket aggregation pick the job whose average salary is lowest.
    SearchResponse<Map> searchResponse = elasticsearchClient.search(srBuilder -> srBuilder
            .index("employees")
            .aggregations("jobs", aggregationBuilder -> aggregationBuilder
                    .terms(termsAggregationBuilder -> termsAggregationBuilder
                            .field("job.keyword"))
                    // Sub-aggregation: average salary within each job bucket.
                    .aggregations("avg_salary", subAggregationBuilder -> subAggregationBuilder
                            .avg(avgAggregationBuilder -> avgAggregationBuilder.field("salary"))))
            // Sibling pipeline aggregation over the "jobs" buckets' "avg_salary" values.
            .aggregations("min_salary_by_job", subAggregationBuilder -> subAggregationBuilder
                    .minBucket(minBucketAggregationBuilder -> minBucketAggregationBuilder
                            .bucketsPath(bucketsPathBuilder -> bucketsPathBuilder
                                    .array(Arrays.asList("jobs>avg_salary")))))
            , Map.class);

    System.out.println(searchResponse);
    System.out.println("花费的时长:" + searchResponse.took());

    Map<String, Aggregate> aggregations = searchResponse.aggregations();
    System.out.println("aggregations:" + aggregations);

    // Per-job averages. These are NOT the minimum; each bucket just reports its own average.
    StringTermsAggregate jobs = aggregations.get("jobs").sterms(); // keyword terms -> sterms()
    List<StringTermsBucket> bucketList = jobs.buckets().array();
    for (StringTermsBucket bucket : bucketList) {
        System.out.println("key:" + bucket.key());
        System.out.println("docCount:" + bucket.docCount());
        AvgAggregate avg_salary = bucket.aggregations().get("avg_salary").avg();
        System.out.println(" 平均工资:" + avg_salary.value());
    }

    // The actual min_bucket result: the job key(s) with the lowest average and that average.
    // Previously this aggregation was requested but never read.
    Aggregate minSalaryByJob = aggregations.get("min_salary_by_job");
    System.out.println("平均工资最低的工种:" + minSalaryByJob.bucketMetricValue().keys());
    System.out.println("最低平均工资:" + minSalaryByJob.bucketMetricValue().value());
}
// Pipeline aggregation stats_bucket: statistics over the per-job average salaries.
// "jobs" groups by job with an avg_salary sub-aggregation; the sibling stats_bucket reads
// those averages through the buckets path "jobs>avg_salary".
SearchResponse<Map> searchResponse = elasticsearchClient.search(
        request -> request
                .index("employees")
                .aggregations("jobs", agg -> agg
                        .terms(terms -> terms.field("job.keyword"))
                        .aggregations("avg_salary", sub -> sub.avg(avg -> avg.field("salary"))))
                .aggregations("stats_salary_by_job", agg -> agg
                        .statsBucket(stats -> stats
                                .bucketsPath(path -> path.array(Arrays.asList("jobs>avg_salary"))))),
        Map.class);
System.out.println(searchResponse);
到此,Java 操作 ES8查询基本就 ok了。
总结如下:
– 求知若饥,虚心若愚。