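In RocketMQ, the producer is the party that produces messages and the consumer is the party that consumes them. Their lifecycle is tied to the application, that is, it is controlled by the user. Why analyze the startup and shutdown flows of these two roles together? Because both are MQ clients, and their startup and shutdown behavior has a lot in common. The sections below walk through both processes in detail.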
Producer

DefaultMQProducer

Starting DefaultMQProducer

When a producer starts, it performs a series of internal initialization steps. The startup source code is as follows:
```java
public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    this.defaultMQProducerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}
```
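As this short snippet shows, it does three things: it sets the producer's group, runs the core initialization, and initializes tracing. The core initialization is described next; the rest is shared with the consumer and is covered in the common behavior section.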
Producer core initialization

The core logic of the producer's startup is shown below. On the first start it takes the CREATE_JUST branch:
```java
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            // Validate the configuration
            this.checkConfig();
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                // Switch the instance name to the process ID
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // Get or create the MQ client instance
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // Register this producer with the client instance
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // Add a publish-info entry for the create-topic key
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            if (startFactory) {
                // Start the MQ client instance
                mQClientFactory.start();
            }
            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    // Send a heartbeat to all brokers
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
```
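Reading through this code, it is easy to see that the core logic lies in starting the MQClient and in triggering the heartbeat between the producer and the broker. Both parts are shared with the consumer and are covered in the common behavior section.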
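The state handling in start() can be illustrated with a small, self-contained sketch. LifecycleSketch and its State enum are hypothetical names used only for illustration, not RocketMQ classes. The sketch assumes the same pattern as the source: mark the state START_FAILED first, set RUNNING only once initialization has succeeded, and reject any later start.

```java
// Hypothetical sketch of the start-state guard; not RocketMQ code.
class LifecycleSketch {
    enum State { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;

    State state() {
        return state;
    }

    // init stands in for checkConfig(), registration, client start and so on.
    synchronized void start(Runnable init) {
        switch (state) {
            case CREATE_JUST:
                // Pessimistically mark as failed; an exception in init leaves this state behind.
                state = State.START_FAILED;
                init.run();
                state = State.RUNNING;
                break;
            default:
                // RUNNING, START_FAILED and SHUTDOWN_ALREADY all refuse to start again.
                throw new IllegalStateException("service state not OK, maybe started once: " + state);
        }
    }

    synchronized void shutdown() {
        if (state == State.RUNNING) {
            state = State.SHUTDOWN_ALREADY;
        }
    }
}
```

One consequence of this design is that an instance whose initialization threw stays in START_FAILED and cannot be started again; a new instance has to be created instead.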
Shutting down DefaultMQProducer

Shutting down moves the producer from the RUNNING state to the SHUTDOWN_ALREADY state. It mainly does three things:
Removes its own instance from the registry;
Shuts down the thread pool resources;
Cleans up the client's resources;
 
The core code is as follows:
```java
this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
this.defaultAsyncSenderExecutor.shutdown();
if (shutdownFactory) {
    this.mQClientFactory.shutdown();
}
```
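To see why a duplicate group fails at startup and why shutdown unregisters the producer first, here is a minimal sketch of a group registry. It assumes the registry behaves like a concurrent map keyed by group name, where registration fails if the key is already present. GroupRegistrySketch and its methods are hypothetical names, not RocketMQ's API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a per-client group registry; not RocketMQ code.
class GroupRegistrySketch {
    private final ConcurrentMap<String, Object> producers = new ConcurrentHashMap<>();

    // Returns false when the group is already taken or an argument is missing.
    boolean register(String group, Object producer) {
        if (group == null || producer == null) {
            return false;
        }
        return producers.putIfAbsent(group, producer) == null;
    }

    // Frees the group name so that it can be registered again later.
    void unregister(String group) {
        producers.remove(group);
    }
}
```

Under this assumption, a second producer with the same group in the same client is rejected until the first one has shut down and unregistered itself.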
TransactionMQProducer

Starting TransactionMQProducer

The transactional producer, TransactionMQProducer, is an enhancement of the default producer. Its startup has one extra step:
```java
public void start() throws MQClientException {
    this.defaultMQProducerImpl.initTransactionEnv();
    super.start();
}
```
The extra step initializes the transaction environment, namely the thread pool used for transaction checks. Its source code is as follows:
```java
public void initTransactionEnv() {
    TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
    if (producer.getExecutorService() != null) {
        this.checkExecutor = producer.getExecutorService();
    } else {
        this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
        this.checkExecutor = new ThreadPoolExecutor(
            producer.getCheckThreadPoolMinSize(),
            producer.getCheckThreadPoolMaxSize(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.checkRequestQueue);
    }
}
```
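The pool built in the else branch combines a bounded queue with fixed minimum and maximum thread counts, so the number of pending checks has a hard ceiling. The sketch below builds a pool of the same shape with plain JDK classes and counts how many blocking tasks it accepts before rejecting one. CheckExecutorSketch, its methods and the sizes used in the usage note are illustrative assumptions, not values taken from RocketMQ.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; only the pool's shape follows initTransactionEnv().
class CheckExecutorSketch {
    static ThreadPoolExecutor newCheckExecutor(int minThreads, int maxThreads, int holdMax) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(holdMax);
        return new ThreadPoolExecutor(minThreads, maxThreads, 1000 * 60, TimeUnit.MILLISECONDS, queue);
    }

    // Submits tasks that block until released and returns how many were accepted.
    static int acceptedBeforeRejection(int minThreads, int maxThreads, int holdMax) {
        ThreadPoolExecutor executor = newCheckExecutor(minThreads, maxThreads, holdMax);
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            while (true) {
                executor.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Default rejection policy: the queue is full and all threads are busy.
        } finally {
            release.countDown();
            executor.shutdown();
        }
        return accepted;
    }
}
```

For example, `CheckExecutorSketch.acceptedBeforeRejection(1, 1, 2)` accepts one running task plus two queued ones before the pool rejects the next submission.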
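Shutting down TransactionMQProducer

Likewise, when shutting down, the transactional producer has one extra step that releases the transaction-check thread pool. It is not described in further detail here.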
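Consumer

PushConsumer

Starting PushConsumer

Like the producer, a push-mode consumer first wraps the group, then runs the core flow, and finally initializes tracing if a trace dispatcher is present. Its source code is as follows: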
```java
public void start() throws MQClientException {
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    this.defaultMQPushConsumerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}
```
The core initialization is described next; the rest is shared with the producer and is covered in the common behavior section.
PushConsumer core initialization

The core start source is shown directly below. It likewise takes the CREATE_JUST branch:
```java
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;
            // Validate the configuration
            this.checkConfig();
            // Copy the subscription data
            this.copySubscription();
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                // Switch the instance name to the process ID
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
            // Get or create the MQ client instance
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
            // Configure the rebalance implementation
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
            // Create the pull API wrapper
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            // Register the message filter hooks
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
            // Choose the offset store
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        // Broadcasting mode: offsets are kept in a local file
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        // Clustering mode: offsets are kept on the remote broker
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            // Load the consume offsets
            this.offsetStore.load();
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                // Orderly consumption
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                // Concurrent consumption
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }
            // Start the message-consuming service
            this.consumeMessageService.start();
            // Register this consumer with the client instance
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // Start the MQ client instance
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    // Update the topic subscription info
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    // Check this client against the broker
    this.mQClientFactory.checkClientInBroker();
    // Send a heartbeat to all brokers
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // Trigger a rebalance immediately
    this.mQClientFactory.rebalanceImmediately();
}
```
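Now look at how the consume offset is loaded. It is loaded either from the remote broker or from a local file: the remote case is implemented by RemoteBrokerOffsetStore and the local case by LocalFileOffsetStore. Which one is used depends on whether the consumer runs in clustering mode or broadcasting mode, and the same choice determines whether the consume offset is persisted remotely or locally.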
Loading from the remote broker actually does nothing. The source code is as follows:
```java
@Override
public void load() {
}
```
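Loading from the local file only populates the offset table when content was actually read from the file. The source code is as follows: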
```java
public void load() throws MQClientException {
    // Read the offsets persisted in the local file
    OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
        // Copy them into the in-memory offset table
        offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
        // Log every loaded offset
        for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
            AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
            log.info("load consumer's offset, {} {} {}",
                this.groupName,
                mq,
                offset.get());
        }
    }
}
```
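The selection rule and the two load() behaviors can be summarized in a small sketch. Everything here is a simplified stand-in with hypothetical names (OffsetStoreSketch, LocalStore, RemoteStore, queue names as plain strings); it only assumes what the source shows: a user-supplied store wins, broadcasting uses the local store, clustering uses the remote one, and only the local store does any work in load().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of offset-store selection; not RocketMQ code.
class OffsetStoreSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    interface OffsetStore {
        void load(Map<String, Long> persisted);
        Map<String, Long> offsets();
    }

    // Stand-in for LocalFileOffsetStore: copies whatever was read from the local file.
    static class LocalStore implements OffsetStore {
        private final Map<String, Long> table = new ConcurrentHashMap<>();

        public void load(Map<String, Long> persisted) {
            if (persisted != null) {
                table.putAll(persisted);
            }
        }

        public Map<String, Long> offsets() {
            return table;
        }
    }

    // Stand-in for RemoteBrokerOffsetStore: load() is a no-op, as in the source.
    static class RemoteStore implements OffsetStore {
        private final Map<String, Long> table = new ConcurrentHashMap<>();

        public void load(Map<String, Long> persisted) {
        }

        public Map<String, Long> offsets() {
            return table;
        }
    }

    // A store supplied by the user takes precedence over the message model.
    static OffsetStore choose(MessageModel model, OffsetStore userSupplied) {
        if (userSupplied != null) {
            return userSupplied;
        }
        return model == MessageModel.BROADCASTING ? new LocalStore() : new RemoteStore();
    }
}
```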
Shutting down PushConsumer

A push-mode consumer cleans up its resources on shutdown as follows:
```java
public synchronized void shutdown() {
    switch (this.serviceState) {
        case CREATE_JUST:
            break;
        case RUNNING:
            // Stop the message-consuming service
            this.consumeMessageService.shutdown();
            // Persist the consume offsets
            this.persistConsumerOffset();
            // Unregister this consumer from the client instance
            this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
            // Shut down the client instance
            this.mQClientFactory.shutdown();
            log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.destroy();
            this.serviceState = ServiceState.SHUTDOWN_ALREADY;
            break;
        case SHUTDOWN_ALREADY:
            break;
        default:
            break;
    }
}
```
PullConsumer

Starting PullConsumer

On startup, a pull-mode consumer only wraps the group and runs the core flow. Its source code is as follows:
```java
public void start() throws MQClientException {
    this.setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    this.defaultMQPullConsumerImpl.start();
}
```
The core initialization is described next; group wrapping is covered in the common behavior section.
PullConsumer core initialization

The core start source is shown directly below. It likewise takes the CREATE_JUST branch:
```java
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            // Validate the configuration
            this.checkConfig();
            // Copy the subscription data
            this.copySubscription();
            if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                // Switch the instance name to the process ID
                this.defaultMQPullConsumer.changeInstanceNameToPID();
            }
            // Get or create the MQ client instance
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);
            // Configure the rebalance implementation
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
            // Create the pull API wrapper and register the filter hooks
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
            // Choose the offset store
            if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPullConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
            }
            // Load the consume offsets
            this.offsetStore.load();
            // Register this consumer with the client instance
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // Start the MQ client instance
            mQClientFactory.start();
            log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
}
```
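Having read through this source, we can see that the core startup flows of pull-mode and push-mode consumers are largely the same. The main difference is that push mode additionally creates and starts the message-consuming service for the registered listener and, at the end, updates the subscription info, checks the client against the broker, sends a heartbeat, and triggers an immediate rebalance. From this it is clear that push mode is essentially an enhancement built on top of pull mode.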
Shutting down PullConsumer

A pull-mode consumer cleans up its resources on shutdown as follows:
```java
public synchronized void shutdown() {
    switch (this.serviceState) {
        case CREATE_JUST:
            break;
        case RUNNING:
            // Persist the consume offsets
            this.persistConsumerOffset();
            // Unregister this consumer from the client instance
            this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup());
            // Shut down the client instance
            this.mQClientFactory.shutdown();
            log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup());
            this.serviceState = ServiceState.SHUTDOWN_ALREADY;
            break;
        case SHUTDOWN_ALREADY:
            break;
        default:
            break;
    }
}
```
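Common behavior

Wrapping and setting the group

Setting the group mainly means adding a namespace layer on top of the given group parameter. The wrapping process first has to obtain the namespace value. The source code is as follows: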
```java
public String getNamespace() {
    if (StringUtils.isNotEmpty(namespace)) {
        return namespace;
    }
    if (StringUtils.isNotEmpty(this.namesrvAddr)) {
        if (NameServerAddressUtils.validateInstanceEndpoint(namesrvAddr)) {
            return NameServerAddressUtils.parseInstanceIdFromEndpoint(namesrvAddr);
        }
    }
    return namespace;
}
```
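This code comes from ClientConfig. It first tries to use the namespace configured on the client. If none is set, it tries to derive one from the instance endpoint: the name server address is matched against a regular expression, and if the match succeeds, the namespace value is extracted from the instance endpoint. The matching is done by NameServerAddressUtils.validateInstanceEndpoint.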
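If nothing is configured, no value is obtained, because there is no default. Once the namespace value has been obtained, the group value is actually wrapped. The source code is as follows: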
```java
public static String wrapNamespace(String namespace, String resourceWithOutNamespace) {
    // Nothing to wrap if either the namespace or the resource is empty
    if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resourceWithOutNamespace)) {
        return resourceWithOutNamespace;
    }
    // System resources and already-wrapped resources are returned unchanged
    if (isSystemResource(resourceWithOutNamespace) || isAlreadyWithNamespace(resourceWithOutNamespace, namespace)) {
        return resourceWithOutNamespace;
    }

    String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithOutNamespace);
    StringBuffer strBuffer = new StringBuffer();
    // Keep the retry prefix in front of the namespace
    if (isRetryTopic(resourceWithOutNamespace)) {
        strBuffer.append(MixAll.RETRY_GROUP_TOPIC_PREFIX);
    }
    // Keep the DLQ prefix in front of the namespace
    if (isDLQTopic(resourceWithOutNamespace)) {
        strBuffer.append(MixAll.DLQ_GROUP_TOPIC_PREFIX);
    }

    return strBuffer.append(namespace).append(NAMESPACE_SEPARATOR).append(resourceWithoutRetryAndDLQ).toString();
}
```
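A worked example makes the wrapping rule easier to follow. The sketch below is a simplified, self-contained re-implementation with hypothetical names (NamespaceSketch, wrap, stripRetryAndDlq). The three constant values are assumptions standing in for MixAll.RETRY_GROUP_TOPIC_PREFIX, MixAll.DLQ_GROUP_TOPIC_PREFIX and NAMESPACE_SEPARATOR, and the system-resource check from the source is deliberately left out.

```java
// Hypothetical, simplified sketch of namespace wrapping; not RocketMQ code.
class NamespaceSketch {
    static final String RETRY_PREFIX = "%RETRY%"; // assumed value of the retry prefix
    static final String DLQ_PREFIX = "%DLQ%";     // assumed value of the DLQ prefix
    static final char SEPARATOR = '%';            // assumed value of the separator

    // Removes a leading retry or DLQ prefix, if there is one.
    static String stripRetryAndDlq(String resource) {
        if (resource.startsWith(RETRY_PREFIX)) {
            return resource.substring(RETRY_PREFIX.length());
        }
        if (resource.startsWith(DLQ_PREFIX)) {
            return resource.substring(DLQ_PREFIX.length());
        }
        return resource;
    }

    static String wrap(String namespace, String resource) {
        if (namespace == null || namespace.isEmpty() || resource == null || resource.isEmpty()) {
            return resource;
        }
        String bare = stripRetryAndDlq(resource);
        // Already wrapped with this namespace: return unchanged.
        if (bare.startsWith(namespace + SEPARATOR)) {
            return resource;
        }
        StringBuilder sb = new StringBuilder();
        if (resource.startsWith(RETRY_PREFIX)) {
            sb.append(RETRY_PREFIX);
        }
        if (resource.startsWith(DLQ_PREFIX)) {
            sb.append(DLQ_PREFIX);
        }
        // The prefix, if any, stays in front; the namespace goes between prefix and name.
        return sb.append(namespace).append(SEPARATOR).append(bare).toString();
    }
}
```

Under these assumptions, `NamespaceSketch.wrap("ns", "groupA")` yields `ns%groupA`, and wrapping the result a second time leaves it unchanged.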
Enabling message tracing

Message tracing is asynchronous, and its core is also a producer. The initialization process is as follows:
```java
public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}
```
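The `isStarted.compareAndSet(false, true)` call is what ensures the trace producer is started only once, even if start() is invoked repeatedly. A minimal sketch of that guard, using a hypothetical StartOnceSketch class with a counter standing in for traceProducer.start():

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a start-once guard; not RocketMQ code.
class StartOnceSketch {
    private final AtomicBoolean isStarted = new AtomicBoolean(false);
    private final AtomicInteger innerStarts = new AtomicInteger();

    // Only the caller that flips false -> true runs the guarded initialization.
    void start() {
        if (isStarted.compareAndSet(false, true)) {
            innerStarts.incrementAndGet(); // stands in for traceProducer.start()
        }
    }

    int innerStarts() {
        return innerStarts.get();
    }
}
```

Note that in the source only the producer start sits inside the guard; the worker thread and the shutdown hook are set up outside it.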
Starting the MQClient

Now look directly at how the MQClient starts. On the first start it takes the CREATE_JUST branch. The core source code is as follows:
```java
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If no name server address is configured, fetch it
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start the client's communication channel
                this.mQClientAPIImpl.start();
                // Start the scheduled tasks
                this.startScheduledTask();
                // Start the pull-message service
                this.pullMessageService.start();
                // Start the rebalance service
                this.rebalanceService.start();
                // Start the client's internal producer without starting the factory again
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
```
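As shown, it mainly does the following six things:
Checks the name server address again, fetching it if none is configured
Opens the client's communication channel
Starts a series of scheduled tasks
Starts the pull-message service
Starts the rebalance service
Starts the client's internal producer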
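Unlike the producer and consumer start methods, this start() tolerates repeated calls: the client is started by every producer or consumer that uses it, so only the first call does the work and later calls in the RUNNING state simply return. A minimal sketch of that behavior, with hypothetical names (SharedClientSketch, initRuns) and a counter standing in for the real initialization:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of an idempotent shared-client start; not RocketMQ code.
class SharedClientSketch {
    enum State { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;
    private final AtomicInteger initRuns = new AtomicInteger();

    // Safe to call from every producer or consumer that uses this client.
    void start() {
        synchronized (this) {
            switch (state) {
                case CREATE_JUST:
                    state = State.START_FAILED;
                    initRuns.incrementAndGet(); // stands in for channel, tasks and services
                    state = State.RUNNING;
                    break;
                case START_FAILED:
                    throw new IllegalStateException("client was created before and failed to start");
                default:
                    // RUNNING or SHUTDOWN_ALREADY: nothing to do
                    break;
            }
        }
    }

    int initRuns() {
        return initRuns.get();
    }
}
```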
 
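The most important of these is the set of scheduled tasks. They are:
Periodically refresh the name server address and the local topic routes;
Periodically handle brokers (trigger heartbeats, clean up offline ones);
Periodically persist consumers' consume offsets;
Periodically adjust consumers' thread pools;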
 
```java
private void startScheduledTask() {
    // Refresh the name server address when none is configured
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    // Refresh the local topic routes from the name server
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    // Clean up offline brokers and send heartbeats
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    // Persist the consume offsets of all consumers
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // Adjust the consumers' thread pools
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
```
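Every task above wraps its body in try/catch. That matters because `scheduleAtFixedRate` suppresses all subsequent executions once a run throws. The sketch below, using only JDK classes and a hypothetical ScheduledTaskSketch helper, schedules a task that always fails and reports whether it kept running, with and without the guard.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of why scheduled task bodies are guarded; not RocketMQ code.
class ScheduledTaskSketch {
    // Schedules an always-failing task and reports whether it ran at least `runs` times in time.
    static boolean keepsRunning(boolean guarded, int runs, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        Runnable failing = () -> {
            latch.countDown();
            throw new IllegalStateException("simulated failure");
        };
        Runnable task = failing;
        if (guarded) {
            task = () -> {
                try {
                    failing.run();
                } catch (Exception e) {
                    // Swallowed here (the source logs it) so that the schedule stays alive.
                }
            };
        }
        try {
            scheduler.scheduleAtFixedRate(task, 10, 10, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

With the guard the failing task keeps being rescheduled; without it, the first exception ends the schedule after a single run.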
Triggering the broker heartbeat

Triggering the heartbeat consists of two parts, as shown below:
```java
// Send a heartbeat to all brokers
this.sendHeartbeatToAllBroker();
// Upload the filter class source
this.uploadFilterClassSource();
```
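Uploading the filter class source supports message filtering for push-mode consumers and is not explored here. The core code that sends the heartbeat is as follows: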
```java
private void sendHeartbeatToAllBroker() {
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer");
        return;
    }

    if (!this.brokerAddrTable.isEmpty()) {
        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, HashMap<Long, String>> entry = it.next();
            String brokerName = entry.getKey();
            HashMap<Long, String> oneTable = entry.getValue();
            if (oneTable != null) {
                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                    Long id = entry1.getKey();
                    String addr = entry1.getValue();
                    if (addr != null) {
                        // With no consumers, only master brokers receive the heartbeat
                        if (consumerEmpty) {
                            if (id != MixAll.MASTER_ID)
                                continue;
                        }

                        try {
                            // Send the heartbeat and record the broker version it returns
                            int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                            if (!this.brokerVersionTable.containsKey(brokerName)) {
                                this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                            }
                            this.brokerVersionTable.get(brokerName).put(addr, version);
                            // Log only every 20th heartbeat round
                            if (times % 20 == 0) {
                                log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                log.info(heartbeatData.toString());
                            }
                        } catch (Exception e) {
                            if (this.isBrokerInNameServer(addr)) {
                                log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                            } else {
                                log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                    id, addr, e);
                            }
                        }
                    }
                }
            }
        }
    }
}
```
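One detail in this loop is easy to miss: when the client has no consumers, heartbeats go only to master brokers. The sketch below isolates that selection rule. HeartbeatTargetSketch and targets are hypothetical names, the addresses in the usage note are made up, and MASTER_ID = 0 is an assumption standing in for MixAll.MASTER_ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of heartbeat target selection; not RocketMQ code.
class HeartbeatTargetSketch {
    static final long MASTER_ID = 0L; // assumed value of MixAll.MASTER_ID

    // brokerAddrTable maps brokerName -> (brokerId -> address).
    static List<String> targets(Map<String, Map<Long, String>> brokerAddrTable, boolean consumerEmpty) {
        List<String> result = new ArrayList<>();
        for (Map<Long, String> oneTable : brokerAddrTable.values()) {
            if (oneTable == null) {
                continue;
            }
            for (Map.Entry<Long, String> entry : oneTable.entrySet()) {
                String addr = entry.getValue();
                if (addr == null) {
                    continue;
                }
                // A producer-only client skips every broker that is not a master.
                if (consumerEmpty && entry.getKey() != MASTER_ID) {
                    continue;
                }
                result.add(addr);
            }
        }
        return result;
    }
}
```

For a broker with a master at id 0 and a slave at id 1, a producer-only client would get just the master's address back, while a client with consumers would get both.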