RocketMQ Source Code Analysis: BrokerHousekeepingService (Part 5)

Preface

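Early versions of RocketMQ relied on ZooKeeper. ZooKeeper was not considered lightweight enough, so it was later replaced by namesrv (the Name Server). Anyone familiar with ZooKeeper knows that it uses heartbeats to decide whether a service is healthy, so namesrv needs a similar mechanism. How, then, does namesrv decide whether a Broker node is live? And what does it do when a Broker disconnects?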

An earlier post mentioned that the NamesrvController constructor creates a BrokerHousekeepingService. This is the service that handles changes in Broker connections.

BrokerHousekeepingService

BrokerHousekeepingService is created in the NamesrvController constructor:

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
    // Name Server configuration
    this.namesrvConfig = namesrvConfig;
    // Netty configuration
    this.nettyServerConfig = nettyServerConfig;
    // KV configuration manager
    this.kvConfigManager = new KVConfigManager(this);
    // Route information manager
    this.routeInfoManager = new RouteInfoManager();
    // Service that handles Broker connection events
    this.brokerHousekeepingService = new BrokerHousekeepingService(this);
    this.configuration = new Configuration(
        log,
        this.namesrvConfig, this.nettyServerConfig
    );
    this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

Now look at the BrokerHousekeepingService class itself:

public class BrokerHousekeepingService implements ChannelEventListener {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final NamesrvController namesrvController;

    public BrokerHousekeepingService(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) {
    }

    // Handles the channel close event
    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    // Handles the channel exception event
    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    // Handles the channel idle event
    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
}

The class implements the ChannelEventListener interface. onChannelConnect does nothing. Every other callback delegates to the onChannelDestroy method of the namesrvController's routeInfoManager.

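This part assumes a little Netty background. In short, each broker talks to namesrv over a "channel". namesrv watches these channels for certain events (close, exception, idle) and updates its state in response. Open routeInfoManager's onChannelDestroy method to see how a dead broker is handled.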
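The delegation comes down to four callbacks, three of which lead to the same cleanup. The sketch below models that idea without Netty. It is not RocketMQ's actual event-dispatch code, and it rests on these assumptions:

- `EventType`, `ChannelEvent`, `ChannelListener`, `HousekeepingListener` and `EventDispatcher` are hypothetical names.
- A plain `Object` stands in for a Netty `Channel`.
- Recording the peer address stands in for the real `RouteInfoManager.onChannelDestroy`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event types, one per ChannelEventListener callback
enum EventType { CONNECT, CLOSE, EXCEPTION, IDLE }

// Hypothetical stand-in for a network event: what happened, on which peer and channel
final class ChannelEvent {
    final EventType type;
    final String remoteAddr;
    final Object channel; // stands in for a Netty Channel

    ChannelEvent(EventType type, String remoteAddr, Object channel) {
        this.type = type;
        this.remoteAddr = remoteAddr;
        this.channel = channel;
    }
}

// Hypothetical listener interface shaped like ChannelEventListener
interface ChannelListener {
    void onChannelConnect(String remoteAddr, Object channel);
    void onChannelClose(String remoteAddr, Object channel);
    void onChannelException(String remoteAddr, Object channel);
    void onChannelIdle(String remoteAddr, Object channel);
}

// Like BrokerHousekeepingService: every "broker is gone" signal goes to one cleanup method
class HousekeepingListener implements ChannelListener {
    final List<String> destroyed = new ArrayList<>();

    public void onChannelConnect(String remoteAddr, Object channel) { /* nothing to do */ }
    public void onChannelClose(String remoteAddr, Object channel) { onChannelDestroy(remoteAddr); }
    public void onChannelException(String remoteAddr, Object channel) { onChannelDestroy(remoteAddr); }
    public void onChannelIdle(String remoteAddr, Object channel) { onChannelDestroy(remoteAddr); }

    // Placeholder for RouteInfoManager.onChannelDestroy: only records the address
    private void onChannelDestroy(String remoteAddr) { destroyed.add(remoteAddr); }
}

class EventDispatcher {
    // Routes one event to the matching listener callback
    static void dispatch(ChannelEvent e, ChannelListener l) {
        switch (e.type) {
            case CONNECT:
                l.onChannelConnect(e.remoteAddr, e.channel);
                break;
            case CLOSE:
                l.onChannelClose(e.remoteAddr, e.channel);
                break;
            case EXCEPTION:
                l.onChannelException(e.remoteAddr, e.channel);
                break;
            case IDLE:
                l.onChannelIdle(e.remoteAddr, e.channel);
                break;
        }
    }
}
```

A CONNECT event leaves `destroyed` empty. Each IDLE, CLOSE or EXCEPTION event appends the peer address. BrokerHousekeepingService treats connect and disconnect events with the same asymmetry.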

onChannelDestroy

onChannelDestroy is roughly 130 lines long, so I will analyze it in pieces. How should it be split?

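The previous post showed that when a broker registers with namesrv, registerBroker is invoked. It adds the new broker's information to brokerAddrTable, clusterAddrTable, brokerLiveTable and topicQueueTable. When a dead broker is detected, namesrv only has to remove those entries again, one table at a time.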

Here is the registerBroker registration diagram again. It shows how a broker is linked to each of these routing tables.

[Figure: registerBroker registration diagram]
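To make the "undo what registerBroker did" idea concrete, here is a toy model of the four tables with a much-simplified register step. The class `RouteTablesSketch` and its `register` method are hypothetical. The value types are flattened:

- An inner `Map<Long, String>` stands in for `BrokerData`.
- A bare channel `Object` stands in for `BrokerLiveInfo`.
- A brokerName string stands in for `QueueData`.

The real registerBroker does considerably more than this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the four routing tables discussed in this post
class RouteTablesSketch {
    // clusterName -> brokerNames in that cluster
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();
    // brokerName -> (brokerId -> broker address); stands in for BrokerData
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // broker address -> channel; stands in for BrokerLiveInfo
    final Map<String, Object> brokerLiveTable = new HashMap<>();
    // topic -> brokerNames hosting queues of that topic; stands in for List<QueueData>
    final Map<String, List<String>> topicQueueTable = new HashMap<>();

    // Each add/put below creates an entry that onChannelDestroy later has to undo
    void register(String clusterName, String brokerName, long brokerId,
                  String brokerAddr, Object channel, List<String> topics) {
        clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>()).add(brokerName);
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, brokerAddr);
        brokerLiveTable.put(brokerAddr, channel);
        for (String topic : topics) {
            List<String> owners = topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>());
            if (!owners.contains(brokerName)) {
                owners.add(brokerName);
            }
        }
    }
}
```

Suppose a master (brokerId 0) and a slave (brokerId 1) register under the same brokerName. brokerLiveTable then holds two entries, but the cluster set and topicQueueTable each hold that brokerName only once. For this reason, onChannelDestroy only touches the cluster and topic tables once the last address of a brokerName is gone.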

Removing from brokerLiveTable

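The first table onChannelDestroy deals with is brokerLiveTable. It starts by finding which broker address owns the dead channel. Here is an excerpt from onChannelDestroy: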

public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                // Iterate over the live-broker table; if the channel matches, take its broker address
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    // If the live table's channel is the same object as the one passed in, return its brokerAddr
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    // ... (omitted)
}

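This part is simple. The code iterates over brokerLiveTable and checks whether the channel stored in each entry is the same object as the channel passed in (a reference comparison with ==). If so, that entry's key, the broker address, is stored in brokerAddrFound. If no entry matches, brokerAddrFound stays null.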

You may wonder why the entry is not removed from brokerLiveTable right here.

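Note that this step holds only the read lock. A read lock lets other threads read brokerLiveTable concurrently but blocks any thread that wants to modify it. The lookup only reads, so the read lock is enough, and concurrent readers such as route queries are not blocked while it runs. Removing the entry is a modification and needs the write lock. The lock here is a ReentrantReadWriteLock, which cannot upgrade a held read lock to the write lock. The code therefore releases the read lock first and performs the removal afterwards under the write lock, as the next excerpt shows.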
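The read-then-write pattern can be sketched with `java.util.concurrent.locks.ReentrantReadWriteLock`. The class `LiveTableSketch` and its methods are hypothetical. The map value is a bare `Object` instead of `BrokerLiveInfo`, and `lock()` replaces `lockInterruptibly()` to keep the example short. The point is the structure: search under the shared read lock, release it, then modify the map under the exclusive write lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the two-phase locking used by onChannelDestroy
class LiveTableSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // broker address -> channel (simplified BrokerLiveInfo)
    private final Map<String, Object> brokerLiveTable = new HashMap<>();

    void put(String brokerAddr, Object channel) {
        lock.writeLock().lock();
        try {
            brokerLiveTable.put(brokerAddr, channel);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Phase 1: find the address by channel identity under the shared read lock
    String findAddr(Object channel) {
        lock.readLock().lock();
        try {
            for (Map.Entry<String, Object> e : brokerLiveTable.entrySet()) {
                if (e.getValue() == channel) { // reference comparison, as in the original
                    return e.getKey();
                }
            }
            return null;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Phase 2: the read lock is released inside findAddr; the removal takes the write lock.
    // A held read lock cannot be upgraded, so the two phases must stay separate.
    boolean destroy(String remoteAddr, Object channel) {
        String addr = findAddr(channel);
        if (addr == null) {
            addr = remoteAddr; // same fallback onChannelDestroy uses
        }
        lock.writeLock().lock();
        try {
            return brokerLiveTable.remove(addr) != null;
        } finally {
            lock.writeLock().unlock();
        }
    }

    int size() {
        lock.readLock().lock();
        try {
            return brokerLiveTable.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Another thread may remove the entry between the two phases. That is harmless, because removing a key that is no longer present does nothing.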

Removing from brokerAddrTable

Next, here is how brokerAddrTable is modified. Excerpt from onChannelDestroy:

try {
    this.lock.writeLock().lockInterruptibly();
    // Remove the broker found above (brokerAddrFound) from the live table
    this.brokerLiveTable.remove(brokerAddrFound);
    this.filterServerTable.remove(brokerAddrFound);
    String brokerNameFound = null;
    boolean removeBrokerName = false;
    Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
        this.brokerAddrTable.entrySet().iterator();
    while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
        BrokerData brokerData = itBrokerAddrTable.next().getValue();

        Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
        while (it.hasNext()) {
            Entry<Long, String> entry = it.next();
            Long brokerId = entry.getKey();
            String brokerAddr = entry.getValue();
            // Find the matching entry in this BrokerData's brokerAddrs and remove it
            if (brokerAddr.equals(brokerAddrFound)) {
                brokerNameFound = brokerData.getBrokerName();
                it.remove();
                log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                    brokerId, brokerAddr);
                break;
            }
        }

        // If brokerAddrs is now empty, this BrokerData is useless: remove it from brokerAddrTable
        if (brokerData.getBrokerAddrs().isEmpty()) {
            removeBrokerName = true;
            itBrokerAddrTable.remove();
            log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                brokerData.getBrokerName());
        }
    }
    // ... (omitted)
}

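First, consider the shape of brokerAddrTable. It is a HashMap (map 1) that maps each brokerName to a BrokerData. Each BrokerData holds brokerAddrs, a second HashMap (map 2) that maps each brokerId to a broker address. brokerId 0 is the master; other ids are slaves.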

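Using the brokerAddrFound obtained earlier, the first step removes the matching brokerId -> address pair from brokerAddrs. Afterwards, the code checks whether brokerAddrs is empty: if (brokerData.getBrokerAddrs().isEmpty()). If it is, no broker id or address is left under that brokerName. The whole brokerName entry is then removed from brokerAddrTable as well, and removeBrokerName is set to true. The registration diagram makes this easier to see.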
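The nested removal can be sketched on plain maps. `BrokerAddrCleanup` is a hypothetical class, and its inner `Map<Long, String>` stands in for `BrokerData.getBrokerAddrs()`. The two fields mirror the `brokerNameFound` and `removeBrokerName` local variables of onChannelDestroy.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of the brokerAddrTable step of onChannelDestroy
class BrokerAddrCleanup {
    // brokerName -> (brokerId -> broker address); the inner map plays the role of BrokerData
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    String brokerNameFound;   // brokerName that owned the removed address, if any
    boolean removeBrokerName; // true once a brokerName has no address left

    void removeAddr(String brokerAddrFound) {
        brokerNameFound = null;
        removeBrokerName = false;
        Iterator<Map.Entry<String, Map<Long, String>>> outer = brokerAddrTable.entrySet().iterator();
        while (outer.hasNext() && brokerNameFound == null) {
            Map.Entry<String, Map<Long, String>> brokerEntry = outer.next();
            Iterator<Map.Entry<Long, String>> inner = brokerEntry.getValue().entrySet().iterator();
            while (inner.hasNext()) {
                if (inner.next().getValue().equals(brokerAddrFound)) {
                    brokerNameFound = brokerEntry.getKey();
                    inner.remove(); // drop this brokerId -> address pair
                    break;
                }
            }
            if (brokerEntry.getValue().isEmpty()) {
                removeBrokerName = true;
                outer.remove(); // neither master nor slave is left under this brokerName
            }
        }
    }
}
```

Removing the slave of a master/slave pair keeps the brokerName, and removeBrokerName stays false. Removing the master afterwards empties the inner map. The brokerName then disappears and removeBrokerName becomes true, which triggers the cluster and topic cleanups.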

Removing from clusterAddrTable

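clusterAddrTable stores cluster membership: each cluster name maps to the set of brokerNames in that cluster. If the dead broker was the last one under its brokerName, that brokerName must be removed from its cluster's set. If the set then becomes empty, the cluster entry itself is removed. Excerpt from onChannelDestroy: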

// removeBrokerName means no broker is left under this brokerName,
// so the cluster -> brokerName mapping must drop that brokerName as well
if (brokerNameFound != null && removeBrokerName) {
    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, Set<String>> entry = it.next();
        String clusterName = entry.getKey();
        Set<String> brokerNames = entry.getValue();
        // Remove brokerNameFound directly from the set
        boolean removed = brokerNames.remove(brokerNameFound);
        if (removed) {
            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                brokerNameFound, clusterName);

            if (brokerNames.isEmpty()) {
                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                    clusterName);
                it.remove();
            }

            break;
        }
    }
}
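The same logic works on a plain `Map<String, Set<String>>`. `ClusterCleanup` is a hypothetical class. Its early `break` mirrors the original, which stops at the first cluster that contained the brokerName and so assumes that a brokerName belongs to a single cluster.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the clusterAddrTable step of onChannelDestroy
class ClusterCleanup {
    // clusterName -> brokerNames in that cluster
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();

    void removeBrokerName(String brokerNameFound) {
        Iterator<Map.Entry<String, Set<String>>> it = clusterAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Set<String> brokerNames = it.next().getValue();
            if (brokerNames.remove(brokerNameFound)) {
                if (brokerNames.isEmpty()) {
                    it.remove(); // the cluster has no brokerName left
                }
                break; // stop at the first cluster that contained the brokerName
            }
        }
    }
}
```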

Removing from topicQueueTable

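A topic may be spread over several brokers. If removeBrokerName is true, every queue entry that belonged to that brokerName must be removed. The code iterates over each topic's List<QueueData> in topicQueueTable and removes the entries whose brokerName matches. A topic left with no queues is removed entirely. Excerpt from onChannelDestroy: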

// removeBrokerName means no topic can live on a broker named brokerNameFound any more: remove its queues
if (removeBrokerName) {
    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
        this.topicQueueTable.entrySet().iterator();
    while (itTopicQueueTable.hasNext()) {
        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
        String topic = entry.getKey();
        List<QueueData> queueDataList = entry.getValue();

        Iterator<QueueData> itQueueData = queueDataList.iterator();
        while (itQueueData.hasNext()) {
            QueueData queueData = itQueueData.next();
            if (queueData.getBrokerName().equals(brokerNameFound)) {
                itQueueData.remove();
                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                    topic, queueData);
            }
        }

        if (queueDataList.isEmpty()) {
            itTopicQueueTable.remove();
            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                topic);
        }
    }
}
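Below is a more compact equivalent, assuming Java 8's `Collection.removeIf`. `TopicCleanup` and `QueueInfo` are hypothetical. `QueueInfo` keeps only a brokerName and a queue count instead of the full `QueueData`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the topicQueueTable step of onChannelDestroy
class TopicCleanup {
    // Simplified stand-in for QueueData: only the fields this sketch needs
    static final class QueueInfo {
        final String brokerName;
        final int writeQueueNums;

        QueueInfo(String brokerName, int writeQueueNums) {
            this.brokerName = brokerName;
            this.writeQueueNums = writeQueueNums;
        }
    }

    // topic -> queue configuration per brokerName
    final Map<String, List<QueueInfo>> topicQueueTable = new HashMap<>();

    void removeBrokerName(String brokerNameFound) {
        // Drop every queue entry that lived on the vanished brokerName...
        for (List<QueueInfo> queues : topicQueueTable.values()) {
            queues.removeIf(q -> q.brokerName.equals(brokerNameFound));
        }
        // ...then drop topics that no longer have any queue at all
        topicQueueTable.values().removeIf(List::isEmpty);
    }
}
```

The explicit iterators in the RocketMQ code and `removeIf` here have the same effect. `removeIf` simply hides the `Iterator.remove()` calls.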

The complete onChannelDestroy

public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                // Iterate over the live-broker table; if the channel matches, take its broker address
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    if (null == brokerAddrFound) { // No live entry owns this channel: fall back to the remoteAddr passed in
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }

    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                // Remove the broker found above (brokerAddrFound) from the live table
                this.brokerLiveTable.remove(brokerAddrFound);
                // filterServerTable: broker address -> Filter Server addresses registered by that broker
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();

                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        // The live table was handled above; now find the matching entry in brokerAddrs and remove it
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                brokerId, brokerAddr);
                            break;
                        }
                    }

                    // If brokerAddrs is now empty, this BrokerData is useless: remove it from brokerAddrTable
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                            brokerData.getBrokerName());
                    }
                }

                // removeBrokerName means no broker is left under this brokerName,
                // so the cluster -> brokerName mapping must drop that brokerName as well
                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                brokerNameFound, clusterName);

                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                    clusterName);
                                it.remove();
                            }

                            break;
                        }
                    }
                }

                // removeBrokerName means no topic can live on a broker named brokerNameFound any more: remove its queues
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey();
                        List<QueueData> queueDataList = entry.getValue();

                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            QueueData queueData = itQueueData.next();
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                    topic, queueData);
                            }
                        }

                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                topic);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}