SyncRequestProcessor 小代碼 大優雅
引言
zookeeper 的業務處理流程就像工作流一樣,其實就是一個單鏈表;在zookeeper啟動的時候,會確立各個節點的角色特性,即leader、follower和observer,每個角色確立後,就會初始化它的工作責任鏈;
本篇要分享的是 zookeeper的源碼分析之SyncRequestProcessor處理器,其目的是進行持久化,也就是將消息存儲到磁碟文件中;代碼不多,但有不少值得借鑒的地方;
主要成員變數
queuedRequests:
在zookeeper中各個工作責任鏈之間進行消息通信的是通過LinkedBlockingQueue 來進行線程間信息交互的;queuedRequests就是SyncRequestProcessor和上一責任鏈之間進行消息交互的隊列;
toFlush :
待flush到磁碟的事務日誌消息容器,包括增、刪、改消息,查詢類消息不進入該容器;
snapCount:
生成snapshot的事務記錄參數值,可在zoo.cfg中進行配置,即事務日誌記錄數大於等於snapCount(其具體演算法在下面進行探討,這裡先這樣記錄)的時候,進行snapshot文件的生成;
randRoll:
生成snapshot的隨機值,和snapcount配合使用;
業務處理
由於SyncRequestProcessor是繼承自ZooKeeperThread,所以它的主要邏輯是在run函數中,直接進入run函數中;
zookeeper沒有直接採用queuedRequests.take()進行消息接收,而是採用了兩種方式take()和poll();take函數會等待直至消息的到來;而poll()則是如果沒有消息,就會立即返回null;
zookeeper為什麼這樣設計,先拋個問題,我們先看下面的邏輯,然後再回答這個問題;
代碼三
if (si != null)
if (LOG.isDebugEnabled()) {
LOG.debug({},si);
LOG.debug(toFlush .size = toFlush.size());
}
// track the number of records written to the log
if (zks.getZKDatabase().append(si)) {
logCount ;
if (logCount (snapCount / 2 randRoll)) {
setRandRoll(r.nextInt(snapCount/2));
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null snapInProcess.isAlive()) {
LOG.warn(Too busy to snap, skipping);
} else {
snapInProcess = new ZooKeeperThread(Snapshot Thread) {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn(Unexpected exception, e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
if (toFlush.size() 1000) {
flush(toFlush);
}
}
zookeeper的兩種持久化方式,一種是進行增量事務日誌,一種是snapshot文件;增量事務日誌就是將所有的事務操作記錄下來;而snapshot文件就是把內存中的數據進行全量備份下來;
SyncRequestProcessor 先調用了zks.getZKDatabase().append(si),該函數是將事務日誌
記錄下來,如果是事務類消息,即增刪改,則返回true;如果是查詢類消息,就返回false;當返回true的時候,即記錄事物日誌,這時候做了一個判斷 if (logCount (snapCount / 2 randRoll)) ;SyncRequestProcessor 並沒有直接進行logCount 和snapCount 的判斷 ,即logCount snapCount ;而是生成了一個隨機數,其目的主要是考慮到在zookeeper集群中,各個節點的內存數據在某一時刻是基本一致的,如果都是進行logCount snapCount ,就生成snapshot,勢必導致zookeeper集群中各個節點在某一時刻,都會去進行snapshot,因為磁碟io操作總是相對較慢的,所以會導致節點都忙於刷磁碟文件了,系統負載會增加上去,那麼對外的服務就會受到影響;所以這裡採用logCount (snapCount / 2 randRoll)一個隨機數和logCount的比較,是一種全局觀,有一定的規劃思想在裡面;
那麼當zks.getZKDatabase().append(si)返回為false的時候,則判斷了toFlush.isEmpty(),其實這也就是非事務消息的邏輯,當該消息是非事務消息,即查詢類消息時候,則直接進行nextProcessor的處理,處理完就進行continue;
只有事務消息才會進入toFlush,也就是toFlush.add(si)的邏輯;後續有一個flush函數,我們來看flush的函數都做了什麼;
這段代碼的主要功能
1、zks.getZKDatabase().commit(); 消息進行刷盤至磁碟文件中;
2、將消息遞交下一處理鏈;
現在返回到上一個問題,從上一個處理鏈中為什麼要採用兩種方式take()和poll()獲取消息呢;
其實也是考慮到服務的負載問題,
1 queuedRequests隊列沒有消息,而toFlush里也沒有消息,直接採用take函數獲取,這時候說明應用的負載輕,線程處於阻塞等待,也就是直接將該線程掛起,減輕負載;
2 queuedRequests隊列沒有消息,而toFlush裡面有消息,這說明當前系統已經空閑了,那麼要把之前還沒有返回給客戶端的消息處理完,那麼就是調用flush函數;
3 queuedRequests隊列一直有消息,toFlush裡面也有消息,這說明當前系統負載過重,但是還是需要在一定量的時候(toFlush.size() 1000)將消息響應返回給客戶端,同時把收到的事務日誌消息進行flush,避免寫日誌沒刷新到磁碟中;
好了,看到這裡,或許大家已經明白了,希望大家有所收穫,歡迎大家一塊探討zookeeper的問題,或者 zookeeper的源碼分析!


※大規模系統的消息隊列技術方案!
※Redis分散式鎖的try-with-resources實現
TAG:千鋒JAVA開發學院 |