最近做底层架构升级,满足高并发需求。底层架构由ES转到Hbase,之前往ES里面写数据的时候,直接通过Hive工具即可完成写入。经过测试,用Hive提供的通过MR写入Hbase,在字段过多和数据量过大的情况下,没法满足写入要求,写入数据比较慢。同时在大量写入的过程中,导致ReginServer压力过大,从而导致写入失败。
传统的通过Hive映射Hbase的底层是通过MR写入到Hbase的,中间需要先写入到WAL,接着写入到Memstore,当Memstore的大小达到指定的阈值以后,会出发写入磁盘。写入磁盘以后,会经历Regin的大小合并等一系列操作,最终会落地磁盘,生成Hfile文件。
为了解决Hbase大批量数据迁移问题,Hbase官方提供了通过bulkload的方式。该方式的本质是直接通过Hive等工具,生成底层的Hfile文件。然后通过Hbase的加载工具,进行表的逆向修复,即通过Hfile信息逆向生成Hbase表元数据信息。这个过程的速度非常快,只需要几分钟即可完成。通过bulkload方式写入的最大优势是减少了中间WAL,Memstore,分裂合并等,可以说是一步到位
通过Hfile写入Hbase的优势很多,当然也存在难点。我们知道Hbase采用的是稀疏索引,需要保证rowkey的设计必须全局有序的。那么在这种情况下,我们必须保证我们生成的rowkey是全局有序。在几十上百亿的大表中做全局排序,很显然是很难完成的,就算把所有资源给Hive,只要是全局排序最后都会聚集到一个Reduce中去,导致排序失败
如果我们生成的数据分布本身就不均衡,产生数据热点问题。那么最终生成的Hfile文件也必然是存在数据热点的。这样带来的问题是在最后一步数据加载到Hbase库的时候,需要耗费大量的时间,因为他在加载的过程中如果发现数据不是连续的,或者是热点的情况下,需要进一步的进行切分。所以我们要保证我们生成的Hfile的数据就是均衡的,且大小差不多,这样在加载数据的时候,可以一步到位,不需要新一步的进行分割处理
对以上的两个问题进行深入的分析以后,其本质就是数据预分区的问题。将整体的数据进行合理的预分区散列。然后保证局部的分区以内的数据是有序的,通过预分区的StartKey保证整体的每个分区之间是有序的,最终保证全局有序
生成指定的预分区,我们这里的业务需求,所有的数据rowkey都需要经过md5加密,所以我们直接通过Hbase的提供的建表语句即可:
create 'namespace:tableName',{NAME=>'info',COMPRESSION=>'SNAPPY'},{NUMREGIONS => 128,SPLITALGO => 'HexStringSplit'}
NUMREGIONS 这个参数是我们需要指定的预分区数量大小,我们这里指定的是128个预分区
然后通过Hbase的client客户端,获取到每个分区的startkey,最后将startkey保存到Hive的表中
RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf("namespace:tablename"));
byte[][] startKeys = regionLocator.getStartKeys();
for(int i = 0; i < startKeys.length;i++){
System.out.println(Bytes.toString(startKeys[i]));
}
DROP table tmp.hb_range_keys;
create external table tmp.hb_range_keys(rowkey string)
row format serde
'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
stored as
inputformat
'org.apache.hadoop.mapred.TextInputFormat'
outputformat
'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
location '/tmp/hb_range_keys';
序列化的方式是字节数组,所以这个表生成以后不可以直接查询
最终生成的rowkey预分区文件路径是:/tmp/hb_range_keys
set mapred.reduce.tasks=1; -- 设置reduce的数量为1,保证只生成一个分区文件
INSERT overwrite TABLE tmp.hb_range_keys
SELECT partiton_value as rowkey
FROM tmp.tmp_hbase_table_partiton
ORDER BY rowkey;
tmp.tmp_hbase_table_partiton 这个临时表是我们前面准备好的Regin的startkey
create table hbsort(rowkey string, user_name string, amount double, ...)
stored as
INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/info');
hfile.family.path=中间表存储路径/列族名
SET mapred.reduce.tasks=128; -- 生成128个分区
SET hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
SET mapreduce.totalorderpartitioner.path=/tmp/hb_range_keys/000000_0; -- 指定预分区的Key
mapred.reduce.tasks 是保证最后生成的Hfie的文件个数,最终的生成结果可能是预分区个数的整数倍。其值为:分区信息表记录数+1
中间所需要的各种Jar包,需要自己添加,这你就不在赘述
mapreduce.totalorderpartitioner.path 这个参数需要指定到具体的文件,而不是文件夹。我们前面设置了reduce的个数为1个,所以最终只生成了1个文件,默认情况下都是以000000_0为结尾
TotalOrderPartitioner这个全局有序分区的类是Hive提供的,为了解决数据量过大而导致的全排序问题。在数据量过大的情况,需要做全排序的话,最终都会合并到一个reduce中进行排序,数据量巨大的情况下,根本不可能完成。 为了解决这个问题,Hive采用了类似于Hbase的预分区排序的方式,首先将每个分区的startkey给确定好,然后写入数据的时候根据TotalOrderPartitioner的内部方法,路由到指定的分区中,且保证在指定的分区中有序。这个思路和Hbase全局有序的稀疏索引原理相同。提供的StartKey即是全局有序的稀疏索引集合
insert overwrite table hbsort
select rowkey, user_name, amount, ...
from target_table
cluster by rowkey;
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/hbase_hfiles namespace:tableName
通过bulkLoad加载数据的核心就是对已知数据进行预分区处理。然后获取当前预分区的startKey,在通过Hive提供的工具包,把所有的数据写入到预分区中,最后生成Hfile文件
做预分区之前需要合理的设置分区参数。比如有的rowkey设计不是通过md5的,这样就需要自己根据业务的组成,进行rowkey的设计和拼接。最终生成文件导入到Hive中