• 大数据项目之电商数仓、实时数仓同步数据、离线数仓同步数据、用户行为数据同步、日志消费Flume配置实操、日志消费Flume测试、日志消费Flume启停脚本


    8. 实时数仓同步数据

      实时数仓由Flink源源不断从Kafka当中读数据计算,所以不需要手动同步数据到实时数仓。

    9. 离线数仓同步数据

    9.1 用户行为数据同步

    9.1.1 数据通道

      用户行为数据由Flume从Kafka直接同步到HDFS,由于离线数仓采用Hive的分区表按天统计,所以目标路径要包含一层日期。具体数据流向如下图所示。

    9.1.1.1 用户行为数据通道

    在这里插入图片描述

    9.1.2 日志消费Flume配置概述

      按照规划,该Flume需将Kafka中topic_log的数据发往HDFS。并且对每天产生的用户行为日志进行区分,将不同天的数据发往HDFS不同天的路径。
    此处选择KafkaSource、FileChannel、HDFSSink。
    关键配置如下:

    9.1.2.1 日志消费Flume关键配置

    在这里插入图片描述

    9.1.3 日志消费Flume配置实操

    9.1.3.1 创建Flume配置文件

    在这里插入图片描述
    先将里面的文件删除。

    9.1.3.2 配置文件内容如下
    # 定义组件
    a1.sources=r1
    a1.channels=c1
    a1.sinks=k1
    
    
    # 配置source1
    # 连接Kafka集群
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092  
    # 消费者从topic_log读数据
    a1.sources.r1.kafka.topics = topic_log 
    # 给消费者设置一个唯一标识,也就是消费者组ID,建议配置和业务一样的,如果用默认的,有多个消费者,则会有的消费者可以消费到数据,有些消费者消费不了数据
    a1.sources.r1.kafka.consumer.group.id = topic_log 
    # 一次最大传输多少数据,数据满了就传输
    a1.sources.r1.batchSize = 2000   
    # 多久传输一次,时间到了就传输,不管数据到2000了没有,时间一到就开始传输数据。
    a1.sources.r1.batchDurationMillis = 1000
    # 拦截器
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.summer.gmall.flume.interceptor.TimestampInterceptor$Builder
    
    
    # 配置channel
    a1.channels.c1.type = file
    # 这个是Flume的第一次备份路径
    a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior1  
    # 打开二次Flume的二次备份,一般情况这个都是关闭状态,要是想打开的话改为true即可,然后配置backupCheckpointDir 的路径,这个路径要和第一次的路径放在不同的磁盘上
    a1.channels.c1.useDualCheckpoints = false
    # 这个是Flume的第二次备份路径
    # a1.channels.c1.backupCheckpointDir = /opt/module/flume-1.9.0/checkpoint/behavior2
    # 是flume的多目录存储,可以将服务器存在多个磁盘上,目前我只有一个磁盘,因此只能配置一个磁盘。和Hadoop的datadirs是一个意思。 
     a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/data/behavior1  
     # file channel写入文件,这个文件最大可以容纳2G内容
     a1.channels.c1.maxFileSize = 2146435071
    # 是file channel条数的限制,最大是1000000 
     a1.channels.c1.capacity = 1000000
    # 等一会,如果还没有足够的空间,还是会回滚
     a1.channels.c1.keep-alive = 3
    
     
    # 配置sink
     a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = log
    # 如果想配置6个小时落一次盘的话,可以将round打开,将hdfs.roundValue这个参数设置为6,hdfs.roundUnit这个参数设置为hour
    # 并且a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d/%h要多加一个%h
    a1.sinks.k1.hdfs.round = false
    
    # 解决数据漂移问题
    # 多少秒后滚动一次
    a1.sinks.k1.hdfs.rollInterval = 10
    # 文件大小超过设置值后滚动一次
    a1.sinks.k1.hdfs.rollSize = 134217728
    # event的条数,当设置多少条后,就是多少条后滚动一次
    a1.sinks.k1.hdfs.rollCount = 0
    
    #控制输出文件类型
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = gzip
    
    #组装 
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    在这里插入图片描述

    hdfs.roundfalseShould the timestamp be rounded down (if true, affects all time based escape sequences except %t)
    hdfs.roundValue1Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
    hdfs.roundUnitsecondThe unit of the round down value - second, minute or hour.

    在这里插入图片描述

    9.1.3.2.1 配置优化
    9.1.3.2.1.1 FileChannel优化

      通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
    官方说明如下:

    Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
    
    • 1

    checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据

    9.1.3.2.1.2 HDFS Sink优化

    1.HDFS存入大量小文件,有什么影响?
    元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
    计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
    2.HDFS小文件处理
    官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
    基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
    (1)文件在达到128M时会滚动生成新文件
    (2)文件创建超3600秒时会滚动生成新文件

    9.1.3.3 编写Flume拦截器
    9.1.3.3.1 数据漂移问题

    在这里插入图片描述

    9.1.3.3.2 在com.summer.gmall.flume.interceptor包下创建TimestampInterceptor类

    在这里插入图片描述

    package com.summer.gmall.flume.interceptor;
    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @author Redamancy
     * @create 2022-10-29 19:54
     */
    public class TimestampInterceptor implements Interceptor {
        @Override
        public void initialize() {
            
        }
    
        @Override
        public Event intercept(Event event) {
            //1 获取header和body的数据
            Map<String, String> headers = event.getHeaders();
            byte[] body = event.getBody();
            String log = new String(body, StandardCharsets.UTF_8);
            
            //2 将body的数据类型转成jsonObject类型(方便获取数据)
            JSONObject jsonObject = JSONObject.parseObject(log);
            
            //3 header中timestamp时间字段替换成日志生产的时间戳(解决数据漂移问题)
            String ts = jsonObject.getString("ts");
            headers.put("timestamp", ts);
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> list) {
            for (Event event : list) {
                intercept(event);
            }
            return list;
        }
    
        @Override
        public void close() {
    
        }
        public static class Builder implements Interceptor.Builder {
    
            @Override
            public Interceptor build() {
                return new TimestampInterceptor();
            }
    
            @Override
            public void configure(Context context) {
    
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    9.1.3.3.3 重新打包

    在这里插入图片描述
    先clean再package

    9.1.3.3.4 需要先将打好的包放入到hadoop104的/opt/module/flume-1.9.0/lib文件夹下面

    需要先在这个目录下面看看有没有这个名字的jar包,如果有则先删除rm,然后再将写的jar上传到这个目录下面,如果不先删除该jar包,则上传的jar名字后面会加个.0,jar包也不会运行起来。

    在这里插入图片描述
    在这里插入图片描述

    9.1.4 日志消费Flume测试

    9.1.4.1 启动Zookeeper、Kafka集群

    在这里插入图片描述

    9.1.4.2 启动日志采集Flume
    [summer@hadoop102 module]$ f1.sh start
    
    • 1

    在这里插入图片描述

    9.1.4.3 启动hadoop104的日志消费Flume
    [summer@hadoop104 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console
    
    • 1

    在这里插入图片描述

    9.1.4.4 生成模拟数据
    [summer@hadoop102 ~]$ lg.sh
    
    • 1

    在这里插入图片描述

    9.1.4.5 观察HDFS是否出现数据

    生成模拟数据之前
    在这里插入图片描述生成模拟数据之后
    在这里插入图片描述

    在这里插入图片描述

    9.1.5 日志消费Flume启停脚本

      若上述测试通过,为方便,此处创建一个Flume的启停脚本。

    9.1.5.1 在hadoop102节点的/home/summer/bin目录下创建脚本f2.sh
    [summer@hadoop102 bin]$ vim f2.sh
    
    • 1

    在脚本中填写如下内容
    在这里插入图片描述

    #!/bin/bash
    
    case $1 in
    "start")
            echo " --------启动 hadoop104 下游flume-------"
            ssh hadoop104 "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf -f /opt/module/flume-1.9.0/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
    ;;
    "stop")
    
            echo " --------停止 hadoop104 下游flume-------"
            ssh hadoop104 "ps -ef | grep kafka_to_hdfs_log | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
    ;;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    9.1.5.2 增加脚本执行权限
    [summer@hadoop102 bin]$ chmod 777 f2.sh 
    
    • 1
    9.1.5.3 f2启动
    [summer@hadoop102 bin]$ f2.sh start
    
    • 1
    9.1.5.4 f2停止
    [summer@hadoop102 bin]$ f2.sh stop
    
    • 1
  • 相关阅读:
    js函数柯里化-面试手写版
    超火的低代码平台长什么样
    Linux中的开发工具(yum,vim,gcc/g++,gdb,Makefile,git)
    [每周一更]-(第73期):介绍容器监控工具-CAdvisor
    策略模式应用
    L2t*+NPS
    根据ip及子网掩码得出网段
    Kubernetes 进阶训练营 网络
    ElasticSearch ES 安装 常见错误 Kibana安装 设置 权限 密码
    线程退出学习
  • 原文地址:https://blog.csdn.net/Redamancy06/article/details/127590967