當前位置:
首頁 > 科技 > 在Apache Spark 2.0中使用DataFrames和SQL

在Apache Spark 2.0中使用DataFrames和SQL

作者|馬小龍(Dr. Christoph Schubert)

責編|郭芮

Spark 2.0中使用DataFrames和SQL的第一步

Spark 2.0開發的一個動機是讓它可以觸及更廣泛的受眾,特別是缺乏編程技能但可能非常熟悉SQL的數據分析師或業務分析師。因此,Spark 2.0現在比以往更易使用。在這部分,我將介紹如何使用Apache Spark 2.0。並將重點關注DataFrames作為新Dataset API的無類型版本。

到Spark 1.3,彈性分布式數據集(Resilient Distributed Dataset,RDD)一直是Spark中的主要抽象。RDD API是在Scala集合框架之後建模的,因此間接提供了Hadoop Map / Reduce熟悉的編程原語以及函數式編程(Map、Filter、Reduce)的常用編程原語。雖然RDD API比Map / Reduce範例更具表達性,但表達複雜查詢仍然很繁瑣,特別是對於來自典型數據分析背景的用戶,他們可能熟悉SQL,或來自R/Python編程語言的數據框架。

Spark 1.3引入了DataFrames作為RDD頂部的一個新抽象。DataFrame是具有命名列的行集合,在R和Python相應包之後建模。

Spark 1.6看到了Dataset類作為DataFrame的類型化版本而引入。在Spark 2.0中,DataFrames實際上是Datasets的特殊版本,我們有type DataFrame = Dataset [Row],因此DataFrame和Dataset API是統一的。

表面上,DataFrame就像SQL表。Spark 2.0將這種關係提升到一個新水平:我們可以使用SQL來修改和查詢DataSets和DataFrames。通過限制表達數量,有助於更好地優化。數據集也與Catalyst優化器良好集成,大大提高了Spark代碼的執行速度。因此,新的開發應該利用DataFrames。

在本文中,我將重點介紹Spark 2.0中DataFrames的基本用法。我將嘗試強調Dataset API和SQL間的相似性,以及如何使用SQL和Dataset API互換地查詢數據。藉由整個代碼生成和Catalyst優化器,兩個版本將編譯相同高效的代碼。

代碼示例以Scala編程語言給出。我認為這樣的代碼最清晰,因為Spark本身就是用Scala編寫的。

SparkSession

SparkSession類替換了Apache Spark 2.0中的SparkContext和SQLContext,並為Spark集群提供了唯一的入口點。

為了向後兼容,SparkSession對象包含SparkContext和SQLContext對象,見下文。當我們使用互動式Spark shell時,為我們創建一個名為spark的SparkSession對象。

創建DataFrames

DataFrame是具有命名列的表。最簡單的DataFrame是使用SparkSession的range方法來創建:

使用show給我們一個DataFrame的表格表示,可以使用describe來獲得數值屬性概述。describe返回一個DataFrame:

觀察到Spark為數據幀中唯一的列選擇了名稱id。 對於更有趣的示例,請考慮以下數據集:

在這種情況下,customerDF對象將有名為_1、_2、_3、_4的列,它們以某種方式違反了命名列的目的。可以通過重命名列來恢復:

使用printSchema和describe提供以下輸出:

一般來說我們會從文件載入數據。SparkSession類為提供了以下方法:

在這裡我們讓Spark從CSV文件的第一行提取頭信息(通過設置header選項為true),並使用數字類型(age和total)將數字列轉換為相應的數據類型 inferSchema選項。

其他可能的數據格式包括parquet文件和通過JDBC連接讀取數據的可能性。

基本數據操作

我們現在將訪問DataFrame中數據的基本功能,並將其與SQL進行比較。

沿襲,操作,動作和整個階段的代碼生成

相同的譜系概念,轉換操作和行動操作之間的區別適用於Dataset和RDD。我們下面討論的大多數DataFrame操作都會產生一個新的DataFrame,但實際上不執行任何計算。要觸發計算,必須調用行動操作之一,例如show(將DataFrame的第一行作為表列印),collect(返回一個Row對象的Array),count(返回DataFrame中的行數),foreach(對每一行應用一個函數)。這是惰性求值(lazy evaluation)的常見概念。

下面Dataset類的所有方法實際上依賴於所有數據集的有向非循環圖(Directed Acyclic Graph,DAG),從現有數據集中創建一個新的「數據集」。這被稱為數據集的沿襲。僅使用調用操作時,Catalyst優化程序將分析沿襲中的所有轉換,並生成實際代碼。這被稱為整階段代碼生成,並且負責Dataset對RDD的性能改進。

Row-行對象

Row類在DataFrame的一行不帶類型數據值中充當容器。通常情況下我們不會自己創建Row對象,而是使用下面的語法:

Row對象元素通過位置(從0開始)或者使用apply進行訪問:

它會產生一個Any的對象類型。或者最好使用get,方法之一:

因為這樣就不會出現原始類型的開銷。我們可以使用isNull方法檢查行中的一個條目是否為』null』:

我們現在來看看DataFrame類最常用的轉換操作:

select

我們將要看的第一個轉換是「select」,它允許我們對一個DataFrame的列進行投影和變換。

引用列

通過它們的名稱有兩種方法來訪問DataFrame列:可以將其引用為字元串;或者可以使用apply方法,col-方法或$以字元串作為參數並返回一個Column(列)對象。所以customerDF.col(「customer」)和customerDF(「customer」)都是customerDF的第一列。

選擇和轉換列

最簡單的select轉換形式允許我們將DataFrame投影到包含較少列的DataFrame中。下面的四個表達式返回一個只包含customer和province列的DataFrame:

不能在單個select方法中調用混合字元串和列參數:customerDF.select(「customer」, $」province」)導致錯誤。

使用Column類定義的運算符,可以構造複雜的列表達式:

應用show得到以下結果:

列別名

新數據集的列名稱從用於創建的表達式中派生而來,我們可以使用alias或as將列名更改為其他助記符:

產生與前面相同內容的DataFrame,但使用名為name,newAge和isZJ的列。

Column類包含用於執行基本數據分析任務的各種有效方法。我們將參考讀者文檔的詳細信息。

最後,我們可以使用lit函數添加一個具有常量值的列,並使用when和otherwise重新編碼列值。 例如,我們添加一個新列「ageGroup」,如果「age

給出以下DataFrame:

drop是select相對的轉換操作;它返回一個DataFrame,其中刪除了原始DataFrame的某些列。

最後可使用distinct方法返回原始DataFrame中唯一值的DataFrame:

返回一個包含單個列的DataFrame和包含值的三行:「北京」、「江蘇」、「浙江」。

filter

第二個DataFrame轉換是Filter方法,它在DataFrame行中進行選擇。有兩個重載方法:一個接受一個Column,另一個接受一個SQL表達式(一個String)。例如,有以下兩種等效方式來過濾年齡大於30歲的所有客戶:

Filter轉換接受一般的布爾連接符and(和)和or(或):

我們在SQL版本中使用單個等號,或者使用三等式「===」(Column類的一個方法)。在==運算符中使用Scala的等於符號會導致錯誤。我們再次引用Column類文檔中的有用方法。

聚合(aggregation)

執行聚合是進行數據分析的最基本任務之一。例如,我們可能對每個訂單的總金額感興趣,或者更具體地,對每個省或年齡組的總金額或平均金額感興趣。可能還有興趣了解哪個客戶的年齡組具有高於平均水平的總數。借用SQL,我們可以使用GROUP BY表達式來解決這些問題。DataFrames提供了類似的功能。可以根據一些列的值進行分組,同樣,還可以使用字元串或「Column」對象來指定。

我們將使用以下DataFrame:

withColumn方法添加一個新的列或替換一個現有的列。

聚合數據分兩步進行:一個調用GroupBy方法將特定列中相等值的行組合在一起,然後調用聚合函數,如sum(求和值),max(最大值)或為原始DataFrame中每組行計算的「avg」(平均值)。從技術上來說,GroupBy會返回一個RelationalGroupedDataFrame類的對象。RelationalGroupedDataFrame包含max、min、avg、mean和sum方法,所有這些方法都對DataFrame的數字列執行指定操作,並且可以接受一個String-參數來限制所操作的數字列。此外,我們有一個count方法計算每個組中的行數,還有一個通用的agg方法允許我們指定更一般的聚合函數。所有這些方法都會返回一個DataFrame。

例如:

輸出以下內容:

customerAgeGroupDF.groupBy(「agegroup」).max().show()輸出:

最後,customerAgeGroupDF.groupBy(「agegroup」).min(「age」, 「total」).show()輸出:

還有一個通用的agg方法,接受複雜的列表達式。agg在RelationalGroupedDataFrame和Dataset中都可用。後一種方法對整個數據集執行聚合。這兩種方法都允許我們給出列表達式的列表:

輸出:

給出以下輸出:

其中null值表示沒有省/年齡組的組合。Pivot的重載版本接受一個值列表以進行透視。這一方面允許我們限制列數,另一方面更加有效,因為Spark不需要計算樞軸列中的所有值。例如:

給出以下輸出:

最後,使用樞紐數據也可以進行複雜聚合:

輸出:

這裡=!=是Column類的「不等於」方法。

排序和限制

OrderBy方法允許我們根據一些列對數據集的內容進行排序。和以前一樣,我們可以使用Strings或Column對象來指定列:customerDF.orderBy(」age」)和 customerDF.orderBy($」age」)給出相同的結果。默認排序順序為升序。如果要降序排序,可以使用Column類的desc方法或者desc函數:

觀察到desc函數返回了一個Column-object,任何其他列也需要被指定為Column-對象。

最後,limit方法返回一個包含原始DataFrame中第一個n行的DataFrame。

DataFrame方法與SQL對比

我們已經發現,DataFrame類的基本方法與SQLselect語句的部分密切相關。下表總結了這一對應關係:

到目前為止連接(join)在我們的討論中已經缺失。Spark的DataFrame支持連接,我們將在文章的下一部分討論它們。

下面將討論完全類型化的DataSets API,連接和用戶定義的函數(UDF)。

使用SQL來處理DataFrames

我們還在Apache Spark 2.0中直接執行SQL語句。SparkSession的SQL方法返回一個DataFrame。此外,DataFrame的selectExp方法也允許我們為單列指定SQL表達式,如下所示。為了能夠引用SQL表達式中的DataFrame,首先有必要將DataFrame註冊為臨時表,在Spark 2中稱為臨時視圖(temporary view,簡稱為tempview)。DataFrame為我們提供了以下兩種方法:

createTempView創建一個新視圖,如果具有該名稱的視圖已存在,則拋出一個異常;

createOrReplaceTempView創建一個用來替換的臨時視圖。

兩種方法都將視圖名稱作為唯一參數。

註冊表後,可以使用SparkSession的SQL方法來執行SQL語句:

返回具有以下內容的DataFrame:

SparkSession類的catalog欄位是Catalog類的一個對象,具有多種處理會話註冊表和視圖的方法。例如,Catalog的ListTables方法返回一個包含所有已註冊表信息的Dataset:

會返回一個包含有關註冊表「tableName」中列信息的Dataset,例如:

此外,可以使用DataSet的SelectExpr方法執行某些產生單列的SQL表達式,例如:

這兩者都產生DataFrame對象。

第一步結束語

Spark 2.0中使用DataFrames和SQL的第二步

本文第一部分使用了無類型的DataFrame API,其中每行都表示一個Row對象。在下面的內容中,我們將使用更新的DatasetAPI。Dataset是在Apache Spark 1.6中引入的,並已在Spark 2.0中使用DataFrames進行了統一,我們現在有了type DataFrame = Dataset [Row],其中方括弧([和] Scala中的泛型類型,因此類似於Java的)。因此,上面討論的所有諸如select、filter、groupBy、agg、orderBy、limit等方法都以相同的方式使用。

Datasets:返回類型信息

Spark 2.0以前的DataFrame API本質上是一個無類型的API,這也就意味著在編譯期間很可能會因為某些編譯器錯誤,導致無法訪問類型信息。

和之前一樣,我們將在示例中使用Scala,因為我相信Scala最為簡潔。可能涉及的例子:spark將表示SparkSession對象,代表我們的Spark集群。

例子:分析Apache訪問日誌

我們將使用Apache訪問日誌格式數據。先一起回顧Apache日誌中的典型行,如下所示:

此行包含以下部分:

127.0.0.1是向伺服器發出請求的客戶端(遠程主機)IP地址(或主機名,如果可用);

輸出中的第一個-表示所請求的信息(來自遠程機器的用戶身份)不可用;

輸出中的第二個-表示所請求的信息(來自本地登錄的用戶身份)不可用;

[01 / Aug / 1995:00:00:01 -0400]表示伺服器完成處理請求的時間,格式為:[日/月/年:小時:分:秒 時區],有三個部件:」GET /images/launch-logo.gif HTTP / 1.0」;

請求方法(例如,GET,POST等);

端點(統一資源標識符);

和客戶端協議版本(』HTTP / 1.0』)。

1.200這是伺服器返回客戶端的狀態代碼。這些信息非常有價值:成功回復(從2開始的代碼),重定向(從3開始的代碼),客戶端導致的錯誤(以4開頭的代碼),伺服器錯誤(代碼從5開始)。最後一個條目表示返回給客戶端的對象大小。如果沒有返回任何內容則是-或0。

首要任務是創建適當的類型來保存日誌行信息,因此我們使用Scala的case類,具體如下:

默認情況下,case類對象不可變。通過它們的值來比較相等性,而不是通過比較對象引用。

為日誌條目定義了合適的數據結構後,現在需要將表示日誌條目的String轉換為ApacheLog對象。我們將使用正則表達式來達到這一點,參考如下:

可以看到正則表達式包含9個捕獲組,用於表示ApacheLog類的欄位。

使用正則表達式解析訪問日誌時,會面臨以下問題:

一些日誌行的內容大小以-表示,我們想將它轉換為0;

一些日誌行不符合所選正則表達式給出的格式。

為了克服第二個問題,我們使用Scala的「Option」類型來丟棄不對的格式並進行確認。Option也是一個泛型類型,類型Option[ApacheLog]的對象可以有以下形式:

None,表示不存在一個值(在其他語言中,可能使用null);

Some(log)for a ApacheLog-objectlog。

以下為一行函數解析,並為不可解析的日誌條目返回None:

最好的方法是修改正則表達式以捕獲所有日誌條目,但Option是處理一般錯誤或不可解析條目的常用技術。

綜合起來,現在來剖析一個真正的數據集。我們將使用著名的NASA Apache訪問日誌數據集,它可以在ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz下載。

下載和解壓縮文件後,首先將其打開為String的Dataset,然後使用正則表達式解析:

現在可以解析數據集:

flatMap將parse_logline函數應用於rawData的每一行,並將Some(ApacheLog)形式的所有結果收集到apacheLogs中,同時丟棄所有不可解析的日誌行(所有結果的形式None)。

我們現在可以對「數據集」執行分析,就像在「DataFrame」上一樣。Dataset中的列名稱只是ApacheLog case類的欄位名稱。

例如,以下代碼列印生成最多404個響應的10個端點:

如前所述,可以將Dataset註冊為臨時視圖,然後使用SQL執行查詢:

上面的SQL查詢具有與上面的Scala代碼相同的結果。

用戶定義的函數(user defined function, UDF)

作為示例,我們使用以下函數提取主機名的頂級域:

如果想在SQL查詢中使用這個函數,首先需要註冊。這是通過SparkSession的udf對象實現的:

函數名後的最後一個下劃線將extractTLD轉換為部分應用函數(partially applied function),這是必要的,如果省略它會導致錯誤。register方法返回一個UserDefinedFunction對象,可以應用於列表達式。

一旦註冊,我們可以在SQL查詢中使用extractTLD:

要獲得註冊的用戶定義函數概述,可以使用spark.catalog對象的listFunctions方法,該對象返回SparkSession定義的所有函數DataFrame:

除了在SQL查詢中使用UDF,我們還可以直接將它們應用到列表達式。以下表達式返回.net域中的所有請求:

值得注意的是,與Spark在諸如filter,select等方法中的構建相反,用戶定義的函數只採用列表達式作為參數。寫extractTLD_UDF(「host」)會導致錯誤。

註冊UDF後,可以將它應用到Column表達式(例如filter裡面),如下所示:

但是不能在SQL查詢中使用它,因為還沒有通過名稱註冊它。

UDF和Catalyst優化器

Spark中用Catalyst優化器來優化所有涉及數據集的查詢,會將用戶定義的函數視作黑盒。值得注意的是,當過濾器操作涉及UDF時,在連接之前可能不會「下推」過濾器操作。我們通過下面的例子來說明。

通常來說,不依賴UDF而是從內置的「Column」表達式進行組合操作可能效果更好。

加盟

最後,我們將討論如何使用以下兩個Dataset方法連接數據集:

join返回一個DataFrame

joinWith返回一對Datasets

以下示例連接兩個表1、表2(來自維基百科):

表1 員工(Employee)

表2 部門(Department)

定義兩個case類,將兩個表編碼為case類對象的序列(由於空間原因不顯示),最後創建兩個Dataset對象:

為了執行內部等連接,只需提供要作為「String」連接的列名稱:

Spark會自動刪除雙列,joined.show給出以下輸出:

表3 輸出

在上面,joined是一個DataFrame,不再是Dataset。連接數據集的行可以作為Seq列名稱給出,或者可以指定要執行的equi-join(inner,outer,left_outer,right_outer或leftsemi)類型。想要指定連接類型的話,需要使用Seq表示法來指定要連接的列。請注意,如果執行內部聯接(例如,獲取在同一部門中工作的所有員工的對):employees.join(employees,Seq(「depID」)),我們沒有辦法訪問連接的DataFrame列:employees.join(employees, Seq(「depID」)).select(「lastname」)會因為重複的列名而失敗。處理這種情況的方法是重命名部分列:

除了等連接之外,我們還可以給出更複雜的連接表達式,例如以下查詢,它將所有部門連接到不知道部門ID且不在本部門工作的員工:

然後可以不指定任何連接條件,在兩個Datasets間執行笛卡爾聯接:departments.join(employees).show。

與joinWith類型保存連接

最後,Dataset的joinWith方法返回一個Dataset,包含原始數據集中匹配行的Scala元組。

表4 返回Dataset

這可以用於自連接後想要規避上述不可訪問列的問題情況。

加入和優化器

Catalyst優化器嘗試通過將「過濾器」操作向「下推」,以儘可能多地優化連接,因此它們在實際連接之前執行。

為了這個工作,用戶定義的函數(UDF),不應該在連接條件內使用用因為這些被Catalyst處理為黑盒子。

結論

我們已經討論了在Apache Spark 2.0中使用類型化的DatasetAPI,如何在Apache Spark中定義和使用用戶定義的函數,以及這樣做的危險。使用UDF可能產生的主要困難是它們會被Catalyst優化器視作黑盒。

點擊展開全文

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 CSDN 的精彩文章:

58同城移動端Passport SDK的設計與技術細節
zetcd:脫離ZooKeeper運行ZooKeeper應用程序
IT人眼中的IT人
無人駕駛剛剛開始的未來

TAG:CSDN |

您可能感興趣

BT 棄用 Apache CloudStack
從Spark Streaming到Apache Flink: 實時數據流在愛奇藝的演進
Apache Spark和DL/AI結合,誰與爭鋒?期待Spark3.0的到來!
專訪朱詩雄:Apache Spark中的全新流式引擎Structured Streaming
Netcraft 6月Web 伺服器排名:Nginx有望超越Microsoft,Apache持續走低
重溫 Apache Kafka
Apache已修復Apache Tomcat中的高危漏洞
Apache Kafka creator 饒軍談Kafka未來規劃
Docker 安裝 Apache
Apache Traffic Server發布新版v
阿里Apache Dubbo佈道師談Service Mesh技術
流式處理:使用 Apache Kafka的Streams API 實現 Rabobank 的實時財務告警
Apache HTTPD 2.4.38 Stable 發布
Apache Shiro 的Web應用支持指南
Linux查看Nginx、Apache、MySQL、PHP的編譯參數
Apache Spark 2.3 重要特性介紹
建站初學者必知的wordpress在Nginx/Apache/IIS中的偽靜態規則
Apache HTTP Server v2.4.33穩定版發布
Apache Storm流計算模型 及WordCount源碼實踐
RPC框架實踐之:Apache Thrift