當前位置:
首頁 > 知識 > 技術分享:Flink 基本的 API

技術分享:Flink 基本的 API

Flink使用 DataSet 和 DataStream 代表數據集。DateSet 用於批處理,代表數據是有限的;而 DataStream 用於流數據,代表數據是無界的。數據集中的數據是不可以變的,也就是說不能對其中的元素增加或刪除。我們通過數據源創建 DataSet 或者 DataStream ,通過 map,filter 等轉換(transform)操作對數據集進行操作產生新的數據集。

獲得 execution 環境

創建輸入數據

在數據集上進行轉換操作(下文統一稱為:transform)

輸出結果數據

觸發程序執行下面我們將介紹編寫 Flink 程序所涉及的基本 API。

輸入和輸出

首先,需要獲得 execution 環境,Flink 提供了一下以下三種方式:

getExecutionEnvironment()createLocalEnvironment()createRemoteEnvironment(Stringhost,intport,String...jarFiles)

以第一個為例創建 execution 環境的代碼如下

批處理:

流處理:

words.txt 文件內容:

上面代碼創建了 execution 環境,同時利用 env 創建了輸入源。在數據集上調用 print 方法可以將數據輸出到控制台,當然也可以調用 writeAsText 等方法將數據輸出到其他介質。上面流處理最後一行代碼調用了 execute 方法,在流處理中需要顯式調用該方法觸發程序的執行。

上述代碼有兩種方式運行,一種是直接在 IDE 中執行,就像運行一個普通的 Java 程序,Flink 將啟動一個本地的環境執行程序。另一種方式是將程序打包,提交到 Flink 集群運行。上面例子基本包含了一個 Flink 程序的基本骨架,但是並沒有對數據集進行更多的 transform 操作,下面我們簡單介紹基本 transform 操作。

map操作

這裡的 map 操作類似 MapReduce 中的 map,對數據進行解析,處理。示例如下

批處理:

流處理

這裡批處理和流處理除了數據集的類型不同,其餘寫法都一樣。就是將每個單詞映射成了一個 (單詞, 1) 二元組。與 map 類似的 transform 還有 filter,過濾不需要的記錄,讀者可以自行嘗試。

指定 key

大數據處理經常需要按照某個維度進行處理,也就是需要指定 key。在 DataSet 中使用 groupBy 指定 key,而在 DataStream 中使用 keyBy 指定 key。這裡我們以 keyBy 為例進行介紹。

Flink 的數據模型並不是基於 key-value 的,key 是虛擬的,可以看做是定義在數據上的函數。

在 Tuple 中定義 key

KeyedStreamTuple2String, Integer, Tuple keyed = words.keyBy(0); //0 代表 Tuple2 (二元組)中第一個元素

KeyedStreamTuple2String, Integer, Tuple keyed = words.keyBy(0,1); //0,1 代表二元組中第一個和第二個元素作為 key\

對於嵌套的 tuple

DataStreamTuple3Tuple2Integer, Float,String,Long ds;

ds.keyBy(0) 將會把 Tuple2Integer, Float 整體作為 key。

用欄位表達式指定 key

這裡指定 WC 對象的 word 欄位作為 key。欄位表達式語法如下:

Java對象使用欄位名作為key,例子如上

對於 Tuple 類型使用欄位名(f0, f1,...)或者偏移(從0開始)指定 key,例如 f0 和 5 分別代表 Tuple 第一個欄位和第六個欄位

Java 對象和 Tuple 嵌套的欄位作為 key,例如:f1.user.zip 表示 Tuple 第二個欄位中的 user 對象中的 zip 欄位作為 key

通配符 * 代表選擇所有類型作為 key欄位表達式的舉例

count: WC類的 count 欄位

complex: complex 的所有欄位(遞歸地)

complex.word.f2: ComplexNestedClass 類中 word 三元組的第三個欄位

complex.hadoopCitizen: complex類中的 hadoopCitizen 欄位使用 Key Selector 指定 key

通過 key 選擇器函數來制定 key,key 選擇器的輸入為每個元素,輸出為指定的 key,例子如下

可以看到實現的效果與 keyBy(0) 是一樣的。

以上便是 Flink 指定 key 的方法。

總結

這篇文章主要介紹了 Flink 程序的基本骨架。獲得環境、創建輸入源、對數據集做 transform 以及輸出。由於數據處理經常會按照不同維度(不同的 key)進行統計,因此,本篇內容重點介紹了 Flink 中如何指定 key。後續將會繼續介紹 Flink API 的使用。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 千鋒JAVA開發學院 的精彩文章:

MySQL 存儲毫秒數據的方法
Python數據分析基礎:異常值檢測和處理

TAG:千鋒JAVA開發學院 |