阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

入门Hadoop的WordCount程序

215次阅读
没有评论

共计 8663 个字符,预计需要花费 22 分钟才能阅读完成。

本篇文章主要说两部分:简单介绍 MapReduce 的工作原理;详细解释 WordCount 程序。

1. MapReduce 的工作原理

在《Hadoop in action》一书中,对 MapReduce 计算模型有了很好的描述,在这里我们直接引用过来:“

在 Hadoop 中,用于执行 MapReduce 任务的机器角色有两个:一个是 JobTracker;另一个是 TaskTracker,JobTracker 是用于调度工作的,TaskTracker 是用于执行工作的。一个 Hadoop 集群中只有一台 JobTracker。

在分布式计算中,MapReduce 框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题,把处理过程高度抽象为两个函数:map 和 reduce,map 负责把任务分解成多个任务,reduce 负责把分解后多任务处理的结果汇总起来。

在 Hadoop 中,每个 MapReduce 任务都被初始化为一个 Job,每个 Job 又可以分为两种阶段:map 阶段和 reduce 阶段。这两个阶段分别用两个函数表示,即 map 函数和 reduce 函数。map 函数接收一个 <key,value> 形式的输入,然后同样产生一个 <key,value> 形式的中间输出,Hadoop 函数接收一个如 <key,(list ofvalues)> 形式的输入,然后对这个 value 集合进行处理,每个 reduce 产生 0 或 1 个输出,reduce 的输出也是 <key,value> 形式的。”

从上面的解释我们可以看出:MapReduce 把对大规模数据集的操作,分发给一个主节点(master)管理下的各个分节点(slaves)共同完成,然后通过整合各个节点的中间结果,得到最终结果。用 MapReduce 来处理的数据集(或任务)必须具备这样的特点:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。整个过程都是按照 <key, value> 的形式来输入输出。

下图很好的描述了 MapReduce 的工作过程:

入门 Hadoop 的 WordCount 程序

下面我们结合一个简单的实例来说明 MapReduce 的内部运行流程,首先给出一个 WordCount 的数据流程图:

入门 Hadoop 的 WordCount 程序

Step1:输入文件 file1 和 file2 先按照 TextInputFormat 形式被处理成两个 InputSplit,然后输入到两个 map 中。此时 map 的输入格式为 <key, value> 的,需要指出的是此时的 key 为当前的行号(位移量),value 为对应行的内容;

Step 2:Map 对每行的内容进行切词,每切下一个词就将其组织成 <word, 1> 的形式输出,注意每个 word 的 value 均为 1;

Step 3:把 map 的输出进入 Reduce 阶段,TaskTracker 会接收到 <word, {1,1,1,…}> 形式的数据,reduce 会进行统计频数,组织成 <word, sum> 的形式直接输出。

在以上过程初始的输入文件和最终输出结果都是放在 HDFS 上的,但是中间的 map 程序只是写到本地磁盘中,而不是写到 HDFS 中。这是因为 Map 的输出在 Jpb 完成后即可删除了,因此不需要存储到 HDFS 上。虽然存储到 HDFS 上会比较安全,但是因为网络传输会降低 MapReduce 任务的执行效率,因此 Map 的输出文件时写在本地磁盘的。如果 Map 程序在没来得及将数据传送给 Reduce 时就崩毁了,那么 JobTracker 只需要另选一台机器重新执行这个 Task 就可以了(JobTracker 是要有这个功能的,JobTracker 调度任务给 TaskTracker,TaskTracker 执行任务的时候,会返还进行报告,JobTracker 则会记录进度的进行状况,如果某个 TaskTracker 上的任务执行失败了,那么 JobTracker 会把这个任务分配给另一个 TaskTracker,直到任务执行完成)。

2. 详解 WordCount 程序

WordCount 程序是学习 Hadoop 的入门程序,我们有必要详解一下。能够完整的运行 WordCount 程序需要如下结果步骤:本地的文本文件上传到 HDFS 上,WordCount 程序实现 MapReduce 过程,输出结果到 HDFS 上。

Step 1:进入 CentOS 系统(上篇博文已经介绍过在 centos6.0 上搭建 hadoop 框架),在本地新建一个 file 文件夹。在 file 文件夹中,新建两个文本文件 file1 和 file2. file1 的内容为 Hello World,file2 的内容为 Hello Hadoop。

Step 2:在 HDFS 上创建输入文件夹,并把本地的 file 中文件上传到集群的 input 目录上;

Step 3:在集群上运行 WordCount 程序,以 input 为输入目录,以 output 为输出目录;

以上过程可以再 centos 的终端完成:

mkdir ~/file  在本地创建 file 文件夹
cd file

echo “Hello World” > file1.txt  在 file1 和 file2 文件中存放文本,
echo “Hello Hadoop” > file2.txt  echo 命令的作用是输出内容,> file1/ 2 就是输出到文件中

hadoop fs -mkdir input  在 HDFS 上创建输入目录文件夹

hadoop fs -put ~file/file*.txt input  上传本地 file 文件夹中的文件到 HDFS 上

运行 WordCount 程序
hadoop jar /usr/local/hadoop/hadoop-0.20.2/hadoop-0.20.2-examples.jar wordcount input output
“hadoop jar”——执行 jar 命令;
“/usr/local/hadoop/hadoop-0.20.2/hadoop-0.20.2-examples.jar”——WordCount 所在的 jar 包的地址
“wordcount”  程序主类名
“input output”  输入、输出文件夹

hadoop fs -cat output/part-r-00000  查看输出文件中的内容

细心的读者会发现,hadoop 框架下操作命令都是以 hadoop fs -* 的形式。下面我们就列举一下 hadoop fs -* 命令:

1,hadoop fs –fs [local | <file system URI>]:声明 hadoop 使用的文件系统,如果不声明的话,使用当前配置文件配置的,按如下顺序查找:hadoop jar 里的 hadoop-default.xml->$HADOOP_CONF_DIR 下的 hadoop-default.xml->$HADOOP_CONF_DIR 下的 hadoop-site.xml。使用 local 代表将本地文件系统作为 hadoop 的 DFS。如果传递 uri 做参数,那么就是特定的文件系统作为 DFS。
2,hadoop fs –ls <path>:等同于本地系统的 ls,列出在指定目录下的文件内容,支持 pattern 匹配。输出格式如 filename(full path)  <r n>  size. 其中 n 代表 replica 的个数,size 代表大小(单位 bytes)。
3,hadoop fs –lsr <path>:递归列出匹配 pattern 的文件信息,类似 ls,只不过递归列出所有子目录信息。
4,hadoop fs –du <path>:列出匹配 pattern 的指定的文件系统空间总量(单位 bytes),等价于 unix 下的针对目录的 du –sb <path>/* 和针对文件的 du –b <path>,输出格式如 name(full path)  size(in bytes)。
5,hadoop fs –dus <path>:等价于 -du,输出格式也相同,只不过等价于 unix 的 du -sb。
<span style=”color:#ff0000;”>6,hadoop fs –mv <src> <dst>:将制定格式的文件 move 到指定的目标位置。当 src 为多个文件时,dst 必须是个目录。
7,hadoop fs –cp <src> <dst>:拷贝文件到目标位置,当 src 为多个文件时,dst 必须是个目录。</span>
8<span style=”color:#ff0000;”>,hadoop fs –rm [-skipTrash] <src>:删除匹配 pattern 的指定文件,等价于 unix 下的 rm <src>。</span>
9,hadoop fs –rmr [skipTrash] <src>:递归删掉所有的文件和目录,等价于 unix 下的 rm –rf <src>。
10,hadoop fs –rmi [skipTrash] <src>:等价于 unix 的 rm –rfi <src>。
<span style=”color:#ff0000;”>11,hadoop fs –put <localsrc> … <dst>:从本地系统拷贝文件到 DFS。</span>
12,hadoop fs –copyFromLocal <localsrc> … <dst>:等价于 -put。
13,hadoop fs –moveFromLocal <localsrc> … <dst>:等同于 -put,只不过源文件在拷贝后被删除。
<span style=”color:#ff0000;”>14,hadoop fs –get [-ignoreCrc] [-crc] <src> <localdst>:从 DFS 拷贝文件到本地文件系统,文件匹配 pattern,若是多个文件,则 dst 必须是目录。</span>
15,hadoop fs –getmerge <src> <localdst>:顾名思义,从 DFS 拷贝多个文件、合并排序为一个文件到本地文件系统。
<span style=”color:#ff0000;”>16,hadoop fs –cat <src>:展示文件内容。</span>
17,hadoop fs –copyToLocal [-ignoreCrc] [-crc] <src> <localdst>:等价于 -get。
<span style=”color:#ff0000;”>18,hadoop fs –mkdir <path>:在指定位置创建目录。</span>
19,hadoop fs –setrep [-R] [-w] <rep> <path/file>:设置文件的备份级别,- R 标志控制是否递归设置子目录及文件。
20,hadoop fs –chmod [-R] <MODE[,MODE]…|OCTALMODE> PATH…:修改文件的权限,- R 标记递归修改。MODE 为 a +r,g-w,+rwx 等,OCTALMODE 为 755 这样。
21,hadoop fs -chown [-R] [OWNER][:[GROUP]] PATH…:修改文件的所有者和组。- R 表示递归。
22,hadoop fs -chgrp [-R] GROUP PATH…:等价于 -chown … :GROUP …。
23,hadoop fs –count[-q] <path>:计数文件个数及所占空间的详情,输出表格的列的含义依次为:DIR_COUNT,FILE_COUNT,CONTENT_SIZE,FILE_NAME 或者如果加了 - q 的话,还会列出 QUOTA,REMAINING_QUOTA,SPACE_QUOTA,REMAINING_SPACE_QUOTA。

程序以及详细的注释如下:

package test;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable>{
// 显然这里的 Mapper<Object,Text,Text,IntWritable> 是范型,其实是
  //Mapper<input_Key_Type,input_Value_Type,output_key_type,output_value_type>
  // 也就是借此规定 map 中用到的数据类型
// 这几种类型除 Object 之外,其它是 jdk 中没有的,这是 hadoop 对它相应的 jdk 中数据类型的封装,
// 这里的 Text 相当于 jdk 中的 String,IntWritable 相当于 jdk 的 int 类型,
// 这样做的原因主要是为了 hadoop 的数据序化而做的。
    private final static IntWritable one = new IntWritable(1);
 // 声时一个 IntWritable 变量,作计数用,每出现一个 key,给其一个 value= 1 的值
    private Text word = new Text();  // 用来暂存 map 输出中的 key 值,Text 类型的,故有此声明

 // 这里就是 map 函数,也用到了范型,它是和 Mapper 抽象类中的相对应的,
 // 此处的 Object key,Text value 的类型和上边的 Object,Text 是相对应的,而且最好一样,
 // 不然的话,多数情况运行时会报错。
    public void map(Object key, Text value, Context context) throws IOException,
 InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
  //Hadoop 读入的 value 是以行为单位的,其 key 为该行所对应的行号
// 因为我们要计算每个单词的数目,默认以空格作为间隔,故用 StringTokenizer 辅助做一下字符串的拆分,
// 也可以用 string.split(“”) 来作。
      while (itr.hasMoreTokens()) {// 遍历一下每行字符串中的单词,
        word.set(itr.nextToken());// 出现一个单词就给它设成一个 key 并将其值设为 1
        context.write(word, one);// 输出设成的 key/value 值。
// 以上就是打散的过程
      }
    }
  }

  public static class IntSumReducer  //reduce 所在的静态类
      extends Reducer<Text,IntWritable,Text,IntWritable> {
    // 这里和 Map 中的作用是一样的,设定输入 / 输出的值的类型
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values,
                      Context context
                      ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
    // 由于 map 的打散,这里会得到如,{key,values}={“hello”,{1,1,1,1,1,1,….}}, 这样的集合
        sum += val.get(); // 这里需要逐一将它们的 value 取出来予以相加,取得总的出现次数,即为汇和
      }
      result.set(sum);                  // 将 values 的和取得,并设成 result 对应的值
      context.write(key, result); 
  // 此时的 key 即为 map 打散之后输出的 key,没有变化,
  // 变化的是 result, 以前得到的是一个数字的集合,此时已经 // 给算出和了,并做为 key/value 输出。
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // 取得系统的参数
    if (args.length != 2) {// 判断一下命令行输入路径 / 输出路径是否齐全,即是否为两个参数
      System.err.println(“Usage: wordcount <in> <out>”);
      System.exit(2);  // 若非两个参数,即退出
    }

 // 此程序的执行,在 hadoop 看来是一个 Job,故进行初始化 job 操作
    Job job = new Job(conf, “Word Count”);

 // 配置作业名,此程序要执行 WordCount.class 这个字节码文件
    job.setJarByClass(WordCount.class);

 // 配置作业的各个类
 // 此处设置了使用 TokenizerMapper 完成 Map 过程中的处理
 // 使用 IntSumReducer 完成 Combine 和 Reduce 过程中的处理。
 // 在这个 job 中,用 TokenizerMapper 这个类的 map 函数
    job.setMapperClass(TokenizerMapper.class);

 // 在这个 job 中,用 IntSumReducer 这个类的 reduce 函数
    job.setReducerClass(IntSumReducer.class);

  // 在 reduce 的输出时,key 的输出类型为 Text
    job.setOutputKeyClass(Text.class);

  // 在 reduce 的输出时,value 的输出类型为 IntWritable
    job.setOutputValueClass(IntWritable.class);

// 任务的输出和输入路径则由命令行参数指定,并由 FileInputFormat 和 FileOutputFormat 分别设定

 // 初始化要计算 word 的文件的路径
    FileInputFormat.addInputPath(job, new Path(args[0]));

 // 初始化要计算 word 的文件的之后的结果的输出路径
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

 // 这里就是真正的去提交 job 到 hadoop 上去执行了,
 // 完成相应任务的参数设定后,即可调用 job.waitForCompletion()方法执行任务。
 // 意思是指如果这个 job 真正的执行完了则主函数退出了,若没有真正的执行完就退出了,则为非法退出
    System.exit(job.waitForCompletion(true) ? 0 : 1);         

  }
}

————————————– 分割线 ————————————–

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm

————————————– 分割线 ————————————–

更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计8663字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中