當前位置:
首頁 > 最新 > Hadoop之Flume

Hadoop之Flume

1、Flume概述

Hadoop的構建宗旨是處理大型數據集,通常,我們都是假設這些數據已經存儲在HDFS中或者能夠隨時批量複製到HDFS。然而,有許多系統並不符合此假設,它們生產出的是我們想要通過Hadoop來匯總、存儲和分析的數據流。與這類系統打交道,Apache Flume就再合適不過了。

Flume作為cloudera公司開發的實時日誌採集系統,受到了業界的廣泛認可和應用。設計Flume的目的是向Hadoop批量導入基於事件的海量數據,一個典型的例子是利用Flume從一組Web伺服器中收集日誌文件,然後把這些文件中的日誌事件轉移到一個新的HDFS匯總文件中,以做進一步處理,其終點(用Flume的術語叫做Sink)通常為HDFS。不過,Flume具有足夠的靈活性,也可以將數據存儲到其他系統中,如HBase或Solr等。

要想使用Flume,就需要運行Flume代理(Agent),Flume代理是由持續運行的Source(數據來源)、Sink(數據目標)以及Channel(用於連接Source和Sink)構成的Java進程。Flume的Source產生事件,並將其傳送給Channel,Channel存儲這些事件直至轉發給Sink。可以把Source-Channel-Sink的組合當作Flume構件。

Flume由一組以分散式拓撲結構相互連接的代理構成。系統邊緣的代理(如與Web伺服器共存於同一台機器上的代理)負責採集數據,並把數據轉發給負責匯總的代理,然後再將這些數據存儲到最終目的地。代理通過配置來運行一組特定的Source和Sink,因此,使用Flume所要做的主要工作就是通過配置文件使得各個組件融合到一起。

2、Flume的特點

Flume是一個分散式的、可靠的以及高可用的海量日誌採集、聚合和傳輸的系統,支持在日誌系統中定製各類數據發送方,用於採集數據,同時,Flume還提供對數據進行簡單處理,並最終存儲到各種數據接受方(比如文本、HDFS、HBase等)的能力。

Flume的數據流由事件(Event)貫穿始終,事件是Flume的基本數據單位,它攜帶日誌數據(位元組數組形式)並且攜帶有頭信息。這些Event由Agent外部的Source生成,當Source捕獲事件後會進行特定的格式化,然後Source會把事件推入(單個或多個)Channel中。可以將Channel看作是一個緩衝區,它將保存事件直到Sink處理完該事件。Sink負責持久化日誌或者把事件推向另一個Source。

A、Flume的可靠性

Flume使用兩個獨立的事務分別負責從Source到Channel以及從Channel到Sink的事件傳遞。一旦事務中的所有事件全部傳遞到Channel且提交成功,那麼Source就將該文件標記為完成。事務以類似的方式應用於從Channel到Sink的事件傳遞過程,如果由於某種原因使得事件無法記錄,那麼事務將會回滾,而所有的事件仍然保留在Channel中,等待重新傳遞。

B、Flume的可恢復性

Flume的可恢復性依靠Channel來實現,建議使用File Channel,這種類型的Channel具有持久性:只要事件被寫入Channel,即使代理重新啟動,事件也不會丟失。(Flume還提供有Memory Channel,由於它的事件緩存在存儲器中,因此它不具有持久存儲能力,採用Memory Channel時,如果代理重新啟動,事件就會丟失。在某些應用場景中這種情況是可以接受的,具體取決於實際需求,與File Channel相比,Memory Channel的優勢在於具有較高的吞吐量)

C、Flume的批量處理

為了提供效率,Flume在有可能的情況下盡量以事務為單位來批量處理事件,而不是逐個事件地進行處理。批量處理方式尤其有利於提高File Channel的性能,因為每個事務只需要寫一次本地磁碟和調用一次fsync。而批量的大小取決於組件的類型,並且在大多數情況下是可以配置的。例如,Spooling Directory Source以100行文本作為一個批次來讀取(可通過BatchSize屬性進行設置)。同樣地,在通過Avro RPC發送之前,Avro Sink試圖從Channel中批量讀取100個事件,當然,如果可用的事件不足100個也不會引起阻塞。

3、Flume的體系結構

Flume運行的核心是代理(Agent),Flume以Agent為最小的獨立運行的基本單位,一個Agent就是一個獨立的進程,它是一個完整的數據採集工具,含有三個核心組件,分別是Source、Channel以及Sink。通過這些組件,Event可以從一個地方流向另一個地方,Flume的基本結構如下圖所示。

A、Source

Source是數據的採集端,負責將數據捕獲後進行特殊的格式化,並將數據封裝到事件(Event)里,然後推入Channel中。Flume提供了很多內置的Source,如Spooling Directory Source、Kafka Source、JMS Source、Exec Source、Avro Source、Log4j Source以及Syslog Source等,可以讓應用程序同已有的Source直接打交道,如果內置的Source無法滿足應用需求,Flume還支持自定義Source。

Flume主要支持的Source類型如下圖所示:

B、Channel

Channel是連接Source和Sink的組件,可以將它看作是一個數據的緩衝區(數據隊列),它可以將事件暫存到內存中,也可以持久化到本地磁碟上,直到Sink處理完該事件,最常用的兩個Channel大概就是Memory Channel和File Channel。

Flume主要支持的Channel類型如下圖所示:

C、Sink

Sink從Channel中取出事件,然後將數據轉發到其它地方,可以是文件系統、資料庫、HDFS等,也可以是其他Agent的Source。在日誌數據較少時,可以將數據存儲在文件系統中,並且設置一定的時間間隔保存數據。

Flume主要支持的Sink類型如下圖所示:

4、Flume的攔截器和數據流

A、Flume的攔截器

當需要對數據進行過濾時,除了可以在Source、Channel和Sink組件端進行代碼修改外,Flume還為使用者提供了攔截器,攔截器也是採用chain的形式運用於Agent中。攔截器的使用位置處於Source和Channel之間,當為Source指定攔截器後,在攔截器中會得到Event,根據需求就可以對Event進行保留還是拋棄的處理,拋棄的數據不會進入到Channel中,該過程如下圖所示:

B、Flume的數據流

Flume的核心是把數據從數據源採集過來,再送到目的地,為了保證傳送一定成功,在送到目的地之前,會先緩存數據,待數據真正到達目的地後,再刪除緩存的數據。Flume傳輸數據的基本單位是Event,如果是文本文件,通常是一行記錄,這也是事務的基本單位,Event從Source,流向Channel,再到Sink,本身為一個byte數組,並可攜帶header信息。Event代表著一個數據流的最小完整單元,從外部數據源來,再向外部的目的地去。

值得注意的是,Flume提供了大量內置的Source、Channel和Sink類型,不同類型的Source,Channel和Sink可以自由組合,組合方式基於用戶自定義的配置文件,非常靈活。比如,Channel可以把事件暫存在內存里,也可以持久化到本地磁碟中,Sink可以把日誌寫入到HDFS,HBase,甚至是另外一個Source等等。Flume還支持用戶建立多級流,也就是說,多個Agent可以協同工作,並且支持Fan-in(扇入)、Fan-out(扇出)、Contextual Routing(上下文路由)以及Backup Routes(備用路由),這也正是Flume的強大之處,如下圖所示:

-5、Flume的安裝和使用

A、Flume的安裝

Flume的安裝非常簡單,將Flume安裝包上傳到/root/tools目錄下,然後運行命令tar -zxvf apache-flume-1.7.0-bin.tar.gz–C /root/training/,將Flume解壓到/root/training目錄下即可。

B、Flume的使用

使用Flume的關鍵是配置自己的Agent,這需要我們自定義一個conf配置文件。下面將列舉幾個實例,以供參考學習。準備工作,在Flume的安裝目錄/root/training/apache-flume-1.7.0-bin下創建目錄MyAgent,用於存放自定義配置文件。

實例一:按照相應策略採集指定目錄下新產生的日誌文件中的log

進入到/root/training/apache-flume-1.7.0-bin/MyAgent目錄下,運行命令vi a1.conf,創建文件,並在文件中輸入以下內容:

#運行Flume使用的命令:bin/flume-ng agent -na1 -f MyAgent/a1.conf -c conf -Dflume.root.logger=INFO,console

#定義Agent的組件, Source、Channel、Sink的名稱

a1.sources = r1

a1.channels = c1

a1.sinks = k1

#具體定義source

a1.sources.r1.spoolDir = /root/temp/logs

#具體定義channel

#定義攔截器,為消息添加時間戳

#具體定義sink

a1.sinks.k1.hdfs.path = hdfs://192.168.12.221:9000/flume/%Y%m%d

#不按照條數生成文件

#HDFS上的文件達到128M時生成一個文件

#HDFS上的臨時文件達到60秒時生成最終文件

#組裝source、channel、sink

在目錄/root/training/apache-flume-1.7.0-bin下,運行命令bin/flume-ng agent-n a1 -f MyAgent/a1.conf -c conf -Dflume.root.logger=INFO,console,從log中可以看到自定義的r1(Source組件)、c1(Channel組件)、k1(Sink組件)已經啟動,如下圖所示:

然後,將/root/temp目錄下的file1.txt文件(該文件中的內容為MapReduce is simple),拷貝到被監聽目錄/root/temp/logs下,輸出的log信息如下圖所示:

拷貝完文件之後,迅速(保證在啟動上面那條長長的命令後的60秒鐘之內)去查看HDFS對應目錄,如下圖所示,注意文件events-.1531664661571.tmp帶有後綴tmp,表明是臨時文件;等到超過60秒鐘之後再去查看,發現該tmp後綴消失了,這個現象的產生,是由配置文件中定義的文件生成策略所決定的,要麼生成的文件大小超過128M,要麼時間超過60秒鐘,才會生成最終的文件。

60秒鐘之後,再查看HDFS對應目錄,如下圖所示:

實例二:監聽埠8888,採集網路請求的日誌信息

進入到/root/training/apache-flume-1.7.0-bin/MyAgent目錄下,運行命令vi a2.conf,創建文件,並在文件中輸入以下內容:

#運行Flume使用的命令:bin/flume-ng agent -na2 -f MyAgent/a2.conf -c conf -Dflume.root.logger=INFO,console

#定義Agent的組件, Source、Channel、Sink的名稱

a2.sources = r2

a2.channels = c2

a2.sinks = k2

#具體定義source

#具體定義channel

#具體定義sink

#組裝source、channel、sink

實例三:監聽在命令行上命令執行的結果

進入到/root/training/apache-flume-1.7.0-bin/MyAgent目錄下,運行命令vi a3.conf創建文件,並在文件中輸入以下內容:

#運行Flume使用的命令:bin/flume-ng agent -n a3 -f MyAgent/a3.conf -c conf-Dflume.root.logger=INFO,console

#定義Agent的組件, Source、Channel、Sink的名稱

a3.sources= r3

a3.channels= c3

a3.sinks= k3

#具體定義source

a3.sources.r3.command= tail -f /root/temp/aaa.log

#具體定義channel

#具體定義sink

實例四:採集指定目錄下新產生的日誌文件中的log(實例一的簡化版本)

進入到/root/training/apache-flume-1.7.0-bin/MyAgent目錄下,運行命令vi a4.conf,創建文件,並在文件中輸入以下內容:

#bin/flume-ng agent -n a4 -f MyAgent/a4.conf -c conf-Dflume.root.logger=INFO,console

#定義Agent的組件, Source、Channel、Sink的名稱

a4.sources = r4

a4.channels = c4

a4.sinks = k4

#具體定義source

a4.sources.r4.spoolDir = /root/temp/logs1

#具體定義channel

#具體定義sink

#組裝source、channel、sink

至此,Flume介紹完畢,歡迎關注下期文章!

參考文獻:

——《Hadoop權威指南》

——《CSDN博客》

——《百度百科》


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序猿的修身養性 的精彩文章:

Hadoop之HBase

TAG:程序猿的修身養性 |