共计 7449 个字符,预计需要花费 19 分钟才能阅读完成。
Hadoop 是 Apache 下的一个项目,由 HDFS、MapReduce、HBase、Hive 和 ZooKeeper 等成员组成。其中,HDFS 和 MapReduce 是两个最基础最重要的成员。
HDFS 是 Google GFS 的开源版本,一个高度容错的分布式文件系统,它能够提供高吞吐量的数据访问,适合存储海量(PB 级)的大文件(通常超过 64M),其原理如下图所示:
采用 Master/Slave 结构。NameNode 维护集群内的元数据,对外提供创建、打开、删除和重命名文件或目录的功能。DatanNode 存储数据,并提负责处理数据的读写请求。DataNode 定期向 NameNode 上报心跳,NameNode 通过响应心跳来控制 DataNode。
MapReduce 是大规模数据(TB 级)计算的利器,Map 和 Reduce 是它的主要思想,来源于函数式编程语言,它的原理如下图所示:Map 负责将数据打散,Reduce 负责对数据进行聚集,用户只需要实现 map 和 reduce 两个接口,即可完成 TB 级数据的计算,常见的应用包括:日志分析和数据挖掘等数据分析应用。另外,还可用于科学数据计算,如圆周率 PI 的计算等。Hadoop MapReduce 的实现也采用了 Master/Slave 结构。Master 叫做 JobTracker,而 Slave 叫做 TaskTracker。用户提交的计算叫做 Job,每一个 Job 会被划分成若干个 Tasks。JobTracker 负责 Job 和 Tasks 的调度,而 TaskTracker 负责执行 Tasks。
MapReduce 中的 Shuffle 和 Sort 分析
MapReduce 是现今一个非常流行的分布式计算框架,它被设计用于并行计算海量数据。第一个提出该技术框架的是 Google 公司,而 Google 的灵感则来自于函数式编程语言,如 LISP,Scheme,ML 等。MapReduce 框架的核心步骤主要分两部分:Map 和 Reduce。当你向 MapReduce 框架提交一个计算作业时,它会首先把计算作业拆分成若干个 Map 任务,然后分配到不同的节点上去执行,每一个 Map 任务处理输入数据中的一部分,当 Map 任务完成后,它会生成一些中间文件,这些中间文件将会作为 Reduce 任务的输入数据。Reduce 任务的主要目标就是把前面若干个 Map 的输出汇总到一起并输出。从高层抽象来看,MapReduce 的数据流图如图 1 所示:
MapReduce 作业运行流程:
本文的重点是剖析 MapReduce 的核心过程 —-Shuffle 和 Sort。在本文中,Shuffle 是指从 Map 产生输出开始,包括系统执行排序以及传送 Map 输出到 Reducer 作为输入的过程。在这里我们将去探究 Shuffle 是如何工作的,因为对基础的理解有助于对 MapReduce 程序进行调优。
————————————– 分割线 ————————————–
Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm
Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm
Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm
Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm
单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm
搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm
————————————– 分割线 ————————————–
首先从 Map 端开始分析,当 Map 开始产生输出的时候,他并不是简单的把数据写到磁盘,因为频繁的操作会导致性能严重下降,他的处理更加复杂,数据首先是写到内存中的一个缓冲区,并作一些预排序,以提升效率,如图:
每个 Map 任务都有一个用来写入输出数据的循环内存缓冲区,这个缓冲区默认大小是 100M,可以通过 io.sort.mb 属性来设置具体的大小,当缓冲区中的数据量达到一个特定的阀值 (io.sort.mb * io.sort.spill.percent,其中 io.sort.spill.percent 默认是 0.80) 时,系统将会启动一个后台线程把缓冲区中的内容 spill 到磁盘。在 spill 过程中,Map 的输出将会继续写入到缓冲区,但如果缓冲区已经满了,Map 就会被阻塞直道 spill 完成。spill 线程在把缓冲区的数据写到磁盘前,会对他进行一个二次排序,首先根据数据所属的 partition 排序,然后每个 partition 中再按 Key 排序。输出包括一个索引文件和数据文件,如果设定了 Combiner,将在排序输出的基础上进行。Combiner 就是一个 Mini Reducer,它在执行 Map 任务的节点本身运行,先对 Map 的输出作一次简单的 Reduce,使得 Map 的输出更紧凑,更少的数据会被写入磁盘和传送到 Reducer。Spill 文件保存在由 mapred.local.dir 指定的目录中,Map 任务结束后删除。
更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2014-09/107138p2.htm
每当内存中的数据达到 spill 阀值的时候,都会产生一个新的 spill 文件,所以在 Map 任务写完他的最后一个输出记录的时候,可能会有多个 spill 文件,在 Map 任务完成前,所有的 spill 文件将会被归并排序为一个索引文件和数据文件。如图 3 所示。这是一个多路归并过程,最大归并路数由 io.sort.factor 控制(默认是 10)。如果设定了 Combiner,并且 spill 文件的数量至少是 3(由 min.num.spills.for.combine 属性控制),那么 Combiner 将在输出文件被写入磁盘前运行以压缩数据。
我们再来看一下 Combiner 的执行时机。实际上,Conbiner 函数的执行时机可能会在 map 的 merge 操作完成之前,也可能在 merge 之后执行,这个时机由配置参数 min.num.spill.for.combine(该值默认为 3),也就是说在 map 端产生的 spill 文件最少有 min.num.spill.for.combine 的时候,Conbiner 函数会在 merge 操作合并最终的本机结果文件之前执行,否则在 merge 之后执行。通过这种方式,就可以在 spill 文件很多并且需要做 conbine 的时候,减少写入本地磁盘的数据量,同样也减少了对磁盘的读写频率,可以起到优化作业的目的。(不好理解,白话说。在合并的时候每个读入的 spill 文件分别进行 combiner 然后再合并 / 把读入的所有 spill 文件合并后再做 combiner)
对写入到磁盘的数据进行压缩(这种压缩同 Combiner 的压缩不一样)通常是一个很好的方法,因为这样做使得数据写入磁盘的速度更快,节省磁盘空间,并减少需要传送到 Reducer 的数据量。默认输出是不被压缩的,但可以很简单的设置 mapred.compress.map.output 为 true 启用该功能。压缩所使用的库由 mapred.map.output.compression.codec 来设定。
当 spill 文件归并完毕后,Map 将删除所有的临时 spill 文件,并告知 TaskTracker 任务已完成。Reducers 通过 HTTP 来获取对应的数据。用来传输 partitions 数据的工作线程个数由 tasktracker.http.threads 控制,这个设定是针对每一个 TaskTracker 的,并不是单个 Map,默认值为 40,在运行大作业的大集群上可以增大以提升数据传输速率。
现在让我们转到 Shuffle 的 Reduce 部分。Map 的输出文件放置在运行 Map 任务的 TaskTracker 的本地磁盘上(注意:Map 输出总是写到本地磁盘,但是 Reduce 输出不是,一般是写到 HDFS),它是运行 Reduce 任务的 TaskTracker 所需要的输入数据。Reduce 任务的输入数据分布在集群内的多个 Map 任务的输出中,Map 任务可能会在不同的时间内完成,只要有其中一个 Map 任务完成,Reduce 任务就开始拷贝他的输出。这个阶段称为拷贝阶段,Reduce 任务拥有多个拷贝线程,可以并行的获取 Map 输出。可以通过设定 mapred.reduce.parallel.copies 来改变线程数。
Reduce 是怎么知道从哪些 TaskTrackers 中获取 Map 的输出呢?当 Map 任务完成之后,会通知他们的父 TaskTracker,告知状态更新,然后 TaskTracker 再转告 JobTracker,这些通知信息是通过心跳通信机制传输的,因此针对以一个特定的作业,jobtracker 知道 Map 输出与 tasktrackers 的映射关系。Reducer 中有一个线程会间歇的向 JobTracker 询问 Map 输出的地址,直到把所有的数据都取到。在 Reducer 取走了 Map 输出之后,TaskTracker 不会立即删除这些数据,因为 Reducer 可能会失败,他们会在整个作业完成之后,JobTracker 告知他们要删除的时候才去删除。
如果 Map 输出足够小,他们会被拷贝到 Reduce TaskTracker 的内存中(缓冲区的大小由 mapred.job.shuffle.input.buffer.percnet 控制),或者达到了 Map 输出的阀值的大小(由 mapred.inmem.merge.threshold 控制),缓冲区中的数据将会被归并然后 spill 到磁盘。
拷贝来的数据叠加在磁盘上,有一个后台线程会将它们归并为更大的排序文件,这样做节省了后期归并的时间。对于经过压缩的 Map 输出,系统会自动把它们解压到内存方便对其执行归并。
当所有的 Map 输出都被拷贝后,Reduce 任务进入排序阶段(更恰当的说应该是归并阶段,因为排序在 Map 端就已经完成),这个阶段会对所有的 Map 输出进行归并排序,这个工作会重复多次才能完成。
假设这里有 50 个 Map 输出(可能有保存在内存中的),并且归并因子是 10(由 io.sort.factor 控制,就像 Map 端的 merge 一样),那最终需要 5 次归并。每次归并会把 10 个文件归并为一个,最终生成 5 个中间文件。在这一步之后,系统不再把 5 个中间文件归并成一个,而是排序后直接“喂”给 Reduce 函数,省去向磁盘写数据这一步。最终归并的数据可以是混合数据,既有内存上的也有磁盘上的。由于归并的目的是归并最少的文件数目,使得在最后一次归并时总文件个数达到归并因子的数目,所以每次操作所涉及的文件个数在实际中会更微妙些。譬如,如果有 40 个文件,并不是每次都归并 10 个最终得到 4 个文件,相反第一次只归并 4 个文件,然后再实现三次归并,每次 10 个,最终得到 4 个归并好的文件和 6 个未归并的文件。要注意,这种做法并没有改变归并的次数,只是最小化写入磁盘的数据优化措施,因为最后一次归并的数据总是直接送到 Reduce 函数那里。在 Reduce 阶段,Reduce 函数会作用在排序输出的每一个 key 上。这个阶段的输出被直接写到输出文件系统,一般是 HDFS。在 HDFS 中,因为 TaskTracker 节点也运行着一个 DataNode 进程,所以第一个块备份会直接写到本地磁盘。到此,MapReduce 的 Shuffle 和 Sort 分析完毕。
更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13
Hadoop 是 Apache 下的一个项目,由 HDFS、MapReduce、HBase、Hive 和 ZooKeeper 等成员组成。其中,HDFS 和 MapReduce 是两个最基础最重要的成员。
HDFS 是 Google GFS 的开源版本,一个高度容错的分布式文件系统,它能够提供高吞吐量的数据访问,适合存储海量(PB 级)的大文件(通常超过 64M),其原理如下图所示:
采用 Master/Slave 结构。NameNode 维护集群内的元数据,对外提供创建、打开、删除和重命名文件或目录的功能。DatanNode 存储数据,并提负责处理数据的读写请求。DataNode 定期向 NameNode 上报心跳,NameNode 通过响应心跳来控制 DataNode。
MapReduce 是大规模数据(TB 级)计算的利器,Map 和 Reduce 是它的主要思想,来源于函数式编程语言,它的原理如下图所示:Map 负责将数据打散,Reduce 负责对数据进行聚集,用户只需要实现 map 和 reduce 两个接口,即可完成 TB 级数据的计算,常见的应用包括:日志分析和数据挖掘等数据分析应用。另外,还可用于科学数据计算,如圆周率 PI 的计算等。Hadoop MapReduce 的实现也采用了 Master/Slave 结构。Master 叫做 JobTracker,而 Slave 叫做 TaskTracker。用户提交的计算叫做 Job,每一个 Job 会被划分成若干个 Tasks。JobTracker 负责 Job 和 Tasks 的调度,而 TaskTracker 负责执行 Tasks。
MapReduce 中的 Shuffle 和 Sort 分析
MapReduce 是现今一个非常流行的分布式计算框架,它被设计用于并行计算海量数据。第一个提出该技术框架的是 Google 公司,而 Google 的灵感则来自于函数式编程语言,如 LISP,Scheme,ML 等。MapReduce 框架的核心步骤主要分两部分:Map 和 Reduce。当你向 MapReduce 框架提交一个计算作业时,它会首先把计算作业拆分成若干个 Map 任务,然后分配到不同的节点上去执行,每一个 Map 任务处理输入数据中的一部分,当 Map 任务完成后,它会生成一些中间文件,这些中间文件将会作为 Reduce 任务的输入数据。Reduce 任务的主要目标就是把前面若干个 Map 的输出汇总到一起并输出。从高层抽象来看,MapReduce 的数据流图如图 1 所示:
MapReduce 作业运行流程:
本文的重点是剖析 MapReduce 的核心过程 —-Shuffle 和 Sort。在本文中,Shuffle 是指从 Map 产生输出开始,包括系统执行排序以及传送 Map 输出到 Reducer 作为输入的过程。在这里我们将去探究 Shuffle 是如何工作的,因为对基础的理解有助于对 MapReduce 程序进行调优。
————————————– 分割线 ————————————–
Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm
Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm
Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm
Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm
单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm
搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm
————————————– 分割线 ————————————–
首先从 Map 端开始分析,当 Map 开始产生输出的时候,他并不是简单的把数据写到磁盘,因为频繁的操作会导致性能严重下降,他的处理更加复杂,数据首先是写到内存中的一个缓冲区,并作一些预排序,以提升效率,如图:
每个 Map 任务都有一个用来写入输出数据的循环内存缓冲区,这个缓冲区默认大小是 100M,可以通过 io.sort.mb 属性来设置具体的大小,当缓冲区中的数据量达到一个特定的阀值 (io.sort.mb * io.sort.spill.percent,其中 io.sort.spill.percent 默认是 0.80) 时,系统将会启动一个后台线程把缓冲区中的内容 spill 到磁盘。在 spill 过程中,Map 的输出将会继续写入到缓冲区,但如果缓冲区已经满了,Map 就会被阻塞直道 spill 完成。spill 线程在把缓冲区的数据写到磁盘前,会对他进行一个二次排序,首先根据数据所属的 partition 排序,然后每个 partition 中再按 Key 排序。输出包括一个索引文件和数据文件,如果设定了 Combiner,将在排序输出的基础上进行。Combiner 就是一个 Mini Reducer,它在执行 Map 任务的节点本身运行,先对 Map 的输出作一次简单的 Reduce,使得 Map 的输出更紧凑,更少的数据会被写入磁盘和传送到 Reducer。Spill 文件保存在由 mapred.local.dir 指定的目录中,Map 任务结束后删除。
更多详情见请继续阅读下一页的精彩内容:http://www.linuxidc.com/Linux/2014-09/107138p2.htm