當前位置:
首頁 > 知識 > 關於Kafka High watermark/LEO的討論(2)

關於Kafka High watermark/LEO的討論(2)

之前寫過一篇關於Kafka High watermark的文章,引起的討論不少:有讚揚之聲,但更多的是針對文中的內容被challenge,於是下定決心找個晚上熬夜再看了一遍,昨晚挑燈通讀了一遍確實發現不少錯誤。鑒於此我決定再寫一篇博客重新梳理一下最新版本中High watermark(下稱HW)的工作原理,也算是糾正之前文章中的錯誤。這次我不打算說leader epoch,而只是專門討論HW、log end offset(日誌末端位移,下稱LEO)的事情。希望我能把整個流程交代清楚。也許這篇文章依然有很多問題,到時候就懇請各位多多批評指正了:)

和之前第一篇一樣,我首先給出與HW、LEO相關的副本角色定義:

每個Replica對象都有很多屬性或欄位,和本文相關的是LEO、remote LEO和HW。

LEO:日誌末端位移(log end offset),記錄了該Replica對象底層日誌(log欄位)中下一條消息的位移值。注意是下一條消息!也就是說,如果一個普通topic(非compact策略,即cleanup.policy != compact)的某個分區副本的LEO是10,倘若未發生任何消息刪除操作,那麼該副本當前保存了10條消息,位移值範圍是[0, 9]。此時若有一個producer向該副本插入一條消息,則該條消息的位移值是10,而副本LEO值則更新成11。

remote LEO:嚴格來說這是一個集合。leader副本所在broker的內存中維護了一個Partition對象來保存對應的分區信息,這個Partition中維護了一個Replica列表,保存了該分區所有的副本對象。除了leader Replica副本之外,該列表中其他Replica對象的LEO就被稱為remote LEO,這些LEO值也是要被更新的。

HW:上一篇中我是這麼描述HW值的——「水位值,對於同一個副本對象而言其HW值不會大於LEO值。小於等於HW值的所有消息都被認為是『已備份』的」——嚴格來說,我這裡說錯了。實際上HW值也是指向下一條消息,因此應該這樣說:小於HW或在HW以下的消息被認為是「已備份的」。另外上篇文章中的配圖也是錯誤的,如下所示:

之前文章中說HW是7時,位移介於[0, 7]的所有消息都是committed狀態。這種說法是有問題的,實際上,如果要讓[0, 7]的消息是committed狀態,那麼HW值應該是8。當然關於LEO的表述是正確的,即:LEO=15表示這個副本當前有15條消息,最新一條消息的位移是14。另外我們總說consumer是無法消費未提交消息的。這句話如果用以上名詞來解讀的話,應該表述為:consumer無法消費分區leader副本中位移值大於等於分區HW值的任何消息。這裡需要特別注意分區HW值就是leader副本的HW值

說一些題外話~~~~~

在判斷能否消費某條消息時到底比較的是」小於HW「還是」小於等於HW",我個人傾向於認為是小於HW,即位移=HW值的消息是不能被消費的。我們可以從Log.scala的read方法簽名中證明這點:

不過LogSegment.scala的read方法並未嚴格實現這一點,貌似又支持讀取offset=HW值的消息,如下所示:

雖然在後面代碼中加入了HW位移值對應物理文件位置是否真實存在的判斷邏輯,但畢竟做了一次物理文件位置的轉換操作(調用LogSegment#translateOffset),應該說是做了一些無用功的——後續我可能會提一個jira跟社區討論一下此事,不過目前我們還是先認定HW值處的消息是不能被消費的。

在討論HW、LEO工作原理的時候,下面這張圖能夠很好地解釋leader副本對象和follower副本對象的主要區別:

如前所述,所有的副本對象都保存在分區Partition對象的一個列表中。為了區別leader副本和follower副本,上圖中我還是把它們拆開分別表示,這樣會更加清晰一些。圖中灰色欄位表示不會被更新,也就是說leader Replica對象是不會更新remote HW值的(這裡的remote含義與remote LEO相同)。有了這些概念我們現在可以討論更新時機的問題了:

一、上圖右邊的follower Replica對象何時更新LEO?

Follower副本使用專屬線程不斷地向leader副本所在broker發送FETCH請求,然後leader發送FETCH response給follower。Follower拿到response之後取出裡面的數據寫入到本地底層日誌中,在該過程中其LEO值會被更新。

二、上圖左邊的leader Replica對象何時更新LEO?

和follower Replica更新LEO道理相同,leader寫底層日誌時就會自動地更新它的LEO值。對於leader來說何時會寫底層日誌呢?最容易想到的一個場景就是producer生產消息時。由此可見,不管是在leader端還是在 follower端,只有寫入本地底層日誌時才會觸發對本地Replica對象上LEO值的更新。

三、上圖左下的Other Replicas何時更新LEO?

首先思考一下為什麼leader Partition對象需要保存所有Replica副本的LEO?事實上,它們的主要作用是幫助leader Replica對象確定其HW值之用,而由於leader Replica的HW值就是整個分區的HW值,故這些other Replicas實際上是用來確定分區HW值的。Other Replicas LEO值是在leader端broker處理FETCH請求過程中被更新的。當follower發送一個FETCH請求時,它會告訴leader要從那個位移值開始讀取,即FetchRequest中的fetchOffset欄位。leader端在更新Other Replicas的LEO時會將其更新成這個fetchOffset值。

四、上圖右上的follower Replica對象何時更新HW?

Follower Replica對象更新HW是在其更新本地LEO之後。一旦follower向本地日誌寫完數據後它就會嘗試更新其HW值。具體演算法是取本地LEO與FETCH response中HW值的較小值,因此follower Replica的HW值不會大於其本地LEO值。

五、上圖左上的leader Replica對象何時更新HW?

前面說過了, leader Replica的HW值實際上就是分區的HW值,因此何時更新該值才是我們最關心的,因為它將直接影響分區數據對於consumer的可見性。以下4種情況Kafka會嘗試去更新leader Replica對象的HW值:

該Replica成為leader Replica時:當某個Replica成為分區的leader副本後,Kafka會嘗試去更新其HW值

Broker崩潰導致副本被踢出ISR時:此時Kafka會執行ISR的縮減操作,故必須要檢查下分區HW值是否需要更新

Producer向leader Replica寫入消息時:寫入消息會更新leader Replica的LEO,故有必要檢查下其HW值是否需要修改

Leader Replica處理FETCH請求時:Leader Replica處理FETCH請求時在更新完Other Replicas的LEO後會嘗試更新其HW值

上面的條件揭示了一個重要的事實:如果沒有出現broker failure或leader變更等情形,分區HW值更新時機只可能有兩個:1. leader broker處理PRODUCE請求;2. leader broker處理FETCH請求。Leader Replica HW值變更的演算法很簡單:首先找出leader Partition對象保存的所有與leader Replica保持同步的Replica對象(leader Replica + other Replicas)的LEO值,然後選擇其中航最小的LEO值作為分區HW值。這裡的同步判斷條件有兩個:

乍看上去好像這兩個條件說的是一回事,畢竟ISR的定義就是第二個條件描述的那樣。但在某些情況下Kafka的確可能出現follower副本已經「追上」了leader的進度,但卻不在ISR中——比如某個從failure中恢復的副本。如果Kafka只判斷第一個條件的話,確定分區HW值時就不會考慮這些未在ISR中的副本,但這些副本已經具備了「立刻進入ISR」的資格,因此就可能出現分區HW值越過ISR中副本LEO的情況——這肯定是不允許的,因為超過ISR副本LEO的那些消息屬於未提交消息。

在舉實際例子之前,我們先確認一下這些更新步驟的順序。首先是處理PRODUCE請求的邏輯順序:

之後是leader端broker處理FETCH請求:

最後是follower端broker處理FETCH response:

下面舉個一個實際的例子,該例子中的topic是單分區,副本因子是2。我們首先看下當producer發送一條消息時,leader/follower端broker的副本對象到底會發生什麼事情以及分區HW是如何被更新的。首先是初始狀態:

此時producer給該topic分區發送了一條消息。此時的狀態如下圖所示:

如上圖所見,producer發送消息成功後(假設acks=1, leader成功寫入即返回),follower發來了新的FECTH請求,依然請求fetchOffset = 0的數據。和上次不同的是,這次是有數據可以讀取的,因此整個處理流程如下圖:

顯然,現在leader和follower都保存了位移是0的這條消息,但兩邊的HW值都沒有被更新,它們需要在下一輪FETCH請求處理中被更新,如下圖所示:

簡單解釋一下, 第二輪FETCH請求中,follower發送fetchOffset = 1的FETCH請求——因為fetchOffset = 0的消息已經成功寫入follower本地日誌了,所以這次請求fetchOffset = 1的數據了。Leader端broker接收到FETCH請求後首先會更新other replicas中的LEO值,即將remote LEO更新成1,然後更新分區HW值為1——具體的更新規則參見上面的解釋。做完這些之後將當前分區HW值(1)封裝進FETCH response發送給follower。Follower端broker接收到FETCH response之後從中提取出當前分區HW值1,然後與自己的LEO值比較,從而將自己的HW值更新成1,至此完整的HW、LEO更新周期結束。

由上面的分析可知,兩邊HW值的更新是在後面一輪(如果有多個follower副本,也許是多輪)FETCH請求處理中完成的,這種「時間」上的錯配也是導致出現各種「數據丟失」或「數據不一致」的原因。基於此社區才引入了leader epoch機制試圖規避因使用HW而帶來的這個問題。不過本文並不關注leader epoch,只是單純希望我能把HW、LEO這件事情講明白。


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 大數據Kafka技術分享 的精彩文章:

TAG:大數據Kafka技術分享 |