0%

rocketmq实战之发送事务消息

rocketmq支持发送事务消息,即发送消息的事务性,这里便来看一下如何实现?

实战方面均以RocketMQTemplate形式展现,集成方案详见《springboot中rocketmq的集成与使用》

首先使用@RocketMQTransactionListener定义一个监听器来模拟执行本地事务和事务会查:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "tx-group")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {

// 模拟本地事务不通过
log.info("============== executeLocalTransaction");

return RocketMQLocalTransactionState.UNKNOWN;
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {

// 模拟回查本地事务
log.info("============== checkLocalTransaction");


return RocketMQLocalTransactionState.COMMIT;
}
}

然后再定义一个消费者,设定topic为“topic-tx”:

1
2
3
4
5
6
7
8
@RocketMQMessageListener(topic = "topic-tx", consumerGroup = "tx-consumer-group")
public class TransactionConsumer implements RocketMQListener<Message> {

@Override
public void onMessage(Message message) {
log.info("topic-tx received message: {}", message);
}
}

最后定义一个消息生产者,向“topic-tx”主题发送消息,并且需要指定gorup为“tx-group”(同事务监听器所设置的txProducerGroup一致):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
public class TransactionProducer {

@Resource
private RocketMQTemplate rocketMQTemplate;

public void produce() {
Message<String> message = new Message<>();
message.setId(UUID.randomUUID().toString());
message.setContent("transaction message");
log.info("========sending message=========");
rocketMQTemplate.sendMessageInTransaction("tx-group", "topic-tx", MessageBuilder.withPayload(message).build(), null);
log.info("========finish send =========");
}
}

跑起项目后,在控制台可看到如下输出:

1
2
3
4
========sending message=========
============== executeLocalTransaction
========finish send =========
============== checkLocalTransaction