rocketmq支持发送事务消息,即发送消息的事务性,这里便来看一下如何实现?
实战方面均以RocketMQTemplate
形式展现,集成方案详见《springboot中rocketmq的集成与使用》
首先使用@RocketMQTransactionListener
定义一个监听器来模拟执行本地事务和事务会查:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Slf4j @RocketMQTransactionListener(txProducerGroup = "tx-group") public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info("============== executeLocalTransaction");
return RocketMQLocalTransactionState.UNKNOWN; }
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info("============== checkLocalTransaction");
return RocketMQLocalTransactionState.COMMIT; } }
|
然后再定义一个消费者,设定topic
为“topic-tx”:
1 2 3 4 5 6 7 8
| @RocketMQMessageListener(topic = "topic-tx", consumerGroup = "tx-consumer-group") public class TransactionConsumer implements RocketMQListener<Message> {
@Override public void onMessage(Message message) { log.info("topic-tx received message: {}", message); } }
|
最后定义一个消息生产者,向“topic-tx”主题发送消息,并且需要指定gorup
为“tx-group”(同事务监听器所设置的txProducerGroup
一致):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Slf4j public class TransactionProducer {
@Resource private RocketMQTemplate rocketMQTemplate;
public void produce() { Message<String> message = new Message<>(); message.setId(UUID.randomUUID().toString()); message.setContent("transaction message"); log.info("========sending message========="); rocketMQTemplate.sendMessageInTransaction("tx-group", "topic-tx", MessageBuilder.withPayload(message).build(), null); log.info("========finish send ========="); } }
|
跑起项目后,在控制台可看到如下输出:
1 2 3 4
| ========sending message========= ============== executeLocalTransaction ========finish send ========= ============== checkLocalTransaction
|