共计 4028 个字符,预计需要花费 11 分钟才能阅读完成。
Spark 作为分布式的大数据处理框架必然或涉及到大量的作业调度,如果能够理解 Spark 中的调度对我们编写或优化 Spark 程序都是有很大帮助的;
在 Spark 中存在 转换操作(Transformation Operation)与 行动操作 (Action Operation) 两种;而转换操作只是会从一个 RDD 中生成另一个 RDD 且是 lazy 的,Spark 中只有 行动操作(Action Operation)才会触发作业的提交,从而引发作业调度;在一个计算任务中可能会多次调用 转换操作这些操作生成的 RDD 可能存在着依赖关系,而由于转换都是 lazy 所以当行动操作(Action Operation)触发时才会有真正的 RDD 生成,这一系列的 RDD 中就存在着依赖关系形成一个 DAG(Directed Acyclc Graph),在 Spark 中 DAGScheuler 是基于 DAG 的顶层调度模块;
相关名词
Application:使用 Spark 编写的应用程序,通常需要提交一个或多个作业;
Job:在触发 RDD Action 操作时产生的计算作业
Task: 一个分区数据集中最小处理单元也就是真正执行作业的地方
TaskSet: 由多个 Task 所组成没有 Shuffle 依赖关系的任务集
Stage: 一个任务集对应的调度阶段,每个 Job 会被拆分成诺干个 Stage
1.1 作业调度关系图
RDD Action 作业提交流程
这里根据 Spark 源码跟踪触发 Action 操作时触发的 Job 提交流程,Count()是 RDD 中的一个 Action 操作所以调用 Count 时会触发 Job 提交;
在 RDD 源码 count()调用 SparkContext 的 runJob,在 runJob 方法中根据 partitions(分区)大小创建 Arrays 存放返回结果;
RDD.scala
/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
SparkContext.scala
def runJob[T, U: ClassTag](rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job:" + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
}
在 SparkContext 中将调用 DAGScheduler 的 runJob 方法提交作业,DAGScheduler 主要任务是计算作业与任务依赖关系,处理调用逻辑;DAGScheduler 提供了 submitJob 与 runJob 方法用于 提交作业,runJob 方法会一直等待作业完成,submitJob 则返回 JobWaiter 对象可以用于判断作业执行结果;
在 runJob 方法中将调用 submitJob,在 submitJob 中把提交操作放入到事件循环队列(DAGSchedulerEventProcessLoop)中;
def submitJob[T, U]( rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
......
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
......
}
在事件循环队列中将调用 eventprocessLoop 的 onReceive 方法;
Stage 拆分
提交作业时 DAGScheduler 会 从 RDD 依赖链尾部开始,遍历整个依赖链划分调度阶段;划分阶段以 ShuffleDependency 为依据,当没有 ShuffleDependency 时整个 Job 只会有一个 Stage;在事件循环队列中将会调用 DAGScheduler 的 handleJobSubmitted 方法,此方法会拆分 Stage、提交 Stage;
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {var finalStage: ResultStage = null
......
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
......
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
......
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
submitWaitingStages()}
调度阶段提交
在提交 Stage 时会先 调用 getMissingParentStages 获取父阶段 Stage,迭代该阶段所依赖的父调度阶段如果存在则先提交该父阶段的 Stage 当不存在父 Stage 或父 Stage 执行完成时会对当前 Stage 进行提交;
private def submitStage(stage: Stage) {val jobId = activeJobForStage(stage)
if (jobId.isDefined) {if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {val missing = getMissingParentStages(stage).sortBy(_.id)
if (missing.isEmpty) {submitMissingTasks(stage, jobId.get)
} else {for (parent <- missing) {submitStage(parent)
}
waitingStages += stage
}
}
}
......
}
参考资料:
http://spark.apache.org/docs/latest/
更多 Spark 相关教程见以下内容:
CentOS 7.0 下安装并配置 Spark http://www.linuxidc.com/Linux/2015-08/122284.htm
Spark1.0.0 部署指南 http://www.linuxidc.com/Linux/2014-07/104304.htm
CentOS 6.2(64 位)下安装 Spark0.8.0 详细记录 http://www.linuxidc.com/Linux/2014-06/102583.htm
Spark 简介及其在 Ubuntu 下的安装使用 http://www.linuxidc.com/Linux/2013-08/88606.htm
安装 Spark 集群(在 CentOS 上) http://www.linuxidc.com/Linux/2013-08/88599.htm
Hadoop vs Spark 性能对比 http://www.linuxidc.com/Linux/2013-08/88597.htm
Spark 安装与学习 http://www.linuxidc.com/Linux/2013-08/88596.htm
Spark 并行计算模型 http://www.linuxidc.com/Linux/2012-12/76490.htm
Ubuntu 14.04 LTS 安装 Spark 1.6.0(伪分布式)http://www.linuxidc.com/Linux/2016-03/129068.htm
Spark 的详细介绍:请点这里
Spark 的下载地址:请点这里
本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-03/129504.htm