• Spark bulkload: assorted issues (part 2)


    hbase.mapreduce.hfileoutputformat.table.name cannot be empty

    Log:

    java.lang.IllegalArgumentException: hbase.mapreduce.hfileoutputformat.table.name cannot be empty
    	at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.createRecordWriter(HFileOutputFormat2.java:202)
    	at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.getRecordWriter(HFileOutputFormat2.java:185)
    	at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.initWriter(SparkHadoopWriter.scala:355)
    	at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:125)
    	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
    	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    	at org.apache.spark.scheduler.Task.run(Task.scala:127)
    	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
    	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    

    Solution:

    val hbaseConf: Configuration = HBaseConfiguration.create()
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName) // set the output table

    // if the line above is not enough, set the key HFileOutputFormat2 actually reads
    hbaseConf.set("hbase.mapreduce.hfileoutputformat.table.name", tableName) // HBase output table
    
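    For context, here is a minimal end-to-end write sketch (not the article's exact job). It assumes an already prepared hfileRdd: RDD[(ImmutableBytesWritable, KeyValue)], an HDFS output directory hfilePath, and the target tableName. HFileOutputFormat2.configureIncrementalLoad should also populate hbase.mapreduce.hfileoutputformat.table.name, so the explicit set above is mostly a belt-and-braces measure.

    import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
    import org.apache.hadoop.mapreduce.Job

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.mapreduce.hfileoutputformat.table.name", tableName)

    val job = Job.getInstance(hbaseConf)
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val table = conn.getTable(TableName.valueOf(tableName))
    val regionLocator = conn.getRegionLocator(TableName.valueOf(tableName))
    // sets up total-order partitioning, output classes, compression, etc. for HFile output
    HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)

    // hfileRdd and hfilePath are assumed to exist in the surrounding job
    hfileRdd.saveAsNewAPIHadoopFile(
      hfilePath,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration)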

    Trying to load more than 32 hfiles to one family of one region

    Source code analysis

    org.apache.hadoop.hbase.tool.BulkLoadHFiles.java

    This tool lets you programmatically load the output of HFileOutputFormat into an existing table. It is not thread safe.
    @InterfaceAudience.Public
    public interface BulkLoadHFiles {

      static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
      static final String MAX_FILES_PER_REGION_PER_FAMILY =
          "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
      static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
      static final String CREATE_TABLE_CONF_KEY = "create.table";
      static final String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
      static final String ALWAYS_COPY_FILES = "always.copy.files";
      /**
       * Support bulkLoadHFile by column family, to avoid long waits on bulkLoadHFile caused by server-side compactions
       */
      public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";
    

    org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.java

    public void initialize() {
        Configuration conf = getConf();
        // disable blockcache for tool invocation, see HBASE-10500
        conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
        userProvider = UserProvider.instantiate(conf);
        fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
        assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
        maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32); // default value is 32
        bulkLoadByFamily = conf.getBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, false);
        nrThreads = conf.getInt("hbase.loadincremental.threads.max",
          Runtime.getRuntime().availableProcessors());
        numRetries = new AtomicInteger(0);
        rpcControllerFactory = new RpcControllerFactory(conf);
      }
    

    Solution

    val conf = HBaseConfiguration.create()
    // raise this limit so the load does not fail when one region/family ends up with too many HFiles
    conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 3200)
    
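    The raised limit has to be on the Configuration the loader actually reads (initialize() above pulls it from getConf()), not only on the write-side conf. A minimal load-step sketch, assuming the HFiles were written to hfilePath and the target table is tableName (both hypothetical names):

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles

    val conf = HBaseConfiguration.create()
    // this conf is the one passed to the loader, so the limit below is the one that counts
    conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 3200)

    val conn = ConnectionFactory.createConnection(conf)
    val tn = TableName.valueOf(tableName)
    val loader = new LoadIncrementalHFiles(conf)
    loader.doBulkLoad(new Path(hfilePath), conn.getAdmin, conn.getTable(tn), conn.getRegionLocator(tn))

    Raising the limit is a workaround; reducing the number of HFiles per region (for example by repartitioning on region boundaries before writing) avoids the problem at the source.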

    java.lang.NoSuchMethodError: jar conflict

    Log

    Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.hbase.util.FSUtils.setStoragePolicy(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;Ljava/lang/String;)V
    	at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.configureStoragePolicy(HFileOutputFormat2.java:459)
    	at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:275)
    	at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:230)
    	at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.write(SparkHadoopWriter.scala:363)
    	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:137)
    	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
    	at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:134)
    	... 9 more
    

    Solution

    Caused by: java.lang.NoSuchMethodError
    An error like this almost always means a jar conflict, i.e. mismatched component versions on the classpath.
    
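    The usual fix is to align the HBase client-side artifacts on one version that matches the cluster (here, HFileOutputFormat2 from hbase-mapreduce calls an FSUtils method whose signature changed between HBase versions). A hedged build.sbt sketch; the version number is only an example, use whatever your cluster runs:

    // build.sbt (illustrative; pick the HBase version your cluster actually runs)
    val hbaseVersion = "2.1.10"
    libraryDependencies ++= Seq(
      "org.apache.hbase" % "hbase-client"    % hbaseVersion,
      "org.apache.hbase" % "hbase-mapreduce" % hbaseVersion,
      "org.apache.hbase" % "hbase-server"    % hbaseVersion % Provided
    )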

    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result

    Log

    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable
    Serialization stack:
    	- object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 32 31 36 31 35 63 30 62 37 64 61 34 61 36 61 63 35 64 35 65 37 38 39 33 62 33 37 66 66 36 62 37 39 30 37 65 65 64 37 66 5f 65 6d 75 5f 73 63 61 6e)
    	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
    	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
    	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
    	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
    

    Solution

    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable]))
    
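    These two settings must be applied before the SparkContext is created. A minimal setup sketch; the app name is arbitrary, and KeyValue is registered as well since it travels through the same shuffle:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.hadoop.hbase.KeyValue
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable

    val sparkConf = new SparkConf()
      .setAppName("bulkload-demo") // arbitrary app name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // ImmutableBytesWritable does not implement java.io.Serializable, so it has to go through Kryo
      .registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[KeyValue]))

    val spark = SparkSession.builder().config(sparkConf).getOrCreate()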

    java.io.IOException: Added a key not lexically larger than previous

    Log

    java.io.IOException: Added a key not lexically larger than previous. Current cell = a0000a9d3a5e09eaaae62e29bfe68faaf0c79eb6_avp_scan/i:engine_name/1661754500397/Put/vlen=8/seqid=0, lastCell = a0000a9d3a5e09eaaae62e29bfe68faaf0c79eb6_avp_scan/i:task_type/1661754500397/Put/vlen=1/seqid=0fileContext=[usesHBaseChecksum=true, checksumType=CRC32C, bytesPerChecksum=16384, blocksize=65536, encoding=NONE, includesMvcc=true, includesTags=true, compressAlgo=NONE, compressTags=false, cryptoContext=[cipher=NONE keyHash=NONE], tableName=TEST:SAMPLE_TASK_SCAN_BULKLOAD, columnFamily=i, cellComparator=org.apache.hadoop.hbase.CellComparatorImpl@767aa5a7]
    	at org.apache.hadoop.hbase.io.hfile.HFileWriterImpl.checkKey(HFileWriterImpl.java:240)
    	at org.apache.hadoop.hbase.io.hfile.HFileWriterImpl.append(HFileWriterImpl.java:737)
    	at org.apache.hadoop.hbase.regionserver.StoreFileWriter.append(StoreFileWriter.java:299)
    	at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:325)
    	at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:233)
    	at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.write(SparkHadoopWriter.scala:363)
    	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:137)
    	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
    	at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:134)
    	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
    	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    	at org.apache.spark.scheduler.Task.run(Task.scala:127)
    

    Solution 1: deal with duplicate data

    Find the duplicates:
    select rk, count(*) from merge_sample.hr_task_scan_official_3hh group by rk having count(*) > 1;

    Remove the duplicates (see the sketch below):
    1. distinct
    2. df.dropDuplicates()
    
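    In Spark this typically means deduplicating on the rowkey column before generating KeyValues. A small sketch, assuming the DataFrame df carries the rowkey in a column named rk:

    // keep one row per rowkey (which duplicate survives is not deterministic)
    val dedupedByRk = df.dropDuplicates("rk")

    // or drop only fully identical rows
    val distinctRows = df.distinct()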

    Solution 2: caused by repartitionAndSortWithinPartitions

    The job reads rows from SQL and builds an ArrayList[((ImmutableBytesWritable,Array[Byte]), KeyValue)] in which the KeyValues are already sorted by column family and qualifier. After calling repartitionAndSortWithinPartitions, however, the shuffle destroys that order: the rowkeys do end up sorted, but the family/qualifier order within each row is lost, so the job still throws the "Added a key not lexically larger than previous" IOException.
    Fix: sort on a composite key with a custom KeyQualifierComparator.

    /***
     * @Author: lzx
     * @Description: custom comparator: compare the rowkey first, then the qualifier (the column family is the same for all cells)
     * @Date: 2022/8/25
     **/
    // the qualifier is the column name inside a family, e.g. data:xxx
    class CustomKeyQualifierComparator extends Comparator[(ImmutableBytesWritable, Array[Byte])] with Serializable {
      override def compare(o1: (ImmutableBytesWritable, Array[Byte]), o2: (ImmutableBytesWritable, Array[Byte])): Int = {
        var result: Int = o1._1.compareTo(o2._1) // rowkey
        if (result == 0) {
          result = Bytes.compareTo(o1._2, o2._2) // qualifier
        }
        result
      }
    }
    

    Invocation

     val repartitionAndSortRDD: JavaPairRDD[ImmutableBytesWritable, KeyValue] =
        flatMapRdd // JavaPairRDD[((ImmutableBytesWritable, Array[Byte]), KeyValue)]
          .repartitionAndSortWithinPartitions(new RegionPartitioner(regionSplits.init.toArray), new CustomKeyQualifierComparator)
          .mapToPair(f => (f._1._1, f._2)) // drop the qualifier from the key once sorting is done
    
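    If the job uses the Scala RDD API instead of JavaPairRDD, the same fix can be expressed with an implicit Ordering on the composite key. A minimal sketch, assuming flatMapRdd: RDD[((ImmutableBytesWritable, Array[Byte]), KeyValue)] and a region-aware partitioner already exist:

    import org.apache.hadoop.hbase.KeyValue
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes

    // order by rowkey first, then by qualifier, so cells inside one row stay sorted
    implicit val rowkeyThenQualifier: Ordering[(ImmutableBytesWritable, Array[Byte])] =
      new Ordering[(ImmutableBytesWritable, Array[Byte])] {
        override def compare(a: (ImmutableBytesWritable, Array[Byte]),
                             b: (ImmutableBytesWritable, Array[Byte])): Int = {
          val byRowkey = a._1.compareTo(b._1)
          if (byRowkey != 0) byRowkey else Bytes.compareTo(a._2, b._2)
        }
      }

    val sortedForHFiles = flatMapRdd
      .repartitionAndSortWithinPartitions(partitioner) // partitioner: region-aware Partitioner (assumed)
      .map { case ((rowkey, _), kv) => (rowkey, kv) }  // drop the qualifier once sorting is done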

    Reference: HBase Bulkload 实践探讨 (Tencent Cloud developer community)


    java.lang.NoSuchMethodError: org.apache.hadoop.hdfs.client.HdfsDataInputStream.getReadStatistics

    Log

    2022-06-02 10:33:37 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoSuchMethodError: org.apache.hadoop.hdfs.client.HdfsDataInputStream.getReadStatistics()Lorg/apache/hadoop/hdfs/DFSInputStream$ReadStatistics;
    java.lang.NoSuchMethodError: org.apache.hadoop.hdfs.client.HdfsDataInputStream.getReadStatistics()Lorg/apache/hadoop/hdfs/DFSInputStream$ReadStatistics;
    	at org.apache.hadoop.hbase.io.FSDataInputStreamWrapper.updateInputStreamStatistics(FSDataInputStreamWrapper.java:253)
    	at org.apache.hadoop.hbase.io.FSDataInputStreamWrapper.close(FSDataInputStreamWrapper.java:300)
    	at org.apache.hadoop.hbase.io.hfile.HFile.isHFileFormat(HFile.java:590)
    	at org.apache.hadoop.hbase.io.hfile.HFile.isHFileFormat(HFile.java:571)
    	at org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.visitBulkHFiles(LoadIncrementalHFiles.java:1072)
    	at org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.discoverLoadQueue(LoadIncrementalHFiles.java:988)
    	at org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.prepareHFileQueue(LoadIncrementalHFiles.java:249)
    	at org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:356)
    	at org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.doBulkLoad(LoadIncrementalHFiles.java:281)
    	at MidToHbase_daily$.main(MidToHbase_daily.scala:226)
    	at MidToHbase_daily.main(MidToHbase_daily.scala)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:728)
    

    Solution:

    https://blog.csdn.net/Lzx116/article/details/125059929
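    This is the same class of problem as the earlier NoSuchMethodError: the HDFS client classes on the cluster classpath do not match the ones HBase was compiled against. Besides aligning versions (see the linked post), a commonly used workaround is to prefer the application's own jars over the cluster-provided ones; a hedged sketch, assuming a SparkConf named sparkConf from earlier:

    // prefer jars shipped with the application over the cluster's copies
    // (use with care: it can introduce other conflicts)
    sparkConf.set("spark.driver.userClassPathFirst", "true")
    sparkConf.set("spark.executor.userClassPathFirst", "true")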

  • Original post: https://blog.csdn.net/Lzx116/article/details/126692164