• RestHighLevelClient 操作ElasticSearch


    1、jar包

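    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.8</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.8</version>
    </dependency>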

    2、kerberos认证

    1. public void kerberos() {
    2. try {
    3. System.setProperty("http.auth.preference", "Kerberos");
    4. System.setProperty("java.security.krb5.conf", KRB5CONF);
    5. System.setProperty("sun.security.krb5.debug", "false");
    6. System.setProperty("sun.security.spnego.debug", "false");
    7. String acceptorPrincipal = PRINCIPAL;
    8. Path acceptorKeyTabPath = Paths.get(KEYTAB);
    9. Set set = new HashSet<>();
    10. set.add(acceptorPrincipal);
    11. final Subject subject = JaasKrbUtil.loginUsingKeytab(set, acceptorKeyTabPath, true); // 工具类未提供,需要单联
    12. Set privateCredentials = subject.getPrivateCredentials();
    13. log.info("getPrivateCredentials ------------------- ");
    14. privateCredentials.forEach(System.out::println);
    15. } catch (Exception e) {
    16. e.printStackTrace();
    17. }
    18. }
      1. public void kerberosRest(String index) {
      2. RestHighLevelClient restHighLevelClient = null;
      3. String indexName = "cool_test_one";
      4. String typeName = "cool_test_one_table";
      5. try {
      6. log.info("kerberos 访问 start =======================");
      7. log.info("开始认证");
      8. SpnegoHttpClientConfigCallbackHandler callbackHandler = new SpnegoHttpClientConfigCallbackHandler(PRINCIPAL, KEYTAB, true); // 工具类未提供,需要单联
      9. log.info("认证成功 ===================================");
      10. // 业务逻辑开始
      11. List hosts = new ArrayList<>();
      12. HttpHost hostNew = new HttpHost("localhost", 9200, "http");
      13. hosts.add(hostNew);
      14. HttpHost[] httpHosts = hosts.toArray(new HttpHost[0]);
      15. RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
      16. /** options start **/
      17. // 异步连接延时配置 https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.12/_timeouts.html
      18. // RequestConfig有三个超时如下
      19. int connectTimeout = 5000; // 设置连接超时时间,单位毫秒。指的是连接一个url的连接等待时间
      20. int socketTimeout = 5000; // 请求获取数据的超时时间,单位毫秒。 如果访问一个接口,多少时间内无法返回数据,就直接放弃此次调用。指的是连接上一个url,获取response的返回等待时间。
      21. int connectionRequestTimeout = 5000; // 设置从connect Manager获取Connection 超时时间,单位毫秒。这个属性是新加的属性,因为目前版本是可以共享连接池的。
      22. //配置请求超时超时,分为 连接超时(默认1s) 和 套接字超时(默认30s)
      23. restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
      24. requestConfigBuilder.setConnectTimeout(connectTimeout);//配置连接超时时间
      25. requestConfigBuilder.setSocketTimeout(socketTimeout);//配置套接字超时时间
      26. requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout);//获取连接的超时时间
      27. return requestConfigBuilder;
      28. });
      29. // 异步连接数配置
      30. int maxConnectNum = 100; // 最大连接数
      31. int maxConnectPerRoute = 100; // 最大路由连接数
      32. restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
      33. httpClientBuilder.setMaxConnTotal(maxConnectNum);
      34. httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
      35. return httpClientBuilder;
      36. });
      37. /** options end **/
      38. restHighLevelClient = new RestHighLevelClient(restClientBuilder);
      39. log.info("operation start =========================================");
      40. singleIndex(restHighLevelClient, indexName, typeName); // 单条文档写入
      41. bulkIndex(restHighLevelClient, indexName, typeName); // 批量文档写入
      42. singleUpdate(restHighLevelClient, indexName, typeName); // 单条文档更新
      43. bulkUpdate(restHighLevelClient, indexName, typeName); // 批量文档更新
      44. singleUpsert(restHighLevelClient, indexName, typeName); // 单条文档upsert
      45. bulkUpsert(restHighLevelClient, indexName, typeName); // 批量文档upsert
      46. bulkUpdateNoDocId(restHighLevelClient, indexName, typeName);
      47. bulkUpsertNoDocId(restHighLevelClient, indexName, typeName);
      48. singleDelete(restHighLevelClient, indexName, typeName);
      49. log.info("operation end ===========================================");
      50. // 测试获取所有的索引
      51. log.info("获取" + indexName + " 索引数据");
      52. getIndex(restHighLevelClient, indexName);
      53. log.info("kerberos 访问 end");
      54. } catch (Exception e) {
      55. e.printStackTrace();
      56. } finally {
      57. if (restHighLevelClient != null) {
      58. try {
      59. restHighLevelClient.close();
      60. } catch (IOException e) {
      61. e.printStackTrace();
      62. }
      63. }
      64. }
      65. }

      3、写入操作

      1)根据_id单条文档写入

      单条文档写入需要创建IndexRequest对象,设置索引名称,类型名称,id名称,以及使用source传入文档字段的Map对象,在执行写入时使用客户端的index方法把IndexRequest传入即可

      1. public String singleIndex(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      2. XContentBuilder builder = XContentFactory.jsonBuilder();
      3. builder.startObject();
      4. builder.field("a1", "bb");
      5. builder.field("a2", 100);
      6. builder.field("a3", 10.12);
      7. builder.field("a4", "2055-05-05");
      8. builder.field("a5", "20:55:55");
      9. builder.field("a6", "2055-05-05");
      10. builder.endObject();
      11. String docIdStr = Md5Util.encode("bb");
      12. log.info("doc_id ===>" + docIdStr);
      13. IndexRequest request = new IndexRequest(indexName, typeName, docIdStr).source(builder);
      14. IndexResponse response = null;
      15. try {
      16. // 不使用默认的RequestOptions.DEFAULT,而通过使用自定义RequestOptions的方式(ES官方api已经给我们开放出来了):
      17. RequestOptions.Builder buildersize = RequestOptions.DEFAULT.toBuilder();
      18. buildersize.setHttpAsyncResponseConsumerFactory(
      19. new HttpAsyncResponseConsumerFactory
      20. //修改为500MB
      21. .HeapBufferedResponseConsumerFactory(500 * 1024 * 1024));
      22. // response = restHighLevelClient.index(request, RequestOptions.DEAFULT);
      23. response = restHighLevelClient.index(request, buildersize.build());
      24. } catch (IOException e) {
      25. e.printStackTrace();
      26. }
      27. return response.getId();
      28. }

      2)批量写入文档

      批量写入需要创建BulkRequest,多条数据每条创建一个IndexRequest对象设置index,type,id和数据Map,将这些IndexRequest对象添加到BulkRequest中,调用客户端的bulk方法执行即可

      1. public static void bulkIndex(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      2. BulkRequest bulkRequest = new BulkRequest();
      3. Map data = new HashMap<>();
      4. Object colData = "kugou";
      5. data.put("a1", String.valueOf(colData));
      6. data.put("a2", 100);
      7. data.put("a3", 10.12);
      8. data.put("a4", "2055-05-05");
      9. data.put("a5", "20:55:55");
      10. data.put("a6", "2055-05-05");
      11. for (int i = 0; i < 2; i++) {
      12. String docIdStr = Md5Util.encode("bb" + i);
      13. log.info("doc_id =>" + docIdStr);
      14. // 有doc_id
      15. IndexRequest indexRequest = new IndexRequest(indexName, typeName, docIdStr).source(data);
      16. // 无doc_id
      17. // IndexRequest indexRequest1 = new IndexRequest(indexName, typeName, docIdStr).source(data);
      18. bulkRequest.add(indexRequest);
      19. }
      20. BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
      21. log.info("bulk write result is " + !response.hasFailures());
      22. }

       3)更新单条文档

      更新单条文档需要创建UpdateRequest对象,设置index,type,id和文档字段数据,调用客户端的update方法执行即可

      1. public static void singleUpdate(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      2. Map data = new HashMap<>();
      3. Object colData = "kugou";
      4. data.put("a1", String.valueOf(colData));
      5. data.put("a2", 100);
      6. data.put("a3", 10.12);
      7. data.put("a4", "2055-05-05");
      8. data.put("a5", "20:55:55");
      9. data.put("a6", "2055-05-05");
      10. String docIdStr = Md5Util.encode("kugou");
      11. UpdateRequest updateRequest = new UpdateRequest(indexName, typeName, docIdStr);
      12. UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
      13. DocWriteResponse.Result result = updateResponse.getResult();
      14. }

      4)批量文档更新

      1. public static void bulkUpdate(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      2. BulkRequest bulkRequest = new BulkRequest();
      3. Map data = new HashMap<>();
      4. Object colData = "test";
      5. data.put("a1", String.valueOf(colData));
      6. data.put("a2", 100);
      7. data.put("a3", 10.12);
      8. data.put("a4", "2055-05-05");
      9. data.put("a5", "20:55:55");
      10. data.put("a6", "2055-05-05");
      11. for (int i = 0; i < 2; i++) {
      12. String docIdStr = Md5Util.encode("bb" + i);
      13. log.info("doc_id =>" + docIdStr);
      14. UpdateRequest updateRequest = new UpdateRequest(indexName, typeName, docIdStr);
      15. updateRequest.doc(data);
      16. bulkRequest.add(updateRequest);
      17. }
      18. BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
      19. log.info("bulk write result is " + !response.hasFailures());
      20. }

      5)upsert方式

      对于有则更新无则插入的情况,UpdateRequest在设置doc之后再设置一下upsert即可,其他一样

      1. public static void singleUpsert(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      2. // 对于有则更新无则插入的情况,UpdateRequest在设置doc之后再设置以下upsert即可,其他一样
      3. Map data = new HashMap<>();
      4. Object colData = "kugou";
      5. data.put("a1", String.valueOf(colData));
      6. data.put("a2", 100);
      7. data.put("a3", 10.12);
      8. data.put("a4", "2055-05-05");
      9. data.put("a5", "20:55:55");
      10. data.put("a6", "2055-05-05");
      11. /**
      12. * 测试过程中,如果es定义的字段数为6个;先index的时候,字段数为5个;upsert的时候,字段数6个,结果是success
      13. */
      14. String docIdStr = Md5Util.encode("kugou");
      15. UpdateRequest updateRequest = new UpdateRequest(indexName, typeName, docIdStr);
      16. updateRequest.doc(data).upsert(data);
      17. UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
      18. }
      19. public static void bulkUpsert(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      20. BulkRequest bulkRequest = new BulkRequest();
      21. Map data = new HashMap<>();
      22. Object colData = "test";
      23. data.put("a1", String.valueOf(colData));
      24. data.put("a2", 100);
      25. data.put("a3", 10.12);
      26. data.put("a4", "2055-05-05");
      27. data.put("a5", "20:55:55");
      28. data.put("a6", "2055-05-05");
      29. for (int i = 0; i < 2; i++) {
      30. String docIdStr = Md5Util.encode("bb" + i);
      31. log.info("doc_id =>" + docIdStr);
      32. UpdateRequest updateRequest = new UpdateRequest(indexName, typeName, docIdStr);
      33. updateRequest.doc(data).upsert(data); // 对于有则更新无则插入的情况,UpdateRequest在设置doc之后再设置以下upsert即可,其他一样
      34. bulkRequest.add(updateRequest);
      35. }
      36. BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
      37. log.info("bulk write result is " + !response.hasFailures());
      38. }

      6)多种操作类型混合操作

      1. public static void bulkMixOperation(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      2. Map data = new HashMap<>();
      3. Object colData = "index_id";
      4. data.put("a1", String.valueOf(colData));
      5. data.put("a2", 100);
      6. data.put("a3", 10.12);
      7. data.put("a4", "2055-05-05");
      8. data.put("a5", "20:55:55");
      9. data.put("a6", "2055-05-05");
      10. String index_doc_id = Md5Util.encode(String.valueOf(colData));
      11. Map data1 = new HashMap<>();
      12. Object colData1 = "upsert_id";
      13. data1.put("a1", String.valueOf(colData1));
      14. data1.put("a2", 100);
      15. data1.put("a3", 10.12);
      16. data1.put("a4", "2055-05-05");
      17. data1.put("a5", "20:55:55");
      18. data1.put("a6", "2055-05-05");
      19. String upsert_doc_id = Md5Util.encode(String.valueOf(colData1));
      20. Map data2 = new HashMap<>();
      21. Object colData2 = "update_id";
      22. data2.put("a1", String.valueOf(colData1));
      23. data2.put("a2", 100);
      24. data2.put("a3", 10.12);
      25. data2.put("a4", "2055-05-05");
      26. data2.put("a5", "20:55:55");
      27. data2.put("a6", "2055-05-05");
      28. String update_doc_id = Md5Util.encode(String.valueOf(colData2));
      29. BulkRequest bulkRequest = new BulkRequest();
      30. IndexRequest indexRequest = new IndexRequest(indexName, typeName, index_doc_id).source(data);
      31. UpdateRequest updateRequest = new UpdateRequest(indexName, typeName, upsert_doc_id).doc(data1).upsert(data1);
      32. UpdateRequest updateRequest1 = new UpdateRequest(indexName, typeName, update_doc_id).doc(data2);
      33. bulkRequest.add(indexRequest);
      34. bulkRequest.add(updateRequest);
      35. bulkRequest.add(updateRequest1);
      36. BulkResponse result = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
      37. log.info("mix bulk result is " + !result.hasFailures());
      38. }

      7)listMap构造数据插入ES

      1. public static void ListMapIndex(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      2. List> list = new ArrayList<>();
      3. list.add(new HashMap() {{
      4. put("id", "003");
      5. put("name", "c宾馆");
      6. put("city", "苏州");
      7. put("price", 7.54);
      8. }});
      9. list.add(new HashMap() {{
      10. put("id", "004");
      11. put("name", "d宾馆");
      12. put("city", "杭州");
      13. put("price", 17.34);
      14. }});
      15. list.add(new HashMap() {{
      16. put("id", "005");
      17. put("name", "e宾馆");
      18. put("city", "上海");
      19. put("price", 21.92);
      20. }});
      21. BulkRequest bulkRequest = new BulkRequest();
      22. list.forEach(s -> bulkRequest.add(new IndexRequest().index(indexName).type(typeName).id(s.get("id").toString()).source(s)));
      23. bulkRequest.timeout(TimeValue.timeValueSeconds(5));
      24. BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
      25. log.info("result is " + !bulkResponse.hasFailures());
      26. restHighLevelClient.close();
      27. }

      8)根据条件批量更新文档

      创建UpdateByQueryRequest对象,分别设置setQuery和setScript,分别代表条件和更新语句,最后客户端调用updateByQuery更新

      1. public static void updateByQuery(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      2. UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName, typeName);
      3. updateByQueryRequest.setQuery(new TermQueryBuilder("city", "上海"));
      4. updateByQueryRequest.setScript(new Script("ctx._source['city']='杭州'"));
      5. restHighLevelClient.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
      6. restHighLevelClient.close();
      7. }

      9)删除单条文档

      删除单条文档使用DeleteRequest对象,传入index,type,doc_id,客户端调用delete方法即可(注意:下面的示例方法虽然名为singleDelete,实际使用的是DeleteByQueryRequest,按a1字段的匹配条件删除文档)

      1. public static void singleDelete(RestHighLevelClient restHighLevelClient, String indexName, String typeName) {
      2. DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
      3. deleteByQueryRequest.indices(indexName);
      4. deleteByQueryRequest.setQuery(new MatchQueryBuilder("a1", "hello"));
      5. try {
      6. restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
      7. } catch (IOException e) {
      8. e.printStackTrace();
      9. }
      10. }

      10)批量删除文档

      批量删除使用BulkRequest对象,将DeleteRequest传入BulkRequest中,最后调用客户端的bulk提交即可

      1. public static void bulkDelete(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      2. BulkRequest bulkRequest = new BulkRequest();
      3. bulkRequest.add(new DeleteRequest("hotel", "_doc", "004"));
      4. bulkRequest.add(new DeleteRequest("hotel", "_doc", "003"));
      5. restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
      6. restHighLevelClient.close();
      7. }

      11)根据条件删除文档

      使用deleteByQueryRequest对象,设置setQuery参数为条件,客户端调用deleteByQuery即可

      1. public static void deleteByQuery(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      2. DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
      3. deleteByQueryRequest.setQuery(new BoolQueryBuilder().mustNot(new TermQueryBuilder("city", "上海")));
      4. restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
      5. restHighLevelClient.close();
      6. }

      12)搜索文档操作

      1. /**
      2. *
      3. * @param restHighLevelClient
      4. * @param indexName
      5. * @param typeName
      6. * @throws IOException
      7. */
      8. public static void searchIndex(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      9. SearchRequest searchRequest = new SearchRequest(indexName, typeName);
      10. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
      11. sourceBuilder.query(QueryBuilders.matchQuery("city", "上海"));
      12. searchRequest.source(sourceBuilder);
      13. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
      14. RestStatus restStatus = searchResponse.status();
      15. System.out.println(restStatus);
      16. if (restStatus == RestStatus.OK) {
      17. SearchHits searchHits = searchResponse.getHits();
      18. for (SearchHit searchHit : searchHits) {
      19. System.out.println("id:" + searchHit.getId());
      20. System.out.println("index:" + searchHit.getIndex());
      21. System.out.println("score:" + searchHit.getScore());
      22. Map map = searchHit.getSourceAsMap();
      23. System.out.println("name:" + (String) map.get("name"));
      24. System.out.println("city:" + (String) map.get("city"));
      25. System.out.println("price:" + (Double) map.get("price"));
      26. }
      27. }
      28. restHighLevelClient.close();
      29. }
      30. /**
      31. * term精确搜索
      32. *
      33. * @param restHighLevelClient
      34. * @param indexName
      35. * @param typeName
      36. * @throws IOException
      37. */
      38. public static void termSearch(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      39. SearchRequest searchRequest = new SearchRequest(indexName, typeName);
      40. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.termQuery("_id", "007"));
      41. searchRequest.source(searchSourceBuilder);
      42. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
      43. if (searchResponse.status() == RestStatus.OK) {
      44. SearchHits searchHits = searchResponse.getHits();
      45. Object city = searchHits.getAt(0).getSourceAsMap().get("city");
      46. System.out.println(String.valueOf(city));
      47. }
      48. restHighLevelClient.close();
      49. }
      50. /**
      51. * range范围搜索
      52. *
      53. * @param restHighLevelClient
      54. * @param indexName
      55. * @param typeName
      56. * @throws IOException
      57. */
      58. public static void rangeSearch(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      59. SearchRequest searchRequest = new SearchRequest(indexName, typeName);
      60. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery("price").gte(10.0).lte(100.0));
      61. searchRequest.source(searchSourceBuilder);
      62. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
      63. if (searchResponse.status() == RestStatus.OK) {
      64. SearchHits searchHits = searchResponse.getHits();
      65. for (SearchHit searchHit : searchHits) {
      66. System.out.println(searchHit.getSourceAsMap());
      67. }
      68. }
      69. restHighLevelClient.close();
      70. }
      71. /**
      72. * 分页查询
      73. *
      74. * @param restHighLevelClient
      75. * @param indexName
      76. * @param typeName
      77. * @throws IOException
      78. */
      79. public static void pageSearch(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      80. SearchRequest searchRequest = new SearchRequest(indexName, typeName);
      81. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().from(1).size(1).query(QueryBuilders.rangeQuery("price").gte(10.0).lte(100.0));
      82. searchRequest.source(searchSourceBuilder);
      83. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
      84. if (searchResponse.status() == RestStatus.OK) {
      85. SearchHits searchHits = searchResponse.getHits();
      86. for (SearchHit searchHit : searchHits) {
      87. System.out.println(searchHit.getSourceAsMap());
      88. }
      89. }
      90. restHighLevelClient.close();
      91. }
      92. /**
      93. * 这种常用于根据筛选条件之后抽取全部数据的场景,
      94. * scroll API 可以被用来检索大量的结果, 甚至所有的结果 ,
      95. * 注意es的游标查询的是当下时刻的数据快照,
      96. * 即在游标查询之后的数据的变动不会影响游标查询的结果,
      97. * 默认游标查询根据_doc字段进行排序
      98. *
      99. * @param restHighLevelClient
      100. * @param indexName
      101. * @param typeName
      102. * @throws IOException
      103. */
      104. public static void cusorSearch(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      105. SearchRequest searchRequest = new SearchRequest(indexName, typeName);
      106. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery("price").gte(5.0).lte(100.0));
      107. searchSourceBuilder.size(2);
      108. searchRequest.source(searchSourceBuilder);
      109. Scroll scroll = new Scroll(timeValueMillis(1L));
      110. /**
      111. *
      112. * 在搜索条件之后使用searchSourceBuilder.size(2)设置了每次游标只抽取2条数据,
      113. * 设置每次游标的超时时间是1毫秒timeValueMillis,可以适当调高超时时间防止由于超时还没查完导致游标提前结束。
      114. * 在执行游标的时候,第一次使用了客户端的search方法,从第二次开始使用scroll方法,
      115. * 每开始下一次游标的时候都通过查看本次游标的结果是否为空searchResponse.getHits().getHits()来判断是否还要继续,
      116. * 把每次游标的返回结果收集起来拿到全部数据
      117. *
      118. */
      119. searchRequest.scroll(scroll);
      120. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
      121. String scrollId = searchResponse.getScrollId();
      122. SearchHit[] hits = searchResponse.getHits().getHits();
      123. List resultSearchHit = new ArrayList<>();
      124. while (hits != null && hits.length > 0) {
      125. System.out.println(hits.length);
      126. System.out.println(scrollId);
      127. resultSearchHit.addAll(Arrays.asList(hits));
      128. SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
      129. searchScrollRequest.scroll(scroll);
      130. SearchResponse searchScrollResponse = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
      131. scrollId = searchScrollResponse.getScrollId();
      132. hits = searchScrollResponse.getHits().getHits();
      133. }
      134. ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
      135. clearScrollRequest.addScrollId(scrollId);
      136. restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
      137. restHighLevelClient.close();
      138. }
      139. /**
      140. * 返回指定字段
      141. * 设置fetchSource参数,传入两个数组,前一个是包含的字段,后一个是排除的字段
      142. *
      143. * @param restHighLevelClient
      144. * @param indexName
      145. * @param typeName
      146. * @throws IOException
      147. */
      148. public static void specColSearch(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      149. SearchRequest searchRequest = new SearchRequest(indexName, typeName);
      150. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
      151. .query(QueryBuilders.termQuery("_id", "001"))
      152. .fetchSource(new String[]{"city"}, new String[]{});
      153. searchRequest.source(searchSourceBuilder);
      154. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
      155. if (searchResponse.status() == RestStatus.OK) {
      156. SearchHits searchHits = searchResponse.getHits();
      157. System.out.println(searchHits.getAt(0));
      158. }
      159. restHighLevelClient.close();
      160. }
      161. /**
      162. * 排序
      163. * 排序在SearchSourceBuilder对象后构建sort参数,通过SortOrder.DESC倒序列和SortOrder.ASC升序
      164. * @param restHighLevelClient
      165. * @param indexName
      166. * @param typeName
      167. * @throws IOException
      168. */
      169. public static void sortSearch(RestHighLevelClient restHighLevelClient, String indexName, String typeName) throws IOException {
      170. SearchRequest searchRequest = new SearchRequest(indexName, typeName);
      171. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().sort("price", SortOrder.DESC);
      172. searchRequest.source(searchSourceBuilder);
      173. SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
      174. if (searchResponse.status() == RestStatus.OK) {
      175. SearchHits searchHits = searchResponse.getHits();
      176. for (SearchHit searchHit : searchHits) {
      177. System.out.println(searchHit);
      178. }
      179. }
      180. restHighLevelClient.close();
      181. }

    19. 相关阅读:
      Java 中常用进制转换
      mac使用python递归删除文件夹下所有的.DS_Store文件
      STM32物联网项目-GPRS模块介绍
      C语言:通讯录联系(内存存储)
      springboot项目结构命名规范
      vue的生命周期
      113-JavaSE基础进阶:补充知识-工厂模式、装饰模式
      【电路参考】缓启动电路
      opencv之修改尺寸、灰度转换(python)
      java计算机毕业设计基于springboo+vue的个人家庭理财记账管理系统
    20. 原文地址:https://blog.csdn.net/dkjhl/article/details/126872986