共计 7535 个字符,预计需要花费 19 分钟才能阅读完成。
我们知道,任何一个工程项目,最重要的是三个部分:输入,中间处理,输出。今天我们来深入的了解一下我们熟知的 Hadoop 系统中,输入是如何输入的?
在 hadoop 中,输入数据都是通过对应的 InputFormat 类和 RecordReader 类来实现的,其中 InputFormat 来实现将对应输入文件进行分片,RecordReader 类将对应分片中的数据读取进来。具体的方式如下:
(1)InputFormat 类是一个接口。
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
(2)FileInputFormat 类实现了 InputFormat 接口。该类实现了 getSplits 方法,但是它也没有实现对应的 getRecordReader 方法。也就是说 FileInputFormat 还是一个抽象类。这里需要说明的一个问题是,FileInputFormat 用 isSplitable 方法来指定对应的文件是否支持数据的切分。默认情况下都是支持的,一般子类都需要重新实现它。
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
FileStatus[] files = listStatus(job);
// Save the number of input files in the job-conf
job.setLong(NUM_INPUT_FILES, files.length);
long totalSize = 0; // compute total size
for (FileStatus file: files) {// check we have valid files
if (file.isDir()) {
throw new IOException(“Not a file: “+ file.getPath());
}
totalSize += file.getLen();
}
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(“mapred.min.split.size”, 1),
minSplitSize);
// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job);
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
if ((length != 0) && isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[] splitHosts = getSplitHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
splitHosts));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) {
String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
splits.add(new FileSplit(path, 0, length, splitHosts));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
LOG.debug(“Total # of splits: ” + splits.size());
return splits.toArray(new FileSplit[splits.size()]);
}
// 该方法是用来判断是否可以进行数据的切分
protected boolean isSplitable(FileSystem fs, Path filename) {
return true;
}
// 但是它也没有实现对应的 getRecordReader 方法。也就是说 FileInputFormat 还是一个抽象类。
public abstract RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
(3)TextFileInputFormat类仅仅实现了 FileInputFormat 类的 getRecordReader 方法,并且重写了 isSplitable 方法,他并没有实现 getSplits 方法,由此可知,他的 getSplits 的实现还是交由父类 FileInputFormat 来实现的。(这里需要注意 TextFileInputFormat 并不是 InputFormat 的子类,TextFileInputFormat 它仅仅是继承了 InputFormat 的 getRecordReader 的方法而已。)
public class TextInputFormat extends FileInputFormat<LongWritable, Text>
implements JobConfigurable {
private CompressionCodecFactory compressionCodecs = null;
public void configure(JobConf conf) {
compressionCodecs = new CompressionCodecFactory(conf);
}
// 子类重新实现了 isSplitable 方法
protected boolean isSplitable(FileSystem fs, Path file) {
final CompressionCodec codec = compressionCodecs.getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}
// 该方法实现了将文件中的数据读入到对应的 Map 方法中。
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job,
Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
String delimiter = job.get(“textinputformat.record.delimiter”);
byte[] recordDelimiterBytes = null;
if (null != delimiter) recordDelimiterBytes = delimiter.getBytes();
return new LineRecordReader(job, (FileSplit) genericSplit,
recordDelimiterBytes);
}
}
从上面可以看出一个 Text 格式的文件是通过什么样的类继承层次输入到 map 方法中。下面主要介绍一下,到底是如何切分的?我们从类的继承层次关系上可以看出,具体的切分方式是通过 FileInputFormat 类来实现的。因此,要了解文件是如何切分的,只需要查看一下 FileInputFormat 类中的 getSplits 方法的实现细节即可。下面我再次把 FileInputFormat 类中的 getSplits 方法贴出来:然后分析每一句代码。
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
FileStatus[] files = listStatus(job); // 列出当前 job 中所有的输入文件
// Save the number of input files in the job-conf
job.setLong(NUM_INPUT_FILES, files.length); // 设置当前 job 的输入文件数目
// 计算当前 job 所有输入文件总的大小
long totalSize = 0; // compute total size
// 遍历每一个文件
for (FileStatus file: files) {// check we have valid files
if (file.isDir()) {
throw new IOException(“Not a file: “+ file.getPath());
}
totalSize += file.getLen();
}
// numSplits 是分片数,goalSize 是平均每一个分片的大小,minSize 是每个分片最小值
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(“mapred.min.split.size”, 1),
minSplitSize);
// generate splits 计算分片
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job);
long length = file.getLen();
// 获取文件的位置
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
//isSplitable 方法根据对应文件名称判断对应文件是否可以切分
if ((length != 0) && isSplitable(fs, path)) {
long blockSize = file.getBlockSize();// 获取文件块的大小
// computeSplitSize 方法计算真正的分片大小
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
long bytesRemaining = length;// 文件剩余大小
// SPLIT_SLOP=1.1,文件大小 / 分片的大小 > SPLIT_SLOP 则进行切分。
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// splitHosts 用来记录分片元数据信息(包括切片的位置,大小等等)
String[] splitHosts = getSplitHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
splitHosts));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) {// 如果文件不能切分,相应的会将整个文件作为一个分片。
String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
splits.add(new FileSplit(path, 0, length, splitHosts));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
LOG.debug(“Total # of splits: ” + splits.size());
return splits.toArray(new FileSplit[splits.size()]);
}
// 真正计算分片大小的地方。
protected long computeSplitSize(long goalSize, long minSize,
long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
综上所述,对于 MR 的输入文件采用的方式是通过 FileInputFormat 类来进行数据的切分,在切分之前,是通过 isSplitable 方法来判断是否可以切分,若不能切分,则会将整个文件作为一个分片作为输入。因此,若有业务需求需要对应文件不能进行切分的话,可以将 isSplitable 方法方位 false 即可。
这里还需要注意一个问题,倘若你的文件都是小文件的话,对应的 getSplits 方法也不会对其进行切分的。一般情况小文件指的是其大小小于对应 hadoop 中 HDFS 的块的大小(128M)。
下面关于 Hadoop 的文章您也可能喜欢,不妨看看:
Ubuntu14.04 下 Hadoop2.4.1 单机 / 伪分布式安装配置教程 http://www.linuxidc.com/Linux/2015-02/113487.htm
CentOS 安装和配置 Hadoop2.2.0 http://www.linuxidc.com/Linux/2014-01/94685.htm
Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm
Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm
Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm
Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm
单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm
更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13
本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-05/131571.htm