java.lang.IllegalArgumentException: hbase.mapreduce.hfileoutputformat.table.name cannot be empty
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.createRecordWriter(HFileOutputFormat2.java:202)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.getRecordWriter(HFileOutputFormat2.java:185)
at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.initWriter(SparkHadoopWriter.scala:355)
at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:125)
at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
val hbaseConf: Configuration = HBaseConfiguration.create()
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName) // set the output table
// If the line above is not enough, set the HFileOutputFormat2 key explicitly
hbaseConf.set("hbase.mapreduce.hfileoutputformat.table.name", tableName) // HBase output table
org.apache.hadoop.hbase.tool.BulkLoadHFiles.java
This tool lets you load the output of HFileOutputFormat into an existing table programmatically. It is not thread safe.
@InterfaceAudience.Public
public interface BulkLoadHFiles {

  static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
  static final String MAX_FILES_PER_REGION_PER_FAMILY =
      "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
  static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
  static final String CREATE_TABLE_CONF_KEY = "create.table";
  static final String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
  static final String ALWAYS_COPY_FILES = "always.copy.files";

  /**
   * Bulk-load HFiles per column family, to avoid a long wait on bulkLoadHFile caused by
   * server-side compactions.
   */
  public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";
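A rough sketch of driving BulkLoadHFiles programmatically, assuming the HFiles were already written by HFileOutputFormat2 (tableName and hfileOutputPath are placeholders):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.tool.BulkLoadHFiles

val bulkConf = HBaseConfiguration.create()
// Create the loader from the configuration and load the HFile directory into the existing table
val loader = BulkLoadHFiles.create(bulkConf)
loader.bulkLoad(TableName.valueOf(tableName), new Path(hfileOutputPath))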
org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.java
public void initialize() {
  Configuration conf = getConf();
  // disable blockcache for tool invocation, see HBASE-10500
  conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
  userProvider = UserProvider.instantiate(conf);
  fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
  assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
  // the default is 32 HFiles per region per family
  maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
  bulkLoadByFamily = conf.getBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, false);
  nrThreads = conf.getInt("hbase.loadincremental.threads.max",
      Runtime.getRuntime().availableProcessors());
  numRetries = new AtomicInteger(0);
  rpcControllerFactory = new RpcControllerFactory(conf);
}
val conf = HBaseConfiguration.create()
// Raise this limit so the import does not fail when there are too many HFiles per region/family
conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 3200)
Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.hbase.util.FSUtils.setStoragePolicy(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;Ljava/lang/String;)V
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.configureStoragePolicy(HFileOutputFormat2.java:459)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:275)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:230)
at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.write(SparkHadoopWriter.scala:363)
at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:137)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:134)
... 9 more
Caused by: java.lang.NoSuchMethodError
When this shows up it is almost always a JAR conflict or a component version mismatch.
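One way to keep the versions straight is to pin the client dependencies to the cluster's versions and mark them provided, so the jars shipped with the cluster win at runtime. A build.sbt sketch; the build tool and the version numbers here are assumptions, substitute whatever the cluster actually runs:

libraryDependencies ++= Seq(
  "org.apache.hbase"  % "hbase-client"    % "2.1.0" % Provided,  // match the cluster's HBase version
  "org.apache.hbase"  % "hbase-mapreduce" % "2.1.0" % Provided,
  "org.apache.hadoop" % "hadoop-client"   % "3.0.0" % Provided   // match the cluster's Hadoop version
)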
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 32 31 36 31 35 63 30 62 37 64 61 34 61 36 61 63 35 64 35 65 37 38 39 33 62 33 37 66 66 36 62 37 39 30 37 65 65 64 37 66 5f 65 6d 75 5f 73 63 61 6e)
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
// Use Kryo and register the non-serializable HBase key class
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable]))
java.io.IOException: Added a key not lexically larger than previous. Current cell = a0000a9d3a5e09eaaae62e29bfe68faaf0c79eb6_avp_scan/i:engine_name/1661754500397/Put/vlen=8/seqid=0, lastCell = a0000a9d3a5e09eaaae62e29bfe68faaf0c79eb6_avp_scan/i:task_type/1661754500397/Put/vlen=1/seqid=0
fileContext=[usesHBaseChecksum=true, checksumType=CRC32C, bytesPerChecksum=16384, blocksize=65536, encoding=NONE, includesMvcc=true, includesTags=true, compressAlgo=NONE, compressTags=false, cryptoContext=[cipher=NONE keyHash=NONE], tableName=TEST:SAMPLE_TASK_SCAN_BULKLOAD, columnFamily=i, cellComparator=org.apache.hadoop.hbase.CellComparatorImpl@767aa5a7]
at org.apache.hadoop.hbase.io.hfile.HFileWriterImpl.checkKey(HFileWriterImpl.java:240)
at org.apache.hadoop.hbase.io.hfile.HFileWriterImpl.append(HFileWriterImpl.java:737)
at org.apache.hadoop.hbase.regionserver.StoreFileWriter.append(StoreFileWriter.java:299)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:325)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:233)
at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.write(SparkHadoopWriter.scala:363)
at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:137)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:134)
at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
Check for duplicate rowkeys:
select rk, count(*) from merge_sample.hr_task_scan_official_3hh group by rk having count(*) > 1;
Deduplicate (see the sketch after this list):
1. distinct
2. df.dropDuplicates()
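For example, deduplicating on the rowkey column before building the KeyValues; a sketch assuming df is the DataFrame read from the query above and rk is its rowkey column:

// Keep exactly one row per rowkey so the HFile writer never sees the same cell twice
val dedupDf = df.dropDuplicates("rk")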
I tried reading the data out of SQL into an ArrayList[((ImmutableBytesWritable, Array[Byte]), KeyValue)] in which the KeyValues were already sorted by column family and qualifier, but after calling repartitionAndSortWithinPartitions the shuffle put them back out of order. The end result was always the same: the rowkeys were indeed sorted, yet the "Added a key not lexically larger than previous" IOException was still thrown because the column family and qualifier were no longer sorted.
Custom KeyQualifierComparator:
import java.util.Comparator
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

/***
 * @Author: lzx
 * @Description: custom comparator: compare the rowkey first, then the qualifier
 *               (the column family is the same for every cell)
 * @Date: 2022/8/25
 **/
// The Array[Byte] is the qualifier, i.e. the xxx part of data:xxx inside a column family
class CustomKeyQualifierComparator extends Comparator[(ImmutableBytesWritable, Array[Byte])] with Serializable {
  override def compare(o1: (ImmutableBytesWritable, Array[Byte]), o2: (ImmutableBytesWritable, Array[Byte])): Int = {
    var result: Int = o1._1.compareTo(o2._1) // rowkey first
    if (result == 0) {
      result = Bytes.compareTo(o1._2, o2._2) // then the qualifier bytes
    }
    result
  }
}
Usage:
val repartitionAndSortRDD: JavaPairRDD[ImmutableBytesWritable, KeyValue] =
  flatMapRdd
    .repartitionAndSortWithinPartitions(new RegionPartitioner(regionSplits.init.toArray), new CustomKeyQualifierComparator)
    .mapToPair(f => (f._1._1, f._2)) // f: ((ImmutableBytesWritable, Array[Byte]), KeyValue)
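After the repartition-and-sort step, the sorted pairs can be written out as HFiles and then bulk loaded. A sketch, assuming hfileOutputPath is an empty HDFS directory and hbaseConf already carries the output table name set earlier:

repartitionAndSortRDD.saveAsNewAPIHadoopFile(
  hfileOutputPath,                      // HDFS staging directory for the generated HFiles
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[HFileOutputFormat2],
  hbaseConf)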
Reference: HBase Bulkload 实践探讨 - Tencent Cloud Developer Community
2022-06-02 10:33:37 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoSuchMethodError: org.apache.hadoop.hdfs.client.HdfsDataInputStream.getReadStatistics()Lorg/apache/hadoop/hdfs/DFSInputStream$ReadStatistics;
java.lang.NoSuchMethodError: org.apache.hadoop.hdfs.client.HdfsDataInputStream.getReadStatistics()Lorg/apache/hadoop/hdfs/DFSInputStream$ReadStatistics;
at org.apache.hadoop.hbase.io.FSDataInputStreamWrapper.updateInputStreamStatistics(FSDataInputStreamWrapper.java:253)
at org.apache.hadoop.hbase.io.FSDataInputStreamWrapper.close(FSDataInputStreamWrapper.java:300)
at org.apache.hadoop.hbase.io.hfile.HFile.isHFileFormat(HFile.java:590)
at org.apache.hadoop.hbase.io.hfile.HFile.isHFileFormat(HFile.java:571)
at org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.visitBulkHFiles(LoadIncrementalHFiles.java:1072)
at org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.discoverLoadQueue(LoadIncrementalHFiles.java:988)
at org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.prepareHFileQueue(LoadIncrementalHFiles.java:249)
at org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:356)
at org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:281)
at MidToHbase_daily$.main(MidToHbase_daily.scala:226)
at MidToHbase_daily.main(MidToHbase_daily.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:728)