0%

rocketmq发送消息的三种方式

从功能上来说,rocketmq支持三种发送消息的方式,分别是同步发送(sync),异步发送(async)和直接发送(oneway)。下面来简单说明一下这三种发送消息的方式,以便了解它们之间的差异。

以下的案例代码将会使用spring-message风格进行展示,即使用rocketMQTemplate方式,详见rocketmq-spring

同步发送 sync

发送消息采用同步模式,这种方式只有在消息完全发送完成之后才返回结果,此方式存在需要同步等待发送结果的时间代价。

这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。 发送的结果存在同一个消息可能被多次发送给给broker,这里需要应用的开发者自己在消费端处理幂等性问题。

案例代码如下:

1
2
3
public void sync() {
rocketMQTemplate.syncSend("topic-name", "send sync message !");
}

异步发送 async

发送消息采用异步发送模式,消息发送后立刻返回,当消息完全完成发送后,会调用回调函数sendCallback来告知发送者本次发送是成功或者失败。异步模式通常用于响应时间敏感业务场景,即承受不了同步发送消息时等待返回的耗时代价。

同同步发送一样,异步模式也在内部实现了重试机制,默认次数为2次(DefaultMQProducer#getRetryTimesWhenSendAsyncFailed})。发送的结果同样存在同一个消息可能被多次发送给给broker,需要应用的开发者自己在消费端处理幂等性问题。

案例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void async() {
rocketMQTemplate.asyncSend("topic-name", "send async message!", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("send successful");
}

@Override
public void onException(Throwable throwable) {
log.info("send fail; {}", throwable.getMessage());
}
});
}

直接发送 one-way

采用one-way发送模式发送消息的时候,发送端发送完消息后会立即返回,不会等待来自broker的ack来告知本次消息发送是否完全完成发送。这种方式吞吐量很大,但是存在消息丢失的风险,所以其适用于不重要的消息发送,比如日志收集。

案例代码如下:

1
2
3
public void oneWay() {
rocketMQTemplate.sendOneWay("topic-name", "send one-way message");
}

总结

在实际使用场景中,利用何种发送方式,可以总结如下:

  • 当发送的消息不重要时,采用one-way方式,以提高吞吐量;
  • 当发送的消息很重要是,且对响应时间不敏感的时候采用sync方式;
  • 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;