序列化:就是把内存中的对象转换成字节序列(或其他数据传输协议),以便于存储到磁盘(持久化)和网络传输。
反序列化:就是将收到的字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
因为Java自带的序列化存储了很多额外信息(各种校验、Header、继承体系等),是一个重量级序列化框架,不便于在网络中传输,所以Hadoop有自己的一套序列化。
Hadoop序列化的特点:
实际开发中,Hadoop自带的Text、IntWritable等基本的序列化类型往往不够用,需要自定义一些可序列化的 JavaBean。
自定义需要序列化的类:
package com.study.mapreduce.writable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class MyWritable implements Writable,Comparable<MyWritable> {
private Integer id;
private Long scale;
private Integer age;
/**
* 需要有无参构造器
*/
public MyWritable() {
}
/**
* 序列化
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(id);
out.writeLong(scale);
out.writeInt(age);
}
/**
* 反序列化
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
// 读取顺序要和write方法写的顺序一致,即先进先出
id = in.readInt();
scale = in.readLong();
age = in.readInt();
}
/**
* 如果想当做key在MapReduce中传输,需要实现Comparable,因为Shuffle过程要求key必须能排序
* @param o
* @return
*/
@Override
public int compareTo(MyWritable o) {
return this.id > o.getId() ? -1 : 1;
}
/**
* 为方便查看,还可以重写toString()方法
* @return
*/
@Override
public String toString() {
MessageFormat mf = new MessageFormat("MyWritable:{id:[0], scale:[1], age:[2]}");
return mf.format(new Object[]{id, scale, age});
}
// 生成getter/setter
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Long getScale() {
return scale;
}
public void setScale(Long scale) {
this.scale = scale;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
}
MapReduce框架运行流程:
InputFormat是一个抽象类,它有很多实现类,例如 FileInputFormat、CombineFileInputFormat、TextInputFormat等。
前面的WordCount示例中,我们使用文件输入,使用的就是 FileInputFormat。
InputFormat类有两个抽象方法:
MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个 Job 的处理速度。
数据块:Block是HDFS物理上把数据分成一块一块的,数据块是HDFS存储数据单位。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。
数据块是物理上的分开存储,例如一个129Mb的数据文件,在存入 hdfs 时,因为hdfs一个数据块默认大小只有128Mb,所以会被分成两个数据块存储:block0存储128Mb,block1存储1Mb,这两个数据块可能存储在不同的服务器上,这个是物理上的分开存储。
数据切片是逻辑上的切片,不是真正的物理磁盘上分开存储。例如将这个129Mb的文件切成一个100Mb和一个29Mb两个片,那么只会找个位置记录下来:0-100索引位置属于第一个片,100-129索引位置属于第二个片,并不影响物理上的存储。
切片的大小会影响到执行的效率:
如图:对于一个300Mb的文件切成3片(即产生3个MapTask)
如果切片大小设置为100Mb(红色线条),那么:
● 第一个MapTask只能执行DataNode1上存储的128Mb数据中的100Mb
● MapTask2需要通过网络IO获取DataNode1上剩余的28Mb,加上DataNode2上的一部分,组成100Mb
● MapTask3需要通过网络IO获取DataNode2上剩余的数据,加上DataNode3上的进行处理
这样就会因为这些网络IO导致效率低下。
如果切片代销设置为128Mb(蓝色线条),那么:
● MapTask1执行本机DataNode1上的128Mb
● MapTask2执行本机DataNode2上的128Mb
● MapTask3执行本机DataNode3上的数据
没有了网络IO,效率就会高很多
Hadoop默认参数中,切片大小就是等于数据块大小。(也可以自定义配置,但不推荐)
切片时,不考虑数据集整体,而是逐个针对每一个文件单独切片。
例如:输入数据中有两个文件(a.txt、b.txt),其中 a.txt 大小为300Mb, b.txt大小为100Mb。切片时,不会按整体400Mb切分,而是逐个对这两个文件进行切分。切片大小为128Mb的话:
先将a.txt切分成3个片:128Mb、128Mb、44Mb,然后再将b.txt切分成一个100Mb的片。最后产生4个切片、4个MapTask。
而不是将b.txt追加到a.txt上,因为b.txt的block和a.txt的block不是同一个,如果合一起切就又会产生网络IO。
根据WordCount程序可知,MapReduce中真正执行job的是job.waitForCompletion(true)方法。job.waitForCompletion(true)方法中调用了submit()提交job。
submit()
方法的源码如下:
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE); // 确认job的状态为DEFINE(未运行)状态
setUseNewAPI(); // 如果使用的是mapred包(hadoop 1.x)中的类,做一些特殊配置进行兼容
// 获取hadoop集群的连接
// connect()方法内部会调用到Cluster类构造方法,Cluster构造方法会调用initialize()初始化方法
// Cluster的initialize()初始化方法中,会得到两个Provider:YarnClientProtocolProvider、LocalClientProtocolProvider
// 如果配置项mapreduce.framework.name的值为yarn,则使用YarnClientProtocolProvider;如果为local,则使用LocalClientProtocolProvider
// mapreduce.framework.name在mapred-default.xml默认值为local,我们本地的hadoop没有修改该配置项,所以本地运行WordCount会使用本地模式运行,即输入、输出路径都是本地路径。而我们的hadoop102集群中,在mapred-site.xml中将该值配置为了yarn,所以在hadoop102上运行WordCount程序时走的是Yarn,即输入、输出路径都是hdfs路径。
connect();
final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
// 向集群提交job信息
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING; // 将DEFINE(未运行)状态改为 RUNNING(运行)
LOG.info("The url to track the job: " + getTrackingURL());
}
submitter.submitJobInternal(Job.this, cluster)
向集群提交job信息的源码:
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
// 校验output文件夹是否存在。如果存在,会抛出异常output文件夹已存在
checkSpecs(job);
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
// 获取临时路径(该路径后面会用于临时存放job的切片等信息,处理完毕后该文件夹会清空)
// 该路径的前面一部分可以通过mapreduce.jobtracker.staging.root.dir进行配置
// 如果没有配置,则默认取/tmp/hadoop/mapred/staging
// 然后再后面加上 <当前用户名(用户名为空则取dummy)+随机数>/.staging
// 例如:/tmp/hadoop/mapred/staging/tengyer2113150384/.staging
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// ......配置一些命令行信息。省略
// 创建一个jobID,提交的每个任务都有一个唯一的jobID,例如:job_local2113150384_0001
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
// 在上面创建的/tmp/hadoop/mapred/staging/tengyer2113150384/.staging基础上,
// 创建一个新的path:/tmp/hadoop/mapred/staging/tengyer2113150384/.staging/job_local2113150384_0001
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {
// ........ 中间这一块是设置一些配置信息、缓存信息,省略
// 将job的jar包、依赖、配置文件等内容提交到集群
// yarn模式才会将jar包提交到集群,local模式不提交
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
// 切片,将切片信息临时保存到/tmp/hadoop/mapred/staging/tengyer2113150384/.staging/job_local2113150384_0001
// 会生成 job.split、.job.split.crc、job.splitmetainfo、.job.splitmetainfo.crc文件,保存切片信息
int maps = writeSplits(job, submitJobDir);
// 将切片的个数赋值给将来要创建的MapTask数量,有几个切片就有几个MapTask
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
MRJobConfig.DEFAULT_JOB_MAX_MAP);
if (maxMaps >= 0 && maxMaps < maps) {
throw new IllegalArgumentException("The number of map tasks " + maps +
" exceeded limit " + maxMaps);
}
// ....... 队列、缓存等信息。省略
// 在/tmp/hadoop/mapred/staging/tengyer2113150384/.staging/job_local2113150384_0001下创建 jbo.xml、.job.xml.crc
// job.xml中保存了job运行需要的配置参数信息
writeConf(conf, submitJobFile);
//
// Now, actually submit the job (using the submit name)
// 真正的提交 job
//
printTokens(jobId, job.getCredentials());
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}
最后,在job.waitForCompletion(true)
运行完monitorAndPrintJob()
方法后,/tmp/hadoop/mapred/staging/tengyer2113150384/.staging/job_local2113150384_0001
文件夹被清空。
在submitter.submitJobInternal(Job.this, cluster)
向集群提交job信息的源码中,执行到int maps = writeSplits(job, submitJobDir);
时会进行切片。
writeSplits(job, submitJobDir)
中,会进行判断,Hadoop 1.x的程序会调用 writeOldSplits(jConf, jobSubmitDir)
,Hadoop 2.x的程序会调用writeNewSplits(job, jobSubmitDir)
。
writeNewSplits(job, jobSubmitDir)
会调用具体的InputFormat
实现类的getSplits(job)
去进行切片。·
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
// getFormatMinSplitSize()在本类中固定返回1
// getMinSplitSize(job)获取配置项mapreduce.input.fileinputformat.split.minsize的值,在mapred-default.xml中该配置项默认值为0
// 所以两个数取最大值结果为1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// 获取配置项mapreduce.input.fileinputformat.split.maxsize的值,mapred-default.xml中默认没有该配置项
// 没有该配置项时,取默认值:Long.MAX_VALUE
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
boolean ignoreDirs = !getInputDirRecursive(job)
&& job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
// 循环遍历所有输入文件,分别对每个文件进行分片
// 这也证明了:切片时,不考虑数据集整体,而是逐个针对每一个文件单独切片。
for (FileStatus file: files) {
if (ignoreDirs && file.isDirectory()) {
continue;
}
// 获取文件路径、内容长度
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// 判断文件是否允许切片
// 对于普通文本文件,从文件中间截断不会有影响。
// 但是如果是一些压缩文件(有些压缩文件也支持切片),截断后内容不完整程序就无法处理类。所以一些压缩文件就只允许切成1个片(即没有切片)
if (isSplitable(job, path)) {
// 获取数据块大小。
// local本地模式,默认的数据块大小是 32Mb
// Hadoop 1.x:默认的数据块大小是 64Mb
// Hadoop 2.x/3.x:默认的数据块大小是 128Mb
long blockSize = file.getBlockSize();
// 计算切片大小
// 该方法的实现为:Math.max(minSize, Math.min(maxSize, blockSize))
// 由前面的配置可知,minSize为1,maxSize为Long.MAX_VALUE,blockSize为 32Mb(当前是本地模式运行)
// 所以 Math.min(maxSize, blockSize) = 32Mb, Math.max(minSize, 32Mb) = 32Mb
// 即:默认情况下,切片大小等于块大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
// 进行切片,只有大于切片大小的 SPLIT_SLOP(本类中该值固定为1.1) 倍才切。
// 例如,传入一个 66Mb 的文件:
// 第一次进while时,66Mb > 35.2Mb(32Mb * 1.1), 会被切分一个 32Mb的片,剩余 34Mb
// 第二次进while时, 34Mb < 35.2Mb (32Mb * 1.1), 那么剩下的 34Mb 就不再切分了,就当做1个片处理
// 这样做的好处是:防止过度切分,导致最后一台服务器过于空闲,初始化时间比真正处理数据时间还长,拉低效率。
// 例如上面剩下的 34Mb 如果继续切分成 32Mb 和 2 Mb的片,最后处理 2Mb 的那台机器真正处理的数据量太少,造成资源浪费、效率降低
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
// 对于不能切片的文件,直接设置成1个片
if (LOG.isDebugEnabled()) {
// Log only if the file is big enough to be splitted
if (length > Math.min(file.getBlockSize(), minSize)) {
LOG.debug("File is not splittable so no parallelization "
+ "is possible: " + file.getPath());
}
}
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
根据切片的源码可知,切片的大小为:
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
所以,如果要修改切片大小,可以通过调整 minSize、maxSize 进行修改:
默认的TextInputFormat切片机制是第任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask。这样如果有大量小文件,就会产生大量的MapTask,处理效率变低。
CombineTextInputFormat
用于小文件过多的场景,可以将多个小文件从逻辑上规划到一个切片中,这样多个小文件就可以交给一个或多个MapTask处理。
流程: