0%

rocketmq源码剖析之客户端topic路由刷新

rocketmq的client在启动的时候,会通过开启一个定时任务来定期刷新topic信息,这里就来看一下这个刷新的过程。

首先来看一下这个定时任务:

1
2
3
4
5
6
7
8
9
10
11
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

从源码可以看见,这个定时任务是在client启动之后延迟10ms执行,并且之后每隔一定时长重复执行,如果没有特定设置,该定时为30s。

1
2
3
4
/**
* Pulling topic information interval from the named server
*/
private int pollNameServerInterval = 1000 * 30;

在执行刷新topic信息的操作中,其首先是从生产者和消费者两个注册表中获取到topic信息,然后再迭代更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void updateTopicRouteInfoFromNameServer() {
Set<String> topicList = new HashSet<String>();

// Consumer
{
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
Set<SubscriptionData> subList = impl.subscriptions();
if (subList != null) {
for (SubscriptionData subData : subList) {
topicList.add(subData.getTopic());
}
}
}
}
}

// Producer
{
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
Set<String> lst = impl.getPublishTopicList();
topicList.addAll(lst);
}
}
}

for (String topic : topicList) {
this.updateTopicRouteInfoFromNameServer(topic);
}
}

在更新方面,首先是从远程(namesrv)获取到最新的topic数据,然后再更新。执行更新操作后,对对新老数据进行比较来判断是否需要更新发布订阅的信息,如果需要则去更新。具体源码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
// 不指定默认的producer
return updateTopicRouteInfoFromNameServer(topic, false, null);
}

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
// 可重入锁
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// 如果指定了默认的producer,则获取指定的topic信息
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
// 如果获取到topic信息则设置队列数
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
// 根据给定topic从namesrv获取数据
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
// 判断topic信息是否变化
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
// 如果没有变化则再次判断是否需要更新,如果当前的没有发布队列,则会进行更新
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}

// 如果需要更新
if (changed) {
// 克隆拷贝一份topic数据
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

// 设置broker地址
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}

// 更新消息发布者的信息
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}

// 更新消息订阅者的信息
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
// 设置topic路由信息
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
}
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}

return false;
}