當前位置:
首頁 > 知識 > RabbitMQ之消息確認機制(事務+Confirm)

RabbitMQ之消息確認機制(事務+Confirm)

在使用RabbitMQ的時候,我們可以通過消息持久化操作來解決因為伺服器的異常奔潰導致的消息丟失,除此之外我們還會遇到一個問題,當消息的發布者在將消息發送出去之後,消息到底有沒有正確到達broker代理伺服器呢?如果不進行特殊配置的話,默認情況下發布操作是不會返回任何信息給生產者的,也就是默認情況下我們的生產者是不知道消息有沒有正確到達broker的,如果在消息到達broker之前已經丟失的話,持久化操作也解決不了這個問題,因為消息根本就沒到達代理伺服器,你怎麼進行持久化,那麼這個問題該怎麼解決呢?

RabbitMQ為我們提供了兩種方式:

通過AMQP事務機制實現,這也是AMQP協議層面提供的解決方案;

通過將channel設置成confirm模式來實現;

事務機制

這裡首先探討下RabbitMQ事務機制。

RabbitMQ中與事務機制有關的方法有三個:txSelect(), txCommit()以及txRollback(), txSelect用於將當前channel設置成transaction模式,txCommit用於提交事務,txRollback用於回滾事務,在通過txSelect開啟事務之後,我們便可以發布消息給broker代理伺服器了,如果txCommit提交成功了,則消息一定到達了broker了,如果在txCommit執行之前broker異常崩潰或者由於其他原因拋出異常,這個時候我們便可以捕獲異常通過txRollback回滾事務了。

關鍵代碼:

通過wirkshark抓包(ip.addr==xxx.xxx.xxx.xxx && amqp),可以看到:

RabbitMQ之消息確認機制(事務+Confirm)

(注意這裡的Tx.Commit與Tx.Commit-Ok之間的時間間隔294ms,由此可見事務還是很耗時的。)

我們先來看看沒有事務的通信過程是什麼樣的:

RabbitMQ之消息確認機制(事務+Confirm)

可以看到帶事務的多了四個步驟:

client發送Tx.Select

broker發送Tx.Select-Ok(之後publish)

client發送Tx.Commit

broker發送Tx.Commit-Ok

下面我們來看下事務回滾是什麼樣子的。關鍵代碼如下:

RabbitMQ之消息確認機制(事務+Confirm)

同樣通過wireshark抓包可以看到:

RabbitMQ之消息確認機制(事務+Confirm)

代碼中先是發送了消息至broker中但是這時候發生了異常,之後在捕獲異常的過程中進行事務回滾。

事務確實能夠解決producer與broker之間消息確認的問題,只有消息成功被broker接受,事務提交才能成功,否則我們便可以在捕獲異常進行事務回滾操作同時進行消息重發,但是使用事務機制的話會降低RabbitMQ的性能,那麼有沒有更好的方法既能保障producer知道消息已經正確送到,又能基本上不帶來性能上的損失呢?從AMQP協議的層面看是沒有更好的方法,但是RabbitMQ提供了一個更好的方案,即將channel信道設置成confirm模式。

Confirm模式

概述

上面我們介紹了RabbitMQ可能會遇到的一個問題,即生成者不知道消息是否真正到達broker,隨後通過AMQP協議層面為我們提供了事務機制解決了這個問題,但是採用事務機制實現會降低RabbitMQ的消息吞吐量,那麼有沒有更加高效的解決方式呢?答案是採用Confirm模式。

producer端confirm模式的實現原理

生產者將信道設置成confirm模式,一旦信道進入confirm模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之後,broker就會發送一個確認給生產者(包含消息的唯一ID),這就使得生產者知道消息已經正確到達目的隊列了,如果消息和隊列是可持久化的,那麼確認消息會將消息寫入磁碟之後發出,broker回傳給生產者的確認消息中deliver-tag域包含了確認消息的序列號,此外broker也可以設置basic.ack的multiple域,表示到這個序列號之前的所有消息都已經得到了處理。

confirm模式最大的好處在於他是非同步的,一旦發布一條消息,生產者應用程序就可以在等信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之後,生產者應用便可以通過回調方法來處理該確認消息,如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息,生產者應用程序同樣可以在回調方法中處理該nack消息。

在channel 被設置成 confirm 模式之後,所有被 publish 的後續消息都將被 confirm(即 ack) 或者被nack一次。但是沒有對消息被 confirm 的快慢做任何保證,並且同一條消息不會既被 confirm又被nack 。

RabbitMQ之消息確認機制(事務+Confirm)

已經在transaction事務模式的channel是不能再設置成confirm模式的,即這兩種模式是不能共存的。

編程模式

對於固定消息體大小和線程數,如果消息持久化,生產者confirm(或者採用事務機制),消費者ack那麼對性能有很大的影響.

消息持久化的優化沒有太好方法,用更好的物理存儲(SAS, SSD, RAID卡)總會帶來改善。生產者confirm這一環節的優化則主要在於客戶端程序的優化之上。歸納起來,客戶端實現生產者confirm有三種編程方式:

普通confirm模式:每發送一條消息後,調用waitForConfirms()方法,等待伺服器端confirm。實際上是一種串列confirm了。

批量confirm模式:每發送一批消息後,調用waitForConfirms()方法,等待伺服器端confirm。

非同步confirm模式:提供一個回調方法,服務端confirm了一條或者多條消息後Client端會回調這個方法。

從編程實現的複雜度上來看:

第1種

普通confirm模式最簡單,publish一條消息後,等待伺服器端confirm,如果服務端返回false或者超時時間內未返回,客戶端進行消息重傳。

關鍵代碼如下:

RabbitMQ之消息確認機制(事務+Confirm)

wirkShark抓包可以看到如下:

RabbitMQ之消息確認機制(事務+Confirm)

第二種

批量confirm模式稍微複雜一點,客戶端程序需要定期(每隔多少秒)或者定量(達到多少條)或者兩則結合起來publish消息,然後等待伺服器端confirm, 相比普通confirm模式,批量極大提升confirm效率,但是問題在於一旦出現confirm返回false或者超時的情況時,客戶端需要將這一批次的消息全部重發,這會帶來明顯的重複消息數量,並且,當消息經常丟失時,批量confirm性能應該是不升反降的。

關鍵代碼:

RabbitMQ之消息確認機制(事務+Confirm)

第三種

非同步confirm模式的編程實現最複雜,Channel對象提供的ConfirmListener()回調方法只包含deliveryTag(當前Chanel發出的消息序號),我們需要自己為每一個Channel維護一個unconfirm的消息序號集合,每publish一條數據,集合中元素加1,每回調一次handleAck方法,unconfirm集合刪掉相應的一條(multiple=false)或多條(multiple=true)記錄。從程序運行效率上看,這個unconfirm集合最好採用有序集合SortedSet存儲結構。實際上,SDK中的waitForConfirms()方法也是通過SortedSet維護消息序號的。

關鍵代碼:

RabbitMQ之消息確認機制(事務+Confirm)

SDK中waitForConfirms方法實現:

RabbitMQ之消息確認機制(事務+Confirm)

性能測試

Client端機器和RabbitMQ機器配置:CPU:24核,2600MHZ, 64G內存,1TB硬碟。

Client端發送消息體大小10B,線程數為1即單線程,消息都持久化處理(deliveryMode:2)。

分別採用事務模式、普通confirm模式,批量confirm模式和非同步confirm模式進行producer實驗,比對各個模式下的發送性能。

RabbitMQ之消息確認機制(事務+Confirm)

發送平均速率:

事務模式(tx):1637.484

普通confirm模式(common):1936.032

批量confirm模式(batch):10432.45

非同步confirm模式(async):10542.06

可以看到事務模式性能是最差的,普通confirm模式性能比事務模式稍微好點,但是和批量confirm模式還有非同步confirm模式相比,還是小巫見大巫。批量confirm模式的問題在於confirm之後返回false之後進行重發這樣會使性能降低,非同步confirm模式(async)編程模型較為複雜,至於採用哪種方式,那是仁者見仁智者見智了。

消息確認(Consumer端)

為了保證消息從隊列可靠地到達消費者,RabbitMQ提供消息確認機制(message acknowledgment)。消費者在聲明隊列時,可以指定noAck參數,當noAck=false時,RabbitMQ會等待消費者顯式發回ack信號後才從內存(和磁碟,如果是持久化消息的話)中移去消息。否則,RabbitMQ會在隊列中消息被消費後立即刪除它。

採用消息確認機制後,只要令noAck=false,消費者就有足夠的時間處理消息(任務),不用擔心處理消息過程中消費者進程掛掉後消息丟失的問題,因為RabbitMQ會一直持有消息直到消費者顯式調用basicAck為止。

當noAck=false時,對於RabbitMQ伺服器端而言,隊列中的消息分成了兩部分:一部分是等待投遞給消費者的消息;一部分是已經投遞給消費者,但是還沒有收到消費者ack信號的消息。如果伺服器端一直沒有收到消費者的ack信號,並且消費此消息的消費者已經斷開連接,則伺服器端會安排該消息重新進入隊列,等待投遞給下一個消費者(也可能還是原來的那個消費者)。

RabbitMQ不會為未ack的消息設置超時時間,它判斷此消息是否需要重新投遞給消費者的唯一依據是消費該消息的消費者連接是否已經斷開。這麼設計的原因是RabbitMQ允許消費者消費一條消息的時間可以很久很久。

RabbitMQ管理平台界面上可以看到當前隊列中Ready狀態和Unacknowledged狀態的消息數,分別對應上文中的等待投遞給消費者的消息數和已經投遞給消費者但是未收到ack信號的消息數。也可以通過命令行來查看上述信息:

RabbitMQ之消息確認機制(事務+Confirm)

代碼示例(關閉自動消息確認,進行手動ack):

basicRecover:是路由不成功的消息可以使用recovery重新發送到隊列中。

basicReject:是接收端告訴伺服器這個消息我拒絕接收,不處理,可以設置是否放回到隊列中還是丟掉,而且只能一次拒絕一個消息,官網中有明確說明不能批量拒絕消息,為解決批量拒絕消息才有了basicNack。

basicNack:可以一次拒絕N條消息,客戶端可以設置basicNack方法的multiple參數為true,伺服器會拒絕指定了delivery_tag的所有未確認的消息(tag是一個64位的long值,最大值是9223372036854775807)。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

PAT 1035插入與歸併的代碼實現及錯誤分析(C語言)
Tengine-2.1.0的安裝與配置

TAG:程序員小新人學習 |