Kafka源碼系列之以kafka為例講解分散式存儲系統
Kafka源碼系列,kafka 0.8.2.2為例給大家講解。目標是大家讀完kafka源碼系列能徹底了解kafka,最好能設計處自己的消息隊列或者存儲系統。
一,分散式系統的CAP理論
1,理論首先把分散式系統中的三個特性進行了如下歸納:
一致性(C):在分散式系統中的所有數據備份,在同一時刻是否同樣的值。(等同於所有節點訪問同一份最新的數據副本)
可用性(A):在集群中一部分節點故障後,集群整體是否還能響應客戶端的讀寫請求。(對數據更新具備高可用性)
分區容錯性(P):多副本進行容錯。以實際效果而言,分區相當於對通信的時限要求。系統如果不能在時限內達成數據一致性,就意味著發生了分區的情況,必須就當前操作在C和A之間做出選擇。
2,CAP理論實踐中的妥協
由於CAP理論在分散式存儲系統中,做多只能實現上面兩點。而現實環境是很複雜的,比如網路抖動及故障,硬體故障等問題,分區容錯是我們必須要實現的。所以我們只能在一致性和可用性之間權衡。
A),CP系統-一致性優先原則
要實現強一致性的原則有很多方式,最簡單的方式就是一個master節點和任意數目的包含冗餘備份的附屬節點。數據永遠從master寫入和讀取。但是這個是存在單點故障的,由於master的故障會導致系統不可用,就此而言是放棄了可用性。但是一般情況下我們都有容錯機制,讓從屬節點變為master,錯誤處理完成系統即可用了。
B),AP系統可用性優先原則
選擇支持可用性和分區容錯性,並犧牲一致性的系統被稱為具有」最終一致性」。特點是,我們可以從任意的一個節點寫入,該節點負責將數據同步到其它節點。讀取的時候只需訪問數據存在的一個節點就夠了,但是可用會存在從某個節點讀取的數據不是最新的,也即系統不具備一致性。
C),靈活的一致性程度
三個特性之間權衡並不是非黑即白,其實可以平緩過度,達到最佳系統性能要求。
比如,在AP系統中,假如數據有三個冗餘副本,我們可以通過調節請求數據的節點數目來達到高的一致性,比如我們同時向三個副本請求數據,那麼我們就滿足了強一致性,但是代價是喪失了容錯性。通常,我們可以要求特定數目的節點或者大多數節點可用並且能返回一致性結果,是在一致性和容錯性中進行權衡的一個不錯的方法。
同樣的在CP系統中,我們可以運行從附屬節點中讀取數據,犧牲一部分一致性來達到高的可用性。如果保持仍然只能想master寫數據,那麼我們還是高的一致性的寫入操作,但是允許讀取操作最終一致性。
我們可以根據具體的用例,調整CAP各種特性的強度,使之最適合用例的需要。甚至可以對同一個應用程序、同一個資料庫中的不同類型的數據混合使用這些策略。
二,設計自己的分散式存儲系統
設計一個分散式存儲系統,並不難,難在如何保證系統的健壯性或者叫魯棒性。至於原因,在這裡浪尖只想說一句話那就是網路是不可靠的。
目前典型的分散式存儲系統的結構為:
元數據伺服器,數據存儲節點,客戶端。
數據的存取過程:
客戶端會先獲取元數據信息,然後根據元數據信息去特定的節點讀寫數據。元數據維護了數據在所有節點的存儲情況,副本情況等等。數據存儲節點會完成副本數據同步的過程。
與此同時,我們會要求分散式數據存儲系統包含以下三個特性:
1,數據備份機制,順利的處理某個節點無法訪問的情況。
2,提供備份一致性的機制-----當用戶請求數據的時候能獲得最近更新的數據(一致性)。
3,線性擴展機制-----20個節點的吞吐量是10個節點的兩倍。
三,剖析kafka存儲系統
不要抬杠說kafka是消息隊列不是存儲系統。
1,Kafka的系統整個體系的角色:
1),zookeeper
記錄的有kafka的Broker,kafka 的Broker controller選舉,topic發布,配置更新,分區新增等都是客戶端通過zookeeper發布到Crontroller的。
2),producer
負責發送數據到kafka
3),consumer
負責從kafka取數據
4),broker
負責數據接收、存儲、管理等
5),topic 和 partition
Topic是代表一個種類的數據。Partition是對topic進行細分,保證吞吐量和處理並發度的關鍵,並且是集群數據備份的單元。
2,從常見的分散式存儲系統的角色來看:
客戶端:producer,consumer,zookeeper(topic,partition,Broker等相關的變更都是通過zookeeper通知到集群的controller的,要是覺得牽強可以將其歸屬到元數據集群)。
存儲系統:Broker集群
元數據集群:zookeeper集群。其實每個Broker都會存儲一份元數據(client-->zookeeper-->
Controller-->普通Broker)。
3,kafka的分散式存儲特性
1),數據備份,故障恢復
分兩個部分:
A),Broker故障恢復.Broker註冊到zookeeper,臨時zknode,/brokers/ids/[0...N],臨時節點保存的是advertisedHost:advertisedPort,並會初始化SessionExpireListener該listener會監聽Broker自己的臨時節點(會話超時重新註冊)。Crontroller就可以監聽這個目錄下的臨時節點,會得知Brokers是否已經宕機,或者是否有新的Broker加入到節點.
Brokers集群通過向zookeeper註冊臨時節點/controller,來選舉Crontroller,並且每個Broker都會監聽該臨時節點,通過臨時節點的變動來決定是否進行Crontroller的選舉。Crontroller宕機,觸發其它Broker進行Crontroller重新選舉。來進行容錯。
B),topic表示一類消息,topic劃分為若干partition,對每個partition進行數據的讀寫操作。Partition會有若干副本,副本會選舉一個leader,然後數據的寫入和讀取都是通過leader來實現,這就實現了強一致性CP。與此同時引入了一個單點故障的問題,故障恢復機制是從isr列表裡重新選舉出leader。假如數據在flower從leader同步數據存在滯後的話,會導致數據丟失,那麼此時,我們可以通過下面的配置,我們可以保證故障轉移之後會不會有數據丟失。
屬性的詳細介紹:
Request.required.acks:有三個取值分別的含義是:
0-不等待Broker應答,立即返回。
1-等待leader數據提交成功的應答。
-1-等待min.insync.replicas數目個副本都接收到數據才會視為寫入成功。
該參數要結合min.insync.replicas來使用,當request.required.acks設置為-1時,isr列表裡min.insync.replicas數目個副本數據寫入完畢,才算消息生產成功。
min.insync.replicas數值不能超過副本數總數,假如相等的話,有一個副本不可用即會導致集群癱瘓。一般是replication.factor = min.insync.replicas +1即可。
2),數據備份一致性的機制
副本會選舉出leader,其餘follower。數據的寫入和讀取都是經過leader,以此來實現數據備份的一致性。所以,數據備份的一致性是CP,強一致性。Leader存在故障恢復機制:leader宕機,從isr列表裡選舉出新的leader。
3),線性擴展機制
該機制對於kafka來說也是分兩個部分:
A),Broker的線性擴展
新的Broker加入集群,Crontroller會感知到變化。但是已有的分區或者數據不會重新分布到新的Broker上去,假如沒有新增topic或者不進行人工遷移等操作的話新的Broker不會有數據。增加集群假如是非異構機器的話集群性能應該是線性增加的。
B),給某個topic擴大分區數,也會增加topic的並發度,前提是磁碟數目要合適。該增加也是會增加topic吞吐量。
4,client與kafka集群之間通訊的機制
這篇文章之後講大致過程,後面會陸續出文章講細節部分。
A),Command---->zookeeper---->broker Controller---->Brokers
這個相當於基於zookeeper做了一個訂閱發布系統。Topic創建配置更新等都是通過這種方式傳達給所有Brokers Controller,然後由Broker Crontroller傳遞給所有的Brokers。
B),producer/SampleConsumer---->Brokers partitions leader----->follower
這個在<kafka源碼系列之通過源碼分析producer性能瓶頸>那講已經說過,請求分兩步:
1),第一步隨機選一個Broker,然後獲取topic相關的元數據,如leader的位置等。
2),構建鏈接到所有leader所在Broker的連接池,進行數據的讀寫。
假如是生產消息的話follower會主動從leader上獲取滯後的消息。
C),high consumer--->zookeeper---->brokers leader----->brokers follower
這個獲取數據的方式比上個步驟多了個環節,就是從zookeeper上獲取Broker的ip和port,而上個方式是直接在配置里寫明了Broker的ip和port。
在上個基礎上又基於zookeeper做了一些優化,增加了三個重要的zookeeper的listener:
1),ZKRebalancerListener
該listener監聽的是/consumers/group/ids目錄,當該目錄下的有子節點增刪的時候會觸發,rebalance。假如嘗試4次數後不能成功就會拋出一下異常
throw new ConsumerRebalanceFailedException(consumerIdString + " can"t rebalance after " + config.rebalanceMaxRetries +" retries")
2),ZKSessionExpireListener
監聽的是每個consumer自己臨時節點(/consumers/group/ids/consumerID)的刪除與註冊(無動作),臨時節點刪除時handleNewSession在處理函數里需要重新想zookeeper註冊該節點。也會觸發rebalance。
3),ZKTopicPartitionChangeListener
該listener,監控的是/brokers/topics/topicName節點的數據變動,也即是分區的變動,假如有新的分區增加也會觸發rebalance。
四,總結
本文主要是想幫助大家理解設計一套分散式存儲系統,首先介紹了CAP理論,接著講了分散式存儲系統的幾個要素,最後以kafka為例,講解了kafka這個分散式消息隊列或者分散式存儲系統的結構。希望能幫助到大家。最後,提筆做個埋點,關於zookeeper的理論及使用我們後面會跟大家娓娓道來。
![](https://pic.pimg.tw/zzuyanan/1488615166-1259157397.png)
![](https://pic.pimg.tw/zzuyanan/1482887990-2595557020.jpg)
TAG:中國存儲 |