
RocketMQ in Practice: Ordered Message Sending and Ordered Consumption

RocketMQ supports both sending messages in order and consuming them in order. This post shows how to do each.

All examples use RocketMQTemplate. For the integration setup, see the post "Integrating and Using RocketMQ in Spring Boot" (《springboot中rocketmq的集成与使用》).

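For ordered sending, RocketMQTemplate offers two kinds of methods: syncSendOrderly (synchronous ordered send) and asyncSendOrderly (asynchronous ordered send). As the names suggest, the only difference is that the first blocks until the send completes, while the second returns immediately and reports the result through a callback.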

Sample code for the synchronous ordered send:

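import javax.annotation.Resource;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;

// Registered as a Spring bean so that RocketMQTemplate can be injected.
@Component
public class OrderProducer {

    private static final int NUM = 10;

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void sendSyncOrderly() {
        String message = "orderly message: ";
        for (int i = 0; i < NUM; i++) {
            // Simulate ordered sending: every message uses the same hashKey.
            rocketMQTemplate.syncSendOrderly("topic-orderly", message + i, "select_queue_key");
        }
    }
}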

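Compared with a normal send, an ordered send takes one extra argument, "select_queue_key" in this example, which rocketmq-spring calls hashKey. Its role is to pin messages to a single queue at send time: messages that share the same hashKey always go to the same queue (by default a RocketMQ topic has 4 queues), and ordering is guaranteed within that one queue.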
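The hashKey routing described above can be pictured with a minimal sketch. It assumes the selector simply reduces the key's hash code modulo the number of queues; HashKeyQueueSketch and selectQueue are hypothetical names used for illustration only, not part of rocketmq-spring, and the real selector may differ in detail.

```java
class HashKeyQueueSketch {

    // Hypothetical helper: maps a hash key to a queue index in [0, queueCount).
    // Assumption: plain hash-modulo selection, used here only to model the idea.
    static int selectQueue(String hashKey, int queueCount) {
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // default number of queues per topic mentioned above

        // The same key always yields the same queue index, so these messages stay in order.
        for (int i = 0; i < 3; i++) {
            int queue = selectQueue("select_queue_key", queueCount);
            System.out.println("orderly message: " + i + " -> queue " + queue);
        }

        // A different key (made up for this example) may land on a different queue.
        System.out.println("order-1001 -> queue " + selectQueue("order-1001", queueCount));
    }
}
```

In practice this suggests using a business identifier as the hashKey (the "order-1001" value here is invented): messages for one order stay in sequence, while different orders can still be spread across queues.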

Sample code for ordered consumption:

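import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component // the listener must be a Spring bean to be registered
@RocketMQMessageListener(topic = "topic-orderly",
        consumerGroup = "orderly-consumer-group", consumeMode = ConsumeMode.ORDERLY)
public class OrderConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // Log each message so the consumption order is visible.
        log.info("========{}=======", message);
    }
}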

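The key setting here is consumeMode = ConsumeMode.ORDERLY; the default is concurrent consumption (ConsumeMode.CONCURRENTLY). With orderly consumption, the log shows the messages arriving in the order they were sent, all on the same thread: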

2019-10-22 21:59:01.654  INFO 1400 --- [           main] c.l.s.r.SpringbootAcRocketmqApplication  : Started SpringbootAcRocketmqApplication in 2.611 seconds (JVM running for 3.824)
2019-10-22 21:59:04.474 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 0=======
2019-10-22 21:59:04.475 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD27200000 cost: 0 ms
2019-10-22 21:59:04.475 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 1=======
2019-10-22 21:59:04.475 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD27280002 cost: 0 ms
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 2=======
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD272A0004 cost: 0 ms
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 3=======
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD272C0006 cost: 0 ms
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 4=======
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD272D0008 cost: 0 ms
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 5=======
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD272F000A cost: 0 ms
2019-10-22 21:59:04.477 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 6=======
2019-10-22 21:59:04.477 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD2731000C cost: 0 ms
2019-10-22 21:59:04.477 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 7=======
2019-10-22 21:59:04.477 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD2733000E cost: 0 ms
2019-10-22 21:59:04.477 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 8=======
2019-10-22 21:59:04.477 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD27350010 cost: 0 ms
2019-10-22 21:59:04.478 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 9=======
2019-10-22 21:59:04.478 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD27360012 cost: 0 ms

If the consumer is not set to orderly consumption, the messages are consumed out of order:

  2019-10-22 22:00:59.922  INFO 2198 --- [MessageThread_2] c.l.sbac.rocketmq.order.OrderConsumer    : ========orderly message: 1=======
2019-10-22 22:00:59.926 INFO 2198 --- [MessageThread_8] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 7=======
2019-10-22 22:00:59.922 INFO 2198 --- [MessageThread_6] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 5=======
2019-10-22 22:00:59.922 INFO 2198 --- [MessageThread_3] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 2=======
2019-10-22 22:00:59.922 INFO 2198 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 0=======
2019-10-22 22:00:59.925 INFO 2198 --- [MessageThread_4] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 3=======
2019-10-22 22:00:59.926 INFO 2198 --- [MessageThread_7] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 6=======
2019-10-22 22:00:59.927 INFO 2198 --- [MessageThread_9] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 8=======
2019-10-22 22:00:59.925 INFO 2198 --- [MessageThread_5] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 4=======
2019-10-22 22:00:59.927 INFO 2198 --- [essageThread_10] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 9=======
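
The contrast between the two logs can be reproduced without a broker. The sketch below is only a loose model, not RocketMQ's implementation: it assumes orderly consumption behaves like a single worker draining one queue, and concurrent consumption like a pool of workers. ConsumeModeSketch and consume are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ConsumeModeSketch {

    // Hypothetical helper: hands messages 0..count-1 to the pool in send order
    // and returns them in the order they were actually processed.
    static List<Integer> consume(ExecutorService pool, int count) {
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < count; i++) {
            final int message = i;
            pool.execute(() -> processed.add(message));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        // One worker: stands in for ConsumeMode.ORDERLY on a single queue.
        System.out.println("orderly:    " + consume(Executors.newSingleThreadExecutor(), 10));
        // Ten workers: stands in for ConsumeMode.CONCURRENTLY; the order may vary between runs.
        System.out.println("concurrent: " + consume(Executors.newFixedThreadPool(10), 10));
    }
}
```

The single-worker run always processes 0 through 9 in sequence, whereas the ten-worker run processes every message but in no fixed order, which matches the thread names seen in the two logs above.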