| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 
 | public boolean updateTopicRouteInfoFromNameServer(final String topic) {
     // Single-argument convenience form: forwards to the three-argument overload
     // with the default-topic path switched off and no producer supplied.
     final boolean isDefault = false;
     final DefaultMQProducer defaultMQProducer = null;
     return this.updateTopicRouteInfoFromNameServer(topic, isDefault, defaultMQProducer);
 }
 
 /**
  * Fetches route data for {@code topic} from the name server and, if an update is required,
  * pushes the derived publish/subscribe info to every registered producer and consumer and
  * stores a clone of the route data in {@code topicRouteTable}.
  *
  * <p>The fetch and the update both run while holding {@code lockNamesrv}; the lock is
  * requested with a timeout of {@code LOCK_TIMEOUT_MILLIS} milliseconds and is always released
  * in a {@code finally} block.
  *
  * @param topic the topic whose route entry is refreshed
  * @param isDefault when {@code true} and {@code defaultMQProducer} is non-null, the route is
  *     fetched for the producer's create-topic key instead of {@code topic}, and every queue's
  *     read/write queue count is capped at the producer's default topic queue count
  * @param defaultMQProducer producer supplying the create-topic key and default queue count;
  *     may be {@code null}, in which case the regular per-topic lookup is used
  * @return {@code true} only when the route table entry was replaced; {@code false} when no
  *     update was needed, the name server returned {@code null}, the lock could not be
  *     acquired in time, an exception occurred, or the calling thread was interrupted
  */
 public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
     DefaultMQProducer defaultMQProducer) {
     // Same request timeout (milliseconds) for both name-server lookups below.
     final int routeFetchTimeoutMillis = 1000 * 3;
     try {
         if (!this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
             log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
             return false;
         }
         try {
             TopicRouteData topicRouteData;
             if (isDefault && defaultMQProducer != null) {
                 topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(
                     defaultMQProducer.getCreateTopicKey(), routeFetchTimeoutMillis);
                 if (topicRouteData != null) {
                     // Cap both read and write queue counts at the producer's configured default.
                     for (QueueData data : topicRouteData.getQueueDatas()) {
                         int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                         data.setReadQueueNums(queueNums);
                         data.setWriteQueueNums(queueNums);
                     }
                 }
             } else {
                 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, routeFetchTimeoutMillis);
             }

             if (topicRouteData != null) {
                 TopicRouteData old = this.topicRouteTable.get(topic);

                 boolean changed = topicRouteDataIsChange(old, topicRouteData);
                 if (!changed) {
                     // Route data itself is unchanged; isNeedUpdateTopicRouteInfo can still
                     // request an update for this topic.
                     changed = this.isNeedUpdateTopicRouteInfo(topic);
                 } else {
                     log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                 }

                 if (changed) {
                     // Clone up front; the clone (not the fetched instance) is what gets cached.
                     TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                     // Refresh the broker-name -> addresses table.
                     for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                         this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                     }

                     // Hand the new publish info to every registered producer.
                     TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                     publishInfo.setHaveTopicRouterInfo(true);
                     for (MQProducerInner producer : this.producerTable.values()) {
                         if (producer != null) {
                             producer.updateTopicPublishInfo(topic, publishInfo);
                         }
                     }

                     // Hand the new subscribe info to every registered consumer.
                     Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                     for (MQConsumerInner consumer : this.consumerTable.values()) {
                         if (consumer != null) {
                             consumer.updateTopicSubscribeInfo(topic, subscribeInfo);
                         }
                     }

                     log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                     this.topicRouteTable.put(topic, cloneTopicRouteData);
                     return true;
                 }
             } else {
                 log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
             }
         } catch (Exception e) {
             // Failures for retry-group topics and the auto-create key topic are not logged.
             if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                 log.warn("updateTopicRouteInfoFromNameServer Exception", e);
             }
         } finally {
             this.lockNamesrv.unlock();
         }
     } catch (InterruptedException e) {
         log.warn("updateTopicRouteInfoFromNameServer Exception", e);
         // Restore the interrupt status so callers/executors can observe the interruption;
         // catching InterruptedException clears the flag.
         Thread.currentThread().interrupt();
     }

     return false;
 }
 
 |