• ElasticSearch从入门到精通:数据导入


    hello,大家好,我是 Jackpop,硕士毕业于哈尔滨工业大学,曾在华为、阿里等大厂工作,如果你对升学、就业、技术提升等有疑惑,不妨交个朋友:

    我是Jackpop,我们交个朋友吧!

    在第二部分中,我们学习了如何在ElasticSearch中执行搜索。但是,我们无法使用其批量API将.json数据文件导入ElasticSearch。

    在这部分中,我们将进行一些编程,并学习一些有关如何将.json飞行数据文件导入ElasticSearch的方法:

    • 通过将.json数据文件转换为ElasticSearch的API需要的格式
    • 通过解析.json数据文件,使用JSON库(例如gson)提取其值,然后使用ElasticSearch的REST API导入数据

    数据转换

    ElasticSearch对数据格式有特定的格式要求:

    {``"index"``:{``"_id"``:4800770}}
    {``"Rcvr"``:1,``"HasSig"``:``false``,``"Icao"``:``"494102"``, ``"Bad"``:``false``,``"Reg"``:``"CS-PHB"``, ...}
    ...
    
    • 1
    • 2
    • 3

    这就意味着,你需要把下载的每一份json数据按照上述格式进行转换。主要满足如下2点:

    • 在每个数据文档前面加入一行以index开头的数据
    • 把"Id":修改为{“_id”:}

    我们可以通过编写简单的Java程序,快速把json文件转换成对应格式:

    package com.jgc;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import static java.util.stream.Collectors.toList;
    /**
     * Converts a flight data json file to a format that can be imported to 
     * ElasticSearch using its bulk API.
     */
    public class JsonFlightFileConverter {
        private static final Path flightDataJsonFile = 
            Paths.get("src/main/resources/flightdata/2016-07-01-1300Z.json");
        public static void main(String[] args) {
            List<String> list = new ArrayList<>();
            try (Stream<String> stream = Files.lines(flightDataJsonFile.toAbsolutePath())) {
                list = stream
                        .map(line -> line.split("\{"))
                        .flatMap(Arrays::stream)
                        .collect(toList());
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println(list);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    最后,通过简单的拼接,输出我们想要的结果:

    final String result = list.stream().skip(3)
                    .map(s -> "{" + s + "\n")
                    .collect(Collectors.joining());
    System.out.println(result);
    
    • 1
    • 2
    • 3
    • 4

    现在,可以看到输出已经非常接近我们想要的结果:

    {"Id":4800770,"Rcvr":1,"HasSig":false,"Icao":"494102", ...
    
    • 1

    实际上,我们可以将最后一个代码片段添加到原始流中,如下所示:

    String result = "";
    try (Stream<String> stream = Files.lines(flightDataJsonFile.toAbsolutePath())) {
         result = stream
                .map(line -> line.split("\{"))
                .flatMap(Arrays::stream)
                .skip(3)
                .map(s -> "{" + s + "\n")
                .collect(Collectors.joining());
    } catch (IOException e) {
        e.printStackTrace();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    现在,我们需要在每行的上方插入新行,其中包含文档的索引,如下所示:

    {"index":{"_id":4800770}}
    
    • 1

    我们可以创建一个函数,这样处理会更加简洁明了:

    private static String insertIndex(String s) {
        final String[] keyValues = s.split(",");
        final String[] idKeyValue = keyValues[0].split(":");
        return "{"index":{"_id":"+ idKeyValue[1] +"}}\n";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    这样,就可以对每个输入进行转换,给出我们需要的输出。

    我们还需要解决的更多细节,从每个文档中删除最后一个逗号。

    private static String removeLastComma(String s) {
        return s.charAt(s.length() - 1) == ',' ? s.substring(0, s.length() - 1) : s;
    }
    
    • 1
    • 2
    • 3

    这时候,数据处理代码就变成了下面这个样子:

    public class JsonFlightFileConverter {
     
     public static void main(String[] args) {
      if (args.length == 1) {
        Path inDirectoryPath = Paths.get(args[0]);
        if (inDirectoryPath != null) {
            Path outDirectoryPath = Paths.get(inDirectoryPath.toString(), "out");
            try {
                if (Files.exists(outDirectoryPath)) {
                    Files.walk(outDirectoryPath)
                            .sorted(Comparator.reverseOrder())
                            .map(Path::toFile)
                            .forEach(File::delete);
                }
                Files.createDirectory(Paths.get(inDirectoryPath.toString(), "out"));
            } catch (IOException e) {
                e.printStackTrace();
            }
            try (DirectoryStream ds = Files.newDirectoryStream(inDirectoryPath, "*.json")) {
                for (Path inFlightDataJsonFile : ds) {
                    String result = "";
                    try (Stream stream = 
                         Files.lines(inFlightDataJsonFile.toAbsolutePath())) {
                result = stream
                          .parallel()
                          .map(line -> line.split("\{"))
                          .flatMap(Arrays::stream)
                          .skip(3)
                          .map(s -> createResult(s))
                          .collect(Collectors.joining());
                    Path outFlightDataJsonFile = 
                         Paths.get(outDirectoryPath.toString(), 
                                   inFlightDataJsonFile.getFileName().toString());
                    Files.createFile(outFlightDataJsonFile);
                    Files.writeString(outFlightDataJsonFile, result);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
     } else {
        System.out.println("Usage: java JsonFlightFileConverter ");
     }
    ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    使用ElasticSearch的批量API导入数据

    需要再次强调,文件必须以空行结尾。 如果不是,则添加一个(实际上前面的程序已经在文件末尾添加了换行符)。

    在产生新的.json文件的目录(输出目录)内,执行以下命令:

    curl -H "Content-Type: application/x-ndjson" -XPOST http://localhost:9200/flight/_bulk --data-binary "@2016-07-01-1300Z.json"
    
    • 1

    请注意,内容类型是application / x-ndjson,而不是application / x-json。

    还要注意,我们将数据表示为二进制以便保留换行符。 文件名为2016-07-01-1300Z.json。

    ElasticSearch中任何具有相同ID的现有文档都将被.json文件中的文档替换。

    最后,可以发现有7679文件被导入:

    "hits" : {
        "total" : {
          "value" : 7679,
          "relation" : "eq"
        },
    GET /_cat/shards?v
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    返回结果:

    index   shard prirep state      docs   store ip        node
    flight  0     p      STARTED    7679   71mb 127.0.0.1 MacBook-Pro.local
    flight  0     r      UNASSIGNED
    
    • 1
    • 2
    • 3

    解析JSON数据

    将这些文档导入ElasticSearch的另一种方法是将JSON数据文件解析到内存中,并使用ElasticSearch的REST API将其导入ElasticSearch。

    有许多库可用于解析Java中的JSON文件:

    • GSon
    • Jackson
    • mJson
    • JSON-Simple
    • JSON-P

    我们将使用Google的GSon库,但其他任何JSON库都可以完成此工作。

    GSon提供了多种表示JSON数据的方法,具体使用哪一种,则取决于下一步,即如何将数据导入到ElasticSearch。

    ElasticSearch API要求数据的格式为:Map<String, Object>,这是我们将解析后的JSON数据存储到的位置。

    首先,将下面依赖加入到pom.xml中:

    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.6</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    使用下方代码解析json数据:

    package com.jcg;
     
    import com.google.gson.Gson;
    import com.google.gson.internal.LinkedTreeMap;
    import com.google.gson.reflect.TypeToken;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.List;
    import java.util.Map;
     
    public class JsonFlightFileReader {
        private static final String flightDataJsonFile = "src/main/resources/flightdata/2016-07-01-1300Z.json";
        private static final Gson gson = new Gson();
        public static void main(String[] args) {
            parseJsonFile(flightDataJsonFile);
        }
        private static void parseJsonFile(String file) {
            try (BufferedReader reader = Files.newBufferedReader(Paths.get(file))) {
                Map<String, Object> map = gson.fromJson(reader, 
                           new TypeToken<Map<String, Object>>() { }.getType());
                List<Object> acList = (List<Object>) (map.get("acList"));
                for (Object item : acList) {
                    LinkedTreeMap<String, Object> flight = 
                            (LinkedTreeMap<String, Object>) item;
                    for (Map.Entry<String, Object> entry : flight.entrySet()) {
                        String key = entry.getKey();
                        Object value = entry.getValue();
                        String outEntry = (key.equals("Id") ? "{" + key : key) + " : " + value + ", ";
                        System.out.print(outEntry);
                    }
                    System.out.println("}");
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    通过下述方法可以使用数据:

    Map<String, Object> map = gson.fromJson(reader, new TypeToken<Map<String, Object>>() {}.getType());
    List<Object> acList = (List<Object>) (map.get("acList"));
    
    • 1
    • 2

    使用ElasticSearch REST API导入数据

    首先,在pom.xml中加入下方依赖:

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>7.10.0</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    我们可以通过RestClient与ElasticSearch进行交互:

    RestClient restClient = RestClient.builder(
        new HttpHost("localhost", 9200, "http"));
    .setDefaultHeaders(new Header[]{
            new BasicHeader("accept", "application/json"),
            new BasicHeader("content-type", "application/json")})
    .setFailureListener(new RestClient.FailureListener() {
        public void onFailure(Node node) {
            System.err.println("Low level Rest Client Failure on node " +
                    node.getName());
        }
    }).build();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    创建好RestClient之后,下一步就是创建一个Request,并将json数据传递给它:

    Request request = new Request("POST", "/flight/_doc/4800770");
    String jsonDoc = "{"Rcvr":1,"HasSig":false,"Icao":"494102",...]}";
    request.setJsonEntity(jsonDoc);
    
    • 1
    • 2
    • 3

    最后,我们发送请求。

    有两种方式,同步:

    Response response = restClient.performRequest(request);
    if (response.getStatusLine().getStatusCode() != 200) {
        System.err.println("Could not add document with Id: " + id + " to index /flight");
    }
    
    • 1
    • 2
    • 3
    • 4

    异步:

    Cancellable cancellable = restClient.performRequestAsync(request,
        new ResponseListener() {
            @Override
            public void onSuccess(Response response) {
                System.out.println("Document with Id: " + id + " was successfully added to index /flight");
            }
     
            @Override
            public void onFailure(Exception exception) {
                System.err.println("Could not add document with Id: " + id + " to index /flight");
            }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    最后,不要忘记关闭restClient连接:

    } finally {
        try {
            restClient.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这部分,我们重点介绍了如何将.json数据批处理文件导入到ElasticSearch。

    我们看到了如何通过两种方式做到这一点:

    • 使用ElasticSearch的批量API,
    • 使用JSON库解析.json文件

    你可以根据自己的情况自行选择其中一种方法。

  • 相关阅读:
    润和软件HopeStage与上海瑞美云LIS系统管理软件完成产品兼容性互认证
    搭建react项目遇到的问题2022
    PHP7 +nginx Docker 部署
    基于poi 3.17导入excel文件 含处理字典项转换为状态
    【软件安装&环境配置】vscode 安装界面没有出现安装路径的选择 的解决,以及vscode的删除的问题
    Python封装机制及实现方法
    一篇文章教你Pytest快速入门和基础讲解,一定要看
    037:vue项目监听页面变化,动态设置iframe元素高度
    ARM-A架构入门基础(四)Cache
    QT C++实战:实现用户登录页面及多个界面跳转
  • 原文地址:https://blog.csdn.net/jakpopc/article/details/125547132