• Hbase大批量数据迁移之BulkLoad


    一、概述:

    最近做底层架构升级,满足高并发需求。底层架构由ES转到Hbase,之前往ES里面写数据的时候,直接通过Hive工具即可完成写入。经过测试,用Hive提供的通过MR写入Hbase,在字段过多和数据量过大的情况下,没法满足写入要求,写入数据比较慢。同时在大量写入的过程中,导致ReginServer压力过大,从而导致写入失败。


    二、通过Hfile方式:

    传统的通过Hive映射Hbase的底层是通过MR写入到Hbase的,中间需要先写入到WAL,接着写入到Memstore,当Memstore的大小达到指定的阈值以后,会出发写入磁盘。写入磁盘以后,会经历Regin的大小合并等一系列操作,最终会落地磁盘,生成Hfile文件。

    为了解决Hbase大批量数据迁移问题,Hbase官方提供了通过bulkload的方式。该方式的本质是直接通过Hive等工具,生成底层的Hfile文件。然后通过Hbase的加载工具,进行表的逆向修复,即通过Hfile信息逆向生成Hbase表元数据信息。这个过程的速度非常快,只需要几分钟即可完成。通过bulkload方式写入的最大优势是减少了中间WAL,Memstore,分裂合并等,可以说是一步到位

    2.1、难点问题
    2.1.1 保证rowkey全局有序

    通过Hfile写入Hbase的优势很多,当然也存在难点。我们知道Hbase采用的是稀疏索引,需要保证rowkey的设计必须全局有序的。那么在这种情况下,我们必须保证我们生成的rowkey是全局有序。在几十上百亿的大表中做全局排序,很显然是很难完成的,就算把所有资源给Hive,只要是全局排序最后都会聚集到一个Reduce中去,导致排序失败

    2.1.2 分区数据热点

    如果我们生成的数据分布本身就不均衡,产生数据热点问题。那么最终生成的Hfile文件也必然是存在数据热点的。这样带来的问题是在最后一步数据加载到Hbase库的时候,需要耗费大量的时间,因为他在加载的过程中如果发现数据不是连续的,或者是热点的情况下,需要进一步的进行切分。所以我们要保证我们生成的Hfile的数据就是均衡的,且大小差不多,这样在加载数据的时候,可以一步到位,不需要新一步的进行分割处理

    2.2、问题解决

    对以上的两个问题进行深入的分析以后,其本质就是数据预分区的问题。将整体的数据进行合理的预分区散列。然后保证局部的分区以内的数据是有序的,通过预分区的StartKey保证整体的每个分区之间是有序的,最终保证全局有序


    三、最终实现:

    3.1、生成预分区范围

    生成指定的预分区,我们这里的业务需求,所有的数据rowkey都需要经过md5加密,所以我们直接通过Hbase的提供的建表语句即可:

    create 'namespace:tableName',{NAME=>'info',COMPRESSION=>'SNAPPY'},{NUMREGIONS => 128,SPLITALGO => 'HexStringSplit'}
    
    • 1

    NUMREGIONS 这个参数是我们需要指定的预分区数量大小,我们这里指定的是128个预分区

    然后通过Hbase的client客户端,获取到每个分区的startkey,最后将startkey保存到Hive的表中

    RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf("namespace:tablename"));
            byte[][] startKeys = regionLocator.getStartKeys();
            for(int i = 0; i < startKeys.length;i++){
                System.out.println(Bytes.toString(startKeys[i]));
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    3.2、生成Hive可以识别的预分区文件表
    3.2.1 生成Hive预分区表
    DROP table tmp.hb_range_keys;
    create external table tmp.hb_range_keys(rowkey string)
    row format serde
    'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
    stored as
    inputformat
    'org.apache.hadoop.mapred.TextInputFormat'
    outputformat
    'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
    location '/tmp/hb_range_keys';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    序列化的方式是字节数组,所以这个表生成以后不可以直接查询
    最终生成的rowkey预分区文件路径是:/tmp/hb_range_keys

    3.2.2 写入预分区StartKey
    set  mapred.reduce.tasks=1; -- 设置reduce的数量为1,保证只生成一个分区文件
    INSERT overwrite TABLE tmp.hb_range_keys
    SELECT partiton_value as rowkey
    FROM tmp.tmp_hbase_table_partiton
    ORDER BY rowkey;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    tmp.tmp_hbase_table_partiton 这个临时表是我们前面准备好的Regin的startkey


    3.3、写入数据生成Hfile文件
    3.3.1 .创建生成最终Hfile对应的表
    create table hbsort(rowkey string, user_name string, amount double, ...)
    stored as
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
    TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/info');
    
    • 1
    • 2
    • 3
    • 4
    • 5

    hfile.family.path=中间表存储路径/列族名

    3.3.2 核心必要参数设置
    SET mapred.reduce.tasks=128; -- 生成128个分区
    SET hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
    SET mapreduce.totalorderpartitioner.path=/tmp/hb_range_keys/000000_0; -- 指定预分区的Key
    
    • 1
    • 2
    • 3

    mapred.reduce.tasks 是保证最后生成的Hfie的文件个数,最终的生成结果可能是预分区个数的整数倍。其值为:分区信息表记录数+1

    中间所需要的各种Jar包,需要自己添加,这你就不在赘述

    mapreduce.totalorderpartitioner.path 这个参数需要指定到具体的文件,而不是文件夹。我们前面设置了reduce的个数为1个,所以最终只生成了1个文件,默认情况下都是以000000_0为结尾

    TotalOrderPartitioner这个全局有序分区的类是Hive提供的,为了解决数据量过大而导致的全排序问题。在数据量过大的情况,需要做全排序的话,最终都会合并到一个reduce中进行排序,数据量巨大的情况下,根本不可能完成。 为了解决这个问题,Hive采用了类似于Hbase的预分区排序的方式,首先将每个分区的startkey给确定好,然后写入数据的时候根据TotalOrderPartitioner的内部方法,路由到指定的分区中,且保证在指定的分区中有序。这个思路和Hbase全局有序的稀疏索引原理相同。提供的StartKey即是全局有序的稀疏索引集合

    3.4、写入Hfile文件
    insert overwrite table hbsort
    select rowkey, user_name, amount, ...
    from target_table
    cluster by rowkey;
    
    • 1
    • 2
    • 3
    • 4

    四、数据加载

    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/hbase_hfiles namespace:tableName
    
    • 1

    五、结语

    通过bulkLoad加载数据的核心就是对已知数据进行预分区处理。然后获取当前预分区的startKey,在通过Hive提供的工具包,把所有的数据写入到预分区中,最后生成Hfile文件

    做预分区之前需要合理的设置分区参数。比如有的rowkey设计不是通过md5的,这样就需要自己根据业务的组成,进行rowkey的设计和拼接。最终生成文件导入到Hive中

  • 相关阅读:
    Pandas数据分析17——pandas数据清洗(缺失值、重复值处理)
    Thrift -- 跨语言RPC 框架
    minigui编译移植
    Cluster聚类算法大比拼:性能、应用场景和可视化对比总结
    04 无穷大量与无穷小量
    mybatis详解
    .net core 的 winform 的 浏览器控件 WebView2
    看完这篇文章,你就入门了所有有关API的热门概念!
    多维度聊一聊 k8s 和 openstack
    2023-2028年中国硫酸钇市场发展态势及前景预测报告
  • 原文地址:https://blog.csdn.net/u014730165/article/details/126622805