RocketMQ 源码分析 HA高可用(八)

前言

这篇来记录一下RocketMQ是如何实现高可用的。主要是来分析一下NameSrv、Broker的HAService。

NameSrv高可用

namesrv在RocketMQ充当了一个注册中心的角色,但不像Zookeeper需要实现Leader和Follower之间的同步。那namesrv是如何通过集群的方式到达高可用的呢?

namesrv上路由信息是通过broker启动的时候注册进来的,在之后有通过心跳来更新路由信息。为了解决namesrv集群的数据同步问题,broker在注册的时候是向每一个namesrv都注册的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {

final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
//获得所有namesrv的地址
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);

RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
//向namesrv每个地址都进行注册
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}

log.info("register broker to name server {} OK", namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}

try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}

return registerBrokerResultList;
}

Broker 高可用

以同一个brokername构造broker集群,进而实现高可用。集群情况:broker_master 1 + broker_slave N 。Master节点 提供读写服务, Slave节点 只提供读服务。

broker集群的同步主要体现在以下两点:

  • slaver->master:主要是指slave会定期向master发起同步数据请求,master向slave返回数据。
  • master->slave:在设置为同步双写的时候,master每写入一条消息都会同步到slave当中。设置异步复制的时候,master写入的一条消息不需要等到slave返回写入成功。

先来看第一种情况slaver->master:

当slaver启动时,会上报当前的commitlog offset偏移值。一般会有两种情况,一种是slaver是全新的机器,commitlog是空的,传递maxPhyOffset=0。一种是master在使用过程中宕机了,经运维人员修复后重启成功,继续工作,master会根据slaver上报的maxPhyOffset的值继续同步。每次最大同步32k数据。

HAService的内部类AcceptSocketService作为master broker的监听服务.HAConnection的内部类ReadSocketService负责处理slave的offset,WriteSocketService将commitlog发送给slave。

再来看第二种情况master->slaver:

HA同步复制, 当msg写入master的commitlog文件后,判断maser的角色如果是同步双写SYNC_MASTER, 等待master同步到slave在返回结果。

HA异步复制:

你想输入的替代文字