spark2.1.0之配置與源碼分析
任何優秀的軟體或服務都會提供一些配置參數,這些配置參數有些是內置的,有些則是可以由用戶配置的。對於熟悉Java的開發人員來說,對JVM進行性能調優是一個經常需要面對的工作,這個過程常常伴隨著各種JVM參數的調整與測試。之所以將這些參數交給具體的開發人員去調整,是因為軟體或者服務的提供者也無法保證給定的默認參數是最符合用戶應用場景與軟硬體環境的。一個簡單的例子:當用戶的QPS發生變化時,對於Web服務的JVM來說也應當相應調整內存的大小或限制。
Spark作為一款優秀的計算框架,也配備了各種各樣的系統配置參數(例如:spark.master,spark.app.name,spark.driver.memory,spark.executor.memory等)。通過這些配置參數可以定義應用的名稱、使用的部署模式、調度模式、executor數量、executor的內核數、driver或executor的內存大小、採用的內存模型等。
SparkConf是Spark的配置類,這個類在Spark的歷史版本中已經存在很久了,Spark中的每一個組件都直接或者間接的使用著它所存儲的屬性,這些屬性都存儲在如下的數據結構中:
[java] view plain copy
private
val settings =
new
ConcurrentHashMap[String, String]()
由以上代碼的泛型[1] 可以看出Spark的所有配置,無論是key還是value都是String類型。Spark的配置通過以下三種方式獲取:
- 來源於系統參數(即使用System.getProperties獲取的屬性)中以spark.作為前綴的那部分屬性;
- 使用SparkConf的API進行設置;
- 從其它SparkConf中克隆。
下面將具體說明這三種方式的實現。
系統屬性中的配置
在SparkConf中有一個Boolean類型的構造器屬性loadDefaults,當loadDefaults為true時將會從系統屬性中載入Spark配置,代碼如下:
[java] view plain copy
if
(loadDefaults) {- loadFromSystemProperties(
false
) - }
private
[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {- // Load any spark.* system properties
for
((key, value) <- Utils.getSystemProperties
if
key.startsWith("spark.")) {- set(key, value, silent)
- }
this
- }
以上代碼調用了Utils工具類[2] 的getSystemProperties方法,其作用為獲取系統的鍵值對屬性。loadFromSystemProperties方法在獲取了系統屬性後,使用Scala守衛過濾出其中以「spark.」字元串為前綴的key和value並且調用set方法(見代碼清單3-1)最終設置到settings中。
代碼清單3-1 SparkConf中set方法的實現
[java] view plain copy
private
[spark] def set(key:String, value: String, silent: Boolean): SparkConf = {if
(key ==null
) {throw
newNullPointerException("nullkey")- }
if
(value ==
null
) {throw
newNullPointerException("nullvalue for " + key)- }
if
(!silent) {- logDeprecationWarning(key)
- }
- settings.put(key,value)
this
- }
使用SparkConf配置的API
給SparkConf添加配置的一種常見方式是使用SparkConf中提供的API。其中有些API最終實際調用了set的重載方法,見代碼清單3-2。
代碼清單3-2 SparkConf中重載的set方法
[java] view plain copy
- def set(key:String, value: String): SparkConf = {
- set(key,value,
false
) - }
可以看到代碼清單3-2中的set方法實際也是調用了代碼清單3-1中的set方法。
SparkConf中的setMaster、setAppName、setJars、setExecutorEnv、setSparkHome、setAll等方法最終都是通過代碼清單3-2中的set方法完成Spark配置的,本書以其中最為常用的setMaster和setAppName為例,用代碼清單3-3和代碼清單3-4來展示他們的實現。
代碼清單3-3 設置Spark的部署模式的配置方法setMaster
[java] view plain copy
- def setMaster(master: String): SparkConf = {
- set("spark.master", master)
- }
代碼清單3-4 設置Spark的應用名稱的配置方法setAppName
[java] view plain copy
- def setAppName(name: String): SparkConf = {
- set("spark.app.name", name)
- }
克隆SparkConf配置
有些情況下,同一個SparkConf實例中的配置信息需要被Spark中的多個組件共用,例如:組件A中存在一個SparkConf實例a,組件B中也很需要實例a中的配置信息,這時該如何處理?我們往往首先想到的方法是將SparkConf實例定義為全局變數或者通過參數傳遞給其它組件,但是這會引入並發問題。雖然settings是線程安全的ConcurrentHashMap類,而且ConcurrentHashMap也被證明是高並發下性能表現不錯的數據結構,但是只要存在並發就一定會有性能的損失問題。我們可以新建一個SparkConf實例b,並將a中的配置信息全部拷貝到b中,這種方式顯然不是最優雅的,複製代碼會散落在程序的各個角落。現在是時候閱讀下SparkConf的構造器了,代碼如下所示:
[java] view plain copy
class
SparkConf(loadDefaults: Boolean)extends
Cloneable with Logging withSerializable {
- //省略無關代碼
- def
this
() =this
(true
)
SparkConf繼承了Cloneable特質並實現了clone方法,clone方法(見代碼清單3-5)的實現跟我們所討論的方式是一樣的,並且通過Cloneable特質提高了代碼的可復用性。
代碼清單3-5 克隆SparkConf配置
[java] view plain copy
- override def clone: SparkConf ={
- val cloned =
new
SparkConf(false
) - settings.entrySet().asScala.foreach { e =>
- cloned.set(e.getKey(),e.getValue(),
true
) - }
- cloned
- }
這樣我們就可以在任何想要使用SparkConf的地方使用克隆方式來優雅的編程了。
[1] Scala泛型的語法採用了方括弧,而非Java中的尖括弧。
[2] Utils是Spark中最常用的工具類,其每個方法實現的功能都比較單一,理解起來比較簡單,所以本書只將相關的介紹放入附錄A中單獨進行介紹。如果不去閱讀Utils中各個方法的實現,對閱讀本書主幹內容也不會有太多影響。如果是Spark的初學者或者是剛剛接觸Scala語言的開發者還是建議閱讀。
TAG:程序員小新人學習 |