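Custom Flume interceptors let us do simple ETL work, such as filtering out dirty data, transforming the structure of the data, or attaching extra information. Below is an example of a custom interceptor that filters data.
The interceptor drops every record that is not valid JSON. For each record it keeps, it adds a header to the Event whose key and value come from the Flume configuration (by default operator_type=ADD).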
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>SparkApp</artifactId>
        <groupId>com.hjt.yxh.hw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>FlumeInterceptors</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.10.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.12</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
package com.hjt.yxh.hw.interceptors;

import com.hjt.yxh.hw.utils.JSONUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ETLInterceptor implements Interceptor {
    private static final Logger logger = LoggerFactory.getLogger(ETLInterceptor.class);

    // Header key and value attached to every event that passes the filter
    private final String header;
    private final String value;

    // Private: instances are created only through Builder.build()
    private ETLInterceptor(String header, String operatorValue) {
        this.header = header;
        this.value = operatorValue;
    }

    @Override
    public void initialize() {
        // No resources to set up
    }

    @Override
    public Event intercept(Event event) {
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        // JSONUtils.isJsonStr is the project's own helper (not shown in this post)
        if (JSONUtils.isJsonStr(log)) {
            Map<String, String> headers = event.getHeaders();
            headers.put(this.header, this.value);
            logger.info(log);
            return event;
        }
        // Returning null drops the event
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> retList = new ArrayList<>(list.size());
        for (Event event : list) {
            // Keep only the events that survive the single-event filter
            if (intercept(event) != null) {
                retList.add(event);
            }
        }
        return retList;
    }

    @Override
    public void close() {
        // Nothing to release
    }

    public static class Builder implements Interceptor.Builder {
        private String header;
        private String operatorValue;

        /*
         * Creates the interceptor instance.
         */
        @Override
        public Interceptor build() {
            return new ETLInterceptor(header, operatorValue);
        }

        /*
         * Receives parameters from the Flume job configuration file,
         * falling back to the defaults when a key is not set.
         */
        @Override
        public void configure(Context context) {
            header = context.getString("header", "operator_type");
            operatorValue = context.getString("operator_value", "ADD");
        }
    }
}
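To see the filtering behaviour without a running agent, the two intercept methods can be reproduced on plain Java types. This is a minimal sketch under stated assumptions: SketchEvent is a hypothetical stand-in for org.apache.flume.Event, and LOOKS_LIKE_JSON is a deliberately crude placeholder for the project's JSONUtils.isJsonStr (which the post does not show). It only checks for surrounding braces and is not a real JSON validator.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for org.apache.flume.Event: a UTF-8 body plus mutable headers
final class SketchEvent {
    final byte[] body;
    final Map<String, String> headers = new HashMap<>();

    SketchEvent(String body) {
        this.body = body.getBytes(StandardCharsets.UTF_8);
    }
}

final class FilterSketch {
    private FilterSketch() {}

    // Crude placeholder for JSONUtils.isJsonStr: checks for surrounding braces only,
    // it does NOT validate JSON
    static final Predicate<String> LOOKS_LIKE_JSON = s -> {
        String t = s.trim();
        return t.startsWith("{") && t.endsWith("}");
    };

    // Mirrors intercept(Event): tag and return the event, or return null to drop it
    static SketchEvent intercept(SketchEvent e, String key, String value, Predicate<String> isJson) {
        String log = new String(e.body, StandardCharsets.UTF_8);
        if (isJson.test(log)) {
            e.headers.put(key, value);
            return e;
        }
        return null;
    }

    // Mirrors intercept(List<Event>): keep only the events that were not dropped
    static List<SketchEvent> intercept(List<SketchEvent> batch, String key, String value,
                                       Predicate<String> isJson) {
        List<SketchEvent> kept = new ArrayList<>(batch.size());
        for (SketchEvent e : batch) {
            if (intercept(e, key, value, isJson) != null) {
                kept.add(e);
            }
        }
        return kept;
    }
}
```

Feeding a batch with one JSON line and one plain-text line through FilterSketch.intercept keeps only the JSON line, which now carries the operator_type header; the plain-text event is dropped without being modified.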
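Steps to implement a custom interceptor:
1. Define a class that implements the org.apache.flume.interceptor.Interceptor interface.
2. Add a static nested class Builder that implements the Interceptor.Builder interface and provides its configure and build methods.
3. Build the jar and copy it into the lib directory of the Flume installation. With the assembly plugin configured in the pom.xml above, mvn package produces target/FlumeInterceptors-1.0-SNAPSHOT-jar-with-dependencies.jar, which bundles fastjson2 but not the provided-scope Flume classes.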
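Why the Builder exists, and when its methods run, can be illustrated with a small simulation. The sketch below is a simplification under assumptions, not Flume's code: SketchContext and LifecycleSketch are hypothetical stand-ins. It models the agent doing roughly this: collect the properties under the interceptor's prefix (for example a2.sources.r1.interceptors.i1.) with the prefix stripped, call configure on a fresh Builder, call build, then initialize, intercept for each batch, and finally close.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.flume.Context
final class SketchContext {
    private final Map<String, String> params;

    SketchContext(Map<String, String> params) {
        this.params = params;
    }

    // Same idea as Context.getString(key, default): use the default when the key is absent
    String getString(String key, String defaultValue) {
        String v = params.get(key);
        return v != null ? v : defaultValue;
    }
}

final class LifecycleSketch {
    // Records the order in which the simulated agent calls each method
    static final List<String> CALLS = new ArrayList<>();

    private final String header;
    private final String value;

    private LifecycleSketch(String header, String value) {
        this.header = header;
        this.value = value;
    }

    void initialize() { CALLS.add("initialize"); }
    List<String> intercept(List<String> batch) { CALLS.add("intercept"); return batch; }
    void close() { CALLS.add("close"); }

    String tag() { return header + "=" + value; }

    // Static nested builder, mirroring ETLInterceptor.Builder
    static final class Builder {
        private String header;
        private String value;

        void configure(SketchContext ctx) {
            CALLS.add("configure");
            header = ctx.getString("header", "operator_type");
            value = ctx.getString("operator_value", "ADD");
        }

        LifecycleSketch build() {
            CALLS.add("build");
            return new LifecycleSketch(header, value);
        }
    }

    // Keeps keys under the prefix and strips it:
    // "a2.sources.r1.interceptors.i1.header" -> "header"
    static Map<String, String> subProperties(Map<String, String> all, String prefix) {
        Map<String, String> sub = new HashMap<>();
        for (Map.Entry<String, String> e : all.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                sub.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return sub;
    }

    // Simplified call order made by the agent (an assumption, not Flume's actual code)
    static LifecycleSketch run(Map<String, String> agentProps, String prefix, List<String> batch) {
        Builder builder = new Builder();
        builder.configure(new SketchContext(subProperties(agentProps, prefix)));
        LifecycleSketch interceptor = builder.build();
        interceptor.initialize();
        interceptor.intercept(batch);
        interceptor.close();
        return interceptor;
    }
}
```

With only a2.sources.r1.interceptors.i1.operator_value=delete set, the resulting interceptor uses the default header name operator_type with the value delete, the same fallback behaviour as Builder.configure in the real class.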
# Declare the source and channel names
a2.sources = r1
a2.channels = c1
# Configure the source
a2.sources.r1.type=TAILDIR
a2.sources.r1.filegroups=fg1
a2.sources.r1.filegroups.fg1=/opt/module/applog/log/app.*
a2.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a2.sources.r1.interceptors = i1
# Parameters for the custom interceptor (read in Builder.configure)
a2.sources.r1.interceptors.i1.header=operator_type
a2.sources.r1.interceptors.i1.operator_value=delete
a2.sources.r1.interceptors.i1.type = com.hjt.yxh.hw.interceptors.ETLInterceptor$Builder
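Tip: type must be the fully qualified class name followed by $Builder. Builder is a static nested class, and Java's binary name for it joins the outer and nested class names with $.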
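The $ comes from Java's binary class name for nested classes. Flume resolves a custom (non-built-in) interceptor type by loading that name reflectively; the sketch below assumes this is roughly equivalent to Class.forName followed by a no-arg constructor call. BinaryNameDemo is a hypothetical class playing the role of ETLInterceptor. It shows that the dotted name cannot be loaded while the $ form can, and why Builder has to be static: a non-static inner class has no no-arg constructor usable without an enclosing instance.

```java
// Hypothetical class playing the role of ETLInterceptor
final class BinaryNameDemo {
    // Static nested class, like ETLInterceptor.Builder: it has a usable no-arg constructor
    static final class Builder {
        public Builder() {}
    }

    // Non-static inner class: its constructor implicitly takes a BinaryNameDemo instance
    final class InnerBuilder {
    }

    // Loads a class by name and invokes its no-arg constructor; assumed to be roughly
    // what Flume does with a custom interceptor "type"
    static Object load(String className) throws ReflectiveOperationException {
        return Class.forName(className).getDeclaredConstructor().newInstance();
    }
}
```

BinaryNameDemo.Builder.class.getName() ends in BinaryNameDemo$Builder and loads successfully, while the dotted canonical name throws ClassNotFoundException, and the non-static InnerBuilder fails with NoSuchMethodException.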
To use several interceptors, list them all in the interceptors property; Flume invokes them in the order listed:
# Declare the source and channel names
a2.sources = r1
a2.channels = c1
# Configure the source
a2.sources.r1.type=TAILDIR
a2.sources.r1.filegroups=fg1
a2.sources.r1.filegroups.fg1=/opt/module/applog/log/app.*
a2.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a2.sources.r1.interceptors = i1 i2
a2.sources.r1.interceptors.i1.type = com.hjt.yxh.hw.interceptors.ETLInterceptor$Builder
a2.sources.r1.interceptors.i2.type = com.hjt.yxh.hw.interceptors.ETLInterceptor2$Builder
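Conceptually, the listed interceptors form a chain in which each one receives the events returned by the previous one. The sketch below models that with plain Java functions; ChainSketch, keepJsonLike and tagAll are hypothetical names, and the strings are stand-ins for Flume Events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch: stages run in list order, each one receiving the previous output
final class ChainSketch {
    private final List<UnaryOperator<List<String>>> stages;

    ChainSketch(List<UnaryOperator<List<String>>> stages) {
        this.stages = new ArrayList<>(stages);
    }

    List<String> intercept(List<String> events) {
        for (UnaryOperator<List<String>> stage : stages) {
            events = stage.apply(events);
        }
        return events;
    }

    // Example stage: crude "is it JSON" filter (placeholder, not a real JSON check)
    static List<String> keepJsonLike(List<String> in) {
        List<String> out = new ArrayList<>();
        for (String s : in) {
            if (s.startsWith("{") && s.endsWith("}")) {
                out.add(s);
            }
        }
        return out;
    }

    // Example stage: modifies every event it receives
    static List<String> tagAll(List<String> in) {
        List<String> out = new ArrayList<>(in.size());
        for (String s : in) {
            out.add(s + "#tagged");
        }
        return out;
    }
}
```

Running keepJsonLike then tagAll keeps and tags the JSON-like line. Reversing the order tags every line first, the suffix makes every line fail the brace check, and nothing survives. Order matters whenever one interceptor changes what the next one inspects.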
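Once this is configured, start the Flume agent to test it. -f points to the configuration file, and the name passed to -n must match the agent name used as the prefix of the property keys (a2):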
[root@k8s-node3 apache-flume-1.10.1-bin]# flume-ng agent -f job/flume-file-kafka.conf -n a2