0%

rocketmq源码剖析之消费者消费偏移持久化

行为的触发时机

消费者消费偏移位置的持久化是消费客户端的行为,是在client启动的时候设定的一个定时任务,如下所示:

1
2
3
4
5
6
7
8
9
10
11
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

该定时任务是延迟10s触发,且后续每个特定时间触发一次,如果没有特殊设定,那么默认时间间隔为5s,如下所示:

1
2
3
4
/**
* Offset persistent interval for consumer
*/
private int persistConsumerOffsetInterval = 1000 * 5;

持久化行为

在持久化的过程中,先是从当前客户端的消费者注册表中获取所有的消费者,然后再迭代的方式一个个的执行具体持久化操作,源码如下所示:

1
2
3
4
5
6
7
8
private void persistAllConsumerOffset() {
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
impl.persistConsumerOffset();
}
}

当进行具体的持久化操作的时候,不管是push模式的消费者,还是pull模式的消费者,都执行同样的逻辑,即会先判断当前的消费者是否时可用状态,即RUNNING状态,如果不是则不会进行后续持久化操作;反之则获取队列进行后续持久化操作。push模式的相关源码如下(pull模式源码类似):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void persistConsumerOffset() {
try {
// 状态判断
// pull模式的判断是 isRunning()方法
this.makeSureStateOK();
Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);

this.offsetStore.persistAll(mqs);
} catch (Exception e) {
log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
}
}

// 具体的判断逻辑
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The consumer service state not OK, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
}
}

后续的持久化操作是执行persistAll方法,该方法的执行会出现两种情况,一种是本地持久化存储,另一种是 远程持久化存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Offset store interface
*/
public interface OffsetStore {

/*其他方法*/

/**
* Persist all offsets,may be in local storage or remote name server
*/
void persistAll(final Set<MessageQueue> mqs);

/*其他方法*/
}

而在持久化模式的选择上,BROADCASTING会被设置为本地持久化方式,CLUSTERING会被设置为远程持久化方式,后续便看看具体的这两种持久化方式。

本地持久化

本地模式持久化会先根据当前队列的消费偏移情况 包装偏移序列化,然后直接写文件,=。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;

OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
if (mqs.contains(entry.getKey())) {
AtomicLong offset = entry.getValue();
offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
}
}

// 序列号json
String jsonString = offsetSerializeWrapper.toJson(true);
if (jsonString != null) {
try {
// 写文件
MixAll.string2File(jsonString, this.storePath);
} catch (IOException e) {
log.error("persistAll consumer offset Exception, " + this.storePath, e);
}
}
}

远程持久化

远程持久化,即是将队列的消费偏移情况持久化到对应的broker上,但其多了一步操作,即将临时消费偏移表中的未使用的队列移除掉。源码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;

final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
if (!mqs.isEmpty()) {
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {
// 记录没有使用的队列
unusedMQ.add(mq);
}
}
}
}

if (!unusedMQ.isEmpty()) {
for (MessageQueue mq : unusedMQ) {
// 在临时偏移量表中移除未使用的队列
this.offsetTable.remove(mq);
log.info("remove unused mq, {}, {}", mq, this.groupName);
}
}
}

在向broker发送持久化信息的时候采用的直接发送模式,如果主节点宕机则会更新从节点的信息。

1
2
3
4
5
6
7
8
/**
* Update the Consumer Offset in one way, once the Master is off, updated to Slave,
* here need to be optimized.
*/
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
updateConsumeOffsetToBroker(mq, offset, true);
}

在具体的更新到broker这个过程的代码是一块待优化的代码,所以也不需要再深究了,因为我们已经知道了其要做的事情。