将OLTP数据库的数据转移到OLAP数据库的过程一般包括以下几个步骤:
第一范式(字段不能重复且不能分解)
第一范式是最基本的范式。如果数据库表中的所有字段值都是不可分解的原子值,就说明该数据库表满足了第一范式。
第二范式(增加主键)
主键可以是一列或者多列组成的,根据主键,马上可以精确到特点的一行数据
第三范式(消除非主键的传递关系)
例如表中主键是商品编号(100),但是商品类别名称和商品类别描述可以根据商品类别编号(3)字段去检索,这样商品类别名称(100/3)和商品类别(100/3)描述编号会有冗余。
数据仓库为了解决企业数据集成和分析的问题,为企业割部门提供统一的,规范的数据出口。
数仓建模的目标是通过数仓建模更好的组织,存储数据,以便在性能,成本,效率和数据质量之间找到最佳的平衡点。
- 易于理解和查询
- 提高更高的数据粒度:维度建模允许在事实表种存储详细数据,在维度表种存储域数据相关的上下文信息
- 支持多维度分析:维度可以根据业务需求进行组合和切片
- 提高数据一致性和准确性:一些事实表需要用到共享信息,例如日期,时间,地理位置
- 支持快速数据查询和报告生成
主题域是面向业务过程,将业务活动事件进行抽象的集合,将数据主题划分到不同的主题域。(DWD)
数据域是面向业务分析:将业务过程或者维度进行抽象(DWS)
主题域 :保单分析,理赔分析,风险分析,客户分析
保单分析:这个主题域关注的是保单的销售、续保、退保等情况,以及与之相关的客户、产品、渠道等属性。可能的度量有保费收入、续保率、退保率等,可能的维度有保单日期、保单状态、产品类型、产品名称、客户类型、客户地区、渠道类型、渠道名称等。
理赔分析:这个主题域关注的是理赔的申请、审核、支付等情况,以及与之相关的客户、产品、赔案等属性。可能的度量有理赔支出、赔付率、理赔周期等,可能的维度有理赔日期、理赔状态、产品类型、产品名称、产品代码、客户类型、客户地区、赔案类型、赔案原因等。
原子指标:原子指标是基于某一业务事件或行为下的度量值,是业务定义中不可再拆分的指标,具有明确的业务含义和统计口径。例如,理赔申请数、理赔审核数、理赔支付数、理赔金额、理赔成本等,都是原子指标。
派生指标:派生指标是基于一个或多个原子指标,通过加减乘除或其他逻辑运算得到的指标,通常用于反映业务绩效或效率。例如,赔付率、理赔周期、理赔频次、理赔成本率等,都是派生指标。
赔付率:某一时间段内,保险公司向客户支付的理赔金额占该时间段内承保责任金额的比例。计算公式为:
理赔成本率:某一时间段内,保险公司为处理客户的理赔申请而发生的成本占该时间段内承保责任金额的比例。
Map Task共分为5个阶段,分别是:
Read阶段:Map Task通过用户编写的RecordReader从输入InputSplit中解析处一个个的Key/Value。
Map阶段:该阶段主要将解析出来的Key/Value交给用户编写的map()函数处理,并产生新的key/value。
Collect阶段:在用户编写的map()函数中,数据处理完时一般调用OutputCollector.collect()输出结果,在该函数内部,它首先调用Partitioner计算该key/value所属的partition,并将key/value写入到环形缓冲区中。
Spill阶段:即“溢写”阶段,当环形缓冲区比较满后,MapTask会将缓冲区中的数据写到本地磁盘的临时文件中,需要注意的是,在写入文件之前,MapTask会首先对数据进行一次排序,如果用户指定了压缩和combiner函数,那么还会对数据进行压缩和合并。
Merge阶段:当一个map task处理的数据很大,以至于超过缓冲区内存时,就会生成多个spill文件。此时就需要对同一个map任务产生的多个spill文件进行归并生成最终的一个已分区且已排序的大文件。配置属性mapreduce.task.io.sort.factor控制着一次最多能合并多少流,默认值是10。这个过程包括排序和合并(可选),归并得到的文件内键值对有可能拥有相同的key,这个过程如果client设置过Combiner,也会合并相同的key值的键值对(根据上面提到的combine的调用时机可知)。
溢出写文件归并完毕后,Map将删除所有的临时溢出写文件,并告知NodeManager任务已完成,只要其中一个MapTask完成,ReduceTask就开始复制它的输出(Copy阶段分区输出文件通过http的方式提供给reducer)
Shuffle阶段:也称为Copy阶段。Reduce Task从各个Map Task上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
Merge阶段:在远程拷贝数据的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
Sort阶段:按照MapReduce语义,用户编写的reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个Map Task已经实现对自己的处理结果进行了局部排序,因此,Reduce Task只需对所有数据进行一次归并排序即可。
Reduce阶段:在该阶段中,Reduce Task将每组数据依次交给用户编写的reduce()函数处理。
Write阶段:reduce()函数将计算结果写到HDFS上。
sort阶段的排序是通过一个叫做归约迭代器(Reduce Iterator)的机制实现的。归约迭代器是一个特殊的迭代器,它可以从merge阶段产生的有序数据流中读取数据,并且按照key的相等性进行分组。每当归约迭代器遇到一个新的key,它就会调用一次reduce函数,并且将该key对应的所有value作为输入参数。这样,归约迭代器就可以在一次遍历中完成数据的分组和归约,而不需要再进行额外的排序。
在MapReduce的shuffle过程中通常会执行三次排序,分别是:
1、Client访问zookeeper,获取元数据存储所在的regionserver
2、拿到对应的表存储的regionserver,通过刚刚获取的地址访问对应的regionserver,
3、去表所在的regionserver进行数据的添加
4、查找对应的region,在region中寻找列族,把数据分别写到Hlog和memstore各一份
5、当memstore写入的值变多,触发溢写操作(flush),进行文件的溢写,成为一个StoreFile
6、当溢写的文件过多时,会触发文件的合并(Compact)操作。合并有两种方式(major,minor)多个StoreFile合并成一个StoreFile,同时进行版本合并和数据删除
minor compaction:小范围合并,默认是3-10个文件进行合并,不会删除其他版本的数据。
major compaction:将当前目录下的所有文件全部合并,一般手动触发,会删除其他版本的数据(不同时间戳的)
7、当region中的数据逐渐变大之后,达到某一个阈值,会进行裂变(一个region等分为两个region,并分配到不同的regionserver),原本的Region会下线,新Split出来的两个Region会被HMaster分配到相应HRegionServer上,使得原先1个Region的压力得以分流到2个Region上。
1.Client访问zookeeper,获取元数据存储所在的regionserver
2.通过刚刚获取的地址访问对应的regionserver,拿到对应的表存储的regionserver
3.去表所在的regionserver进行数据的读取
4.查找对应的region,在region中寻找列族,先找到memstore,找不到去blockcache中寻找,再找不到就进行storefile的遍历
5.找到数据之后会先缓存到blockcache中,再将结果返回.blockcache逐渐满了之后,会采用LRU的淘汰策略。
在HBase中,BlockCache是一种读缓存。HBase会将一次文件查找的Block块缓存到Cache中,以便后续同一请求或者邻近数据查找请求直接从内存中获取,避免昂贵的IO操作。如果需要缓存的数据超过堆大小的情况下,推荐使用Block Cache下的off-heap。Off-heap是指堆外内存,不由GC管理,但可以通过full GC回收,通过-XX:MaxDirectMemorySize设置大小。
当scan获取数据时,可以通过setCacheBlocks方法来设置是否使用block cache,对于频繁访问的行才建议使用block cache。
对于MapReduce的Scan作为输入任务,应该设置为setCacheBlocks(false)以避免在MapReduce期间使用BlockCache。这是因为BlockCache是HBase的读缓存,保存着最近被访问的数据块。在MapReduce期间,由于大量的数据读取,BlockCache会被填满,导致缓存失效,从而导致性能下降。
.HStore存储是HBase存储的核心,其中由两部分组成,一部分是Memstore,一部分是StoreFile。
.HLog的的功能: 宕机数据恢复
在分布式系统环境中,我们是无法避免系统出错或者宕机的,一旦HRegionServer意外退出,MemStore中的内存数据就会丢失,而引入HLog就是为了防止这种情况。
工作机制:每个HRegionServer中都会有一个HLog对象,HLog是一个实现Write Ahead Log的类,每次用户操作写入Memstore的同时,也会写一份数据到HLog文件中,HLog文件定期会滚动出新,并删除旧的文件(已持久化到Storefile中的数据),当HRegionServer意外终止后,HMaster会通过Zookeeper感知,HMaster首先处理遗留的HLog文件,将不同region的log数据拆分,分别放在相应region目录下,然后再将失效的region(带有刚刚拆分的log)重新分配,领取到这些region的HRegionServer在Load Region的过程中,会发现有历史HLog需要处理,因此会Replay HLog中的数据到Memstore中,然后flush到StoreFile,完成数据恢复。
.Region就是StoreFiles,StoreFiles里由HFile构成,HFile里由hbase的data块构成,一个data块里面又有很多的keyvalue对,每个keyvalue里存了我们需要的值。
小文件来源和危害:
源数据本身有很多小文件;动态分区会产生大量小文件;reduce个数越多,小文件越多;按分区插入数据的时候会产生大量的小文件。
从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能。HDFS存储大多小文件,会导致namenode元数据特别大,占用太多内存,制约了集群的扩展。
解决方法:
合并小文件: 使用Hadoop的MapReduce或Hive自带的输入格式(TextFile、 SequenceFile等)来合并小文件,将它们合并成一个或几个更大的文件。
压缩文件: 使用Hadoop的压缩算法(如gzip、snappy、Izo等)来压缩小文件,减小文件大小,提高I/O效率。
调整输入格式: 使用Hive的输入格式 (如ORC、Parquet等) 代替TextFile、 SequenceFile等格式,可以提高查询性能,同时可以减少小文件的数量。
使用分区:将数据根据一些共同属性(如日期、地区等)进行分区,可以将小文件转换成更大的块,提高性能。
合理设置参数: 调整MapReduce任务的参数 (如mapreduceinputfileinputformat.splitminsize等)可以让任务更适合处理小文件,提高效率。
使用动态资源分配: 可以通过设置Spark参数 (如spark.dynamicAllocation.enabled) 来使用动态资源分配,从而优化Spark集群资源的使用。
1.用户向YARN中提交应用程序,其中包括ApplicationMaster程序,启动ApplicatioMaster的命令、用户程序
2.ResourceManager为该应用程序分配第一个container,并与对应的NodeManager通信,要求它在这个container中启动应用程序的ApplicationMaster
3.ApplicationMaster首先向ResourceManager注册。
4.ApplicationMaster采用轮询的方式通过RPC协议来向ResourceManager申请和领取资源
5.一旦ApplicationMaster申请到资源后,便于对应的NodeManager通信
6.NodeManager为任务设置好运行环境(包括环境变量,JAR包,二进制程序后,将任务启动命令写到一个脚本中)
7.各个任务通过RPC协议向ApplicationMaster向ResourceManager汇报自己的进度
RM是一个全局的资源管理器,负责整个系统的资源管理的分配和管理。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Application Manager,ASM)
调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等) 将系统的资源分配给各个正在进行的应用程序。需要注意的是,该调度器是一个纯调度器,它不再从事任何与具体应用程序相关的工作。这些都交给ApplicationMaster完成。调度器仅根据各个应用程序的资源需求进行资源分配,而资源分配单位用一个抽象概念“资源容器”表示。
用户提交的每个应用程序均包含一个AM,主要功能:
当前YARN自带了两个AM实现,一个用于演示AM编写方法的实例程序,它可以申请一定数目的Container以并行运行一个Shell命令或者Shell脚本。另一个是运行MapReduce应用程序的AM— MRAppMaster
Nm是每个节点上的资源和任务管理器,一方面,他会定时的向RM汇报本节点上的资源使用情况和各个Container的运行状态。另一方面,它接受并处理来自AM的Container启动、停止请求。
Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等。当AM向RM申请资源时。RM为AM返回的资源是用Container表示的。YARN会表示为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。需要注意的是,Container不同于MRv1中的solt,它是一个动态资源划分单位,是根据应用程序动态生成的,YARN只支持CPU和内存两种资源,使用Cgroup进行隔离
Application:应用程序
Spark SQL | Hive | |
---|---|---|
查询语言 | Spark SQL和HQL | HQL |
执行引擎 | Spark | 默认Mapreduce,可以自定义为Spark |
数据存储 | 本身不提供数据存储,可指定到不同存储系统,如HDFS,hive,Hbase,Mysql | HDFS |
元数据存储 | 可选元数据存储 | 必须指定元数据存储 |
API | DataFrame/DateSet DSL 和SQL | HQL |
Schema | schema自动关联 | 使用DDL显示表明schema |
对比方面\计算引擎 | spark | mapreduce | 备注 |
---|---|---|---|
计算方式 | 内存计算 | IO读写 | 迭代计算过程中,MR需要不断IO,而Spark引入了RDD和DAG,使计算过程基于内存完成,提升了处理性能 |
任务调度 | task为线程级别 | task为进程级别 | Spark可以通过复用线程池中的线程减少启动,关闭task所需要的消耗。 |
执行策略 | Spark在shuffle时只有部分场景需要排序,支持基于hash的分布式聚合,更加省时 | Mapreduce在shuffle前需要花费大量时间进行排序 |
Spark,hive on spark,spark on hive三者比较
Hive引擎包括:默认MR、tez、spark
Hive on Spark:Hive既作为存储元数据又负责SQL的解析优化,语法是HQL语法,执行引擎变成了Spark,Spark负责采用RDD执行。
Spark on Hive : Hive只作为存储元数据,Spark负责SQL解析优化,语法是Spark SQL语法,Spark负责采用RDD执行。
【spark on hive 】
hive只作为存储角色,spark 负责sql解析优化,底层运行的还是sparkRDD
具体可以理解为spark通过sparkSQL使用hive语句操作hive表,底层运行的还是sparkRDD,
步骤如下:
1.通过sparkSQL,加载Hive的配置文件,获取Hive的元数据信息
2.获取到Hive的元数据信息之后可以拿到Hive表的数据
3.通过sparkSQL来操作Hive表中的数据
【hive on spark】
hive既作为存储又负责sql的解析优化,spark负责执行
这里Hive的执行引擎变成了spark,不再是MR。
这个实现较为麻烦,必须重新编译spark并导入相关jar包
目前大部分使用spark on hive
select a.*,b.* from
tb1 a join tb2 b
on a.id = b.id
where a.c1 > 20 and b.c2< 100
优化为
select a.*,b.* from
(select * from tb1 where c1>20) a
join
(select * from tb2 where c2<100) b
on a.id = b.id
减少后期执行过程中的join的shuffle数据量;
select a.name,b.salary from
(select * from tb1 where c1>20) a
join
(select * from tb2 where c2<100) b
on a.id = b.id
优化为
select a.name,b.salary from
(select id,name from tb1 where c1>20) a
join
(select id,salary from tb2 where c2<100) b
on a.id = b.id
执行前将不需要的列裁剪掉,减少数据量获取;
select 1+1 as cnt from tb
select 2 as cnt from tb
Sqoop是一个用于将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如:MySQL、Oracle、Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。下面是将数据从MySQL导入到HDFS的步骤:
在HDFS中创建目录,用于导入后存放数据。
使用sqoop import命令将MySQL中的表导入到HDFS中。
sqoop import \
--connect jdbc:mysql://<mysql_host>/<database_name> \
--username <username> \
--password <password> \
--table <table_name> \
--delete-target-dir \
--target-dir <hdfs_directory_path> \
--m 1
其中,
要使用web调用MySQL中的数据进行前端展示,您需要使用一些技术和工具。以下是一些步骤:
ZAB协议的消息广播,崩溃恢复和数据同步
一个事务请求(Write)进来之后,Leader 节点会将写请求包装成 Proposal 事务,并添加一个全局唯一的 64 位递增事务 ID,也就是 Zxid(消息的先后顺序就是通过比较 Zxid);
Leader 节点向集群中其他节点广播 Proposal 事务,Leader 节点和 Follower 节点是解耦的,通信都会经过一个 FIFO 的消息队列,Leader 会为每一个 Follower 节点分配一个单独的 FIFO 队列,然后把 Proposal 发送到队列中;
Follower 节点收到对应的 Proposal 之后会把它持久到磁盘上,当完全写入之后,发一个 ACK 给 Leader;
当 Leader 节点收到超过半数 Follower 节点的 ACK 之后(Quorum 机制),会提交本地机器上的事务,同时开始广播 commit, Follower 节点收到 commit 之后,完成各自的事务提交。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9voQBzvj-1681778635762)(数仓面经.assets/v2-22d9ae5ce56af93dcbf4466e21b51381_1440w.webp)]
集群服务刚启动时进入崩溃恢复阶段选取 Leader 节点。
Leader 节点突然宕机或者由于网络原因导致 Leader 节点与过半的 Follower 失去了联系,集群也会进入崩溃恢复模式。
崩溃恢复完成选举以后,接下来的工作就是数据同步,在选举过程中,通过投票已经确认 Leader 节点是最大 Zxid 的节点,同步阶段就是利用 Leader 前一阶段获得的最新 Proposal 历史同步集群中所有的副本
例子:为交易事务设计事实表:
(1) 业务分析:交易包括:下单,支付,发货,完结四个业务过程,其中业务过程可以概括为一个个不可拆分的行为事件。
(2)确定粒度:一个订单中可以有多个商品,因此每个商品都可以对应一个子订单。因此下单,支付,完结选择子订单粒度,发货选择父订单粒度。
(3)确定维度:卖家,买家,商品,商品类别,发货地,收货地。在确定维度时应尽可能多地选择与业务过程相关的环境信息
(4)确定事实:例如在下单过程中有下单金额,下单数量;支持过程中有支付金额,积分金额。事实”一词,指的是每个业务过程的度量(通常是可累加的数值类型的值,如次数、个数、件数、金额等
事实表设计原则
维度属性通常不是静态的,而是随时间变化的,通常采用全量快照表或拉链表保存维度数据的历史状态。
a)第一步,进行数据调研;包括了业务调研和需求调研,业务调研就是要弄清楚公司有哪些业务,以及每个业务有包括哪些业务线,一般每个业务会独自建设数据仓库。
b)第二步,进行架构设计;包括了数据域划分和构建总线矩阵,数据域就是指 将业务过程或者维度进行抽象的集合,在划分数据域的时候,应该尽可能保证当前划分的能够覆盖所有的业务需求,又能在新业务进入时无影响的被包含到已有的数据域中或者扩展新的数据域,国际化数仓里面的数据域包括司机域,乘客域,交易域,客服域,安全域等等,阿里巴巴就会有 商品域 会员域 店铺域 交易域 日志域等等。构建总线矩阵,就需要明确每个数据域下有哪些业务过程,业务过程与哪些维度相关。
c)第三步,明确统计指标,深入分析需求,构建指标体系。绝大多数的统计需求可以使用原子指标、派生指标及衍生指标这套标准来定义。公用的派生指标统一保存在数据仓库的DWS层中
d)第四步,维度模型设计。提到的业务总线矩阵即可。事实表存储在DWD层中,维度表存储在DIM层中
e)第五步,汇总模型设计,一张汇总表通常包含业务过程相同、日期限定相同、粒度限定相同的多个派生指标。
f)第五步,进行代码开发和上线生成调度任务,进行周期运行。
全量表:存储了全部数据的表,全量表没有分区的,所有数据都储存在一个分区中。全量表存储的是截至到目前最新状态的全部记录。适用于数量小,更新频率低的场景,比如维度表,基础数据表。
增量表:在原表中数据的基础上新增本周期内产生的新数据,适用于更新频率快的场景,比如日志表,交易表。增量表每次新产生的数据是以最新分区增加到表中的,例如按天更新的流量表,每天更新只新增一天内产生的新数据。
快照表:快照表就是截至过去某个时间点的所有数据。拉链表储存的是在快照表的基础上去除了重复状态的数据。
拉链表:可以记录数据从开始一直到当前所有变化的信息,适用于订单表和客户表
拉链表中有开始时间 (start time) 和结束时间 (end time) 两个字段,同时有dt和dp两个分区字段。dp: 一般有ACTIVE (线上)和EXPIRED (过期)两个分区。ACTIVE表示数据当前在线上使用,所以其end time为4712-12-31 (系统能处理的最大时间,表示一个达不到的无限向后延伸的时间,意味着该数据在线上永久有效):EXPIRED表示数据过期,已变更,为历史状态,其end time是数据变更时具体的时间。对于部分拉链表dp中还有HISTORY分区,此是由于有些拉链表数据量巨大,造成ACTIVE分区使用困难,因此将一部分业务上不再变更的数据转移到HISTORY分区。数据所在的时间分区,记录数据从ACTIVE转移到EXPIRED的日期,即数据发生变更的时间,大部分与end time一致;当dp中有HISTORY分区,且数据转移到HISTORY分区时,其dt为数据转移到HISTORY的时间。