• Notes on loading HFiles into HBase with a MapReduce bulk load


    code:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
    import org.apache.hadoop.hbase.client.{Admin, HTable, Table}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.util.ThreadUtil.getResourceAsStream
    
    import java.util.Properties
    
    
    class BulkLoadDataToHBase {
    
    }
    
    object BulkLoadDataToHBase {
    
    
    	def main(args: Array[String]): Unit = {
    		System.setProperty("user.name", "hdfs")
    		val properties = new Properties
    		properties.load(getResourceAsStream("conf.properties"))
    
    
    		val hBaseConf: Configuration = HBaseConfiguration.create()
    		// Not strictly required; the HBase connection settings alone are basically enough
    		hBaseConf.addResource("hbase-site.xml")
    		hBaseConf.addResource("core-site.xml")
    		hBaseConf.addResource("yarn-site.xml")
    		// Compress the map output
    		hBaseConf.set("mapred.compress.map.output", "true")
    		hBaseConf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec")
    
    		// Compress the job (reduce) output
    		hBaseConf.set("mapred.output.compress", "true")
    		hBaseConf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec")
    		hBaseConf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "128")
    		//		val outTableName = "test_drive_event"
    		val outTableName: String = args(1)
    		hBaseConf.set(TableOutputFormat.OUTPUT_TABLE, outTableName)
    		import org.apache.hadoop.hbase.client.Connection
    		import org.apache.hadoop.hbase.client.ConnectionFactory
    		val conn: Connection = ConnectionFactory.createConnection(hBaseConf)
    		val admin: Admin = conn.getAdmin
    		val outTable: Table = conn.getTable(TableName.valueOf(outTableName))
    		val job: Job = Job.getInstance(hBaseConf)
    		job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    		job.setMapOutputValueClass(classOf[KeyValue])
    		HFileOutputFormat2.configureIncrementalLoad(job, outTable, conn.getRegionLocator(TableName.valueOf(outTableName)))
    
    		// Bulk-load the HFiles into HBase
    		val bulkLoader = new LoadIncrementalHFiles(hBaseConf)
    		bulkLoader.doBulkLoad(new Path(args(0)), admin, outTable, conn.getRegionLocator(TableName.valueOf(outTableName)))
    	}
    }
    
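    Note that the code above only bulk-loads HFiles that already exist at args(0); the Job configured via HFileOutputFormat2.configureIncrementalLoad is never actually submitted. If you also need to generate the HFiles, that job would need a mapper emitting ImmutableBytesWritable/KeyValue pairs. The class below is my own illustrative sketch (the input format, column family, and qualifier names are assumptions), not part of the original program:

    import org.apache.hadoop.hbase.KeyValue
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.Mapper

    // Hypothetical mapper: parses text lines of the form "rowkey,value" and emits
    // one KeyValue per line for column family "T" (the family used in the create
    // statement further below), qualifier "v".
    class TextToHFileMapper extends Mapper[LongWritable, Text, ImmutableBytesWritable, KeyValue] {
    	override def map(key: LongWritable, value: Text,
    	                 context: Mapper[LongWritable, Text, ImmutableBytesWritable, KeyValue]#Context): Unit = {
    		val fields = value.toString.split(",", 2)
    		if (fields.length == 2) {
    			val rowKey = Bytes.toBytes(fields(0))
    			val kv = new KeyValue(rowKey, Bytes.toBytes("T"), Bytes.toBytes("v"), Bytes.toBytes(fields(1)))
    			context.write(new ImmutableBytesWritable(rowKey), kv)
    		}
    	}
    }

    Wiring it in would mean something like job.setMapperClass(classOf[TextToHFileMapper]), adding an input path, and calling job.waitForCompletion(true) before the bulk load step.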

    After compiling and packaging, copy the jar to the server and run it with java -classpath XXX.jar XXX.BulkLoadDataToHBase /path outputTable

    The target table must be created in HBase before running the program:

    create  'tmp_v1',  {NAME => 'T', DATA_BLOCK_ENCODING => 'FAST_DIFF', COMPRESSION => 'SNAPPY', METADATA => {'DISABLE_WAL' => 'true',  'IMMUTABLE_ROWS' => 'true'}}
    
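    The same table can also be created from code through the Admin handle the program already opens. The fragment below is a sketch meant to slot into the main method above (it reuses admin and outTableName; the column family "T" and the encoding/compression settings mirror the shell command):

    import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, TableDescriptorBuilder}
    import org.apache.hadoop.hbase.io.compress.Compression
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
    import org.apache.hadoop.hbase.util.Bytes

    // Create the target table if it does not exist yet (reuses admin and
    // outTableName from the main method above).
    if (!admin.tableExists(TableName.valueOf(outTableName))) {
    	val family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("T"))
    		.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
    		.setCompressionType(Compression.Algorithm.SNAPPY)
    		.build()
    	val descriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(outTableName))
    		.setColumnFamily(family)
    		.build()
    	admin.createTable(descriptor)
    }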

    Running it at this point produced the following error:

    CorruptHFileException: Problem reading HFile Trailer from file XXXXXXXX/XXXXXXX/
            at org.apache.hadoop.hbase.io.hfile.HFile.pickReaderVersion(HFile.java:495)
            at org.apache.hadoop.hbase.io.hfile.HFile.createReader(HFile.java:538)
            at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.groupOrSplit(LoadIncrementalHFiles.java:661)
            at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles$3.call(LoadIncrementalHFiles.java:574)
            at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles$3.call(LoadIncrementalHFiles.java:571)
            at java.util.concurrent.FutureTask.run(FutureTask.java:262)
            ... 3 more
    Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
            at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
            at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
            at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
            at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
            at org.apache.hadoop.hbase.io.compress.Compression$Algorithm.getDecompressor(Compression.java:328)
            at org.apache.hadoop.hbase.io.compress.Compression.decompress(Compression.java:423)
            at org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext.prepareDecoding(HFileBlockDefaultDecodingContext.java:90)
            at org.apache.hadoop.hbase.io.hfile.HFileBlock.unpack(HFileBlock.java:549)
            at org.apache.hadoop.hbase.io.hfile.HFileBlock$AbstractFSReader$1.nextBlock(HFileBlock.java:1380)
            at org.apache.hadoop.hbase.io.hfile.HFileBlock$AbstractFSReader$1.nextBlockWithBlockType(HFileBlock.java:1386)
            at org.apache.hadoop.hbase.io.hfile.HFileReaderV2.<init>(HFileReaderV2.java:150)
            at org.apache.hadoop.hbase.io.hfile.HFile.pickReaderVersion(HFile.java:483)
            ... 8 more
    

    Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z

    The error means the libsnappy.so.* native files could not be found at runtime. I checked the dependencies packaged into the jar and the corresponding classes were all present.
    Searching on Google turned up suggestions that adding the -Djava.library.path=/XXX/hadoop/native/ option when running the jar solves this problem.

    So I ran find / -name native on the server and found /opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/lib/native. After modifying the run command to add the -Djava.library.path option and executing it again, the data was loaded into HBase successfully:

     sudo -u hdfs /usr/java/jdk1.8.0_181-cloudera/bin/java \
     -Djava.library.path=/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/hadoop/lib/native \
     -classpath /XXX.jar com.XXX.BulkLoadDataToHBase hdfs://172.168.100.171:8020/distcp_dir/test_d046b9c64abfbb9c67b16e97d0a28d20 tmp_v1
    
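    To confirm that the native libraries are actually visible to the JVM, a small check such as the one below can be run with the same -Djava.library.path setting. This is my own sketch, not part of the original program; it only needs hadoop-common on the classpath:

    import org.apache.hadoop.util.NativeCodeLoader

    object CheckNativeSnappy {
    	def main(args: Array[String]): Unit = {
    		// True only if libhadoop was found via java.library.path
    		val nativeLoaded = NativeCodeLoader.isNativeCodeLoaded
    		println(s"native hadoop loaded: $nativeLoaded")
    		// buildSupportsSnappy() itself throws UnsatisfiedLinkError when the
    		// native library is missing, so only call it after the check above
    		if (nativeLoaded) {
    			println(s"snappy supported: ${NativeCodeLoader.buildSupportsSnappy()}")
    		}
    	}
    }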

    java.lang.NoSuchMethodError: org.slf4j.helpers.MessageFormatter.format(Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/String;
    Along the way I also hit the exception above. It was caused by duplicate slf4j jars inside the packaged program; deleting the lower-version jar and keeping the higher-version one resolved it (a build-level alternative is sketched after the log below).

    In my case there were two extra slf4j jars in the packaged output.

    Jun 30, 2022 3:15:59 PM org.apache.zookeeper.ClientCnxn$SendThread logStartConnect
    INFO: Opening socket connection to server bd.vn0038.jmrh.com/172.168.100.171:2181. Will not attempt to authenticate using SASL (unknown error)
    Jun 30, 2022 3:15:59 PM org.apache.zookeeper.ClientCnxn$SendThread run
    WARNING: Session 0x0 for server bd.vn0038.jmrh.com/172.168.100.171:2181, unexpected error, closing socket connection and attempting reconnect
    java.lang.NoSuchMethodError: org.slf4j.helpers.MessageFormatter.format(Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/String;
            at org.slf4j.impl.JDK14LoggerAdapter.info(JDK14LoggerAdapter.java:326)
            at org.apache.zookeeper.ClientCnxn$SendThread.primeConnection(ClientCnxn.java:962)
            at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:352)
            at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1224)
    
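    If the project is built with sbt, an alternative to deleting jars by hand is to exclude the conflicting slf4j artifacts at build time and declare a single version explicitly. The snippet below is only a sketch under that assumption; the original post does not show its build file, and the artifact and version numbers are illustrative:

    // build.sbt (sketch): drop slf4j pulled in transitively by a dependency,
    // then declare the single slf4j version you want to keep.
    libraryDependencies ++= Seq(
    	("org.apache.hbase" % "hbase-mapreduce" % "2.1.0")
    		.excludeAll(ExclusionRule(organization = "org.slf4j")),
    	"org.slf4j" % "slf4j-api" % "1.7.30"
    )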

    References:
    using snappy and other compressions with Nifi hdfs components
    https://community.cloudera.com/t5/Community-Articles/using-snappy-and-other-compressions-with-Nifi-hdfs/ta-p/248341

    ERROR: "java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z" in Informatica DEI when running a mapping with a Snappy file source
    https://knowledge.informatica.com/s/article/498636?language=en_US

    HBase Troubleshooting | Investigating and Improving HBase Bulkload Failures
    https://www.modb.pro/db/57374

  • Original article: https://blog.csdn.net/az9996/article/details/125545893