Elasticsearch: Apache Spark big data integration


    Elasticsearch has become a common component in big data architectures because it provides several features:

    • It lets you search large amounts of data quickly.
    • For common aggregation operations, it provides real-time analytics on big data.
    • Using an Elasticsearch aggregation is easier than using a Spark aggregation.
    • If you need to move to a fast data solution, starting from a subset of documents returned by a query is faster than a full rescan of all the data.

    The most common big data software for processing data today is Apache Spark (http://spark.apache.org/), which is considered the evolution of the now-dated Hadoop MapReduce, moving processing from disk to memory.
    In this article, we will see how to integrate Elasticsearch with Spark, both for writing and for reading data.

    Installing Spark

    To use Apache Spark, we need to install it. The process is very simple, because its requirements are not those of a traditional Hadoop deployment, which needs Apache ZooKeeper and the Hadoop Distributed File System (HDFS). Apache Spark can work in a standalone single-node installation, similar to Elasticsearch.

    To install Apache Spark, we will perform the following steps:

    1) Download the binary distribution from https://spark.apache.org/downloads.html. For general usage, I suggest you download the standard version with the following request:

    wget https://www.apache.org/dyn/closer.lua/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz

    2) Now, we can extract the Spark distribution with tar, as follows:

    tar xzvf spark-3.3.0-bin-hadoop3.tgz

    3) Now, we can test that Apache Spark is working correctly by running one of the bundled examples, as follows:

    $ cd spark-3.3.0-bin-hadoop3
    $ ./bin/run-example SparkPi 10

    If we see output similar to the above (the example prints an approximation of Pi), our installation was successful.

    We can then start the Spark shell:

    ./bin/spark-shell

     

    Now, we can type commands to be executed in the cluster.
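    For example, here is a trivial command you can paste into the shell as a quick smoke test (a minimal sketch; the numbers are arbitrary):

    val total = sc.parallelize(1 to 10).sum()   // distribute the numbers 1..10 and sum them
    println(total)                              // prints 55.0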

    Installing Elasticsearch and Kibana

    If you do not have your own Elasticsearch and Kibana installed yet, please refer to my earlier installation articles.

    For today's demonstration, I will use the latest Elastic Stack 8.3.2. For convenience, we can choose not to enable HTTPS access when installing Elasticsearch. To do this, follow the section "How to configure Elasticsearch with basic security only" in my earlier article "Elastic Stack 8.0 installation - securing your Elastic Stack is now easier than ever". Once Elasticsearch and Kibana are installed, we only need a username and password to access them. For simplicity, the password of the superuser elastic is set to password.

    Ingesting data into Elasticsearch with Apache Spark

    Now that we have installed Apache Spark and Elasticsearch, we can configure Spark to work with Elasticsearch and write some data into it.

    1) We need to download the Elasticsearch Spark .jar file, as follows:

    wget https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/8.3.2/elasticsearch-hadoop-8.3.2.zip
    unzip elasticsearch-hadoop-8.3.2.zip

    Alternatively, you can download the elasticsearch-hadoop package like this:

    wget -c https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-8.3.2.zip
    unzip elasticsearch-hadoop-8.3.2.zip

    2) A quick way to make Elasticsearch accessible from the Spark shell is to copy the required Elasticsearch Hadoop file into Spark's jars directory. The file that must be copied is elasticsearch-spark-20_2.11-8.3.2.jar.

    $ pwd
    /Users/liuxg/java/spark/spark-3.3.0-bin-hadoop3/jars
    $ ls elasticsearch-spark-20_2.11-8.3.2.jar
    elasticsearch-spark-20_2.11-8.3.2.jar

    From the version information above, we can see that the Scala version of the connector is 2.11. We must keep this in mind when we set up the IDE development environment below.
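    As a small, optional sanity check, you can print the Scala version of your Spark shell itself; the _2.xx suffix of the connector jar should match it. Run the following inside the Spark shell:

    scala.util.Properties.versionString   // prints the Scala version the shell was built with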

    To store data in Elasticsearch with Apache Spark, we will perform the following steps:

    1) In Spark's root directory, start the Spark shell with the Elasticsearch configuration applied by running the following command:

    ./bin/spark-shell \
    --conf spark.es.index.auto.create=true \
    --conf spark.es.net.http.auth.user=$ES_USER \
    --conf spark.es.net.http.auth.pass=$ES_PASSWORD

    ES_USER and ES_PASSWORD are environment variables that hold the credentials of the Elasticsearch cluster.

    2) Before using the Elasticsearch-specific Resilient Distributed Dataset (RDD) methods, we import the Elasticsearch Spark implicits, as follows:

    import org.elasticsearch.spark._

    3) We will create two documents to be indexed, as follows:

    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "SanFran")

    4) Now, we can create an RDD and save the documents in Elasticsearch, as follows:

    sc.makeRDD(Seq(numbers, airports)).saveToEs("spark")

    We can go back to the Kibana console to check:

    GET spark/_search

    From the output above, we can see that the two documents have been successfully written into Elasticsearch.
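    We can also check from the Spark side: the same implicits give the SparkContext an esRDD method that reads an index back as an RDD of (id, document) pairs. This is a small sketch, assuming the same shell session as above:

    val readBack = sc.esRDD("spark")   // RDD[(String, Map[String, AnyRef])]
    readBack.count()                   // expected: 2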

    How does this work?

    Storing documents in Elasticsearch via Spark is quite easy. After starting the Spark shell, the sc variable is available in the shell context and holds the SparkContext. If we need to pass values to the underlying Elasticsearch configuration, we set them on the Spark shell command line (or on the SparkConf, as shown in the sketch after the list below).
    Several configurations can be set (prefix them with spark. when passing them on the command line); the following are the most commonly used:

    • es.index.auto.create: This is used to create the index if it does not exist.
    • es.nodes: This is used to define the list of nodes to connect to (default: localhost).
    • es.port: This is used to define the HTTP Elasticsearch port to connect to (default: 9200).
    • es.ingest.pipeline: This is used to define an ingest pipeline to use (default: none).
    • es.mapping.id: This is used to define a field from which to extract the ID value (default: none).
    • es.mapping.parent: This is used to define a field from which to extract the parent value (default: none).
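    If you prefer to configure these values in code rather than on the command line, they can also be set on the SparkConf before the SparkContext is created. The following is a minimal standalone sketch (not part of the shell session above); the localhost:9200 endpoint and the elastic/password credentials are assumptions matching the test setup used in this article:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._

    object EsConfigSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("EsConfigSketch")
          .setMaster("local[*]")
          .set("es.index.auto.create", "true")      // create the index if it does not exist
          .set("es.nodes", "localhost")             // Elasticsearch node(s)
          .set("es.port", "9200")                   // HTTP port
          .set("es.net.http.auth.user", "elastic")  // assumption: basic-auth credentials
          .set("es.net.http.auth.pass", "password")
        val sc = new SparkContext(conf)
        // write one trivial document, exactly as in the shell example above
        sc.makeRDD(Seq(Map("one" -> 1))).saveToEs("spark")
        sc.stop()
      }
    }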

    Simple documents can be defined as Map[String, AnyRef], and they can be indexed via an RDD (a special Spark abstraction over a collection). Through the implicits available in org.elasticsearch.spark, the RDD gains a new method called saveToEs that lets you define the index (or index/type pair) to use for indexing:

    sc.makeRDD(Seq(numbers, airports)).saveToEs("spark")

    Writing data with meta

    Ingesting data as simple maps is only suitable for trivial jobs. The best practice in Spark is to use case classes, so that you get fast serialization and can manage complex type checking. During indexing, it is also very convenient to provide a custom ID. In this section, we will see how to cover these issues.

    To store data in Elasticsearch with Apache Spark, we will perform the following steps:

    1) In Spark's root directory, start the Spark shell with the Elasticsearch configuration applied by running the following command:

    ./bin/spark-shell \
    --conf spark.es.index.auto.create=true \
    --conf spark.es.net.http.auth.user=$ES_USER \
    --conf spark.es.net.http.auth.pass=$ES_PASSWORD

    2) We will import the required classes, as follows:

    import org.elasticsearch.spark.rdd.EsSpark

    3) We will create the case class Person, as follows:

    case class Person(username:String, name:String, age:Int)

    4) We will create two documents to be indexed, as follows:

    val persons = Seq(Person("bob", "Bob",19), Person("susan","Susan",21))

    5) Now, we can create the RDD, as follows:

    val rdd=sc.makeRDD(persons)

    6) We can index them using EsSpark, as follows:

    EsSpark.saveToEs(rdd, "spark2", Map("es.mapping.id" -> "username"))

    We go back to Kibana to check:

    GET spark2/_search

    From the output above, we can see that the two documents were successfully written into Elasticsearch, and that their IDs are the username values from Person.
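    The connector also offers a saveToEsWithMeta method for cases where you want to supply the ID (or other metadata) explicitly as part of the RDD instead of extracting it from a field. The sketch below assumes the same shell session and uses a hypothetical index name, spark2meta:

    import org.elasticsearch.spark._

    // each element is a (metadata, document) pair; the first element becomes the _id
    val docsWithId = Seq(
      "bob"   -> Map("username" -> "bob",   "name" -> "Bob",   "age" -> 19),
      "susan" -> Map("username" -> "susan", "name" -> "Susan", "age" -> 21)
    )
    sc.makeRDD(docsWithId).saveToEsWithMeta("spark2meta")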

    Writing to Elasticsearch from an IDE

    For this exercise, we will use an IDE. You can pick your favorite one; I will use IntelliJ IDEA, with the Scala plugin installed. Let's create a project called SparkDemo. Its build.sbt is as follows:

    build.sbt

    name := "SparkDemo"
    version := "0.1"
    scalaVersion := "2.11.12"
    // https://mvnrepository.com/artifact/org.apache.spark/spark-core
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.3"
    // https://mvnrepository.com/artifact/org.apache.spark/spark-sql
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3"

    Note the scalaVersion of 2.11.12 above. As mentioned earlier, elasticsearch-spark is currently built against Scala 2.11, so we pick a matching Scala release. We need the spark-core and spark-sql packages, which we can look up at https://mvnrepository.com/artifact/org.apache.spark:

    There we can find the desired version of the spark-core dependency. In the same way, we can find the dependency configuration for spark-sql.

    To be able to access Elasticsearch, we can also add the previously downloaded elasticsearch-spark-20_2.11-8.3.2.jar directly to the project in the IDE.
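    Alternatively, instead of adding the jar by hand, you can try pulling the connector in through build.sbt. The line below is only a sketch; the artifact name (elasticsearch-spark-20 for the Spark 2.x / Scala 2.11 build) and its availability for version 8.3.2 should be verified on Maven Central before relying on it:

    // sketch: %% appends the _2.11 suffix based on the scalaVersion above
    libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-20" % "8.3.2"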

    Next, we create the following Scala file:

    SparkDemo.scala

    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark.sql._

    object SparkDemo {
      def main(args: Array[String]): Unit = {
        SparkDemo.writeToIndex()
      }

      def writeToIndex(): Unit = {
        val spark = SparkSession
          .builder()
          .appName("WriteToES")
          .master("local[*]")
          .config("spark.es.nodes", "localhost")
          .config("spark.es.port", "9200")
          .config("spark.es.nodes.wan.only", "true") // Needed for ES on AWS
          .config("spark.es.net.http.auth.user", "elastic")
          .config("spark.es.net.http.auth.pass", "password")
          .getOrCreate()

        import spark.implicits._

        val indexDocuments = Seq(
          AlbumIndex("Led Zeppelin", 1969, "Led Zeppelin"),
          AlbumIndex("Boston", 1976, "Boston"),
          AlbumIndex("Fleetwood Mac", 1979, "Tusk")
        ).toDF

        indexDocuments.saveToEs("albumindex")
      }
    }

    case class AlbumIndex(artist: String, yearOfRelease: Int, albumName: String)

    Note that above we set the password of the elastic user to password; you will need to adjust this to your own configuration. Run the code. Once it finishes, we can check in Kibana:

    GET albumindex/_search

    Essentially, this code produces exactly the same result as what we did on the command line above; the documents are written through the AlbumIndex case class.
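    As a small variation on the listing above, the saveToEs call on a DataFrame also accepts a per-write configuration map, so you could reuse es.mapping.id here to make the artist field the document _id (albumindex2 is a hypothetical index name for this example):

    // continuing inside writeToIndex(), after indexDocuments has been created
    indexDocuments.saveToEs("albumindex2", Map("es.mapping.id" -> "artist"))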

    Writing a JSON file to Elasticsearch

    Next, we create the following JSON file:

    $ pwd
    /Users/liuxg/java/spark
    $ cat sample_json
    [ { "color": "red", "value": "#f00" }, { "color": "green", "value": "#0f0" }, { "color": "blue", "value": "#00f" }, { "color": "cyan", "value": "#0ff" }, { "color": "magenta", "value": "#f0f" }, { "color": "yellow", "value": "#ff0" }, { "color": "black", "value": "#000" } ]

    As shown above, this is a very simple JSON file. Next, we modify the SparkDemo.scala file we wrote earlier:

    SparkDemo.scala

    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark.sql._

    object SparkDemo {
      def main(args: Array[String]): Unit = {
        // Configuration
        val spark = SparkSession
          .builder()
          .appName("WriteJSONToES")
          .master("local[*]")
          .config("spark.es.nodes", "localhost")
          .config("spark.es.port", "9200")
          .config("spark.es.net.http.auth.user", "elastic")
          .config("spark.es.net.http.auth.pass", "password")
          .getOrCreate()

        // Create dataframe
        val frame = spark.read.json("/Users/liuxg/java/spark/sample_json")

        // Write to ES with index name in lower case
        frame.saveToEs("dataframejsonindex")
      }
    }

    Run the application and check in Kibana:

    GET dataframejsonindex/_search

    As shown above, 7 documents have been successfully written into Elasticsearch.
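    One thing to keep in mind: by default spark.read.json expects one JSON record per line, which happens to work here because the whole array sits on a single line. If the file were pretty-printed across several lines, you would need the multiLine option. A sketch, reusing the SparkSession from the listing above:

    // sketch: needed only if sample_json were formatted across multiple lines
    val prettyFrame = spark.read
      .option("multiLine", "true")
      .json("/Users/liuxg/java/spark/sample_json")
    prettyFrame.show()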

    Writing a CSV file to Elasticsearch

    In the same way, we can write a CSV file into Elasticsearch. We first create the following CSV file:

    cities.csv

    LatD, LatM, LatS, NS, LonD, LonM, LonS, EW, City, State
    41, 5, 59, "N", 80, 39, 0, "W", "Youngstown", OH
    42, 52, 48, "N", 97, 23, 23, "W", "Yankton", SD
    46, 35, 59, "N", 120, 30, 36, "W", "Yakima", WA
    42, 16, 12, "N", 71, 48, 0, "W", "Worcester", MA
    43, 37, 48, "N", 89, 46, 11, "W", "Wisconsin Dells", WI
    36, 5, 59, "N", 80, 15, 0, "W", "Winston-Salem", NC
    49, 52, 48, "N", 97, 9, 0, "W", "Winnipeg", MB
    39, 11, 23, "N", 78, 9, 36, "W", "Winchester", VA
    34, 14, 24, "N", 77, 55, 11, "W", "Wilmington", NC
    39, 45, 0, "N", 75, 33, 0, "W", "Wilmington", DE
    48, 9, 0, "N", 103, 37, 12, "W", "Williston", ND
    41, 15, 0, "N", 77, 0, 0, "W", "Williamsport", PA
    37, 40, 48, "N", 82, 16, 47, "W", "Williamson", WV
    33, 54, 0, "N", 98, 29, 23, "W", "Wichita Falls", TX
    37, 41, 23, "N", 97, 20, 23, "W", "Wichita", KS
    40, 4, 11, "N", 80, 43, 12, "W", "Wheeling", WV
    26, 43, 11, "N", 80, 3, 0, "W", "West Palm Beach", FL
    47, 25, 11, "N", 120, 19, 11, "W", "Wenatchee", WA
    41, 25, 11, "N", 122, 23, 23, "W", "Weed", CA
    31, 13, 11, "N", 82, 20, 59, "W", "Waycross", GA
    44, 57, 35, "N", 89, 38, 23, "W", "Wausau", WI
    42, 21, 36, "N", 87, 49, 48, "W", "Waukegan", IL
    44, 54, 0, "N", 97, 6, 36, "W", "Watertown", SD
    43, 58, 47, "N", 75, 55, 11, "W", "Watertown", NY
    42, 30, 0, "N", 92, 20, 23, "W", "Waterloo", IA
    41, 32, 59, "N", 73, 3, 0, "W", "Waterbury", CT
    38, 53, 23, "N", 77, 1, 47, "W", "Washington", DC
    41, 50, 59, "N", 79, 8, 23, "W", "Warren", PA
    46, 4, 11, "N", 118, 19, 48, "W", "Walla Walla", WA
    31, 32, 59, "N", 97, 8, 23, "W", "Waco", TX
    38, 40, 48, "N", 87, 31, 47, "W", "Vincennes", IN
    28, 48, 35, "N", 97, 0, 36, "W", "Victoria", TX
    32, 20, 59, "N", 90, 52, 47, "W", "Vicksburg", MS
    49, 16, 12, "N", 123, 7, 12, "W", "Vancouver", BC
    46, 55, 11, "N", 98, 0, 36, "W", "Valley City", ND
    30, 49, 47, "N", 83, 16, 47, "W", "Valdosta", GA
    43, 6, 36, "N", 75, 13, 48, "W", "Utica", NY
    39, 54, 0, "N", 79, 43, 48, "W", "Uniontown", PA
    32, 20, 59, "N", 95, 18, 0, "W", "Tyler", TX
    42, 33, 36, "N", 114, 28, 12, "W", "Twin Falls", ID
    33, 12, 35, "N", 87, 34, 11, "W", "Tuscaloosa", AL
    34, 15, 35, "N", 88, 42, 35, "W", "Tupelo", MS
    36, 9, 35, "N", 95, 54, 36, "W", "Tulsa", OK
    32, 13, 12, "N", 110, 58, 12, "W", "Tucson", AZ
    37, 10, 11, "N", 104, 30, 36, "W", "Trinidad", CO
    40, 13, 47, "N", 74, 46, 11, "W", "Trenton", NJ
    44, 45, 35, "N", 85, 37, 47, "W", "Traverse City", MI
    43, 39, 0, "N", 79, 22, 47, "W", "Toronto", ON
    39, 2, 59, "N", 95, 40, 11, "W", "Topeka", KS
    41, 39, 0, "N", 83, 32, 24, "W", "Toledo", OH
    33, 25, 48, "N", 94, 3, 0, "W", "Texarkana", TX
    39, 28, 12, "N", 87, 24, 36, "W", "Terre Haute", IN
    27, 57, 0, "N", 82, 26, 59, "W", "Tampa", FL
    30, 27, 0, "N", 84, 16, 47, "W", "Tallahassee", FL
    47, 14, 24, "N", 122, 25, 48, "W", "Tacoma", WA
    43, 2, 59, "N", 76, 9, 0, "W", "Syracuse", NY
    32, 35, 59, "N", 82, 20, 23, "W", "Swainsboro", GA
    33, 55, 11, "N", 80, 20, 59, "W", "Sumter", SC
    40, 59, 24, "N", 75, 11, 24, "W", "Stroudsburg", PA
    37, 57, 35, "N", 121, 17, 24, "W", "Stockton", CA
    44, 31, 12, "N", 89, 34, 11, "W", "Stevens Point", WI
    40, 21, 36, "N", 80, 37, 12, "W", "Steubenville", OH
    40, 37, 11, "N", 103, 13, 12, "W", "Sterling", CO
    38, 9, 0, "N", 79, 4, 11, "W", "Staunton", VA
    39, 55, 11, "N", 83, 48, 35, "W", "Springfield", OH
    37, 13, 12, "N", 93, 17, 24, "W", "Springfield", MO
    42, 5, 59, "N", 72, 35, 23, "W", "Springfield", MA
    39, 47, 59, "N", 89, 39, 0, "W", "Springfield", IL
    47, 40, 11, "N", 117, 24, 36, "W", "Spokane", WA
    41, 40, 48, "N", 86, 15, 0, "W", "South Bend", IN
    43, 32, 24, "N", 96, 43, 48, "W", "Sioux Falls", SD
    42, 29, 24, "N", 96, 23, 23, "W", "Sioux City", IA
    32, 30, 35, "N", 93, 45, 0, "W", "Shreveport", LA
    33, 38, 23, "N", 96, 36, 36, "W", "Sherman", TX
    44, 47, 59, "N", 106, 57, 35, "W", "Sheridan", WY
    35, 13, 47, "N", 96, 40, 48, "W", "Seminole", OK
    32, 25, 11, "N", 87, 1, 11, "W", "Selma", AL
    38, 42, 35, "N", 93, 13, 48, "W", "Sedalia", MO
    47, 35, 59, "N", 122, 19, 48, "W", "Seattle", WA
    41, 24, 35, "N", 75, 40, 11, "W", "Scranton", PA
    41, 52, 11, "N", 103, 39, 36, "W", "Scottsbluff", NB
    42, 49, 11, "N", 73, 56, 59, "W", "Schenectady", NY
    32, 4, 48, "N", 81, 5, 23, "W", "Savannah", GA
    46, 29, 24, "N", 84, 20, 59, "W", "Sault Sainte Marie", MI
    27, 20, 24, "N", 82, 31, 47, "W", "Sarasota", FL
    38, 26, 23, "N", 122, 43, 12, "W", "Santa Rosa", CA
    35, 40, 48, "N", 105, 56, 59, "W", "Santa Fe", NM
    34, 25, 11, "N", 119, 41, 59, "W", "Santa Barbara", CA
    33, 45, 35, "N", 117, 52, 12, "W", "Santa Ana", CA
    37, 20, 24, "N", 121, 52, 47, "W", "San Jose", CA
    37, 46, 47, "N", 122, 25, 11, "W", "San Francisco", CA
    41, 27, 0, "N", 82, 42, 35, "W", "Sandusky", OH
    32, 42, 35, "N", 117, 9, 0, "W", "San Diego", CA
    34, 6, 36, "N", 117, 18, 35, "W", "San Bernardino", CA
    29, 25, 12, "N", 98, 30, 0, "W", "San Antonio", TX
    31, 27, 35, "N", 100, 26, 24, "W", "San Angelo", TX
    40, 45, 35, "N", 111, 52, 47, "W", "Salt Lake City", UT
    38, 22, 11, "N", 75, 35, 59, "W", "Salisbury", MD
    36, 40, 11, "N", 121, 39, 0, "W", "Salinas", CA
    38, 50, 24, "N", 97, 36, 36, "W", "Salina", KS
    38, 31, 47, "N", 106, 0, 0, "W", "Salida", CO
    44, 56, 23, "N", 123, 1, 47, "W", "Salem", OR
    44, 57, 0, "N", 93, 5, 59, "W", "Saint Paul", MN
    38, 37, 11, "N", 90, 11, 24, "W", "Saint Louis", MO
    39, 46, 12, "N", 94, 50, 23, "W", "Saint Joseph", MO
    42, 5, 59, "N", 86, 28, 48, "W", "Saint Joseph", MI
    44, 25, 11, "N", 72, 1, 11, "W", "Saint Johnsbury", VT
    45, 34, 11, "N", 94, 10, 11, "W", "Saint Cloud", MN
    29, 53, 23, "N", 81, 19, 11, "W", "Saint Augustine", FL
    43, 25, 48, "N", 83, 56, 24, "W", "Saginaw", MI
    38, 35, 24, "N", 121, 29, 23, "W", "Sacramento", CA
    43, 36, 36, "N", 72, 58, 12, "W", "Rutland", VT
    33, 24, 0, "N", 104, 31, 47, "W", "Roswell", NM
    35, 56, 23, "N", 77, 48, 0, "W", "Rocky Mount", NC
    41, 35, 24, "N", 109, 13, 48, "W", "Rock Springs", WY
    42, 16, 12, "N", 89, 5, 59, "W", "Rockford", IL
    43, 9, 35, "N", 77, 36, 36, "W", "Rochester", NY
    44, 1, 12, "N", 92, 27, 35, "W", "Rochester", MN
    37, 16, 12, "N", 79, 56, 24, "W", "Roanoke", VA
    37, 32, 24, "N", 77, 26, 59, "W", "Richmond", VA
    39, 49, 48, "N", 84, 53, 23, "W", "Richmond", IN
    38, 46, 12, "N", 112, 5, 23, "W", "Richfield", UT
    45, 38, 23, "N", 89, 25, 11, "W", "Rhinelander", WI
    39, 31, 12, "N", 119, 48, 35, "W", "Reno", NV
    50, 25, 11, "N", 104, 39, 0, "W", "Regina", SA
    40, 10, 48, "N", 122, 14, 23, "W", "Red Bluff", CA
    40, 19, 48, "N", 75, 55, 48, "W", "Reading", PA
    41, 9, 35, "N", 81, 14, 23, "W", "Ravenna", OH

    We modify the SparkDemo.scala file again:

    SparkDemo.scala

    import org.apache.spark.sql.SparkSession
    import org.elasticsearch.spark.sql._

    object SparkDemo {
      def main(args: Array[String]): Unit = {
        // Configuration
        val spark = SparkSession
          .builder()
          .appName("WriteCSVToES")
          .master("local[*]")
          .config("spark.es.nodes", "localhost")
          .config("spark.es.port", "9200")
          .config("spark.es.net.http.auth.user", "elastic")
          .config("spark.es.net.http.auth.pass", "password")
          .getOrCreate()

        // Create dataframe
        val frame = spark.read.option("header", "true").csv("/Users/liuxg/java/spark/cities.csv")

        // Write to ES with index name in lower case
        frame.saveToEs("dataframecsvindex")
      }
    }

    After running the application, we can check in Kibana and see that the CSV file has been written successfully; a total of 128 documents were written.
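    Note that with only the header option, every CSV column is ingested as a string. If you would rather have the numeric columns (LatD, LatM, and so on) indexed as numbers, Spark's inferSchema option can detect the types for you. A small sketch reusing the same SparkSession:

    val typedFrame = spark.read
      .option("header", "true")
      .option("inferSchema", "true")   // let Spark detect numeric columns
      .csv("/Users/liuxg/java/spark/cities.csv")
    typedFrame.printSchema()           // LatD, LatM, ... should now be integer columns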

    Exporting data from Elasticsearch with Apache Spark

    We first use the following command in Kibana to create an index called twitter:

    PUT twitter
    {
      "mappings": {
        "properties": {
          "DOB": {
            "type": "date"
          },
          "address": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "age": {
            "type": "long"
          },
          "city": {
            "type": "keyword"
          },
          "country": {
            "type": "keyword"
          },
          "message": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "province": {
            "type": "keyword"
          },
          "uid": {
            "type": "long"
          },
          "user": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
        }
      }
    }

    Then we use the bulk API to write some data:

    POST _bulk
    {"index":{"_index":"twitter","_id":1}}
    {"user":"张三","message":"今儿天气不错啊,出去转转去","uid":2,"age":20,"city":"北京","province":"北京","country":"中国","address":"中国北京市海淀区","DOB": "1999-04-01"}
    {"index":{"_index":"twitter","_id":2}}
    {"user":"老刘","message":"出发,下一站云南!","uid":3,"age":22,"city":"北京","province":"北京","country":"中国","address":"中国北京市东城区台基厂三条3号", "DOB": "1997-04-01"}
    {"index":{"_index":"twitter","_id":3}}
    {"user":"李四","message":"happy birthday!","uid":4,"age":25,"city":"北京","province":"北京","country":"中国","address":"中国北京市东城区","DOB": "1994-04-01"}
    {"index":{"_index":"twitter","_id":4}}
    {"user":"老贾","message":"123,gogogo","uid":5,"age":30,"city":"北京","province":"北京","country":"中国","address":"中国北京市朝阳区建国门", "DOB": "1989-04-01"}
    {"index":{"_index":"twitter","_id":5}}
    {"user":"老王","message":"Happy BirthDay My Friend!","uid":6,"age":26,"city":"北京","province":"北京","country":"中国","address":"中国北京市朝阳区国贸","DOB": "1993-04-01"}
    {"index":{"_index":"twitter","_id":6}}
    {"user":"老吴","message":"好友来了都今天我生日,好友来了,什么 birthday happy 就成!","uid":7,"age":28,"city":"上海","province":"上海","country":"中国","address":"中国上海市闵行区", "DOB": "1991-04-01"}

    We now have 6 documents. We rewrite our SparkDemo.scala:

    SparkDemo.scala

    import org.apache.spark.sql.SparkSession

    object SparkDemo {
      def main(args: Array[String]): Unit = {
        // Configuration
        val spark = SparkSession
          .builder()
          .appName("ExportESIndex")
          .master("local[*]")
          .config("spark.es.nodes", "localhost")
          .config("spark.es.port", "9200")
          .config("spark.es.net.http.auth.user", "elastic")
          .config("spark.es.net.http.auth.pass", "password")
          .getOrCreate()

        val reader = spark.read
          .format("org.elasticsearch.spark.sql")
          .option("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

        val df = reader.load("twitter")
        println("No of records: " + df.count())

        df.write.format("csv")
          .option("header", true)
          .mode("overwrite")
          .save("file:///Users/liuxg/tmp/samples_download")

        println("Job completed!")
      }
    }

    Rerun the application. We can then look at the generated files on disk:

    $ pwd
    /Users/liuxg/tmp/samples_download
    $ ls
    _SUCCESS
    part-00000-b8a5faee-2a0d-40c8-b25c-f4a5f23fba09-c000.csv
    $ cat part-00000-b8a5faee-2a0d-40c8-b25c-f4a5f23fba09-c000.csv
    DOB,address,age,city,country,message,province,uid,user
    1999-04-01T00:00:00.000+08:00,中国北京市海淀区,20,北京,中国,今儿天气不错啊,出去转转去,北京,2,张三
    1997-04-01T00:00:00.000+08:00,中国北京市东城区台基厂三条3号,22,北京,中国,出发,下一站云南!,北京,3,老刘
    1994-04-01T00:00:00.000+08:00,中国北京市东城区,25,北京,中国,happy birthday!,北京,4,李四
    1989-04-01T00:00:00.000+08:00,中国北京市朝阳区建国门,30,北京,中国,"123,gogogo",北京,5,老贾
    1993-04-01T00:00:00.000+08:00,中国北京市朝阳区国贸,26,北京,中国,Happy BirthDay My Friend!,北京,6,老王
    1991-04-01T00:00:00.000+08:00,中国上海市闵行区,28,上海,中国,"好友来了都今天我生日,好友来了,什么 birthday happy 就成!",上海,7,老吴
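
    If you only need part of an index, the connector can also push a query down to Elasticsearch through the es.query option, so that only matching documents are fetched into Spark. The following is a sketch reusing the SparkSession from the export example above; the match query on the city field is just an illustration:

    // read only the documents whose city matches the query
    val beijingDf = spark.read
      .format("org.elasticsearch.spark.sql")
      .option("es.query", """{"query": {"match": {"city": "北京"}}}""")
      .load("twitter")
    println("Matching records: " + beijingDf.count())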

