共计 4789 个字符,预计需要花费 12 分钟才能阅读完成。
MapReduce 框架的优势是可以在集群中并行运行 mapper 和 reducer 任务,那如何确定 mapper 和 reducer 的数量呢,或者说 Hadoop 如何以编程的方式控制作业启动的 mapper 和 reducer 数量呢?在《Hadoop-2.4.1 学习之 Mapper 和 Reducer》中曾经提及建议 reducer 的数量为 (0.95~1.75) * 节点数量 * 每个节点上最大的容器数,并可使用方法 Job.setNumReduceTasks(int),mapper 的数量由输入文件的大小确定,且没有相应的 setNumMapTasks 方法,但可以通过 Configuration.set(JobContext.NUM_MAPS, int) 设置,其中 JobContext.NUM_MAPS 的值为 mapreduce.job.maps,而在 Hadoop 的官方网站上对该参数的描述为与 MapReduce 框架和作业配置巧妙地交互,并且设置起来更加复杂。从这样一句含糊不清的话无法得知究竟如何确定 mapper 的数量,显然只能求助于源代码了。
在 Hadoop 中 MapReduce 作业通过 JobSubmitter 类的 submitJobInternal(Jobjob, Cluster cluster)方法向系统提交作业(该方法不仅设置 mapper 数量,还执行了一些其它操作如检查输出格式等,感兴趣的可以参考源代码),在该方法中与设置 mapper 有关的代码如下:
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info(“number of splits:” + maps);
方法 writeSplits 返回 mapper 的数量,该方法的源代码如下:
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,Path jobSubmitDir)
throws IOException,InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
在该方法中,根据是否使用了新版本的 JobContext 而使用不同的方法计算 mapper 数量,实际情况是 jConf.getUseNewMapper()将返回 true,因此将执行 writeNewSplits(job,jobSubmitDir)语句,该方法的源代码如下:
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
return array.length;
通过上面的代码可以得知,实际的 mapper 数量为输入分片的数量,而分片的数量又由使用的输入格式决定,默认为 TextInputFormat,该类为 FileInputFormat 的子类。确定分片数量的任务交由 FileInputFormat 的 getSplits(job)完成,在此补充一下 FileInputFormat 继承自抽象类 InputFormat,该类定义了 MapReduce 作业的输入规范,其中的抽象方法 List<InputSplit> getSplits(JobContext context)定义了如何将输入分割为 InputSplit,不同的输入有不同的分隔逻辑,而分隔得到的每个 InputSplit 交由不同的 mapper 处理,因此该方法的返回值确定了 mapper 的数量。下面将分为两部分学习该方法是如何在 FileInputFormat 中实现的,为了将注意力集中在最重要的部分,对日志输出等信息将不做介绍,完整的实现可以参考源代码。
首先是第一部分,该部分代码计算了最大 InputSplit 和最小 InputSplit 的值,如下:
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
其中的 getMinSplitSize 和 getMaxSplitSize 方法分别用于获取最小 InputSplit 和最大 InputSplit 的值,对应的配置参数分别为 mapreduce.input.fileinputformat.split.minsize,默认值为 1L 和 mapreduce.input.fileinputformat.split.maxsize,默认值为 Long.MAX_VALUE,十六进制数值为 0x7fffffffffffffffL,对应的十进制为 9223372036854775807,getFormatMinSplitSize 方法返回该输入格式下 InputSplit 的下限。以上数字的单位都是 byte。由此得出 minSize 的大小为 1L,maxSize 的大小为 Long.MAX_VALUE。
其次是生成 InputSplit 的第二部分。在该部分将生成包含 InputSplit 的 List,而 List 的大小为 InputSplit 的数量,进而确定了 mapper 的数量。其中重要的代码为:
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts()));
}
}
blockSize 的值为参数 dfs.blocksize 的值,默认为 128M。方法 computeSplitSize(blockSize, minSize, maxSize)根据 blockSize,minSize,maxSize 确定 InputSplit 的大小,源代码如下:
Math.max(minSize, Math.min(maxSize, blockSize))
从该代码并结合第一部分的分析可以得知,InputSplit 的大小取决于 dfs.blocksiz、mapreduce.input.fileinputformat.split.minsize、mapreduce.input.fileinputformat.split.maxsize 和所使用的输入格式。在输入格式为 TextInputFormat 的情况下,且不修改 InputSplit 的最大值和最小值的情况,InputSplit 的最终值为 dfs.blocksize 的值。
变量 SPLIT_SLOP 的值为 1.1,决定了当剩余文件大小多大时停止按照变量 splitSize 分割文件。根据代码可知,当剩余文件小于等于 1.1 倍 splitSize 时,将把剩余的文件做为一个 InputSplit,即最后一个 InputSplit 的大小最大为 1.1 倍 splitSize。
总结
本文分析了在输入格式为默认的 TextInputFormat 的情况,如何确定 mapper 的数量。在不修改源代码的情况下(修改输入格式的 InputSplit 下限),程序员可以通过设置 dfs.blocksiz、mapreduce.input.fileinputformat.split.minsize、mapreduce.input.fileinputformat.split.maxsize 参数的值设置 InputSplit 的大小来影响 InputSplit 的数量,进而决定 mapper 的数量。当输入为其它格式时,处理逻辑又不相同了,比如当输入格式为 DBInputFormat 时,会根据输入表的行数(记录数)决定 mapper 的数量,更多细节可以参考源代码。
CentOS 安装和配置 Hadoop2.2.0 http://www.linuxidc.com/Linux/2014-01/94685.htm
Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm
Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm
Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm
Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm
单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm
搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm
更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13