RocketMQ 源码分析 NamesrvController(三)

前言

上一篇在讲到createNamesrvController方法的时候,根据NamesrvConfigNettyServerConfig对象生成了一个NamesrvController对象,当时并没有deep到该类中,所以这篇来记录一下NamesrvController构造函数的分析。

NamesrvController类

下面是NamesrvController的构造类,这里重点关注的是kvConfigManagerrouteInfoManagerbrokerHousekeepingService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
// Name Server配置
this.namesrvConfig = namesrvConfig;
// Netty配置
this.nettyServerConfig = nettyServerConfig;
// KV配置管理
this.kvConfigManager = new KVConfigManager(this);
// 路由信息管理
this.routeInfoManager = new RouteInfoManager();
// Broker连接事件处理服务
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

kvConfigManager

KVConfigManager作用:

1.加载namesrvController指定的kvConfig配置文件(常为xxx/kvConfig.json)到内存进行读取、增加、删除kvConfig记录。
2.将内存记录的配置,持久化到文件
3.打印所有kvConfig配置

针对上面我有一个疑惑,就是kvConfig.json保存了什么配置呢?服务的一些配置不是在NamesrvConfig和NettyServerConfig中保留了吗?!

KVConfigManager有三个主要成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class KVConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// Name Server主要控制类
private final NamesrvController namesrvController;
// 读写锁
private final ReadWriteLock lock = new ReentrantReadWriteLock();

/**
* 以命名空间为单位存储的配置文件,存数示例如下:
* {"configTable":{"ORDER_TOPIC_CONFIG":{"UnitTest":"test"}}}%
* 此处的Namespace为ORDER_TOPIC_CONFIG,暂时不知道Namespace具体的含义
*/
private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
new HashMap<String, HashMap<String, String>>();

public KVConfigManager(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
..........省略
}

RouteInfoManager

RouteInfoManager实例中保存了整个消息集群的相关信息,这个是关键类,所以后面对其还需要进一步阅读:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
// 读写锁
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// Topic,以及对应的队列信息
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// 以Broker Name为单位的Broker集合
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 集群以及属于该进群的Broker列表
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 存活的Broker地址列表
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Broker对应的Filter Server列表
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
}
..........省略
}

BrokerHousekeepingService

该类负责Broker连接事件的处理,实现了ChannelEventListener,主要用来管理RouteInfoManager的brokerLiveTable。BrokerHousekeepingService实现了ChannelEventListener接口, 并且NettyRemotingServer启动时会启动BrokerHousekeepingService。

BrokerHousekeepingService会对连接事件, 连接关闭事件, 异常事件,闲置事件进行处理,进而来判断管理存活的broker。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class BrokerHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;

public BrokerHousekeepingService(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}

//连接事件
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}

//关闭事件
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

//异常事件
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

//闲置事件
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
}

上面四个方法在监听到事件时,就会触发RouteInfoManager的onChannelDestroy()方法,这个方法比较关键的,所以后面单独抽出来讲。