共计 12108 个字符,预计需要花费 31 分钟才能阅读完成。
背景
目前按照大数据处理类型来分大致可以分为:批量数据处理、交互式数据查询、实时数据流处理,这三种数据处理方式对应的业务场景也都不一样;
关注大数据处理的应该都知道 Hadoop,而 Hadoop 的核心为 HDFS 与MapReduce,HDFS 分布式文件系统在 Hadop 中是用来存储数据的;MapReduce 为 Hadoop 处理数据的核心,接触过函数式编程的都知道函数式语言中也存在着 Map、Reduce 函数其实这两者的思想是一致的;也正是因为 Hadoop 数据处理核心为 MapReduce 奠定了它注定不是适用场景广泛的大数据框架;
可以这么说 Hadoop 适用于 Map、Reduce 存在的任何场景,具体场景比如:WordCount、排序、PageRank、用户行为分析、数据统计等,而这些场景都算是批量数据处理,而 Hadoop 并不适用于交互式数据查询、实时数据流处理;
这时候就出现了各种数据处理模型下的专用框架如:Storm、Impala、GraphLab 等;
1、Storm:针对实时数据流处理的分布式框架;
2、Impala:适用于交互式大数据查询的分布式框架;
3、GraphLab:基于图模型的机器学习框架;
1、MapReduce 简单模型
这时候如果一个团队或一个公司中同时都有设计到大数据批量处理、交互式查询、实时数据流处理这三个场景;这时候就会有一些问题:
1、学习成本很高,每个框架都是不同的实现语言、不同的团队开发的;
2、各个场景组合起来代价必然会很大;
3、各个框架中共享的中间数据共享与移动成本高;
Spark
就在这时候 UC Berkeley AMP 推出了全新的大数据处理框架:Spark 提供了全面、统一适用与不同场景的大数据处理需求(批量数据处理、交互式数据查询、实时数据流处理、机器学习);Spark 不仅性能远胜于 Hadoop 而却还兼容 Hadoop 生态系统,Spark 可以运行在 Hadoop HDFS 之上提供争强 功能,可以说 Spark 替代了 Hadoop MapReduce,但 Spark 依然兼容 Hadoop 中的 YARN 与 Apache Mesos 组件,现有 Hadoop 用户可以很容易就迁移到 Spark;
Spark 提出了 RDD(Resilient Distributed Datasets) 这么一个全新的概念,RDD 弹性分布式数据集是并行、容错的分布式数据结构;RDD 可以持久化到硬盘或内存当中,为一个分区的数据集,分区的多少决定了并行计算的粒度;并且提供了一系列的操作 RDD 中的数据:
1、创建操作(Creation Operation):RDD 由 SparkContext 通过内存数据或外部文件系统创建;
2、转换操作(Transformation Operation):将 RDD 通过转换操作变为另一个 RDD,Spark 提供了 map、flatMap、filter 等一系列的转换操作;
3、控制操作(Control Operation):将 RDD 持久化到内存或硬盘当中,如 cache 将 filterRDD 缓存到内存;
4、行动操作:(Action Operation):Spark 采用了惰性计算,对于任何行动操作都会产生 Spark Job 运行产生最终结果;提供了 join、groupBy、count 等操作,Spark 中存在两种操作产生的结果为 Scala 集合或者标量与 RDD 保存到文件或数据库;
1、Spark 结构图
Spark RDD:Spark RDD 提供了一系列的操作接口,为不变的数据存储结构并存储与内存中使用 DAG 进行任务规划使更好的处理 MapReduce 类似的批处理;
Shark/Spark SQL:分布式 SQL 引擎,兼容 Hive 性能远比 Hive 高很多;
Spark Streaming:将数据流分解为一系列批处理作业使用 Spark 调度框架更好的支持数据流操作,支持的数据输入源有:Kafka、Flume 等;
GraphX:兼容 Pregel、GraphLab 接口为基于 Spark 的图计算框架;
MLlib:为 Spark 的机器学习算法库,支持常用的算法有:分类算法、推荐算法、聚类算法等等;
性能卓越、支持多种大数据处理模型、支持多种编程语言接口:Java、Scala、Python,许多大公司如 IBM 等大力支持推广 Spark 的发展;
前面简单的介绍了 Spark 的一些概念还有 Spark 生态圈的一些情况,这里主要是介绍 Spark 运行模式与 Spark Standalone 模式的部署;
Spark 运行模式
在 Spark 中存在着多种运行模式,可使用本地模式运行、可使用伪分布式模式运行、使用分布式模式也存在多种模式如:Spark Mesos 模式、Spark YARN 模式;
Spark Mesos 模式:官方推荐模式,通用集群管理,有两种调度模式:粗粒度模式(Coarse-grained Mode)与细粒度模式(Fine-grained Mode);
Spark YARN 模式:Hadoop YARN 资源管理模式;
Standalone 模式: 简单模式或称独立模式,可以单独部署到一个集群中,无依赖任何其他资源管理系统。不使用其他调度工具时会存在单点故障,使用 Zookeeper 等可以解决;
Local 模式:本地模式,可以启动本地一个线程来运行 job,可以启动 N 个线程或者使用系统所有核运行 job;
Standalone 模式部署实践
Standalone模式需要将 Spark 复制到集群中的每个节点,然后分别启动每个节点即可;Spark Standalone 模式的集群由 Master 与 Worker 节点组成,程序通过与 Master 节点交互申请资源,Worker 节点启动 Executor 运行;
这里使用了两节点部署 Spark 集群:192.168.2.131、192.168.2.133,下面简称为:133 与 131 节点;其中 133 节点既是 Master 节点同时又是 Worker 节点,131 节点为 Worker 节点;
节点结构图
部署步骤:
一、首先在 133 节点 上下载 Java、Scala 与 Spark 并解压到 /usr/local 目录下,这里使用的 Spark 是 带有 Hadoop 的版本;
下载解压到 local
二、配置 Java、Scala 与 Spark 环境变量,这里把环境变量配置到 /etc/profile 文件中,请忽略 Hadoop 环境变量;
环境变量配置
三、测试 Java、Scala 是否配置成功,在终端输入:java -version 与 scala -version
四、配置 Spark 环境变量,进入 Spark 目录下的 conf 目录把 slaves.template 重命名为 slaves,接着把 spark-env.sh.template 重命名为:spark-env.sh;
重命名
修改 spark-env.sh 文件,添加环境变量;
spark-env 修改
五、在 133 节点使用 scp 把下载好的 Java、Scala、Spark 发送到 131 节点,并在 131 节点上重复以上所有步骤;
六、在两个节点都完成以上所有步骤后开始启动 Spark,133 节点 既是 Master 又是 Worker;
1、首先在 133 启动 Spark,进入 Spark 目录的 sbin 目录执行./start-all.sh:
Master 启动
使用 jps 命令发现存在 Master 与 Worker 进程,说明 Spark 已启动成功;
2、启动 131 节点 的 Spark,进入 Spark 目录的 sbin 目录执行:./start-slave.sh spark://192.168.2.133:7077
start-slave.sh 后面的地址为 Master 节点的通信地址,指定当前 slave 节点连接到的 Master;
slave 启动:
使用 jps 命令,存在 Worker 进程则说明当前的 Spark Worker 节点启动成功;
七、 Spark Web 页面
可以通过 http://192.168.2.133:8080/ 地址查看到当前 Spark 集群的信息,这地址为 Master 节点的地址;
SparkWeb:
参考资料:
http://spark.apache.org/docs/latest/spark-standalone.html
背景
目前按照大数据处理类型来分大致可以分为:批量数据处理、交互式数据查询、实时数据流处理,这三种数据处理方式对应的业务场景也都不一样;
关注大数据处理的应该都知道 Hadoop,而 Hadoop 的核心为 HDFS 与MapReduce,HDFS 分布式文件系统在 Hadop 中是用来存储数据的;MapReduce 为 Hadoop 处理数据的核心,接触过函数式编程的都知道函数式语言中也存在着 Map、Reduce 函数其实这两者的思想是一致的;也正是因为 Hadoop 数据处理核心为 MapReduce 奠定了它注定不是适用场景广泛的大数据框架;
可以这么说 Hadoop 适用于 Map、Reduce 存在的任何场景,具体场景比如:WordCount、排序、PageRank、用户行为分析、数据统计等,而这些场景都算是批量数据处理,而 Hadoop 并不适用于交互式数据查询、实时数据流处理;
这时候就出现了各种数据处理模型下的专用框架如:Storm、Impala、GraphLab 等;
1、Storm:针对实时数据流处理的分布式框架;
2、Impala:适用于交互式大数据查询的分布式框架;
3、GraphLab:基于图模型的机器学习框架;
1、MapReduce 简单模型
这时候如果一个团队或一个公司中同时都有设计到大数据批量处理、交互式查询、实时数据流处理这三个场景;这时候就会有一些问题:
1、学习成本很高,每个框架都是不同的实现语言、不同的团队开发的;
2、各个场景组合起来代价必然会很大;
3、各个框架中共享的中间数据共享与移动成本高;
Spark
就在这时候 UC Berkeley AMP 推出了全新的大数据处理框架:Spark 提供了全面、统一适用与不同场景的大数据处理需求(批量数据处理、交互式数据查询、实时数据流处理、机器学习);Spark 不仅性能远胜于 Hadoop 而却还兼容 Hadoop 生态系统,Spark 可以运行在 Hadoop HDFS 之上提供争强 功能,可以说 Spark 替代了 Hadoop MapReduce,但 Spark 依然兼容 Hadoop 中的 YARN 与 Apache Mesos 组件,现有 Hadoop 用户可以很容易就迁移到 Spark;
Spark 提出了 RDD(Resilient Distributed Datasets) 这么一个全新的概念,RDD 弹性分布式数据集是并行、容错的分布式数据结构;RDD 可以持久化到硬盘或内存当中,为一个分区的数据集,分区的多少决定了并行计算的粒度;并且提供了一系列的操作 RDD 中的数据:
1、创建操作(Creation Operation):RDD 由 SparkContext 通过内存数据或外部文件系统创建;
2、转换操作(Transformation Operation):将 RDD 通过转换操作变为另一个 RDD,Spark 提供了 map、flatMap、filter 等一系列的转换操作;
3、控制操作(Control Operation):将 RDD 持久化到内存或硬盘当中,如 cache 将 filterRDD 缓存到内存;
4、行动操作:(Action Operation):Spark 采用了惰性计算,对于任何行动操作都会产生 Spark Job 运行产生最终结果;提供了 join、groupBy、count 等操作,Spark 中存在两种操作产生的结果为 Scala 集合或者标量与 RDD 保存到文件或数据库;
1、Spark 结构图
Spark RDD:Spark RDD 提供了一系列的操作接口,为不变的数据存储结构并存储与内存中使用 DAG 进行任务规划使更好的处理 MapReduce 类似的批处理;
Shark/Spark SQL:分布式 SQL 引擎,兼容 Hive 性能远比 Hive 高很多;
Spark Streaming:将数据流分解为一系列批处理作业使用 Spark 调度框架更好的支持数据流操作,支持的数据输入源有:Kafka、Flume 等;
GraphX:兼容 Pregel、GraphLab 接口为基于 Spark 的图计算框架;
MLlib:为 Spark 的机器学习算法库,支持常用的算法有:分类算法、推荐算法、聚类算法等等;
性能卓越、支持多种大数据处理模型、支持多种编程语言接口:Java、Scala、Python,许多大公司如 IBM 等大力支持推广 Spark 的发展;
Spark 中最核心的概念为 RDD(Resilient Distributed DataSets) 中文为:弹性分布式数据集 ,RDD 为对分布式内存对象的 抽象它表示一个 被分区不可变 且能 并行操作 的数据集;RDD 为可序列化的、可缓存到内存对 RDD 进行操作过后还可以存到内存中,下次操作直接把内存中 RDD 作为输入,避免了 Hadoop MapReduce 的大 IO 操作;
RDD 生成
Spark 所要处理的任何数据都是存储在 RDD 之中,目前两种方式可以生成一个 RDD:
1、从 RDD 进行转换操作
2、使用外部存储系统创建,如:HDFS;
RDD 操作
RDD 支持两种操作:
转换(transformation operation)
转换操作将一个 RDD 经过操作后返回一个全新的 RDD,转换操是 lazy(惰性)的这期间不会产生任何数据的计算;
转换函数有:distinct、filter、map、flatMap、union、groupByKey 等;
行动(action operation)
每一个行动操作都会触发 Spark Job 进行计算并返回最终的结果,行动操作有这么几类:返回标量,count 返回元素的个数;返回 Scala 集合,task(n)返回 0 到 n - 1 组成的集合;写入外部存储,saveAsHadoopFile(path)存储到 HDFS;
行动函数有:count、top、task、saveAsHadoopFile 等;
RDD 为 不可变 的数据集,可以使用转换操作“修改”一个 RDD,但这操作过后返回的是一个全新的 RDD 原本 RDD 并没有改变;
RDD 状态转换图
Lineage
Spark RDD 只支持 粗粒度 的操作,对一个 RDD 的操作都会被作用于该 RDD 的所有数据;为了保证 RDD 的高可用性 RDD 通过使用 Lineage(血统)记录 了 RDD 演变流程(从其他 RDD 到当前 RDD 所做的操作)当 RDD 分区数据丢失时可以通过 Lineage 的信息重新计算与恢复分区数据,或进行 RDD 的重建;
RDD 的依赖关系(dependencies):
由于对 RDD 的操作都是粗粒度的一个转换操作过后都会产生一个新的 RDD,RDD 之间会形成一个前后依赖关系;Spark 中存在两种依赖:窄依赖(Narrow Dependencies)、宽依赖(Wide Dependencies);
窄依赖(Narrow Dependencies):一个父 RDD 的分区只能被一个子 RDD 的一个分区使用;
宽依赖(Wide Dependencies):多个子 RDD 的分区依赖于一个父 RDD 的同一个分区;
窄依赖的节点(RDD)关系如果流水一般,所以当节点失败后只需重新计算父节点的分区即可,宽依赖需要重新计算父节点的多个分区代价是非常昂贵的;
窄依赖 Narrow
宽依赖 Wide
参考资料:
http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf
http://spark.apache.org/docs/latest/programming-guide.html
编译打包
Spark 支持 Maven 与 SBT 两种编译工具,这里使用了 Maven 进行编译打包;
在执行 make-distribution 脚本时它会检查本地是否已经存在 Maven 还有当前 Spark 所依赖的 Scala 版本,如果不存在它会自动帮你下载到 build 目录中并解压使用;Maven 源最好配置成 OSChina 的中央库,这下载依赖包比较快;
耐心等待,我编译过多次所以没有下载依赖包,大概半个小时左右编译完成;注意:如果使用的是 Java 1.8 需要给 JVM 配置堆与非堆内存,如:export MAVEN_OPTS=”-Xmx1.5g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m”;
进入 Spark 根目录下,执行:
./make-distribution.sh --tgz
--tgz 参数是指编译后生成 tgz 包
- PHadoop 支持 Hadoop
-Pyarn : 支持 yarn
-Phive : 支持 hive
--with-tachyon: 支持 tachyon 内存文件系统
-name: 与--tgz 一起用时,name 代替 Hadoop 版本号
./make-distribution.sh --tgz --name 2.6.0 -Pyarn -Phadoop-2.6 -Phive
开始编译检查本地环境,如不存在合适的 Scala 与 Maven 就在后台下载;
编译中:
编译完成并打包生成 tgz:
编译完成后把生成的文件拷贝到当前 Spark 的 dist 目录中并且打包生成 spark-1.5.3-SNAPSHOT-bin-2.2.0.tgz 文件;
Spark 执行不少操作时都依赖于 闭包函数 的调用,此时如果闭包函数使用到了外部变量驱动程序在使用行动操作时传递到集群中各 worker 节点任务时就会进行一系列操作:
1、驱动程序使将闭包中使用变量封装成对象,驱动程序序列化对象,传给 worker 节点任务;
2、worker 节点任务接收到对象,执行闭包函数;
由于使用外部变量势必会通过网络、序列化、反序列化,如外部变量过大或过多使用外部变量将会影响 Spark 程序的性能;
Spark 提供了两种类型的 共享变量(Shared Variables):广播变量(Broadcast Variables)、累加器(Accumulators);
广播变量(Broadcast Variables)
Spark 提供的 广播变量 可以解决闭包函数引用外部大变量引起的性能问题;广播变量将只读变量缓存在每个 worker 节点中,Spark 使用了高效广播算法分发变量从而提高通信性能;如直接在闭包函数中使用外部 变量该变量会缓存在每个任务(jobTask)中如果多个任务同时使用了一个大变量势必会影响到程序性能;
广播变量:每个 worker 节点中缓存一个副本,通过高效广播算法提高传输效率,广播变量是只读的;
Spark Scala Api 与 Java Api 默认使用了 Jdk 自带序列化库,通过使用第三方或使用自定义的序列化库还可以进一步提高广播变量的性能;
广播变量使用示例:
val sc = SparkContext("");
val eigenValue = sc.bradcast(loadEigenValue())
val eigen = computer.map{x =>
val temp = eigenValue.value
...
...
}
左节点不使用广播变量,右使用广播变量
累加器(Accumulators)
累加器可以使得 worker 节点中指定的值聚合到驱动程序中,如统计 Spark 程序执行过程中的事件总数等;
val sc = new SparkContext(...)
val file = sc.textFile("xxx.txt")
val eventCount = sc.accumulator(0,"EventAccumulator") // 累加器初始值为 0
val formatEvent = file.flatMap(line => {if(line.contains("error")){eventCount +=1
}
})
formatEvent.saveAsTextFile("eventData.txt")
println("error event count :" + eventCount);
在使用 累加器(Accumulators)时需要注意,只有在 行动操作 中才会触发累加器,也就是说上述代码中由于 flatMap()为 转换操作 因为 Spark 惰性特征所以只用当 saveAsTextFile() 执行时累加器才会被触发;累加器只有在驱动程序中才可访问,worker 节点中的任务不可访问累加器中的值;
Spark 原生支持了数字类型的的累加器如:Int、Double、Long、Float 等;此外 Spark 还支持自定义累加器用户可以通过继承 AccumulableParam 特征来实现自定义的累加器此外 Spark 还提供了 accumulableCollection()累加集合用于;创建累加器时可以使用名字也可以不是用名字,当使用了名字时在 Spark UI 中可看到当中程序中定义的累加器,广播变量存储级别为 MEMORY_AND_DISK;
Spark 作为分布式的大数据处理框架必然或涉及到大量的作业调度,如果能够理解 Spark 中的调度对我们编写或优化 Spark 程序都是有很大帮助的;
在 Spark 中存在 转换操作(Transformation Operation)与 行动操作 (Action Operation) 两种;而转换操作只是会从一个 RDD 中生成另一个 RDD 且是 lazy 的,Spark 中只有 行动操作(Action Operation)才会触发作业的提交,从而引发作业调度;在一个计算任务中可能会多次调用 转换操作这些操作生成的 RDD 可能存在着依赖关系,而由于转换都是 lazy 所以当行动操作(Action Operation)触发时才会有真正的 RDD 生成,这一系列的 RDD 中就存在着依赖关系形成一个 DAG(Directed Acyclc Graph),在 Spark 中 DAGScheuler 是基于 DAG 的顶层调度模块;
相关名词
Application:使用 Spark 编写的应用程序,通常需要提交一个或多个作业;
Job:在触发 RDD Action 操作时产生的计算作业
Task: 一个分区数据集中最小处理单元也就是真正执行作业的地方
TaskSet: 由多个 Task 所组成没有 Shuffle 依赖关系的任务集
Stage: 一个任务集对应的调度阶段,每个 Job 会被拆分成诺干个 Stage
1.1 作业调度关系图
RDD Action 作业提交流程
这里根据 Spark 源码跟踪触发 Action 操作时触发的 Job 提交流程,Count()是 RDD 中的一个 Action 操作所以调用 Count 时会触发 Job 提交;
在 RDD 源码 count()调用 SparkContext 的 runJob,在 runJob 方法中根据 partitions(分区)大小创建 Arrays 存放返回结果;
RDD.scala
/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
SparkContext.scala
def runJob[T, U: ClassTag](rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job:" + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
}
在 SparkContext 中将调用 DAGScheduler 的 runJob 方法提交作业,DAGScheduler 主要任务是计算作业与任务依赖关系,处理调用逻辑;DAGScheduler 提供了 submitJob 与 runJob 方法用于 提交作业,runJob 方法会一直等待作业完成,submitJob 则返回 JobWaiter 对象可以用于判断作业执行结果;
在 runJob 方法中将调用 submitJob,在 submitJob 中把提交操作放入到事件循环队列(DAGSchedulerEventProcessLoop)中;
def submitJob[T, U]( rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
......
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
......
}
在事件循环队列中将调用 eventprocessLoop 的 onReceive 方法;
Stage 拆分
提交作业时 DAGScheduler 会 从 RDD 依赖链尾部开始,遍历整个依赖链划分调度阶段;划分阶段以 ShuffleDependency 为依据,当没有 ShuffleDependency 时整个 Job 只会有一个 Stage;在事件循环队列中将会调用 DAGScheduler 的 handleJobSubmitted 方法,此方法会拆分 Stage、提交 Stage;
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {var finalStage: ResultStage = null
......
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
......
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
......
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
submitWaitingStages()}
调度阶段提交
在提交 Stage 时会先 调用 getMissingParentStages 获取父阶段 Stage,迭代该阶段所依赖的父调度阶段如果存在则先提交该父阶段的 Stage 当不存在父 Stage 或父 Stage 执行完成时会对当前 Stage 进行提交;
private def submitStage(stage: Stage) {val jobId = activeJobForStage(stage)
if (jobId.isDefined) {if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {val missing = getMissingParentStages(stage).sortBy(_.id)
if (missing.isEmpty) {submitMissingTasks(stage, jobId.get)
} else {for (parent <- missing) {submitStage(parent)
}
waitingStages += stage
}
}
}
......
}
参考资料:
http://spark.apache.org/docs/latest/
Scala 的详细介绍:请点这里
Scala 的下载地址:请点这里
本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-03/129506.htm