當前位置:
首頁 > 知識 > Hadoop源碼系列(一)FairScheduler申請和分配container的過程

Hadoop源碼系列(一)FairScheduler申請和分配container的過程


1、如何申請資源

1.1 如何啟動AM並申請資源1.1.1 如何啟動AM

val yarnClient = YarnClient.createYarnClient
setupCredentials
yarnClient.init(yarnConf)
yarnClient.start
// Get a new application from our RM
val newApp = yarnClient.createApplication
val newAppResponse = newApp.getNewApplicationResponse
appId = newAppResponse.getApplicationId

// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)

// Finally, submit and monitor the application
logInfo(s"Submitting application $appId to ResourceManager")
yarnClient.submitApplication(appContext)

1.1.2 FairScheduler如何處理AM的ResourceRequest

1、FairScheduler接收到SchedulerEventType.APP_ADDED之後,調用addApplication方法把把RMApp添加到隊列裡面,結束之後發送RMAppEventType.APP_ACCEPTED給RMApp

2、RMApp啟動RMAttempt之後,發送SchedulerEventType.APP_ATTEMPT_ADDED給FairScheduler

LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user);

3、FairScheduler調用addApplicationAttempt方法,發送RMAppAttemptEventType.ATTEMPT_ADDED事件給RMAppAttempt,RMAppAttempt隨後調用Scheduler的allocate方法發送AM的ResourceRequest

4、FairScheduler在allocate方法裡面對該請求進行處理,FairScheduler對於AM的資源請求的優先順序上並沒有特殊的照顧,詳細請看章節2 如何分配資源

1.2 AM啟動之後如何申請資源1.2.1、註冊AM

amClient = AMRMClient.createAMRMClient
amClient.init(conf)
amClient.start
amClient.registerApplicationMaster(Utils.localHostName, 0, uiAddress)

1.2.2、發送資源請求

// 1.創建資源請求
amClient.addContainerRequest(request)
// 2.發送資源請求
val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers
if (allocatedContainers.size > 0) {
// 3.請求返回之後處理Container
handleAllocatedContainers(allocatedContainers.asScala)
}

1.2.3、啟動Container

def startContainer: java.util.Map[String, ByteBuffer] = {
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
.asInstanceOf[ContainerLaunchContext]
val env = prepareEnvironment.asJava

ctx.setLocalResources(localResources.asJava)
ctx.setEnvironment(env)

val credentials = UserGroupInformation.getCurrentUser.getCredentials
val dob = new DataOutputBuffer
credentials.writeTokenStorageToStream(dob)
ctx.setTokens(ByteBuffer.wrap(dob.getData))

val commands = prepareCommand

ctx.setCommands(commands.asJava)
ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)

// If external shuffle service is enabled, register with the Yarn shuffle service already
// started on the NodeManager and, if authentication is enabled, provide it with our secret
// key for fetching shuffle files later
if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
val secretString = securityMgr.getSecretKey
val secretBytes =
if (secretString != null) {
// This conversion must match how the YarnShuffleService decodes our secret
JavaUtils.stringToBytes(secretString)
} else {
// Authentication is not enabled, so just provide dummy metadata
ByteBuffer.allocate(0)
}
ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
}

// Send the start request to the ContainerManager
try {
nmClient.startContainer(container.get, ctx)
} catch {
case ex: Exception =>
throw new SparkException(s"Exception while starting container ${container.get.getId}" +
s" on host $hostname", ex)
}
}

View Code

2、如何分配資源

2.1 接受資源請求步驟

在FairScheduler的allocate方法裡面僅僅是記錄ResourceRequest,並不會真正的立馬分配。

流程如下:

1、檢查該APP是否註冊過

2、檢查資源的請求是否超過最大內存和最大CPU的限制

3、記錄資源請求的時間,最後container分配的延遲會體現在隊列metrics的appAttemptFirstContainerAllocationDelay當中

4、釋放AM發過來的已經不需要的資源,主要邏輯在FSAppAttempt的containerCompleted方法里

5、更新資源請求,所有資源請求都是記錄在AppSchedulingInfo當中的requests(注意:只有是ANY的資源請求才會被立馬更新到QueueMetrics的PendingResources里)

6、找出該APP被標記為搶佔的container ID列表preemptionContainerIds

7、更新APP的黑名單列表,該信息被記錄在AppSchedulingInfo當中

8、從FSAppAttempt的newlyAllocatedContainers當中獲取最新被分配的container

9、返回preemptionContainerIds、HeadRoom、ContainerList、NMTokenList。(註:Headroom = Math.min(Math.min(queueFairShare - queueUsage, 0), maxAvailableResource)

2.2 請求和分配的關係

Hadoop源碼系列(一)FairScheduler申請和分配container的過程

請求和分配的過程是非同步的,關係如上圖,每次調用allocate獲得的container,其實是之前的請求被分配的結果

2.3 如何分配2.3.1 分配方式

分配有兩種方式:

1、接收到NodeManager的心跳的時候進行分配

NodeManager每隔一秒(yarn.resourcemanager.nodemanagers.heartbeat-interval-ms)給ResourceManager發送一個心跳事件NODE_UPDATE,接收到心跳事件之後,在FairScheduler的nodeUpdate方法里進行處理。

NodeManager會彙報新啟動的Container列表newlyLaunchedContainers和已經結束的Container列表completedContainers。然後在attemptScheduling方法裡面進行分配。

2、持續調度方式

它有一個單獨的線程,線程名稱是FairSchedulerContinuousScheduling,每5毫秒對所有節點的資源進行排序,然後遍歷所有節點,調用attemptScheduling方法進行分配。

開啟持續調度模式之後,在接收到心跳事件NODE_UPDATE的時候,只有在completedContainers不為空的情況下,才會進行調度

attemptScheduling首先會檢查是否有資源預留,如果有預留,則直接為預留的APP分配container

沒有預留的分配過程如下:

1、最大可分配資源為這台機器的可用資源的一半,從root隊列開始自上而下進行分配Resource assignment = queueMgr.getRootQueue.assignContainer(node);

2、分配到一個Container之後,判斷是否要連續分配多個,最大支持連續分配多少個?

以下是涉及到的各個參數以及參數的默認值:

yarn.scheduler.fair.assignmultiple false (建議設置為true)

yarn.scheduler.fair.dynamic.max.assign true (hadoop2.7之後就沒有這個參數了)

yarn.scheduler.fair.max.assign -1 (建議設置為2~3,不要設置得太多,否則會有調度傾斜的問題)

2.3.2 如何從隊列當中選出APP進行資源分配

入口在queueMgr.getRootQueue.assignContainer(node);

1、檢查當前隊列的使用量是否小於最大資源量

2、首先對子隊列進行排序,優先順序請參照章節 2.3.4 如何確定優先順序

3、排序完再調用子隊列的assignContainer方法分配container

4、一直遞歸到葉子隊列

葉子隊列如何進行分配?

1、先對runnableApps進行排序,排序完成之後,for循環遍歷一下

2、先檢查該Node是否在APP的黑名單當中

3、檢查該隊列是否可以運行該APP的AM,主要是檢查是否超過了maxAMShare(根據amRunning欄位判斷是否已經啟動了AM了)

檢查邏輯的偽代碼如下:

maxResource = getFairShare
if (maxResource == 0) {
// 最大資源是隊列的MaxShare和集群總資源取一個小的值
maxResource = Math.min(getRootQueue.AvailableResource, getMaxShare);
}
maxAMResource = maxResource * maxAMShare
if (amResourceUsage + amResource) > maxAMResource) {
// 可以運行
return true
} else {
// 不可以運行
return false
}

View Code

4、給該APP分配container

下面以一個例子來說明分配的過程是如何選擇隊列的:

假設隊列的結構是這樣子的

root

---->BU_1

-------->A

-------->B

---->BU_2

-------->C

-------->D

Hadoop源碼系列(一)FairScheduler申請和分配container的過程

2.3.3 任務分配Container的本地性

任務分配Container的時候會考慮請求的本地性,對於調度器來說,它的本地性分為三種:NODE_LOCAL, RACK_LOCAL, OFF_SWITCH

具體方法位於FSAppAttempt的assignContainer方法

遍歷優先順序

給該優先順序的調度機會+1

獲取RackLocal和NodeLocal的任務

計算允許分配的本地性級別allowedLocality,默認是NODE_LOCAL

1、心跳分配方式

計算調度機會,如果該優先順序的任務的調度機會超過了(節點數 * NODE_LOCAL閾值),降級為RACK_LOCAL,如果該優先順序的任務的調度機會超過了(節點數 * RACK_LOCAL閾值),降級為OFF_SWITCH

2、連續分配方式

計算等待時間waitTime -= lastScheduledContainer.get(priority);

如果waitTime超過了NODE_LOCAL允許的delay時間,就降級為RACK_LOCAL,再超過RACK_LOCAL允許的delay的時間,就降級為OFF_SWITCH

分配NODE_LOCAL的container

允許分配的本地性級別>=RACK_LOCAL,分配RACK_LOCAL的container

允許分配的本地性級別=OFF_SWITCH,分配OFF_SWITCH的container

都分不到,等待下一次機會

相關參數:

默認值全是-1,則允許的本地性級別是OFF_SWITCH

yarn.scheduler.fair.locality-delay-node-ms -1

yarn.scheduler.fair.locality-delay-rack-ms -1

yarn.scheduler.fair.locality.threshold.node -1

yarn.scheduler.fair.locality.threshold.rack -1

2.3.4 Container分配

1、檢查該節點的資源是否足夠,如果資源充足

2、如果當前的allowedLocality比實際分配的本地性低,則重置allowedLocality

3、把新分配的Container加到newlyAllocatedContainers和liveContainers列表中

4、把分配的container信息同步到appSchedulingInfo當中

5、發送RMContainerEventType.START事件

6、更新FSSchedulerNode記錄的container信息

7、如果被分配的是AM,則設置amRunning為true

如果資源不夠,則檢查是否可以預留資源

條件:

1)Container的資源請求必須小於Scheduler的增量分配內存 * 倍數(默認應該是2g)

2)如果已經存在的預留數 < 本地性對應的可用節點 * 預留比例

3)一個節點只允許同時為一個APP預留資源

相關參數:

yarn.scheduler.increment-allocation-mb 1024

yarn.scheduler.increment-allocation-vcores 1

yarn.scheduler.reservation-threshold.increment-multiple 2

yarn.scheduler.fair.reservable-nodes 0.05

2.3.4 如何確定優先順序

該比較規則同時適用於隊列和APP,詳細代碼位於FairSharePolicy當中

MinShare = Math.min(getMinShare, getDemand)

1、(當前資源使用量 / MinShare)的比值越小,優先順序越高

2、如果雙方資源使用量都超過MinShare,則(當前資源使用量 / 權重)的比值越小,優先順序越高

3、啟動時間越早,優先順序越高

4、最後實在比不出來,就比名字...

從上面分配的規則當中能看出來MinShare是非常重要的一個指標,當資源使用量沒有超過MinShare之前,隊列在分配的時候就會比較優先,切記一定要設置啊!

註:getMinShare是FairScheduler當中隊列的minResources

6887116 mb,4491 vcores

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 達人科技 的精彩文章:

Android系統——輸入系統(十五)實戰 使用GlobalKey一鍵啟動程序
MySql單表最大8000W+ 之資料庫遇瓶頸記
一顆簡單的JDBC栗子
nodejs伺服器部署教程二
使用JS開發桌面端應用程序NW.js-3-開發問題小記

TAG:達人科技 |

您可能感興趣

Red velvet最新MV公開,粉絲不滿歌詞part分配,Irene美貌引熱議
Alphabet 的兩位創始人,以及 Google CEO 平日如何分配工作?
出道前feat GD?團隊part分配不均?BLACKPINK全能ace金智妮Jennie究竟有什麼過人之處?
Creo/Preo軟體自學第二篇:部分配置文件在config中路徑的設置
如何優化生產環境下的Kubernetes資源分配
Nine percent粉絲抵制偶像加場?蔡徐坤C位鏡頭分配遭炮轟
愛立信將Fraunhofer MPEG-H電視音頻系統集成到旗下傳輸分配編/解碼器解決方案
三星或將推出三星Galaxy Note 10e,部分配置將縮水!
《創造101》最新寢室分配,yamy、sunnee「同居」,三大臉贊住一起
技嘉首創Azure AI筆記本Aorus 15:CPU/GPU智能分配
尤長靖說出ninepercent成員們宿舍分配方式,所有大勢CP都住一起
Nine Percent九人變八人?都是分配不均惹的禍!
Selina首揭S.H.E「C位之謎」 曝遺產分配「覺得不孝」
「小天使」使它永作「熊孩子」 -elife:不對稱分配的長鏈非編碼RNA cherub 協助幹細胞成為惡性癌細胞
ROG再無A卡!華碩官網為Radeon分配AREZ新品牌
Valentino 擴大激勵機制,考慮向更多核心員工免費分配公司股權
女團刀群舞典範!談談GFriend小女友主打《夜》舞台及part分配!
魅族16s Pro部分配置曝光 驍龍855Plus+Flyme7.8+新增墨綠配色
「EXO」「分享」181113 EXO成員宿舍分配現狀公開 吵鬧line和安靜line五五配比很科學
AMD為Vulkan創建了一個直接內存分配器