乾貨:如何構建一個消息推送平台
01
背 景
B/S架構下很多業務場景下我們需要服務端主動推送消息到客戶端,在html5之前一般使用長輪詢(除此之外還有iframe流或者Flash Socket)的方式來實現,而長輪詢的方式缺點很明顯,頻繁交互的情況下,大量的連接被建立和釋放,並且交互頻率受限於兩次http的請求間隔,html5開始可以使用websocket全雙工的通信協議,在tomcat和jetty都有實現。
雖然在java1.4以後可以用nio包實現非阻塞的websocket通信,但是使用起來太過底層和複雜;netty封裝了nio,使用了大量非同步和事件驅動,封裝了拆包粘包,實現了網路編程和業務分離,但即便如此,使用和優化netty還是需要深厚的網路編程知識,況且我們需要的僅僅是用戶與用戶之間的文本消息的推送,這裡我們介紹一個基於netty實現的websocket服務端框架,socket.io屏蔽了使用netty的細節,針對聊天場景做了高度封裝,並且提供了包含WEBSOCKET、POLLING等多種推送方式的支持,同時支持namespace(命名空間)、broadcast(廣播)、room(房間/聊天室)、event(業務事件)、應用層ack機制。
02
需 求
構建一個統一的文本消息推送平台,可以在此之上構建業務系統,比如:客服系統、內部聊天工具、站內信、事件系統等。基礎平台要做到用戶管理、狀態管理、文本消息發送/推送(群/點對點)、圖片/文件推送、未讀消息管理、歷史消息、會話管理。
03
業務架構
restful:通用介面http服務層、提供鑒權、發送消息、歷史消息、離線消息、會話等相關介面
logic:業務邏輯層、消息/會話持久化、用戶狀態、連接狀態存儲、離線消息
router:路由層、消息路由、把消息推送到用戶連接的connector
connector:連接層、提供消息推送服務、維護房間、ack
消息的上行是走的restful,在這層可以做uniauth(內部的統一登錄平台)許可權驗證、負載均衡等,也讓connector更輕量級,依賴更少;connector是有狀態的,保持著用戶的連接信息,同時要記錄用戶和connector的映射關係,以用於router層在推送消息時路由到對應的節點,connector使用單獨的物理機來運行,因為虛擬主機並不能穩定保證較高並發的長連接數量。
04
輕量級的消息發送
一個消息推送平台最重要的是高可用性和最終一致性(消息可達),所以消息發送可以做的非常輕量,發送消息時會帶上客戶端生成的唯一消息id,同時客戶端持久化,服務端只需要把消息寫到mq即發送成功,然後logic消費mq再做非同步批量的消息持久化,從可用性角度來說,發送消息並不依賴websocket,而是通過restful服務,更加高可用和可擴展。消息發送的可靠性,沒有依賴於rabbitmq的事務或者publisher confirms,後面介紹專門的機制來保證。
05
點對點/群消息
消息我們定義了三種事件類型,MessageCategoryEnum
MSG:文本消息,包含圖片、文件、語音、文字等子類型。
IQ:操作消息,客戶端用來做某些特殊動作處理。
STATUS:狀態消息,和用戶上下線等狀態有關。
消息體如下
對於點對點消息來說,發送者和接受者都是一個用戶id,對於群消息來說,接受者可能是一個SESSION(會話),會話需要提前創建,會話中包含若干的用戶,使用socket.io的room功能,很方便的實現一個會話,room就是SESSION的id。
1. 用戶上下線的時候都要主動加入/離開和自己有關的所有room
3. 群消息推送就調用
06
消息可達保證(QoS)
使用六報文的方式保證消息可達,圖中演示的是三報文的消息發送階段。
1.client-A向im-server發送一個消息請求包,即msg:R;
2.im-server在成功處理後,回復client-A一個消息響應包,即msg:A;
3.如果此時client-B在線,則im-server主動向client-B推送一個消息通知包(不在線做離線存儲),即msg:N;
4.client-B向im-server發送一個ack請求包表示msg已經收到,即ack:R;
5.im-server在成功處理後,回復client-B一個ack響應包,即ack:A;
6.im-server主動向client-A發送一個ack通知包,即ack:N;
7.client-A發出消息後,超過設定的某個時間沒有收到ack:N,那麼我們認為消息沒有推送成功(用戶離線除外,用戶離線,服務端會模擬一個ack:N),需要客戶端重發,服務端根據客戶端id冪等處理;
在我們系統,msg:N和ack:N是connector推送,其他報文就是走http的消息發送和ack;其實ack:N可以不需要,伺服器可以保存未讀消息列表,接受ack:R來判斷是否要重發,但是這樣可靠性稍差一些,極端情況會造成消息丟失。
07
消息順序性
首先我們要確定以什麼樣的邏輯排列消息的順序,兩個人聊天,首先要保證的是同一個人消息的先後順序,其次要保證會話內消息的順序。服務端不能以持久化時生成自增id作為順序,因為非同步消費不能保證順序(某些mq,如kafka以用戶為key可以保證消息順序性,但也存在重試問題)。從客戶端的角度來說,可以通過同步發送保證消息的順序性,所以服務端可以在發mq之前通過redis生成會話內自增id,推送的時候可能會存在重發和亂序,就需要客戶端根據服務端生成的id做冪等和重排序。
如果不依賴於redis,可以使用伺服器時間(精確到秒)+客戶端生成的自增id作為排序欄位,但這樣就要求集群的時間同步在一秒內。
08
未讀(離線)消息
關於未讀消息的存儲有兩種方案:寫擴散和讀擴散,寫擴散即未讀消息在持久化時就針對每個用戶保存一份,讀擴散就是利用群消息的偏序特性,只保存用戶在會話內ack的最後一條消息id,無論是哪種方式,未讀消息都只是保存消息的id,未讀消息的推送僅在上線或者連接的時候。
寫擴散:
session_user(session_id, user_id);//會話-用戶
messages(msgid,session_id,sender_user_id,time,content);//消息
user_messages(user_id, msgid, session_id);//用戶-未讀消息
1. 消息持久化後,如果接受用戶不在線,那麼在user_messages為每個用戶新增一條記錄;
2. 用戶ack後,刪除t_user_messages記錄;
3. 用戶查未讀消息,先查user_messages,再查messages。
讀擴散:
session_user(session_id, user_id,last_ack_msgid);//會話-用戶
messages(msgid,session_id,sender_user_id,time,content);//消息
user_messages(user_id, msgid, session_id);//用戶-未讀消息
1.消息持久化,推送在線用戶;
2.用戶ack,更新last_ack_msgid;
3.用戶查未讀消息,先查last_ack_msgid,再查messages。
優化點:
1. 未讀消息user_messages表或者last_ack_msgid欄位的更新很頻繁,可以用redis來實現存儲;
2. ack的頻率很高,可以讓客戶端批量或在一個時間段內ack,減少請求,即使消息重發,也有客戶端去重;
3. 未讀消息可能存在消息量很大的情況,可以設置過期時間,過期後即表示該消息已讀;大量未讀消息不適合做推送,最好通過http分頁拉取,甚至可以把拉取請求和ack請求合併。
09
調 優
1. Jdk nio會判斷Linux kernels >= 2.6,是則使用level-triggered epoll,否則使用select/poll,而netty實現了edge-triggered epoll,在高並發場景下性能略優,也是netty推薦的模式,設置socket.io的useLinuxNativeEpoll=true,使用linux epoll, NioEventLoopGroup 替換成EpollEventLoopGroup,NioServerSocketChannel 替換成 EpollServerSocketChannel;
2. 要建立大量連接需要修改最大文件句柄數,修改sysctl.conf的fs.file-max = 1000000,同時修改limits.conf
soft nofile 1000000
hard nofile 1000000
3. 設置TCP_NODELAY=true禁用nagle演算法;
7. 上行數據使用AdaptiveRecvByteBufAllocator動態分配ByteBuf;
8. 業務邏輯不要佔用EventLoop線程,啟用業務線程池處理,否則可能阻塞I/O;
9. Tcp層的心跳只能檢測連接,不能確定應用可用,所以有了應用層心跳, socket.io默認25s,可以調整為180s,頻率過高會佔用大量帶寬和流量,過低可能會導致連接斷開。同時關閉tcp心跳保活SO_KEEPALIVE=false;
10. Netty的boss threads設置為1(1個監聽埠),worker設置為16(cpu*2)。
10
Socket.io
1.服務端
netty-socketio是java版的服務端實現,目前只支持xhr-polling和websocket 兩種transport。
NamespacesHub是持有所有命名空間的一個map,value是Namespace。
Namespace持有當前命名空間下的所有client連接room和client的對應關係。
NamespaceClient代表當前命名空間下的一個client連接。
ClientHead代表一個client連接,持有會話id、TransportState以及子NamespaceClient(一個Channel可以對應多個命名空間)。
TransportState持有一個Packet的無鎖隊列,以及一個netty的通道Channel,通過此向客戶端推送消息。
Packet就是一個socket.io協議下的數據包。
socket.io為我們封裝了支持websocket協議的ChannelHandler,其中就包括最重要的InPacketHandler,InPacketHandler提供了協議解碼,用來處理非OPEN,UPGRADE的PacketType事件:CONNECT(連接)、PING(應用層心跳)、UPGRADE(升級websocket)、CLOSE(連接關閉)、MESSAGE(業務消息),socket.io使用自定義協議Packet,並使用json序列化的方式傳輸。
2.客戶端
2.1 連接
var socket = io.connect("http://127.0.0.1:9092/namespace");//發起客戶端連接,客戶端sdk發起connect的時候會發出下面的若干請求
https://crm-connector-dev.dianrong.com/socket.io/?EIO=3&transport=polling&t=MFJ3jBa
會生成一個sid表示本次連接會話,upgrades:websocket表示客戶端可以把協議升級為websocket而不再使用輪詢,而客戶端也會判斷當前瀏覽器是否支持websocket來決定是否升級
Response:[{"code":"0","message":"success"}]
客戶端發送了一個業務上的註冊事件給服務端,並接受了一個成功的返回,在websocket沒有連接成功前,會一直使用長輪詢方式接受服務端推送
建立websocket長連接
2.2 監聽
監聽一個msg業務事件
socket.on(『msg』, function(data) {
//do something
});
監聽斷開連接事件
socket.on("disconnect", function() {
//do something
});
3.Socket.io websocket協議包
每一個Frame數據幀都以數字開頭,三位數字分別表示command(命令):messageType(消息類型):req_id(自增id)
第一行 2probe 表示首次ping
第二行 3probe 表示首次pong
第三行 5 表示UPGRADE,升級為websocket
第四行 421[「iq...] 表示iq事件消息
TAG:點融黑幫 |