• Java Elasticsearch教程


    Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎,本教程从Java语言角度讲解如何操作Elasticsearch,如果不熟悉Elasticsearch,可以先学习 Elasticsearch教程,掌握基本概念和查询语法。

    教程基于ES官方的Java REST Client 进行讲解,老的Java API, ES 7.0.0以后将会废弃,不推荐继续使用。

    兼容性说明

    支持Elasticsearch 5.6.x 以上。

    Java版本,最低要求是1.8

    Maven配置

    1.   org.elasticsearch.client
    2.   elasticsearch-rest-high-level-client
    3.   7.8.1

    你可以根据自己的ES版本选择对应的Java REST Client版本。

    创建客户端

    在操作ES之前需要创建一个client, ES请求都是通过client发送,通过client可以配置ES的服务地址、安全验证相关参数。

    1. RestHighLevelClient client = new RestHighLevelClient(
    2.       RestClient.builder(
    3.               new HttpHost("localhost", 9200, "http"),
    4.               new HttpHost("localhost", 9201, "http")));

    通常全局创建一个client即可,client内部维护了连接池,因此在不使用client的时候需要通过下面方式释放资源。

    client.close();

    创建索引

    1. // 创建Request对象, 准备创建的索引名为twitter
    2. CreateIndexRequest request = new CreateIndexRequest("twitter");
    3. // 设置Request参数
    4. request.settings(Settings.builder()
    5.   .put("index.number_of_shards", 3) // 设置分区数
    6.   .put("index.number_of_replicas", 2) // 设置副本数
    7. );
    8. // 通过JSON字符串的方式,设置ES索引结构的mapping
    9. // ps: 通常都是通过json配置文件加载索引mapping配置,不需要拼接字符串。
    10. request.mapping(
    11.       "{\n" +
    12.       " \"properties\": {\n" +
    13.       "   \"message\": {\n" +
    14.       "     \"type\": \"text\"\n" +
    15.       "   }\n" +
    16.       " }\n" +
    17.       "}",
    18.       XContentType.JSON);
    19. // 执行请求,创建索引
    20. CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
    21. if (createIndexResponse.isAcknowledged()) {
    22.   // 创建成功
    23. }

    插入数据

    1. // 创建对应的Request请求,设置索引名为posts
    2. IndexRequest request = new IndexRequest("posts");
    3. // 设置文档id=1
    4. request.id("1");
    5. // 以json字符串的形式设置文档内容,也就是准备插入到ES中的数据
    6. String jsonString = "{" +
    7.       "\"user\":\"kimchy\"," +
    8.       "\"postDate\":\"2013-01-30\"," +
    9.       "\"message\":\"trying out Elasticsearch\"" +
    10.       "}";
    11. request.source(jsonString, XContentType.JSON);
    12. // 执行请求
    13. IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);

    查询数据

    根据id查询一条数据

    1. // 创建对应的Request对象,设置索引名为posts, 文档id=1
    2. GetRequest getRequest = new GetRequest(
    3.       "posts",
    4.       "1");
    5. // 执行ES请求
    6. GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
    7. // 处理查询结果
    8. String index = getResponse.getIndex();
    9. String id = getResponse.getId();
    10. // 检查文档是否存在
    11. if (getResponse.isExists()) {
    12.   long version = getResponse.getVersion();
    13.   // 获取文档数据的json字符串形式,可以使用json库转换成Java对象
    14.   String sourceAsString = getResponse.getSourceAsString();      
    15.   // 获取文档数据的Map形式
    16.   Map sourceAsMap = getResponse.getSourceAsMap();
    17.   // 获取文档数据的字节数组形式
    18.   byte[] sourceAsBytes = getResponse.getSourceAsBytes();          
    19. } else {
    20.   // 文档不存在
    21. }

    更新数据

    1. // 创建对应的Request对象,设置索引名为posts, 文档id=1
    2. UpdateRequest request = new UpdateRequest("posts", "1");
    3. // 以map形式,设置需要更新的文档字段
    4. Map jsonMap = new HashMap<>();
    5. jsonMap.put("updated", new Date());
    6. jsonMap.put("reason", "daily update");
    7. request.doc(jsonMap);
    8. // 执行请求
    9. UpdateResponse updateResponse = client.update(
    10.       request, RequestOptions.DEFAULT);

    删除数据

    1. // 创建对应的Request对象,设置索引名为posts, 文档id=1
    2. DeleteRequest request = new DeleteRequest(
    3.       "posts",    
    4.       "1");
    5. // 执行请求
    6. DeleteResponse deleteResponse = client.delete(
    7.       request, RequestOptions.DEFAULT);

    提示:详情,请参考后续章节。

    本节主要讲解Java Elasticsearch RestHighLevelClient的配置详解。

    RestHighLevelClient常用配置如下:

    • elasticsearch连接地址

    • elasticsearch账号/密码

    • Http请求头

    • 连接超时

    • 设置线程池大小

    创建client

    RestHighLevelClient 依赖 REST low-level client builder 进行配置,即依赖底层的RestClientBuilder对象进行参数设置

    1. // 首先创建RestClientBuilder,后续章节通过RestClientBuilder对象进行参数配置。
    2. RestClientBuilder restClientBuilder = RestClient.builder(
    3.               new HttpHost("localhost", 9200, "http"), // 设置ES服务地址,支持多个
    4.               new HttpHost("localhost", 9201, "http"));
    5. // 创建RestHighLevelClient,•请求都是通过RestHighLevelClient实例发出去的。
    6. RestHighLevelClient client = new RestHighLevelClient(restClientBuilder);

    配置ES账号密码

    1. restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    2.       @Override
    3.       public HttpAsyncClientBuilder customizeHttpClient(
    4.                           HttpAsyncClientBuilder httpClientBuilder) {
    5.             // 通过CredentialsProvider实例,配置账号和密码
    6.             CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    7.             credentialsProvider.setCredentials(AuthScope.ANY,
    8.                               new UsernamePasswordCredentials("user", "password"));
    9.                        
    10.               // 设置安全验证凭证
    11.               return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
    12.                   }
    13.               });

    设置Http Header

    1. // 创建header数组,可以设置多个header
    2. Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
    3. // 设置http header
    4. restClientBuilder.setDefaultHeaders(defaultHeaders);

    Timeout设置

    配置Elasticsearch连接超时时间

     
    
    1. restClientBuilder.setRequestConfigCallback(
    2.               new RestClientBuilder.RequestConfigCallback() {
    3.                   @Override
    4.                   public RequestConfig.Builder customizeRequestConfig(
    5.                           RequestConfig.Builder requestConfigBuilder) {
    6.                       return requestConfigBuilder
    7.                               .setConnectTimeout(5000) // 设置连接超时时间,5秒
    8.                               .setSocketTimeout(60000); // 设置请求超时时间,1分种
    9.                   }
    10.               });
    11. 设置线程池大小

    1. restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    2.           @Override
    3.           public HttpAsyncClientBuilder customizeHttpClient(
    4.                   HttpAsyncClientBuilder httpClientBuilder) {
    5.               return httpClientBuilder.setDefaultIOReactorConfig(
    6.                       IOReactorConfig.custom()
    7.                               .setIoThreadCount(10) // 设置线程数
    8.                               .build());
    9.           }
    10.       });

    Document APIs

    Java Elasticsearch Index Api

    Java Elasticsearch Index API 主要用于插入或者更新文档数据。

    创建Index Request

    1. // 创建Index Request,设置索引名为: posts
    2. IndexRequest request = new IndexRequest("posts");
    3. // 设置文档ID
    4. request.id("1");

    设置文档内容

    支持以JSON字符串形式或者map形式设置文档内容。

    1. Json字符串形式:
    2. String jsonString = "{" +
    3.       "\"user\":\"kimchy\"," +
    4.       "\"postDate\":\"2013-01-30\"," +
    5.       "\"message\":\"trying out Elasticsearch\"" +
    6.       "}";
    7. request.source(jsonString, XContentType.JSON);
     
    

    Map形式:

    1. // 创建map
    2. Map jsonMap = new HashMap<>();
    3. jsonMap.put("user", "kimchy");
    4. jsonMap.put("postDate", new Date());
    5. jsonMap.put("message", "trying out Elasticsearch");
    6. // 设置文档内容
    7. request.source(jsonMap);

    其他可选参数

    routing

    设置路由字段

    request.routing("routing"); 

    timeout

    设置单个请求超时参数

    1. request.timeout(TimeValue.timeValueSeconds(1));
    2. request.timeout("1s");

    Version

    1. 设置文档版本
    2. request.version(2);
     
    

    操作类型

    1. Index api支持两类操作:create 或者 index (默认)
    2. request.opType("create");
     
    

    执行请求

    以同步的方式执行ES请求

    IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);

    以异步的方式执行请求

    1. client.indexAsync(request, RequestOptions.DEFAULT, new ActionListener() {
    2.           @Override
    3.           public void onResponse(IndexResponse indexResponse) {
    4.               // 请求成功回调函数
    5.           }
    6.           @Override
    7.           public void onFailure(Exception e) {
    8.               // 请求失败回调函数
    9.           }
    10.       });

    处理请求结果

    1. // 获取索引名
    2. String index = indexResponse.getIndex();
    3. // 获取文档ID
    4. String id = indexResponse.getId();
    5. if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    6.   // 成功创建文档
    7. } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    8.   // 成功更新文档
    9. }

    Java Elasticsearch Get Api

    Get Api 主要用于根据文档ID查询索引数据。

    创建Get Request

    1. // 创建GetRequest,索引名=posts, 文档ID=1
    2. GetRequest getRequest = new GetRequest(
    3.       "posts",
    4.       "1");

    其他可选参数

    是否返回文档内容

    默认返回文档内容

    1. // 不返回文档内容
    2. request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);

    返回指定字段

    1. // 设置返回指定字段
    2. String[] includes = new String[]{"message", "*Date"};
    3. String[] excludes = Strings.EMPTY_ARRAY;
    4. FetchSourceContext fetchSourceContext =
    5.       new FetchSourceContext(true, includes, excludes);
    6. request.fetchSourceContext(fetchSourceContext);

    过滤指定字段

    1. String[] includes = Strings.EMPTY_ARRAY;
    2. // 过滤指定字段
    3. String[] excludes = new String[]{"message"};
    4. FetchSourceContext fetchSourceContext =
    5.       new FetchSourceContext(true, includes, excludes);
    6. request.fetchSourceContext(fetchSourceContext);

    设置路由

    request.routing("routing"); 

    Version

    设置文档版本

    request.version(2); 

    执行请求

    以同步的方式执行请求

    GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);

    以异步方式执行请求

    1. client.getAsync(request, RequestOptions.DEFAULT, new ActionListener< GetResponse>() {
    2.           @Override
    3.           public void onResponse(GetResponse getResponse) {
    4.               // 请求成功回调函数
    5.           }
    6.           @Override
    7.           public void onFailure(Exception e) {
    8.               // 请求失败回调函数
    9.           }
    10.       });

    处理请求结果

    1. String index = getResponse.getIndex();
    2. String id = getResponse.getId();
    3. // 检测索引是否存在
    4. if (getResponse.isExists()) {
    5.   // 获取版本号
    6.   long version = getResponse.getVersion();
    7.   // 获取文档内容,json字符串形式
    8.   String sourceAsString = getResponse.getSourceAsString();    
    9.   // 获取文档内容,map形式
    10.   Map sourceAsMap = getResponse.getSourceAsMap();
    11.   // 获取文档内容,字节数组形式
    12.   byte[] sourceAsBytes = getResponse.getSourceAsBytes();          
    13. } else {
    14.    
    15. }

    Java Elasticsearch Delete API

    Delete API 主要用于根据文档ID删除索引文档。

    创建Delete Request

    1. // 设置索引名=posts, 文档id=1
    2. DeleteRequest request = new DeleteRequest(
    3.       "posts",    
    4.       "1");

    其他可选参数

    设置路由

    request.routing("routing"); 

    timeout

    设置单个请求超时参数

    1. request.timeout(TimeValue.timeValueMinutes(2)); //格式1: 2分钟
    2. request.timeout("2m"); ///格式2:2分钟

    Version

    设置文档版本号

    request.version(2); 

    执行请求

    1. DeleteResponse deleteResponse = client.delete(
    2.       request, RequestOptions.DEFAULT);

    异步执行请求

    1. client.deleteAsync(request, RequestOptions.DEFAULT, new ActionListener() {
    2.           @Override
    3.           public void onResponse(DeleteResponse deleteResponse) {
    4.               // 请求成功回调函数
    5.           }
    6.           @Override
    7.           public void onFailure(Exception e) {
    8.               // 请求失败回调函数
    9.           }
    10.       });

    Java Elasticsearch Update API

    Elasticsearch Update API 根据文档ID更新文档内容,主要支持两种方式更新文档内容:通过脚本的方式更新和更新文档部分字段。

    提示:如果被更新的文档不存在,也支持插入操作,通过upsert api实现。

    创建Update Request

    1. // 创建UpdateRequest请求,索引名=posts,文档ID=1
    2. UpdateRequest request = new UpdateRequest(
    3.       "posts",
    4.       "1");  

    设置更新内容

    UpdateRequest对象支持下面几种更新文档内容的方式,根据需要选择一种方式即可

    脚本方式

    通过ES 内置script脚本更新文档内容。

    1. // 定义脚本参数
    2. Map parameters = singletonMap("count", 4);
    3. // 创建inline脚本,使用painless语言,实现field字段 + count参数值
    4. Script inline = new Script(ScriptType.INLINE, "painless",
    5.       "ctx._source.field += params.count", parameters);  
    6. // 设置脚本
    7. request.script(inline);

    map方式

    通过map对象更新文档部分内容

    1. // 通过map对象设置需要更新的字段内容
    2. Map jsonMap = new HashMap<>();
    3. jsonMap.put("updated", new Date());
    4. jsonMap.put("reason", "daily update");
    5. // 设置更新内容
    6. request.doc(jsonMap);

    json字符串方式

    通过json字符串方式更新文档部分内容

    1. String jsonString = "{" +
    2.       "\"updated\":\"2017-01-01\"," +
    3.       "\"reason\":\"daily update\"" +
    4.       "}";
    5. request.doc(jsonString, XContentType.JSON);

    upsert方式

    通过Upsert方式更新文档内容,跟前面三种类似,支持json字符串、map、脚本方式,但是有一点区别,如果被更新的文档不存在,则会执行插入操作。

    1. String jsonString = "{\"created\":\"2017-01-01\"}";
    2. request.upsert(jsonString, XContentType.JSON);

    其他可选参数

    设置路由

    request.routing("routing"); 

    timeout

    设置请求超时时间

    1. request.timeout(TimeValue.timeValueSeconds(1)); // 方式1:1秒
    2. request.timeout("1s"); // 方式2:1秒

    版本冲突重试

    如果更新文档的时候出现版本冲突,重试几次。

    request.retryOnConflict(3); 

    并发控制

    设置并发控制参数,新版的ES已经废弃老的version字段,详情请参考:ES基于乐观锁的并发控制

    1. // 设置版本号
    2. request.setIfSeqNo(2L);
    3. // 设置文档所在的主分区
    4. request.setIfPrimaryTerm(1L);

    同步执行请求

    1. UpdateResponse updateResponse = client.update(
    2.       request, RequestOptions.DEFAULT);

    异步执行请求

    1. client.updateAsync(request, RequestOptions.DEFAULT, new ActionListener() {
    2.           @Override
    3.           public void onResponse(UpdateResponse updateResponse) {
    4.             // 执行成功
    5.           }
    6.           @Override
    7.           public void onFailure(Exception e) {
    8.               // 执行失败
    9.           }
    10.       });

    执行结果

    1. // 索引名
    2. String index = updateResponse.getIndex();
    3. // 文档id
    4. String id = updateResponse.getId();
    5. // 版本号
    6. long version = updateResponse.getVersion();
    7. if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    8.   // 成功创建文档
    9. } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    10.   // 成功更新文档
    11. }

    Java Elasticsearch Update By Query API 批量更新

    ES update by query api主要用于批量更新文档内容,支持设置查询条件限制更新文档的范围。

    创建UpdateByQueryRequest对象

    1. // 创建UpdateByQueryRequest对象,设置索引名,支持一次更新多个索引
    2. // 同时更新source1和source2索引内容
    3. UpdateByQueryRequest request =
    4.       new UpdateByQueryRequest("source1", "source2");

    版本冲突

    批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。

    版本冲突解决方案如下:

    1. proceed - 忽略版本冲突,继续执行
    2. abort - 遇到版本冲突,中断执行
    3. request.setConflicts("proceed");
     
    

    设置查询条件

    1. // 设置term查询条件,查询user字段=kimchy的文档内容
    2. request.setQuery(new TermQueryBuilder("user", "kimchy"));

    ES的查询语法是非常丰富的,这里仅给出一种写法,JAVA ES查询用法请参考后续的章节。

    限制更新文档数量

    可以限制批量更新文档的数量

    request.setMaxDocs(10); 

    设置更新内容

    Update by query api更新文档内容,仅支持通过脚本的方式修改文档内容。

    1. request.setScript(
    2.   new Script( // 创建inline脚本,使用painless语言。
    3.       ScriptType.INLINE, "painless",
    4.       "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
    5.       Collections.emptyMap()));

    执行请求

    1. BulkByScrollResponse bulkResponse =
    2.       client.updateByQuery(request, RequestOptions.DEFAULT);

    处理结果

    1. TimeValue timeTaken = bulkResponse.getTook(); // 更新花费时间
    2. boolean timedOut = bulkResponse.isTimedOut(); // 是否超时
    3. long totalDocs = bulkResponse.getTotal(); // 更新文档总数
    4. long updatedDocs = bulkResponse.getUpdated(); // 成功更新了多少文档
    5. long deletedDocs = bulkResponse.getDeleted(); // 删除了多少文档
    6. long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数

    Java Elasticsearch Delete By Query API 批量删除

    Java ES Delete By Query API 主要用于批量删除操作,支持设置ES查询条件。

    创建DeleteByQueryRequest对象

    1. // 创建 DeleteByQueryRequest 对象,设置批量删除的索引名为:source1和source2
    2. // ps: 支持同时操作多个索引
    3. DeleteByQueryRequest request =
    4.       new DeleteByQueryRequest("source1", "source2");

    版本冲突

    批量更新内容的时候,可能会遇到文档版本冲突的情况,需要设置版本冲突的时候如何处理。

    版本冲突解决方案如下:

    1. proceed - 忽略版本冲突,继续执行
    2. abort - 遇到版本冲突,中断执行
    3. request.setConflicts("proceed");
     
    

    设置查询条件

    1. // 设置term查询条件,查询user字段=kimchy的文档内容
    2. request.setQuery(new TermQueryBuilder("user", "kimchy"));

    ES的查询语法是非常丰富的,这里仅给出一种写法,JAVA ES查询用法请参考后续的章节。

    限制删除文档数量

    可以限制批量删除文档的数量

    request.setMaxDocs(10); 

    执行请求

    1. BulkByScrollResponse bulkResponse =
    2.       client.deleteByQuery(request, RequestOptions.DEFAULT);

    处理结果

    1. TimeValue timeTaken = bulkResponse.getTook(); // 批量操作消耗时间
    2. boolean timedOut = bulkResponse.isTimedOut(); // 是否超时
    3. long totalDocs = bulkResponse.getTotal(); // 涉及文档总数
    4. long deletedDocs = bulkResponse.getDeleted(); // 成功删除文档数量
    5. long versionConflicts = bulkResponse.getVersionConflicts(); // 版本冲突次数

    Java Elasticsearch Multi-Get API 批量查询

    Multi-Get API 主要用于根于id集合,批量查询文档内容,支持查询多个索引内容。

    创建MultiGetRequest对象

    1. MultiGetRequest request = new MultiGetRequest();
    2. // 通过MultiGetRequest.Item对象设置查询参数
    3. request.add(new MultiGetRequest.Item(
    4.   "index",     // 索引名  
    5.   "12345")); // 文档id
    6. // 添加另外一组查询参数,索引名=index, 索引Id=another_id
    7. request.add(new MultiGetRequest.Item("index", "another_id"));

    执行请求

    MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);

    处理结果

    1. // response.getResponses返回多个MultiGetItemResponse对象,每个MultiGetItemResponse对象代表一个查询结果,这里以其中一个结果为例。
    2. // ps: 通常是需要循环遍历response.getResponses返回的结果
    3. MultiGetItemResponse firstItem = response.getResponses()[0];
    4. assertNull(firstItem.getFailure());              
    5. GetResponse firstGet = firstItem.getResponse();  
    6. String index = firstItem.getIndex(); // 获取索引名
    7. String id = firstItem.getId();// 获取文档Id
    8. if (firstGet.isExists()) { // 检测文档是否存在
    9.   long version = firstGet.getVersion(); // 获取版本号
    10.   // 查询文档内容,json字符串格式
    11.   String sourceAsString = firstGet.getSourceAsString();      
    12.   // 查询文档内容,Map对象格式  
    13.   Map sourceAsMap = firstGet.getSourceAsMap();
    14. } else {
    15.    
    16. }

    Java Elasticsearch Bulk API 批量操作

    ES的Bulk API主要用于在单个请求中,批量执行创建、更新、删除文档操作,避免循环发送大量的ES请求。

    创建BulkRequest对象

    BulkRequest request = new BulkRequest(); 

    添加操作对象

    支持index/update/delete操作。

    批量index操作

    1. // 通过add方法,添加IndexRequest对象,创建文档,下面插入3个文档
    2. // ps: IndexRequest对象,以键值对的方式设置文档内容
    3. request.add(new IndexRequest("posts").id("1")  
    4.       .source(XContentType.JSON,"field", "foo"));
    5. request.add(new IndexRequest("posts").id("2")  
    6.       .source(XContentType.JSON,"field", "bar"));
    7. request.add(new IndexRequest("posts").id("3")  
    8.       .source(XContentType.JSON,"field", "baz"));

    混合操作

    批量执行文档的删除、更新、创建操作

    1. request.add(new DeleteRequest("posts", "3"));
    2. request.add(new UpdateRequest("posts", "2")
    3.       .doc(XContentType.JSON,"other", "test"));
    4. request.add(new IndexRequest("posts").id("4")  
    5.       .source(XContentType.JSON,"field", "baz"));

    其他可选参数

    timeout

    设置请求超时时间

    1. request.timeout(TimeValue.timeValueMinutes(2)); // 形式1:2分钟
    2. request.timeout("2m"); // 形式2:2分钟

    执行请求

    BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

    处理结果

    1. if (bulkResponse.hasFailures()) {
    2.   // 至少存在一个错误处理
    3. }
    4. // 循环检测批量操作结果
    5. for (BulkItemResponse bulkItemResponse : bulkResponse) {
    6.   DocWriteResponse itemResponse = bulkItemResponse.getResponse();
    7.   // 根据操作类型检测执行结果
    8.   switch (bulkItemResponse.getOpType()) {
    9.   case INDEX:    
    10.   case CREATE:
    11.       // 处理创建请求
    12.       IndexResponse indexResponse = (IndexResponse) itemResponse;
    13.       break;
    14.   case UPDATE:  
    15.       // 处理更新请求
    16.       UpdateResponse updateResponse = (UpdateResponse) itemResponse;
    17.       break;
    18.   case DELETE:  
    19.       // 处理删除请求
    20.       DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    21.   }
    22. }

    查询

    Java Elasticsearch 查询详解

    Elasticsearch的查询功能非常灵活,不了解Elasticsearch查询语法和概念,可以先阅读:ES查询语法

    下面介绍Java Elasticsearch查询的写法。

    实现ES查询的关键步骤:

    • 创建RestHighLevelClient,Java ES Client对象 - (这个就不再重复,参考Java Elasticsearch Client配置

    • 创建SearchRequest对象 - 负责设置搜索参数

    • 通过Client对象发送请求

    • 处理搜索结果

    1. 创建SearchRequest对象

    创建SearchRequest对象步骤如下:

    • 创建SearchRequest对象

    • 通过SearchSourceBuilder设置搜索参数

    • 将SearchSourceBuilder绑定到SearchRequest对象

    1.1. 初始化SearchRequest对象

    创建SearchRequest对象,索引名=posts

    SearchRequest searchRequest = new SearchRequest("posts"); 

    SearchRequest负责设置搜索参数,包括:ES Query、分页参数等等常用设置。

    实际上大部分SearchRequest对象的搜索参数都是通过SearchSourceBuilder对象间接设置。

    1.2. 创建SearchSourceBuilder

    通过SearchSourceBuilder间接完成搜索参数的设置

    1. // 创建SearchSourceBuilder,构建ES搜索
    2. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    3. // 设置ES查询条件
    4. sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy"));
    5. // 设置分页参数,偏移从0开始
    6. sourceBuilder.from(0);
    7. // 设置分页参数,分页大小=5
    8. sourceBuilder.size(5);
    9. // 设置请求超时时间,60秒
    10. sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
    11. // 最后完成SearchRequest请求的参数设置
    12. searchRequest.source(sourceBuilder);

    1.3. 构造查询条件

    通过上面例子可以知道,Java ES查询条件是通过QueryBuilders工具类构建的,QueryBuilders工具类,支持丰富ES查询条件,详情后面的章节会单独介绍,这里大家只要记得它的作用即可。

    1. // 例子-构建一个Match模糊查询
    2. QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
    3.                         .fuzziness(Fuzziness.AUTO)
    4.                         .prefixLength(3)
    5.                         .maxExpansions(10);
    6. // 将QueryBuilders构建的查询,绑定到SearchSourceBuilder对象
    7. sourceBuilder.query(matchQueryBuilder);

    提示: QueryBuilder是所有Java ES查询的基础接口

    1.4. 设置分页参数

    通过SearchSourceBuilder设置搜索结果分页参数

    1. // 设置分页参数,偏移从0开始
    2. sourceBuilder.from(0);
    3. // 设置分页参数,分页大小=5
    4. sourceBuilder.size(5);

    1.5. 设置排序参数

    // 根据id字段,升序
    
    builder.sort("id", SortOrder.ASC);

    2. 执行请求

    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    3. 处理结果

    1. RestStatus status = searchResponse.status(); // ES请求状态
    2. TimeValue took = searchResponse.getTook(); // 执行时间
    3. Boolean terminatedEarly = searchResponse.isTerminatedEarly();
    4. boolean timedOut = searchResponse.isTimedOut(); // 是否超时
    5. // 获取hits,SearchHits对象包含搜索结果
    6. SearchHits hits = searchResponse.getHits();
    7. TotalHits totalHits = hits.getTotalHits();
    8. // 搜索结果总数
    9. long numHits = totalHits.value;
    10. // 遍历搜索结果
    11. SearchHit[] searchHits = hits.getHits();
    12. for (SearchHit hit : searchHits) {
    13.     // 获取文档内容,json字符串格式
    14.     String sourceAsString = hit.getSourceAsString();
    15.     // 获取文档内容,Map对象格式
    16.     Map sourceAsMap = hit.getSourceAsMap();
    17. }

    4. 完整例子

    我们先通过一个简单的ES查询例子,了解Java ES查询的基本写法。

    1. // 首先创建RestClient,后续章节通过RestClient对象进行参数配置。
    2. RestClientBuilder restClientBuilder = RestClient.builder(
    3.               new HttpHost("localhost", 9200, "http"), // 设置ES服务地址,支持多个
    4.               new HttpHost("localhost", 9201, "http"));
    5. // 创建RestHighLevelClient,•请求都是通过RestHighLevelClient实例发出去的。
    6. RestHighLevelClient client = new RestHighLevelClient(restClientBuilder);
    7. // 创建SearchRequest对象
    8. SearchRequest searchRequest = new SearchRequest();
    9. // 通过SearchSourceBuilder构建搜索参数
    10. SearchSourceBuilder builder = new SearchSourceBuilder();
    11. // 通过QueryBuilders构建ES查询条件
    12. builder.query(QueryBuilders.termsQuery("order_id", 1,2,3,4,5));
    13. // 设置搜索参数
    14. searchRequest.source(builder);
    15. // 执行ES请求
    16. SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    17. // 获取搜索到的文档
    18. SearchHits hits = searchResponse.getHits();
    19. // 遍历搜索结果
    20. SearchHit[] searchHits = hits.getHits();
    21. for (SearchHit hit : searchHits) {
    22.     // 获取文档内容,json字符串格式
    23.     String sourceAsString = hit.getSourceAsString();
    24.     // 获取文档内容,Map对象格式
    25.     Map sourceAsMap = hit.getSourceAsMap();
    26. }

    Java Elasticsearch Match 全文搜索查询

    全文搜索是Elasticsearch的核心特性之一,Java Elasticsearch全文搜索查询主要由MatchQueryBuilder这个构造器配置。

    创建MatchQueryBuilder

    有两种方式创建MatchQueryBuilder

    • 直接实例化MatchQueryBuilder

    • 通过QueryBuilders工具创建

    方式1

    直接实例化MatchQueryBuilder对象。 构造方法参数说明:

    • 参数1 - 字段名

    • 参数2 - 匹配关键词

    MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("title", "梯子教程");

    方式2

    通过QueryBuilders工具创建

    1. // 可以直接赋值给QueryBuilder接口定义的对象
    2. QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("title", "梯子教程");

    创建SearchRequest

    Java 所有的ES查询请求都是通过SearchRequest对象进行设置,因此需要实例化SearchRequest对象,设置query参数。

    1. SearchRequest searchRequest = new SearchRequest();
    2. // 通过SearchSourceBuilder构建搜索参数
    3. SearchSourceBuilder builder = new SearchSourceBuilder();
    4. // 设置query参数,绑定前面创建的Query对象
    5. builder.query(matchQueryBuilder);
    6. // 设置SearchRequest搜索参数
    7. searchRequest.source(builder);

    执行请求

    1. // 执行ES请求
    2. SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    提示: 如何处理查询结果,请参考 Java ES查询基本写法

    Java Elasticsearch Match Phrase 短语查询

    Elasticsearch的match_phrase短语查询跟match的区别就是,关键词作为一个整体进行搜索,而不是拆分成一个个关键词。

    创建MatchPhraseQueryBuilder

    有两种方式创建MatchPhraseQueryBuilder

    • 直接实例化MatchPhraseQueryBuilder

    • 通过QueryBuilders工具创建

    方式1

    直接实例化MatchPhraseQueryBuilder对象。 构造方法参数说明:

    • 参数1 - 字段名

    • 参数2 - 匹配短语

    MatchPhraseQueryBuilder matchPhraseQueryBuilder = new MatchPhraseQueryBuilder("title", "Elasticsearch 查询语法");

    方式2

    通过QueryBuilders工具创建

    1. // 可以直接赋值给QueryBuilder接口定义的对象
    2. QueryBuilder matchPhraseQueryBuilder = QueryBuilders.matchPhraseQuery("title", "Elasticsearch 查询语法");

    创建SearchRequest

    Java 所有的ES查询请求都是通过SearchRequest对象进行设置,因此需要实例化SearchRequest对象,设置query参数。

    1. SearchRequest searchRequest = new SearchRequest();
    2. // 通过SearchSourceBuilder构建搜索参数
    3. SearchSourceBuilder builder = new SearchSourceBuilder();
    4. // 设置query参数,绑定前面创建的Query对象
    5. builder.query(matchPhraseQueryBuilder);
    6. // 设置SearchRequest搜索参数
    7. searchRequest.source(builder);

    执行请求

    // 执行ES请求
    
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    提示: 如何处理查询结果,请参考 Java ES查询基本写法

    Java Elasticsearch Term 等值匹配

    Elasticsearch的term查询,主要用于实现等值匹配,类似SQL的fieldname=value表达式。

    构建term查询

    等值匹配

    // 方式1
    TermQueryBuilder termQueryBuilder = new TermQueryBuilder("order_id", 100);
    ​
    // 方式2
    TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("order_id", 100);

    类似SQL:order_id=100 条件

    创建SearchRequest

    Java 所有的ES查询请求都是通过SearchRequest对象进行设置,因此需要实例化SearchRequest对象,设置query参数。

    1. SearchRequest searchRequest = new SearchRequest();
    2. // 通过SearchSourceBuilder构建搜索参数
    3. SearchSourceBuilder builder = new SearchSourceBuilder();
    4. // 设置query参数,绑定前面创建的Query对象
    5. builder.query(termQueryBuilder);
    6. // 设置SearchRequest搜索参数
    7. searchRequest.source(builder);

    执行请求

    1. // 执行ES请求
    2. SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    提示: 如何处理查询结果,请参考 Java ES查询基本写法

    Java Elasticsearch terms查询

    Elasticsearch terms查询,用于实现类似SQL的in语句,匹配其中一个值即可。

    构建terms查询

    实现类似SQL的in语句

    1. // 方式1
    2. TermsQueryBuilder termsQueryBuilder = new TermsQueryBuilder("order_id", 1,2,3,4,5);
    3. // 方式2
    4. TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery("order_id", 1,2,3,4,5);

    类似SQL:order_id in (1,2,3,4,5)

    创建SearchRequest

    Java 所有的ES查询请求都是通过SearchRequest对象进行设置,因此需要实例化SearchRequest对象,设置query参数。

    1. SearchRequest searchRequest = new SearchRequest();
    2. // 通过SearchSourceBuilder构建搜索参数
    3. SearchSourceBuilder builder = new SearchSourceBuilder();
    4. // 设置query参数,绑定前面创建的Query对象
    5. builder.query(termsQueryBuilder);
    6. // 设置SearchRequest搜索参数
    7. searchRequest.source(builder);

    执行请求

    1. // 执行ES请求
    2. SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    提示: 如何处理查询结果,请参考 Java ES查询基本写法

    Java Elasticsearch range范围匹配

    Elasticsearch的range范围匹配,可以实现类似SQL语句中的>, >=, <, <=关系表达式。

    构建range查询

    方式1

    1. // 等价SQL: price > 100 and price < 200
    2. RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder("price");
    3.       rangeQueryBuilder.gt(100);
    4.       rangeQueryBuilder.lt(200);
    5. // 等价SQL: price >= 100 and price <= 200
    6. RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder("price");
    7.       rangeQueryBuilder.gte(100);
    8.       rangeQueryBuilder.lte(200);

    方式2

    1. // 等价SQL: price >= 150 and price <= 300
    2. RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("price")
    3.               .gte(150)
    4.               .lte(300);

    创建SearchRequest

    1. Java 所有的ES查询请求都是通过SearchRequest对象进行设置,因此需要实例化SearchRequest对象,设置query参数。
    2. SearchRequest searchRequest = new SearchRequest();
    3. // 通过SearchSourceBuilder构建搜索参数
    4. SearchSourceBuilder builder = new SearchSourceBuilder();
    5. // 设置query参数,绑定前面创建的Query对象
    6. builder.query(rangeQueryBuilder);
    7. // 设置SearchRequest搜索参数
    8. searchRequest.source(builder);
    9. 执行请求

    1. // 执行ES请求
    2. SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    提示: 如何处理查询结果,请参考 Java ES查询基本写法

    Java Elasticsearch bool组合查询

    Elasticsearch bool组合查询,通过must(且)和should(或)逻辑运算组合term、terms、range等ES查询子句,实现复杂的查询需求,类似SQL的where子句。

    不了解ES Bool查询,可以参考Elasticsearch Bool查询语法

    构建bool查询

    1. // 等价SQL: shop_id=100 and status=3 or (price >= 100 and price <= 300)
    2. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
    3.               .must(QueryBuilders.termQuery("shop_id", 100)) // 通过must设置term子查询
    4.               .must(QueryBuilders.termQuery("status", 3)) // 通过must设置term子查询
    5.               .should(QueryBuilders.rangeQuery("price").gte(100).lte(300));// 通过should设置range子查询

    bool查询支持多层嵌套,最终组合出复杂的查询条件

    创建SearchRequest

    Java 所有的ES查询请求都是通过SearchRequest对象进行设置,因此需要实例化SearchRequest对象,设置query参数。

    1. SearchRequest searchRequest = new SearchRequest();
    2. // 通过SearchSourceBuilder构建搜索参数
    3. SearchSourceBuilder builder = new SearchSourceBuilder();
    4. // 设置query参数,绑定前面创建的Query对象
    5. builder.query(boolQueryBuilder);
    6. // 设置SearchRequest搜索参数
    7. searchRequest.source(builder);

    执行请求

    1. // 执行ES请求
    2. SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    提示:如何处理查询结果,请参考 Java ES查询基本写法

    Java Elasticsearch geo查询

    Java Elasticsearch geo查询主要包括:

    • 按距离搜索

    • 按距离排序

    • 按矩形范围搜索

    按距离搜索

    1. // 根据location坐标字段和当前位置,搜索1千米范围的数据
    2. GeoDistanceQueryBuilder queryBuilder = QueryBuilders.geoDistanceQuery("location")
    3.               .distance("1km") //设置搜索距离为1千米
    4.               // 设置当前位置
    5.               .point(new GeoPoint(39.889916, 116.379547));

    按距离排序

    1. // 创建SearchRequest对象
    2. SearchRequest searchRequest = new SearchRequest();
    3. // 通过SearchSourceBuilder构建搜索参数
    4. SearchSourceBuilder builder = new SearchSourceBuilder();
    5. // 设置前面创建的ES查询条件
    6. builder.query(queryBuilder);
    7. // 构建GeoDistanceSortBuilder设置按距离排序参数
    8. GeoDistanceSortBuilder geoDistanceSortBuilder = SortBuilders.geoDistanceSort("location", new GeoPoint(39.889916, 116.379547));
    9. // 升序排序
    10. geoDistanceSortBuilder.order(SortOrder.ASC);
    11. // 设置排序参数
    12. builder.sort(geoDistanceSortBuilder);
    13.        
    14. // 设置搜索请求参数
    15. searchRequest.source(builder);

    按矩形范围搜索

    1. // 根据location坐标字段和一个矩形范围,匹配文档
    2. GeoBoundingBoxQueryBuilder queryBuilder = QueryBuilders.geoBoundingBoxQuery("location")
    3.               .setCorners( // 设置矩形坐标
    4.                       new GeoPoint(40.73, -74.1), // 设置矩形的左上角坐标
    5.                       new GeoPoint(40.717, -73.99) // 设置矩形的右下角坐标
    6.               );

    创建SearchRequest

    1. Java 所有的ES查询请求都是通过SearchRequest对象进行设置,因此需要实例化SearchRequest对象,设置query参数。
    2. SearchRequest searchRequest = new SearchRequest();
    3. // 通过SearchSourceBuilder构建搜索参数
    4. SearchSourceBuilder builder = new SearchSourceBuilder();
    5. // 设置query参数,绑定前面创建的Query对象
    6. builder.query(queryBuilder);
    7. // 设置SearchRequest搜索参数
    8. searchRequest.source(builder);
     
    

    执行请求

    1. // 执行ES请求
    2. SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

    提示:如何处理查询结果,请参考 Java ES查询基本写法

    聚合分析

    Java Elasticsearch 聚合查询(Aggregation)详解

    Elasticsearch中的聚合查询,类似SQL的SUM/AVG/COUNT/GROUP BY分组查询,主要用于统计分析场景。

    这里主要介绍Java Elasticsearch 聚合查询的写法,如果不了解ES聚合查询,请参考 ES聚合查询基本概念和用法

    例子

    1. import org.apache.http.HttpHost;
    2. import org.elasticsearch.action.search.SearchRequest;
    3. import org.elasticsearch.action.search.SearchResponse;
    4. import org.elasticsearch.client.RequestOptions;
    5. import org.elasticsearch.client.RestClient;
    6. import org.elasticsearch.client.RestClientBuilder;
    7. import org.elasticsearch.client.RestHighLevelClient;
    8. import org.elasticsearch.index.query.*;
    9. import org.elasticsearch.search.aggregations.AggregationBuilders;
    10. import org.elasticsearch.search.aggregations.Aggregations;
    11. import org.elasticsearch.search.aggregations.bucket.terms.Terms;
    12. import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
    13. import org.elasticsearch.search.aggregations.metrics.Avg;
    14. import org.elasticsearch.search.builder.SearchSourceBuilder;
    15. import java.io.IOException;
    16. public class Main {
    17.    public static void main(String[] args) throws IOException {
    18.        // 首先创建RestClient,后续章节通过RestClient对象进行参数配置。
    19.        RestClientBuilder restClientBuilder = RestClient.builder(
    20.                new HttpHost("localhost", 9200, "http"), // 设置ES服务地址,支持多个
    21.                new HttpHost("localhost", 9201, "http"));
    22.        // 创建RestHighLevelClient,•请求都是通过RestHighLevelClient实例发出去的。
    23.        RestHighLevelClient client = new RestHighLevelClient(restClientBuilder);
    24.        // 创建SearchRequest对象, 设置查询索引名=order
    25.        SearchRequest searchRequest = new SearchRequest("order");
    26.        // 通过SearchSourceBuilder构建搜索参数
    27.        SearchSourceBuilder builder = new SearchSourceBuilder();
    28.        // 通过QueryBuilders构建ES查询条件,这里查询所有文档,复杂的查询语句设置请参考前面的章节。
    29.        builder.query(QueryBuilders.matchAllQuery());
    30.        // 创建terms桶聚合,聚合名字=by_shop, 字段=shop_id,根据shop_id分组
    31.        TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("by_shop")
    32.               .field("shop_id");
    33.        // 嵌套聚合
    34.        // 设置Avg指标聚合,聚合名字=avg_price, 字段=price,计算平均价格
    35.        aggregationBuilder.subAggregation(AggregationBuilders.avg("avg_price").field("price"));
    36.        // 设置聚合查询
    37.        builder.aggregation(aggregationBuilder);
    38.        // 设置搜索条件
    39.        searchRequest.source(builder);
    40.        // 如果只想返回聚合统计结果,不想返回查询结果可以将分页大小设置为0
    41.        builder.size(0);
    42.        // 执行ES请求
    43.        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    44.        // 处理聚合查询结果
    45.        Aggregations aggregations = searchResponse.getAggregations();
    46.        // 根据by_shop名字查询terms聚合结果
    47.        Terms byShopAggregation = aggregations.get("by_shop");
    48.        // 遍历terms聚合结果
    49.        for (Terms.Bucket bucket : byShopAggregation.getBuckets()) {
    50.            // 因为是根据shop_id分组,因此可以直接将桶的key转换成int类型
    51.            int shopId = bucket.getKeyAsNumber().intValue();
    52.            // 根据avg_price聚合名字,获取嵌套聚合结果
    53.            Avg avg = bucket.getAggregations().get("avg_price");
    54.            // 获取平均价格
    55.            double avgPrice = avg.getValue();
    56.       }
    57.        // 关闭ES Client
    58.        client.close();
    59.   }
    60. }
    61. 例子聚合统计的效果等价SQL:
    62. select shop_id, avg(price) as avg_price from order group by shop_id

    大家可以先通过例子和注释大致了解 Java ES 聚合查询的基本写法

    Java Elasticsearch 指标聚合(metrics)

    Elasticsearch指标聚合,就是类似SQL的统计函数,指标聚合可以单独使用,也可以跟桶聚合一起使用,下面介绍Java Elasticsearch指标聚合的写法。

    不了解ES指标聚合相关知识,先看一下Elasticsearch 指标聚合教程

    例子

    /
    1. / 首先创建RestClient,后续章节通过RestClient对象进行参数配置。
    2. RestClientBuilder restClientBuilder = RestClient.builder(
    3.               new HttpHost("localhost", 9200, "http"), // 设置ES服务地址,支持多个
    4.               new HttpHost("localhost", 9201, "http"));
    5. // 创建RestHighLevelClient,•请求都是通过RestHighLevelClient实例发出去的。
    6.       RestHighLevelClient client = new RestHighLevelClient(restClientBuilder);
    7. // 创建SearchRequest对象, 索引名=order
    8. SearchRequest searchRequest = new SearchRequest("order");
    9. // 通过SearchSourceBuilder构建搜索参数
    10. SearchSourceBuilder builder = new SearchSourceBuilder();
    11. // 通过QueryBuilders构建ES查询条件,这里查询所有文档,复杂的查询语句设置请参考前面的章节。
    12. builder.query(QueryBuilders.matchAllQuery());
    13. // 创建Value Count指标聚合
    14. // 聚合统计命名为:orders, 统计order_id字段值的数量
    15. ValueCountAggregationBuilder valueCountAggregationBuilder = AggregationBuilders.count("orders")
    16.               .field("order_id");
    17. // 创建Sum指标聚合
    18. // 聚合统计命名为:total_sale, 统计price字段值的总和
    19. SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum("total_sale")
    20.               .field("price");
    21. // 设置聚合查询,可以设置多个聚合查询条件,只要聚合查询命名不同就行
    22. builder.aggregation(valueCountAggregationBuilder);
    23. builder.aggregation(sumAggregationBuilder);
    24. // 设置搜索条件
    25. searchRequest.source(builder);
    26. // 执行ES请求
    27. SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    28. // 处理聚合查询结果
    29. Aggregations aggregations = searchResponse.getAggregations();
    30. // 根据orders命名查询,ValueCount统计结果
    31. ValueCount valueCount = aggregations.get("orders");
    32. System.out.println(valueCount.getValue());
    33. // 根据total_sale命名查询,Sum统计结果
    34. Sum sum = aggregations.get("total_sale");
    35. System.out.println(sum.getValue());

    其他指标聚合的用法类似,后面分别介绍常用指标聚合。

    常用指标聚合

    1. Value Count

    值聚合,主要用于统计文档总数,类似SQL的count函数。

    创建聚合条件

    1. // 创建Value Count指标聚合
    2. // 聚合统计命名为:orders, 统计order_id字段值的数量
    3. ValueCountAggregationBuilder valueCountAggregationBuilder = AggregationBuilders.count("orders")
    4.               .field("order_id");

    处理聚合结果

    1. Aggregations aggregations = searchResponse.getAggregations();
    2. // 根据orders命名查询,ValueCount统计结果
    3. ValueCount valueCount = aggregations.get("orders");
    4. // 打印结果
    5. System.out.println(valueCount.getValue());

    2.Cardinality

    基数聚合,也是用于统计文档的总数,跟Value Count的区别是,基数聚合会去重,不会统计重复的值,类似SQL的count(DISTINCT 字段)用法。

    基数聚合是一种近似算法,统计的结果会有一定误差,不过性能很好。

    创建聚合条件

    1. // 聚合统计命名为:total, 近似统计id字段值的数量
    2. CardinalityAggregationBuilder cardinalityAggregationBuilder = AggregationBuilders.cardinality("total")
    3.               .field("id");

    处理聚合结果

    1. Aggregations aggregations = searchResponse.getAggregations();
    2. // 根据total命名查询,Cardinality统计结果
    3. Cardinality cardinality = aggregations.get("total");
    4. // 打印结果
    5. System.out.println(cardinality.getValue());

    3.Avg

    求平均值

    创建聚合条件

    1. AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("avg_price")
    2.               .field("price");

    处理聚合结果

    1. Aggregations aggregations = searchResponse.getAggregations();
    2. // 根据total命名查询,Avg统计结果
    3. Avg avgPrice = aggregations.get("avg_price");
    4. // 打印结果
    5. System.out.println(avgPrice.getValue());

    4.Sum

    求和计算

    创建聚合条件

    1. SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum("total_sale")
    2.               .field("price");

    处理聚合结果

    1. Aggregations aggregations = searchResponse.getAggregations();
    2. // 根据total命名查询,Sum统计结果
    3. Sum totalPrice = aggregations.get("total_sale");
    4. // 打印结果
    5. System.out.println(totalPrice.getValue());

    5.Max

    求最大值

    创建聚合条件

    1. MaxAggregationBuilder maxAggregationBuilder = AggregationBuilders.max("max_price")
    2.               .field("price");

    处理聚合结果

    1. // 处理聚合查询结果
    2. Aggregations aggregations = searchResponse.getAggregations();
    3. // 根据max_price命名查询,Max统计结果
    4. Max maxPrice = aggregations.get("max_price");
    5. // 打印结果
    6. System.out.println(maxPrice.getValue());

    6.Min

    求最小值

    创建聚合条件

    1. MinAggregationBuilder minAggregationBuilder = AggregationBuilders.min("min_price")
    2.               .field("price");

    处理聚合结果

    1. Aggregations aggregations = searchResponse.getAggregations();
    2. // 根据min_price命名查询,Min统计结果
    3. Min minPrice = aggregations.get("min_price");
    4. // 打印结果
    5. System.out.println(minPrice.getValue());

    java elasticsearch 桶聚合(bucket)

    Elasticsearch桶聚合,目的就是数据分组,先将数据按指定的条件分成多个组,然后对每一个组进行统计。

    不了解Elasticsearch桶聚合概念,可以先学习下Elasticsearch桶聚合教程

    本章介绍java elasticsearch桶聚合的用法

    例子

    1. // 首先创建RestClient,后续章节通过RestClient对象进行参数配置。
    2. RestClientBuilder restClientBuilder = RestClient.builder(
    3.               new HttpHost("localhost", 9200, "http"), // 设置ES服务地址,支持多个
    4.               new HttpHost("localhost", 9201, "http"));
    5. // 创建RestHighLevelClient,•请求都是通过RestHighLevelClient实例发出去的。
    6. RestHighLevelClient client = new RestHighLevelClient(restClientBuilder);
    7. // 创建SearchRequest对象, 设置索引名=order
    8. SearchRequest searchRequest = new SearchRequest("order");
    9. // 通过SearchSourceBuilder构建搜索参数
    10. SearchSourceBuilder builder = new SearchSourceBuilder();
    11. // 通过QueryBuilders构建ES查询条件,这里查询所有文档,复杂的查询语句设置请参考前面的章节。
    12. builder.query(QueryBuilders.matchAllQuery());
    13. // 创建Terms桶聚合条件
    14. // terms聚合命名为: by_shop, 分组字段为: shop_id
    15. TermsAggregationBuilder byShop = AggregationBuilders.terms("by_shop")
    16.               .field("shop_id");
    17. // 设置聚合条件
    18. builder.aggregation(byShop);
    19. // 设置搜索条件
    20. searchRequest.source(builder);
    21. // 执行ES请求
    22. SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    23. // 处理聚合查询结果
    24. Aggregations aggregations = searchResponse.getAggregations();
    25. // 根据by_shop名字查询terms聚合结果
    26. Terms byShopAggregation = aggregations.get("by_shop");
    27. // 遍历terms聚合结果
    28. for (Terms.Bucket bucket : byShopAggregation.getBuckets()) {
    29.   // 因为是根据shop_id分组,因此可以直接将桶的key转换成int类型
    30.   int shopId = bucket.getKeyAsNumber().intValue();
    31.   // 如果分组的字段是字符串类型,可以直接转成String类型
    32.   // String key = bucket.getKeyAsString();
    33.   // 获取文档总数
    34.   long count = bucket.getDocCount();
    35. }

    其他桶聚合条件的用法类型,下面分别介绍各类常用的桶聚合

    常用桶聚合

    1.Terms聚合

    创建聚合条件

    1. // terms聚合命名为: by_shop, 分组字段为: shop_id
    2. TermsAggregationBuilder byShop = AggregationBuilders.terms("by_shop")
    3.               .field("shop_id");

    处理聚合结果

    1. Aggregations aggregations = searchResponse.getAggregations();
    2. // 根据by_shop命名查询terms聚合结果
    3. Terms byShopAggregation = aggregations.get("by_shop");
    4. // 遍历terms聚合结果
    5. for (Terms.Bucket bucket : byShopAggregation.getBuckets()) {
    6.   // 因为是根据shop_id分组,因此可以直接将桶的key转换成int类型
    7.   int shopId = bucket.getKeyAsNumber().intValue();
    8.   // 如果分组的字段是字符串类型,可以直接转成String类型
    9.   // String key = bucket.getKeyAsString();
    10.   // 获取文档总数
    11.   long count = bucket.getDocCount();
    12. }

    2.Histogram聚合

    创建聚合条件

    1. // Histogram聚合命名为: prices
    2. HistogramAggregationBuilder histogramAggregationBuilder = AggregationBuilders.histogram("prices")
    3.               .field("price") // 根据price字段值,对数据进行分组
    4.               .interval(100); // 分桶的间隔为100,意思就是price字段值按100间隔分组

    处理聚合结果

    1. // 处理聚合查询结果
    2. Aggregations aggregations = searchResponse.getAggregations();
    3. // 根据prices命名查询Histogram聚合结果
    4. Histogram histogram = aggregations.get("prices");
    5.        
    6. // 遍历聚合结果
    7. for (Histogram.Bucket bucket : histogram.getBuckets()) {
    8.   // 获取桶的Key值
    9.   String key = bucket.getKeyAsString();
    10.   // 获取文档总数
    11.   long count = bucket.getDocCount();
    12. }

    3.Date histogram聚合

    创建聚合条件

    1. // DateHistogram聚合命名为: sales_over_time
    2. DateHistogramAggregationBuilder dateHistogramAggregationBuilder = AggregationBuilders.dateHistogram("sales_over_time")
    3.               .field("date") // 根据date字段值,对数据进行分组
    4.               // 时间分组间隔:DateHistogramInterval.* 序列常量,支持每月,每年,每天等等时间间隔
    5.               .calendarInterval(DateHistogramInterval.MONTH)
    6.               // 设置返回结果中桶key的时间格式
    7.               .format("yyyy-MM-dd");

    处理聚合结果

    1. // 处理聚合查询结果
    2. Aggregations aggregations = searchResponse.getAggregations();
    3. // 根据sales_over_time命名查询Histogram聚合结果
    4. Histogram histogram = aggregations.get("sales_over_time");
    5.        
    6. // 遍历聚合结果
    7. for (Histogram.Bucket bucket : histogram.getBuckets()) {
    8.   // 获取桶的Key值
    9.   String key = bucket.getKeyAsString();
    10.   // 获取文档总数
    11.   long count = bucket.getDocCount();
    12. }

    4.Range聚合

    创建聚合条件

    1. //range聚合命名为: price_ranges
    2. RangeAggregationBuilder rangeAggregationBuilder = AggregationBuilders.range("price_ranges")
    3.               .field("price") // 根据price字段分桶
    4.               .addUnboundedFrom(100) // 范围配置, 0 - 100
    5.               .addRange(100.0, 200.0) // 范围配置, 100 - 200
    6.               .addUnboundedTo(200.0); // 范围配置,> 200的值

    处理聚合结果

    1. // 处理聚合查询结果
    2. Aggregations aggregations = searchResponse.getAggregations();
    3. Range range = aggregations.get("price_ranges");
    4. // 遍历聚合结果
    5. for (Range.Bucket bucket : range.getBuckets()) {
    6.   // 获取桶的Key值
    7.   String key = bucket.getKeyAsString();
    8.   // 获取文档总数
    9.   long count = bucket.getDocCount();
    10. }

    5.嵌套聚合的用法

    桶聚合之间支持互相嵌套,同时桶聚合也可以嵌套多个指标聚合,可以参考下面例子组合聚合条件

    创建嵌套聚合条件

    1. // 创建Terms指标聚合
    2. TermsAggregationBuilder byShop = AggregationBuilders.terms("by_shop")
    3.               .field("shop_id");
    4. // 创建avg指标聚合
    5. AvgAggregationBuilder avgPriceBuilder = AggregationBuilders.avg("avg_price")
    6.               .field("price");
    7. // 设置嵌套聚合查询条件
    8. byShop.subAggregation(avgPriceBuilder);
    9. SumAggregationBuilder sumPriceBulder = AggregationBuilders.sum("sum_price")
    10.               .field("price");
    11. // 设置嵌套聚合查询条件
    12. byShop.subAggregation(sumPriceBulder);

    处理结果

    1. // 处理聚合查询结果
    2. Aggregations aggregations = searchResponse.getAggregations();
    3. Terms terms = aggregations.get("by_shop");
    4. // 遍历聚合结果
    5. for (Terms.Bucket bucket : terms.getBuckets()) {
    6.     // 获取桶的Key值
    7.     String key = bucket.getKeyAsString();
    8.     // 获取文档总数
    9.     long count = bucket.getDocCount();
    10.     // 处理嵌套聚合结果
    11.     Aggregations subAggregations = bucket.getAggregations();
    12.     // 根据avg_price命名,查询avg聚合结果
    13.     Avg avgPriceResult = subAggregations.get("avg_price");
    14.     // 获取平均价格
    15.     double avgPrice = avgPriceResult.getValue();
    16.     // 根据sum_price命名,查询sum聚合结果
    17.   Sum sumPriceResult = subAggregations.get("sum_price");
    18.     // 获取总价格
    19.     double sumPrice = sumPriceResult.getValue();
    20. }

  • 相关阅读:
    MVC架构回顾
    android如何通过cpp sendevent发送powerkey按键消息
    科普一下MTU是什么,如何设置MTU
    【Java八股文总结】之JDK常见问题排查
    软考高项-IT部分
    重学c#系列——动态类型[二十二]
    算法总结-最短距离和问题
    信托计划净值数据写入excel
    VS编译OpenCV和OpenCV-contrib
    微信小程序完整项目实战(前端+后端)
  • 原文地址:https://blog.csdn.net/guanshengg/article/details/126315549