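In my earlier articles I described in detail how to use tooling to generate a bare-bones ingest pipeline processor. In this article I go a step further with a concrete example, using the latest Elastic Stack 8.4.0. We will build a processor named sample: it takes the first letter of a field in the document, converts it to lowercase, and writes it to a field chosen by the user.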
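Before touching any plugin code, the core transformation fits in a few lines of plain Java. The sketch below is a stdlib-only illustration. The names FirstLetter and firstLetterLower are hypothetical and not part of the Elasticsearch API. Trimming the value first and lowercasing with Locale.ROOT are choices made for this sketch.

```java
import java.util.Locale;
import java.util.Optional;

// Stdlib-only illustration of the sample processor's transformation.
// FirstLetter and firstLetterLower are hypothetical names, not Elasticsearch APIs.
final class FirstLetter {
    private FirstLetter() {
    }

    // Returns the lowercased first character of the trimmed value,
    // or Optional.empty() when the value is null or blank.
    static Optional<String> firstLetterLower(String value) {
        if (value == null) {
            return Optional.empty();
        }
        String trimmed = value.trim();
        if (trimmed.isEmpty()) {
            return Optional.empty();
        }
        // Locale.ROOT avoids surprises from the JVM default locale (e.g. Turkish "I")
        return Optional.of(trimmed.substring(0, 1).toLowerCase(Locale.ROOT));
    }
}
```

For example, firstLetterLower("Liu") yields Optional[l], while a blank string yields Optional.empty, meaning no target field would be written.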
If you have not installed the Elastic Stack yet, see the following articles for installing Elasticsearch and Kibana:
As described in the earlier article "Elasticsearch: Creating an Elasticsearch Ingest Plugin", we can generate the project with the elasticsearch-plugin-archetype. The following command creates a bare-bones plugin template:
- mvn archetype:generate \
- -DarchetypeGroupId=org.codelibs \
- -DarchetypeArtifactId=elasticsearch-plugin-archetype \
- -DarchetypeVersion=6.6.0 \
- -DgroupId=com.liuxg \
- -DartifactId=elasticsearch-plugin \
- -Dversion=1.0-SNAPSHOT \
- -DpluginName=ingest
This creates a bare-bones plugin template in a directory named elasticsearch-plugin under the current directory. Change into that directory first:
- $ pwd
- /Users/liuxg/java/plugins/elasticsearch-plugin
- $ tree -L 8
- .
- ├── pom.xml
- └── src
-     └── main
-         ├── assemblies
-         │   └── plugin.xml
-         ├── java
-         │   └── com
-         │       └── liuxg
-         │           ├── ingestPlugin.java
-         │           └── rest
-         │               └── RestingestAction.java
-         └── plugin-metadata
-             └── plugin-descriptor.properties
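The archetype generated the skeleton of a plugin that adds a REST handler, which is not what we need. We therefore rename the files and rearrange the directories. The adjusted layout is: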
- $ pwd
- /Users/liuxg/java/plugins/elasticsearch-plugin
- $ tree -L 8
- .
- ├── pom.xml
- └── src
-     └── main
-         ├── assemblies
-         │   └── plugin.xml
-         ├── java
-         │   └── com
-         │       └── liuxg
-         │           ├── ingest
-         │           │   └── SampleProcessor.java
-         │           └── plugin
-         │               └── ingest
-         │                   └── ingestPlugin.java
-         └── plugin-metadata
-             └── plugin-descriptor.properties
Next, we modify pom.xml:
pom.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <name>elasticsearch-plugin</name>
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.liuxg</groupId>
- <artifactId>elasticsearch-plugin</artifactId>
- <version>1.0-SNAPSHOT</version>
- <packaging>jar</packaging>
- <description>elasticsearch ingest plugin</description>
- <inceptionYear>2019</inceptionYear>
- <licenses>
- <license>
- <name>The Apache Software License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- </license>
- </licenses>
- <properties>
- <elasticsearch.version>8.4.0</elasticsearch.version>
- <elasticsearch.plugin.classname>com.liuxg.plugin.ingest.ingestPlugin</elasticsearch.plugin.classname>
- <log4j.version>2.11.1</log4j.version>
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
- </properties>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.8.0</version>
- <configuration>
- <source>${maven.compiler.source}</source>
- <target>${maven.compiler.target}</target>
- <encoding>UTF-8</encoding>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.22.1</version>
- <configuration>
- <includes>
- <include>**/*Tests.java</include>
- </includes>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-source-plugin</artifactId>
- <version>3.0.1</version>
- <executions>
- <execution>
- <id>attach-sources</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.1.0</version>
- <configuration>
- <appendAssemblyId>false</appendAssemblyId>
- <outputDirectory>${project.build.directory}/releases/</outputDirectory>
- <descriptors>
- <descriptor>${basedir}/src/main/assemblies/plugin.xml</descriptor>
- </descriptors>
- </configuration>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
- <version>${elasticsearch.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- <version>${log4j.version}</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- </project>
Compared with the generated file, we changed these two lines:
- <elasticsearch.version>8.4.0</elasticsearch.version>
- <elasticsearch.plugin.classname>com.liuxg.plugin.ingest.ingestPlugin</elasticsearch.plugin.classname>
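elasticsearch.version must match the version of the Elastic Stack you are running; otherwise the plugin cannot be installed. We must also update elasticsearch.plugin.classname, because the plugin class has moved to the com.liuxg.plugin.ingest package.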
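Because a version mismatch only surfaces at install time, it can be handy to check the descriptor first. The sketch below assumes the packaged plugin-descriptor.properties carries the standard Elasticsearch descriptor keys elasticsearch.version and classname, presumably filled from the two pom properties above (inspect the copy inside the release zip to confirm). DescriptorCheck and problems are hypothetical names, and the expected values are whatever your cluster version and plugin class actually are.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-install check of a plugin descriptor's contents.
final class DescriptorCheck {
    private DescriptorCheck() {
    }

    // Returns human-readable problems; an empty list means both keys match.
    static List<String> problems(String descriptorText, String expectedEsVersion, String expectedClassname) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(descriptorText));
        } catch (IOException e) {
            // StringReader does no real I/O, but Properties.load declares IOException
            throw new UncheckedIOException(e);
        }
        List<String> found = new ArrayList<>();
        String esVersion = props.getProperty("elasticsearch.version");
        if (!expectedEsVersion.equals(esVersion)) {
            found.add("elasticsearch.version is " + esVersion + ", expected " + expectedEsVersion);
        }
        String classname = props.getProperty("classname");
        if (!expectedClassname.equals(classname)) {
            found.add("classname is " + classname + ", expected " + expectedClassname);
        }
        return found;
    }
}
```

Called with "8.4.0" and "com.liuxg.plugin.ingest.ingestPlugin", a correctly built descriptor yields an empty list, while one still carrying an older version or the old class path is reported.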
Next, we modify ingestPlugin.java:
ingestPlugin.java
- package com.liuxg.plugin.ingest;
-
- import org.elasticsearch.ingest.Processor;
- import org.elasticsearch.plugins.IngestPlugin;
- import org.elasticsearch.plugins.Plugin;
- import com.liuxg.ingest.SampleProcessor;
-
- import java.util.Collections;
- import java.util.Map;
-
- public class ingestPlugin extends Plugin implements IngestPlugin {
- @Override
- public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
- return Collections.singletonMap(SampleProcessor.TYPE, new SampleProcessor.Factory());
- }
- }
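This class is the plugin's entry point: through the IngestPlugin interface it registers SampleProcessor.Factory under the processor name SampleProcessor.TYPE, that is, sample.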
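Elasticsearch gathers the maps returned by every ingest plugin's getProcessors method into one lookup table keyed by processor name, rejects a name that is registered twice, and uses the key that appears in the pipeline definition (here "sample") to pick the factory. The stdlib-only model below illustrates that flow. MiniProcessor, MiniFactory and ProcessorRegistry are hypothetical names, not Elasticsearch classes, and documents are simplified to flat maps.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Stdlib-only model of resolving an ingest processor type name to a factory.
// MiniProcessor, MiniFactory and ProcessorRegistry are hypothetical names used
// for illustration only; they are not Elasticsearch classes.
final class ProcessorRegistry {
    interface MiniProcessor {
        void execute(Map<String, Object> doc);
    }

    interface MiniFactory {
        MiniProcessor create(Map<String, Object> config);
    }

    private final Map<String, MiniFactory> factories = new HashMap<>();

    // Merges one plugin's "type name -> factory" map, refusing duplicate names.
    void register(Map<String, MiniFactory> pluginProcessors) {
        for (Map.Entry<String, MiniFactory> e : pluginProcessors.entrySet()) {
            if (factories.putIfAbsent(e.getKey(), e.getValue()) != null) {
                throw new IllegalArgumentException("processor [" + e.getKey() + "] is already registered");
            }
        }
    }

    // Resolves the key used in the pipeline JSON (e.g. "sample") and builds the processor.
    MiniProcessor create(String type, Map<String, Object> config) {
        MiniFactory factory = factories.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("no processor type exists with name [" + type + "]");
        }
        return factory.create(config);
    }

    // A toy factory shaped like SampleProcessor.Factory: reads field and target_field.
    static MiniFactory sampleFactory() {
        return config -> {
            String field = (String) config.get("field");
            String target = (String) config.get("target_field");
            return doc -> doc.put(target, ((String) doc.get(field)).trim().substring(0, 1).toLowerCase(Locale.ROOT));
        };
    }
}
```

Registering the same map a second time throws, mirroring how Elasticsearch refuses two processors with the same name, and asking for an unregistered type throws as well.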
Next, we modify SampleProcessor.java:
SampleProcessor.java
- package com.liuxg.ingest;
-
- import org.elasticsearch.ingest.AbstractProcessor;
- import org.elasticsearch.ingest.ConfigurationUtils;
- import org.elasticsearch.ingest.IngestDocument;
- import org.elasticsearch.ingest.Processor;
-
- import java.util.Locale;
- import java.util.Map;
-
- public final class SampleProcessor extends AbstractProcessor {
-
- public static final String TYPE = "sample";
-
- private final String field;
- private final String targetField;
- private final String defaultValue;
- private final boolean ignoreMissing;
-
- public SampleProcessor(String tag, String description, String field, String targetField, boolean ignoreMissing, String defaultValue) {
- super(tag, description);
- this.field = field;
- this.targetField = targetField;
- this.ignoreMissing = ignoreMissing;
- this.defaultValue = defaultValue;
- }
-
- String getField() {
- return field;
- }
-
- String getTargetField() {
- return targetField;
- }
-
- String getDefaultValue() {
- return defaultValue;
- }
-
- boolean isIgnoreMissing() {
- return ignoreMissing;
- }
-
- @Override
- public IngestDocument execute(IngestDocument document) {
- // The source field must exist, unless ignore_missing is true.
- if (!document.hasField(field, true)) {
- if (ignoreMissing) {
- return document;
- }
- throw new IllegalArgumentException("field [" + field + "] doesn't exist");
- }
- // Never overwrite an existing target field. Passing true also makes hasField
- // fail if the path points at an array slot that is out of range.
- if (document.hasField(targetField, true)) {
- throw new IllegalArgumentException("field [" + targetField + "] already exists");
- }
-
- Object value = document.getFieldValue(field, Object.class);
- // Only string values are handled; any other type is left untouched.
- if (value instanceof String) {
- String myValue = ((String) value).trim();
- // Any non-blank string qualifies, including a single character.
- if (!myValue.isEmpty()) {
- // Locale.ROOT keeps lowercasing independent of the JVM default locale.
- document.setFieldValue(targetField, myValue.substring(0, 1).toLowerCase(Locale.ROOT));
- }
- }
- return document;
- }
-
- @Override
- public String getType() {
- return TYPE;
- }
-
- public static final class Factory implements Processor.Factory {
- @Override
- public Processor create(Map<String, Processor.Factory> processorFactories, String tag, String description, Map<String, Object> config) throws Exception {
- String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
- String targetField = ConfigurationUtils.readStringProperty(TYPE, tag,
- config, "target_field");
- String defaultValue = ConfigurationUtils.readOptionalStringProperty(TYPE, tag,
- config, "defaultValue");
- boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag,
- config, "ignore_missing", false);
- return new SampleProcessor(tag, description, field, targetField, ignoreMissing, defaultValue);
- }
- }
- }
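Here we name the processor sample, which is the name used in the test below. The implementation takes the first letter of the source field, lowercases it, and writes it to the user-defined target field. If the source field is missing, the processor fails unless ignore_missing is true, and it also fails if the target field already exists. Note that the optional defaultValue setting is read by the factory but not used when processing documents.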
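The guard logic in execute can be exercised without a running cluster. The sketch below models it on a flat Map<String, Object>; SampleLogic and apply are hypothetical names, and unlike the real IngestDocument it does not understand dotted paths or array indices. It writes the initial for any non-blank string value and leaves other values untouched.

```java
import java.util.Locale;
import java.util.Map;

// Flat-map model of the decisions made in SampleProcessor.execute().
// Hypothetical helper: it ignores dotted paths and array slots, which
// IngestDocument resolves in the real processor.
final class SampleLogic {
    private SampleLogic() {
    }

    static Map<String, Object> apply(Map<String, Object> doc, String field, String targetField, boolean ignoreMissing) {
        if (!doc.containsKey(field)) {
            if (ignoreMissing) {
                return doc; // ignore_missing: true leaves the document untouched
            }
            throw new IllegalArgumentException("field [" + field + "] doesn't exist");
        }
        if (doc.containsKey(targetField)) {
            throw new IllegalArgumentException("field [" + targetField + "] already exists");
        }
        Object value = doc.get(field);
        if (value instanceof String) {
            String trimmed = ((String) value).trim();
            if (!trimmed.isEmpty()) {
                doc.put(targetField, trimmed.substring(0, 1).toLowerCase(Locale.ROOT));
            }
        }
        return doc;
    }
}
```

For instance, applying it to {user=xiaoguo} with target user_initial adds user_initial=x, while {user=42} is returned unchanged.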
From the project root, build the plugin with the following command:
mvn clean install
- $ pwd
- /Users/liuxg/java/plugins/elasticsearch-plugin
- $ mvn clean install
- [INFO] Scanning for projects...
- [INFO]
- [INFO] -------------------< com.liuxg:elasticsearch-plugin >-------------------
- [INFO] Building elasticsearch-plugin 1.0-SNAPSHOT
- [INFO] --------------------------------[ jar ]---------------------------------
- [INFO]
- [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ elasticsearch-plugin ---
- [INFO]
- [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ elasticsearch-plugin ---
- [WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
- [INFO] skip non existing resourceDirectory /Users/liuxg/java/plugins/elasticsearch-plugin/src/main/resources
- [INFO]
- [INFO] --- maven-compiler-plugin:3.8.0:compile (default-compile) @ elasticsearch-plugin ---
- [INFO] Changes detected - recompiling the module!
- [INFO] Compiling 2 source files to /Users/liuxg/java/plugins/elasticsearch-plugin/target/classes
- [INFO]
- [INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ elasticsearch-plugin ---
- [WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
- [INFO] skip non existing resourceDirectory /Users/liuxg/java/plugins/elasticsearch-plugin/src/test/resources
- [INFO]
- [INFO] --- maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ elasticsearch-plugin ---
- [INFO] No sources to compile
- [INFO]
- [INFO] --- maven-surefire-plugin:2.22.1:test (default-test) @ elasticsearch-plugin ---
- [INFO] No tests to run.
- [INFO]
- [INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ elasticsearch-plugin ---
- [INFO] Building jar: /Users/liuxg/java/plugins/elasticsearch-plugin/target/elasticsearch-plugin-1.0-SNAPSHOT.jar
- [INFO]
- [INFO] >>> maven-source-plugin:3.0.1:jar (attach-sources) > generate-sources @ elasticsearch-plugin >>>
- [INFO]
- [INFO] <<< maven-source-plugin:3.0.1:jar (attach-sources) < generate-sources @ elasticsearch-plugin <<<
- [INFO]
- [INFO]
- [INFO] --- maven-source-plugin:3.0.1:jar (attach-sources) @ elasticsearch-plugin ---
- [INFO] Building jar: /Users/liuxg/java/plugins/elasticsearch-plugin/target/elasticsearch-plugin-1.0-SNAPSHOT-sources.jar
- [INFO]
- [INFO] --- maven-assembly-plugin:3.1.0:single (default) @ elasticsearch-plugin ---
- [INFO] Reading assembly descriptor: /Users/liuxg/java/plugins/elasticsearch-plugin/src/main/assemblies/plugin.xml
- [WARNING] The following patterns were never triggered in this artifact exclusion filter:
- o 'org.elasticsearch:elasticsearch'
-
- [INFO] Building zip: /Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
- [INFO]
- [INFO] --- maven-install-plugin:2.4:install (default-install) @ elasticsearch-plugin ---
- [INFO] Installing /Users/liuxg/java/plugins/elasticsearch-plugin/target/elasticsearch-plugin-1.0-SNAPSHOT.jar to /Users/liuxg/.m2/repository/com/liuxg/elasticsearch-plugin/1.0-SNAPSHOT/elasticsearch-plugin-1.0-SNAPSHOT.jar
- [INFO] Installing /Users/liuxg/java/plugins/elasticsearch-plugin/pom.xml to /Users/liuxg/.m2/repository/com/liuxg/elasticsearch-plugin/1.0-SNAPSHOT/elasticsearch-plugin-1.0-SNAPSHOT.pom
- [INFO] Installing /Users/liuxg/java/plugins/elasticsearch-plugin/target/elasticsearch-plugin-1.0-SNAPSHOT-sources.jar to /Users/liuxg/.m2/repository/com/liuxg/elasticsearch-plugin/1.0-SNAPSHOT/elasticsearch-plugin-1.0-SNAPSHOT-sources.jar
- [INFO] Installing /Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip to /Users/liuxg/.m2/repository/com/liuxg/elasticsearch-plugin/1.0-SNAPSHOT/elasticsearch-plugin-1.0-SNAPSHOT.zip
- [INFO] ------------------------------------------------------------------------
- [INFO] BUILD SUCCESS
- [INFO] ------------------------------------------------------------------------
- [INFO] Total time: 5.213 s
- [INFO] Finished at: 2022-09-08T11:50:02+08:00
- [INFO] ------------------------------------------------------------------------
After a successful build, the installable file can be found under the target directory:
- $ pwd
- /Users/liuxg/java/plugins/elasticsearch-plugin
- $ ls target/releases/
- elasticsearch-plugin-1.0-SNAPSHOT.zip
The elasticsearch-plugin-1.0-SNAPSHOT.zip listed above is the plugin archive we will install.
Next, switch to the Elasticsearch installation directory and run the following commands:
- $ pwd
- /Users/liuxg/elastic0/elasticsearch-8.4.0
- $ bin/elasticsearch-plugin install file:///Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
- -> Installing file:///Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
- -> Downloading file:///Users/liuxg/java/plugins/elasticsearch-plugin/target/releases/elasticsearch-plugin-1.0-SNAPSHOT.zip
- [=================================================] 100%
- -> Installed ingest
- -> Please restart Elasticsearch to activate any plugins installed
- $ ./bin/elasticsearch-plugin list
- ingest
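The output shows that the ingest plugin was installed successfully. Next, restart Elasticsearch. This step is essential: as the installer says, newly installed plugins are only activated after a restart.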
Once Elasticsearch is back up, open Kibana Dev Tools and run the following request to test the processor:
- POST _ingest/pipeline/_simulate
- {
- "pipeline": {
- "description": "This is a test for my custom pipeline",
- "processors": [
- {
- "sample": {
- "field": "user",
- "target_field": "user_initial"
- }
- }
- ]
- },
- "docs": [
- {
- "_source": {
- "user": "xiaoguo"
- }
- },
- {
- "_source": {
- "user": "Liu"
- }
- }
- ]
- }
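The same request can also be sent from outside Kibana. The sketch below only builds it with the JDK's java.net.http API (JDK 11+). The base URL https://localhost:9200, the elastic user and the placeholder password are assumptions; SimulateRequest and build are hypothetical names. In a default 8.x install, security is enabled and HTTPS uses an auto-generated CA, so actually sending the request needs an HttpClient whose SSLContext trusts that CA, which is not shown here.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper that builds the _simulate call shown above.
// Endpoint and credentials are caller-supplied assumptions.
final class SimulateRequest {
    private SimulateRequest() {
    }

    static HttpRequest build(String baseUrl, String user, String password, String bodyJson) {
        // HTTP Basic authentication: base64("user:password")
        String auth = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(baseUrl + "/_ingest/pipeline/_simulate"))
                .header("Content-Type", "application/json")
                .header("Authorization", "Basic " + auth)
                .POST(HttpRequest.BodyPublishers.ofString(bodyJson))
                .build();
    }
}
```

With a suitably configured client, HttpClient.send(request, HttpResponse.BodyHandlers.ofString()) would return the simulate response as a string.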
In this test, the user field is set to xiaoguo and Liu respectively. After the sample processor runs, the result is:
- {
- "docs": [
- {
- "doc": {
- "_index": "_index",
- "_id": "_id",
- "_version": "-3",
- "_source": {
- "user_initial": "x",
- "user": "xiaoguo"
- },
- "_ingest": {
- "timestamp": "2022-09-08T04:00:48.922489Z"
- }
- }
- },
- {
- "doc": {
- "_index": "_index",
- "_id": "_id",
- "_version": "-3",
- "_source": {
- "user_initial": "l",
- "user": "Liu"
- },
- "_ingest": {
- "timestamp": "2022-09-08T04:00:48.922514Z"
- }
- }
- }
- ]
- }
As expected, the first letter was extracted, converted to lowercase, and written to the user_initial field we specified.
For convenience, the final code is available in this repository: https://github.com/liu-xiao-guo/es-ingest-pipeline