0%

springboot中rocketmq的集成与使用

消息队列rocketmq是Apache旗下的开源项目(原是Alibaba开源的项目),当springboot盛行后,Apache团队开源了rocketmq-spring来帮助我们在springboot中快速集成rocketmq,只需引入rocketmq-spring-boot-starter即可。

rocketmq-spring不仅实现了自动配置功能,个人认为最主要还是封装了spring-message风格的rocketmq操作,使得我们在spring中,能够像使用rabbitmq一样方便快捷的使用rocketmq。目前除了pull模式的消费还未支持,其他均已支持,官方给出的情况如下所示:

rocektmq-spring-feature

下面便来看一下springboot中使用rocketmq-spring如何快速集成和使用rocketmq?

依赖引入

在pom文件中引入如下依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>

自动配置

在application.yml文件中对rocketmq进行配置,如下所示:

1
2
3
4
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: rocketmq-group

案例使用

这里给出一个简单的案例,来说明如何以spring-message风格的形式使用,首先定义一个消息实体如下:

1
2
3
4
5
@Data
public class Message<T> implements Serializable {
private String id;
private T content;
}

生产者

在消息的生产者方面,实现CommandLineRunner接口方便启动运行,然后注入rocketmq模板RocketMQTemplate,使用该模板调用相应的方法向指定的topic发送消息,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class RocketmqProducer implements CommandLineRunner {

private RocketMQTemplate rocketMQTemplate;

@Autowired
public RocketmqProducer(RocketMQTemplate rocketMQTemplate) {
this.rocketMQTemplate = rocketMQTemplate;
}

@Override
public void run(String... args) throws Exception {
Message<String> message = new Message<>();
message.setId(UUID.randomUUID().toString());
message.setContent("Hello, springboot-ac-rocketmq !");
rocketMQTemplate.convertAndSend("topic-queue-one", message);
rocketMQTemplate.convertAndSend("topic-queue-two", "Hello, springboot-ac-rocketmq !");
}
}

消费者

在消息的消费者方面,通过注解@RocketMQMessageListener的方式很方便快捷的消费消息,只需指定相应的topic和group即可(该注解其他参数自行查看源码),如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
@Service
public class RocketmqConsumer {

@Service
@RocketMQMessageListener(topic = "topic-queue-one", consumerGroup = "consumer_topic-queue-one")
public class ConsumerOne implements RocketMQListener<Message> {
@Override
public void onMessage(Message message) {
log.info("consumer-one received message: {}", message);
}
}

@Service
@RocketMQMessageListener(topic = "topic-queue-two", consumerGroup = "consumer_topic-queue-two")
public class ConsumerTwo implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("consumer-two received message: {}", message);
}
}
}

案例源码

案例源码地址: https://github.com/lazycece/springboot-actual-combat/tree/master/springboot-ac-rocketmq