• Elasticsearch: Creating an ingest pipeline processor from scratch


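    In an earlier article, I described in detail how to use existing tools to generate a basic ingest pipeline processor. This article goes a step further and walks through a complete example, using Elastic Stack 8.4.0, the latest release at the time of writing. We will build a processor called sample. It takes the first letter of a field in the document, converts it to lowercase, and stores it in a field chosen by the user.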
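
    Before touching any Elasticsearch code, the transformation itself can be sketched in plain Java. The names below (InitialSketch, initialOf) are hypothetical and exist only for illustration, and the sketch assumes that a blank or missing value simply yields no initial.

```java
import java.util.Locale;
import java.util.Optional;

// Hypothetical helper for illustration; not part of Elasticsearch or the plugin.
final class InitialSketch {

    // Returns the lowercased first character of the trimmed input,
    // or an empty Optional when there is nothing to extract.
    static Optional<String> initialOf(String value) {
        if (value == null) {
            return Optional.empty();
        }
        String trimmed = value.trim();
        if (trimmed.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(trimmed.substring(0, 1).toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(initialOf("xiaoguo").orElse("<none>")); // prints x
        System.out.println(initialOf("Liu").orElse("<none>"));     // prints l
        System.out.println(initialOf("   ").orElse("<none>"));     // prints <none>
    }
}
```

    The processor built in the rest of this article wraps the same idea in the Elasticsearch ingest API.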

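    Installation

    If you have not yet installed the Elastic Stack, install Elasticsearch and Kibana before continuing.

    Creating the plugin template

    As described in the earlier article "Elasticsearch:创建一个 Elasticsearch Ingest 插件" (creating an Elasticsearch ingest plugin), we can use the elasticsearch-plugin-archetype Maven archetype to generate a skeleton. The following command creates a basic plugin template: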

    mvn archetype:generate \
        -DarchetypeGroupId=org.codelibs \
        -DarchetypeArtifactId=elasticsearch-plugin-archetype \
        -DarchetypeVersion=6.6.0 \
        -DgroupId=com.liuxg \
        -DartifactId=elasticsearch-plugin \
        -Dversion=1.0-SNAPSHOT \
        -DpluginName=ingest

    The command creates a basic plugin template in a new directory named elasticsearch-plugin under the current directory. We first change into that directory:

    $ pwd
    /Users/liuxg/java/plugins/elasticsearch-plugin
    $ tree -L 8
    .
    ├── pom.xml
    └── src
        └── main
            ├── assemblies
            │   └── plugin.xml
            ├── java
            │   └── com
            │       └── liuxg
            │           ├── ingestPlugin.java
            │           └── rest
            │               └── RestingestAction.java
            └── plugin-metadata
                └── plugin-descriptor.properties

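    The archetype generates the skeleton of a REST handler plugin, which is not what we need. We have to rename the files and rearrange the directories. After the changes, the layout looks like this: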

    $ pwd
    /Users/liuxg/java/plugins/elasticsearch-plugin
    $ tree -L 8
    .
    ├── pom.xml
    └── src
        └── main
            ├── assemblies
            │   └── plugin.xml
            ├── java
            │   └── com
            │       └── liuxg
            │           ├── ingest
            │           │   └── SampleProcessor.java
            │           └── plugin
            │               └── ingest
            │                   └── ingestPlugin.java
            └── plugin-metadata
                └── plugin-descriptor.properties
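
    The new layout follows the usual Java convention that a class's package maps to its directory under src/main/java, which is why the plugin class name configured in pom.xml has to change as well. A small sketch of that mapping, using hypothetical names (SourceLayoutSketch, sourcePathOf):

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for illustration only.
final class SourceLayoutSketch {

    // Converts a fully qualified class name into the source path
    // Maven expects under src/main/java.
    static Path sourcePathOf(String className) {
        String relative = className.replace('.', '/') + ".java";
        return Paths.get("src", "main", "java").resolve(relative);
    }

    public static void main(String[] args) {
        System.out.println(sourcePathOf("com.liuxg.ingest.SampleProcessor"));
        System.out.println(sourcePathOf("com.liuxg.plugin.ingest.ingestPlugin"));
    }
}
```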

    Next, we modify pom.xml:

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <name>elasticsearch-plugin</name>
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.liuxg</groupId>
        <artifactId>elasticsearch-plugin</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>jar</packaging>
        <description>elasticsearch ingest plugin</description>
        <inceptionYear>2019</inceptionYear>
        <licenses>
            <license>
                <name>The Apache Software License, Version 2.0</name>
                <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
                <distribution>repo</distribution>
            </license>
        </licenses>
        <properties>
            <elasticsearch.version>8.4.0</elasticsearch.version>
            <elasticsearch.plugin.classname>com.liuxg.plugin.ingest.ingestPlugin</elasticsearch.plugin.classname>
            <log4j.version>2.11.1</log4j.version>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <configuration>
                        <source>${maven.compiler.source}</source>
                        <target>${maven.compiler.target}</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                    <configuration>
                        <includes>
                            <include>**/*Tests.java</include>
                        </includes>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-source-plugin</artifactId>
                    <version>3.0.1</version>
                    <executions>
                        <execution>
                            <id>attach-sources</id>
                            <goals>
                                <goal>jar</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.1.0</version>
                    <configuration>
                        <appendAssemblyId>false</appendAssemblyId>
                        <outputDirectory>${project.build.directory}/releases/</outputDirectory>
                        <descriptors>
                            <descriptor>${basedir}/src/main/assemblies/plugin.xml</descriptor>
                        </descriptors>
                    </configuration>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
        <dependencies>
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>${elasticsearch.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    </project>

    In the file above, we changed the following two lines:

    <elasticsearch.version>8.4.0</elasticsearch.version>
    <elasticsearch.plugin.classname>com.liuxg.plugin.ingest.ingestPlugin</elasticsearch.plugin.classname>

    elasticsearch.version must match the version of the Elastic Stack you are running; otherwise the plugin cannot be installed. We also have to update elasticsearch.plugin.classname, because the path of the plugin class has changed.
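
    One way to picture the version requirement: the zip carries a plugin-descriptor.properties file that records the Elasticsearch version the plugin was built for, and the installer compares it with the node's own version. The sketch below imitates that comparison in plain Java. DescriptorCheckSketch and matchesVersion are hypothetical names, the sample descriptor text is an assumption about what the template produces from the pom.xml properties, and the real check inside Elasticsearch is more involved.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical pre-install check; not part of Elasticsearch.
final class DescriptorCheckSketch {

    // Returns true when the descriptor declares exactly the given Elasticsearch version.
    static boolean matchesVersion(String descriptorText, String nodeVersion) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(descriptorText));
        return nodeVersion.equals(props.getProperty("elasticsearch.version"));
    }

    public static void main(String[] args) throws IOException {
        // Assumed descriptor content, mirroring the values set in pom.xml.
        String descriptor = String.join("\n",
            "name=ingest",
            "classname=com.liuxg.plugin.ingest.ingestPlugin",
            "elasticsearch.version=8.4.0");
        System.out.println(matchesVersion(descriptor, "8.4.0")); // prints true
        System.out.println(matchesVersion(descriptor, "8.3.3")); // prints false
    }
}
```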

    Next, we edit the ingestPlugin.java file:

    ingestPlugin.java

    package com.liuxg.plugin.ingest;

    import org.elasticsearch.ingest.Processor;
    import org.elasticsearch.plugins.IngestPlugin;
    import org.elasticsearch.plugins.Plugin;
    import com.liuxg.ingest.SampleProcessor;

    import java.util.Collections;
    import java.util.Map;

    public class ingestPlugin extends Plugin implements IngestPlugin {

        // Registers the processor factory under the type name used in pipeline definitions.
        @Override
        public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
            return Collections.singletonMap(SampleProcessor.TYPE, new SampleProcessor.Factory());
        }
    }

    The code above registers the processor provided by this ingest pipeline plugin.
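
    The registration boils down to a map from a processor type name to a factory, which is then looked up by the key that appears in a pipeline definition. The following is a simplified sketch of that pattern only: MiniProcessor, MiniFactory, and build are hypothetical stand-ins, not the real org.elasticsearch.ingest interfaces.

```java
import java.util.Collections;
import java.util.Map;

final class RegistrySketch {

    // Simplified stand-in for a processor: it only reports its type.
    interface MiniProcessor {
        String getType();
    }

    // Simplified stand-in for a factory: builds a processor from its config.
    interface MiniFactory {
        MiniProcessor create(Map<String, Object> config);
    }

    // Mirrors the shape of getProcessors(): one entry keyed by the type name.
    static Map<String, MiniFactory> getProcessors() {
        MiniFactory factory = config -> () -> "sample";
        return Collections.singletonMap("sample", factory);
    }

    // Looks up the factory for the key used in a pipeline definition.
    static MiniProcessor build(String type, Map<String, Object> config) {
        MiniFactory factory = getProcessors().get(type);
        if (factory == null) {
            throw new IllegalArgumentException("unknown processor type [" + type + "]");
        }
        return factory.create(config);
    }

    public static void main(String[] args) {
        Map<String, Object> config = Collections.singletonMap("field", "user");
        System.out.println(build("sample", config).getType()); // prints sample
    }
}
```

    This is why the key "sample" in a pipeline's processors list has to match the TYPE constant of the processor class.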

    After that, we edit the SampleProcessor.java file:

    SampleProcessor.java

    package com.liuxg.ingest;

    import org.elasticsearch.ingest.AbstractProcessor;
    import org.elasticsearch.ingest.ConfigurationUtils;
    import org.elasticsearch.ingest.IngestDocument;
    import org.elasticsearch.ingest.Processor;

    import java.util.Locale;
    import java.util.Map;

    public final class SampleProcessor extends AbstractProcessor {

        public static final String TYPE = "sample";

        private final String field;
        private final String targetField;
        // Read from the configuration but not yet used by execute().
        private final String defaultValue;
        private final boolean ignoreMissing;

        public SampleProcessor(String tag, String description, String field, String targetField,
                               boolean ignoreMissing, String defaultValue) {
            super(tag, description);
            this.field = field;
            this.targetField = targetField;
            this.ignoreMissing = ignoreMissing;
            this.defaultValue = defaultValue;
        }

        String getField() {
            return field;
        }

        String getTargetField() {
            return targetField;
        }

        String getDefaultField() {
            return defaultValue;
        }

        boolean isIgnoreMissing() {
            return ignoreMissing;
        }

        @Override
        public IngestDocument execute(IngestDocument document) {
            // A missing source field is either skipped or reported, depending on ignore_missing.
            if (!document.hasField(field, true)) {
                if (ignoreMissing) {
                    return document;
                } else {
                    throw new IllegalArgumentException("field [" + field + "] not present as part of path [" + field + "]");
                }
            }
            // Fail early if the target field already exists. Because failOutOfRange is true,
            // hasField also throws if the path points to an array slot that is out of range.
            if (document.hasField(targetField, true)) {
                throw new IllegalArgumentException("field [" + targetField + "] already exists");
            }

            Object value = document.getFieldValue(field, Object.class);
            if (value instanceof String) {
                String myValue = ((String) value).trim();
                // Any non-empty string has a first character, including one-character values.
                if (!myValue.isEmpty()) {
                    try {
                        document.setFieldValue(targetField, myValue.substring(0, 1).toLowerCase(Locale.getDefault()));
                    } catch (Exception e) {
                        // Write the original value back to the source field before rethrowing,
                        // so the document is left as we found it.
                        document.setFieldValue(field, value);
                        throw e;
                    }
                }
            }
            return document;
        }

        @Override
        public String getType() {
            return TYPE;
        }

        public static final class Factory implements Processor.Factory {

            // Reads the processor options from the pipeline definition.
            @Override
            public Processor create(Map<String, Processor.Factory> processorFactories, String tag,
                                    String description, Map<String, Object> config) throws Exception {
                String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
                String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
                String defaultValue = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "defaultValue");
                boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
                return new SampleProcessor(tag, description, field, targetField, ignoreMissing, defaultValue);
            }
        }
    }

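    Here we define the processor's type name as sample, which is the name we will use in the test below. The implementation extracts the first letter of a field and stores it, in lowercase, in a user-defined target field.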
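
    To make the control flow easier to follow without a running cluster, here is a plain-Java imitation in which a Map stands in for the ingest document. ExecuteSketch and apply are hypothetical names. The sketch only handles top-level fields (no dotted paths or array indices), and it assumes that any non-empty string, including a one-character value, yields an initial.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Plain-Java imitation of the processor's control flow; not the Elasticsearch API.
final class ExecuteSketch {

    static Map<String, Object> apply(Map<String, Object> doc, String field,
                                     String targetField, boolean ignoreMissing) {
        if (!doc.containsKey(field)) {
            if (ignoreMissing) {
                return doc; // leave the document untouched
            }
            throw new IllegalArgumentException("field [" + field + "] not present");
        }
        if (doc.containsKey(targetField)) {
            throw new IllegalArgumentException("field [" + targetField + "] already exists");
        }
        Object value = doc.get(field);
        if (value instanceof String) {
            String trimmed = ((String) value).trim();
            if (!trimmed.isEmpty()) {
                doc.put(targetField, trimmed.substring(0, 1).toLowerCase(Locale.ROOT));
            }
        }
        return doc;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("user", "Liu");
        System.out.println(apply(doc, "user", "user_initial", false).get("user_initial")); // prints l

        // With ignoreMissing set, a document without the field passes through unchanged.
        Map<String, Object> empty = new HashMap<>();
        System.out.println(apply(empty, "user", "user_initial", true).isEmpty()); // prints true
    }
}
```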

    Building

    From the project root, we run the following command to build the plugin:

    mvn clean install

    $ pwd
    /Users/liuxg/java/plugins/elasticsearch-plugin
    $ mvn clean install
    [INFO] Scanning for projects...
    [INFO]
    [INFO] -------------------< com.liuxg:elasticsearch-plugin >-------------------
    [INFO] Building elasticsearch-plugin 1.0-SNAPSHOT
    [INFO] --------------------------------[ jar ]---------------------------------
    [INFO]
    [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ elasticsearch-plugin ---
    [INFO]
    [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ elasticsearch-plugin ---
    [WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
    [INFO] skip non existing resourceDirectory /Users/liuxg/java/plugins/elasticsearch-plugin/src/main/resources
    [INFO]
    [INFO] --- maven-compiler-plugin:3.8.0:compile (default-compile) @ elasticsearch-plugin ---
    [INFO] Changes detected - recompiling the module!
    [INFO] Compiling 2 source files to /Users/liuxg/java/plugins/elasticsearch-plugin/target/classes
    [INFO]
    [INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ elasticsearch-plugin ---
    [WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
    [INFO] skip non existing resourceDirectory /Users/liuxg/java/plugins/elasticsearch-plugin/src/test/resources
    [INFO]
    [INFO] --- maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ elasticsearch-plugin ---
    [INFO] No sources to compile
    [INFO]
    [INFO] --- maven-surefire-plugin:2.22.1:test (default-test) @ elasticsearch-plugin ---
    [INFO] No tests to run.
    [INFO]
    [INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ elasticsearch-plugin ---
    [INFO] Building jar: /Users/liuxg/java/plugins/elasticsearch-plugin/target/elasticsearch-plugin-1.0-SNAPSHOT.jar
    [INFO]
    [INFO] >>> maven-source-plugin:3.0.1:jar (attach-sources) > generate-sources @ elasticsearch-plugin >>>
    [INFO]
    [INFO] <<< maven-source-plugin:3.0.1:jar (attach-sources) < generate-sources @ elasticsearch-plugin <<<
    [INFO]
    [INFO]
    [INFO] --- maven-source-plugin:3.0.1:jar (attach-sources) @ elasticsearch-plugin ---
    [INFO] Building jar: /Users/liuxg/java/plugins/elasticsearch-plugin/target/elasticsearch-plugin-1.0-SNAPSHOT-sources.jar
    [INFO]
    [INFO] --- maven-assembly-plugin:3.1.0:single (default) @ elasticsearch-plugin ---
    [INFO] Reading assembly descriptor: /Users/liuxg/java/plugins/elasticsearch-plugin/src/main/assemblies/plugin.xml
    [WARNING] The following patterns were never triggered in this artifact exclusion filter:
    o 'org.elasticsearch:elasticsearch'
    [INFO] Building zip: /Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
    [INFO]
    [INFO] --- maven-install-plugin:2.4:install (default-install) @ elasticsearch-plugin ---
    [INFO] Installing /Users/liuxg/java/plugins/elasticsearch-plugin/target/elasticsearch-plugin-1.0-SNAPSHOT.jar to /Users/liuxg/.m2/repository/com/liuxg/elasticsearch-plugin/1.0-SNAPSHOT/elasticsearch-plugin-1.0-SNAPSHOT.jar
    [INFO] Installing /Users/liuxg/java/plugins/elasticsearch-plugin/pom.xml to /Users/liuxg/.m2/repository/com/liuxg/elasticsearch-plugin/1.0-SNAPSHOT/elasticsearch-plugin-1.0-SNAPSHOT.pom
    [INFO] Installing /Users/liuxg/java/plugins/elasticsearch-plugin/target/elasticsearch-plugin-1.0-SNAPSHOT-sources.jar to /Users/liuxg/.m2/repository/com/liuxg/elasticsearch-plugin/1.0-SNAPSHOT/elasticsearch-plugin-1.0-SNAPSHOT-sources.jar
    [INFO] Installing /Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip to /Users/liuxg/.m2/repository/com/liuxg/elasticsearch-plugin/1.0-SNAPSHOT/elasticsearch-plugin-1.0-SNAPSHOT.zip
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 5.213 s
    [INFO] Finished at: 2022-09-08T11:50:02+08:00
    [INFO] ------------------------------------------------------------------------

    After a successful build, the installable file appears under the target directory:

    $ pwd
    /Users/liuxg/java/plugins/elasticsearch-plugin
    $ ls target/releases/
    elasticsearch-plugin-1.0-SNAPSHOT.zip

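    The elasticsearch-plugin-1.0-SNAPSHOT.zip file shown above is the plugin package we can install.

    Installing and testing the plugin

    Next, we switch to the Elasticsearch installation directory and run the following commands: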

    $ pwd
    /Users/liuxg/elastic0/elasticsearch-8.4.0
    $ bin/elasticsearch-plugin install file:///Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
    -> Installing file:///Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
    -> Downloading file:///Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
    [=================================================] 100%
    -> Installed ingest
    -> Please restart Elasticsearch to activate any plugins installed
    $ ./bin/elasticsearch-plugin list
    ingest

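    The output shows that the ingest plugin was installed successfully. We now need to restart Elasticsearch. This step is essential, because installed plugins are only activated after a restart.

    Once Elasticsearch has restarted, we open Kibana and run the following request to test the processor: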

    POST _ingest/pipeline/_simulate
    {
      "pipeline": {
        "description": "This is a test for my custom pipeline",
        "processors": [
          {
            "sample": {
              "field": "user",
              "target_field": "user_initial"
            }
          }
        ]
      },
      "docs": [
        {
          "_source": {
            "user": "xiaoguo"
          }
        },
        {
          "_source": {
            "user": "Liu"
          }
        }
      ]
    }

    In the test above, the user field takes the values xiaoguo and Liu. After the documents pass through our sample processor, the result is as follows:

    {
      "docs": [
        {
          "doc": {
            "_index": "_index",
            "_id": "_id",
            "_version": "-3",
            "_source": {
              "user_initial": "x",
              "user": "xiaoguo"
            },
            "_ingest": {
              "timestamp": "2022-09-08T04:00:48.922489Z"
            }
          }
        },
        {
          "doc": {
            "_index": "_index",
            "_id": "_id",
            "_version": "-3",
            "_source": {
              "user_initial": "l",
              "user": "Liu"
            },
            "_ingest": {
              "timestamp": "2022-09-08T04:00:48.922514Z"
            }
          }
        }
      ]
    }

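    As the output shows, the first letter of each value was extracted, converted to lowercase, and stored in the user_initial field we specified.

    For convenience, I have put the final code in this repository: https://github.com/liu-xiao-guo/es-ingest-pipeline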

  • Original article: https://blog.csdn.net/UbuntuTouch/article/details/126762104