阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Hadoop数据输入的源码解析

187次阅读
没有评论

共计 7535 个字符,预计需要花费 19 分钟才能阅读完成。

我们知道,任何一个工程项目,最重要的是三个部分:输入,中间处理,输出。今天我们来深入的了解一下我们熟知的 Hadoop 系统中,输入是如何输入的?

在 hadoop 中,输入数据都是通过对应的 InputFormat 类和 RecordReader 类来实现的,其中 InputFormat 来实现将对应输入文件进行分片,RecordReader 类将对应分片中的数据读取进来。具体的方式如下:

(1)InputFormat 类是一个接口。

public interface InputFormat<K, V> {

  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

  RecordReader<K, V> getRecordReader(InputSplit split,

                                    JobConf job,

                                    Reporter reporter) throws IOException;

}

(2)FileInputFormat 类实现了 InputFormat 接口。该类实现了 getSplits 方法,但是它也没有实现对应的 getRecordReader 方法。也就是说 FileInputFormat 还是一个抽象类。这里需要说明的一个问题是,FileInputFormat 用 isSplitable 方法来指定对应的文件是否支持数据的切分。默认情况下都是支持的,一般子类都需要重新实现它。

public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {

    FileStatus[] files = listStatus(job);

    // Save the number of input files in the job-conf

    job.setLong(NUM_INPUT_FILES, files.length);

    long totalSize = 0;                          // compute total size

    for (FileStatus file: files) {// check we have valid files

      if (file.isDir()) {

        throw new IOException(“Not a file: “+ file.getPath());

      }

      totalSize += file.getLen();

    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

    long minSize = Math.max(job.getLong(“mapred.min.split.size”, 1),

                            minSplitSize);

    // generate splits

    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);

    NetworkTopology clusterMap = new NetworkTopology();

    for (FileStatus file: files) {

      Path path = file.getPath();

      FileSystem fs = path.getFileSystem(job);

      long length = file.getLen();

      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);

      if ((length != 0) && isSplitable(fs, path)) {

        long blockSize = file.getBlockSize();

        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

 

        long bytesRemaining = length;

        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

          String[] splitHosts = getSplitHosts(blkLocations,

              length-bytesRemaining, splitSize, clusterMap);

          splits.add(new FileSplit(path, length-bytesRemaining, splitSize,

              splitHosts));

          bytesRemaining -= splitSize;

        }

        if (bytesRemaining != 0) {

          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,

                    blkLocations[blkLocations.length-1].getHosts()));

        }

      } else if (length != 0) {

        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);

        splits.add(new FileSplit(path, 0, length, splitHosts));

      } else {

        //Create empty hosts array for zero length files

        splits.add(new FileSplit(path, 0, length, new String[0]));

      }

    }

    LOG.debug(“Total # of splits: ” + splits.size());

    return splits.toArray(new FileSplit[splits.size()]);

  }

// 该方法是用来判断是否可以进行数据的切分

  protected boolean isSplitable(FileSystem fs, Path filename) {

    return true;

  }

// 但是它也没有实现对应的 getRecordReader 方法。也就是说 FileInputFormat 还是一个抽象类。

  public abstract RecordReader<K, V> getRecordReader(InputSplit split,

                                              JobConf job,

                                               Reporter reporter)  throws IOException;

 }

(3)TextFileInputFormat类仅仅实现了 FileInputFormat 类的 getRecordReader 方法,并且重写了 isSplitable 方法,他并没有实现 getSplits 方法,由此可知,他的 getSplits 的实现还是交由父类 FileInputFormat 来实现的。(这里需要注意 TextFileInputFormat 并不是 InputFormat 的子类,TextFileInputFormat 它仅仅是继承了 InputFormat 的 getRecordReader 的方法而已。)

public class TextInputFormat extends FileInputFormat<LongWritable, Text>

  implements JobConfigurable {

 

  private CompressionCodecFactory compressionCodecs = null;

 

  public void configure(JobConf conf) {

    compressionCodecs = new CompressionCodecFactory(conf);

  }

 

// 子类重新实现了 isSplitable 方法

  protected boolean isSplitable(FileSystem fs, Path file) {

    final CompressionCodec codec = compressionCodecs.getCodec(file);

    if (null == codec) {

      return true;

    }

    return codec instanceof SplittableCompressionCodec;

  }

 // 该方法实现了将文件中的数据读入到对应的 Map 方法中。

  public RecordReader<LongWritable, Text> getRecordReader(

                                          InputSplit genericSplit, JobConf job,

                                          Reporter reporter)

    throws IOException {

   

    reporter.setStatus(genericSplit.toString());

    String delimiter = job.get(“textinputformat.record.delimiter”);

    byte[] recordDelimiterBytes = null;

    if (null != delimiter) recordDelimiterBytes = delimiter.getBytes();

    return new LineRecordReader(job, (FileSplit) genericSplit,

        recordDelimiterBytes);

  }

}

从上面可以看出一个 Text 格式的文件是通过什么样的类继承层次输入到 map 方法中。下面主要介绍一下,到底是如何切分的?我们从类的继承层次关系上可以看出,具体的切分方式是通过 FileInputFormat 类来实现的。因此,要了解文件是如何切分的,只需要查看一下 FileInputFormat 类中的 getSplits 方法的实现细节即可。下面我再次把 FileInputFormat 类中的 getSplits 方法贴出来:然后分析每一句代码。

  public InputSplit[] getSplits(JobConf job, int numSplits)

    throws IOException {

    FileStatus[] files = listStatus(job); // 列出当前 job 中所有的输入文件

   

    // Save the number of input files in the job-conf

job.setLong(NUM_INPUT_FILES, files.length); // 设置当前 job 的输入文件数目

 

// 计算当前 job 所有输入文件总的大小

long totalSize = 0;                          // compute total size

// 遍历每一个文件

    for (FileStatus file: files) {// check we have valid files

      if (file.isDir()) {

        throw new IOException(“Not a file: “+ file.getPath());

      }

      totalSize += file.getLen();

    }

// numSplits 是分片数,goalSize 是平均每一个分片的大小,minSize 是每个分片最小值

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

    long minSize = Math.max(job.getLong(“mapred.min.split.size”, 1),

                            minSplitSize);

 

    // generate splits  计算分片

    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);

    NetworkTopology clusterMap = new NetworkTopology();

    for (FileStatus file: files) {

      Path path = file.getPath();

      FileSystem fs = path.getFileSystem(job);

      long length = file.getLen();

    // 获取文件的位置

      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);

//isSplitable 方法根据对应文件名称判断对应文件是否可以切分

     if ((length != 0) && isSplitable(fs, path)) {

        long blockSize = file.getBlockSize();// 获取文件块的大小

        // computeSplitSize 方法计算真正的分片大小

        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

 

        long bytesRemaining = length;// 文件剩余大小

 

// SPLIT_SLOP=1.1,文件大小 / 分片的大小 > SPLIT_SLOP 则进行切分。

        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

        // splitHosts 用来记录分片元数据信息(包括切片的位置,大小等等)

          String[] splitHosts = getSplitHosts(blkLocations,

              length-bytesRemaining, splitSize, clusterMap);

          splits.add(new FileSplit(path, length-bytesRemaining, splitSize,

              splitHosts));

          bytesRemaining -= splitSize;

        }

       

        if (bytesRemaining != 0) {

          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,

                    blkLocations[blkLocations.length-1].getHosts()));

        }

      } else if (length != 0) {// 如果文件不能切分,相应的会将整个文件作为一个分片。

        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);

        splits.add(new FileSplit(path, 0, length, splitHosts));

      } else {

        //Create empty hosts array for zero length files

        splits.add(new FileSplit(path, 0, length, new String[0]));

      }

    }

    LOG.debug(“Total # of splits: ” + splits.size());

    return splits.toArray(new FileSplit[splits.size()]);

  }

// 真正计算分片大小的地方。

protected long computeSplitSize(long goalSize, long minSize,

                                      long blockSize) {

    return Math.max(minSize, Math.min(goalSize, blockSize));

  }

综上所述,对于 MR 的输入文件采用的方式是通过 FileInputFormat 类来进行数据的切分,在切分之前,是通过 isSplitable 方法来判断是否可以切分,若不能切分,则会将整个文件作为一个分片作为输入。因此,若有业务需求需要对应文件不能进行切分的话,可以将 isSplitable 方法方位 false 即可。

这里还需要注意一个问题,倘若你的文件都是小文件的话,对应的 getSplits 方法也不会对其进行切分的。一般情况小文件指的是其大小小于对应 hadoop 中 HDFS 的块的大小(128M)。 

下面关于 Hadoop 的文章您也可能喜欢,不妨看看:

Ubuntu14.04 下 Hadoop2.4.1 单机 / 伪分布式安装配置教程  http://www.linuxidc.com/Linux/2015-02/113487.htm

CentOS 安装和配置 Hadoop2.2.0  http://www.linuxidc.com/Linux/2014-01/94685.htm

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-05/131571.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计7535字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中