1.2~1.5:大数据开发阶段
1.3~1.5:周期性调度执行,三个阶段需要通过azkaban任务调度工具进行自动化周期调度执行,
(项目的第7个阶段,任务调度阶段)
数据产生主要目的是为了模拟真实业务场景下的数据产生的流程。因为项目的数据应该是来自于白龙马电商网站的用户在网站界面上的行为触发之后然后由网站的后端自动记录用户的行为数据到日志文件中。
主要借助JavaSE中集合+随机数+IO流+for循环+时间格式类
【注意】数据产生不是我们大数据工作人员的任务(不一定),在开发大数据项目时,数据来源一般都是已经存在的
项目中产生的数据是海量的,而且默认情况下项目产生的用户行为数据是记录在日志文件当中,但是日志文件中的数据不是持久化保存的、而且也无法存储大量的数据。但是数据中隐含了很多的价值信息。因此我们需要将网站产生的大批量的日志文件采集存储到大数据环境中进行持久化、海量化存储。
技术:Flume+HDFS
MapReduce技术
【注意】
因为我们只需要做数据的清洗预处理操作,不涉及到聚合操作,因此我们只需要一个Mapper阶段即可,不需要reduce阶段
MapReduce数据清洗预处理是周期性调度执行的,一天执行一次,第二天处理前一天采集存储的数据,前一天采集存储的数据是以时间为基准的动态目录下存放,因此MR程序处理数据时,输入数据的目录必须得是昨天时间的目录。
【注意】会在第二天处理前一天的数据,一般会在第二天的凌晨去处理第一天采集存储的数据。(任务调度的事情)
MR程序处理完成的数据输出到HDFS上,但是数据清洗预处理完成的数据给Hive做统计分析的,Hive我们也是一天执行一次,Hive是在数据清洗预处理完成之后执行的。 MR程序处理完成的数据输出到HDFS上时,也必须以基于时间的动态目录存放
统计分析就是基于我们清洗预处理完成的高质量,从不同的数据纬度聚合数据,或者对数据进行计算得到我们感兴趣的一些指标或者是对网站运营发展有关的一些指标。
统计分析进行数据计算时,可能涉及到大量的聚合操作以及一些排名、排序等等操作,而这些操作也都是数据计算,那么我们就可以使用大数据计算框架完成,而大数据计算框架MapReduce如果要聚合、排序、分组等操作,MR代码就会非常的复杂。因此我们一般做统计分析时有一个想法,既能计算大量的数据,还能快速简单的进行数据的聚合、排名、分组等操作。就可以使用Hive数据仓库技术完成。
【注意】基本上到现在为止,如果我们要做大数据统计分析,不是直接使用大数据计算框架(MapReduce、Spark、Flink),因为大数据统计分析涉及到大量的聚合、排序、分组等等操作,操作如果直接使用大数据计算框架代码会非常的复杂。基本上都是使用类SQL(表面上写的是类SQL语句,底层还是大数据计算框架)的方式进行大数据统计分析的。
Hadoop—Hive
Spark—Spark SQL
Flink—Flink SQL
数据仓库建模是用来梳理表和表之间的关系的,便于我们后期进行统计分析。数据仓库分层是我们使用数据仓库进行统计分析的开发流程。
数据仓库分层从最底层开始到最高层主要有如下三层(不同的公司基于三层更加细致的分层)
如果我们要使用Hive数据仓库做统计分析,首先我们需要把清洗预处理完成的数据导入到Hive中加载成为一个数据表,ODS层指的就是把清洗预处理完成的数据原模原样的导入到Hive中,导入进来之后这些表组成了ODS层
DW层(数据仓库层)–Hive统计分析的核心 数据仓库建模的阶段
DWD层(明细宽表层)
DIM层(纬表层)
纬度表如果比较多,那么纬度表单独划分到DW的DIM层
将统计分析的结果以指标表的形式存储到ADS层
建模的目的是为了方便我们后期统计分析
在使用Hive进行数据统计分析时,首先必须先把清洗预处理完成的数据加载到Hive中成为数据表,而且一般在真实的企业项目中,清洗预处理完成数据不止一个,各种各样的数据,数据和数据之间都是有关系的。
所谓的数据仓库建模就是我们在对数据进行清洗预处理的时候,清洗预处理完成之后的多个数据之间的关系梳理建模
数据仓库建模的名词解释
数据仓库模型建立有很多种方式的,主要分为
3NF数据仓库建模
纬度建模
星型模型
事实表直接与纬度表关联,而且只有一级关联
雪花模型
事实表直接与维度表关联,纬度表拆分出更加细致的一些纬度表
星座模型
在一个数仓中,事实表有多个,每一个事实表都有它自己对应的纬度表,纬度表还有它的二级纬度表
如何完成建模?数据清洗预处理的时候,把数据处理成为合适的模型结构
ODS层指的是我们把清洗预处理完成的数据不加以任何的处理,直接原模原样的在Hive中构建与之对应的表格,并且把数据装载到表格当中
清洗预处理完成的数据格式以\001特殊字符分割的,这样的话可以避免分隔符和字段的中一些符号冲突,导致装载数据到Hive出现串行的问题。
Hive中数据表有很多分类的:内部表、外部表、分区表、分桶表
考虑:数据统计分析一天执行一次,也就意味着我们每天处理完成的数据都需要往Hive的ODS层的数据表导入一份,如何区分ODS层导入的数据是哪一天?需要构建一个分区表(基于时间的)。外部表
DWD明细宽表层就是把ODS层的数据表字段拆分成为更加细粒度的字段,便于我们后期的统计分析。 DWD层说白了就是在ODS的数据表基础之上在多增加一些冗余字段,但是方便我们后期操作了
可以拆分的字段主要有两个
时间字段:后期需要基于细粒度的时间做统计分析
年
月
日
时
来源URL字段:后期统计站内站外的流量占比,站内站外的对比是基于HOST主机名/域名——HOST
DWD层这个数据表就属于我们Hive的自有表了,因此明细宽表我们构建成为内部分区表即可
明细宽表中没有数据,明细宽表中的数据从什么地方来?因为DWD层是基于ODS层建立的,因此DWD层的数据需要从ODS层查询获得。 需要从ODS层对应的数据表中查询指定的数据添加到DWD层当中(注意一下分区的问题)。
ADS层其实就是我们基于DW数据仓库库构建的DWD和DIM层的数据表,进行查询,通过聚合、分组、排序等等操作统计相关的指标,得到指标数据,然后将指标数据存储到一个Hive数据表中。
基于时间纬度的指标
统计网站每年的用户的流量
网站每天都会产生数据,每一天数据一增加,那么当前年份的用户访问量必然增加一天的数据
思路:不是针对明细宽表某一个分区的数据进行统计分析,而是针对于明细宽表中整体数据集进行统计分析(所有的分区进行操作)
实现:因为在明细宽表中已经拆分除了visit_year字段,因此我们只需要根据visit_year分区聚合数据即可得到,每一年的用户访问量
select visit_year,count(*) from dwd_user_behavior_detail group by visit_year;
统计网站每一年不同月份的用户流量
统计网站每一年不同月份下每天用户的访问量
统计网站每一年不同月份下每天的每小时用户的访问量
统计网站每一年每一个月的流量相比于上个月的比例:开窗函数(上边界和下边界),针对每一年不同月份的用户流量指标的二次分析结果
select
temp.*,
concat(round(temp.flow/temp.before_month_flow,1)*100,"%") as rate
from (
select
* ,
first_value(flow) over(partition by visit_year order by visit_month asc rows between 1 PRECEDING and CURRENT ROW) as before_month_flow
from ads_month_flow
) as temp
基于地理纬度的指标
统计网站不同省份每天用户的流量
统计网站不同省份每月/每年用户流量
针对的就是数据集整体 而非某一个分区
每天访问量TOP10的省份
针对的不是明细宽表 而是我们的前面统计不同省份每天用户流量指标(二次分析),指标统计出来之后需要覆盖添加
需要使用排名函数
ads_province_day_flow
select temp.date_time, temp.province, temp.flow from{ select *, row_number() over(partition by data_time order by flow desc) as rank_num from ads_province_day_flow } as temp where temp.rank_num <=10;
基于用户纬度的指标
统计网站不同年龄段用户的流量
明细宽表当中,存在一个字段代表的是用户的年龄,而用户年龄都是大于等于18岁,小于100岁。
基于年龄这个字段,我想查看一下网站不同年龄段的用户情况
用户年龄段
【注意】案例针对是数据集整体,不参杂时间的纬度概念,指标需要覆盖添加的
需要用到hive中的分支函数
统计网站每年/每月/每天的不同年龄段的用户访问量
统计每天网站的独立访客数
统计网站每月、每年的独立访问数
针对数据集整体了
基于终端纬度的指标
统计网站用户使用的不同浏览器的占比情况
user_agent,user_agent
当中就包含着我们用户使用的浏览器信息情况,基于这个字段统计统计网站不同浏览器的占比情况统计网站不同时间段下的不同浏览器的占比情况
基于来源纬度的指标
指标有很多,可以进行各种自由扩展
我们统计分析也是每天执行一次,我们总不能每天运行统计分析,指标代码我们自己手动挨着运行
我们统计分析需要把所有统计分析代码封装到一个xxx.sql文件中,然后到时候统计分析需要执行,我们直接使用hive -f xxx.sql --hiveconf xxx=xxxx
现在我们通过Hive数据仓库做的统计分析指标都是在Hive的ADS层存储着。统计的指标的主要目的是为了指导网站的发展和运营的,因此统计完成的数据其中可以做很多操作:基于统计分析的结果进行二次统计分析;基于统计分析的指标结合相关大数据算法做一些数据预测或者数据的深度挖掘;基于统计分析的结果进行可视化大屏的制作。
我们项目中最终需要把统计分析的结果以图表的形式进行可视化展示。
目前现有的可视化技术基本都不太支持从Hive中直接读取数据然后进行可视化展示,但是这些技术支持从RDBMS(MySQL)中读取数据进行可视化展示。
因此我们做可视化大屏之前,需要把Hive数据仓库中ADS层的数据迁移导出到RDBMS关系型数据库当中,然后再借助大数据技术+RDBMS实现数据可视化展示即可
我们就是想把Hive数据仓库中数据导出到RDBMS中,目前只学了SQOOP技术
当然除了Sqoop技术以外,还有一个技术DataX(阿里云提供的数据传输工具)
要把Hive的数据迁移到MySQL中,SQOOP既可以实现把RDBMS数据迁移到大数据环境(导入),同时也支持把大数据环境数据迁移到RDBMS中(导出)
只需要编写针对性的SQOOP导出数据的命令即可,导出的时候需要注意两个问题:
【注意】:后期的话我们做数据可视化,我们需要连接MySQL,我们本次只讲第三方工具的可视化,工具可视化基本上都是支持公网数据库,我们的自己的局域网数据库不支持的。 如何获取带有一个公网IP的MySQL数据库:
- 我们购买一个云服务器,然后在云服务器上自己安装一个MySQL即可。
- 我们直接购买一个云MySQL数据库。
【问题】有部分数据是基于分区统计的,这种数据我们需要追加导出,但是Hive导出数据以文件的形式进行导出的,会把Hive数据表中的所有数据全部导出出去,如果追加导出,数据也会重复。 所以想分区统计的数据也需要覆盖导出的,或者我们在统计基于分区的指标时,我们需要指标表设置成为分区表。
将统计分析的结果以图表的形式进行展示,好处可以直观的看到数据的变化趋势以及数据内部隐藏的一些价值信息
编写HTML/CSS/JS/JavaWEB+ECharts+MySQL
阿里云DataV、腾讯云图可视化、华为云DLV…
【注意】大屏制作的有一个要求、口诀的:越重要的图表放到大屏的中间或者上方,不重要的指标图表放到大屏的下部或者两边。
在本次我们项目中,项目是一个纯离线计算项目,项目中除了数据采集存储以及数据可视化是7*24小时不间断运行的,剩余的阶段:数据清洗预处理、数据统计分析、数据迁移导出这三个阶段都是周期性调度执行,而且周期性调度执行还有一个规则,必须先执行数据清洗预处理,然后数据统计分析阶段再基于数据清洗预处理的基础之上运行,数据迁移导出又是基于数据统计分析的结果运行的。
在企业当中,处理的数据量一般是非常庞大的,而且一般情况下,对数据的计算我们都是在每一天的凌晨完成的(凌晨的服务器资源空闲的比较多,计算的时候可利用资源就多了)。我们总不能每天凌晨我们手动启动数据计算任务。
为了解决计算任务的调度问题,我们需要使用到大数据中任务调度技术,任务调度技术可以实现计算的任务的自动调度(定时器),同时任务调度技术还支持进行多任务的依赖串联,任务调度技术还支持失败重试以及任务报警
自己编译azkaban得到azkaban的安装包
解压安装包、修改配置文件即可
source /azkaba-db.xxxxx/create-all.xxxx.sql
max_allowed_packet=1024M
webserver
的地址execserver
的端口号需要把azkaban-webserver和azkaban-execserver依赖jar包升级或者引入
mysql-connector-java
derby
web/exec的lib目录下
启动azkaban
azkaban主要是用来做任务调度的,azkaban支持将多个任务以流的形式串联起来(将多任务之间互相依赖),然后对串联的工作流设置任务调度时间,让流以指定的时间周期性调度执行
同时azkaban还支持对流中每一个任务配置失败重试,某一个流中任务单元执行失败,可以进行特定次数的重试,如果特定次数之后,任务还是失败的,那么我们才会认为任务执行失败。
两个配置文件
azkaban.project,配置文件中声明azkaban使用的工作流的版本
azkaban-flow-version: 2.0
xxx.flow,配置文件中需要指定azkaban的多个任务,以及多任务之间的依赖关系
eg:
fitst.flow
nodes:
- name: jobA
type: command
config:
command: echo 'zs'
- name: jobB
type: command
dependsOn:
- jobA
config:
command: echo 'ls'
second.flow
nodes:
- name: first
type: command
config:
command: sh /root/a.sh
retries: 3
retry.backoff: 10000
project.flow
nodes:
- name: data-clean
type: command
config:
command: sh /root/project/data-clean/data-clean.sh
retries: 3
retry.backoff: 60000
- name: data-analy
type: command
dependsOn:
- data-clean
config:
command: sh /root/project/data-analy/data-analy.sh
retries: 3
retry.backoff: 60000
- name: data-export
type: command
dependsOn:
- data-analy
config:
command: sh /root/project/data-export/data-export.sh
retries: 3
retry.backoff: 60000
一个压缩包xxx.zip
压缩包就是我们需要把两个编写好的配置文件打包到一个压缩包中
现在想通过azkaban实现给linux的**/root/a.txt**文件每隔一分钟增加一行zs数据
项目中azkaban进行任务调度,主要调度数据清洗预处理程序、数据统计分析程序、数据迁移导出程序
而且这三个任务是有先后依赖关系,先执行数据清洗预处理程序,再执行数据统计分析程序,最后再执行数据迁移导出程序
而且目前三个阶段的代码全部封装成为了sh脚本,只需要执行sh脚本就可以执行三个阶段
使用yum install -y nodejs
安装一下nodejs
同时在azkaban工作目录下有个azkabanwebserver里面有个build.gra…文件,把里面的node部分的download改为fasle
./gradlew build installDist -x test
【注意】:数据清洗预处理和数据统计分析,底层需要用到MR程序,一定一定要注意MR程序的Map任务的个数和reduce任务的个数,以及每一个map任务和reduce任务占用的内存。