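RocketMQ supports sending messages in order and consuming them in order. This post looks at how to do both.

All hands-on examples use `RocketMQTemplate`. For the integration setup, see the post "Integrating and Using RocketMQ in Spring Boot" (《springboot中rocketmq的集成与使用》).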
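To send messages in order, call one of two families of methods: `syncSendOrderly` (synchronous ordered send) or `asyncSendOrderly` (asynchronous ordered send). As the names suggest, the difference is that the synchronous call blocks until the send result comes back, while the asynchronous call returns immediately and reports the result through a callback.

Here is sample code for the synchronous ordered send: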
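```java
import javax.annotation.Resource;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;

// Registered as a Spring bean so that RocketMQTemplate can be injected.
@Component
public class OrderProducer {

    private static final int NUM = 10;

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void sendSyncOrderly() {
        String message = "orderly message: ";
        for (int i = 0; i < NUM; i++) {
            // Every message uses the same hashKey, so all of them go to the same queue.
            rocketMQTemplate.syncSendOrderly("topic-orderly", message + i, "select_queue_key");
        }
    }
}
```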
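To make the blocking versus callback distinction concrete, here is a minimal sketch in plain Java. It does not use RocketMQ at all: `SendStyleSketch`, `deliver`, `sendSync` and `sendAsync` are hypothetical names introduced only for illustration, and the "broker" is simulated with a `CompletableFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in for a broker client; none of these names are RocketMQ API.
class SendStyleSketch {

    // Pretend to deliver a message on another thread and produce an acknowledgement.
    static CompletableFuture<String> deliver(String body) {
        return CompletableFuture.supplyAsync(() -> "ack:" + body);
    }

    // Synchronous style: block the caller until the acknowledgement is available.
    static String sendSync(String body) {
        return deliver(body).join();
    }

    // Asynchronous style: return immediately and pass the acknowledgement to a callback.
    static CompletableFuture<Void> sendAsync(String body, Consumer<String> onSuccess) {
        return deliver(body).thenAccept(onSuccess);
    }

    public static void main(String[] args) {
        System.out.println(sendSync("orderly message: 0"));
        // join() here only keeps the demo alive until the callback has run.
        sendAsync("orderly message: 1", System.out::println).join();
    }
}
```

With `asyncSendOrderly`, the callback role is played by the `SendCallback` argument that the method accepts.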
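Compared with a normal send, an ordered send takes one extra argument, "select_queue_key" in the example above, which rocketmq-spring calls `hashKey`. Its role is to pin messages to one queue at send time: messages sent with the same `hashKey` go to the same queue (by default a RocketMQ topic has 4 queues), and that is what preserves their order.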
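The idea of pinning a key to a queue can be sketched as a hash taken modulo the number of queues. This is an illustration under that assumption, not the selector that rocketmq-spring actually ships; `QueueSelectorSketch`, `selectQueue` and the `order-100x` keys are hypothetical.

```java
import java.util.List;

// Illustrative only: a hash-modulo selector, not the selector rocketmq-spring ships.
class QueueSelectorSketch {

    // Map a hash key to a queue index in [0, queueCount).
    static int selectQueue(String hashKey, int queueCount) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // the default number of queues per topic
        for (String key : List.of("select_queue_key", "order-1001", "order-1002")) {
            System.out.println(key + " -> queue " + selectQueue(key, queueCount));
        }
    }
}
```

Because the mapping depends only on the key, the same key always lands on the same queue. In practice a business identifier such as an order ID is a common choice of `hashKey`: each order's messages stay in order, while different orders can spread across queues.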
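Sample code for ordered consumption:

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component // the listener must be a Spring bean to be registered
@RocketMQMessageListener(topic = "topic-orderly", consumerGroup = "orderly-consumer-group",
        consumeMode = ConsumeMode.ORDERLY)
public class OrderConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("========{}=======", message);
    }
}
```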
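The key setting here is `consumeMode = ConsumeMode.ORDERLY`; the default is concurrent consumption (`ConsumeMode.CONCURRENTLY`). With orderly mode enabled, the consumer log shows the messages arriving in send order, all handled on the same thread: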
```text
2019-10-22 21:59:01.654 INFO 1400 --- [ main] c.l.s.r.SpringbootAcRocketmqApplication : Started SpringbootAcRocketmqApplication in 2.611 seconds (JVM running for 3.824)
2019-10-22 21:59:04.474 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 0=======
2019-10-22 21:59:04.475 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD27200000 cost: 0 ms
2019-10-22 21:59:04.475 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 1=======
2019-10-22 21:59:04.475 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD27280002 cost: 0 ms
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 2=======
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD272A0004 cost: 0 ms
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 3=======
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD272C0006 cost: 0 ms
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 4=======
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD272D0008 cost: 0 ms
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 5=======
2019-10-22 21:59:04.476 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD272F000A cost: 0 ms
2019-10-22 21:59:04.477 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 6=======
2019-10-22 21:59:04.477 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD2731000C cost: 0 ms
2019-10-22 21:59:04.477 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 7=======
2019-10-22 21:59:04.477 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD2733000E cost: 0 ms
2019-10-22 21:59:04.477 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 8=======
2019-10-22 21:59:04.477 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD27350010 cost: 0 ms
2019-10-22 21:59:04.478 INFO 1400 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 9=======
2019-10-22 21:59:04.478 INFO 1400 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A801040578512DDF1770DD27360012 cost: 0 ms
```
If the consumer is not set to orderly mode, the messages are consumed out of order:
```text
2019-10-22 22:00:59.922 INFO 2198 --- [MessageThread_2] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 1=======
2019-10-22 22:00:59.926 INFO 2198 --- [MessageThread_8] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 7=======
2019-10-22 22:00:59.922 INFO 2198 --- [MessageThread_6] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 5=======
2019-10-22 22:00:59.922 INFO 2198 --- [MessageThread_3] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 2=======
2019-10-22 22:00:59.922 INFO 2198 --- [MessageThread_1] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 0=======
2019-10-22 22:00:59.925 INFO 2198 --- [MessageThread_4] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 3=======
2019-10-22 22:00:59.926 INFO 2198 --- [MessageThread_7] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 6=======
2019-10-22 22:00:59.927 INFO 2198 --- [MessageThread_9] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 8=======
2019-10-22 22:00:59.925 INFO 2198 --- [MessageThread_5] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 4=======
2019-10-22 22:00:59.927 INFO 2198 --- [essageThread_10] c.l.sbac.rocketmq.order.OrderConsumer : ========orderly message: 9=======
```
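The two logs differ in their thread names: the orderly run handles every message on `MessageThread_1`, while the concurrent run spreads them over ten threads. A simplified model of that difference, written in plain Java rather than RocketMQ code, is one queue drained by a pool of one thread versus a pool of many. `ConsumeModeSketch` and `consume` are hypothetical names, and the real orderly mode involves more than a single-thread pool.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model, not RocketMQ code: one queue drained by a pool of worker threads.
class ConsumeModeSketch {

    // Submit messages 0..count-1 in order and record the order in which they are handled.
    static List<Integer> consume(int count, int threads) {
        List<Integer> handled = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < count; i++) {
            final int message = i;
            pool.submit(() -> handled.add(message));
        }
        pool.shutdown();
        try {
            // Wait for all submitted messages to be handled.
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        // One worker takes tasks in submission order, so the send order is preserved.
        System.out.println("one thread:  " + consume(10, 1));
        // Ten workers race each other, so the handling order may differ between runs.
        System.out.println("ten threads: " + consume(10, 10));
    }
}
```

Both runs handle all ten messages; only the single-threaded run is guaranteed to handle them in the order they were submitted.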