• 数仓相关,总结


    flume:
    组件:source 、 channel 、 sink 、三个器 、碰到的问题
    ①source
    我们使用的是taildirsource,这个是apache 1.7版本才有,选择这个source的原因是taildirsource可以实时监控多个文件且有断点续传功能
    ②channel
    Channel一共有三种:filechannel、memorychannel和kafkachannel
    fileChannel是基于磁盘,io多,性能差,但是可靠性高
    Memorychannel基于内存,性能高,但是可靠性相对低,存在丢失数据的风险
    Kafkachannel是基于磁盘,可靠性高,性能还优于memorychannel + kafkasink
    我们是将数据采集到kafka,所以我们使用了kafkachannel
    ③sink
    kafkachannel可以直接将数据发送到kafka,所以我们没有使用sink。
    ④拦截器
    我们使用了etl拦截器,过滤掉不完整的josn数据
    同时还使用了分类拦截器,我们的日志分为了5类数据,启动、页面、动作、曝光和错误数据,我通过给event的header加上对应的标签,后面配合多路复用的选择器,指定不同类型的数据去到不同的topic中。??
    我们定义拦截器的步骤:
    ①自定义一个类,实现interceptor,实现4个抽象方法:分别是:初始化、关闭资源、单个event和多个event方法,
    ②创建一个内部类实现builder类,实现两个抽象方法。
    ③最后打包 -> 上传到flume 的lib包下 -> 在配置文件中添加拦截器,写上全类名$bulid类
    ⑤选择器:
    一共有两种选择器,一种是replicating,默认的选择器,每一个通道发送一份数据,另外一种是multiplexing,多路复用,根据event的header指定数据去到哪个通道,我们选择的多路复用选择器,将数据发送到kafka不同topic中。
    ⑥监控器
    我们还使用到ganglia监控器,对flume的运行状况进行监控,主要是监控flume的put和take事务,当尝试提交的次数远远大于成功提交的次数以后,我们对flume进行优化,通过配置flume-env文件,增大flume的内存,默认是2G,我们调整到4G
    同时在双十一、618等大型活动,flume采集通道也是抗不住这么大的数据量,我们通过临时购买阿里云的日志服务器,然后将flume部署到日志服务器上。

    碰到的问题
    我们遇到过flume挂掉的情况,我们当时的分析是:
    Source -> channel有put事务,channel 到sink有take事务,所以这个两个环节是没有问题的,后端是kafka,那么是暂时无法获取数据,也没有关系。
    采集数据的前端,使用的是taildirsource,它的特性是先拉取数据,再维护offset,所以不会丢失数据,可能存在重复数据,针对重复数据,我们开会分讨论是:
    可以通过增加事务的方式实现不重复数据,但我们评估这样做性能非常低,我们在hive的dwd层,可以通过groupby进行去重,也可以使用sparkSQL或者redis进行去重,所以在这里我们就没有进行处理。
    ②使用
    内部表和外部表
    hive数仓中,我们用到了内部表和外部表,两者的最大区别是:删除内部表元数据和原始数据都会被删除,而删除外部表,只会删除元数据不会删除原始数据,我自己使用的一些临时表采用内部表,其他的表基本是外部表,用来防止因误操作将原始数据删除了。

    4个by
    当然,还使用了4个by,分别是order by 、 sort by 、distribute by和cluster by。Order by 很少使用,因为是全局排序,很容易出现oom,sort by 和distribute by 一般是配合使用,分区内排序,当分区字段和排序字段相同时,可以使用cluster by 代替,不过用的比较少。

    系统函数
    在计算指标时,我们使用了各种函数,如系统函数,用next_day处理周指标,用get_json_object对log数据进行解析,还使用了开窗函数,rank 和over函数,计算topN指标

    ②碰到的问题
    在数仓的过程中,也遇到了很多问题。
    问题1:大表 和 大表
    问题2:小表和大表
    问题3:单个key数据倾斜
    问题4:多个key数据倾斜
    在数仓计算的过程中,遇到了数据倾斜的问题,当时我们发现有一个reducetask卡在99%,而其他的任务都执行完成了,第一反应可能是数据倾斜了,然后对数据进行group by 求count,发现null的数据很多,然后我们采取使用随机数将null值打散,然后对计算结果数据进行转换,去掉随机数,再进行一次聚合。这个问题解决了,
    后来我们还开启了负载均衡的功能。

    ③ 常规操作
    在hive使用的过程中,做了一些常规优化
    一是参数优化:
    ①开启mapjoin、开启map端的combiner和使用压缩
    遇到小文件时
    ①开启了merge功能:就是将任务结束时产生的多个小于16m的小文件捏合成一个256M的大文件
    ②使用combinerhiveinputformat;将多个文件捏合在一起,减少maptask的数量
    ③开启jvm重用
    也可以考虑设置缓冲区的大小,因为缓冲区越大,溢出就会越慢,小文件就会越小
    二是业务优化:
    ①创建分区表:避免全局扫描
    ②先过滤再计算
    ③列式存储:提高数据查询的速度
    这里说一下我们项目里面的压缩格式:

    ④合理设置reduce的个数:避免资源的浪费
    ⑤根据需求更换对应的计算引擎
    数仓分层:
    ods
    采集前端埋点日志表,对用户的操作动作进行行为采集
    日志表
    业务表
    dim
    ● 产品表:由于每个产品的产品属性都不一样,从产品分类,产品属性,产品模板表中关联查询, nameStruct结构体数组封装产品的相关属性
    ● 地区维度:全表导入
    ● 时间维度:提前对时间进行导入
    ● 用户维度(动态分区):
    ○ 拉链表适用于数据会发生变化,但是变化维度并不高的维度,缓慢变化维
    ○ 拉链表,记录生命周期,将每条记录添加一个字段为截止时间
    ○ 生效日期<= 日期
    ○ 结束日期>=日期
    ○ 日期为9999-99-99里面保存最新的数据,这之前的分区都属于过期日期
    ● 用户维度表的实现思路
    ○ 首日装载:将用户表的存量数据添加开始和结束时间
    ○ 每日装载:
    ■ 9999分区全外关联ods的当日分区,使用不为nvl()取出不为null的参数为最新数据
    ■ new_id和old_id都不为null的为变化数据
    ■ union all两个查询结果并动态分区插入9999分区和
    dwd

    1. 用户行为日志
      在ods表中数据为一行数据,需要通过get_json_object函数进行json字符穿的截取
      a. 启动日志表:截取start字段
      b. 页面日志表:包含公共信息和页面信息,取出所有的page字段
      c. 动作日志表:
      ⅰ. 自定义UDF:将action的json数据炸裂开,然后再通过get_json_object()获取每个字段
      ⅱ. 自定义udf的实现思路:集成GenericUDF,init方法中实现参数校验, process方法中使用将数组遍历,并使用forword()输出
      ⅲ. 创建永久函数,create function,指定jar包
      d. 曝光日志表
      e. 错误日志表:过滤err字段,然后使用get_json_object()解析json数组
    2. 业务数据
      a. 订单明细事实表(事务型事实表):关联查询 oder_info ,order_detail等有关表
      b. 退单事实表(事务型事实表):退单表关联订单信息表
      c. 收藏事实表(周期型快照事实表):全量分区
      d. 支付事实表(累计型快照事实表):订单支付表关联订单表查询得出, 由于是累计型快照事实表,每日装载采用动态分区,因为支付记录存在完成和未完成,未完成的放在9999分区,完成的放在历史分区, 使用全外联+union all
      dws
      事实表+与事实表中的维度模型中的相关维度字段的聚合值,大宽表,由dwd中事实表聚合数据
    3. 用户主题
      a. 以用户ID为主键, 通过事实表+与事实表中的维度模型中的相关维度字段的聚合值
    4. 资源池主题:
      a.
    5. 地区主题:
    6. 产品主题:
    7. 负责了用户分析部分的需求实现:
      用户统计订单时,由于订单是累计型事实表,完成的订单会在9999分区,未完成的订单会在当天的分区,所以这里过滤的时候要选创建订单的时间,完成的付款信息应该在支付事实表中,应该去除9999分区
      活跃用户统计:在dwd Page_log中统计last_page不为null 的数据,在dws中使用聚合函数统计,统计登录日期为当天的登录日期
      新增用户统计:
      用户转化率分析
      用户留存率分析: 新增用户日期分组,留存用户 / 新增用户数 ,留存用户,末次活跃日期等于用户的新增日期就是留存了, 用留存/ 新增用户数就可以了
      用户流失率分析等。

    问题及调优

  • 相关阅读:
    算法进阶-2sat-cf-1657F
    MyBatis: 向oracle表中插入null字段的处理
    SpringBoot——指标监控,自定义指标监控
    【数据结构】栈和队列
    第八章用matplotlib、seaborn、pyecharts绘制散点图
    代码随想录算法训练营第四十九天| 123.买卖股票的最佳时机III 188.买卖股票的最佳时机IV
    【洛谷 P1596】[USACO10OCT] Lake Counting S 题解(深度优先搜索)
    SpringBoot+Vue项目校园博客系统
    SQL Server入门-SSMS简单使用(2008R2版)-2
    万字长文详解HBase读写性能优化
  • 原文地址:https://blog.csdn.net/m0_46914845/article/details/126163725