
Understanding Stage Division in the Spark 2.x Source Code

You probably know that a job is only triggered when an action operator is called on an RDD. But what exactly happens to each job once it is submitted? This article walks through the Spark 2.x source code to find out.

Everything starts with an action operator calling runJob(). Take the foreach operator as an example:

def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

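To make that trigger concrete before diving deeper, here is a minimal, hypothetical driver program (names and master URL are illustrative only). Transformations such as map stay lazy; only the foreach action above ends up calling sc.runJob and starting a job:

import org.apache.spark.{SparkConf, SparkContext}

object StageDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("stage-demo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val numbers = sc.parallelize(1 to 100, numSlices = 4) // builds lineage only, no job yet
    val doubled = numbers.map(_ * 2)                      // transformation: still lazy
    doubled.foreach(x => println(x))                      // action: foreach -> sc.runJob -> DAGScheduler
    sc.stop()
  }
}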

Keep stepping into runJob() until you reach the overload below in SparkContext, which hands the job over to dagScheduler.runJob():

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // Hand the job over to the DAGScheduler, the core of Spark's scheduling
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
}

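As an aside, the spark.logLineage branch above prints rdd.toDebugString, which you can also call yourself (assuming an existing SparkContext sc, as in the demo earlier) to see where the shuffle boundaries, and therefore the stage boundaries, of a job will fall. A hypothetical example; the printed lineage is abbreviated and only illustrative:

val pairs = sc.parallelize(Seq("a", "b", "a")).map((_, 1)).reduceByKey(_ + _)
println(pairs.toDebugString)
// (8) ShuffledRDD[2] at reduceByKey ...              <- a new stage starts at the shuffle
//  +-(8) MapPartitionsRDD[1] at map ...
//     |  ParallelCollectionRDD[0] at parallelize ...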

Step into DAGScheduler.runJob(), the code that submits the job:

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // Submit the job; submitJob returns a JobWaiter that we block on until the job finishes
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  // Log a different message depending on whether the job succeeded or failed
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

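The JobWaiter that runJob blocks on is essentially a promise plus a counter of finished tasks. A stripped-down sketch of the idea (a hypothetical class, not the real Spark implementation):

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

// Count finished tasks, hand each result to the caller's handler, and complete
// a promise once every task has succeeded so that runJob can block on completionFuture.
class SimpleJobWaiter[U](totalTasks: Int, resultHandler: (Int, U) => Unit) {
  private val finishedTasks = new AtomicInteger(0)
  private val jobPromise = Promise[Unit]()
  if (totalTasks == 0) jobPromise.success(()) // a zero-task job is finished immediately

  def completionFuture: Future[Unit] = jobPromise.future

  def taskSucceeded(index: Int, result: U): Unit = {
    resultHandler(index, result)
    if (finishedTasks.incrementAndGet() == totalTasks) jobPromise.success(())
  }

  def jobFailed(exception: Exception): Unit = jobPromise.tryFailure(exception)
}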

Click into submitJob() to see how the job is actually posted to the scheduler:

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
      "Total number of partitions: " + maxPartitions)
  }
  // Allocate a new jobId (incremented by one) to identify the current job
  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }
  // Sanity check: there is at least one partition
  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  // Construct a JobWaiter that blocks until the job completes and hands each result to resultHandler
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // eventProcessLoop is the DAGScheduler's event queue. Several jobs may be running in the
  // cluster at the same time; by default the DAGScheduler schedules them FIFO.
  // The event posted here is JobSubmitted; eventProcessLoop calls doOnReceive to
  // pattern-match the event type and run the corresponding handler, which for this
  // event is dagScheduler.handleJobSubmitted(...).
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}


submitJob posts the event to eventProcessLoop, an instance of DAGSchedulerEventProcessLoop. DAGSchedulerEventProcessLoop extends EventLoop, which owns an event queue (eventQueue): every event that is posted goes into the queue, and a dedicated thread takes events off the queue and handles them with onReceive. In DAGSchedulerEventProcessLoop, onReceive delegates to doOnReceive, which pattern-matches on the event type; a JobSubmitted event is dispatched to handleJobSubmitted.
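Stripped of details, this is a classic producer/consumer pattern: a blocking queue drained by a single dispatch thread. A minimal sketch of that pattern (a hypothetical class, much simpler than Spark's actual EventLoop):

import java.util.concurrent.LinkedBlockingDeque

abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          val event = eventQueue.take() // blocks until an event is posted
          onReceive(event)              // dispatch, like doOnReceive below
        }
      } catch {
        case _: InterruptedException => // stop() was called
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event) // conceptually what eventProcessLoop.post does

  protected def onReceive(event: E): Unit
}

The real dispatch lives in DAGSchedulerEventProcessLoop.doOnReceive, which matches on the event type: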

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    // Note: this is where stage division really starts
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  case StageCancelled(stageId, reason) =>
    dagScheduler.handleStageCancellation(stageId, reason)
  case JobCancelled(jobId, reason) =>
    dagScheduler.handleJobCancellation(jobId, reason)
  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)
  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()
  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)
  case ExecutorLost(execId, reason) =>
    val filesLost = reason match {
      case SlaveLost(_, true) => true
      case _ => false
    }
    dagScheduler.handleExecutorLost(execId, filesLost)
  // ... remaining cases omitted
}


handleJobSubmitted creates the ResultStage, which is the last stage of the job:

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  // Create the ResultStage; this is where stage division for the submitted job actually starts
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // It recursively walks backwards through every parent RDD, reading persisted data
    // where available and recomputing otherwise.
    // Note: stages come in two kinds, ShuffleMapStage and ResultStage;
    // every job consists of exactly one ResultStage and zero or more ShuffleMapStages.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }


createResultStage in turn calls getOrCreateParentStages to obtain the parent stages of the final stage:

private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // Create the parent stages of the ResultStage. Nested calls extract the shuffle
  // dependencies and create a ShuffleMapStage for each; if there is no shuffle at all,
  // this returns an empty List.
  val parents = getOrCreateParentStages(rdd, jobId)
  // Allocate the next stage id
  val id = nextStageId.getAndIncrement()
  // Build the ResultStage from the parent stages and the other core parameters
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  // Register the ResultStage and its id in stageIdToStage
  stageIdToStage(id) = stage
  // Update jobIds and jobIdToStageIds
  updateJobIdStageIdMaps(jobId, stage)
  // Return the ResultStage
  stage
}


getOrCreateParentStages calls getOrCreateShuffleMapStage. Since every stage except the last one is a shuffle map stage, that method is what creates the parent stages:

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  // getShuffleDependencies extracts only the direct shuffle dependencies of this RDD
  // (stages are cut at shuffles: a job produces zero or more ShuffleMapStages and
  // exactly one ResultStage). If a dependency is not a ShuffleDependency, the traversal
  // keeps walking up the parent RDDs until a shuffle is found or the lineage ends.
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

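getShuffleDependencies itself is not shown in this walkthrough. Conceptually it does a depth-first walk over the dependency graph and stops at the first ShuffleDependency it meets on each path. A simplified sketch of that traversal (a hypothetical helper, not the literal Spark method):

import scala.collection.mutable
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Walk the RDD lineage and collect only the *nearest* shuffle dependencies:
// narrow dependencies are followed upwards, shuffle dependencies are recorded and not crossed.
private def directShuffleDependencies(rdd: RDD[_]): mutable.HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new mutable.HashSet[ShuffleDependency[_, _, _]]
  val visited = new mutable.HashSet[RDD[_]]
  val waitingForVisit = mutable.Stack[RDD[_]](rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] => parents += shuffleDep // stage boundary
        case narrowDep => waitingForVisit.push(narrowDep.rdd)                // keep walking up
      }
    }
  }
  parents
}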

getOrCreateShuffleMapStage is where the map stages are created. If a ShuffleMapStage already exists for the given shuffle dependency, it is simply returned; otherwise the missing ancestor shuffle dependencies are collected, a ShuffleMapStage is created for each of these wide dependencies, and finally one is created for the dependency that was passed in:

private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  // Look up the ShuffleMapStage in shuffleIdToMapStage using the shuffleId of the ShuffleDependency
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    // If it already exists, return it directly
    case Some(stage) =>
      stage
    // Otherwise find all missing ancestor ShuffleDependencies and build their ShuffleMapStages
    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        // Create a ShuffleMapStage for each ancestor dependency that is still missing
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}

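To make the stage boundary concrete, consider a hypothetical word-count job (the input path is illustrative only). The narrow dependencies created by flatMap and map stay inside a single stage, while the ShuffleDependency introduced by reduceByKey is exactly where the code above cuts the DAG:

//   ShuffleMapStage 0:  textFile -> flatMap -> map
//   ResultStage 1:      reduceByKey -> collect  (the action that submits the job)
val counts = sc.textFile("hdfs:///tmp/input.txt")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.collect()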

With the parent stages of finalStage in hand, finalStage itself can be created. As the earlier code showed, this step can throw: if the HDFS data being processed has been modified or deleted, stage creation fails. Once finalStage has been created, the following log messages are printed:

logInfo("Got job %s (%s) with %d output partitions".format(

job.jobId, callSite.shortForm, partitions.length))

logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")

logInfo("Parents of final stage: " + finalStage.parents)

//TODO 接下來開始進行最重要的操作,就是執行Stage劃分演算法

logInfo("Missing parents: " + getMissingParentStages(finalStage))


The job then goes through some bookkeeping that prepares for submitting the stages:

val jobSubmissionTime = clock.getTimeMillis()
// jobIdToActiveJob is a HashMap that maps each jobId to its ActiveJob
jobIdToActiveJob(jobId) = job
// activeJobs is a HashSet that holds all ActiveJobs
activeJobs += job
// As soon as finalStage exists, the ActiveJob wrapping it is registered on its _activeJob field
finalStage.setActiveJob(job)
// Collect all stage ids that belong to this job as an array
val stageIds = jobIdToStageIds(jobId).toArray
// Take the latest attempt info of each stage; it is reported to SparkListeners when the job starts
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
  SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
// Now the stages are submitted
submitStage(finalStage)


submitStage first looks up the active job for the stage, then checks the stage itself. If the stage has parent stages that have not yet been completed, submitStage is called recursively on each missing parent and the current stage is added to waitingStages; it will only be submitted once all of its parents have finished. If the stage has no missing parents, submitMissingTasks is called to submit its tasks, and once those tasks are submitted the stage counts as submitted. Note that later on, after a stage's tasks complete, submitWaitingChildStages is called to submit its child stages: the waiting children are sorted by id and submitStage is called on each, so stage submission is effectively a recursive walk over the stage graph (a sketch of that helper follows the code below).

private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // All parent stages are done, so submit this stage's tasks
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
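submitWaitingChildStages, mentioned above, is not shown in this article; the idea behind it can be sketched roughly as follows (a simplified sketch, not the literal Spark method body):

// Once `parent` has finished, take the waiting stages that depend on it,
// sort them by id and try to submit each one again via submitStage.
private def submitWaitingChildStages(parent: Stage): Unit = {
  val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
  waitingStages --= childStages
  for (stage <- childStages.sortBy(_.id)) {
    submitStage(stage)
  }
}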


submitMissingTasks first determines which partitions of the stage need to be computed: one partition corresponds to one task, so a sequence of tasks is generated from the stage.

val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    // When the stage is a ShuffleMapStage, generate ShuffleMapTasks
    case stage: ShuffleMapStage =>
      // pendingPartitions holds the partitions whose tasks have not finished yet;
      // entries are removed as tasks complete, and the DAGScheduler uses it to decide
      // whether the stage is done. Start from an empty set.
      stage.pendingPartitions.clear()
      // Iterate over every partition that needs to be computed
      partitionsToCompute.map { id =>
        // The preferred locations for this partition
        val locs = taskIdToLocations(id)
        // The partition of the RDD this stage runs on
        val part = stage.rdd.partitions(id)
        // Mark the partition as pending
        stage.pendingPartitions += id
        // Build a ShuffleMapTask; its runTask will be invoked later on an executor
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId)
      }
    // When the stage is a ResultStage, generate ResultTasks
    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        // Wrap the same kind of parameters and create a ResultTask
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
      }
  }
} catch {
  case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}


The tasks are then wrapped in a TaskSet and submitted. At this point the DAGScheduler's work is done, and the TaskScheduler's task-assignment logic takes over.

if (tasks.size > 0) {
  logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
    s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
  // Submit all the tasks as one TaskSet through the TaskScheduler
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
  // Because we posted SparkListenerStageSubmitted earlier, we should mark
  // the stage as completed here in case there are no tasks to run
  markStageAsFinished(stage, None)
}

