spark streaming从 Kafka 消费数据,写到 Hive 表。
数据量级上亿,SparkStreaming 的 bath time 为 1 min, 在某一个时刻开始出现任务堆积,即大量任务处于 Queued 状态,卡在了某个 job,最长延迟时间为 1.7 h。
查看 job 状态一直处于 processing, 但是发现该 job 写 hive 的时间也就花费了 30 秒左右,但是该 job 最终执行完的时间远远大于这个时间。
慢慢的,每一批次都要慢几分钟,出现堆积,最终造成数据大面积延迟。
// If source path is a subdirectory of the destination path (or the other way around):
// ex: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT src.value WHERE src.key >= 300;
// where the staging directory is a subdirectory of the destination directory
// (1) Do not delete the dest dir before doing the move operation.
// (2) It is assumed that subdir and dir are in same encryption zone.
// (3) Move individual files from scr dir to dest dir.
boolean srcIsSubDirOfDest = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal),
destIsSubDirOfSrc = isSubDir(destf, srcf, destFs, srcFs, false);
final String msg = "Unable to move source " + srcf + " to destination " + destf;
try {
if (replace) {
try{
//if destf is an existing directory:
//if replace is true, delete followed by rename(mv) is equivalent to replace
//if replace is false, rename (mv) actually move the src under dest dir
//if destf is an existing file, rename is actually a replace, and do not need
// to delete the file first
if (replace && !srcIsSubDirOfDest) {
destFs.delete(destf, true);
LOG.debug("The path " + destf.toString() + " is deleted");
}
} catch (FileNotFoundException ignore) {
}
}
final SessionState parentSession = SessionState.get();
if (isSrcLocal) {
// For local src file, copy to hdfs
destFs.copyFromLocalFile(srcf, destf);
return true;
} else {
阅读上面的源码,可以发现往 Hive 中写数据的时候会在目标表中(1.1 版本之后是默认位置目标表的文件夹)生成一个以.hive-staging 开头的lin时文件夹,结果会在临时文件夹存放。执行完成后会,将临时文件夹 rename,放到对应的目标表文件下。
这里的 rename 并不是直接修改 hive 元数据那么简单。是在特定条件下才会执行 mv file 的,否则还是会 copy file 的形式。
临时文件就直接放在目标表对应的目录下面了,所以最后执行的 copy 操作,如果文件多或者数据量大的情况下,会很慢。
1.原文件是非hdfs文件,copyFromLocal
2.原文件是hdfs文件
2.1 Encrypted模式
copy操作,如果文件大于默认值(32MB),则会进行distcp操作。
2.2 非Encrypted模式
(1)原目录是目标目录的子目录,原目录下的每个文件进行copy操作,如果文件大于默认值(32MB),则会进行distcp操作。
(2)其他情况,进行mv操作。
当用户调用hdfs dfs -mv时,HDFS保证重命名操作的原子性.运行此命令时,客户端对NameNode进行RPC调用.此RPC的NameNode实现在修改inode树时保持锁定,并且只有在重命名完成后才会成功锁定或成功锁定. (由于许可或配额违规等原因,它可能会失败.)
由于实现完全在NameNode内执行并且仅操纵文件系统元数据,因此不涉及实际的数据移动.实际上,在hdfs dfs -mv命令期间,没有与DataNode的交互.所有文件的块保持不变,与inode关联的阻止列表保持不变. NameNode只是从一个位置获取该文件的inode,并将其移动到文件系统树中的另一个位置.不会破坏块数据
临时文件不要放在目标表对应的目录下面了,此时会执行 mv 操作,不涉及文件的移动,这样就会很快。
<property>
<name>hive.exec.stagingdirname>
<value>/tmp/hive/.hive-stagingvalue>
<description>hive任务生成临时文件夹地址description>
property>
<property>
<name>hive.insert.into.multilevel.dirsname>
<value>truevalue>
<description>hive.insert.into.mulltilevel.dirs设置成false的时候,insert 目标目录的上级目录必须存在;trued的时候允许不存在description>
property>
spark 直接落文件到 HDFS的对应分区中 ,hive 表见外部表与数据进行关联。这种就不依赖与 hive 了,减少中间环节。这是,尽可能的规避小文件,需要尽可能减少文件个数。