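In RocketMQ, the producer is the party that produces messages and the consumer is the party that consumes them. Their lifecycle is tied to the application that embeds them, which means it is controlled by the user. Why analyze the startup and shutdown of these two roles together? Because both are MQ clients, and their startup and shutdown behaviors have a great deal in common. The rest of this article examines those processes in detail.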
Producer

DefaultMQProducer

Starting DefaultMQProducer

When a producer starts, it performs a series of internal initialization steps. The startup source code is as follows:
```java
public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    this.defaultMQProducerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}
```
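This short method does three things: it sets the producer's group, runs the core initialization, and initializes trace. The core initialization is described next; the other two steps are shared with the consumer and are covered in the Common behavior section.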
Producer core initialization

The core logic of producer startup is shown below. On the first start it takes the `CREATE_JUST` branch:
```java
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
```
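A quick read of this code shows that the core logic lies in starting the `MQClient` and in triggering the heartbeat between the producer and the broker. Both parts are shared with the consumer and are covered in the Common behavior section.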
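The state handling in `start` can also be looked at in isolation. The following is a minimal sketch, not RocketMQ code: `LifecycleSketch` and its `REGISTRY` are hypothetical names, and the registry is assumed to be a `ConcurrentHashMap` keyed by group name, which the listing above does not show. It illustrates the pattern visible in the listing: mark the state as `START_FAILED` before doing any work, roll back to `CREATE_JUST` when the group is already registered, and switch to `RUNNING` only at the very end.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of the start-up state guard; not RocketMQ code.
final class LifecycleSketch {
    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    // Assumed registry: at most one client per group name.
    static final ConcurrentMap<String, LifecycleSketch> REGISTRY = new ConcurrentHashMap<>();

    private final String group;
    private ServiceState state = ServiceState.CREATE_JUST;

    LifecycleSketch(String group) {
        this.group = group;
    }

    synchronized ServiceState state() {
        return state;
    }

    synchronized void start() {
        switch (state) {
            case CREATE_JUST:
                // Pessimistically mark the start as failed; only a complete run reaches RUNNING.
                state = ServiceState.START_FAILED;
                if (REGISTRY.putIfAbsent(group, this) != null) {
                    // Duplicate group: roll back to the initial state, as the listing does.
                    state = ServiceState.CREATE_JUST;
                    throw new IllegalStateException("group already registered: " + group);
                }
                state = ServiceState.RUNNING;
                break;
            default:
                // RUNNING, START_FAILED and SHUTDOWN_ALREADY all refuse a second start.
                throw new IllegalStateException("service state not OK: " + state);
        }
    }

    public static void main(String[] args) {
        LifecycleSketch first = new LifecycleSketch("demo-group");
        first.start();
        System.out.println("state after start: " + first.state());
        try {
            first.start();
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
    }
}
```

Setting `START_FAILED` first means that an exception thrown halfway through initialization leaves the object in a state that can never be started again, which is why a failed client has to be recreated rather than restarted.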
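Shutting down DefaultMQProducer

Shutting down moves the producer from the `RUNNING` state to the `SHUTDOWN_ALREADY` state. It mainly does three things:

1. Removes its own instance from the registry;
2. Shuts down the thread pool resources;
3. Cleans up the client's resources.

The core code is as follows: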
```java
this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
this.defaultAsyncSenderExecutor.shutdown();
if (shutdownFactory) {
    this.mQClientFactory.shutdown();
}
```
TransactionMQProducer

Starting TransactionMQProducer

The transactional producer `TransactionMQProducer` is an enhancement of the default producer. Its startup has one extra step:
```java
public void start() throws MQClientException {
    this.defaultMQProducerImpl.initTransactionEnv();
    super.start();
}
```
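The extra step is an initialization on the transaction side, namely the thread pool used for transaction checks. If the user has supplied an executor it is used as is; otherwise a bounded pool is created. The source is as follows: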
```java
public void initTransactionEnv() {
    TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
    if (producer.getExecutorService() != null) {
        this.checkExecutor = producer.getExecutorService();
    } else {
        this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
        this.checkExecutor = new ThreadPoolExecutor(
            producer.getCheckThreadPoolMinSize(),
            producer.getCheckThreadPoolMaxSize(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.checkRequestQueue);
    }
}
```
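To make the effect of the bounded queue concrete, here is a small sketch that builds a pool of the same shape and submits blocking tasks until one is rejected. `CheckPoolSketch` and the sizes used are illustrative assumptions, not RocketMQ defaults. Because no `RejectedExecutionHandler` is passed to the constructor, the JDK default `AbortPolicy` applies and a `RejectedExecutionException` is thrown once every thread is busy and the queue is full.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of a bounded check pool; the sizes are illustrative only.
final class CheckPoolSketch {
    // Same constructor shape as in initTransactionEnv: thread bounds plus a bounded queue.
    static ThreadPoolExecutor newCheckExecutor(int minThreads, int maxThreads, int holdMax) {
        return new ThreadPoolExecutor(
            minThreads,
            maxThreads,
            1000 * 60,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(holdMax));
    }

    // Submits tasks that block until released; returns how many were accepted
    // before the pool rejected one (or `attempts` if none was rejected).
    static int acceptedBeforeRejection(ThreadPoolExecutor executor, int attempts) {
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                executor.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Default AbortPolicy: all threads are busy and the queue is full.
        } finally {
            release.countDown(); // let the blocked tasks finish
        }
        return accepted;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = newCheckExecutor(1, 1, 2);
        System.out.println("accepted: " + acceptedBeforeRejection(executor, 10));
        executor.shutdown();
    }
}
```

With blocking tasks the pool accepts as many submissions as its maximum thread count plus the queue capacity, so the hold-max setting effectively caps how many pending checks a producer will buffer.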
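Shutting down TransactionMQProducer

Likewise, when shutting down, the transactional producer performs one extra step to release the transaction-check thread pool. The details are not covered here.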
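Consumer

PushConsumer

Starting PushConsumer

Like the producer, a push-mode consumer first wraps the group, then runs the core flow, and finally initializes trace if it is configured. Its source is as follows: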
```java
public void start() throws MQClientException {
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    this.defaultMQPushConsumerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}
```
The core initialization is described next; the rest is shared with the producer and is covered in the Common behavior section.
PushConsumer core initialization

The core `start` source is shown directly below. It likewise takes the `CREATE_JUST` branch:
```java
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            this.copySubscription();

            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();

            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            this.consumeMessageService.start();

            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();
}
```
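Now look at how consumer offsets are loaded. Offsets are loaded either from the remote broker or from a local file: the remote case is implemented by `RemoteBrokerOffsetStore` and the local case by `LocalFileOffsetStore`. Which one is used depends on the message model: as the listing shows, clustering mode uses the remote store and broadcasting mode uses the local one. The same choice determines whether offsets are persisted remotely or locally.

The remote load actually does nothing, as the source shows: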
```java
@Override
public void load() {
}
```
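Loading from the local file depends on whether anything could be read from it: only when the file yields an offset table are its entries copied into the in-memory table. The source is as follows: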
```java
public void load() throws MQClientException {
    OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
        offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());

        for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
            AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
            log.info("load consumer's offset, {} {} {}",
                this.groupName,
                mq,
                offset.get());
        }
    }
}
```
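The selection and loading logic can be condensed into a small stand-alone model. This is a sketch under stated assumptions: `OffsetStoreSketch`, `LocalStore`, `RemoteStore`, and the use of plain `String` keys instead of `MessageQueue` are all hypothetical simplifications, and the "local file" is represented by a map passed in by the caller.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of offset-store selection; not RocketMQ code.
final class OffsetStoreSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    interface OffsetStore {
        void load();
        Map<String, AtomicLong> offsets();
    }

    // Broadcasting: offsets live on the client, so load() copies what was persisted locally.
    static final class LocalStore implements OffsetStore {
        private final Map<String, AtomicLong> persisted; // stands in for the local offset file
        private final Map<String, AtomicLong> table = new ConcurrentHashMap<>();

        LocalStore(Map<String, AtomicLong> persisted) {
            this.persisted = persisted;
        }

        @Override
        public void load() {
            if (persisted != null) { // nothing readable means nothing to load
                table.putAll(persisted);
            }
        }

        @Override
        public Map<String, AtomicLong> offsets() {
            return table;
        }
    }

    // Clustering: offsets live on the broker, so there is nothing to load at start-up.
    static final class RemoteStore implements OffsetStore {
        private final Map<String, AtomicLong> table = new ConcurrentHashMap<>();

        @Override
        public void load() {
        }

        @Override
        public Map<String, AtomicLong> offsets() {
            return table;
        }
    }

    static OffsetStore choose(MessageModel model, Map<String, AtomicLong> localFile) {
        switch (model) {
            case BROADCASTING:
                return new LocalStore(localFile);
            case CLUSTERING:
                return new RemoteStore();
            default:
                // The listing simply breaks here; this sketch fails fast instead.
                throw new IllegalArgumentException("unknown model: " + model);
        }
    }

    public static void main(String[] args) {
        Map<String, AtomicLong> file = new ConcurrentHashMap<>();
        file.put("queue-0", new AtomicLong(42));
        OffsetStore store = choose(MessageModel.BROADCASTING, file);
        store.load();
        System.out.println(store.offsets());
    }
}
```

The split follows from who owns the progress: in broadcasting mode every consumer reads every message, so each instance tracks its own offsets, whereas in clustering mode the group shares progress and the broker is the natural place to keep it.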
Shutting down PushConsumer

A push-mode consumer cleans up its resources on shutdown as follows:
```java
public synchronized void shutdown() {
    switch (this.serviceState) {
        case CREATE_JUST:
            break;
        case RUNNING:
            this.consumeMessageService.shutdown();

            this.persistConsumerOffset();

            this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());

            this.mQClientFactory.shutdown();
            log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());

            this.rebalanceImpl.destroy();
            this.serviceState = ServiceState.SHUTDOWN_ALREADY;
            break;
        case SHUTDOWN_ALREADY:
            break;
        default:
            break;
    }
}
```
PullConsumer

Starting PullConsumer

A pull-mode consumer only wraps the group and runs the core flow at startup. Its source is as follows:
```java
public void start() throws MQClientException {
    this.setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    this.defaultMQPullConsumerImpl.start();
}
```
The core initialization is described next; the group wrapping is covered in the Common behavior section.
PullConsumer core initialization

The core `start` source is shown directly below. It likewise takes the `CREATE_JUST` branch:
```java
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            this.copySubscription();

            if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPullConsumer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);

            this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPullConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
            }

            this.offsetStore.load();

            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;

                throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            mQClientFactory.start();
            log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
}
```
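Reading through this source shows that the core startup flows of pull-mode and push-mode consumers are almost identical. The differences are that push mode additionally creates and starts the consume service for the registered message listener (`consumeMessageService`) and, after the `switch`, runs the final checks: `updateTopicSubscribeInfoWhenSubscriptionChanged`, `checkClientInBroker`, `sendHeartbeatToAllBrokerWithLock`, and `rebalanceImmediately`. From this it is clear that push mode is really an enhancement built on top of pull mode.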
Shutting down PullConsumer

A pull-mode consumer cleans up its resources on shutdown as follows:
```java
public synchronized void shutdown() {
    switch (this.serviceState) {
        case CREATE_JUST:
            break;
        case RUNNING:
            this.persistConsumerOffset();

            this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup());

            this.mQClientFactory.shutdown();
            log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup());

            this.serviceState = ServiceState.SHUTDOWN_ALREADY;
            break;
        case SHUTDOWN_ALREADY:
            break;
        default:
            break;
    }
}
```
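Common behavior

Wrapping and setting the group

Setting the group mainly means adding a namespace layer on top of the group value that was passed in. The wrapping first has to obtain the `namespace` value; the source is as follows: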
```java
public String getNamespace() {
    if (StringUtils.isNotEmpty(namespace)) {
        return namespace;
    }

    if (StringUtils.isNotEmpty(this.namesrvAddr)) {
        if (NameServerAddressUtils.validateInstanceEndpoint(namesrvAddr)) {
            return NameServerAddressUtils.parseInstanceIdFromEndpoint(namesrvAddr);
        }
    }
    return namespace;
}
```
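This code comes from `ClientConfig`. It first tries the value configured on the client. If none is set, it tries to derive one from the instance endpoint: the name server address is matched against a regular expression in `NameServerAddressUtils`, and if it matches, the namespace value is extracted from the `InstanceEndpoint`.

If nothing is configured, no value is obtained, because there is no default. Once the `namespace` value has been obtained, the group value is actually wrapped. The source is as follows: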
```java
public static String wrapNamespace(String namespace, String resourceWithOutNamespace) {
    if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resourceWithOutNamespace)) {
        return resourceWithOutNamespace;
    }

    if (isSystemResource(resourceWithOutNamespace) || isAlreadyWithNamespace(resourceWithOutNamespace, namespace)) {
        return resourceWithOutNamespace;
    }

    String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithOutNamespace);
    StringBuffer strBuffer = new StringBuffer();

    if (isRetryTopic(resourceWithOutNamespace)) {
        strBuffer.append(MixAll.RETRY_GROUP_TOPIC_PREFIX);
    }

    if (isDLQTopic(resourceWithOutNamespace)) {
        strBuffer.append(MixAll.DLQ_GROUP_TOPIC_PREFIX);
    }

    return strBuffer.append(namespace).append(NAMESPACE_SEPARATOR).append(resourceWithoutRetryAndDLQ).toString();
}
```
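A simplified, stand-alone version of the wrapping makes the resulting names easier to see. This is a sketch under assumptions: the separator `%` and the prefixes `%RETRY%` and `%DLQ%` are not shown in the listing and are assumed here, the class `NamespaceSketch` is hypothetical, and the system-resource check is omitted.

```java
// Simplified sketch of namespace wrapping; the constants are assumptions, not taken from the listing.
final class NamespaceSketch {
    static final char NAMESPACE_SEPARATOR = '%';   // assumed separator
    static final String RETRY_PREFIX = "%RETRY%";  // assumed retry-topic prefix
    static final String DLQ_PREFIX = "%DLQ%";      // assumed dead-letter-topic prefix

    // Removes a leading retry or DLQ prefix, if present.
    static String stripRetryAndDlq(String resource) {
        if (resource.startsWith(RETRY_PREFIX)) {
            return resource.substring(RETRY_PREFIX.length());
        }
        if (resource.startsWith(DLQ_PREFIX)) {
            return resource.substring(DLQ_PREFIX.length());
        }
        return resource;
    }

    static String wrap(String namespace, String resource) {
        if (namespace == null || namespace.isEmpty() || resource == null || resource.isEmpty()) {
            return resource; // nothing to wrap with, or nothing to wrap
        }
        String bare = stripRetryAndDlq(resource);
        if (bare.startsWith(namespace + NAMESPACE_SEPARATOR)) {
            return resource; // already carries this namespace
        }
        StringBuilder sb = new StringBuilder();
        if (resource.startsWith(RETRY_PREFIX)) {
            sb.append(RETRY_PREFIX); // the prefix stays in front of the namespace
        }
        if (resource.startsWith(DLQ_PREFIX)) {
            sb.append(DLQ_PREFIX);
        }
        return sb.append(namespace).append(NAMESPACE_SEPARATOR).append(bare).toString();
    }

    public static void main(String[] args) {
        System.out.println(wrap("ns", "groupA"));
        System.out.println(wrap("ns", "%RETRY%groupA"));
        System.out.println(wrap("ns", "ns%groupA"));
    }
}
```

Keeping the retry or DLQ prefix at the very front means that prefix-based checks on the wrapped name keep working after the namespace has been inserted.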
Enabling message trace

Message trace works asynchronously, and at its core it is also a producer. The initialization is as follows:
```java
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}
```
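The `compareAndSet(false, true)` guard in this listing is a common way to make an initialization run at most once without taking a lock. The sketch below isolates that idiom; `StartOnceSketch` and its counter are hypothetical and only stand in for the guarded `traceProducer` setup.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a start-once guard based on compareAndSet.
final class StartOnceSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger initCount = new AtomicInteger();

    // Returns true only for the single call that performed the initialization.
    boolean start() {
        if (started.compareAndSet(false, true)) {
            initCount.incrementAndGet(); // stands in for the guarded producer start
            return true;
        }
        return false;
    }

    int initCount() {
        return initCount.get();
    }

    // Calls start() from several threads and reports how often the guarded part ran.
    static int startConcurrently(int threads) throws InterruptedException {
        StartOnceSketch sketch = new StartOnceSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(sketch::start);
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return sketch.initCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("initializations: " + startConcurrently(8));
    }
}
```

Note that in the listing only the `traceProducer` setup sits inside the guard; the worker thread and the shutdown hook are set up on every call to `start`.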
Starting MQClient

Let us look directly at how the `MQClient` starts. On the first start it takes the `CREATE_JUST` branch. The core source is as follows:
```java
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }

                this.mQClientAPIImpl.start();

                this.startScheduledTask();

                this.pullMessageService.start();

                this.rebalanceService.start();

                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
```
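As the code shows, it mainly does the following six things:

1. Checks the namesrv address again, fetching it if none is configured;
2. Opens the client's communication channel;
3. Starts the pull-message service;
4. Starts the push-message service (in the listing this is the start of the client's internal producer, `defaultMQProducer.getDefaultMQProducerImpl().start(false)`);
5. Starts the rebalance service;
6. Starts a series of scheduled tasks.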
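The most important of these are the scheduled tasks, which are:

- Periodically refreshing the namesrv address (only when no address is configured) and the local topic routes;
- Periodically maintaining brokers (cleaning up offline brokers and triggering the heartbeat);
- Periodically persisting the consumers' consume offsets;
- Periodically adjusting the consumers' thread pools.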
```java
private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
```
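Every task in this listing wraps its body in `try`/`catch`. That matters because `ScheduledExecutorService.scheduleAtFixedRate` suppresses all subsequent executions once a run throws. The sketch below contrasts a guarded and an unguarded task; `ScheduledTaskSketch`, `failingWork`, and the timings are hypothetical and chosen only for the demonstration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: why periodic tasks should catch their own exceptions.
final class ScheduledTaskSketch {
    // Stands in for a periodic job that fails, for example because a broker is unreachable.
    static void failingWork() {
        throw new IllegalStateException("simulated failure");
    }

    // Schedules a task that fails on every run and reports how many runs were started
    // within the observation window.
    static int countRuns(boolean guarded, long periodMillis, long observeMillis) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            scheduler.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                if (guarded) {
                    try {
                        failingWork();
                    } catch (Exception e) {
                        // A real task would log here; the schedule stays alive.
                    }
                } else {
                    failingWork(); // the exception escapes and cancels all further runs
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            Thread.sleep(observeMillis);
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("unguarded runs: " + countRuns(false, 10, 300));
        System.out.println("guarded runs:   " + countRuns(true, 10, 300));
    }
}
```

For a long-lived client this is the difference between a heartbeat that survives a transient error and one that silently stops after the first failure.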
Triggering the broker heartbeat

Triggering the heartbeat consists of two parts, as shown below:
```java
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
```
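Uploading the filter class source serves message filtering for push-mode consumers and is not explored here. The core code that sends the heartbeat is as follows: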
```java
private void sendHeartbeatToAllBroker() {
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer");
        return;
    }

    if (!this.brokerAddrTable.isEmpty()) {
        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, HashMap<Long, String>> entry = it.next();
            String brokerName = entry.getKey();
            HashMap<Long, String> oneTable = entry.getValue();
            if (oneTable != null) {
                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                    Long id = entry1.getKey();
                    String addr = entry1.getValue();
                    if (addr != null) {
                        if (consumerEmpty) {
                            if (id != MixAll.MASTER_ID)
                                continue;
                        }

                        try {
                            int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                            if (!this.brokerVersionTable.containsKey(brokerName)) {
                                this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                            }
                            this.brokerVersionTable.get(brokerName).put(addr, version);
                            if (times % 20 == 0) {
                                log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                log.info(heartbeatData.toString());
                            }
                        } catch (Exception e) {
                            if (this.isBrokerInNameServer(addr)) {
                                log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                            } else {
                                log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                    id, addr, e);
                            }
                        }
                    }
                }
            }
        }
    }
}
```
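The rule for choosing heartbeat targets is easy to miss inside the nested loops: when the client has no consumers, only master brokers receive a heartbeat. The following sketch extracts just that selection. `HeartbeatTargetSketch` is a hypothetical name, and the value `0` for the master id is an assumption, since the listing only refers to `MixAll.MASTER_ID`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical extraction of the target-selection rule from the heartbeat loop.
final class HeartbeatTargetSketch {
    static final long MASTER_ID = 0L; // assumption: broker id 0 denotes the master

    // brokerAddrTable maps broker name -> (broker id -> address), as in the listing.
    static List<String> targets(Map<String, Map<Long, String>> brokerAddrTable, boolean consumerEmpty) {
        List<String> result = new ArrayList<>();
        for (Map<Long, String> oneTable : brokerAddrTable.values()) {
            if (oneTable == null) {
                continue;
            }
            for (Map.Entry<Long, String> entry : oneTable.entrySet()) {
                String addr = entry.getValue();
                if (addr == null) {
                    continue;
                }
                // A client without consumers only needs to heartbeat the masters.
                if (consumerEmpty && entry.getKey() != MASTER_ID) {
                    continue;
                }
                result.add(addr);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, String> brokerA = new LinkedHashMap<>();
        brokerA.put(0L, "10.0.0.1:10911"); // illustrative addresses
        brokerA.put(1L, "10.0.0.2:10911");
        Map<String, Map<Long, String>> table = new LinkedHashMap<>();
        table.put("broker-a", brokerA);

        System.out.println("producer only:  " + targets(table, true));
        System.out.println("with consumers: " + targets(table, false));
    }
}
```

A producer-only client skips the slaves because it sends messages to masters only, while a client with consumers also heartbeats slaves since it may pull from them.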