Understanding Stage Division in the Spark 2.x Source Code
As you probably know, jobs are triggered at runtime by action operators. But what exactly happens while each job is being submitted? In this post I'll walk you through the Spark 2.x source code.
It starts with an action operator calling runJob(); take foreach as an example:
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
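To see this trigger in context, here is a minimal, hypothetical driver program (the app name and data are made up for illustration); the single foreach call below is what kicks off sc.runJob and hence one job:

import org.apache.spark.{SparkConf, SparkContext}

object ForeachDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("foreach-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10, numSlices = 2)
    // foreach is an action: this one call goes through sc.runJob and produces one job
    rdd.foreach(x => println(x))
    sc.stop()
  }
}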
Then keep clicking through runJob() until you reach the version below. This is SparkContext's runJob(), and it is where control is handed to the DAGScheduler:
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // Enter the heart of Spark: the DAGScheduler
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
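The spark.logLineage flag above just logs rdd.toDebugString; you can print the same lineage yourself. A small hypothetical example (the input path is made up):

val words = sc.textFile("hdfs:///input/words.txt")  // hypothetical path
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
// Prints the recursive dependency tree, one RDD per line, with shuffle boundaries indented
println(words.toDebugString)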
Step into the code that submits the job:
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // Submit the job, getting back a JobWaiter that blocks until the job completes
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  // Log a different message depending on how the job finished
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}
Click into submitJob() to see how the job is actually submitted:
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
      "Total number of partitions: " + maxPartitions)
  }
  // Take the next jobId as this job's identifier (atomically incremented)
  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }
  // Sanity check: at this point there must be at least one partition
  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  // Construct a JobWaiter that blocks until the job completes,
  // then hands the results to resultHandler
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // eventProcessLoop is the DAGScheduler's event queue. Several jobs may be
  // running in the cluster at once, and the DAGScheduler processes events FIFO.
  // The event posted here is a JobSubmitted; eventProcessLoop's doOnReceive
  // pattern-matches the event type and runs the corresponding handler,
  // which for this event is dagScheduler.handleJobSubmitted(...)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
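As a quick illustration of the partition check at the top of submitJob, asking for a partition index the RDD does not have fails fast. A hypothetical snippet:

val rdd = sc.parallelize(1 to 10, numSlices = 2)  // valid partition ids: 0 and 1
// Requesting partition 5 trips the check in submitJob:
// java.lang.IllegalArgumentException: Attempting to access a non-existent partition: 5. ...
sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum, Seq(5))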
This posts to eventProcessLoop, an instance of DAGSchedulerEventProcessLoop, which extends EventLoop. EventLoop holds an event queue, eventQueue: every event received is appended to the queue, and a dedicated thread takes events off the queue and dispatches them via onReceive. DAGSchedulerEventProcessLoop's doOnReceive then pattern-matches the received event; a JobSubmitted event matches handleJobSubmitted.
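To make that mechanism concrete, here is a minimal sketch of the EventLoop pattern. It is a simplification for illustration only, not Spark's actual EventLoop class:

import java.util.concurrent.LinkedBlockingDeque

// Simplified sketch of the EventLoop pattern (illustrative, not Spark's real class)
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!Thread.currentThread().isInterrupted) {
          onReceive(eventQueue.take())  // block until an event arrives, then dispatch it
        }
      } catch {
        case _: InterruptedException => // stopped: let the thread exit
      }
    }
  }
  def start(): Unit = eventThread.start()
  def post(event: E): Unit = eventQueue.put(event)  // producers enqueue events here
  protected def onReceive(event: E): Unit           // subclasses pattern-match events here
}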
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    // Note: this is where stage division really begins
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  case StageCancelled(stageId, reason) =>
    dagScheduler.handleStageCancellation(stageId, reason)
  case JobCancelled(jobId, reason) =>
    dagScheduler.handleJobCancellation(jobId, reason)
  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)
  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()
  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)
  case ExecutorLost(execId, reason) =>
    val filesLost = reason match {
      case SlaveLost(_, true) => true
      case _ => false
    }
    dagScheduler.handleExecutorLost(execId, filesLost)
  // ... remaining cases omitted
}
handleJobSubmitted creates the ResultStage, i.e. the last stage of the job:
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  // Create the ResultStage: this is where dividing the submitted job into stages really begins
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // It walks backwards, recursively visiting each parent RDD,
    // reusing persisted results where available and recomputing otherwise.
    // Note: stages come in two kinds, ShuffleMapStage and ResultStage;
    // every job consists of exactly one ResultStage and zero or more ShuffleMapStages.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
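To make the "one ResultStage plus zero or more ShuffleMapStages" rule concrete, consider a hypothetical word count; the reduceByKey shuffle splits the job into exactly two stages:

val counts = sc.textFile("hdfs:///input/words.txt")  // hypothetical path
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)  // ShuffleDependency: everything above it becomes a ShuffleMapStage
counts.collect()       // action: the part after the shuffle becomes the ResultStage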
createResultStage in turn calls getOrCreateParentStages to obtain the parent stages of the final stage:
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // Create the ResultStage's parent stages. This nests calls that extract shuffle
  // dependencies and create ShuffleMapStages in a loop; if there is no shuffle
  // at all, it returns an empty List.
  val parents = getOrCreateParentStages(rdd, jobId)
  // Take the next stage id (atomically incremented)
  val id = nextStageId.getAndIncrement()
  // Build the ResultStage from the parent stages just created and the other core parameters
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  // Register the ResultStage under its id in stageIdToStage
  stageIdToStage(id) = stage
  // Update jobIds and jobIdToStageIds
  updateJobIdStageIdMaps(jobId, stage)
  // Return the ResultStage
  stage
}
getOrCreateParentStages in turn calls getOrCreateShuffleMapStage. Since every stage except the last one is a ShuffleMapStage, this method can be used to create all the parent stages:
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  // Start from getShuffleDependencies, which extracts only the shuffle
  // dependencies of the current RDD (stages are divided at shuffle boundaries:
  // a job has zero or more ShuffleMapStages and exactly one ResultStage).
  // If a dependency is not a ShuffleDependency, the traversal keeps walking up
  // the parent RDDs until it finds one, or runs out of ancestors.
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
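getShuffleDependencies itself is a depth-first walk over the dependency graph that stops at the first ShuffleDependency on each branch. Here is a simplified sketch of that traversal, close in spirit to the Spark 2.x code but not a verbatim copy:

import scala.collection.mutable
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Walk up the narrow dependencies of `rdd` and collect the nearest
// ShuffleDependency on each branch; these are the stage boundaries.
def shuffleDepsSketch(rdd: RDD[_]): mutable.HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new mutable.HashSet[ShuffleDependency[_, _, _]]
  val visited = new mutable.HashSet[RDD[_]]
  val waitingForVisit = new mutable.Stack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep                // stop at a shuffle boundary
        case narrowDep =>
          waitingForVisit.push(narrowDep.rdd)  // keep walking up narrow dependencies
      }
    }
  }
  parents
}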
getOrCreateShuffleMapStage shows how the map stages are created: if a ShuffleMapStage already exists for the given shuffle dependency it is returned directly; otherwise the dependency chain is traversed and a ShuffleMapStage is created for each wide (shuffle) dependency:
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  // Look up shuffleIdToMapStage using the shuffleId taken from the ShuffleDependency
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    // If a ShuffleMapStage already exists, return it directly
    case Some(stage) =>
      stage
    // Otherwise find all ancestor ShuffleDependencies and build their ShuffleMapStages first
    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        // Create a ShuffleMapStage for each ancestor dependency found,
        // checking first that one has not been registered already
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
At this point the parent stages of finalStage exist, so finalStage itself can be created. As the earlier code showed, this step may throw: if the HDFS data being processed has been modified or deleted, stage creation fails. Once finalStage is created, some log lines are printed:
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
//TODO 接下來開始進行最重要的操作,就是執行Stage劃分演算法
logInfo("Missing parents: " + getMissingParentStages(finalStage))
The job then goes through a series of bookkeeping steps that set the stage for submission:
val jobSubmissionTime = clock.getTimeMillis()
// HashMap maintaining the jobId -> ActiveJob mapping
jobIdToActiveJob(jobId) = job
// HashSet holding all ActiveJobs
activeJobs += job
// As soon as finalStage exists, register the ActiveJob wrapping it on its _activeJob field
finalStage.setActiveJob(job)
// Extract all stage ids belonging to this jobId, as an array
val stageIds = jobIdToStageIds(jobId).toArray
// Extract each stage's latest attempt info, reported to SparkListeners when the job starts
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
  SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
// Now start submitting stages, beginning from the final one
submitStage(finalStage)
In submitStage, the stage is first checked: if it has parent stages that have not been submitted yet, submitStage calls itself on each of those parents and parks the current stage in waitingStages; the stage is only submitted once all of its parents have been. For a stage with no missing parents, submitMissingTasks is called to submit its tasks, and once the tasks are submitted the stage itself counts as submitted. Note: later, when a stage's submitted tasks have finished, submitWaitingChildStages is called to submit the child stages, sorting them by id and calling submitStage on each. In other words, stage submission is a recursive process.
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // Once all ancestor stages have been divided out, submit this stage's tasks
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
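As a concrete trace, suppose a hypothetical job with three stages, stage0 <- stage1 <- stage2, where stage2 is the ResultStage. The recursion unfolds like this:

// submitStage(stage2): missing parents == List(stage1)
//   -> submitStage(stage1): missing parents == List(stage0)
//        -> submitStage(stage0): no missing parents -> submitMissingTasks(stage0)
//        -> stage1 parked in waitingStages
//   -> stage2 parked in waitingStages
// When stage0's tasks finish, submitWaitingChildStages resubmits stage1,
// and once stage1 finishes, stage2 finally runs its tasks.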
Inside submitMissingTasks, the stage's partitions to compute are determined first, because one partition corresponds to one task; a batch of tasks is then generated from the stage:
val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    // A ShuffleMapStage produces ShuffleMapTasks
    case stage: ShuffleMapStage =>
      // First make sure pendingPartitions is empty.
      // pendingPartitions holds the partitions (tasks) that have not finished yet;
      // entries are removed as tasks complete, and the DAGScheduler uses it to
      // decide whether this stage is done.
      stage.pendingPartitions.clear()
      // Iterate over every partition that needs computing
      partitionsToCompute.map { id =>
        // Get the preferred locations for this partition
        val locs = taskIdToLocations(id)
        // Get the corresponding partition of this stage's RDD
        val part = stage.rdd.partitions(id)
        // Mark the partition as pending
        stage.pendingPartitions += id
        // Build the ShuffleMapTask; runTask will later be invoked on it
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId)
      }
    // A ResultStage produces ResultTasks
    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        // Likewise wrap up the parameters and create a ResultTask
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
      }
  }
} catch {
  case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}
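So the number of tasks equals the number of partitions to compute. As a hypothetical example, with 4 partitions the match above builds 4 task objects per stage:

val rdd = sc.parallelize(1 to 100, numSlices = 4)
// One ShuffleMapTask per partition on the map side of the shuffle (4 here),
// then one ResultTask per partition of the shuffled RDD for collect()
rdd.map(x => (x % 10, x)).reduceByKey(_ + _).collect()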
The tasks are then wrapped into a TaskSet and submitted. At that point the DAGScheduler's work is done, and control passes to the TaskScheduler's task-allocation logic:
if (tasks.size > 0) {
  logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
    s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
  // Submit the whole TaskSet through the TaskScheduler
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
  // Because we posted SparkListenerStageSubmitted earlier, we should mark
  // the stage as completed here in case there are no tasks to run
  markStageAsFinished(stage, None)
  // ... (debug logging elided)
  submitWaitingChildStages(stage)
}

