共计 11114 个字符,预计需要花费 28 分钟才能阅读完成。
Hadoop 的 job 提交过程相对来说还是有点复杂的,所以在学习源码的时候会显得有些乱,时常看了后面忘了前面,所以在看了多遍之后决定用文章的方式记录下来,一边自己下次再看的时候能够清晰些,同时也为初次接触这方面源码的同学提供一些帮助吧。希望自己可以写的足够详细。(本文针对 hadoop1.2.1)
1.job.waitForCompletion:一般情况下我们提交一个 job 都是通过 job.waitForCompletion 方法提交,该方法内部会调用 job.submit()方法
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}
if (verbose) {
jobClient.monitorAndPrintJob(conf, info);
} else {
info.waitForCompletion();
}
return isSuccessful();
}
2.job.submit():在 submit 中会调用 setUseNewAPI(),setUseNewAPI()这个方法主要是判断是使用新的 api 还是旧的 api,之后会调用 connect()方法,该方法主要是实例化 jobClient,然后会调用 jobClient.submitJobInternal(conf)这个方法进行 job 的提交
public void submit() throws IOException, InterruptedException,
ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
// Connect to the JobTracker and submit the job
connect();
info = jobClient.submitJobInternal(conf);
super.setJobID(info.getID());
state = JobState.RUNNING;
}
3.jobClient.submitJobInternal():这个方法会将 job 运行时所需的所有文件上传到 jobTarcker 文件系统(一般是 hdfs)中,同时进行备份(备份数默认是 10,通过 mapred.submit.replication 变量可以设置),这个方法需要深入进行解读。
4.JobSubmissionFiles.getStagingDir:这个方法是在 jobClient.submitJobInternal()最先调用的,这个方法主要是获取一个 job 提交的根目录,主要是通过 Path stagingArea = client.getStagingAreaDir(); 方法获得,这个方法最终会调用 jobTracker.getStagingAreaDirInternal()方法,代码如下:
private String getStagingAreaDirInternal(String user) throws IOException {
final Path stagingRootDir =
new Path(conf.get(“mapreduce.jobtracker.staging.root.dir”,
“/tmp/hadoop/mapred/staging”));
final FileSystem fs = stagingRootDir.getFileSystem(conf);
return fs.makeQualified(new Path(stagingRootDir,
user+”/.staging”)).toString();
}
在获取了 stagingDir 之后会执行 JobID jobId = jobSubmitClient.getNewJobId(); 为 job 获取一个 jobId,然后执行 Path submitJobDir = new Path(jobStagingArea, jobId.toString()); 获得该 job 提交的路径,也就是在 stagingDir 目录下建一个以 jobId 为文件名的目录。有了 submitJobDir 之后就可以将 job 运行所需的全部文件上传到对应的目录下了,具体是调用 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)这个方法。
5.jobClient.copyAndConfigureFiles(jobCopy, submitJobDir):这个方法最终调用 jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication); 这个方法实现文件上传。
6.jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication):这个方法首先获取用户在使用命令执行 job 的时候所指定的 -libjars, -files, -archives 文件,对应的 conf 配置参数是 tmpfiles tmpjars tmparchives,这个过程是在 ToolRunner.run()的时候进行解析的,当用户指定了这三个参数之后,会将这三个参数对应的文件都上传到 hdfs 上,下面我们具体看一个参数的处理:tmpfiles(其他两个基本相同)
7.jobClient 处理 tmpfiles:该方法会将 tmpfiles 参数值按‘,’分割,然后将每一个文件上传到 hdfs,其中如何文件的路径本身就在 hdfs 中,那么将不进行上传操作,上传操作只针对文件不在 hdfs 中的文件。调用的方法是:Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication),该方法内部使用的是 FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, job)方法将文件上传至 hdfs,注意此处的 remoteFs 和 jtFs,remoteFs 就是需上传文件的原始文件系统,jtFs 则是 jobTracker 的文件系统(hdfs)。在文件上传至 hdfs 之后,会执行 DistributedCache.createSymlink(job)这个方法,这个方法是创建一个别名(好像是这么个名字),这里需要注意的是 tmpfiles 和 tmparchives 都会创建别名,而 tmpjars 则不会,个人认为 tmpjars 则 jar 文件,不是用户在 job 运行期间调用,所以不需要别名,而 tmpfiles 和 tmparchives 则在 job 运行期间用户可能会调用,所以使用别名可以方便用户调用
8. 将这三个参数指定的文件上传到 hdfs 之后,需要将 job 的 jar 文件上传到 hdfs,名称为 submitJobDir/job.jar,使用 fs.copyFromLocalFile(originalJarFile, submitJarFile)上传即可。
到这里 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)方法就完成了,期间丢了 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir),TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(job),TrackerDistributedCacheManager.getDelegationTokens(job, job.getCredentials())三个方法,这三个方法是进行一些 cached archives and files 的校验和保存其时间戳和权限内容
9. 继续我们的 jobClient.submitJobInternal()方法,这之后会根据我们设置的 outputFormat 类执行 output.checkOutputSpecs(context),进行输出路径的检验,主要是保证输出路径不存在,存在会抛出异常。这之后就是对输入文件进行分片操作了,writeSplits(context, submitJobDir)。
更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2013-12/93700p2.htm
相关阅读:
Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm
Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm
Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm
Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm
单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm
搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm
10.jobClient.writeSplits():这个方法内部会根据我们之前判断的使用 new-api 还是 old-api 分别进行分片操作,我们只看 new-api 的分片操作。
private int writeSplits(org.apache.Hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
11.jobClient.writeNewSplits():这个方法主要是根据我们设置的 inputFormat.class 通过反射获得 inputFormat 对象,然后调用 inputFormat 对象的 getSplits 方法,当获得分片信息之后调用 JobSplitWriter.createSplitFiles 方法将分片的信息写入到 submitJobDir/job.split 文件中。
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
12.JobSplitWriter.createSplitFiles:这个方法的作用就是讲分片信息写入到 submitJobDir/job.split 文件中,方法内部调用 JobSplitWriter.writeNewSplits 进行写操作
13.JobSplitWriter.writeNewSplits:该方法具体对每一个 InputSplit 对象进行序列化写入到输出流中,具体每个 InputSplit 对象写入的信息包括:split.getClass().getName(),serializer.serialize(split)将整个对象序列化。然后将 InputSplit 对象的 locations 信息放入 SplitMetaInfo 对象中,同时还包括 InputSpilt 元信息在 job.split 文件中的偏移量,该 InputSplit 的长度,再将 SplitMetaInfo 对象。然后调用 JobSplitWriter.writeJobSplitMetaInfo()方法将 SplitMetaInfo 对象写入 submitJobDir/job.splitmetainfo 文件中。
14.JobSplitWriter.writeJobSplitMetaInfo():将 SplitMetaInfo 对象写入 submitJobDir/job.splitmetainfo 文件中,具体写入的信息包括:JobSplit.META_SPLIT_FILE_HEADER,splitVersion,allSplitMetaInfo.length(SplitMetaInfo 对象的个数,一个 split 对应一个 SplitMetaInfo),然后分别将所有的 SplitMetaInfo 对象序列化到输出流中,到此文件的分片工作完成。
15. 继续回头看 jobClient.submitJobInternal()方法:在上一步进行分片操作之后,或返回切片的数目,据此设定 map 的数量,所以在 job 中设置的 map 数量是没有用的。
16. 继续往下走:
String queue = jobCopy.getQueueName();
AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
jobCopy.set(QueueManager.toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());
这三句话是获得 job 对应的任务队列信息,这里涉及到 hadoop 的作业调度内容,就不深入研究了
17. 继续:下面就是讲 job 的配置文件信息(jobConf 对象) 写入到 xml 文件中,以便用户查看,具体文件是:submitJobDir/job.xml,通过 jobCopy.writeXml(out)方法,
方法比较简单,就是写 xml 文件。下面就进入到 jobTracker 提交任务环节了,status = jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials()),
就到这吧,后面下次再慢慢研究。
总结下:在用户提交 job 之后,第一步主要是 jobClient 对 job 进行一些必要的文件上传操作,主要包括:
1)为 job 生成一个 jobId,然后获得 job 提交的 stagingDir,根据 jobId 获得 submitJobDir,之后所有的 job 运行时文件豆浆保存在此目录下
2)将用户在命令行通过 -libjars, -files, -archives 指定的文件上传到 jobTracker 的文件系统中,并将 job.jar 上传到 hdfs 中
3)校验输出路径
4)进行输入文件的分片操作,并将分片信息写入 submitJobDir 下的相应文件中,有两个文件:job.split 以及 job.splitmetainfo
5)将 job 的配置参数(jobConf 对象) 写入到 job.xml 文件中
这就是 jobClient 提交 job 的全部过程,如有遗漏下面评论指出,谢谢
更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13
Hadoop 的 job 提交过程相对来说还是有点复杂的,所以在学习源码的时候会显得有些乱,时常看了后面忘了前面,所以在看了多遍之后决定用文章的方式记录下来,一边自己下次再看的时候能够清晰些,同时也为初次接触这方面源码的同学提供一些帮助吧。希望自己可以写的足够详细。(本文针对 hadoop1.2.1)
1.job.waitForCompletion:一般情况下我们提交一个 job 都是通过 job.waitForCompletion 方法提交,该方法内部会调用 job.submit()方法
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}
if (verbose) {
jobClient.monitorAndPrintJob(conf, info);
} else {
info.waitForCompletion();
}
return isSuccessful();
}
2.job.submit():在 submit 中会调用 setUseNewAPI(),setUseNewAPI()这个方法主要是判断是使用新的 api 还是旧的 api,之后会调用 connect()方法,该方法主要是实例化 jobClient,然后会调用 jobClient.submitJobInternal(conf)这个方法进行 job 的提交
public void submit() throws IOException, InterruptedException,
ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
// Connect to the JobTracker and submit the job
connect();
info = jobClient.submitJobInternal(conf);
super.setJobID(info.getID());
state = JobState.RUNNING;
}
3.jobClient.submitJobInternal():这个方法会将 job 运行时所需的所有文件上传到 jobTarcker 文件系统(一般是 hdfs)中,同时进行备份(备份数默认是 10,通过 mapred.submit.replication 变量可以设置),这个方法需要深入进行解读。
4.JobSubmissionFiles.getStagingDir:这个方法是在 jobClient.submitJobInternal()最先调用的,这个方法主要是获取一个 job 提交的根目录,主要是通过 Path stagingArea = client.getStagingAreaDir(); 方法获得,这个方法最终会调用 jobTracker.getStagingAreaDirInternal()方法,代码如下:
private String getStagingAreaDirInternal(String user) throws IOException {
final Path stagingRootDir =
new Path(conf.get(“mapreduce.jobtracker.staging.root.dir”,
“/tmp/hadoop/mapred/staging”));
final FileSystem fs = stagingRootDir.getFileSystem(conf);
return fs.makeQualified(new Path(stagingRootDir,
user+”/.staging”)).toString();
}
在获取了 stagingDir 之后会执行 JobID jobId = jobSubmitClient.getNewJobId(); 为 job 获取一个 jobId,然后执行 Path submitJobDir = new Path(jobStagingArea, jobId.toString()); 获得该 job 提交的路径,也就是在 stagingDir 目录下建一个以 jobId 为文件名的目录。有了 submitJobDir 之后就可以将 job 运行所需的全部文件上传到对应的目录下了,具体是调用 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)这个方法。
5.jobClient.copyAndConfigureFiles(jobCopy, submitJobDir):这个方法最终调用 jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication); 这个方法实现文件上传。
6.jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication):这个方法首先获取用户在使用命令执行 job 的时候所指定的 -libjars, -files, -archives 文件,对应的 conf 配置参数是 tmpfiles tmpjars tmparchives,这个过程是在 ToolRunner.run()的时候进行解析的,当用户指定了这三个参数之后,会将这三个参数对应的文件都上传到 hdfs 上,下面我们具体看一个参数的处理:tmpfiles(其他两个基本相同)
7.jobClient 处理 tmpfiles:该方法会将 tmpfiles 参数值按‘,’分割,然后将每一个文件上传到 hdfs,其中如何文件的路径本身就在 hdfs 中,那么将不进行上传操作,上传操作只针对文件不在 hdfs 中的文件。调用的方法是:Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication),该方法内部使用的是 FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, job)方法将文件上传至 hdfs,注意此处的 remoteFs 和 jtFs,remoteFs 就是需上传文件的原始文件系统,jtFs 则是 jobTracker 的文件系统(hdfs)。在文件上传至 hdfs 之后,会执行 DistributedCache.createSymlink(job)这个方法,这个方法是创建一个别名(好像是这么个名字),这里需要注意的是 tmpfiles 和 tmparchives 都会创建别名,而 tmpjars 则不会,个人认为 tmpjars 则 jar 文件,不是用户在 job 运行期间调用,所以不需要别名,而 tmpfiles 和 tmparchives 则在 job 运行期间用户可能会调用,所以使用别名可以方便用户调用
8. 将这三个参数指定的文件上传到 hdfs 之后,需要将 job 的 jar 文件上传到 hdfs,名称为 submitJobDir/job.jar,使用 fs.copyFromLocalFile(originalJarFile, submitJarFile)上传即可。
到这里 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)方法就完成了,期间丢了 jobClient.copyAndConfigureFiles(jobCopy, submitJobDir),TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(job),TrackerDistributedCacheManager.getDelegationTokens(job, job.getCredentials())三个方法,这三个方法是进行一些 cached archives and files 的校验和保存其时间戳和权限内容
9. 继续我们的 jobClient.submitJobInternal()方法,这之后会根据我们设置的 outputFormat 类执行 output.checkOutputSpecs(context),进行输出路径的检验,主要是保证输出路径不存在,存在会抛出异常。这之后就是对输入文件进行分片操作了,writeSplits(context, submitJobDir)。
更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2013-12/93700p2.htm
相关阅读:
Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm
Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm
Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm
Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm
单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm
搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm