前言
RocketMQ在早期版本使用的是Zookeeper,考虑到Zookeeper不够轻量,后面用namesrv来代替。熟悉Zookeeper的知道它使用心跳检测来判断服务是否正常,那么namesrv肯定也少不了这块功能,那么它是怎么判断一个Broker节点是否Live呢?如果一个Broker断开了namesrv是如何处理的呢?
在前面篇章提到NamesrvController的构造函数中有一个BrokerHousekeepingService,这个就是来处理Broker连接发生变化的服务。
BrokerHousekeepingService
BrokerHousekeepingService是NamesrvController构造函数的时候生成的:
1 | public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) { |
看一下BrokerHousekeepingService这个类的情况:
1 | public class BrokerHousekeepingService implements ChannelEventListener { |
可以看到这个类实现了ChannelEventListener接口,除了onChannelConnect外,其余各个方法均委托给namesrvController的routeInfoManager的onChannelDestroy方法。
这里需要netty的一些基础,简单来说每一个broker与namesrv通过一个“通道”channel进行“沟通”。namesrv通过监测这些通道是否发生某些事件,去做出相应的变动。可以点进routeInfoManager的onChannelDestroy方法看看,对于宕机的broker是如何处理的。
onChannelDestroy
onChannelDestroy大概有130行的代码,我打算将它拆开来分析,那么我拆的依据是什么呢?
通过上一篇博客我们知道一个broker注册到namesrv的时候会触发registerBroker方法,registerBroker会往brokerAddrTable、clusterAddrTable、brokerLiveTable、topicQueueTable中添加新的broker信息。那么现在发现了一个宕机的broker,就是将上述的添加的信息一个个去除即可!!
放一张registerBroker注册的图,方便我们知道broker和这些路由的关联。
/1.png)
移除brokerLiveTable
onChannelDestroy第一个要删除的就是brokerLiveTable中的信息,截取部分onChannelDestroy的代码:
1 | public void onChannelDestroy(String remoteAddr, Channel channel) { |
这部分是比较简单的,就是迭代了brokerLiveTable,判断live表中channel和传入的channel是否为同一个对象,如果是则从把这个channel对应的brokerAddr返回到brokerAddrFound中。如果没有找到那么brokerAddrFound为null。
你们可能会有疑问为什么这里不在brokerLiveTable移除这个channel呢?!!注意看这步的操作是加了一个readLock,意味着现在允许其他线程对brokerLiveTable是可读,但是不允许其他线程变动brokerLiveTable。因为它希望再要修改brokerLiveTable的时候,读表的操作是不会被阻塞的。
移除brokerAddrTable
接着往下看如何修改brokerAddrTable的,截取部分onChannelDestroy的代码:
1 | try { |
首先要明白brokerAddrTable是一个hashmap1,一个brokername对应一个brokerData。每一个brokerData存放了一个BrokerAddrs为hashmap2,一个brokerID对应一个brokerAddr。
根据前面获得brokerAddrFound,第一个就是要移除BrokerAddrs中的一组信息。如果移除后,当前的BrokerAddrs已经是空的了:if (brokerData.getBrokerAddrs().isEmpty()),那么意味着一个brokername已经找不到对应的broker的addr和id了。那么就要把brokerAddrTable对应的broker也要移除。解释的可能不太清楚,不过看看注册图可能就会明白。
移除clusterAddrTable
clusterAddrTable存放着集群的信息,如果宕机的broker的brokername只有它一个,那么集群信息中就需要把clusterAddrTable中的set
1 | //如果removeBrokerName,那么意味着以这个brokername为name的broker没有一个了。 |
移除topicQueueTable
一个topic可能会存在多个broker中,如果removeBrokerName为true,意味着原本存在brokername的信息就要移除,变量topicQueueTable中的List
1 | //如果removeBrokerName,那么topic就不会再存放在一个叫brokerNameFound的broker中了,移除 |
onChannelDestroy整体代码
1 | public void onChannelDestroy(String remoteAddr, Channel channel) { |