前言
已经写了几篇关于RocketMQ源码的分析,可能对其逻辑处理会多一点认识,但还没深入到RocketMQ精髓中。比如MQ中的通信是如何实现的、如何实现高性能,高可用、最终一致性、MQ 消息存储。这些才是我们阅读源码的一个目标。所以这篇通过分析源码的来了解一下RocketMQ通信机制。
本文很大部分摘录了 匠心独运的博客。
NettyRemotingServer
RocketMQ中RPC通信的通过1+N+M1+M2的Reactor多线程实现。
RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。下面先给出一张RocketMQ的RPC通信层的Netty多线程模型框架图,让大家对RocketMQ的RPC通信中的多线程分离设计有一个大致的了解。
从上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接后丢给Reactor 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),它负责将建立好连接的socket 注册到 selector上去(RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(defaultEventExecutorGroup,即为上面的“M1”,源码中默认设置为8)。
点开NettyRemotingServer可以看到eventLoopGroupBoss、eventLoopGroupSelector的初始过程,截取部分NettyRemotingServer构造函数:
1 | public NettyRemotingServer(final NettyServerConfig nettyServerConfig, |
在NettyRemotingServer实例初始化完成后,就会将其启动。Server端在启动阶段会将之前实例化好的1个acceptor线程(eventLoopGroupBoss),N个IO线程(eventLoopGroupSelector),M1个worker 线程(defaultEventExecutorGroup)绑定上去。前面部分也已经介绍过各个线程池的作用了。截取start()部分代码:
1 | public void start() { |
这里需要说明的是,Worker线程拿到网络数据后,就交给Netty的ChannelPipeline(其采用责任链设计模式),从Head到Tail的一个个Handler执行下去,这些 Handler是在创建NettyRemotingServer实例时候指定的。截取部分start()部分代码:
1 | public void start() { |
NettyEncoder和NettyDecoder 负责网络传输数据和 RemotingCommand 之间的编解码。NettyServerHandler 拿到解码得到的 RemotingCommand 后,然后调度到channelRead0方法。NettyServerHandler代码:
1 | class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { |
根据 RemotingCommand.type 来判断是 request 还是 response来进行相应处理,根据业务请求码封装成不同的task任务后,提交给对应的业务processor处理线程池处理M2。processMessageReceived代码:
1 | public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { |
流程总结
一个 Reactor 主线程负责监听 TCP 连接请求,建立好连接后丢给 Reactor 线程池,它负责将建立好连接的 socket 注册到 selector 上去(这里有两种方式,NIO和Epoll,可配置),然后监听真正的网络数据。拿到网络数据后,再丢给 Worker 线程池。
Worker 拿到网络数据后,就交给 Pipeline,从 Head 到 Tail 一个个 Handler 的走下去,这些 Handler 是在创建 Server 的时候指定的。NettyEncoder 和 NettyDecoder 负责网络数据和 RemotingCommand 之间的编解码。
NettyServerHandler 拿到解码得到的 RemotingCommand 后,根据 RemotingCommand.type 来判断是 request 还是 response,如果是 request, 就根据 RomotingCommand 的 code(code用来标识不同类型的请求) 去 processorTable 找到对应的 processor,然后封装成 task 后,丢给对应的 processor 线程池, 如果是 response 就根据RemotingCommand.opaque 去 responseTable 中拿到对应的 ResponseFuture,把结果 set 给它。
对于 Client,经过 Pipeline 的顺序是从 Tail 到 Head。不管是 Server 和 Client,并不是每次数据流转都得经过所有的 Handler,而是会根据 Context 中的一些信息去判断。
整个数据流转过程中还有很多hook, 比如处理 command 前,处理 command 后,发送数据前,发送数据后等。