• flume拦截器


    flume拦截器代码

    1.依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>flume-interceptor</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.9.0</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.62</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.3.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    
    
    </project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    2.核心代码

    package com.atguigu.gmall.flume.log.interceptor;
    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;
    
    public class TimestampAndTableNameInterceptor implements Interceptor {
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
    
            Map<String, String> headers = event.getHeaders();
    String log = new String(event.getBody(), StandardCharsets.UTF_8);
    
     		JSONObject jsonObject = JSONObject.parseObject(log);
    
     		Long ts = jsonObject.getLong("ts");
     		//Maxwell输出的数据中的ts字段时间戳单位为秒,Flume HDFSSink要求单位为毫秒
     		String timeMills = String.valueOf(ts * 1000);
    
     		String tableName = jsonObject.getString("table");
     		String databaseName = jsonObject.getString("database");
    
     		headers.put("timestamp", timeMills);
     		headers.put("tableName", tableName);
     		headers.put("databaseName", databaseName);
    		return event;
    
        }
    
        @Override
        public List<Event> intercept(List<Event> events) {
    
            for (Event event : events) {
                intercept(event);
            }
    
            return events;
        }
    
        @Override
        public void close() {
    
        }
    
        public static class Builder implements Interceptor.Builder {
    
    
            @Override
            public Interceptor build() {
                return new TimestampAndTableNameInterceptor ();
            }
    
            @Override
            public void configure(Context context) {
    
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
  • 相关阅读:
    Git 21 天打卡:day09-day21汇总
    【短道速滑九】仿halcon中gauss_filter小半径高斯模糊优化的实现
    浏览器输入url,到看到页面会发生什么?
    浏览器提示“此网站的安全证书有问题“,你还敢继续访问吗?
    Vue2数据双向绑定的原理(Object.defineProperty)
    TensorFlow学习:使用官方模型进行图像分类、使用自己的数据对模型进行微调
    (一)shell编程
    【双指针】Leetcode 11. 盛最多水的容器
    热门Java开发工具IDEA入门指南——创建新的Java应用程序(中)
    SSM框架快速搭建(四)
  • 原文地址:https://blog.csdn.net/m0_37759590/article/details/132558110