當前位置:
首頁 > 最新 > 乾貨:如何構建一個消息推送平台

乾貨:如何構建一個消息推送平台

01

背 景

B/S架構下很多業務場景下我們需要服務端主動推送消息到客戶端,在html5之前一般使用長輪詢(除此之外還有iframe流或者Flash Socket)的方式來實現,而長輪詢的方式缺點很明顯,頻繁交互的情況下,大量的連接被建立和釋放,並且交互頻率受限於兩次http的請求間隔,html5開始可以使用websocket全雙工的通信協議,在tomcat和jetty都有實現。

雖然在java1.4以後可以用nio包實現非阻塞的websocket通信,但是使用起來太過底層和複雜;netty封裝了nio,使用了大量非同步和事件驅動,封裝了拆包粘包,實現了網路編程和業務分離,但即便如此,使用和優化netty還是需要深厚的網路編程知識,況且我們需要的僅僅是用戶與用戶之間的文本消息的推送,這裡我們介紹一個基於netty實現的websocket服務端框架,socket.io屏蔽了使用netty的細節,針對聊天場景做了高度封裝,並且提供了包含WEBSOCKET、POLLING等多種推送方式的支持,同時支持namespace(命名空間)、broadcast(廣播)、room(房間/聊天室)、event(業務事件)、應用層ack機制。

02

需 求

構建一個統一的文本消息推送平台,可以在此之上構建業務系統,比如:客服系統、內部聊天工具、站內信、事件系統等。基礎平台要做到用戶管理、狀態管理、文本消息發送/推送(群/點對點)、圖片/文件推送、未讀消息管理、歷史消息、會話管理。

03

業務架構

restful:通用介面http服務層、提供鑒權、發送消息、歷史消息、離線消息、會話等相關介面

logic:業務邏輯層、消息/會話持久化、用戶狀態、連接狀態存儲、離線消息

router:路由層、消息路由、把消息推送到用戶連接的connector

connector:連接層、提供消息推送服務、維護房間、ack

消息的上行是走的restful,在這層可以做uniauth(內部的統一登錄平台)許可權驗證、負載均衡等,也讓connector更輕量級,依賴更少;connector是有狀態的,保持著用戶的連接信息,同時要記錄用戶和connector的映射關係,以用於router層在推送消息時路由到對應的節點,connector使用單獨的物理機來運行,因為虛擬主機並不能穩定保證較高並發的長連接數量。

04

輕量級的消息發送

一個消息推送平台最重要的是高可用性和最終一致性(消息可達),所以消息發送可以做的非常輕量,發送消息時會帶上客戶端生成的唯一消息id,同時客戶端持久化,服務端只需要把消息寫到mq即發送成功,然後logic消費mq再做非同步批量的消息持久化,從可用性角度來說,發送消息並不依賴websocket,而是通過restful服務,更加高可用和可擴展。消息發送的可靠性,沒有依賴於rabbitmq的事務或者publisher confirms,後面介紹專門的機制來保證。

05

點對點/群消息

消息我們定義了三種事件類型,MessageCategoryEnum

MSG:文本消息,包含圖片、文件、語音、文字等子類型。

IQ:操作消息,客戶端用來做某些特殊動作處理。

STATUS:狀態消息,和用戶上下線等狀態有關。

消息體如下

對於點對點消息來說,發送者和接受者都是一個用戶id,對於群消息來說,接受者可能是一個SESSION(會話),會話需要提前創建,會話中包含若干的用戶,使用socket.io的room功能,很方便的實現一個會話,room就是SESSION的id。

1. 用戶上下線的時候都要主動加入/離開和自己有關的所有room

3. 群消息推送就調用

06

消息可達保證(QoS)

使用六報文的方式保證消息可達,圖中演示的是三報文的消息發送階段。

1.client-A向im-server發送一個消息請求包,即msg:R;

2.im-server在成功處理後,回復client-A一個消息響應包,即msg:A;

3.如果此時client-B在線,則im-server主動向client-B推送一個消息通知包(不在線做離線存儲),即msg:N;

4.client-B向im-server發送一個ack請求包表示msg已經收到,即ack:R;

5.im-server在成功處理後,回復client-B一個ack響應包,即ack:A;

6.im-server主動向client-A發送一個ack通知包,即ack:N;

7.client-A發出消息後,超過設定的某個時間沒有收到ack:N,那麼我們認為消息沒有推送成功(用戶離線除外,用戶離線,服務端會模擬一個ack:N),需要客戶端重發,服務端根據客戶端id冪等處理;

在我們系統,msg:N和ack:N是connector推送,其他報文就是走http的消息發送和ack;其實ack:N可以不需要,伺服器可以保存未讀消息列表,接受ack:R來判斷是否要重發,但是這樣可靠性稍差一些,極端情況會造成消息丟失。

07

消息順序性

首先我們要確定以什麼樣的邏輯排列消息的順序,兩個人聊天,首先要保證的是同一個人消息的先後順序,其次要保證會話內消息的順序。服務端不能以持久化時生成自增id作為順序,因為非同步消費不能保證順序(某些mq,如kafka以用戶為key可以保證消息順序性,但也存在重試問題)。從客戶端的角度來說,可以通過同步發送保證消息的順序性,所以服務端可以在發mq之前通過redis生成會話內自增id,推送的時候可能會存在重發和亂序,就需要客戶端根據服務端生成的id做冪等和重排序。

如果不依賴於redis,可以使用伺服器時間(精確到秒)+客戶端生成的自增id作為排序欄位,但這樣就要求集群的時間同步在一秒內。

08

未讀(離線)消息

關於未讀消息的存儲有兩種方案:寫擴散和讀擴散,寫擴散即未讀消息在持久化時就針對每個用戶保存一份,讀擴散就是利用群消息的偏序特性,只保存用戶在會話內ack的最後一條消息id,無論是哪種方式,未讀消息都只是保存消息的id,未讀消息的推送僅在上線或者連接的時候。

寫擴散:

session_user(session_id, user_id);//會話-用戶

messages(msgid,session_id,sender_user_id,time,content);//消息

user_messages(user_id, msgid, session_id);//用戶-未讀消息

1. 消息持久化後,如果接受用戶不在線,那麼在user_messages為每個用戶新增一條記錄;

2. 用戶ack後,刪除t_user_messages記錄;

3. 用戶查未讀消息,先查user_messages,再查messages。

讀擴散:

session_user(session_id, user_id,last_ack_msgid);//會話-用戶

messages(msgid,session_id,sender_user_id,time,content);//消息

user_messages(user_id, msgid, session_id);//用戶-未讀消息

1.消息持久化,推送在線用戶;

2.用戶ack,更新last_ack_msgid;

3.用戶查未讀消息,先查last_ack_msgid,再查messages。

優化點:

1. 未讀消息user_messages表或者last_ack_msgid欄位的更新很頻繁,可以用redis來實現存儲;

2. ack的頻率很高,可以讓客戶端批量或在一個時間段內ack,減少請求,即使消息重發,也有客戶端去重;

3. 未讀消息可能存在消息量很大的情況,可以設置過期時間,過期後即表示該消息已讀;大量未讀消息不適合做推送,最好通過http分頁拉取,甚至可以把拉取請求和ack請求合併。

09

調 優

1. Jdk nio會判斷Linux kernels >= 2.6,是則使用level-triggered epoll,否則使用select/poll,而netty實現了edge-triggered epoll,在高並發場景下性能略優,也是netty推薦的模式,設置socket.io的useLinuxNativeEpoll=true,使用linux epoll, NioEventLoopGroup 替換成EpollEventLoopGroup,NioServerSocketChannel 替換成 EpollServerSocketChannel;

2. 要建立大量連接需要修改最大文件句柄數,修改sysctl.conf的fs.file-max = 1000000,同時修改limits.conf

soft  nofile  1000000

hard  nofile  1000000

3. 設置TCP_NODELAY=true禁用nagle演算法;

7. 上行數據使用AdaptiveRecvByteBufAllocator動態分配ByteBuf;

8. 業務邏輯不要佔用EventLoop線程,啟用業務線程池處理,否則可能阻塞I/O;

9. Tcp層的心跳只能檢測連接,不能確定應用可用,所以有了應用層心跳, socket.io默認25s,可以調整為180s,頻率過高會佔用大量帶寬和流量,過低可能會導致連接斷開。同時關閉tcp心跳保活SO_KEEPALIVE=false;

10. Netty的boss threads設置為1(1個監聽埠),worker設置為16(cpu*2)。

10

Socket.io

1.服務端

netty-socketio是java版的服務端實現,目前只支持xhr-polling和websocket 兩種transport。

NamespacesHub是持有所有命名空間的一個map,value是Namespace。

Namespace持有當前命名空間下的所有client連接room和client的對應關係。

NamespaceClient代表當前命名空間下的一個client連接。

ClientHead代表一個client連接,持有會話id、TransportState以及子NamespaceClient(一個Channel可以對應多個命名空間)。

TransportState持有一個Packet的無鎖隊列,以及一個netty的通道Channel,通過此向客戶端推送消息。

Packet就是一個socket.io協議下的數據包。

socket.io為我們封裝了支持websocket協議的ChannelHandler,其中就包括最重要的InPacketHandler,InPacketHandler提供了協議解碼,用來處理非OPEN,UPGRADE的PacketType事件:CONNECT(連接)、PING(應用層心跳)、UPGRADE(升級websocket)、CLOSE(連接關閉)、MESSAGE(業務消息),socket.io使用自定義協議Packet,並使用json序列化的方式傳輸。

2.客戶端

2.1 連接

var socket = io.connect("http://127.0.0.1:9092/namespace");//發起客戶端連接,客戶端sdk發起connect的時候會發出下面的若干請求

https://crm-connector-dev.dianrong.com/socket.io/?EIO=3&transport=polling&t=MFJ3jBa

會生成一個sid表示本次連接會話,upgrades:websocket表示客戶端可以把協議升級為websocket而不再使用輪詢,而客戶端也會判斷當前瀏覽器是否支持websocket來決定是否升級

Response:[{"code":"0","message":"success"}]

客戶端發送了一個業務上的註冊事件給服務端,並接受了一個成功的返回,在websocket沒有連接成功前,會一直使用長輪詢方式接受服務端推送

建立websocket長連接

2.2 監聽

監聽一個msg業務事件

socket.on(『msg』, function(data) {

//do something

});

監聽斷開連接事件

socket.on("disconnect", function() {

//do something

});

3.Socket.io websocket協議包

每一個Frame數據幀都以數字開頭,三位數字分別表示command(命令):messageType(消息類型):req_id(自增id)

第一行 2probe 表示首次ping

第二行 3probe 表示首次pong

第三行 5 表示UPGRADE,升級為websocket

第四行 421[「iq...] 表示iq事件消息


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 點融黑幫 的精彩文章:

支付路由的設計與實踐

TAG:點融黑幫 |