• SparkStreaming写入Hive慢


    项目场景:

    spark streaming从 Kafka 消费数据,写到 Hive 表。


    问题描述

    数据量级上亿,SparkStreaming 的 bath time 为 1 min, 在某一个时刻开始出现任务堆积,即大量任务处于 Queued 状态,卡在了某个 job,最长延迟时间为 1.7 h。

    查看 job 状态一直处于 processing, 但是发现该 job 写 hive 的时间也就花费了 30 秒左右,但是该 job 最终执行完的时间远远大于这个时间。

    慢慢的,每一批次都要慢几分钟,出现堆积,最终造成数据大面积延迟。


    原因分析:

    • Spark源码:
      在这里插入图片描述
    • hive源码
     // If source path is a subdirectory of the destination path (or the other way around):
        //   ex: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT src.value WHERE src.key >= 300;
        //   where the staging directory is a subdirectory of the destination directory
        // (1) Do not delete the dest dir before doing the move operation.
        // (2) It is assumed that subdir and dir are in same encryption zone.
        // (3) Move individual files from scr dir to dest dir.
        boolean srcIsSubDirOfDest = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal),
            destIsSubDirOfSrc = isSubDir(destf, srcf, destFs, srcFs, false);
        final String msg = "Unable to move source " + srcf + " to destination " + destf;
        try {
          if (replace) {
            try{
              //if destf is an existing directory:
              //if replace is true, delete followed by rename(mv) is equivalent to replace
              //if replace is false, rename (mv) actually move the src under dest dir
              //if destf is an existing file, rename is actually a replace, and do not need
              // to delete the file first
              if (replace && !srcIsSubDirOfDest) {
                destFs.delete(destf, true);
                LOG.debug("The path " + destf.toString() + " is deleted");
              }
            } catch (FileNotFoundException ignore) {
            }
          }
          final SessionState parentSession = SessionState.get();
          if (isSrcLocal) {
            // For local src file, copy to hdfs
            destFs.copyFromLocalFile(srcf, destf);
            return true;
          } else {
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    分析

    阅读上面的源码,可以发现往 Hive 中写数据的时候会在目标表中(1.1 版本之后是默认位置目标表的文件夹)生成一个以.hive-staging 开头的lin时文件夹,结果会在临时文件夹存放。执行完成后会,将临时文件夹 rename,放到对应的目标表文件下。

    这里的 rename 并不是直接修改 hive 元数据那么简单。是在特定条件下才会执行 mv file 的,否则还是会 copy file 的形式。

    临时文件就直接放在目标表对应的目录下面了,所以最后执行的 copy 操作,如果文件多或者数据量大的情况下,会很慢。

    hive的MV策略如下:

     1.原文件是非hdfs文件,copyFromLocal
    
       2.原文件是hdfs文件
    
              2.1   Encrypted模式
    
                    copy操作,如果文件大于默认值(32MB),则会进行distcp操作。
    
             2.2  非Encrypted模式
    
                (1)原目录是目标目录的子目录,原目录下的每个文件进行copy操作,如果文件大于默认值(32MB),则会进行distcp操作。
    
                (2)其他情况,进行mv操作。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    hdfs mv原理

    当用户调用hdfs dfs -mv时,HDFS保证重命名操作的原子性.运行此命令时,客户端对NameNode进行RPC调用.此RPC的NameNode实现在修改inode树时保持锁定,并且只有在重命名完成后才会成功锁定或成功锁定. (由于许可或配额违规等原因,它可能会失败.)

    由于实现完全在NameNode内执行并且仅操纵文件系统元数据,因此不涉及实际的数据移动.实际上,在hdfs dfs -mv命令期间,没有与DataNode的交互.所有文件的块保持不变,与inode关联的阻止列表保持不变. NameNode只是从一个位置获取该文件的inode,并将其移动到文件系统树中的另一个位置.不会破坏块数据


    解决方案:

    方案一:修改临时目录

    临时文件不要放在目标表对应的目录下面了,此时会执行 mv 操作,不涉及文件的移动,这样就会很快。

    <property>  
        <name>hive.exec.stagingdirname>    
        <value>/tmp/hive/.hive-stagingvalue>  
        <description>hive任务生成临时文件夹地址description>
    property>
    
    <property>          
        <name>hive.insert.into.multilevel.dirsname> 
         <value>truevalue>  
         <description>hive.insert.into.mulltilevel.dirs设置成false的时候,insert 目标目录的上级目录必须存在;trued的时候允许不存在description>
     property>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    方案二:

    spark 直接落文件到 HDFS的对应分区中 ,hive 表见外部表与数据进行关联。这种就不依赖与 hive 了,减少中间环节。这是,尽可能的规避小文件,需要尽可能减少文件个数。

    参考:
    http://t.csdn.cn/0Rmyv

  • 相关阅读:
    情感支撑对话论文最近进展 Emotion Support Conversation
    大数据(二)大数据架构发展史
    彻底搞懂kubernetes调度框架与插件
    196. 删除重复的电子邮箱
    Haar cascade+opencv检测算法
    大数据Hadoop核心架构HDFS+MapReduce+Hbase+Hive内部机理详解
    【C++基础入门】43.C++中多态的概念和意义
    nodeJs读取mysql数据库的数据
    mysql入门,各种概念了解
    Java版分布式微服务云开发架构 Spring Cloud+Spring Boot+Mybatis 电子招标采购系统功能清单
  • 原文地址:https://blog.csdn.net/Lzx116/article/details/126499665