當前位置:
首頁 > 最新 > Hadoop之Storm

Hadoop之Storm

1Storm的安裝和配置

Storm主要有兩種安裝模式,分別是偽分布模式(也叫單機模式)和全分布模式(也叫集群模式),這裡需要注意的是,Storm的運行依賴於Zookeeper,因此在安裝Storm之前,要先安裝好Zookeeper。前面已經在主機hadoop221上安裝好了單機版的Zookeeper,在主機hadoop222、hadoop223和hadoop224上安裝好了全分布版的Zookeeper,本文就基於這個前提條件,分別介紹Storm的偽分布模式和全分布模式的安裝和配置。

AStorm偽分布模式

Storm偽分布模式僅需要一台機器,提供的基本功能跟全分散式模式類似,一般用於開發和測試,下面就介紹如何在主機hadoop221上安裝和配置偽分布模式的Storm。

首先,將Storm安裝包apache-storm-1.0.3.tar.gz上傳到/root/tools目錄下,運行命令tar -zxvf apache-storm-1.0.3.tar.gz-C /root/training/,將Storm解壓到/root/training目錄下;然後,配置Storm的環境變數,運行命令vi /root/.bash_profile,在文件末尾添加如下代碼:

STORM_HOME=/root/training/apache-storm-1.0.3

export STORM_HOME

PATH=$STORM_HOME/bin:$PATH

export PATH

保存退出,並運行命令source /root/.bash_profile,使環境變數生效;接下來需要配置Storm的核心配置文件storm.yaml,進入到/root/training/apache-storm-1.0.3/conf目錄下,運行命令vi storm.yaml,編輯storm.yaml文件,修改如下:

#配置Zookeeper地址,注意「-」後有一個空格

-"hadoop221"

#配置主節點地址,注意「:」後有一個空格

nimbus.seeds: ["hadoop221"]

#增加如下一行代碼,設置Storm客戶端提交任務的jar包所保存的位置目錄

storm.local.dir:"/root/training/apache-storm-1.0.3/tmp"

#增加如下幾行代碼,配置每個從節點上運行worker的個數及對應的埠號

- 6700

- 6701

- 6702

- 6703

#增加如下一行代碼,設置啟用Event Logger,可以用於打開Debug,查看處理的數據

保存退出,運行命令mkdir/root/training/apache-storm-1.0.3/tmp,創建tmp目錄。至此,Storm的偽分布模式安裝配置完成。在啟動Storm前,先確保Hadoop和Zookeeper都已經正常運行,然後運行命令storm nimbus &,以後台方式啟動Storm主節點Nimbus;運行命令storm supervisor &,以後台方式啟動Storm從節點Supervisor;運行命令storm ui &,以後台方式啟動Storm網頁;在瀏覽器地址欄中輸入http://192.168.12.221:8080,可以看到如下界面:

BStorm全分布模式

Storm的全分布模式,也叫作集群模式,這是生產環境中經常使用的一種安裝模式,需要在多台機器上進行安裝和配置。下面就介紹在主機hadoop222(安裝主節點Nimbus)、hadoop223(安裝從節點Supervisor)和hadoop224(安裝從節點Supervisor)上安裝Storm全分布模式。

在主機hadoop222上解壓Storm安裝包,然後在三台主機上均配置好Storm的環境變數,這些操作跟前面完全一樣,這裡不做贅述。然後,配置Storm的核心配置文件storm.yaml,修改如下:

#配置Zookeeper集群的地址

-"hadoop222"

-"hadoop223"

-"hadoop224"

#配置Storm主節點Nimbus的地址

nimbus.seeds: ["hadoop222"]

#增加如下一行代碼,設置Storm客戶端提交任務的jar包所保存的位置目錄

storm.local.dir:"/root/training/apache-storm-1.0.3/tmp"

#增加如下幾行代碼,配置每個從節點上運行worker的個數及對應的埠號

- 6700

- 6701

- 6702

- 6703

#增加如下一行代碼,設置啟用Event Logger,可以用於打開Debug,查看處理的數據

保存退出,運行命令mkdir/root/training/apache-storm-1.0.3/tmp,創建tmp目錄;然後,分別運行命令scp -r/root/training/apache-storm-1.0.3/ root@hadoop223:/root/training/,scp -r /root/training/apache-storm-1.0.3/root@hadoop224:/root/training/,將配置好的Storm通過網路拷貝到主機hadoop223和主機hadoop224的/root/training/目錄上,這樣相當於主機hadoop223和hadoop224也安裝配置好了Storm。如此,Storm的全分布模式就算安裝完成了。

下面驗證Storm是否安裝成功。在啟動Storm相應組件之前,確保Hadoop和Zookeeper正常啟動。然後,在hadoop222上依次運行storm nimbus &,storm ui &;在從節點hadoop223和從節點hadoop224上運行storm supervisor &。在瀏覽器地址欄中輸入http://192.168.12.222:8080,可以看到如下界面:

CStorm HA模式的配置

Storm HA(High Availability)模式是為了增強Storm集群的可用性,通過為Storm集群配置多個Nimbus主節點實現,當正在運行的主節點意外宕機後,備用的主節點可以立刻接管工作,保證Storm集群正常工作。

前面已經安裝配置好了Storm的全分布模式,在這個基礎之上,再來安裝Storm的HA模式,非常簡單,只需要在配置文件中稍做修改即可。停止Storm集群的運行,然後編輯三台主機上Storm的核心配置文件storm.yaml,將nimbus.seeds:["hadoop222"]修改為nimbus.seeds: ["hadoop222","hadoop223"],這樣將在主機hadoop223上也運行Storm的一個主節點Nimbus。

最後,啟動Storm集群。在主機hadoop222上依次運行storm nimbus &、storm ui &、storm logviewer &(以後台方式啟動日誌查看器);在主機hadoop223上依次運行storm nimbus &、storm supervisor &、storm ui &以及storm logviewer &;在主機hadoop224上依次運行storm supervisor &和storm logviewer &。在瀏覽器地址欄中輸入http://192.168.12.222:8080,可以看到如下界面:

2Storm Demo的運行

在介紹運行Storm Demo示例之前,先簡單介紹Storm的常用命令。在Storm中有許多簡單且有用的命令可以用來管理拓撲,它們可以提交、殺死、禁用、再平衡拓撲等。

提交任務的命令格式為:storm jar【jar路徑】【拓撲包名.拓撲類名】【拓撲名稱】,如storm jar storm-starter-topologies-1.0.3.jarorg.apache.storm.starter.WordCountTopology MyWorcCountTest;

殺死Storm任務的命令格式為:storm kill【拓撲名稱】-w 10,如storm kill topologyName –w10;

停用Storm任務的命令格式為:storm deactive【拓撲名稱】,如storm deactive topologyName;

啟動Storm任務的命令格式為:storm activate【拓撲名稱】,如storm activate topologyName;

重新部署Storm任務的命令格式為:storm rebalance【拓撲名稱】,如storm rebalance topologyName;

再平衡使你重分配Storm集群任務,這是個非常強大的命令,比如,你向一個運行中的集群增加了節點,再平衡命令將會停用拓撲,然後在相應超時時間之後重分配Worker,並重啟拓撲。

在Storm的安裝目錄/root/training/apache-storm-1.0.3/examples/storm-starter下,storm-starter-topologies-1.0.3.jar包中包含有多個Storm程序實例,具體使用方法可以參考該目錄下README.markdown文件中的說明。Storm示例中也提供了程序世界中的HelloWorld例子,下面就在偽分布模式下(主機hadoop221上)來運行該實例。

進入到/root/training/apache-storm-1.0.3/examples/storm-starter目錄下,運行命令storm jar storm-starter-topologies-1.0.3.jarorg.apache.storm.starter.WordCountTopology MyWorcCountTest,然後在瀏覽器地址欄中輸入http://192.168.12.221:8080,可以看到如下界面:

點擊Topology Summary下的MyWordCountTest(這是在命令行中自定義的Storm任務名稱),可以看到如下界面:

點擊Topology actions下面的Debug(打開Debug模式),會彈出一個設置對話框,其中的數字表示採樣比率(數值越大在相同時間內查看到的數據就越多,比如50,表示每採集100條數據顯示50條),可以自己隨機設定,這裡就設置為50;然後,可以點擊Spouts(All time)下面的spout或者Bolts(All time)下面的count或split,這裡點擊spout,看到如下界面:

再點擊Component summary下面的events,可以看到如下界面。需要注意的是,查看完後需要將Debug開關關閉,否則會一直採集數據佔用掉大量的存儲空間,並將任務kill掉。

3Storm內部通信機制

同一個Worker間消息的發送使用的是LMAX Disruptor,它負責同一個節點(同一個進程內)上線程間的通信;Disruptor使用了一個RingBuffer替代隊列,用生產者消費者指針替代鎖;生產者消費者指針使用CPU支持的整數自增,無需加鎖並且速度很快,Java的實現在Unsafe package中;不同Worker間通信使用ZeroMQ(0.8版本)或者Netty(0.9.0版本);不同Topology之間的通信,Storm不負責,需要開發者自己想辦法進行實現,例如可以使用Kafka等。

Storm中Worker進程內部的結構如下圖所示:

4Storm編程模型

在一個Storm程序中,其邏輯處理組件主要包含兩個,即Spout和Bolt,它們之間的結構如下圖所示:

除此之外,在Storm程序中,通常還涉及其他一些成員,總結如下:

Topology:Storm中運行的一個實時應用程序(拓撲),通常也叫Storm任務(Job);

Spout:在一個Topology中獲取數據流的組件。通常情況下Spout會從外部數據源(如Kafka等)中獲取數據,然後轉換為Topology內部的源數據Tuple,一般情況下,使用Storm提供的Spout就夠用了;

Bolt:接收Tuple數據,然後執行處理的組件,用戶可以在其中執行自己的邏輯處理操作,也就是說業務邏輯的處理在這個組件中;

Tuple:Storm中處理數據的基本單元,其Schema定義欄位名稱和順序,可以理解為一組數據就是一個Tuple;

Stream:Tuple的管道(類似Unix管道),表示數據的流向;一個Stream中的Tuple有固定的Schema,每個Spout、Bolt都有一個默認的Stream,Spout、Bolt可以有多個Stream;

Grouping:數據分組策略,定義一個Tuple數據如何發送給下一個Bolt組件,主要包括ShuffleGrouping(隨機分組)、FieldsGrouping(按欄位分組,按數據中field值進行分組,相同field值的Tuple被發送到相同的Task)、DirectGrouping(直接分組,需要指定TaskId)、AllGrouping(廣播分組)以及NoneGrouping(不分組),後文將對這些常用的分組策略進行詳細的介紹。

5Storm數據分組策略

所謂的Grouping策略,其實就是在Spout組件和Bolt組件之間以及Bolt組件與Bolt組件之間傳遞Tuple的方式,總共有8種方式,總結如下:

ShuffleGrouping(隨機分組):隨機分發Tuple給Bolt組件對應的任務,能夠保證各Task中處理的數據均衡;

FieldsGrouping(按欄位分組):根據指定的欄位,具有相同值的Tuple被分配到同一個Bolt進行處理,比如按「user-id」這個欄位來分組,那麼具有相同「user-id」值的Tuple會被分配到相同的Bolt組件里的一個Task,而具有不同「user-id」值的Tuple則會被分配到不同的Bolt中的Task;

DirectGrouping(直接分組):指向型分組,由Tuple的發射者直接決定Tuple將發射給哪個Bolt,一般情況下是由接收Tuple的Bolt決定接收哪個Bolt發射的Tuple。這是一種比較特殊的分組方法,使用這種分組意味著Tuple的發送者指定由Tuple接收者的某個Task來進行處理。只有被聲明為DirectStream的消息流可以聲明這種分組方法,而且這種Tuple必須使用emitDirect方法來發射,Tuple處理者可以通過TopologyContext來獲取處理它的Task的id(OutputCollector.emit方法也會返回Task的id);

AllGrouping(廣播分組):Tuple被複制分發到所有Bolt,即所有Bolt組件都可以收到該Tuple,這種分組方式需要謹慎使用;

GlobalGrouping(全局分組):全部Tuple都被分發給Bolt組件的同一個任務Task,明確地說,是分發給ID最小的那個Task;

NoneGrouping(不分組):這個分組的意思是指Stream不關心到底怎樣分組,目前這種分組和ShuffleGrouping是一樣的效果,有一點不同的是,Storm會把使用NoneGrouping的這個Bolt放到其訂閱者同一個線程中去執行;

LocalOrShuffleGrouping(本地或隨機分組):如果目標Bolt有一個或者多個Task與源Bolt的Task在同一個工作進程中,Tuple將會被隨機發送給這些同進程中的Task,否則,就和普通的ShuffleGrouping分組一樣;

CustomGrouping(自定義分組):如同MapReduce中自己去實現一個Partition一樣,用戶可以自定義分組策略。

6Storm編程實例

下面,介紹如何編寫自己的Storm程序實例WordCount。該程序包括四個組件,每個組件對應一個Java類,各自有其特定的功能作用。

WordCountSpout組件類(負責獲取數據)源代碼如下圖所示:

WordCountSplitBolt組件類(負責拆分單詞)源代碼如下圖所示:

WordCountTotalBolt組件類源代碼如下圖所示:

WordCountTopology主程序類源代碼如下圖所示:

最終,該程序在Eclipse中運行結果如下圖所示。可以看到,每隔3秒鐘,程序會隨機採集一條數據,然後對該條數據進行單詞拆分,最後進行單詞次數統計並將結果輸出顯示到屏幕。

當然了,可以對上述程序稍加修改,然後導出jar包放到Storm集群上進行運行,運行方式與前面Storm自帶WordCount示例的運行方式類似,主程序代碼如下圖所示:

7Storm本地和Storm Zookeeper目錄樹

前面安裝Storm時,在Storm的核心配置文件中定義了storm.local.dir:"/root/training/apache-storm-1.0.3/tmp",當啟動Storm並運行了相應的任務,便會將相關的一些信息保存到該tmp目錄下,簡單介紹tmp的目錄樹結構如下圖所示:

同樣地,當提交了一個Storm任務到Storm集群上運行時,也會保存Storm的節點及相關任務信息到Zookeeper中,簡單介紹Storm Zookeeper的目錄樹結構如下圖所示:

8Storm任務提交的過程

Storm任務提交的詳細過程如下圖所示:

這裡對Storm任務提交的過程進行簡單總結如下:

客戶端提交Topology到Nimbus(提交的jar包被上傳到Nimbus下的inbox目錄);

jar包中的submitTopology方法會對Topology進行一些檢查處理(如Bolt/Spout的id是否違法,Storm是否是active等),然後在Nimbus伺服器上建立Topology本地目錄進行存儲(包含Topology的jar包以及Topology的序列化對象);

之後Nimbus進行任務分配(根據Topology定義的一些參數來對Bolt/Spot設定Task的數量並分配對應的task-Id),將分配好的Task信息(Task信息包括Task的心跳信息,Topology的描述信息等)發送到Zookeeper對應的目錄下;

Supervisor定期到Zookeeper相應目錄下查看是否有新的任務,有的話下載下來,根據任務的描述信息啟動相應的Worker進行工作;

Worker根據任務的描述信息創建相應的網路連接來發送消息。

9Storm與外部系統組件的集成

在Storm的安裝目錄/root/training/apache-storm-1.0.3/external下,可以看到如下圖所示的內容,這些目錄下存放的是Storm與外部系統集成所需要的jar包。

下面以Storm與HBase的集成為例,在前面WordCount程序的基礎上,介紹具體的實現方法。WordCountSpout、WordCountSplitBolt以及WordCountTotalBolt三個Java類保持不變,開發的第三級Bolt組件類(負責將數據保存到HBase表中)代碼如下圖所示:

Storm與HBase集成的WordCount程序的主程序類源代碼如下圖所示:

在運行程序前,需要在HBase中創建result表(帶有列族info),命令為create "result","info",並查詢result表結果如下圖所示,可以看到此時result表中沒有任何數據。

等待程序執行幾秒鐘後,再查看result表,如下圖所示

再等待幾秒鐘後,查看result表,如下圖所示。可以看到,隨著程序的執行,單詞出現次數的累加結果被更新並存儲到了HBase的result表中。

這裡僅介紹Storm與HBase的集成,但Storm可與外部系統非常多的組件進行集成,如JDBC、Redis、HDFS、Kafka、Hive以及JMS等等,有興趣的朋友可以自行再去學習,這裡不再介紹。至此,Storm的實戰介紹就告一段落了,下期再見。

參考文獻:

——《實時大數據分析 基於Storm、Spark技術的實時應用》

——《CSDN博客》

——《潭州大數據課程課件》


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序猿的修身養性 的精彩文章:

Hadoop之Flume
Hadoop之HBase

TAG:程序猿的修身養性 |