當前位置:
首頁 > 知識 > 首次嘗試Flink的一些 感受

首次嘗試Flink的一些 感受

最近打算研究研究 Flink,根據官方文檔寫個 Hello,World。入門還是比較容易的,不需要複雜的安裝環境、配置。這篇文章簡單介紹 Flink 的使用感受以及入門。

感受

搭建環境方便:Flink 可以在 Windows 下運行與開發。對於喜歡 Windows 下開發的人,可以免去搭建虛擬機的成本。並且不依賴其他框架,本地環境搭建簡單。這點很關鍵,許多人學習框架都放棄在了環境搭建上。減少搭建環境的成本,可以避免初學者浪費過多精力。Hadoop 的搭建框架就非常麻煩,並且早期 Hadoop 只能運行在 Linux 下。

文檔詳細:Flink 官網的文檔介紹非常詳細,開發過程中會涉及的哪些步驟,以及每個步驟的操作路徑,Flink 官網都有詳細介紹。包括將 Flink 源碼導入 IDEA,這解決了想閱讀源碼的人的一大痛點。

中文文檔:Flink 官網已經有中文版的頁面,雖然目前中文頁面比較少,應該正在翻譯中。說明 Flink 社區比較重視國內開發者。

不依賴 Hadoop:這對於一個全新的框架是件好事,這樣可以沒有歷史包袱。並且對於學習該框架的人可以獨立部署、開發,而不需要有其他框架的背景。

關注度在上升:在微信中搜索 Flink 發現大部分文章都是 18、19年寫的,說明 Flink 關注度在逐漸上升。一些大廠也都開始使用 Flink 構建實時數據倉庫,如:阿里巴巴。可以看出 Flink 致力於為開發者提供一種方便、易用的編程框架。同時,社區非常注重文檔的詳細程序以及開發者使用的便利性。

下面的內容是搭建 Flink 環境,並運行 WordCount。

本地運行

Flink 可以運行在 Linux、Mac OS X 和 Windows 環境。我喜歡在 Windows 下開發,所以在 Windows 運行 Flink。Flink 的最新版本(1.8.0)需要 JDK 的版本為 1.8 以上。本地啟動 Flink 非常容易,下載 Flink 二進位包,需要選擇 Scala 的版本,如果不用 Scala 開發 Flink 應用程序選哪個版本無所謂。我下載的是flink-1.8.0-bin-scala_2.11.tgz。啟動步驟如下:

cdflink-1.8.0#解壓後的目錄cdbinstart-cluster.bat#啟動本地Flink

啟動後會發現彈出了兩個 Java 程序的窗口。一個是 JobManager,另一個是 TaskManater。通過http://localhost:8081 訪問 Flink 的 web 頁面,該站點用於查看運行環境和資源、提交和監控 Flink 作業。

WordCount

通過簡單的 WordCount 感受一下 Flink 應用程序的編寫過程。Flink 已經提供生成 Maven 工程的模板

如果不想通過命令行的方式生成 maven 工程,可以通過如下設置在 IDEA 中創建 Flink 應用的模板工程,以 Java 為例

在如上的頁面點擊 「Add Archetype...」,然後再彈出的對話框填寫如下內容

選擇我們添加的 archetype 便可繼續創建 maven 工程。除了 maven 工程還可以創建 Gradle 和 Sbt 工程。

為了快速運行 Flink 應用,我們可以直接將官網 WordCount 例子的代碼拷貝自己的項目。Java 代碼如下

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.functions.ReduceFunction;

import org.apache.flink.api.java.utils.ParameterTool;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.util.Collector;

public class FirstCase {

public static void main(String[] args) throws Exception {

// the port to connect to

final int port = 9000;

// get the execution environment

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// get input data by connecting to the socket

DataStreamString text = env.socketTextStream(192.168.29.132, port, \n);

// parse the data, group it, window it, and aggregate the counts

DataStreamWordWithCount windowCounts = text

.flatMap(new FlatMapFunctionString, WordWithCount() {

@Override

public void flatMap(String value, CollectorWordWithCount out) { for (String word : value.split(\\s)) {

out.collect(new WordWithCount(word, 1L));

}

}

})

.keyBy(word)

.timeWindow(Time.seconds(5), Time.seconds(1))

.reduce(new ReduceFunctionWordWithCount() {

@Override

public WordWithCount reduce(WordWithCount a, WordWithCount b) { return new WordWithCount(a.word, a.count b.count);

}

});

// print the results with a single thread, rather than in parallel

windowCounts.print().setParallelism(1);

env.execute(Socket Window WordCount);

} // Data type for words with count

public static class WordWithCount {

public String word;

public long count;

public WordWithCount() {}

public WordWithCount(String word, long count) {

this.word = word;

this.count = count;

}

@Override

public String toString() {

return word : count;

}

}

}

雖然不太熟悉 Flink 編程模型,但從上面代碼中基本上能推測出每一步的含義。由於我們入門 Flink ,剛開始沒必要太糾結代碼本身。先將 Demo 運行起來,在慢慢深入學習。現在統計程序已經有了,但是還缺少數據源。官網的例子使用的是 netcat ,我在 Windows 下安裝了該工具,但是覺得用起來不方便。在 Linux 虛擬機上裝了一個,這樣用法跟官網一致的。我的虛擬機系統為 Centos 7 64位,安裝命令如下

yum install nmap-ncat.x86_64

啟動 netcat 用於發數據

nc -l 9000

接下來便是啟動 Flink 應用程序連接數據源並進行統計。 啟動之前需要將以下代碼中 ip 和 埠換成自己的

DataStreamString text = env.socketTextStream(192.168.29.132, port, \n);

啟動 Flink 應用程序有兩種方式,一種是直接直接在 IDEA 中直接運行 Java 程序;另一種是通過 maven 打一個 jar 包,提交到 Flink 集群運行。第二種方式的命令如下

$FLINK_HOME\bin\flink run $APP_HOME\flink-ex-1.0-SNAPSHOT.jar

FLINK_HOME 為 flink 二進位包的目錄

APP_HOME 為上面創建的 maven 工程的目錄

啟動 Flink 應用後,我們可以在 netcat 中輸入文本,並觀察 Flink 的統計結果

$ nc -l 9000a a

我們只發送了一行,內容為「a a」。如果在 IDEA 中啟動程序可以直接在 IDEA 控制台看到輸出結果,如果通過 flink run 方式啟動,需要在 TaskManager 的窗口中查看輸出。輸出內容如下

a : 2a : 2a : 2a : 2a : 2

為什麼輸出了 5 次。來看一下我們的應用程序中有這樣一句

.timeWindow(Time.seconds(5), Time.seconds(1))

它代表 Flink 應用程序每次處理的數據窗口為 5s,處理完後,整個窗口向前滑動 1s 。也就是每次處理的數據為「最近 5s」的數據。因為最近 5s 數據源中只有「a a」這一條記錄,因此輸出 5 次。

以上便是 Java 版的 WordCount。當然我們也可以用 Scala 編寫,且 Scala 的寫法更簡潔,代碼量更少。

import org.apache.flink.api.java.utils.ParameterTool

import org.apache.flink.streaming.api.scala._

import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

def main(args: Array[String]) : Unit = {

// get the execution environment

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// get input data by connecting to the socket

val text = env.socketTextStream(192.168.29.132, 9000, "\n")

// parse the data, group it, window it, and aggregate the counts

val windowCounts = text

.flatMap { w = w.split(\\s) }

.map { w = WordWithCount(w, 1) }

.keyBy(word)

.timeWindow(Time.seconds(5), Time.seconds(1))

.sum(count)

// print the results with a single thread, rather than in parallel

windowCounts.print().setParallelism(1)

env.execute(Socket Window WordCount)

}

// Data type for words with count

case class WordWithCount(word: String, count: Long)

}

基本上是 Java 一半的代碼量。個人感覺 Scala 做大數據統計代碼還是挺合適的,雖然 Scala 門檻比較高。Scala 程序的運行方式跟 Java 一樣。編寫過程中如果出現以下錯誤,需要看看是不是 import 語句沒寫對

Error:(29, 16) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]

.flatMap { w = w.split(\\s) }

解決方法

import org.apache.flink.streaming.api.scala._

總結

以上便是 Flink 的簡單入門,後續繼續關注 Flink 框架。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 千鋒JAVA開發學院 的精彩文章:

Kotlin技術分享:擴展函數和擴展屬性
分享:更好的SQL查詢如何做到

TAG:千鋒JAVA開發學院 |