共计 5587 个字符,预计需要花费 14 分钟才能阅读完成。
Hadoop 就是解决了大数据(大到一台计算机无法进行存储,一台计算机无法在要求的时间内进行处理)的可靠存储和处理。
HDFS,在由普通 PC 组成的集群上提供高可靠的文件存储,通过将块保存多个副本的办法解决服务器或硬盘坏掉的问题。
MapReduce,通过简单的 Mapper 和 Reducer 的抽象提供一个编程模型,可以在一个由几十台上百台的 PC 组成的不可靠集群上并发 地,分布式地处理大量的数据集,而把并发、分布式(如机器间通信)和故障恢复等计算细节隐藏起来。而 Mapper 和 Reducer 的抽象,又是各种各样的 复杂数据处理都可以分解为的基本元素。这样,复杂的数据处理可以分解为由多个 Job(包含一个 Mapper 和一个 Reducer)组成的有向无环图(DAG), 然后每个 Mapper 和 Reducer 放到 Hadoop 集群上执行,就可以得出结果。
用 MapReduce 统计一个文本文件中单词出现的频率的示例 WordCount 请参见:WordCount – Hadoop Wiki,如果对 MapReduce 不恨熟悉,通过该示例对 MapReduce 进行一些了解对理解下文有帮助。在 MapReduce 中,Shuffle 是 一个非常重要的过程,正是有了看不见的 Shuffle 过程,才可以使在 MapReduce 之上写数据处理的开发者完全感知不到分布式和并发的存在。
广义的 Shuffle 是指图中在 Map 和 Reuce 之间的一系列过程。
Hadoop 的局限和不足但是,MapRecue 存在以下局限,使用起来比较困难。
抽象层次低,需要手工编写代码来完成,使用上难以上手。
只提供两个操作,Map 和 Reduce,表达力欠缺。
一个 Job 只有 Map 和 Reduce 两个阶段(Phase),复杂的计算需要大量的 Job 完成,Job 之间的依赖关系是由开发者自己管理的。
处理逻辑隐藏在代码细节中,没有整体逻辑中间结果也放在 HDFS 文件系统中 ReduceTask 需要等待所有 MapTask 都完成后才可以开始
时延高,只适用 Batch 数据处理,对于交互式数据处理,实时数据处理的支持不够
对于迭代式数据处理性能比较差
比如说,用 MapReduce 实现两个表的 Join 都是一个很有技巧性的过程,
如下图所示:(图片来源:Real World Hadoop)因此,在 Hadoop 推出之后,出现了很多相关的技术对其中的局限进行改进,如 Pig,Cascading,JAQL,OOzie,Tez,Spark 等。
Apache SparkApache Spark 是一个新兴的大数据处理的引擎,主要特点是提供了一个集群的分布式内存抽象,以支持需要工作集的应用。
这个抽象就是 RDD(Resilient Distributed Dataset),RDD 就是一个不可变的带分区的记录集合,RDD 也是 Spark 中的编程模型。Spark 提供了 RDD 上的两类操作,转换和动作。转换 是用来定义一个新的 RDD,包括 map, flatMap, filter, union, sample, join, groupByKey, cogroup, ReduceByKey, cros, sortByKey, mapValues 等,动作是返回一个结果,包括 collect, reduce, count, save, lookupKey。
Spark 的 API 非常简单易用,Spark 的 WordCount 的示例如下所示:
val spark = new SparkContext(master, appName, [sparkHome], [jars])
val file = spark.textFile(“hdfs://…”)
val counts = file.flatMap(line => line.split(” “))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile(“hdfs://…”)
其中的 file 是根据 HDFS 上的文件创建的 RDD,后面的 flatMap,map,reduceByKe 都创建出一个新的 RDD,一个简短的程序就能够执行很多个转换和动作。
在 Spark 中,所有 RDD 的转换都是是惰性求值的。RDD 的转换操作会生成新的 RDD,新的 RDD 的数据依赖于原来的 RDD 的数据,每个 RDD 又包含多个分区。那么一段程序实际上就构造了一个由相互依赖的多个 RDD 组成的有向无环图(DAG)。并通过在 RDD 上执行动作将这个有向无环图作为一个 Job 提交给 Spark 执行。
例如,上面的 WordCount 程序就会生成如下的 DAGscala> counts.toDebugStringres0: String =MapPartitionsRDD[7] at reduceByKey at:14 (1 partitions) ShuffledRDD[6] at reduceByKey at:14 (1 partitions) MapPartitionsRDD[5] at reduceByKey at:14 (1 partitions) MappedRDD[4] at map at:14 (1 partitions) FlatMappedRDD[3] at flatMap at:14 (1 partitions) MappedRDD[1] at textFile at:12 (1 partitions) HadoopRDD[0] at textFile at:12 (1 partitions)
Spark 对于有向无环图 Job 进行调度,确定阶段(Stage),分区(Partition),流水线(Pipeline),任务(Task)和缓存(Cache),进行优化,并在 Spark 集群上运行 Job。RDD 之间的依赖分为宽依赖(依赖多个分区)和窄依赖(只依赖一个分区),在确定阶段 时,需要根据宽依赖划分阶段。根据分区划分任务。
Spark 支持故障恢复的方式也不同,提供两种方式,Linage,通过数据的血缘关系,再执行一遍前面的处理,Checkpoint,将数据集存储到持久存储中。
Spark 为迭代式数据处理提供更好的支持。每次迭代的数据可以保存在内存中,而不是写入文件。
Spark 的性能相比 Hadoop 有很大提升,2014 年 10 月,Spark 完成了一个 Daytona Gray 类别的 Sort Benchmark 测试,排序完全是在磁盘上进行的,与 Hadoop 之前的测试的对比结果如表格所示:
从表格中可以看出排序 100TB 的数据(1 万亿条数据),Spark 只用了 Hadoop 所用 1 /10 的计算资源,耗时只有 Hadoop 的 1 /3。
Spark 的优势不仅体现在性能提升上的,Spark 框架为批处理(Spark Core),交互式(Spark SQL),流式(Spark Streaming),机器学习(MLlib),图计算(GraphX)提供一个统一的数据处理平台,这相对于使用 Hadoop 有很大优势。
按照 Databricks 的连城的说法是 One Stack To Rule Them All
特别是在有些情况下,你需要进行一些 ETL 工作,然后训练一个机器学习的模型,最后进行一些查询,如果是使用 Spark,你可以在一段程序中将这三部分的逻辑完成形成一个大的有向无环图(DAG),而且 Spark 会对大的有向无环图进行整体优化。
例如下面的程序:
val points = sqlContext.sql(“SELECT latitude, longitude FROM historic_tweets”)
val model = KMeans.train(points, 10)
sc.twitterStream(…) .map(t => (model.closestCenter(t.location), 1)) .reduceByWindow(“5s”, _ + _)
(示例来源:http://www.slideshare.net/Hadoop_Summit/building-a-unified-data-pipeline-in-apache-spark)
这段程序的第一行是用 Spark SQL 查寻出了一些点,第二行是用 MLlib 中的 K -means 算法使用这些点训练了一个模型,第三行是用 Spark Streaming 处理流中的消息,使用了训练好的模型。
Lambda Architecture
Lambda Architecture 是一个大数据处理平台的参考模型,如下图所示:
其中包含 3 层,Batch Layer,Speed Layer 和 Serving Layer,由于 Batch Layer 和 Speed Layer 的数据处理逻辑是一致的,如果用 Hadoop 作为 Batch Layer,而用 Storm 作为 Speed Layer,你需要维护两份使用不同技术的代码。
而 Spark 可以作为 Lambda Architecture 一体化的解决方案, 大致如下:
Batch Layer,HDFS+Spark Core,将实时的增量数据追加到 HDFS 中,使用 Spark Core 批量处理全量数据,生成全量数据的视图。,
Speed Layer,Spark Streaming 来处理实时的增量数据,以较低的时延生成实时数据的视图。
Serving Layer,HDFS+Spark SQL(也许还有 BlinkDB),存储 Batch Layer 和 Speed Layer 输出的视图,提供低时延的即席查询功能,将批量数据的视图与实时数据的视图合并。
总结
如果说,MapReduce 是公认的分布式数据处理的低层次抽象,类似逻辑门电路中的与门,或门和非门,那么 Spark 的 RDD 就是分布式大数据处理的高层次抽象,类似逻辑电路中的编码器或译码器等。
RDD 就是一个分布式的数据集合(Collection),对这个集合的任何操作都可以像函数式编程中操作内存中的集合一样直观、简便,但集合操 作的实现确是在后台分解成一系列 Task 发送到几十台上百台服务器组成的集群上完成的。最近新推出的大数据处理框架 Apache Flink 也使用数据集(Data Set)和其上的操作作为编程模型的。
由 RDD 组成的有向无环图(DAG)的执行是调度程序将其生成物理计划并进行优化,然后在 Spark 集群上执行的。Spark 还提供了一个类似于 MapReduce 的执行引擎,该引擎更多地使用内存,而不是磁盘,得到了更好的执行性能。
那么 Spark 解决了 Hadoop 的哪些问题呢?
抽象层次低,需要手工编写代码来完成,使用上难以上手。
=> 基于 RDD 的抽象,实数据处理逻辑的代码非常简短。。
只提供两个操作,Map 和 Reduce,表达力欠缺。
=> 提供很多转换和动作,很多基本操作如 Join,GroupBy 已经在 RDD 转换和动作中实现。
一个 Job 只有 Map 和 Reduce 两个阶段(Phase),复杂的计算需要大量的 Job 完成,Job 之间的依赖关系是由开发者自己管理的。
=> 一个 Job 可以包含 RDD 的多个转换操作,在调度时可以生成多个阶段(Stage),而且如果多个 map 操作的 RDD 的分区不变,是可以放在同一个 Task 中进行。
处理逻辑隐藏在代码细节中,没有整体逻辑
=> 在 Scala 中,通过匿名函数和高阶函数,RDD 的转换支持流式 API,可以提供处理逻辑的整体视图。代码不包含具体操作的实现细节,逻辑更清晰。
中间结果也放在 HDFS 文件系统中
=> 中间结果放在内存中,内存放不下了会写入本地磁盘,而不是 HDFS。
ReduceTask 需要等待所有 MapTask 都完成后才可以开始
=> 分区相同的转换构成流水线放在一个 Task 中运行,分区不同的转换需要 Shuffle,被划分到不同的 Stage 中,需要等待前面的 Stage 完成后才可以开始。
时延高,只适用 Batch 数据处理,对于交互式数据处理,实时数据处理的支持不够
=> 通过将流拆成小的 batch 提供 Discretized Stream 处理流数据。
对于迭代式数据处理性能比较差
=> 通过在内存中缓存数据,提高迭代式计算的性能。
因此,Hadoop MapReduce 会被���一代的大数据处理平台替代是技术发展的趋势,而在新一代的大数据处理平台中,Spark 目前得到了最广泛的认可和支持。
下面关于 Hadoop 的文章您也可能喜欢,不妨看看:
Ubuntu14.04 下 Hadoop2.4.1 单机 / 伪分布式安装配置教程 http://www.linuxidc.com/Linux/2015-02/113487.htm
CentOS 安装和配置 Hadoop2.2.0 http://www.linuxidc.com/Linux/2014-01/94685.htm
Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm
Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm
Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm
Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm
单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm
更多 Hadoop 相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13
本文永久更新链接地址:http://www.linuxidc.com/Linux/2015-11/125374.htm