flume:
组件:source 、 channel 、 sink 、三个器 、碰到的问题
①source
我们使用的是taildirsource,这个是apache 1.7版本才有,选择这个source的原因是taildirsource可以实时监控多个文件且有断点续传功能
②channel
Channel一共有三种:filechannel、memorychannel和kafkachannel
fileChannel是基于磁盘,io多,性能差,但是可靠性高
Memorychannel基于内存,性能高,但是可靠性相对低,存在丢失数据的风险
Kafkachannel是基于磁盘,可靠性高,性能还优于memorychannel + kafkasink
我们是将数据采集到kafka,所以我们使用了kafkachannel
③sink
kafkachannel可以直接将数据发送到kafka,所以我们没有使用sink。
④拦截器
我们使用了etl拦截器,过滤掉不完整的josn数据
同时还使用了分类拦截器,我们的日志分为了5类数据,启动、页面、动作、曝光和错误数据,我通过给event的header加上对应的标签,后面配合多路复用的选择器,指定不同类型的数据去到不同的topic中。??
我们定义拦截器的步骤:
①自定义一个类,实现interceptor,实现4个抽象方法:分别是:初始化、关闭资源、单个event和多个event方法,
②创建一个内部类实现builder类,实现两个抽象方法。
③最后打包 -> 上传到flume 的lib包下 -> 在配置文件中添加拦截器,写上全类名$bulid类
⑤选择器:
一共有两种选择器,一种是replicating,默认的选择器,每一个通道发送一份数据,另外一种是multiplexing,多路复用,根据event的header指定数据去到哪个通道,我们选择的多路复用选择器,将数据发送到kafka不同topic中。
⑥监控器
我们还使用到ganglia监控器,对flume的运行状况进行监控,主要是监控flume的put和take事务,当尝试提交的次数远远大于成功提交的次数以后,我们对flume进行优化,通过配置flume-env文件,增大flume的内存,默认是2G,我们调整到4G
同时在双十一、618等大型活动,flume采集通道也是抗不住这么大的数据量,我们通过临时购买阿里云的日志服务器,然后将flume部署到日志服务器上。
碰到的问题
我们遇到过flume挂掉的情况,我们当时的分析是:
Source -> channel有put事务,channel 到sink有take事务,所以这个两个环节是没有问题的,后端是kafka,那么是暂时无法获取数据,也没有关系。
采集数据的前端,使用的是taildirsource,它的特性是先拉取数据,再维护offset,所以不会丢失数据,可能存在重复数据,针对重复数据,我们开会分讨论是:
可以通过增加事务的方式实现不重复数据,但我们评估这样做性能非常低,我们在hive的dwd层,可以通过groupby进行去重,也可以使用sparkSQL或者redis进行去重,所以在这里我们就没有进行处理。
②使用
内部表和外部表
hive数仓中,我们用到了内部表和外部表,两者的最大区别是:删除内部表元数据和原始数据都会被删除,而删除外部表,只会删除元数据不会删除原始数据,我自己使用的一些临时表采用内部表,其他的表基本是外部表,用来防止因误操作将原始数据删除了。
4个by
当然,还使用了4个by,分别是order by 、 sort by 、distribute by和cluster by。Order by 很少使用,因为是全局排序,很容易出现oom,sort by 和distribute by 一般是配合使用,分区内排序,当分区字段和排序字段相同时,可以使用cluster by 代替,不过用的比较少。
系统函数
在计算指标时,我们使用了各种函数,如系统函数,用next_day处理周指标,用get_json_object对log数据进行解析,还使用了开窗函数,rank 和over函数,计算topN指标
②碰到的问题
在数仓的过程中,也遇到了很多问题。
问题1:大表 和 大表
问题2:小表和大表
问题3:单个key数据倾斜
问题4:多个key数据倾斜
在数仓计算的过程中,遇到了数据倾斜的问题,当时我们发现有一个reducetask卡在99%,而其他的任务都执行完成了,第一反应可能是数据倾斜了,然后对数据进行group by 求count,发现null的数据很多,然后我们采取使用随机数将null值打散,然后对计算结果数据进行转换,去掉随机数,再进行一次聚合。这个问题解决了,
后来我们还开启了负载均衡的功能。
③ 常规操作
在hive使用的过程中,做了一些常规优化
一是参数优化:
①开启mapjoin、开启map端的combiner和使用压缩
遇到小文件时
①开启了merge功能:就是将任务结束时产生的多个小于16m的小文件捏合成一个256M的大文件
②使用combinerhiveinputformat;将多个文件捏合在一起,减少maptask的数量
③开启jvm重用
也可以考虑设置缓冲区的大小,因为缓冲区越大,溢出就会越慢,小文件就会越小
二是业务优化:
①创建分区表:避免全局扫描
②先过滤再计算
③列式存储:提高数据查询的速度
这里说一下我们项目里面的压缩格式:
④合理设置reduce的个数:避免资源的浪费
⑤根据需求更换对应的计算引擎
数仓分层:
ods
采集前端埋点日志表,对用户的操作动作进行行为采集
日志表
业务表
dim
● 产品表:由于每个产品的产品属性都不一样,从产品分类,产品属性,产品模板表中关联查询, nameStruct结构体数组封装产品的相关属性
● 地区维度:全表导入
● 时间维度:提前对时间进行导入
● 用户维度(动态分区):
○ 拉链表适用于数据会发生变化,但是变化维度并不高的维度,缓慢变化维
○ 拉链表,记录生命周期,将每条记录添加一个字段为截止时间
○ 生效日期<= 日期
○ 结束日期>=日期
○ 日期为9999-99-99里面保存最新的数据,这之前的分区都属于过期日期
● 用户维度表的实现思路
○ 首日装载:将用户表的存量数据添加开始和结束时间
○ 每日装载:
■ 9999分区全外关联ods的当日分区,使用不为nvl()取出不为null的参数为最新数据
■ new_id和old_id都不为null的为变化数据
■ union all两个查询结果并动态分区插入9999分区和
dwd
问题及调优