• Flume学习笔记:03-自定义拦截器


    概述

    Flume自定义拦截器可以帮助我们实现一些简单的ETL工作、比如过滤掉一些脏数据,转换数据的结构或者添加一些额外的信息。下面我将写一个自定义的拦截器示例,实现数据的过滤操作。

    代码实现

    需求描述

    在拦截器中过滤掉不是json格式的数据,并且在Event的header中添加数据的时间戳。

    1.配置pom.xml

    
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>SparkAppartifactId>
            <groupId>com.hjt.yxh.hwgroupId>
            <version>1.0-SNAPSHOTversion>
        parent>
        <modelVersion>4.0.0modelVersion>
    
        <artifactId>FlumeInterceptorsartifactId>
        <dependencies>
            
            <dependency>
                <groupId>org.apache.flumegroupId>
                <artifactId>flume-ng-coreartifactId>
                <version>1.10.1version>
                <scope>providedscope>
            dependency>
    
            <dependency>
                <groupId>com.alibaba.fastjson2groupId>
                <artifactId>fastjson2artifactId>
                <version>2.0.12version>
            dependency>
        dependencies>
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-pluginartifactId>
                    <version>2.3.2version>
                    <configuration>
                        <source>1.8source>
                        <target>1.8target>
                    configuration>
                plugin>
    
                <plugin>
                    <artifactId>maven-assembly-pluginartifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependenciesdescriptorRef>
                        descriptorRefs>
                    configuration>
                    <executions>
                        <execution>
                            <id>make-assemblyid>
                            <phase>packagephase>
                            <goals>
                                <goal>singlegoal>
                            goals>
                        execution>
                    executions>
                plugin>
    
            plugins>
        build>
    
    project>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    2.代码实现

    package com.hjt.yxh.hw.interceptors;
    
    import com.hjt.yxh.hw.utils.JSONUtils;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    public class ETLInterceptor implements Interceptor {
    
        private static Logger logger = LoggerFactory.getLogger(ETLInterceptor.class);
        private String header;
        private String value;
    
        private ETLInterceptor(String header,String operator_value){
            this.header = header;
            this.value = operator_value;
        }
    
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
            String log = new String(event.getBody(), StandardCharsets.UTF_8);
            if (JSONUtils.isJsonStr(log)){
                Map<String, String> headers = event.getHeaders();
                headers.put(this.header,this.value);
                logger.info(log);
                return event;
            }
            return null;
        }
    
        @Override
        public List<Event> intercept(List<Event> list) {
            List<Event> retList = new ArrayList<Event>();
            for (Event event : list) {
                if (intercept(event)!=null){
                    retList.add(event);
                }
            }
            return retList;
        }
    
        @Override
        public void close() {
    
        }
    
        public static class Builder implements Interceptor.Builder{
            private String header;
            private String operator_value;
    
            /*
            * 用于创建拦截器对象
            *
            * */
            @Override
            public Interceptor build() {
                return new ETLInterceptor(header,operator_value);
            }
    
            /*
            *
            * 可以实现通过flume job的配置文件进行参数传递
            *
            * */
            @Override
            public void configure(Context context) {
                header = context.getString("header","operator_type");
                operator_value = context.getString("operator_value","ADD");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83

    在代码中实现自定义拦截器的步骤

    • 定义一个类实现org.apache.flume.interceptor.Interceptor接口

    • 实现一个内部静态类Builder,继承Interceptor.Builder接口,configure方法和build方法。

    3.打成jar包,上传到Flume

    打成jar包,上传到flume的安装目录下的lib目录中。

    Flume job的配置

    #声明source、channel、sink变量名
    a2.sources = r1
    a2.channels = c1
    
    #定义source的信息
    a2.sources.r1.type=TAILDIR
    a2.sources.r1.filegroups=fg1
    a2.sources.r1.filegroups.fg1=/opt/module/applog/log/app.*
    a2.sources.r1.positionFile = /opt/module/flume/taildir_position.json
    a2.sources.r1.interceptors = i1
    
    # 给自定义的interceptor传递参数
    a2.sources.r1.interceptors.i1.header=operator_type
    a2.sources.r1.interceptors.i1.operator_value=delete
    a2.sources.r1.interceptors.i1.type =com.hjt.yxh.hw.interceptors.ETLInterceptor$Builder
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    Tips:要写类的完整路径,并在后面跟上$Builder。

    如果有多个Interceptor,则可以按如下配置

    #声明source、channel、sink变量名
    a2.sources = r1
    a2.channels = c1
    
    #定义source的信息
    a2.sources.r1.type=TAILDIR
    a2.sources.r1.filegroups=fg1
    a2.sources.r1.filegroups.fg1=/opt/module/applog/log/app.*
    a2.sources.r1.positionFile = /opt/module/flume/taildir_position.json
    a2.sources.r1.interceptors = i1 i2
    a2.sources.r1.interceptors.i1.type =com.hjt.yxh.hw.interceptors.ETLInterceptor$Builder
    
    a2.sources.r1.interceptors.i2.type =com.hjt.yxh.hw.interceptors.ETLInterceptor2$Builder
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    上述配置完成后,就可以执行flume agent进行测试了。

    [root@k8s-node3 apache-flume-1.10.1-bin]# flume-ng agent -f job/flume-file-kafka.conf -n a2
    
    • 1
  • 相关阅读:
    将可遍历对象转换为(索引,值)序列 enumerate() 函数
    thinkphp withJoin 模式下field 无效
    如何提升速卖通店铺流量
    使用spring boot集成shardingsphere分库分表简易测试
    VideoMAE
    30行Python极简代码,10分钟get常用技巧
    ECharts 实例2
    双硬盘WIN+UBUNTU 系统, 重装UNBUNTU20.14系统后无法启动, 进入进入grub rescue
    设计模式-12-策略模式
    使用函数验证哥德巴赫猜想
  • 原文地址:https://blog.csdn.net/wangzhongyudie/article/details/127411944