當前位置:
首頁 > 知識 > RocketMQ源碼:通信協議設計及編解碼

RocketMQ源碼:通信協議設計及編解碼

本文主要分析RocketMQ通信協議的設計。RocketMQ設計了自己的一個通信協議,用於消息內容和二進位格式之間的轉換。

RocketMQ的版本為:4.2.0 release。

一.通信協議的格式

RocketMQ源碼:通信協議設計及編解碼

1.length:4位元組整數,二三四部分長度總和;

2.header length:4位元組整數,第三部分header data長度;

3.header data:存放Json序列化的數據;

4.body data:應用自定義二進位序列化的數據。

二.消息的編碼過程

消息的編碼是在 RemotingCommand 中 encode 方法中完成的:

public ByteBuffer encode() {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
if (this.body != null) {
length += body.length;
}
ByteBuffer result = ByteBuffer.allocate(4 + length);
// 1.先放入消息的總大小
result.putInt(length);
// 2.再放入頭部的長度
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// 3.接著放入頭部數據
result.put(headerData);
// 4.最後放入消息體的數據
if (this.body != null) {
result.put(this.body);
}
result.flip();
return result;
}

對於頭部數據 - header data 的編碼,在 RemotingCommand 中 headerEncode 方法處理:

private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}

還有一個細節的地方,保存 header length 的時候,經過了一個 markProtocolType 的處理,作用是將RPC類型和headerData長度編碼放到一個byte[4]數組中:

public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}

記住這個方法,後面從消息數據解碼消息頭長度的時候,就不會看的很迷茫。

三.消息的解碼過程

消息的解碼是在類 RemotingCommand 中 decode方法中完成的:

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();// 獲取byteBuffer的總長度
int oriHeaderLen = byteBuffer.getInt();// 1.獲取前4個位元組,組裝int類型,該長度為總長度 圖中 length
int headerLength = getHeaderLength(oriHeaderLen);// length & 0xFFFFFF 獲取消息頭的長度,與運算,編碼時候的長度即為24位
byte[] headerData = new byte[headerLength];// 保存header data
byteBuffer.get(headerData);// 2.從緩衝區中讀取headerLength個位元組的數據,這個數據就是報文頭部的數據
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;// 報文體的數據,減去了第二、三部分的長度
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);// 獲取消息體的數據
}
cmd.body = bodyData;
return cmd;
}

對於頭部數據 - header data 的解碼,在 RemotingCommand 中 headerDecode 方法處理:

private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
switch (type) {
case JSON:
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
}
return null;
}

解碼出 header length - 消息頭長度,在 getHeaderLength 中:

public static int getHeaderLength(int length) {
return length & 0xFFFFFF;// 為什麼是和高位的24位與,可以參考編碼時的方法:markProtocolType
}

通過代碼分析,發現RocketMQ的協議設計不是很複雜,只要我們耐心一步步跟進去,看到最裡面的代碼。"哦,原來是這樣!"

RocketMQ源碼:通信協議設計及編解碼

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

PHP Session 封裝類
Monkey可視化工具開發

TAG:程序員小新人學習 |