• Integrating Elasticsearch with Spring Data, Spark Streaming, and Flink


    Code for this article: https://download.csdn.net/download/shangjg03/88522188

    1 Spring Data Framework Integration

    1.1 Introduction to Spring Data

    Spring Data is an open-source framework that simplifies access to relational databases, NoSQL stores, and search indexes, with support for cloud data services. Its main goal is to make data access convenient and fast, and it also supports map-reduce frameworks and cloud computing data services. Spring Data greatly simplifies JPA-style data access (including Elasticsearch): you can query and manipulate data with almost no hand-written implementation code. Beyond CRUD, it also provides common features such as paging and sorting.

    Spring Data official website: https://spring.io/projects/spring-data

    Spring Data's commonly used modules include Spring Data JPA, Spring Data MongoDB, Spring Data Redis, and Spring Data Elasticsearch, among others.

    1.2 Introduction to Spring Data Elasticsearch

    Spring Data Elasticsearch builds on the Spring Data API to simplify Elasticsearch operations, wrapping the low-level Elasticsearch client APIs. It provides search-engine integration for Spring applications. Its key feature is a POJO-centric model for interacting with Elasticsearch documents, which makes it easy to write a repository-style data access layer.

    Official website: https://spring.io/projects/spring-data-elasticsearch

    1.3 Spring Data Elasticsearch Version Compatibility

    At the time of writing, the latest Spring Boot release manages Elasticsearch 7.6.2; Spring Boot 2.3.x is generally compatible with Elasticsearch 7.x.
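
    If your cluster runs a different Elasticsearch version than the one managed by the Spring Boot parent, it can usually be pinned by overriding the version property in the pom. A sketch (7.8.0 here is an illustrative version; match it to your cluster):

    ```xml
    <properties>
        <!-- Override the Elasticsearch version managed by spring-boot-starter-parent -->
        <elasticsearch.version>7.8.0</elasticsearch.version>
    </properties>
    ```

    Keep the client and server major versions aligned; mixing a 6.x client with a 7.x cluster (or vice versa) is a common source of errors.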

    1.4 Framework Integration

    1. Create a Maven project

    2. Edit the pom file and add the dependencies

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.6.RELEASE</version>
            <relativePath/>
        </parent>

        <groupId>com.shangjack.es</groupId>
        <artifactId>springdata-elasticsearch</artifactId>
        <version>1.0</version>

        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <scope>runtime</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-test</artifactId>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
            </dependency>
        </dependencies>
    </project>

    3. Add a configuration file

    Create an application.properties file in the resources directory:

    # Elasticsearch server address
    elasticsearch.host=127.0.0.1
    # Elasticsearch server port
    elasticsearch.port=9200
    # Enable debug logging for our package
    logging.level.com.shangjack.es=debug

    4. Spring Boot main class

    package com.shangjack.es;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class SpringDataElasticSearchMainApplication {
        public static void main(String[] args) {
            SpringApplication.run(SpringDataElasticSearchMainApplication.class, args);
        }
    }

    5. Data entity class

    package com.shangjack.es;

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.ToString;

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @ToString
    public class Product {
        private Long id;         // unique product identifier
        private String title;    // product name
        private String category; // category name
        private Double price;    // product price
        private String images;   // image URL
    }

    6. Configuration class
    • ElasticsearchRestTemplate is a class in the spring-data-elasticsearch project, analogous to the template classes in other Spring projects.
    • In newer versions of spring-data-elasticsearch, ElasticsearchRestTemplate replaces the original ElasticsearchTemplate.
    • The reason is that ElasticsearchTemplate is based on TransportClient, which is being removed as of version 8.x, so ElasticsearchRestTemplate is recommended instead.
    • ElasticsearchRestTemplate is based on the RestHighLevelClient. Define a configuration class that extends AbstractElasticsearchConfiguration and implements the elasticsearchClient() abstract method to create the RestHighLevelClient instance.

    package com.shangjack.es;

    import lombok.Data;
    import org.apache.http.HttpHost;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;

    @ConfigurationProperties(prefix = "elasticsearch")
    @Configuration
    @Data
    public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {

        private String host;
        private Integer port;

        // Override the parent's abstract method
        @Override
        public RestHighLevelClient elasticsearchClient() {
            RestClientBuilder builder = RestClient.builder(new HttpHost(host, port));
            return new RestHighLevelClient(builder);
        }
    }

    7. DAO (data access object)

    package com.shangjack.es;

    import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
    import org.springframework.stereotype.Repository;

    @Repository
    public interface ProductDao extends ElasticsearchRepository<Product, Long> {
    }

    8. Entity class mapping

    package com.shangjack.es;

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.ToString;
    import org.springframework.data.annotation.Id;
    import org.springframework.data.elasticsearch.annotations.Document;
    import org.springframework.data.elasticsearch.annotations.Field;
    import org.springframework.data.elasticsearch.annotations.FieldType;

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @ToString
    @Document(indexName = "shopping", shards = 3, replicas = 1)
    public class Product {

        // An id is required; it is the globally unique identifier,
        // equivalent to "_id" in Elasticsearch
        @Id
        private Long id; // unique product identifier

        /**
         * type     : field data type
         * analyzer : analyzer type
         * index    : whether the field is indexed (default: true)
         * Keyword  : exact value, not analyzed
         */
        @Field(type = FieldType.Text, analyzer = "ik_max_word")
        private String title; // product name

        @Field(type = FieldType.Keyword)
        private String category; // category name

        @Field(type = FieldType.Double)
        private Double price; // product price

        @Field(type = FieldType.Keyword, index = false)
        private String images; // image URL
    }
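
    For reference, the annotations above translate to roughly the following index mapping in Elasticsearch (a sketch of what Spring Data generates, assuming the ik_max_word analyzer plugin is installed on the cluster):

    ```json
    {
      "mappings": {
        "properties": {
          "title":    { "type": "text", "analyzer": "ik_max_word" },
          "category": { "type": "keyword" },
          "price":    { "type": "double" },
          "images":   { "type": "keyword", "index": false }
        }
      }
    }
    ```

    You can compare against the real mapping with GET shopping/_mapping once the index has been created.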

    9. Index operations

    package com.shangjack.es;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
    import org.springframework.test.context.junit4.SpringRunner;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringDataESIndexTest {

        // Inject ElasticsearchRestTemplate
        @Autowired
        private ElasticsearchRestTemplate elasticsearchRestTemplate;

        // Create the index and its mapping
        @Test
        public void createIndex() {
            // The framework creates the index automatically on startup,
            // so there is nothing to do here
            System.out.println("creating index");
        }

        @Test
        public void deleteIndex() {
            // Delete the index
            boolean flg = elasticsearchRestTemplate.deleteIndex(Product.class);
            System.out.println("index deleted = " + flg);
        }
    }

    10. Document operations

    package com.shangjack.es;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.data.domain.Page;
    import org.springframework.data.domain.PageRequest;
    import org.springframework.data.domain.Sort;
    import org.springframework.test.context.junit4.SpringRunner;

    import java.util.ArrayList;
    import java.util.List;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringDataESProductDaoTest {

        @Autowired
        private ProductDao productDao;

        /**
         * Create
         */
        @Test
        public void save() {
            Product product = new Product();
            product.setId(2L);
            product.setTitle("华为手机");
            product.setCategory("手机");
            product.setPrice(2999.0);
            product.setImages("http://www.shangjack/hw.jpg");
            productDao.save(product);
        }

        // Update (save() with an existing id overwrites the document)
        @Test
        public void update() {
            Product product = new Product();
            product.setId(1L);
            product.setTitle("小米2手机");
            product.setCategory("手机");
            product.setPrice(9999.0);
            product.setImages("http://www.shangjack/xm.jpg");
            productDao.save(product);
        }

        // Query by id
        @Test
        public void findById() {
            Product product = productDao.findById(1L).get();
            System.out.println(product);
        }

        // Query all
        @Test
        public void findAll() {
            Iterable<Product> products = productDao.findAll();
            for (Product product : products) {
                System.out.println(product);
            }
        }

        // Delete
        @Test
        public void delete() {
            Product product = new Product();
            product.setId(1L);
            productDao.delete(product);
        }

        // Bulk create
        @Test
        public void saveAll() {
            List<Product> productList = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                Product product = new Product();
                product.setId(Long.valueOf(i));
                product.setTitle("[" + i + "]小米手机");
                product.setCategory("手机");
                product.setPrice(1999.0 + i);
                product.setImages("http://www.shangjack/xm.jpg");
                productList.add(product);
            }
            productDao.saveAll(productList);
        }

        // Paged query
        @Test
        public void findByPageable() {
            // Sort order (direction and field)
            Sort sort = Sort.by(Sort.Direction.DESC, "id");
            int currentPage = 0; // pages are zero-based: 0 is the first page, 1 the second
            int pageSize = 5;    // number of items per page
            // Build the page request
            PageRequest pageRequest = PageRequest.of(currentPage, pageSize, sort);
            // Run the paged query
            Page<Product> productPage = productDao.findAll(pageRequest);
            for (Product product : productPage.getContent()) {
                System.out.println(product);
            }
        }
    }
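
    A note on the paging parameters: currentPage is zero-based, and Elasticsearch ultimately receives the page as a from/size pair. A minimal sketch of that arithmetic (the class and method names here are illustrative, not part of Spring Data):

    ```java
    public class PageOffset {
        // from = page * size: page 0 starts at offset 0, page 1 at offset pageSize, ...
        static int from(int currentPage, int pageSize) {
            return currentPage * pageSize;
        }

        public static void main(String[] args) {
            System.out.println(from(0, 5)); // first page starts at offset 0
            System.out.println(from(1, 5)); // second page starts at offset 5
        }
    }
    ```

    This is why passing currentPage = 1 skips the first pageSize results rather than returning them.
    
    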

    11. Document search

    package com.shangjack.es;

    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.index.query.TermQueryBuilder;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.data.domain.PageRequest;
    import org.springframework.test.context.junit4.SpringRunner;

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringDataESSearchTest {

        @Autowired
        private ProductDao productDao;

        /**
         * term query
         * search(termQueryBuilder) runs the search, taking a query builder as argument
         */
        @Test
        public void termQuery() {
            TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
            Iterable<Product> products = productDao.search(termQueryBuilder);
            for (Product product : products) {
                System.out.println(product);
            }
        }

        /**
         * term query with paging
         */
        @Test
        public void termQueryByPage() {
            int currentPage = 0;
            int pageSize = 5;
            // Build the page request
            PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
            TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", "小米");
            Iterable<Product> products = productDao.search(termQueryBuilder, pageRequest);
            for (Product product : products) {
                System.out.println(product);
            }
        }
    }
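
    For comparison, the termQuery above is equivalent to the following Elasticsearch query DSL. It matches because the term query looks for the exact token 小米, which the ik_max_word analyzer produces when indexing titles like [1]小米手机:

    ```json
    {
      "query": {
        "term": { "title": "小米" }
      }
    }
    ```

    Against a keyword field (such as category here), a term query would instead have to match the whole stored value exactly.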

    2 Spark Streaming Framework Integration

    2.1 Introduction to Spark Streaming

    Spark Streaming is an extension of the Spark core API that enables scalable, high-throughput, fault-tolerant processing of live data streams. Data can be ingested from many sources, such as Kafka, Flume, Kinesis, or TCP sockets, and processed with complex algorithms expressed through high-level functions such as map, reduce, join, and window. Finally, the processed data can be pushed out to file systems, databases, and more. You can even apply Spark's machine learning and graph processing algorithms to data streams.

    2.2 Framework Integration

    1. Create a Maven project

    2. Edit the pom file and add the dependencies

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.shangjack.es</groupId>
        <artifactId>sparkstreaming-elasticsearch</artifactId>
        <version>1.0</version>

        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.12</artifactId>
                <version>3.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>7.8.0</version>
            </dependency>
            <!-- Elasticsearch client -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>7.8.0</version>
            </dependency>
            <!-- Elasticsearch depends on log4j 2.x -->
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>2.8.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>2.8.2</version>
            </dependency>
            <!--
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.11.1</version>
            </dependency>
            -->
            <!-- junit for unit testing
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
            -->
        </dependencies>
    </project>

    3. Implementation

    package com.shangjack.es

    import org.apache.http.HttpHost
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.elasticsearch.action.index.IndexRequest
    import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
    import org.elasticsearch.common.xcontent.XContentType

    import java.util.Date

    object SparkStreamingESTest {
        def main(args: Array[String]): Unit = {
            val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")
            val ssc = new StreamingContext(sparkConf, Seconds(3))
            val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
            ds.foreachRDD(
                rdd => {
                    println("*************** " + new Date())
                    rdd.foreach(
                        data => {
                            // Note: a client is created per record here for simplicity;
                            // in production, create one client per partition instead
                            val client = new RestHighLevelClient(
                                RestClient.builder(new HttpHost("localhost", 9200, "http"))
                            )
                            // Index request object for a new document
                            val request = new IndexRequest()
                            // Set the index name and the document id
                            val ss = data.split(" ")
                            println("ss = " + ss.mkString(","))
                            request.index("sparkstreaming").id(ss(0))
                            val productJson =
                                s"""
                                   | { "data":"${ss(1)}" }
                                   |""".stripMargin
                            // Attach the document body in JSON format
                            request.source(productJson, XContentType.JSON)
                            // Send the request and get the response
                            val response = client.index(request, RequestOptions.DEFAULT)
                            System.out.println("_index:" + response.getIndex())
                            System.out.println("_id:" + response.getId())
                            System.out.println("_result:" + response.getResult())
                            client.close()
                        }
                    )
                }
            )
            ssc.start()
            ssc.awaitTermination()
        }
    }
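
    The socket source expects each line in an "id payload" format separated by a single space: ss(0) becomes the document id and ss(1) the value of the "data" field. A minimal Java sketch of that parsing (the class and method names are illustrative only):

    ```java
    public class LineToDoc {
        // Split an "id payload" line and build the JSON body that gets indexed
        static String docJson(String line) {
            String[] ss = line.split(" ");
            return "{ \"data\":\"" + ss[1] + "\" }";
        }

        public static void main(String[] args) {
            // "1001" would be used as the document id, "hello" as the data field
            System.out.println(docJson("1001 hello"));
        }
    }
    ```

    To drive the job, start a listener such as nc -lk 9999 and type lines in this format.
    
    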

    3 Flink Framework Integration

    3.1 Introduction to Flink

    Apache Spark is a fast, general-purpose, scalable, in-memory big data analytics engine.

    Apache Spark pioneered large-scale in-memory computing and bet on memory to fuel its rapid growth. But even at the height of its popularity, developers found that shortcomings common to compute frameworks remained unsolved in Spark, and these problems grew more pronounced with the arrival of 5G and the pressing demand from decision makers for real-time analytics:

    • Exactly-once processing guarantees
    • Out-of-order and late-arriving data
    • Low latency, high throughput, and accuracy
    • Fault tolerance

    Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. While Spark flourished, Flink quietly matured and set out to solve the problems other compute frameworks had left open.

    As those problems were gradually solved, Flink became known to most developers and was widely promoted. In 2015, Alibaba enhanced Flink and created an internal branch called Blink, which now serves a large number of core real-time businesses across the Alibaba group, including search, recommendation, advertising, and Ant Group.

    3.2 Framework Integration

    1. Create a Maven project

    2. Edit the pom file and add the required dependencies

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.shangjack.es</groupId>
        <artifactId>flink-elasticsearch</artifactId>
        <version>1.0</version>

        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.12</artifactId>
                <version>1.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.12</artifactId>
                <version>1.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>1.12.0</version>
            </dependency>
            <!-- use the same Scala version (2.12) as the other Flink artifacts -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
                <version>1.12.0</version>
            </dependency>
            <!-- jackson -->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
                <version>2.11.1</version>
            </dependency>
        </dependencies>
    </project>

    3. Implementation

    package com.shangjack.es;

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FlinkElasticsearchSinkTest {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

            List<HttpHost> httpHosts = new ArrayList<>();
            httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
            //httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

            // use an ElasticsearchSink.Builder to create an ElasticsearchSink
            ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                    httpHosts,
                    new ElasticsearchSinkFunction<String>() {
                        public IndexRequest createIndexRequest(String element) {
                            Map<String, String> json = new HashMap<>();
                            json.put("data", element);
                            return Requests.indexRequest()
                                    .index("my-index")
                                    //.type("my-type")
                                    .source(json);
                        }

                        @Override
                        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                            indexer.add(createIndexRequest(element));
                        }
                    }
            );

            // configuration for the bulk requests; this instructs the sink to emit
            // after every element, otherwise they would be buffered
            esSinkBuilder.setBulkFlushMaxActions(1);

            // provide a RestClientFactory for custom configuration on the internally created REST client
            // esSinkBuilder.setRestClientFactory(
            //     restClientBuilder -> {
            //         restClientBuilder.setDefaultHeaders(...)
            //         restClientBuilder.setMaxRetryTimeoutMillis(...)
            //         restClientBuilder.setPathPrefix(...)
            //         restClientBuilder.setHttpClientConfigCallback(...)
            //     }
            // );

            source.addSink(esSinkBuilder.build());
            env.execute("flink-es");
        }
    }

  • Original article: https://blog.csdn.net/shangjg03/article/details/134336259