當前位置:
首頁 > 知識 > 業餘草教你解讀Spark源碼閱讀之HistoryServer

業餘草教你解讀Spark源碼閱讀之HistoryServer

HistoryServer服務可以讓用戶通過SparkUI界面,查看歷史應用(已經執行完的應用)的執行細節,比如job信息、stage信息、task信息等,該功能是基於spark eventlogs日誌文件的,所以必須打開eventlogs日誌開關,關於日誌開關的打開和HistoryServer服務的啟動方法這裡不再講述,下面進入正題

下面使用的spark版本是2.0.2

類結構圖

Web相關

業餘草教你解讀Spark源碼閱讀之HistoryServer

數據流相關

業餘草教你解讀Spark源碼閱讀之HistoryServer

相關類及特質

WebUI

Web Server服務中UI層次結構的最頂層。每一個WebUI包含了一個tabs的集合,而每一個tab又包含了一個pages的集合。tabs頁是可選的,而且WebUI也可以直接添加page

繼承該特質的有SparkUI、MasterWebUI、WorkerWebUI和HistoryServer,在這裡我們主要介紹HistoryServer

WebUITab

一個tab包含了一個pages的集合。prefix通過追加到parent的url組成一個完整的url path,而且不能包含斜杠

繼承該特質有JobsTab、StagesTab、ExecutorsTab、StorageTab等(這裡沒有列全),對應於Spark UI界面上的Jobs、Stages、Executors、Storage等Tab頁

WebUIPage

一個page表示UI層次結構中的葉子節點。WebUIPage的直接父類即可以是WebUI,也可以是WebUITab。

如果父類是WebUI,prefix追加到parent的url形成完整的url path,如果父類是WebUITab,prefix追加到parent的prefix形成一個相對url path。Prefix中不能包含斜杠

繼承該特質的有JobPage、StagePage、ExecutionPage、StoragePage等,對應於Tab頁中具體的Page

HistoryPage

繼承至WebUIPage,通過render函數渲染生成history頁面

UIRoot

該特質被根容器(HistoryServer、SparkUI)繼承,用來為它們提供獲取application信息的統一介面

HistoryServer

def main(argStrings: Array[String]): Unit = {
……
val providerName = conf.getOption("spark.history.provider")
.getOrElse(classOf[FsHistoryProvider].getName)
val provider = Utils.classForName(providerName)
.getConstructor(classOf[SparkConf])
.newInstance(conf)
.asInstanceOf[ApplicationHistoryProvider]
val port = conf.getInt("spark.history.ui.port", 18080)
val server = new HistoryServer(conf, provider, securityManager, port)
server.bind
ShutdownHookManager.addShutdownHook { => server.stop }
// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
}

HistoryServer繼承至WebUI,啟動的時候,會將環境配置以及provider作為成員變數來初始化HistoryServer實例,其中provider用來提供application的信息供web展示使用,HistoryServer實例化後執行bind函數,啟動jetty,將HTTP服務與web介面綁定,這時候historyserver web服務已經啟動了,之後添加了關閉server鉤子函數後進入無限循環等待

在HistoryServer實例化的過程中,會執行initialize函數,

def initialize {
attachPage(new HistoryPage(this))
attachHandler(ApiRootResource.getServletHandler(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
val contextHandler = new ServletContextHandler
contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
attachHandler(contextHandler)
}

在該函數中,首先通過attachPage函數在UI中添加了HistoryPage實例,該實例負責渲染生成history page,然後通過attachHandler添加了不同的handler,可以訪問url路由獲取對應的信息,其中ApiRootResource提供了api/vi/開頭的路由,通過該路由,history page可以獲取後台解析出的eventlog信息用以呈現,數據通過UIRoot提供的介面獲取

到這裡,HistoryServer的Web端基本構建完成

HsitoryServer數據緩存及獲取

數據緩存主要通過使用google緩存機制LoadingCache實現,關於LoadingCache在Spark HistoryServer中的運用在另外一篇文章中分析

FsHistoryProvider

前面完成了web結構的構建,接下來就需要提供介面獲取歷史application的信息來呈現,而FsHistoryProvider就是這個介面,作為成員變數傳遞給HistoryServer。這個類在實例化的時候,執行了initialize函數,在該函數中,首先會檢查hdfs是否處於安全模式,如果處於安全模式,則會等待至退出安全模式,如果不處於安全模式,則走進startPolling函數,在該函數中會讀取配置的eventlog路徑(默認為file:/tmp/spark-events,通過spark.history.fs.logDirectory配置),然後啟動一個線程不斷掃描該路徑下的eventlog文件,將文件解析後載入到內存中供web查詢使用,相關函數如下:

private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder
.setNameFormat("spark-history-task-%d").setDaemon(true).build)

pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)

if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
}

另外如果配置了清理開關(默認一天清理一次),則會清理內存中超時的application信息,並刪除超時且已完成的文件,載入和清理這兩個動作由同一個線程完成,以防止衝突。

for (file <- logInfos) { tasks += replayExecutor.submit(new Runnable { override def run: Unit = mergeApplicationListing(file) }) }

在checkForLogs函數中,會首先檢查文件是否有更新,已經掃描過的文件保存在一個以文件名為key的映射中fileToAppInfo,如果文件不在這個映射中,或者存在這個映射中但是文件大小變大了,則將此文件加入到載入列表中,隨後進行解析。解析的過程是採用一個固定線程數的線程池replayExecutor對需要載入的文件進行解析,每解析完一個文件,會將此文件的信息更新至fileToAppInfo,這個過程在mergeApplicationListing函數中完成,另外pendingReplayTasksCount中保存了等待解析的文件數目,所有文件解析完成後,更新一下解析完成時間

private def replay(
eventLog: FileStatus,
appCompleted: Boolean,
bus: ReplayListenerBus,
eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): ApplicationEventListener = {
val logPath = eventLog.getPath
logInfo(s"Replaying log path: $logPath")
val logInput = EventLoggingListener.openEventLog(logPath, fs)
try {
val appListener = new ApplicationEventListener
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
appListener
} finally {
logInput.close
}
}

在mergeApplicationListing函數中,主要通過執行reply函數將eventlog日誌文件解析出來,在該函數中,首先將ApplicationEventListener監聽器加入到ReplayListenerBus實例中,ReplayListenerBus主要通過調用該實例的replay函數從eventlog記錄中解析event事件,每解析一個event,都會發通知到各監聽器處理event,在這裡通過監聽者模式將日誌解析與結果處理兩個過程解耦開。執行完reply函數後,也就完成了一個eventlog文件的解析,如果解析成功,則將該eventlog的信息加入到fileToAppInfo,表明已經掃描過該文件

在cleanLogs函數中,會在log directory中刪除已經任務執行完成且超時的文件。歡迎關注業餘草:www.xttblog.com;CODE大全:www.codedq.net;愛分享:www.ndislwf.com

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 達人科技 的精彩文章:

「PHP」PHP面向對象編程——phpOOP入門
將git版本號編譯進程序
webgl自學筆記——幾何圖形
「LeetCode」Wildcard Matching 題解

TAG:達人科技 |

您可能感興趣

mybaits sqlSession 源碼解讀
disruptor 源碼解讀
親子閱讀/英語啟蒙:The Very Lonely Firefly解讀
解讀目標檢測新範式:Segmentations is All You Need
專業解讀 Business Analytics項目
解讀葡萄牙人鑽石Portuguese Diamond
Kaggle Carvana 圖像分割比賽冠軍模型 TernausNet 解讀
全面解讀Liquidity.Network
NIPS2018最佳論文解讀:Neural Ordinary Differential Equations
深度解讀Chaumet Bee my Love愛·巢 & Liens 緣系?一生
解讀區塊鏈瀏覽器Tokenview.com
解讀 | 超級賬本 Brian Behlendorf 教你認識區塊鏈
網路專家解讀YouTube,Twitter或Reddit的盈利模式
靈感全揭秘-從A到Z解讀VirgilAbloh的LouisVuitton首秀
深入解讀Google Lens
《Nature Genetics》解讀腫瘤利器,預測癌症進化
【大牌解讀】鑽石之王Harry Winston 到底吸引了多少位時尚Icon?
Deep Forest 演算法解讀
iPhone8/8Plus iPhone X 核心資訊全解讀
英偉達官方解讀:Volta Tensor Core GPU實現AI性能新里程碑