當前位置:
首頁 > 知識 > 企業大數據平台MapReduce應用之Join實踐!

企業大數據平台MapReduce應用之Join實踐!

在《Hadoop從入門到精通》大型專題的上半部分(專題鏈接見文末),我們講解了Hadoop基本原理並且知道如何在Hadoop中組織、移動和存儲數據。接下來,我們將探討如何簡化企業大數據技術應用。本章主要研究大數據模式,針對優化MapReduce操作技術,例如對大型數據集進行連接和排序,這些技術將讓任務運行更快,並允許更有效地使用計算資源。

MapReduce包含許多強大的功能,本章重點介紹連接、排序和採樣。這三種模式很重要,因為它們是被頻繁使用的大數據操作,並且滿足企業集群獲取更優性能的目標。

6.1 Joining

Join是用於將關係組合在一起的操作。在MapReduce中,Join適用於組合兩個或更多數據集的情況,例如,希望將用戶(從OLTP資料庫中提取)與日誌文件(包含用戶活動詳細信息)組合在一起。在以下情況中,將數據集組合在一起是有用的,例如:

  • 希望根據用戶人口統計信息(例如用戶習慣差異,比較青少年和30多歲用戶的差異)來匯總數據。
  • 希望向未在指定天數內使用該網站的用戶發送電子郵件。
  • 希望創建反饋循環來檢查用戶瀏覽習慣,允許系統向用戶推薦以前未探索的站點功能。

所有這些場景都要求將數據集連接在一起,兩種最常見的Join類型是內連接和外連接。內連接比較關係L和R中的所有元組,並在滿足連接謂詞時生成結果。相反,外連接不需要基於連接謂詞匹配元組,即使不存在匹配也可保留L或R的記錄。

企業大數據平台MapReduce應用之Join實踐!

打開今日頭條,查看更多精彩圖片

圖6.1 不同類型的Join策略,顯示為維恩圖。陰影區域顯示在連接中保留的數據。

本節,我們將介紹MapReduce的三種Join策略,支持兩種最常見的Join類型(內部和外部)。通過利用MapReduce排序—合併架構,可以在map或reduce端進行連接:

  • Repartition join——將兩個或多個大型數據集連接在一起的情況下減少側連接;
  • Replication join ——map端連接,適用於其中一個數據集足夠小可進行緩存的情況;
  • Semi-join——map側連接,其中一個數據集太大而無法容納到內存中,但經過某些過濾後可以減少到適合內存的大小

Join data

這些技術都將利用兩個數據集來連接用戶和日誌。用戶數據包含用戶名,年齡和狀態。完整數據集如下:

企業大數據平台MapReduce應用之Join實踐!

日誌數據集顯示可以從應用程序或Web伺服器日誌中提取一些基於用戶的活動。數據包括用戶名,操作和源IP地址,以下是完整的數據集:

企業大數據平台MapReduce應用之Join實踐!

讓我們先看一下應該根據數據選擇哪種Join方法。

實踐:為數據選擇最佳Join策略

本節介紹的每種Join策略都有不同的優點和缺點,確定哪種策略最適合正在使用的數據可能具有挑戰性。該技術可以查看數據中的不同特徵,並使用該信息選擇Join數據的最佳方法。

問題

選擇Join數據的最佳方法。

解決方案

使用數據驅動的決策樹來選擇最佳的Join策略。

討論

圖6.2顯示了可以使用的決策樹。

企業大數據平台MapReduce應用之Join實踐!

圖6.2 用於選擇Join策略的決策樹

決策樹可歸納為以下三點:

  • 如果某個數據集小到足以放入mapper內存中,則maponly複製的Join是有效的。
  • 如果兩個數據集都很大,並且其中一個可以通過預過濾縮小與另一個數據集的不匹配,則Semi-join效果更好。
  • 如果無法預處理數據且數據太大而無法緩存 - 這意味著必須在reducer中執行Join

無論選擇哪種策略,應該在Join中執行的最基本活動之一是使用過濾器和投影。

過濾器,投影和下推

在mapper中有效使用過濾器和投影可以減少MapReduce中使用和溢出的數據量。此技術還包括一種稱為下推的更高級優化,這可以進一步改善數據管道。

問題

正在處理大量數據,並且希望有效管理輸入數據以優化作業。

解決方案

過濾並投影數據,僅包含將在工作中使用的數據。

討論

過濾和投影數據是在Join數據時以及在處理數據時可以進行的最大優化。這是一種適用於任何OLAP活動的技術,它在Hadoop中同樣有效。

為什麼過濾和投影如此重要? 他們減少了處理管道需要處理的數據量。使用較少的數據非常重要,尤其是當跨網路和磁碟邊界推送數據時。MapReduce中隨機播放步驟非常昂貴,因為數據正在寫入本地磁碟和整個網路,因此更少的位元組數就意味著作業和MapReduce框架的工作量更少,這意味著CPU,磁碟和網路設備上有更少的負載。


企業大數據平台MapReduce應用之Join實踐!

圖6.3 使用過濾器和投影來減少數據大小

過濾器和投影應儘可能靠近數據源執行,在MapReduce中,這項工作最好在mapper中執行。以下代碼顯示了過濾器示例,該過濾器排除了30歲以下的用戶,並且只投影其名稱和狀態:

企業大數據平台MapReduce應用之Join實踐!

在連接中使用過濾器的挑戰是:所加入的所有數據集都可能包含要過濾的欄位。如果是這種情況,請查看使用Bloom過濾器解決此挑戰。

投影和謂詞下推

在使用可以基於下推來跳過記錄或整個塊的存儲格式時,將投影和謂詞下推存儲格式進一步過濾非常有效。

企業大數據平台MapReduce應用之Join實踐!

表6.1存儲格式及其下推支持

關於下推

很明顯,Parquet的一大優勢是能夠支持兩種類型的下推。如果正在處理大型數據集並且只定期處理記錄和欄位的子集,那麼應該將Parquet視為存儲格式。

6.1.1 map端Join

首先介紹在mapper中執行Join。如果數據可以支持map側Join,那麼將是最佳Join策略。對比mapper和reducer之間重排數據的開銷,減小數據集的大小以便可以在map端Join是明智的。

本節將介紹三種不同風格的map側Join。一是其中一個數據集已足夠小可以在存儲器高速緩存的情況下工作良好;二是要求在過濾掉兩個數據集中存在連接鍵的記錄之後,一個數據集可以適合存儲器;三是適用於以特定方式對文件進行排序和分布的情況。

實踐:Join數據,其中一個數據集可以適合內存

replicated join是一個map端連接,它從其函數中獲取名稱。最小的數據集將複製到所有map主機,其操作取決於一個事實,即Join數據集之一足夠小,可以緩存在內存中。

問題

希望對數據執行Join,其中一個數據集適合mapper內存。

解決方案

使用分散式緩存較小數據集,並在較大數據集流式傳輸到mapper時執行Join。

討論

使用分散式緩存將小數據集複製到運行map tasks2的節點,並使用每個map任務的初始化方法將其載入到哈希表中。使用從大數據集反饋到map函數的每個記錄中的鍵來查找小數據集哈希表,並在大數據集和小數據集中與Join值匹配的所有記錄之間執行Join。 圖6.4顯示了該方法在MapReduce中的工作原理。

企業大數據平台MapReduce應用之Join實踐!

圖6.4僅map的複製Join

以下代碼執行此Join:

企業大數據平台MapReduce應用之Join實踐!


企業大數據平台MapReduce應用之Join實踐!


企業大數據平台MapReduce應用之Join實踐!

要執行此Join,首先需要將要加入的兩個文件複製到HDFS中的主目錄:


企業大數據平台MapReduce應用之Join實踐!

接下來,運行作業並在完成後檢查其輸出:

企業大數據平台MapReduce應用之Join實踐!

Hive

通過在執行之前配置作業,可以將Hive連接轉換為map端連接。重要的是最大的表是查詢中的最後一個表,因為這是Hive將在mapper中流式傳輸的表(其他表將被緩存):

企業大數據平台MapReduce應用之Join實踐!

不再強調map-join提示Hive 0.11實現了一些更改,這些更改表面上刪除了map-join提示作為SELECT語句的一部分,但是不清楚在哪些情況下不再需要提示。全連接或右外連接不支持map端Join,它們將作為重新分區Join執行(reduce端連接)。

總結

內部和外部Join都可以使用複製Join來支持。此技術實現了內部Join,因為只發出了兩個數據集中具有相同鍵的記錄。要將其轉換為外部Join,可以流式傳輸到mapper中的值,這些值在哈希表中沒有相應的條目,可以類似地跟蹤與流式map記錄匹配的哈希表條目並使用方法在map任務結束時從哈希表中發出與任何map輸入都不匹配的記錄。

在數據集足夠小以便在內存中緩存的情況下,有沒有辦法進一步優化map側Join?現在是時候看看Semi-join了。

實踐:在大型數據集上執行Semi-join

想像一下,正在使用兩個大型數據集,例如用戶日誌和來自OLTP資料庫的用戶數據。這些數據集中的任何一個都不足夠小以在map內存中緩存,因此必須自己執行reducer端連接。問自己一個問題:如果要刪除與其他數據集中記錄不匹配的所有記錄,其中一個數據集是否適合map端內存?

在示例中,日誌中顯示的用戶很可能只佔OLTP資料庫整體用戶集的一小部分,因此通過刪除日誌中未顯示的所有OLTP用戶,可以將數據集縮小到適合內存的大小。如果是這種情況,Semi-join就是解決方案。 圖6.5顯示了執行Semi-join所需執行的三個MapReduce作業。

問題

希望將大型數據集Join在一起,同時避免混洗和排序階段的開銷。

解決方案

在此技術中,我們將使用三個MapReduce作業將兩個數據集連接在一起,以避免reducer端連接的開銷。此技術在處理大型數據集的情況下非常有用,過濾掉與其他數據集不匹配的記錄,可以將作業減小到可以放入任務內存的大小。

討論

該技術將分解圖6.5中所示的三個作業。

Job1

第一個MapReduce作業的功能是生成一組存在於日誌文件中的唯一用戶名,可以通過讓map函數執行用戶名的投影來執行此操作,然後使用reducer發出用戶名。要減少map和reduce階段之間傳輸的數據量,將使map任務緩存HashSet中的所有用戶名,並在清理方法中發出HashSet值。圖6.6顯示了此作業的流程。

企業大數據平台MapReduce應用之Join實踐!

圖6.5構成Semi-join的三個MapReduce作業


企業大數據平台MapReduce應用之Join實踐!

圖6.6 Semi-join中的第一個作業生成一組存在於日誌文件中的唯一用戶名。

以下代碼顯示了MapReduce作業:

企業大數據平台MapReduce應用之Join實踐!

第一個作業的結果是出現在日誌文件中的一組唯一用戶。

Job2

第二步是精心過濾MapReduce作業,其目標是從用戶數據集中刪除日誌數據中不存在的用戶。 這是一個僅map端作業,使用複製的Join來緩存日誌文件中顯示的用戶名,並將其與用戶數據集Join起來。Job1的唯一用戶輸出將遠遠小於整個用戶數據集,這使其成為緩存的自然選擇。圖6.7顯示了此作業的流程。

企業大數據平台MapReduce應用之Join實踐!

圖6.7 Semi-join中的第二個作業從日誌數據中缺少的用戶數據集中刪除用戶。

Job 3

在最後一步中,我們將把Job 2生成的過濾用戶與原始用戶日誌結合起來。過濾後的用戶現在應該足夠少以留在內存中,允許將它們放入分散式緩存中,圖6.8顯示了此作業的流程。

企業大數據平台MapReduce應用之Join實踐!

圖6.8 Semi-join中的第三個作業將Job2生成的用戶與原始用戶日誌組合在一起。

運行代碼並查看前面每個步驟生成的輸出:

企業大數據平台MapReduce應用之Join實踐!

輸出顯示Semi-join和最終Join輸出中作業的邏輯進展。

總結

在這種技術中,我們研究了如何使用Semi-join將兩個數據集組合在一起。Semi-join構造涉及比其他Join更多的步驟,但即使在處理大型數據集時,它也是使用map側Join的有效方式(需要注意的是,其中一個數據集必須縮小到適合內存的大小))。

實踐:加入預分類和預分區數據

Map側Join是最有效的技術,前兩個map側策略都要求其中一個數據集可以載入到內存中。 如果正在使用無法按照先前技術的要求縮小到較小尺寸的大型數據集,該怎麼辦? 在這種情況下,複合map端連接可能是可行的,但僅當滿足以下所有要求時:

  • 沒有任何數據集可以完整地載入到內存中。
  • 數據集全部按Join鍵排序。
  • 每個數據集具有相同數量的文件。
  • 每個數據集中的文件N包含相同的連接密鑰K.
  • 每個文件都小於HDFS塊的大小,因此不會拆分分區。 或者,數據的輸入拆分不會拆分文件。

圖6.9顯示了有助於複合Join的已排序和分區文件的示例,此技術將介紹如何在作業中使用複合Join。

問題

希望對已排序的分區數據執行map端Join。

企業大數據平台MapReduce應用之Join實踐!

圖6.9 用作複合Join的輸入已排序文件示例

解決方案

使用與MapReduce捆綁在一起的CompositeInputFormat。

討論

CompositeInputFormat非常強大,支持內部和外部聯接。 以下示例顯示如何對數據執行內部聯接:

企業大數據平台MapReduce應用之Join實踐!

複合Join要求輸入文件按鍵排序(在我們的示例中是用戶名),因此在運行示例之前,需要對這兩個文件進行排序並將它們上傳到HDFS:

企業大數據平台MapReduce應用之Join實踐!

接下來,運行作業並在完成後檢查其輸出:

企業大數據平台MapReduce應用之Join實踐!


企業大數據平台MapReduce應用之Join實踐!

Hive

Hive支持稱為排序合併Join的map端連接,其操作方式與此技術的操作方式大致相同。要求在兩個表中對所有鍵進行排序,並且必須將表格劃分為相同數量的bucket。需要指定一些配置,並使用MAPJOIN提示來啟用此行為:

企業大數據平台MapReduce應用之Join實踐!

總結

複合Join實際上支持N路連接,因此可以Join兩個以上的數據集。但是,所有數據集必須符合同一組限制,由於每個mapper使用兩個或多個數據輸入,因此數據位置只能與其中一個數據集一起存在,因此其餘數據集必須從其他數據節點流式傳輸。對於在運行Join之前數據必須存在的方式,此Join肯定是限制性的,但如果數據已經按照這種方式布局,那麼這是一種加入數據並避免基於reducer的shuffle開銷的好方法。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 IT168企業級 的精彩文章:

一張圖讀懂H3C-MSG360-4-PWR多業務網關
從誕生到成長!數家名企大數據平台應用演進之路解析!

TAG:IT168企業級 |