行为的触发时机 消费者消费偏移位置的持久化是消费客户端的行为,是在client启动的时候设定的一个定时任务,如下所示:
1 2 3 4 5 6 7 8 9 10 11 this .scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run () { try { MQClientInstance.this .persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception" , e); } } }, 1000 * 10 , this .clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
该定时任务是延迟10s触发,且后续每个特定时间触发一次,如果没有特殊设定,那么默认时间间隔为5s,如下所示:
1 2 3 4 private int persistConsumerOffsetInterval = 1000 * 5 ;
持久化行为 在持久化的过程中,先是从当前客户端的消费者注册表中获取所有的消费者,然后再迭代的方式一个个的执行具体持久化操作,源码如下所示:
1 2 3 4 5 6 7 8 private void persistAllConsumerOffset () { Iterator<Entry<String, MQConsumerInner>> it = this .consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); impl.persistConsumerOffset(); } }
当进行具体的持久化操作的时候,不管是push
模式的消费者,还是pull
模式的消费者,都执行同样的逻辑,即会先判断当前的消费者是否时可用状态,即RUNNING
状态,如果不是则不会进行后续持久化操作;反之则获取队列进行后续持久化操作。push
模式的相关源码如下(pull
模式源码类似):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public void persistConsumerOffset () { try { this .makeSureStateOK(); Set<MessageQueue> mqs = new HashSet<MessageQueue>(); Set<MessageQueue> allocateMq = this .rebalanceImpl.getProcessQueueTable().keySet(); mqs.addAll(allocateMq); this .offsetStore.persistAll(mqs); } catch (Exception e) { log.error("group: " + this .defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception" , e); } } private void makeSureStateOK () throws MQClientException { if (this .serviceState != ServiceState.RUNNING) { throw new MQClientException("The consumer service state not OK, " + this .serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null ); } }
后续的持久化操作是执行persistAll
方法,该方法的执行会出现两种情况,一种是本地持久化存储,另一种是 远程持久化存储。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public interface OffsetStore { void persistAll (final Set<MessageQueue> mqs) ; }
而在持久化模式的选择上,BROADCASTING
会被设置为本地持久化方式,CLUSTERING
会被设置为远程持久化方式,后续便看看具体的这两种持久化方式。
本地持久化 本地模式持久化会先根据当前队列的消费偏移情况 包装偏移序列化,然后直接写文件,=。源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void persistAll (Set<MessageQueue> mqs) { if (null == mqs || mqs.isEmpty()) return ; OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper(); for (Map.Entry<MessageQueue, AtomicLong> entry : this .offsetTable.entrySet()) { if (mqs.contains(entry.getKey())) { AtomicLong offset = entry.getValue(); offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset); } } String jsonString = offsetSerializeWrapper.toJson(true ); if (jsonString != null ) { try { MixAll.string2File(jsonString, this .storePath); } catch (IOException e) { log.error("persistAll consumer offset Exception, " + this .storePath, e); } } }
远程持久化 远程持久化,即是将队列的消费偏移情况持久化到对应的broker
上,但其多了一步操作,即将临时消费偏移表中的未使用的队列移除掉。源码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public void persistAll (Set<MessageQueue> mqs) { if (null == mqs || mqs.isEmpty()) return ; final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>(); if (!mqs.isEmpty()) { for (Map.Entry<MessageQueue, AtomicLong> entry : this .offsetTable.entrySet()) { MessageQueue mq = entry.getKey(); AtomicLong offset = entry.getValue(); if (offset != null ) { if (mqs.contains(mq)) { try { this .updateConsumeOffsetToBroker(mq, offset.get()); log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}" , this .groupName, this .mQClientFactory.getClientId(), mq, offset.get()); } catch (Exception e) { log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); } } else { unusedMQ.add(mq); } } } } if (!unusedMQ.isEmpty()) { for (MessageQueue mq : unusedMQ) { this .offsetTable.remove(mq); log.info("remove unused mq, {}, {}" , mq, this .groupName); } } }
在向broker
发送持久化信息的时候采用的直接发送模式,如果主节点宕机则会更新从节点的信息。
1 2 3 4 5 6 7 8 private void updateConsumeOffsetToBroker (MessageQueue mq, long offset) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { updateConsumeOffsetToBroker(mq, offset, true ); }
在具体的更新到broker
这个过程的代码是一块待优化的代码,所以也不需要再深究了,因为我们已经知道了其要做的事情。