當前位置:
首頁 > 知識 > Adaptive Execution 讓 Spark SQL 更智能更高效

Adaptive Execution 讓 Spark SQL 更智能更高效

1 背景

前面《Spark SQL / Catalyst 內部原理 與 RBO》與《Spark SQL 性能優化再進一步 CBO 基於代價的優化》介紹的優化,從查詢本身與目標數據的特點的角度儘可能保證了最終生成的執行計劃的高效性。但是

執行計劃一旦生成,便不可更改,即使執行過程中發現後續執行計劃可以進一步優化,也只能按原計劃執行

CBO 基於統計信息生成最優執行計劃,需要提前生成統計信息,成本較大,且不適合數據更新頻繁的場景

CBO 基於基礎表的統計信息與操作對數據的影響推測中間結果的信息,只是估算,不夠精確

本文介紹的 Adaptive Execution 將可以根據執行過程中的中間數據優化後續執行,從而提高整體執行效率。核心在於兩點

  • 執行計劃可動態調整
  • 調整的依據是中間結果的精確統計信息

2 動態設置 Shuffle Partition

2.1 Spark Shuffle 原理

Spark Shuffle 一般用於將上游 Stage 中的數據按 Key 分區,保證來自不同 Mapper (表示上游 Stage 的 Task)的相同的 Key 進入相同的 Reducer (表示下游 Stage 的 Task)。一般用於 group by 或者 Join 操作。


Adaptive Execution 讓 Spark SQL 更智能更高效


Spark Shuffle 過程

如上圖所示,該 Shuffle 總共有 2 個 Mapper 與 5 個 Reducer。每個 Mapper 會按相同的規則(由 Partitioner 定義)將自己的數據分為五份。每個 Reducer 從這兩個 Mapper 中拉取屬於自己的那一份數據。

2.2 原有 Shuffle 的問題

使用 Spark SQL 時,可通過 spark.sql.shuffle.partitions 指定 Shuffle 時 Partition 個數,也即 Reducer 個數

該參數決定了一個 Spark SQL Job 中包含的所有 Shuffle 的 Partition 個數。如下圖所示,當該參數值為 3 時,所有 Shuffle 中 Reducer 個數都為 3


Adaptive Execution 讓 Spark SQL 更智能更高效


Spark SQL with multiple Shuffle

這種方法有如下問題

  • Partition 個數不宜設置過大
  • Reducer(代指 Spark Shuffle 過程中執行 Shuffle Read 的 Task) 個數過多,每個 Reducer 處理的數據量過小。大量小 Task 造成不必要的 Task 調度開銷與可能的資源調度開銷(如果開啟了 Dynamic Allocation)
  • Reducer 個數過大,如果 Reducer 直接寫 HDFS 會生成大量小文件,從而造成大量 addBlock RPC,Name node 可能成為瓶頸,並影響其它使用 HDFS 的應用
  • 過多 Reducer 寫小文件,會造成後面讀取這些小文件時產生大量 getBlock RPC,對 Name node 產生衝擊
  • Partition 個數不宜設置過小
  • 每個 Reducer 處理的數據量太大,Spill 到磁碟開銷增大
  • Reducer GC 時間增長
  • Reducer 如果寫 HDFS,每個 Reducer 寫入數據量較大,無法充分發揮並行處理優勢
  • 很難保證所有 Shuffle 都最優
  • 不同的 Shuffle 對應的數據量不一樣,因此最優的 Partition 個數也不一樣。使用統一的 Partition 個數很難保證所有 Shuffle 都最優
  • 定時任務不同時段數據量不一樣,相同的 Partition 數設置無法保證所有時間段執行時都最優

2.3 自動設置 Shuffle Partition 原理

如 Spark Shuffle 原理 一節圖中所示,Stage 1 的 5 個 Partition 數據量分別為 60MB,40MB,1MB,2MB,50MB。其中 1MB 與 2MB 的 Partition 明顯過小(實際場景中,部分小 Partition 只有幾十 KB 及至幾十位元組)

開啟 Adaptive Execution 後

  • Spark 在 Stage 0 的 Shuffle Write 結束後,根據各 Mapper 輸出,統計得到各 Partition 的數據量,即 60MB,40MB,1MB,2MB,50MB
  • 通過 ExchangeCoordinator 計算出合適的 post-shuffle Partition 個數(即 Reducer)個數(本例中 Reducer 個數設置為 3)
  • 啟動相應個數的 Reducer 任務
  • 每個 Reducer 讀取一個或多個 Shuffle Write Partition 數據(如下圖所示,Reducer 0 讀取 Partition 0,Reducer 1 讀取 Partition 1、2、3,Reducer 2 讀取 Partition 4)


Adaptive Execution 讓 Spark SQL 更智能更高效


Spark SQL adaptive reducer 1

三個 Reducer 這樣分配是因為

  • targetPostShuffleInputSize 默認為 64MB,每個 Reducer 讀取數據量不超過 64MB
  • 如果 Partition 0 與 Partition 2 結合,Partition 1 與 Partition 3 結合,雖然也都不超過 64 MB。但讀完 Partition 0 再讀 Partition 2,對於同一個 Mapper 而言,如果每個 Partition 數據比較少,跳著讀多個 Partition 相當於隨機讀,在 HDD 上性能不高
  • 目前的做法是只結合相臨的 Partition,從而保證順序讀,提高磁碟 IO 性能
  • 該方案只會合併多個小的 Partition,不會將大的 Partition 拆分,因為拆分過程需要引入一輪新的 Shuffle
  • 基於上面的原因,默認 Partition 個數(本例中為 5)可以大一點,然後由 ExchangeCoordinator 合併。如果設置的 Partition 個數太小,Adaptive Execution 在此場景下無法發揮作用

由上圖可見,Reducer 1 從每個 Mapper 讀取 Partition 1、2、3 都有三根線,是因為原來的 Shuffle 設計中,每個 Reducer 每次通過 Fetch 請求從一個特定 Mapper 讀數據時,只能讀一個 Partition 的數據。也即在上圖中,Reducer 1 讀取 Mapper 0 的數據,需要 3 輪 Fetch 請求。對於 Mapper 而言,需要讀三次磁碟,相當於隨機 IO。

為了解決這個問題,Spark 新增介面,一次 Shuffle Read 可以讀多個 Partition 的數據。如下圖所示,Task 1 通過一輪請求即可同時讀取 Task 0 內 Partition 0、1 和 2 的數據,減少了網路請求數量。同時 Mapper 0 一次性讀取並返回三個 Partition 的數據,相當於順序 IO,從而提升了性能。


Adaptive Execution 讓 Spark SQL 更智能更高效


Spark SQL adaptive reducer 2

由於 Adaptive Execution 的自動設置 Reducer 是由 ExchangeCoordinator 根據 Shuffle Write 統計信息決定的,因此即使在同一個 Job 中不同 Shuffle 的 Reducer 個數都可以不一樣,從而使得每次 Shuffle 都儘可能最優。

上文 原有 Shuffle 的問題 一節中的例子,在啟用 Adaptive Execution 後,三次 Shuffle 的 Reducer 個數從原來的全部為 3 變為 2、4、3。


Adaptive Execution 讓 Spark SQL 更智能更高效


Spark SQL with adaptive Shuffle

2.4 使用與優化方法

可通過 spark.sql.adaptive.enabled=true 啟用 Adaptive Execution 從而啟用自動設置 Shuffle Reducer 這一特性

通過 spark.sql.adaptive.shuffle.targetPostShuffleInputSize 可設置每個 Reducer 讀取的目標數據量,其單位是位元組,默認值為 64 MB。上文例子中,如果將該值設置為 50 MB,最終效果仍然如上文所示,而不會將 Partition 0 的 60MB 拆分。具體原因上文已說明

3 動態調整執行計劃

3.1 固定執行計劃的不足

在不開啟 Adaptive Execution 之前,執行計劃一旦確定,即使發現後續執行計劃可以優化,也不可更改。如下圖所示,SortMergJoin 的 Shuffle Write 結束後,發現 Join 一方的 Shuffle 輸出只有 46.9KB,仍然繼續執行 SortMergeJoin


Adaptive Execution 讓 Spark SQL 更智能更高效


Spark SQL with fixed DAG

此時完全可將 SortMergeJoin 變更為 BroadcastJoin 從而提高整體執行效率。

3.2 SortMergeJoin 原理

SortMergeJoin 是常用的分散式 Join 方式,它幾乎可使用於所有需要 Join 的場景。但有些場景下,它的性能並不是最好的。

SortMergeJoin 的原理如下圖所示

  • 將 Join 雙方以 Join Key 為 Key 按照 HashPartitioner 分區,且保證分區數一致
  • Stage 0 與 Stage 1 的所有 Task 在 Shuffle Write 時,都將數據分為 5 個 Partition,並且每個 Partition 內按 Join Key 排序
  • Stage 2 啟動 5 個 Task 分別去 Stage 0 與 Stage 1 中所有包含 Partition 分區數據的 Task 中取對應 Partition 的數據。(如果某個 Mapper 不包含該 Partition 的數據,則 Redcuer 無須向其發起讀取請求)。
  • Stage 2 的 Task 2 分別從 Stage 0 的 Task 0、1、2 中讀取 Partition 2 的數據,並且通過 MergeSort 對其進行排序
  • Stage 2 的 Task 2 分別從 Stage 1 的 Task 0、1 中讀取 Partition 2 的數據,且通過 MergeSort 對其進行排序
  • Stage 2 的 Task 2 在上述兩步 MergeSort 的同時,使用 SortMergeJoin 對二者進行 Join


Adaptive Execution 讓 Spark SQL 更智能更高效


Spark SQL SortMergeJoin

3.3 BroadcastJoin 原理

當參與 Join 的一方足夠小,可全部置於 Executor 內存中時,可使用 Broadcast 機制將整個 RDD 數據廣播到每一個 Executor 中,該 Executor 上運行的所有 Task 皆可直接讀取其數據。(本文中,後續配圖,為了方便展示,會將整個 RDD 的數據置於 Task 框內,而隱藏 Executor)

對於大 RDD,按正常方式,每個 Task 讀取並處理一個 Partition 的數據,同時讀取 Executor 內的廣播數據,該廣播數據包含了小 RDD 的全量數據,因此可直接與每個 Task 處理的大 RDD 的部分數據直接 Join


Adaptive Execution 讓 Spark SQL 更智能更高效


Spark SQL BroadcastJoin

根據 Task 內具體的 Join 實現的不同,又可分為 BroadcastHashJoin 與 BroadcastNestedLoopJoin。後文不區分這兩種實現,統稱為 BroadcastJoin

與 SortMergeJoin 相比,BroadcastJoin 不需要 Shuffle,減少了 Shuffle 帶來的開銷,同時也避免了 Shuffle 帶來的數據傾斜,從而極大地提升了 Job 執行效率

同時,BroadcastJoin 帶來了廣播小 RDD 的開銷。另外,如果小 RDD 過大,無法存於 Executor 內存中,則無法使用 BroadcastJoin

對於基礎表的 Join,可在生成執行計劃前,直接通過 HDFS 獲取各表的大小,從而判斷是否適合使用 BroadcastJoin。但對於中間表的 Join,無法提前準確判斷中間表大小從而精確判斷是否適合使用 BroadcastJoin

《Spark SQL 性能優化再進一步 CBO 基於代價的優化》一文介紹的 CBO 可通過表的統計信息與各操作對數據統計信息的影響,推測出中間表的統計信息,但是該方法得到的統計信息不夠準確。同時該方法要求提前分析表,具有較大開銷

而開啟 Adaptive Execution 後,可直接根據 Shuffle Write 數據判斷是否適用 BroadcastJoin

3.4 動態調整執行計劃原理

如上文 SortMergeJoin 原理 中配圖所示,SortMergeJoin 需要先對 Stage 0 與 Stage 1 按同樣的 Partitioner 進行 Shuffle Write

Shuffle Write 結束後,可從每個 ShuffleMapTask 的 MapStatus 中統計得到按原計劃執行時 Stage 2 各 Partition 的數據量以及 Stage 2 需要讀取的總數據量。(一般來說,Partition 是 RDD 的屬性而非 Stage 的屬性,本文為了方便,不區分 Stage 與 RDD。可以簡單認為一個 Stage 只有一個 RDD,此時 Stage 與 RDD 在本文討論範圍內等價)

如果其中一個 Stage 的數據量較小,適合使用 BroadcastJoin,無須繼續執行 Stage 2 的 Shuffle Read。相反,可利用 Stage 0 與 Stage 1 的數據進行 BroadcastJoin,如下圖所示


Adaptive Execution 讓 Spark SQL 更智能更高效


Spark SQL Auto BroadcastJoin

具體做法是

  • 將 Stage 1 全部 Shuffle Write 結果廣播出去
  • 啟動 Stage 2,Partition 個數與 Stage 0 一樣,都為 3
  • 每個 Stage 2 每個 Task 讀取 Stage 0 每個 Task 的 Shuffle Write 數據,同時與廣播得到的 Stage 1 的全量數據進行 Join

註:廣播數據存於每個 Executor 中,其上所有 Task 共享,無須為每個 Task 廣播一份數據。上圖中,為了更清晰展示為什麼能夠直接 Join 而將 Stage 2 每個 Task 方框內都放置了一份 Stage 1 的全量數據

雖然 Shuffle Write 已完成,將後續的 SortMergeJoin 改為 Broadcast 仍然能提升執行效率

  • SortMergeJoin 需要在 Shuffle Read 時對來自 Stage 0 與 Stage 1 的數據進行 Merge Sort,並且可能需要 Spill 到磁碟,開銷較大
  • SortMergeJoin 時,Stage 2 的所有 Task 需要取 Stage 0 與 Stage 1 的所有 Task 的輸出數據(如果有它要的數據 ),會造成大量的網路連接。且當 Stage 2 的 Task 較多時,會造成大量的磁碟隨機讀操作,效率不高,且影響相同機器上其它 Job 的執行效率
  • SortMergeJoin 時,Stage 2 每個 Task 需要從幾乎所有 Stage 0 與 Stage 1 的 Task 取數據,無法很好利用 Locality
  • Stage 2 改用 Broadcast,每個 Task 直接讀取 Stage 0 的每個 Task 的數據(一對一),可很好利用 Locality 特性。最好在 Stage 0 使用的 Executor 上直接啟動 Stage 2 的 Task。如果 Stage 0 的 Shuffle Write 數據並未 Spill 而是在內存中,則 Stage 2 的 Task 可直接讀取內存中的數據,效率非常高。如果有 Spill,那可直接從本地文件中讀取數據,且是順序讀取,效率遠比通過網路隨機讀數據效率高

3.5 使用與優化方法

該特性的使用方式如下

  • 當 spark.sql.adaptive.enabled 與 spark.sql.adaptive.join.enabled 都設置為 true 時,開啟 Adaptive Execution 的動態調整 Join 功能
  • spark.sql.adaptiveBroadcastJoinThreshold 設置了 SortMergeJoin 轉 BroadcastJoin 的閾值。如果不設置該參數,該閾值與 spark.sql.autoBroadcastJoinThreshold 的值相等
  • 除了本文所述 SortMergeJoin 轉 BroadcastJoin,Adaptive Execution 還可提供其它 Join 優化策略。部分優化策略可能會需要增加 Shuffle。spark.sql.adaptive.allowAdditionalShuffle 參數決定了是否允許為了優化 Join 而增加 Shuffle。其默認值為 false

4 自動處理數據傾斜

4.1 解決數據傾斜典型方案

《Spark性能優化之道——解決Spark數據傾斜(Data Skew)的N種姿勢》一文講述了數據傾斜的危害,產生原因,以及典型解決方法

  • 保證文件可 Split 從而避免讀 HDFS 時數據傾斜
  • 保證 Kafka 各 Partition 數據均衡從而避免讀 Kafka 引起的數據傾斜
  • 調整並行度或自定義 Partitioner 從而分散分配給同一 Task 的大量不同 Key
  • 使用 BroadcastJoin 代替 ReduceJoin 消除 Shuffle 從而避免 Shuffle 引起的數據傾斜
  • 對傾斜 Key 使用隨機前綴或後綴從而分散大量傾斜 Key,同時將參與 Join 的小表擴容,從而保證 Join 結果的正確性

4.2 自動解決數據傾斜

目前 Adaptive Execution 可解決 Join 時數據傾斜問題。其思路可理解為將部分傾斜的 Partition (傾斜的判斷標準為該 Partition 數據是所有 Partition Shuffle Write 中位數的 N 倍) 進行單獨處理,類似於 BroadcastJoin,如下圖所示


Adaptive Execution 讓 Spark SQL 更智能更高效


Spark SQL resolve joinm skew

在上圖中,左右兩邊分別是參與 Join 的 Stage 0 與 Stage 1 (實際應該是兩個 RDD 進行 Join,但如同上文所述,這裡不區分 RDD 與 Stage),中間是獲取 Join 結果的 Stage 2

明顯 Partition 0 的數據量較大,這裡假設 Partition 0 符合「傾斜」的條件,其它 4 個 Partition 未傾斜

以 Partition 對應的 Task 2 為例,它需獲取 Stage 0 的三個 Task 中所有屬於 Partition 2 的數據,並使用 MergeSort 排序。同時獲取 Stage 1 的兩個 Task 中所有屬於 Partition 2 的數據並使用 MergeSort 排序。然後對二者進行 SortMergeJoin

對於 Partition 0,可啟動多個 Task

  • 在上圖中,啟動了兩個 Task 處理 Partition 0 的數據,分別名為 Task 0-0 與 Task 0-1
  • Task 0-0 讀取 Stage 0 Task 0 中屬於 Partition 0 的數據
  • Task 0-1 讀取 Stage 0 Task 1 與 Task 2 中屬於 Partition 0 的數據,並進行 MergeSort
  • Task 0-0 與 Task 0-1 都從 Stage 1 的兩個 Task 中所有屬於 Partition 0 的數據
  • Task 0-0 與 Task 0-1 使用 Stage 0 中屬於 Partition 0 的部分數據與 Stage 1 中屬於 Partition 0 的全量數據進行 Join

通過該方法,原本由一個 Task 處理的 Partition 0 的數據由多個 Task 共同處理,每個 Task 需處理的數據量減少,從而避免了 Partition 0 的傾斜

對於 Partition 0 的處理,有點類似於 BroadcastJoin 的做法。但區別在於,Stage 2 的 Task 0-0 與 Task 0-1 同時獲取 Stage 1 中屬於 Partition 0 的全量數據,是通過正常的 Shuffle Read 機制實現,而非 BroadcastJoin 中的變數廣播實現

4.3 使用與優化方法

開啟與調優該特性的方法如下

  • 將 spark.sql.adaptive.skewedJoin.enabled 設置為 true 即可自動處理 Join 時數據傾斜
  • spark.sql.adaptive.skewedPartitionMaxSplits 控制處理一個傾斜 Partition 的 Task 個數上限,默認值為 5
  • spark.sql.adaptive.skewedPartitionRowCountThreshold 設置了一個 Partition 被視為傾斜 Partition 的行數下限,也即行數低於該值的 Partition 不會被當作傾斜 Partition 處理。其默認值為 10L * 1000 * 1000 即一千萬
  • spark.sql.adaptive.skewedPartitionSizeThreshold設置了一個 Partition 被視為傾斜 Partition 的大小下限,也即大小小於該值的 Partition 不會被視作傾斜 Partition。其默認值為 64 * 1024 * 1024 也即 64MB
  • spark.sql.adaptive.skewedPartitionFactor 該參數設置了傾斜因子。如果一個 Partition 的大小大於 spark.sql.adaptive.skewedPartitionSizeThreshold的同時大於各 Partition 大小中位數與該因子的乘積,或者行數大於 spark.sql.adaptive.skewedPartitionRowCountThreshold 的同時大於各 Partition 行數中位數與該因子的乘積,則它會被視為傾斜的 Partition

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

MYSQL包含逗號的欄位拆分查詢
分散式框架spring-session實現session一致性使用問題

TAG:程序員小新人學習 |