RocketMQ 源码分析 NettyRemotingServer(六)

前言

已经写了几篇关于RocketMQ源码的分析,可能对其逻辑处理会多一点认识,但还没深入到RocketMQ精髓中。比如MQ中的通信是如何实现的、如何实现高性能,高可用、最终一致性、MQ 消息存储。这些才是我们阅读源码的一个目标。所以这篇通过分析源码的来了解一下RocketMQ通信机制。

本文很大部分摘录了 匠心独运的博客

NettyRemotingServer

RocketMQ中RPC通信的通过1+N+M1+M2的Reactor多线程实现。

RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。下面先给出一张RocketMQ的RPC通信层的Netty多线程模型框架图,让大家对RocketMQ的RPC通信中的多线程分离设计有一个大致的了解。

你想输入的替代文字

从上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接后丢给Reactor 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),它负责将建立好连接的socket 注册到 selector上去(RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(defaultEventExecutorGroup,即为上面的“M1”,源码中默认设置为8)

点开NettyRemotingServer可以看到eventLoopGroupBoss、eventLoopGroupSelector的初始过程,截取部分NettyRemotingServer构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener)

//省略部分代码

// eventLoopGroupBoss负责监听 TCP网络连接请求
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
}
});


//这个是在eventLoopGroupBoss在接受到连接的时候,它负责将建立好连接的socket注册到selector上去
if (useEpoll()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();

@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();

@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}

//省略部分代码
}

在NettyRemotingServer实例初始化完成后,就会将其启动。Server端在启动阶段会将之前实例化好的1个acceptor线程(eventLoopGroupBoss),N个IO线程(eventLoopGroupSelector),M1个worker 线程(defaultEventExecutorGroup)绑定上去。前面部分也已经介绍过各个线程池的作用了。截取start()部分代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void start() {
//这里的Worker线程池是专门用于处理Netty网络通信相关的(包括编码/解码、空闲链接管理、网络连接管理以及网络请求处理)
//RocketMQ-> Java NIO的1+N+M模型:1个acceptor线程,N个IO线程,M1个worker 线程。
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});

//省略部分代码
}

这里需要说明的是,Worker线程拿到网络数据后,就交给Netty的ChannelPipeline(其采用责任链设计模式),从Head到Tail的一个个Handler执行下去,这些 Handler是在创建NettyRemotingServer实例时候指定的。截取部分start()部分代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void start() {
//省略部分代码
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
//rocketmq解码器,他们分别覆盖了父类的encode和decode方法
new NettyEncoder(),
//rocketmq编码器
new NettyDecoder(),
//Netty自带的心跳管理器
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
//连接管理器,他负责捕获新连接、连接断开、异常等事件,然后统一调度到NettyEventExecuter处理器处理。
new NettyConnectManageHandler(),
//当一个消息经过前面的解码等步骤后,然后调度到channelRead0方法,然后根据消息类型进行分发
new NettyServerHandler()
);
}
});

NettyEncoder和NettyDecoder 负责网络传输数据和 RemotingCommand 之间的编解码。NettyServerHandler 拿到解码得到的 RemotingCommand 后,然后调度到channelRead0方法。NettyServerHandler代码:

1
2
3
4
5
6
7
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}

根据 RemotingCommand.type 来判断是 request 还是 response来进行相应处理,根据业务请求码封装成不同的task任务后,提交给对应的业务processor处理线程池处理M2。processMessageReceived代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}

流程总结

你想输入的替代文字

一个 Reactor 主线程负责监听 TCP 连接请求,建立好连接后丢给 Reactor 线程池,它负责将建立好连接的 socket 注册到 selector 上去(这里有两种方式,NIO和Epoll,可配置),然后监听真正的网络数据。拿到网络数据后,再丢给 Worker 线程池。

Worker 拿到网络数据后,就交给 Pipeline,从 Head 到 Tail 一个个 Handler 的走下去,这些 Handler 是在创建 Server 的时候指定的。NettyEncoder 和 NettyDecoder 负责网络数据和 RemotingCommand 之间的编解码。

NettyServerHandler 拿到解码得到的 RemotingCommand 后,根据 RemotingCommand.type 来判断是 request 还是 response,如果是 request, 就根据 RomotingCommand 的 code(code用来标识不同类型的请求) 去 processorTable 找到对应的 processor,然后封装成 task 后,丢给对应的 processor 线程池, 如果是 response 就根据RemotingCommand.opaque 去 responseTable 中拿到对应的 ResponseFuture,把结果 set 给它。

对于 Client,经过 Pipeline 的顺序是从 Tail 到 Head。不管是 Server 和 Client,并不是每次数据流转都得经过所有的 Handler,而是会根据 Context 中的一些信息去判断。

整个数据流转过程中还有很多hook, 比如处理 command 前,处理 command 后,发送数据前,发送数据后等。