• flume的安装配置笔记


    1 Flume入门

    1.1 Flume安装部署

    1.1.1 安装地址

    (1) Flume官网地址:http://flume.apache.org/
    (2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
    (3)下载地址:http://archive.apache.org/dist/flume/

    1.1.2 安装部署

    (1)将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下

    (2)解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下

    [atguigu@hadoop102 software]$ tar -zxf
    /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/

    (3)修改apache-flume-1.9.0-bin的名称为flume

    [atguigu@hadoop102 module]$ mv /opt/module/apache-flume-1.9.0-bin
    /opt/module/flume

    (4)将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3

    [atguigu@hadoop102 module]$ rm /opt/module/flume/lib/guava-11.0.2.jar

    注意:删除guava-11.0.2.jar的服务器节点,一定要配置hadoop环境变量。否则会报如下异常。

    Caused by: java.lang.ClassNotFoundException:
    com.google.common.collect.Lists
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    … 1 more

    (5)将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件

    [atguigu@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
    [atguigu@hadoop102 conf]$ vi flume-env.sh export
    JAVA_HOME=/opt/module/jdk1.8.0_212

    1.2 项目经验之Flume组件选型

    1)Source

    (1)Taildir Source相比Exec Source、Spooling Directory Source的优势

    TailDir
    Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。不会丢数据,但是有可能会导致数据重复。

    Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。

    Spooling Directory Source监控目录,支持断点续传。

    (2)batchSize大小如何设置?
    答:Event 1K左右时,500-1000合适(默认为100)

    2)Channel

    采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。

    注意: 在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

    2 Flume日志采集

    1.3 日志采集Flume配置

    1)Flume配置分析

    (1)在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件 (文件名最好望文知意)

    [atguigu@hadoop102 conf]$ vim file-flume-kafka.conf

    在文件配置如下内容

    ##
    # 为各组件命名
    ##
    
    a1.sources = r1
    a1.channels = c1
    
    ##
    # 描述source
    ##
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    # flume采集日志的目录
    a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
    # 断点续传文件所在目录
    a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
    
    # 配置拦截器(需要对数据进行拦截时自定义拦截器并在此处配置)
    # a1.sources.r1.interceptors =  i1
    # a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.ETLInterceptor$Builder
    
    ##
    #描述channel
    ##
    # 固定写法kafka的全类名
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    # kafka所在节点
    a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
    #采集数据传输在哪个topic下
    a1.channels.c1.kafka.topic = topic_log
    #是否事件传输
    a1.channels.c1.parseAsFlumeEvent = false
    
    #绑定source和channel以及sink和channel的关系
    a1.sources.r1.channels = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    注意: com.atguigu.flume.interceptor.ETLInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。

    1.4 Flume拦截器

    1)创建Maven工程flume-interceptor

    2)创建包名:com.atguigu.flume.interceptor

    3)在pom.xml文件中添加如下配置

    
        
            org.apache.flume</groupId>
            flume-ng-core</artifactId>
            1.9.0</version>
            provided</scope>
        </dependency>
    
        
            com.alibaba</groupId>
            fastjson</artifactId>
            1.2.62</version>
        </dependency>
    </dependencies>
    
    
        
            
                maven-compiler-plugin</artifactId>
                2.3.2</version>
                
                    1.8</source>
                    1.8</target>
                </configuration>
            </plugin>
            
                maven-assembly-plugin</artifactId>
                
                    
                        jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                
                    
                        make-assembly</id>
                        package</phase>
                        
                            single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    注意: scope中provided的含义是编译时用该jar包。打包时时不用。因为集群上已经存在flume的jar包。只是本地编译时用一下。

    4)在com.atguigu.flume.interceptor包下创建JSONUtils类

    package com.atguigu.flume.interceptor;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONException;
    
    public class JSONUtils {
        public static boolean isJSONValidate(String log){
            try {
                JSON.parse(log);
                return true;
            }catch (JSONException e){
                return false;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    5)在com.atguigu.flume.interceptor包下创建LogInterceptor类

    package com.atguigu.flume.interceptor;
    
    import com.alibaba.fastjson.JSON;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Iterator;
    import java.util.List;
    
    public class ETLInterceptor implements Interceptor {
    
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
    
            byte[] body = event.getBody();
            String log = new String(body, StandardCharsets.UTF_8);
    
            if (JSONUtils.isJSONValidate(log)) {
                return event;
            } else {
                return null;
            }
        }
    
        @Override
        public List<Event> intercept(List<Event> list) {
    
            Iterator<Event> iterator = list.iterator();
    
            while (iterator.hasNext()){
                Event next = iterator.next();
                if(intercept(next)==null){
                    iterator.remove();
                }
            }
    
            return list;
        }
    
        public static class Builder implements Interceptor.Builder{
    
            @Override
            public Interceptor build() {
                return new ETLInterceptor();
            }
            @Override
            public void configure(Context context) {
    
            }
    
        }
    
        @Override
        public void close() {
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    6)打包
    在这里插入图片描述
    7)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。

    [atguigu@hadoop102 lib]$ ls | grep interceptor
    flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
    
    • 1
    • 2

    8)分发Flume到hadoop103、hadoop104

    [atguigu@hadoop102 module]$ xsync flume/
    
    • 1

    9)分别在hadoop102、hadoop103上启动Flume

    [atguigu@hadoop102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
    
    [atguigu@hadoop103 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
    
    • 1
    • 2
    • 3

    1.5 日志采集Flume启动停止脚本

    (1)在/home/atguigu/bin目录下创建脚本f1.sh

    [atguigu@hadoop102 bin]$ vim f1.sh

    在脚本中填写如下内容

    #! /bin/bash
    
    case $1 in
    "start"){
            for i in hadoop102 hadoop103
            do
                    echo " --------启动 $i 采集flume-------"
                    ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1  &"
            done
    };;	
    "stop"){
            for i in hadoop102 hadoop103
            do
                    echo " --------停止 $i 采集flume-------"
                    ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
            done
    
    };;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    说明1: nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。
    说明2: awk 默认分隔符为空格
    说明3: $2是在“”双引号内部会被解析为脚本的第二个参数,但是这里面想表达的含义是awk的第二个值,所以需要将他转义,用$2表示。
    说明4: xargs 表示取出前面命令运行的结果,作为后面命令的输入参数。

    (2)增加脚本执行权限

    [atguigu@hadoop102 bin]$ chmod u+x f1.sh
    
    • 1

    (3)f1集群启动脚本

    [atguigu@hadoop102 module]$ f1.sh start
    
    • 1

    (4)f1集群停止脚本

    [atguigu@hadoop102 module]$ f1.sh stop
    
    • 1

    3 Flume消费kafka数据

    1.6 Flume消费kafka数据配置

    1)Flume的具体配置如下:
    (1)在hadoop104的/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件

    [atguigu@hadoop104 conf]$ vim kafka-flume-hdfs.conf
    
    • 1

    在文件配置如下内容

    ## 组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1
    
    ## source1
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    ## 一次消费5000条数据
    a1.sources.r1.batchSize = 5000
    ## 消费不到5000条到达2000毫秒就可拉取数据
    a1.sources.r1.batchDurationMillis = 2000
    ## 消费的kafka节点端口
    a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    ##消费的主题
    a1.sources.r1.kafka.topics=topic_log 
    
    ##时间戳拦截器(解决0点漂移问题)
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder
    
    ## channel1
    a1.channels.c1.type = file
    ## 索引文件存储路径
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    ## 数据存储在磁盘的目录
    a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
    
    
    ## sink1
    a1.sinks.k1.type = hdfs
    ## 采集数据在hdfs路径
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    ## 在hdfs产生文件前缀
    a1.sinks.k1.hdfs.filePrefix = log-
    ## 默认按照天进行时间滚动生成文件夹
    a1.sinks.k1.hdfs.round = false
    
    #控制生成的小文件
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0
    
    ## 控制输出文件是原生文件。
    ## 输出文件压缩
    a1.sinks.k1.hdfs.fileType = CompressedStream
    ## 压缩文件的方式
    a1.sinks.k1.hdfs.codeC = lzop
    
    ## 拼装
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel= c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    1.7 Flume时间戳拦截器(解决零点漂移问题)

    由于Flume默认会用Linux系统时间,作为输出到HDFS路径的时间。如果数据是23:59分产生的。Flume消费Kafka里面的数据时,有可能已经是第二天了,那么这部门数据会被发往第二天的HDFS路径。我们希望的是根据日志里面的实际时间,发往HDFS的路径,所以下面拦截器作用是获取日志中的实际时间。

    解决的思路:拦截json日志,通过fastjson框架解析json,获取实际时间ts。将获取的ts时间写入拦截器header头,header的key必须是timestamp,因为Flume框架会根据这个key的值识别为时间,写入到HDFS。

    package com.atguigu.flume.interceptor;
    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    public class TimeStampInterceptor implements Interceptor {
    
        private ArrayList<Event> events = new ArrayList<>();
    
        @Override
        public void initialize() {
    
        }
    
        @Override
        public Event intercept(Event event) {
    
            Map<String, String> headers = event.getHeaders();
            String log = new String(event.getBody(), StandardCharsets.UTF_8);
    
            JSONObject jsonObject = JSONObject.parseObject(log);
    
            String ts = jsonObject.getString("ts");
            headers.put("timestamp", ts);
    
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> list) {
            events.clear();
            for (Event event : list) {
                events.add(intercept(event));
            }
    
            return events;
        }
    
        @Override
        public void close() {
    
        }
    
        public static class Builder implements Interceptor.Builder {
            @Override
            public Interceptor build() {
                return new TimeStampInterceptor();
            }
    
            @Override
            public void configure(Context context) {
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    2)重新打包
    在这里插入图片描述

    3)需要先将打好的包放入到hadoop102的/opt/module/flume/lib文件夹下面。

    [atguigu@hadoop102 lib]$ ls | grep interceptor
    
    • 1

    flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
    4)分发Flume到hadoop103、hadoop104

    [atguigu@hadoop102 module]$ xsync flume/
    
    • 1

    1)在com.atguigu.flume.interceptor包下创建TimeStampInterceptor类

    1.8 测试Flume-Kafka通道

    (1)生成日志(此处生成的日志由java编写,可用替换为任意类型日志)

    [atguigu@hadoop102 ~]$ lg.sh
    
    • 1

    (2)消费Kafka数据,观察控制台是否有数据获取到

    [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
    --bootstrap-server hadoop102:9092 --from-beginning --topic topic_log
    
    • 1
    • 2

    说明: 如果获取不到数据,先检查Kafka、Flume、Zookeeper是否都正确启动。再检查Flume的拦截器代码是否正常。

    1.9 消费者Flume启动停止脚本

    (1)在/home/atguigu/bin目录下创建脚本f2.sh

    [atguigu@hadoop102 bin]$ vim f2.sh
    
    • 1

    在脚本中填写如下内容

    #! /bin/bash
    
    case $1 in
    "start"){
            for i in hadoop104
            do
                    echo " --------启动 $i 消费flume-------"
                    ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt   2>&1 &"
            done
    };;
    "stop"){
            for i in hadoop104
            do
                    echo " --------停止 $i 消费flume-------"
                    ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
            done
    
    };;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    (2)增加脚本执行权限

    [atguigu@hadoop102 bin]$ chmod u+x f2.sh
    
    • 1

    (3)f2集群启动脚本

    [atguigu@hadoop102 module]$ f2.sh start
    
    • 1

    (4)f2集群停止脚本

    [atguigu@hadoop102 module]$ f2.sh stop
    
    • 1
  • 相关阅读:
    这一篇让你搞定 Flutter 的数据表格
    搜索技术——群智能
    【SLAM论文阅读笔记】Multi-modal Semantic SLAM for Complex Dynamic Environments
    如何快速用一条命令配置好本地yum源(6/7/8版本)
    Wireshark TS | 访问网页失败
    springboot 调用第三方接口的方式(一)使用RestTemplate方法
    176. 第二高的薪水
    【小沐学NLP】关联规则分析Apriori算法(Mlxtend库,Python)
    一比一还原axios源码(二)—— 请求响应处理
    MySQL之查询性能优化(十三)
  • 原文地址:https://blog.csdn.net/weixin_44616592/article/details/127791671