在Apache Spark 2.0中使用DataFrames和SQL
作者|馬小龍(Dr. Christoph Schubert)
責編|郭芮
Spark 2.0中使用DataFrames和SQL的第一步
Spark 2.0開發的一個動機是讓它可以觸及更廣泛的受眾,特別是缺乏編程技能但可能非常熟悉SQL的數據分析師或業務分析師。因此,Spark 2.0現在比以往更易使用。在這部分,我將介紹如何使用Apache Spark 2.0。並將重點關注DataFrames作為新Dataset API的無類型版本。
到Spark 1.3,彈性分布式數據集(Resilient Distributed Dataset,RDD)一直是Spark中的主要抽象。RDD API是在Scala集合框架之後建模的,因此間接提供了Hadoop Map / Reduce熟悉的編程原語以及函數式編程(Map、Filter、Reduce)的常用編程原語。雖然RDD API比Map / Reduce範例更具表達性,但表達複雜查詢仍然很繁瑣,特別是對於來自典型數據分析背景的用戶,他們可能熟悉SQL,或來自R/Python編程語言的數據框架。
Spark 1.3引入了DataFrames作為RDD頂部的一個新抽象。DataFrame是具有命名列的行集合,在R和Python相應包之後建模。
Spark 1.6看到了Dataset類作為DataFrame的類型化版本而引入。在Spark 2.0中,DataFrames實際上是Datasets的特殊版本,我們有type DataFrame = Dataset [Row],因此DataFrame和Dataset API是統一的。
表面上,DataFrame就像SQL表。Spark 2.0將這種關係提升到一個新水平:我們可以使用SQL來修改和查詢DataSets和DataFrames。通過限制表達數量,有助於更好地優化。數據集也與Catalyst優化器良好集成,大大提高了Spark代碼的執行速度。因此,新的開發應該利用DataFrames。
在本文中,我將重點介紹Spark 2.0中DataFrames的基本用法。我將嘗試強調Dataset API和SQL間的相似性,以及如何使用SQL和Dataset API互換地查詢數據。藉由整個代碼生成和Catalyst優化器,兩個版本將編譯相同高效的代碼。
代碼示例以Scala編程語言給出。我認為這樣的代碼最清晰,因為Spark本身就是用Scala編寫的。
SparkSession
SparkSession類替換了Apache Spark 2.0中的SparkContext和SQLContext,並為Spark集群提供了唯一的入口點。
為了向後兼容,SparkSession對象包含SparkContext和SQLContext對象,見下文。當我們使用互動式Spark shell時,為我們創建一個名為spark的SparkSession對象。
創建DataFrames
DataFrame是具有命名列的表。最簡單的DataFrame是使用SparkSession的range方法來創建:
使用show給我們一個DataFrame的表格表示,可以使用describe來獲得數值屬性概述。describe返回一個DataFrame:
觀察到Spark為數據幀中唯一的列選擇了名稱id。 對於更有趣的示例,請考慮以下數據集:
在這種情況下,customerDF對象將有名為_1、_2、_3、_4的列,它們以某種方式違反了命名列的目的。可以通過重命名列來恢復:
使用printSchema和describe提供以下輸出:
一般來說我們會從文件載入數據。SparkSession類為提供了以下方法:
在這裡我們讓Spark從CSV文件的第一行提取頭信息(通過設置header選項為true),並使用數字類型(age和total)將數字列轉換為相應的數據類型 inferSchema選項。
其他可能的數據格式包括parquet文件和通過JDBC連接讀取數據的可能性。
基本數據操作
我們現在將訪問DataFrame中數據的基本功能,並將其與SQL進行比較。
沿襲,操作,動作和整個階段的代碼生成
相同的譜系概念,轉換操作和行動操作之間的區別適用於Dataset和RDD。我們下面討論的大多數DataFrame操作都會產生一個新的DataFrame,但實際上不執行任何計算。要觸發計算,必須調用行動操作之一,例如show(將DataFrame的第一行作為表列印),collect(返回一個Row對象的Array),count(返回DataFrame中的行數),foreach(對每一行應用一個函數)。這是惰性求值(lazy evaluation)的常見概念。
下面Dataset類的所有方法實際上依賴於所有數據集的有向非循環圖(Directed Acyclic Graph,DAG),從現有數據集中創建一個新的「數據集」。這被稱為數據集的沿襲。僅使用調用操作時,Catalyst優化程序將分析沿襲中的所有轉換,並生成實際代碼。這被稱為整階段代碼生成,並且負責Dataset對RDD的性能改進。
Row-行對象
Row類在DataFrame的一行不帶類型數據值中充當容器。通常情況下我們不會自己創建Row對象,而是使用下面的語法:
Row對象元素通過位置(從0開始)或者使用apply進行訪問:
它會產生一個Any的對象類型。或者最好使用get,方法之一:
因為這樣就不會出現原始類型的開銷。我們可以使用isNull方法檢查行中的一個條目是否為』null』:
我們現在來看看DataFrame類最常用的轉換操作:
select
我們將要看的第一個轉換是「select」,它允許我們對一個DataFrame的列進行投影和變換。
引用列
通過它們的名稱有兩種方法來訪問DataFrame列:可以將其引用為字元串;或者可以使用apply方法,col-方法或$以字元串作為參數並返回一個Column(列)對象。所以customerDF.col(「customer」)和customerDF(「customer」)都是customerDF的第一列。
選擇和轉換列
最簡單的select轉換形式允許我們將DataFrame投影到包含較少列的DataFrame中。下面的四個表達式返回一個只包含customer和province列的DataFrame:
不能在單個select方法中調用混合字元串和列參數:customerDF.select(「customer」, $」province」)導致錯誤。
使用Column類定義的運算符,可以構造複雜的列表達式:
應用show得到以下結果:
列別名
新數據集的列名稱從用於創建的表達式中派生而來,我們可以使用alias或as將列名更改為其他助記符:
產生與前面相同內容的DataFrame,但使用名為name,newAge和isZJ的列。
Column類包含用於執行基本數據分析任務的各種有效方法。我們將參考讀者文檔的詳細信息。
最後,我們可以使用lit函數添加一個具有常量值的列,並使用when和otherwise重新編碼列值。 例如,我們添加一個新列「ageGroup」,如果「age
給出以下DataFrame:
drop是select相對的轉換操作;它返回一個DataFrame,其中刪除了原始DataFrame的某些列。
最後可使用distinct方法返回原始DataFrame中唯一值的DataFrame:
返回一個包含單個列的DataFrame和包含值的三行:「北京」、「江蘇」、「浙江」。
filter
第二個DataFrame轉換是Filter方法,它在DataFrame行中進行選擇。有兩個重載方法:一個接受一個Column,另一個接受一個SQL表達式(一個String)。例如,有以下兩種等效方式來過濾年齡大於30歲的所有客戶:
Filter轉換接受一般的布爾連接符and(和)和or(或):
我們在SQL版本中使用單個等號,或者使用三等式「===」(Column類的一個方法)。在==運算符中使用Scala的等於符號會導致錯誤。我們再次引用Column類文檔中的有用方法。
聚合(aggregation)
執行聚合是進行數據分析的最基本任務之一。例如,我們可能對每個訂單的總金額感興趣,或者更具體地,對每個省或年齡組的總金額或平均金額感興趣。可能還有興趣了解哪個客戶的年齡組具有高於平均水平的總數。借用SQL,我們可以使用GROUP BY表達式來解決這些問題。DataFrames提供了類似的功能。可以根據一些列的值進行分組,同樣,還可以使用字元串或「Column」對象來指定。
我們將使用以下DataFrame:
withColumn方法添加一個新的列或替換一個現有的列。
聚合數據分兩步進行:一個調用GroupBy方法將特定列中相等值的行組合在一起,然後調用聚合函數,如sum(求和值),max(最大值)或為原始DataFrame中每組行計算的「avg」(平均值)。從技術上來說,GroupBy會返回一個RelationalGroupedDataFrame類的對象。RelationalGroupedDataFrame包含max、min、avg、mean和sum方法,所有這些方法都對DataFrame的數字列執行指定操作,並且可以接受一個String-參數來限制所操作的數字列。此外,我們有一個count方法計算每個組中的行數,還有一個通用的agg方法允許我們指定更一般的聚合函數。所有這些方法都會返回一個DataFrame。
例如:
輸出以下內容:
customerAgeGroupDF.groupBy(「agegroup」).max().show()輸出:
最後,customerAgeGroupDF.groupBy(「agegroup」).min(「age」, 「total」).show()輸出:
還有一個通用的agg方法,接受複雜的列表達式。agg在RelationalGroupedDataFrame和Dataset中都可用。後一種方法對整個數據集執行聚合。這兩種方法都允許我們給出列表達式的列表:
輸出:
給出以下輸出:
其中null值表示沒有省/年齡組的組合。Pivot的重載版本接受一個值列表以進行透視。這一方面允許我們限制列數,另一方面更加有效,因為Spark不需要計算樞軸列中的所有值。例如:
給出以下輸出:
最後,使用樞紐數據也可以進行複雜聚合:
輸出:
這裡=!=是Column類的「不等於」方法。
排序和限制
OrderBy方法允許我們根據一些列對數據集的內容進行排序。和以前一樣,我們可以使用Strings或Column對象來指定列:customerDF.orderBy(」age」)和 customerDF.orderBy($」age」)給出相同的結果。默認排序順序為升序。如果要降序排序,可以使用Column類的desc方法或者desc函數:
觀察到desc函數返回了一個Column-object,任何其他列也需要被指定為Column-對象。
最後,limit方法返回一個包含原始DataFrame中第一個n行的DataFrame。
DataFrame方法與SQL對比
我們已經發現,DataFrame類的基本方法與SQLselect語句的部分密切相關。下表總結了這一對應關係:
到目前為止連接(join)在我們的討論中已經缺失。Spark的DataFrame支持連接,我們將在文章的下一部分討論它們。
下面將討論完全類型化的DataSets API,連接和用戶定義的函數(UDF)。
使用SQL來處理DataFrames
我們還在Apache Spark 2.0中直接執行SQL語句。SparkSession的SQL方法返回一個DataFrame。此外,DataFrame的selectExp方法也允許我們為單列指定SQL表達式,如下所示。為了能夠引用SQL表達式中的DataFrame,首先有必要將DataFrame註冊為臨時表,在Spark 2中稱為臨時視圖(temporary view,簡稱為tempview)。DataFrame為我們提供了以下兩種方法:
createTempView創建一個新視圖,如果具有該名稱的視圖已存在,則拋出一個異常;
createOrReplaceTempView創建一個用來替換的臨時視圖。
兩種方法都將視圖名稱作為唯一參數。
註冊表後,可以使用SparkSession的SQL方法來執行SQL語句:
返回具有以下內容的DataFrame:
SparkSession類的catalog欄位是Catalog類的一個對象,具有多種處理會話註冊表和視圖的方法。例如,Catalog的ListTables方法返回一個包含所有已註冊表信息的Dataset:
會返回一個包含有關註冊表「tableName」中列信息的Dataset,例如:
此外,可以使用DataSet的SelectExpr方法執行某些產生單列的SQL表達式,例如:
這兩者都產生DataFrame對象。
第一步結束語
Spark 2.0中使用DataFrames和SQL的第二步
本文第一部分使用了無類型的DataFrame API,其中每行都表示一個Row對象。在下面的內容中,我們將使用更新的DatasetAPI。Dataset是在Apache Spark 1.6中引入的,並已在Spark 2.0中使用DataFrames進行了統一,我們現在有了type DataFrame = Dataset [Row],其中方括弧([和] Scala中的泛型類型,因此類似於Java的)。因此,上面討論的所有諸如select、filter、groupBy、agg、orderBy、limit等方法都以相同的方式使用。
Datasets:返回類型信息
Spark 2.0以前的DataFrame API本質上是一個無類型的API,這也就意味著在編譯期間很可能會因為某些編譯器錯誤,導致無法訪問類型信息。
和之前一樣,我們將在示例中使用Scala,因為我相信Scala最為簡潔。可能涉及的例子:spark將表示SparkSession對象,代表我們的Spark集群。
例子:分析Apache訪問日誌
我們將使用Apache訪問日誌格式數據。先一起回顧Apache日誌中的典型行,如下所示:
此行包含以下部分:
127.0.0.1是向伺服器發出請求的客戶端(遠程主機)IP地址(或主機名,如果可用);
輸出中的第一個-表示所請求的信息(來自遠程機器的用戶身份)不可用;
輸出中的第二個-表示所請求的信息(來自本地登錄的用戶身份)不可用;
[01 / Aug / 1995:00:00:01 -0400]表示伺服器完成處理請求的時間,格式為:[日/月/年:小時:分:秒 時區],有三個部件:」GET /images/launch-logo.gif HTTP / 1.0」;
請求方法(例如,GET,POST等);
端點(統一資源標識符);
和客戶端協議版本(』HTTP / 1.0』)。
1.200這是伺服器返回客戶端的狀態代碼。這些信息非常有價值:成功回復(從2開始的代碼),重定向(從3開始的代碼),客戶端導致的錯誤(以4開頭的代碼),伺服器錯誤(代碼從5開始)。最後一個條目表示返回給客戶端的對象大小。如果沒有返回任何內容則是-或0。
首要任務是創建適當的類型來保存日誌行信息,因此我們使用Scala的case類,具體如下:
默認情況下,case類對象不可變。通過它們的值來比較相等性,而不是通過比較對象引用。
為日誌條目定義了合適的數據結構後,現在需要將表示日誌條目的String轉換為ApacheLog對象。我們將使用正則表達式來達到這一點,參考如下:
可以看到正則表達式包含9個捕獲組,用於表示ApacheLog類的欄位。
使用正則表達式解析訪問日誌時,會面臨以下問題:
一些日誌行的內容大小以-表示,我們想將它轉換為0;
一些日誌行不符合所選正則表達式給出的格式。
為了克服第二個問題,我們使用Scala的「Option」類型來丟棄不對的格式並進行確認。Option也是一個泛型類型,類型Option[ApacheLog]的對象可以有以下形式:
None,表示不存在一個值(在其他語言中,可能使用null);
Some(log)for a ApacheLog-objectlog。
以下為一行函數解析,並為不可解析的日誌條目返回None:
最好的方法是修改正則表達式以捕獲所有日誌條目,但Option是處理一般錯誤或不可解析條目的常用技術。
綜合起來,現在來剖析一個真正的數據集。我們將使用著名的NASA Apache訪問日誌數據集,它可以在ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz下載。
下載和解壓縮文件後,首先將其打開為String的Dataset,然後使用正則表達式解析:
現在可以解析數據集:
flatMap將parse_logline函數應用於rawData的每一行,並將Some(ApacheLog)形式的所有結果收集到apacheLogs中,同時丟棄所有不可解析的日誌行(所有結果的形式None)。
我們現在可以對「數據集」執行分析,就像在「DataFrame」上一樣。Dataset中的列名稱只是ApacheLog case類的欄位名稱。
例如,以下代碼列印生成最多404個響應的10個端點:
如前所述,可以將Dataset註冊為臨時視圖,然後使用SQL執行查詢:
上面的SQL查詢具有與上面的Scala代碼相同的結果。
用戶定義的函數(user defined function, UDF)
作為示例,我們使用以下函數提取主機名的頂級域:
如果想在SQL查詢中使用這個函數,首先需要註冊。這是通過SparkSession的udf對象實現的:
函數名後的最後一個下劃線將extractTLD轉換為部分應用函數(partially applied function),這是必要的,如果省略它會導致錯誤。register方法返回一個UserDefinedFunction對象,可以應用於列表達式。
一旦註冊,我們可以在SQL查詢中使用extractTLD:
要獲得註冊的用戶定義函數概述,可以使用spark.catalog對象的listFunctions方法,該對象返回SparkSession定義的所有函數DataFrame:
除了在SQL查詢中使用UDF,我們還可以直接將它們應用到列表達式。以下表達式返回.net域中的所有請求:
值得注意的是,與Spark在諸如filter,select等方法中的構建相反,用戶定義的函數只採用列表達式作為參數。寫extractTLD_UDF(「host」)會導致錯誤。
註冊UDF後,可以將它應用到Column表達式(例如filter裡面),如下所示:
但是不能在SQL查詢中使用它,因為還沒有通過名稱註冊它。
UDF和Catalyst優化器
Spark中用Catalyst優化器來優化所有涉及數據集的查詢,會將用戶定義的函數視作黑盒。值得注意的是,當過濾器操作涉及UDF時,在連接之前可能不會「下推」過濾器操作。我們通過下面的例子來說明。
通常來說,不依賴UDF而是從內置的「Column」表達式進行組合操作可能效果更好。
加盟
最後,我們將討論如何使用以下兩個Dataset方法連接數據集:
join返回一個DataFrame
joinWith返回一對Datasets
以下示例連接兩個表1、表2(來自維基百科):
表1 員工(Employee)
表2 部門(Department)
定義兩個case類,將兩個表編碼為case類對象的序列(由於空間原因不顯示),最後創建兩個Dataset對象:
為了執行內部等連接,只需提供要作為「String」連接的列名稱:
Spark會自動刪除雙列,joined.show給出以下輸出:
表3 輸出
在上面,joined是一個DataFrame,不再是Dataset。連接數據集的行可以作為Seq列名稱給出,或者可以指定要執行的equi-join(inner,outer,left_outer,right_outer或leftsemi)類型。想要指定連接類型的話,需要使用Seq表示法來指定要連接的列。請注意,如果執行內部聯接(例如,獲取在同一部門中工作的所有員工的對):employees.join(employees,Seq(「depID」)),我們沒有辦法訪問連接的DataFrame列:employees.join(employees, Seq(「depID」)).select(「lastname」)會因為重複的列名而失敗。處理這種情況的方法是重命名部分列:
除了等連接之外,我們還可以給出更複雜的連接表達式,例如以下查詢,它將所有部門連接到不知道部門ID且不在本部門工作的員工:
然後可以不指定任何連接條件,在兩個Datasets間執行笛卡爾聯接:departments.join(employees).show。
與joinWith類型保存連接
最後,Dataset的joinWith方法返回一個Dataset,包含原始數據集中匹配行的Scala元組。
表4 返回Dataset
這可以用於自連接後想要規避上述不可訪問列的問題情況。
加入和優化器
Catalyst優化器嘗試通過將「過濾器」操作向「下推」,以儘可能多地優化連接,因此它們在實際連接之前執行。
為了這個工作,用戶定義的函數(UDF),不應該在連接條件內使用用因為這些被Catalyst處理為黑盒子。
結論
我們已經討論了在Apache Spark 2.0中使用類型化的DatasetAPI,如何在Apache Spark中定義和使用用戶定義的函數,以及這樣做的危險。使用UDF可能產生的主要困難是它們會被Catalyst優化器視作黑盒。
點擊展開全文


※58同城移動端Passport SDK的設計與技術細節
※zetcd:脫離ZooKeeper運行ZooKeeper應用程序
※IT人眼中的IT人
※無人駕駛剛剛開始的未來
TAG:CSDN |
※BT 棄用 Apache CloudStack
※從Spark Streaming到Apache Flink: 實時數據流在愛奇藝的演進
※Apache Spark和DL/AI結合,誰與爭鋒?期待Spark3.0的到來!
※專訪朱詩雄:Apache Spark中的全新流式引擎Structured Streaming
※Netcraft 6月Web 伺服器排名:Nginx有望超越Microsoft,Apache持續走低
※重溫 Apache Kafka
※Apache已修復Apache Tomcat中的高危漏洞
※Apache Kafka creator 饒軍談Kafka未來規劃
※Docker 安裝 Apache
※Apache Traffic Server發布新版v
※阿里Apache Dubbo佈道師談Service Mesh技術
※流式處理:使用 Apache Kafka的Streams API 實現 Rabobank 的實時財務告警
※Apache HTTPD 2.4.38 Stable 發布
※Apache Shiro 的Web應用支持指南
※Linux查看Nginx、Apache、MySQL、PHP的編譯參數
※Apache Spark 2.3 重要特性介紹
※建站初學者必知的wordpress在Nginx/Apache/IIS中的偽靜態規則
※Apache HTTP Server v2.4.33穩定版發布
※Apache Storm流計算模型 及WordCount源碼實踐
※RPC框架實踐之:Apache Thrift