RocketMQ 源码分析 NamesrvStartup(二)

前言

上一篇讲到使用命令行启动namesrv时,最终会以NamesrvStartup作为为启动类,所以这篇主要来记录一下NamesrvStartup的源码分析。分析的RocketMQ的版本是4.3.1

NamesrvStartup

首先我觉得在看源码的时候,需要把握住一个类的大概流程,而不是看到一个功能块就deep进去。首先要明白namesrv是用来做什么的,简单的来考虑就是一个提供了通信功能可以与生产者、消费者、broker进行交互,然后能有一些队列可以保存broker注册进来的broker消息,可以保存生产者的消息等。按照这个思路往下看具体的代码。

所以我先简单的阐述一下NamesrvStartup.java这个类的具体流程。下面的图中篮框就是NamesrvStartup主要做的工作。

你想输入的替代文字

简单流程的描述就是先根据配置文件和命令行参数生成实例化nettyConfig/namesrvConfig对象,然后根据这两个对象生成NamesrvController,然后调用返回对象中的initialize()方法,将通信模块的组件初始化,最后调用start()方法将netty服务启动。

1.进入main0()

下面是NamesrvStartup的入口main,可以看到调用了main0接口。

1
2
3
public static void main(String[] args) {
main0(args);
}

main0主要工作也比较明了,第一,就是根据命令行传入的参数实例化了一个NamesrvControllerNamesrvController就是控制nameServer的一个类,包含了服务启动配置、通信模块配置、键值对的存储等等。第二,start(controller)主要是将netty服务启动起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args); // 在里面初始化了namesrv和netty,就是设置了它们的参数
start(controller); //主要是启动了一个netty服务用来监听
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}

return null;
}

2.进入createNamesrvController

根据前面的描述知道,createNamesrvController这个类首先会获取启动时的参数,然后将参数都保留在了NamesrvConfigNettyServerConfig对象中。截取部分createNamesrvController中的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//用来解析命令行中的参数args,保留在commandLIne中,new Option("h","help",false,"print help");
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}

final NamesrvConfig namesrvConfig = new NamesrvConfig(); // namesrv配置文件存放在该类
final NettyServerConfig nettyServerConfig = new NettyServerConfig(); //netty的配置信息存放在该类
nettyServerConfig.setListenPort(9876);

//如果在启动namesrv的时候,使用了命令-c 指定了配置文件,那么会将配置文件变量与namesrvConfig/nettyServerConfig中的变量进行设定。具体拿Method中以set开头,setXXX方法中的XXX进行对应,配置文件到对象的生成。
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);

namesrvConfig.setConfigStorePath(file);

System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}

//-p 打印配置信息 如果命令带有-p参数,则打印出NamesrvConfig、NettyServerConfig的属性
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}

经过上面的代码,可以得到一个对象保留了namesrv的配置,另外一个保存了nettyServer的配置,然后利用这两个配置对象生成一个NamesrvController,然后把对象返回到main0方法中。截取部分createNamesrvController中的代码:

1
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); //创建了一个NamesrvController,根据配置文件

为了能够清楚的将明白,NamesrvController构造函数这里就先不deep。

3.进入start(controller)

前面的步骤得到了一个NamesrvController对象,它不仅有服务运行相关的配置,还有很多List来存放各种消息(NamesrvController构造函数可以知道)。现在往事具备了,就把这个服务启动起来吧。下面是start(controller)的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static NamesrvController start(final NamesrvController controller) throws Exception {

if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}

boolean initResult = controller.initialize(); // 实例化一个NettyRemotingServer
if (!initResult) {
controller.shutdown();
System.exit(-3);
}

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));

controller.start(); //将实例化的NettyRemotingServer启动,进行监听

return controller;
}

主要是两个事情,第一调用了initialize(),第二就是start()了。

initialize()就是根据nettyServerConfig中的配置初始化了netty服务,主要是定义RequestCode,用来作为netty的通信协议字段,初始化线程池,初始化通信层,增加定时任务等。initialize代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public boolean initialize() {
// 加载KV配置
this.kvConfigManager.load();

// 初始化通信层
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

// 初始化线程池(根据getServerWorkerThreads值,启动相应数量线程)
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

// 此注册函数主要作用就是,定义RequestCode,用来作为netty的通信协议字段 即:如果broker通过netty发送通信请求,其中请求信息中带有code == RequestCode.REGISTER_BROKER,那么在namesrv的netty端接收到该通信连接时候,则对应调用namesrv的DefaultRequestProcessor类下面的registerBroker方法,从而完成broker向namesrv注册
// 具体请参考com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor类
this.registerProcessor();

// 增加定时任务(延时5秒,每间隔10s钟,定时扫描一次)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
// 定时扫描notActive的broker
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
// 定时将configTable相关信息记录到日志文件中
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);

if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}

return true;
}

start()就是启动了netty服务:

1
2
3
4
5
6
7
public void start() throws Exception {
this.remotingServer.start();

if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}