
借用一句話“消息隊列的本質在於消息的發送、存儲和接收”。那麼,對於一款消息隊列來說,如何做到消息的高效發送與接收是重點和關鍵。
前排友情提示:這是一篇很硬的乾貨。

RocketMQ中Remoting通信模塊概覽
RocketMQ消息隊列的整體部署架構如下圖所示:
先來說下RocketMQ消息隊列集群中的幾個角色:
- NameServer:在MQ集群中做的是做命名服務,更新和路由發現 broker服務;
- Broker-Master:broker 消息主機服務器;
- Broker-Slave:broker 消息從機服務器;
- Producer:消息生產者;
- Consumer:消息消費者。
其中,RocketMQ集群的一部分通信如下:
- Broker啟動後需要完成一次將自己註冊至NameServer的操作;隨後每隔30s時間定期向NameServer上報Topic路由信息;
- 消息生產者Producer作為客戶端發送消息時候,需要根據Msg的Topic從本地緩存的TopicPublishInfoTable獲取路由信息。如果沒有則更新路由信息會從NameServer上重新拉取;
- 消息生產者Producer根據所獲取的路由信息選擇一個隊列(MessageQueue)進行消息發送;Broker作為消息的接收者接收消息並落盤存儲。
從上面可以看出在消息生產者,在Broker和NameServer間都會發生通信(這裡只說了MQ的部分通信),因此如何設計一個良好的網絡通信模塊在MQ中至關重要,它將決定RocketMQ集群整體的消息傳輸能力與最終性能。
rocketmq-remoting 模塊是 RocketMQ消息隊列中負責網絡通信的模塊,它幾乎被其他所有需要網絡通信的模塊(諸如rocketmq-client、rocketmq-server、rocketmq-namesrv)所依賴和引用。
為了實現客戶端與服務器之間高效的數據請求與接收,RocketMQ消息隊列自定義了通信協議並在Netty的基礎之上擴展了通信模塊。
鑑於RocketMQ的通信模塊是建立在Netty基礎之上的,因此在閱讀RocketMQ的源碼之前,讀者最好先對Netty的多線程模型、JAVA NIO模型均有一定的瞭解,這樣子理解RocketMQ源碼會較為快一些。
本文使用的RocketMQ版本是4.2.0, 依賴的netty版本是4.0.42.Final. RocketMQ的代碼結構圖如下:
源碼部分主要可以分為rocketmq-broker,rocketmq-client,rocketmq-common,rocketmq-filterSrv,rocketmq-namesrv和rocketmq-remoting等模塊,通信框架就封裝在rocketmq-remoting模塊中。
本文主要從RocketMQ的協議格式,消息編解碼,通信方式(同步/異步/單向)和具體的發送/接收消息的通信流程來進行闡述等。
RocketMQ中Remoting通信模塊的具體實現
1、Remoting通信模塊的類結構圖
從類層次結構來看:
- RemotingService:為最上層的接口,提供了三個方法:
1void start();
2void shutdown();
3void registerRPCHook(RPCHook rpcHook);
- RemotingClient/RemotingSever:兩個接口繼承了最上層接口—RemotingService,分別各自為Client和Server提供所必需的方法,下面所列的是RemotingServer的方法:
1/**
2 * 同RemotingClient端一樣
3 *
4 * @param requestCode
5 * @param processor
6 * @param executor
7 */
8 void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
9 final ExecutorService executor);
10
11 /**
12 * 註冊默認的處理器
13 *
14 * @param processor
15 * @param executor
16 */
17 void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
18
19 int localListenPort();
20
21 /**
22 * 根據請求code來獲取不同的處理Pair
23 *
24 * @param requestCode
25 * @return
26 */
27 Pair
getProcessorPair(final int requestCode); 28
29 /**
30 * 同RemotingClient端一樣,同步通信,有返回RemotingCommand
31 * @param channel
32 * @param request
33 * @param timeoutMillis
34 * @return
35 * @throws InterruptedException
36 * @throws RemotingSendRequestException
37 * @throws RemotingTimeoutException
38 */
39 RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
40 final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
41 RemotingTimeoutException;
42
43 /**
44 * 同RemotingClient端一樣,異步通信,無返回RemotingCommand
45 *
46 * @param channel
47 * @param request
48 * @param timeoutMillis
49 * @param invokeCallback
50 * @throws InterruptedException
51 * @throws RemotingTooMuchRequestException
52 * @throws RemotingTimeoutException
53 * @throws RemotingSendRequestException
54 */
55 void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
56 final InvokeCallback invokeCallback) throws InterruptedException,
57 RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
58
59 /**
60 * 同RemotingClient端一樣,單向通信,諸如心跳包
61 *
62 * @param channel
63 * @param request
64 * @param timeoutMillis
65 * @throws InterruptedException
66 * @throws RemotingTooMuchRequestException
67 * @throws RemotingTimeoutException
68 * @throws RemotingSendRequestException
69 */
70 void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
71 throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
72 RemotingSendRequestException;
- NettyRemotingAbstract:Netty通信處理的抽象類,定義並封裝了Netty處理的公共處理方法;
- NettyRemotingClient/NettyRemotingServer:分別實現了RemotingClient和RemotingServer,都繼承了NettyRemotingAbstract抽象類。RocketMQ中其他的組件(如client、nameServer、broker在進行消息的發送和接收時均使用這兩個組件)。
2、消息的協議設計與編碼解碼
在Client和Server之間完成一次消息發送時,需要對發送的消息進行一個協議約定,因此就有必要自定義RocketMQ的消息協議。同時,為了高效地在網絡中傳輸消息和對收到的消息讀取,就需要對消息進行編解碼。在RocketMQ中,RemotingCommand這個類在消息傳輸過程中對所有數據內容的封裝,不但包含了所有的數據結構,還包含了編碼解碼操作。
RemotingCommand類的部分成員變量如下:
Header字段類型Request說明Response說明codeint請求操作碼,應答方根據不同的請求碼進行不同的業務處理應答響應碼。0表示成功,非0則表示各種錯誤languageLanguageCode請求方實現的語言應答方實現的語言versionint請求方程序的版本應答方程序的版本opaqueint相當於reqeustId,在同一個連接上的不同請求標識碼,與響應消息中的相對應應答不做修改直接返回flagint區分是普通RPC還是onewayRPC得標誌區分是普通RPC還是onewayRPC得標誌remarkString傳輸自定義文本信息傳輸自定義文本信息extFieldsHashMap請求自定義擴展信息響應自定義擴展信息
這裡展示下Broker向NameServer發送一次心跳註冊的報文:
1[
2code=103,//這裡的103對應的code就是broker向nameserver註冊自己的消息
3language=JAVA,
4version=137,
5opaque=58,//這個就是requestId
6flag(B)=0,
7remark=null,
8extFields={
9 brokerId=0,
10 clusterName=DefaultCluster,
11 brokerAddr=ip1: 10911,
12 haServerAddr=ip1: 10912,
13 brokerName=LAPTOP-SMF2CKDN
14},
15serializeTypeCurrentRPC=JSON
下面來看下RocketMQ通信協議的格式:
可見傳輸內容主要可以分為以下4部分:
- 消息長度:總長度,四個字節存儲,佔用一個int類型;
- 序列化類型&消息頭長度:同樣佔用一個int類型,第一個字節表示序列化類型,後面三個字節表示消息頭長度;
- 消息頭數據:經過序列化後的消息頭數據;
- 消息主體數據:消息主體的二進制字節數據內容。
消息的編碼和解碼分別在RemotingCommand類的encode和decode方法中完成,消息解碼decode方法是編碼的逆向過程。
3、消息的通信方式和通信流程
在RocketMQ消息隊列中支持通信的方式主要有同步(sync)、異步(async)和單向(oneway)這三種。
其中“同步”通信模式相對簡單,一般用在發送心跳包場景下,無需關注其Response。本文將主要介紹RocketMQ的異步通信流程(限於篇幅,讀者可以按照同樣的模式進行分析同步通信流程)。
下面先給出了RocketMQ異步通信的整體流程圖:
下面兩小節內容主要介紹了Client端發送請求消息、Server端接收消息的具體實現並簡要分析的Client端的回調。
3.1 Client發送請求消息的具體實現
當客戶端調用異步通信接口—invokeAsync時候,先由RemotingClient的實現類—NettyRemotingClient根據addr獲取相應的channel(如果本地緩存中沒有則創建),隨後調用invokeAsyncImpl方法,將數據流轉給抽象類NettyRemotingAbstract處理(真正做完發送請求動作的是在NettyRemotingAbstract抽象類的invokeAsyncImpl方法裡面)。
具體發送請求消息的源代碼如下所示:
1 /**
2 * invokeAsync(異步調用)
3 *
4 */
5 public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
6 final InvokeCallback invokeCallback)
7 throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
8 //相當於request ID, RemotingCommand會為每一個request產生一個request ID, 從0開始, 每次加1
9
10 final int opaque = request.getOpaque();
11 boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
12 if (acquired) {
13 final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
14 //根據request ID構建ResponseFuture
15 final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
16 //將ResponseFuture放入responseTable
17 this.responseTable.put(opaque, responseFuture);
18 try {
19 //使用Netty的channel發送請求數據
20 channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
21 //消息發送後執行
22 @Override
23 public void operationComplete(ChannelFuture f) throws Exception {
24 if (f.isSuccess()) {
25 //如果發送消息成功給Server,那麼這裡直接Set後return
26 responseFuture.setSendRequestOK(true);
27 return;
28 } else {
29 responseFuture.setSendRequestOK(false);
30 }
31
32 responseFuture.putResponse(null);
33 responseTable.remove(opaque);
34 try {
35 //執行回調
36 executeInvokeCallback(responseFuture);
37 } catch (Throwable e) {
38 log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
39 } finally {
40 //釋放信號量
41 responseFuture.release();
42 }
43
44 log.warn("send a request command to channel failed.", RemotingHelper.parseChannelRemoteAddr(channel));
45 }
46 });
47 } catch (Exception e) {
48 //異常處理
49 responseFuture.release();
50 log.warn("send a request command to channel Exception", e);
51 throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
52 }
53 } else {
54 if (timeoutMillis <= 0) {
55 throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
56 } else {
57 String info =
58 String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
59 timeoutMillis,
60 this.semaphoreAsync.getQueueLength(),
61 this.semaphoreAsync.availablePermits()
62 );
63 log.warn(info);
64 throw new RemotingTimeoutException(info);
65 }
66 }
67 }
在Client端發送請求消息時有個比較重要的數據結構需要注意下:
- responseTable—保存請求碼與響應關聯映射
1protected final ConcurrentHashMap
responseTable
opaque表示請求發起方在同個連接上不同的請求標識代碼,每次發送一個消息的時候,可以選擇同步阻塞/異步非阻塞的方式。無論是哪種通信方式,都會保存請求操作碼至ResponseFuture的Map映射—responseTable中。
- ResponseFuture—保存返回響應(包括回調執行方法和信號量)
1public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
2 SemaphoreReleaseOnlyOnce once) {
3 this.opaque = opaque;
4 this.timeoutMillis = timeoutMillis;
5 this.invokeCallback = invokeCallback;
6 this.once = once;
7 }
對於同步通信來說,第三、四個參數為null;而對於異步通信來說,invokeCallback是在收到消息響應的時候能夠根據responseTable找到請求碼對應的回調執行方法,semaphore參數用作流控,當多個線程同時往一個連接寫數據時可以通過信號量控制permit同時寫許可的數量。
- 異常發送流程處理—定時掃描responseTable本地緩存
在發送消息時候,如果遇到異常情況(比如服務端沒有response返回給客戶端或者response因網絡而丟失),上面所述的responseTable的本地緩存Map將會出現堆積情況。這個時候需要一個定時任務來專門做responseTable的清理回收。在RocketMQ的客戶端/服務端啟動時候會產生一個頻率為1s調用一次來的定時任務檢查所有的responseTable緩存中的responseFuture變量,判斷是否已經得到返回, 並進行相應的處理。
1public void scanResponseTable() {
2 final List
rfList = new LinkedList (); 3 Iterator
> it = this.responseTable.entrySet().iterator(); 4 while (it.hasNext()) {
5 Entry
next = it.next(); 6 ResponseFuture rep = next.getValue();
7
8 if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
9 rep.release();
10 it.remove();
11 rfList.add(rep);
12 log.warn("remove timeout request, " + rep);
13 }
14 }
15
16 for (ResponseFuture rf : rfList) {
17 try {
18 executeInvokeCallback(rf);
19 } catch (Throwable e) {
20 log.warn("scanResponseTable, operationComplete Exception", e);
21 }
22 }
23 }
3.2 Server端接收消息並進行處理的具體實現
Server端接收消息的處理入口在NettyServerHandler類的channelRead0方法中,其中調用了processMessageReceived方法(這裡省略了Netty服務端消息流轉的大部分流程和邏輯)。
其中服務端最為重要的處理請求方法實現如下:
1public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
2 //根據RemotingCommand中的code獲取processor和ExecutorService
3 final Pair
matched = this.processorTable.get(cmd.getCode()); 4 final Pair
pair = null == matched ? this.defaultRequestProcessor : matched; 5 final int opaque = cmd.getOpaque();
6
7 if (pair != null) {
8 Runnable run = new Runnable() {
9 @Override
10 public void run() {
11 try {
12 //rpc hook
13 RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
14 if (rpcHook != null) {
15 rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
16 }
17 //processor處理請求
18 final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
19 //rpc hook
20 if (rpcHook != null) {
21 rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
22 }
23
24 if (!cmd.isOnewayRPC()) {
25 if (response != null) {
26 response.setOpaque(opaque);
27 response.markResponseType();
28 try {
29 ctx.writeAndFlush(response);
30 } catch (Throwable e) {
31 PLOG.error("process request over, but response failed", e);
32 PLOG.error(cmd.toString());
33 PLOG.error(response.toString());
34 }
35 } else {
36
37 }
38 }
39 } catch (Throwable e) {
40 if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
41 .equals(e.getClass().getCanonicalName())) {
42 PLOG.error("process request exception", e);
43 PLOG.error(cmd.toString());
44 }
45
46 if (!cmd.isOnewayRPC()) {
47 final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
48 RemotingHelper.exceptionSimpleDesc(e));
49 response.setOpaque(opaque);
50 ctx.writeAndFlush(response);
51 }
52 }
53 }
54 };
55
56 if (pair.getObject1().rejectRequest()) {
57 final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
58 "[REJECTREQUEST]system busy, start flow control for a while");
59 response.setOpaque(opaque);
60 ctx.writeAndFlush(response);
61 return;
62 }
63
64 try {
65 //封裝requestTask
66 final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
67 //想線程池提交requestTask
68 pair.getObject2().submit(requestTask);
69 } catch (RejectedExecutionException e) {
70 if ((System.currentTimeMillis() % 10000) == 0) {
71 PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
72 + ", too many requests and system thread pool busy, RejectedExecutionException " //
73 + pair.getObject2().toString() //
74 + " request code: " + cmd.getCode());
75 }
76
77 if (!cmd.isOnewayRPC()) {
78 final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
79 "[OVERLOAD]system busy, start flow control for a while");
80 response.setOpaque(opaque);
81 ctx.writeAndFlush(response);
82 }
83 }
84 } else {
85 String error = " request type " + cmd.getCode() + " not supported";
86 //構建response
87 final RemotingCommand response =
88 RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
89 response.setOpaque(opaque);
90 ctx.writeAndFlush(response);
91 PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
92 }
93}
上面的請求處理方法中根據RemotingCommand的請求業務碼來匹配到相應的業務處理器;然後生成一個新的線程提交至對應的業務線程池進行異步處理。
- processorTable—請求業務碼與業務處理、業務線程池的映射變量
1 protected final HashMap
> processorTable = 2 new HashMap
>(64);
我想RocketMQ這種做法是為了給不同類型的請求業務碼指定不同的處理器Processor處理,同時消息實際的處理並不是在當前線程,而是被封裝成task放到業務處理器Processor對應的線程池中完成異步執行。
在RocketMQ中能看到很多地方都是這樣的處理,這樣的設計能夠最大程度的保證異步,保證每個線程都專注處理自己負責的東西。
3.3 Client端異步回調執行的實現分析
看到這裡可能有一些同學會疑問Client端的異步回調究竟在哪裡執行的?從上面“RocketMQ異步通信的整體時序圖”來看,回調執行處理的流程的確是放在了Client端來完成,而rocketmq-remoting通信模塊中只是給異步回調處理提供了接口。
這裡需要結合3.1節的內容和NettyRemotingAbstract抽象類的processResponseCommand方法,便可以明白Client端實現異步回調的大致流程了。在Client端發送異步消息時候(rocketmq-client模塊最終調用sendMessageAsync方法時),會將InvokeCallback的接口注入,而在Server端的異步線程由上面所講的業務線程池真正執行後,返回response給Client端時候才會去觸發執行。NettyRemotingAbstract抽象類的processResponseCommand方法的具體代碼如下:
1public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
2 //從RemotingCommand中獲取opaque值
3 final int opaque = cmd.getOpaque();‘
4 //從本地緩存的responseTable這個Map中取出本次異步通信連接對應的ResponseFuture變量
5 final ResponseFuture responseFuture = responseTable.get(opaque);
6 if (responseFuture != null) {
7 responseFuture.setResponseCommand(cmd);
8
9 responseTable.remove(opaque);
10
11 if (responseFuture.getInvokeCallback() != null) {
12 //在這裡真正去執行Client注入進來的異步回調方法
13 executeInvokeCallback(responseFuture);
14 } else {
15 //否則釋放responseFuture變量
16 responseFuture.putResponse(cmd);
17 responseFuture.release();
18 }
19 } else {
20 log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
21 log.warn(cmd.toString());
22 }
23 }
以上主要介紹了RocketMQ的協議格式,消息編解碼,通信方式(同步/異步/單向)、消息發送/接收以及異步回調的主要通信流程。而下面將主要對RocketMQ消息隊列RPC通信部分的Netty多線程模型進行重點介紹。
為何要使用Netty作為高性能的通信庫?
在看RocketMQ的RPC通信部分時候,可能有不少同學有這樣子的疑問,RocketMQ為何要選擇Netty而不直接使用JDK的NIO進行網絡編程呢?這裡有必要先來簡要介紹下Netty。
Netty是一個封裝了JDK的NIO庫的高性能網絡通信開源框架。它提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。
下面主要列舉了下一般系統的RPC通信模塊會選擇Netty作為底層通信庫的理由(作者認為RocketMQ的RPC同樣也是基於此選擇了Netty):
- Netty的編程API使用簡單,開發門檻低,無需編程者去關注和了解太多的NIO編程模型和概念;
- 對於編程者來說,可根據業務的要求進行定製化地開發,通過Netty的ChannelHandler對通信框架進行靈活的定製化擴展;
- Netty框架本身支持拆包/解包,異常檢測等機制,讓編程者可以從JAVA NIO的繁瑣細節中解脫,而只需要關注業務處理邏輯;
- Netty解決了(準確地說應該是採用了另一種方式完美規避了)JDK NIO的Bug(Epoll bug,會導致Selector空輪詢,最終導致CPU 100%);
- Netty框架內部對線程,selector做了一些細節的優化,精心設計的reactor多線程模型,可以實現非常高效地併發處理;
- Netty已經在多個開源項目(Hadoop的RPC框架avro使用Netty作為通信框架)中都得到了充分驗證,健壯性/可靠性比較好。
RocketMQ中RPC通信的Netty多線程模型
RocketMQ的RPC通信部分採用了"1+N+M1+M2"的Reactor多線程模式,對網絡通信部分進行了一定的擴展與優化,這一節主要讓我們來看下這一部分的具體設計與實現內容。
4.1 Netty的Reactor多線程模型設計概念與簡述
這裡有必要先來簡要介紹下Netty的Reactor多線程模型。Reactor多線程模型的設計思想是分而治之+事件驅動。
- 分而治之
一般來說,一個網絡請求連接的完整處理過程可以分為接受(accept)、數據讀取(read)、解碼/編碼(decode/encode)、業務處理(process)、發送響應(send)這幾步驟。Reactor模型將每個步驟都映射成為一個任務,服務端線程執行的最小邏輯單元不再是一次完整的網絡請求,而是這個任務,且採用以非阻塞方式執行。
- 事件驅動
每個任務對應特定網絡事件。當任務準備就緒時,Reactor收到對應的網絡事件通知,並將任務分發給綁定了對應網絡事件的Handler執行。
4.2 RocketMQ中RPC通信的1+N+M1+M2的Reactor多線程設計與實現
- RocketMQ中RPC通信的Reactor多線程設計與流程
RocketMQ的RPC通信採用Netty組件作為底層通信庫,同樣也遵循了Reactor多線程模型,同時又在這之上做了一些擴展和優化。下面先給出一張RocketMQ的RPC通信層的Netty多線程模型框架圖,讓大家對RocketMQ的RPC通信中的多線程分離設計有一個大致的瞭解。
從上面的框圖中可以大致瞭解RocketMQ中NettyRemotingServer的Reactor 多線程模型。一個 Reactor 主線程(eventLoopGroupBoss,即為上面的1)負責監聽 TCP網絡連接請求,建立好連接後丟給Reactor 線程池(eventLoopGroupSelector,即為上面的“N”,源碼中默認設置為3),它負責將建立好連接的socket 註冊到 selector上去(RocketMQ的源碼中會自動根據OS的類型選擇NIO和Epoll,也可以通過參數配置),然後監聽真正的網絡數據。拿到網絡數據後,再丟給Worker線程池(defaultEventExecutorGroup,即上面的“M1”,源碼中默認設置為8)。
為了更為高效地處理RPC的網絡請求,這裡的Worker線程池是專門用於處理Netty網絡通信相關的(包括編碼/解碼、空閒鏈接管理、網絡連接管理以及網絡請求處理)。
而處理業務操作放在業務線程池中執行,根據 RomotingCommand 的業務請求碼code去processorTable這個本地緩存變量中找到對應的 processor,然後封裝成task任務後,提交給對應的業務processor處理線程池來執行(sendMessageExecutor,以發送消息為例,即為上面的 “M2”)。
下面以表格的方式列舉了下上面所述的“1+N+M1+M2”Reactor多線程模型:
線程數線程名線程具體說明1NettyBoss_%dReactor 主線程NNettyServerEPOLLSelector_%d_%dReactor 線程池M1NettyServerCodecThread_%dWorker線程池M2RemotingExecutorThread_%d業務processor處理線程池
- RocketMQ中RPC通信的Reactor多線程的代碼具體實現
說完了Reactor多線程整體的設計與流程,大家應該就對RocketMQ的RPC通信的Netty部分有了一個比較全面的理解了,那接下來就從源碼上來看下一些細節部分(在看該部分代碼時候需要讀者對JAVA NIO和Netty的相關概念與技術點有所瞭解)。
在NettyRemotingServer的實例初始化時,會初始化各個相關的變量包括serverBootstrap、nettyServerConfig參數、channelEventListener監聽器並同時初始化eventLoopGroupBoss和eventLoopGroupSelector兩個Netty的EventLoopGroup線程池(這裡需要注意的是,如果是Linux平臺,並且開啟了native epoll,就用EpollEventLoopGroup,這個也就是用JNI,調的c寫的epoll;否則就用Java NIO的NioEventLoopGroup)。代碼如下:
1public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
2 final ChannelEventListener channelEventListener) {
3 super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
4 this.serverBootstrap = new ServerBootstrap();
5 this.nettyServerConfig = nettyServerConfig;
6 this.channelEventListener = channelEventListener;
7 //省略部分代碼
8 //初始化時候nThreads設置為1,說明RemotingServer端的Disptacher鏈接管理和分發請求的線程為1,用於接收客戶端的TCP連接
9 this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
10 private AtomicInteger threadIndex = new AtomicInteger(0);
11
12 @Override
13 public Thread newThread(Runnable r) {
14 return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
15 }
16 });
17
18 /**
19 * 根據配置設置NIO還是Epoll來作為Selector線程池
20 * 如果是Linux平臺,並且開啟了native epoll,就用EpollEventLoopGroup,這個也就是用JNI,調的c寫的epoll;否則,就用Java NIO的NioEventLoopGroup。
21 *
22 */
23 if (useEpoll()) {
24 this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
25 private AtomicInteger threadIndex = new AtomicInteger(0);
26 private int threadTotal = nettyServerConfig.getServerSelectorThreads();
27
28 @Override
29 public Thread newThread(Runnable r) {
30 return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
31 }
32 });
33 } else {
34 this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
35 private AtomicInteger threadIndex = new AtomicInteger(0);
36 private int threadTotal = nettyServerConfig.getServerSelectorThreads();
37
38 @Override
39 public Thread newThread(Runnable r) {
40 return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
41 }
42 });
43 }
44 //省略部分代碼
在NettyRemotingServer實例初始化完成後,就會將其啟動。Server端在啟動階段會將之前實例化好的1個acceptor線程(eventLoopGroupBoss),N個IO線程(eventLoopGroupSelector),M1個worker 線程(defaultEventExecutorGroup)綁定上去。
這裡需要說明的是,Worker線程拿到網絡數據後,就交給Netty的ChannelPipeline(其採用責任鏈設計模式),從Head到Tail的一個個Handler執行下去,這些 Handler是在創建NettyRemotingServer實例時候指定的。NettyEncoder和NettyDecoder 負責網絡傳輸數據和 RemotingCommand 之間的編解碼。NettyServerHandler 拿到解碼得到的 RemotingCommand 後,根據 RemotingCommand.type 來判斷是 request 還是 response來進行相應處理,根據業務請求碼封裝成不同的task任務後,提交給對應的業務processor處理線程池處理。
從上面的描述中可以概括得出RocketMQ的RPC通信部分的Reactor線程池模型框圖。
整體可以看出RocketMQ的RPC通信藉助Netty的多線程模型,其服務端監聽線程和IO線程分離,同時將RPC通信層的業務邏輯與處理具體業務的線程進一步相分離。時間可控的簡單業務都直接放在RPC通信部分來完成,複雜和時間不可控的業務提交至後端業務線程池中處理,這樣提高了通信效率和MQ整體的性能。
其中抽象出NioEventLoop來表示一個不斷循環執行處理任務的線程,每個NioEventLoop有一個selector,用於監聽綁定在其上的socket鏈路。
總結
剛開始看RocketMQ源碼—RPC通信模塊可能覺得略微有點複雜,但是隻要能夠抓住Client端發送請求消息、Server端接收消息並處理的流程以及回調過程來分析和梳理,那麼整體來說並不複雜。
RPC通信部分也是RocketMQ源碼中重要的部分之一,想要對其中的全過程和細節有更為深刻的理解,還需要多在本地環境Debug和分析對應的日誌。
限於筆者的才疏學淺,對本文內容可能還有理解不到位的地方,如有闡述不合理之處還望留言一起探討。
作者:胡宗棠,中移(蘇州)軟件技術有限公司,雲計算軟件高級研發工程師,從事公有云產品平臺研發、架構設計;目前專注於大型分佈式系統的高併發、高可用設計。曾就職於螞蟻金服支付寶,甲骨文中國研發中心,個人公眾號:匠心獨運的博客。
“徵稿啦!”
CSDN 公眾號秉持著「與千萬技術人共成長」理念,不僅以「極客頭條」、「暢言」欄目在第一時間以技術人的獨特視角描述技術人關心的行業焦點事件,更有「技術頭條」專欄,深度解讀行業內的熱門技術與場景應用,讓所有的開發者緊跟技術潮流,保持警醒的技術嗅覺,對行業趨勢、技術有更為全面的認知。
如果你有優質的文章,或是行業熱點事件、技術趨勢的真知灼見,或是深度的應用實踐、場景方案等的新見解,歡迎聯繫 CSDN 投稿,聯繫方式:微信(guorui_1118,請備註投稿+姓名+公司職位),郵箱([email protected])。
閱讀更多 CSDN 的文章