劉彬同學準備寫一系列spark實戰系列,本文是第一篇,SparkContext初始化內部原理!贊!推薦給大家,希望大家喜歡和支持!
如果編寫Spark程序,那麼第??代碼就是new SparkContext().setMaster(「」).setAppName(「xx」),可以說SparkContext是整個Spark 計算的啟動器,只有將sparkContext 啟動起來,後續的關於調 度,存儲,計算相關的操作才能夠運?,本?基於spark2.x源碼概述關於SparkContext??所包含的啟動項都有哪些以及這些啟動項的作?是什麼,之後在說?下關於SparkEnv環境創建的過程。
閱讀本?最好打開spark源碼參考著看,可以在git上?打開spark關於sparkContext的代碼,地址為:https://github.com/apache/spark/blob/c5f9b89dda40ffaa4622a7ba2b3d0605dbe815c0/core/src/main/scala/org/apache/spark/SparkContext.scala#L73
01
SparkContext內部組件:
如圖為SparkContext內部的?些組件:
SparkEnv :Spark運?時環境,Spark 中任務執?是通過Executor,所有的Executor都有??的執?環境SparkEnv,在Driver中也包含了SparkEnv,為了保證Local模式的運?,SparkEnv內部還提 供了不同的組件,來實現不同的功能
LiveListenerBus:SparkContext中的事件匯流排,可以接收各個使?者的事件,非同步將SparkListenerevent傳遞給註冊的SparkListener
Spark UI :Spark的?戶界?,SparkUI間接依賴於計算引擎,調度引擎,存儲引擎,Job,Stage,Executor等組件的監控都會以SparkListenerEvent的形式傳遞給LiveListenerBus,SparkUI將從各 個SparkListener中讀取數據並顯?在web界?
SparkStatusTracker:?於監控作業和Stage進度狀態的低級API
ConsoleProgressBar :定期從sc.statusTracker獲得active stage的狀態信息,展?到進度條[在SparkUI上?可以看到進度條],會有?定的延時。內部有?個timer 500ms refresh?遍
DAGScheduler:DAG調度器,是Spark調度系統中重要的組件之?,負責創建Job,將DAG的RDD劃分到不同的Stage,提交stage等,SparkUI中有關Job和Stage監控數據都來?DAGScheduer
TaskScheduler:Task調度器,是Spark調度系統中重要的組件之?,負責將任務發送到集群,運?,如果有失敗的任務則重新執?,之後返回給DAGScheduler,TaskScheduler調度的Task是由 DAGScheduler創建的,所以DAGScheduler是TaskScheduler前置調度。
HeatbeatReceiver:?跳接收器,所有的Executor都會向HeatbeatReceiver發送?跳信息,HeatbeatReceiver接收到?跳之後,先更新Executor最後可?時間,然後將此信息交給TaskScheduler。
ContextCleaner:非同步清理RDD、shuffle和?播狀態信息
EventLoggingListener:將事件持久化到存儲的監聽器,是SparkContext的可選組件,當spark.eventLog.enable
ExecutorAllocationManager: Executor動態分配管理器,根據?作負載動態調整Executor數量,當在配置spark.dynamicAlloction.enabled屬性為true的情況下,在?local模式下或者 spark.dynamicAllcation.testing屬性為true時啟?
ShutdownHookManager:設置關閉鉤?的管理器,可以給應?設置鉤?,這樣當JVM退出的時候就會執?清理?作
除了以上這些SparkContext包含的內部組件,還包括如下?些屬性:
creationSite:CallSite類型,保存著線程棧中最靠近棧頂的?戶定義的類和最靠近棧底的Scala或者Spark核?類的信息,其中ShortForm屬性保存著上述信息的間斷描述,LongForm屬性保存著上述 信息的完整描述,具體的信息可以參閱源碼部分地址為:core/src/main/scala/org/apache/spark/util/Utils.scala/getCallSite
allowMulitipleContext : 是否允許多個SparkContext實例,默認為False,可以通過設置Spark.Driver.allowMulitipleContexts來控制
startTime:標記sparkContext的啟動時間戳
stopped:標記sparkContext是否處於停?狀態,采?原?類型AtomicBoolean
addedFiles:?於每個本地?件的URL與添加此?件到到addedFiles時的時間戳之間的映射緩存 new ConcurrentHashMap[String, Long]
addedJars:?於每個本地Jar?件的URL與添加此?件到addedJars時的時間戳之間的映射緩存 new ConcurrentHashMap[String, Long]
persistentRdds:?於對所有持久化的RDD保持跟蹤
executorEnvs:?於存儲環境變數,將?於Executor執?的時候使?
sparkUser:當前系統的登錄?戶,可以通過環境變數SPARK_USER來設置 通過Utils.getCurrentUserName()獲取
checkpointDir:RDD計算過程中?於記錄RDD檢查點的?錄
localProperties:InheritableThreadLocal保護的線程,其中的屬性值可以沿著線程棧?直傳遞下去
_conf:SparkContext的配置,會先調?config的clone?法,在進?驗證配置,是否設置了spark.master和spark.app.name
jars:?戶提交的jar?件,當選擇部署模式為yarn時,
_eventLogDir:事件?志的路徑,當spark.enabled屬性為true時啟?,默認為/tmp/spark-events,也可以通過spark.eventLog.dir來指定?錄 _eventLogCoder:事件?志的壓縮演算法,當spark.eventLog.enabled屬性與spark.eventLog.compress屬性為true時,壓縮演算法默認為lz4,也可以通過spark.io.compression.codec屬性指定,?前?持lzf,snappy和lz4
_hadoopConfiguration:Hadoop配置信息,如果系統屬性SPARK_YARN_MODE為true或者環境變數SPARK_YARN_MODEL為true,那麼將會是YARN的配置,否則為Hadoop的配置
_executorMemtory:Executor內存??,默認為1024MB,可以通過設置環境變數(SPARK_MEM或者SPARK_EXECUTOR_MEMORY)或者Spark.executor.memory屬性指定其中Spark.executor.memory優先順序最?
_applicationId:當前應?的標識,TaskScheduler啟動後會創建應?標識,通過調取TaskScheduler的ApplicationId獲取的
_applicationAttempId:當前應?嘗試執?的標識,SparkDriver在執?時會多次嘗試,每次嘗試都會?成?個標識來代表應?嘗試執?的?份
_listenerBusStarted:LiveListenerBus是否已經啟動的標記
nextShuffleId:類型為AtomicInteger,?於?成下?個shuffle標識
nextRddId:類型為atomicInteger,?於?成下?個rdd標識
02
初始化具體流程
創建SparkEnv
在Spark中,需要執?任務的地?就需要SparkEnv,在?產環境中,Spark往往運?於不同節點的Execute中,SparkEnv中的createDriverEnv?於創建SparkEnv,之後sparkEnv的實例通過set 設置到SparkEnv伴?對象env屬性中,然後在需要?到sparkEnv的地?直接通過伴?對象get獲取SparkEnv
創建?跳接受器(HeatbeatReceiver)
在Sparklocal運?模式中,driver和executor在同?個節點同?個進程中,所以driver和executor可以本地交互調?,但是在分散式的環境中,driver和executor往往運?在不同的節點不同 的進程中,driver就?法監控executor的信息了,所以driver端創建了?跳接收器,那麼?跳接收器是如何創建的。 ?先通過SparkEnv的NettyRpcEnv(基於NettyRPC)的setupEndPoint?法,然後向Dispatcher註冊HeartbeatReceiver,並返回HeartbeatReceiver的NettyRpcEndPointRef的引?
.創建和啟動調度系統
Spark調度系統主要分為TaskScheduler和DAGScheduler,TaskScheduler負責請求集群管理器給應?程序分配並運?Executor並給Task分配Executor並執?,DAGScheduler主要?於在任務交 給TaskSchduler執?之前做?些準備?作,?如創建Job,將DAG的RDD劃分到不同的Stage,提交Stage等,如代碼:
val (sched, ts) = SparkContext. createTaskScheduler( this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler( this)
_heartbeatReceiver. ask[ Boolean]( TaskSchedulerIsSet)
SparkContext.createTaskScheduler?法?於創建和啟動TaskScheduler,針對不同的部署模式創建調度器的?式也不同,在代碼中,_schedulerBackend表?SchedulerBackend的引?, _taskScheduler表?TaskScheduler的引?,在TaskScheduler中還會創建DAGScheduler的實例,最後向_heartbeatReceiver發送TaskSchedulerSet的消息,HeartbeatReceiver接收到之後將獲取 SparkContext的_taskScheduler屬性設置到??的Schduler屬性中
創建Executor動態分配管理器
ExecutorAllocationManager: Executor動態分配管理器,根據?作負載動態調整Executor數量,當在配置spark.dynamicAlloction.enabled屬性為true的情況下,在?local模式下或者 spark.dynamicAllcation.testing屬性為true時啟?
ExecutorAllocationManager內部會定期的根據負載計算所需的Executor數量,如果Executor需求數量?於之前向集群管理器申請的數量,那麼向集群管理器申請添加executor數量,反之,如果 executor需求數量?於之前向集群管理器申請的數量,那麼向集群管理器申請減少executor。此外,ExecutorAllocationManager還會定期向集群管理器申請移除已經過期的executor
創建和啟動ContextCleaner
ContextCleaner:非同步清理RDD、shuffle和?播狀態信息
通過配置spark.cleaner.referenceTracking(默認為true)來決定是否啟?ContextCleaner
ContextCleaner的組成:
referencesQueue:緩存頂級的AnyRef引?
referencesBuffer:緩存AnyRef的虛引?
listeners:緩存清理?作中的監聽器數組
cleaningThread:清理具體?作的線程,此線程為守護線程
periodicGCService:?於執?GC的調度線程池
periodicGCInterval:執?GC的時間間隔,可通過spark.cleaner.periodicGC.interval配置,默認30分鐘
blockOnCleanUpTasks:清理?shuffle數據是否是阻塞的,可通過配置spark.cleaner.referenceTracking.blocking配置,默認是true
blockOnShuffleCleanUpTasks:清理shuffle數據是否是阻塞的,可通過配置spark.cleaner.referenceTracking.blocking.shuffle ,默認是false
stoped:標記contextCleaner是否停?狀態
以上可以在github上打開spark源碼進?邊看?章邊看源碼,你會受益良多。 在這?推薦?個github源碼閱讀插件Insight.io for Github 在chrome擴展程序里可以直接查詢。
加入技術討論群
《大數據和雲計算技術》社區人數已經2500+,歡迎大家加下面助手微信,拉大家進群,自由交流。
喜歡釘釘群的,可以掃描下面二維碼:
喜歡QQ群的,可以掃描下面二維碼:
歡迎大家通過二維碼打賞支持技術社區(英雄請留名,社區感謝您,打賞次數超過55+):
喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!
本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧! 請您繼續閱讀更多來自 Hadoop技術學習 的精彩文章: