消息队列rocketmq是Apache旗下的开源项目(原是Alibaba开源的项目),当springboot盛行后,Apache团队开源了rocketmq-spring来帮助我们在springboot中快速集成rocketmq,只需引入rocketmq-spring-boot-starter
即可。
rocketmq-spring
不仅实现了自动配置功能,个人认为最主要还是封装了spring-message风格的rocketmq操作,使得我们在spring中,能够像使用rabbitmq一样方便快捷的使用rocketmq。目前除了pull模式的消费还未支持,其他均已支持,官方给出的情况如下所示:
下面便来看一下springboot中使用rocketmq-spring
如何快速集成和使用rocketmq?
依赖引入
在pom文件中引入如下依赖:
1 2 3 4 5
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
|
自动配置
在application.yml文件中对rocketmq进行配置,如下所示:
1 2 3 4
| rocketmq: name-server: 127.0.0.1:9876 producer: group: rocketmq-group
|
案例使用
这里给出一个简单的案例,来说明如何以spring-message风格的形式使用,首先定义一个消息实体如下:
1 2 3 4 5
| @Data public class Message<T> implements Serializable { private String id; private T content; }
|
生产者
在消息的生产者方面,实现CommandLineRunner
接口方便启动运行,然后注入rocketmq模板RocketMQTemplate
,使用该模板调用相应的方法向指定的topic发送消息,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Component public class RocketmqProducer implements CommandLineRunner {
private RocketMQTemplate rocketMQTemplate;
@Autowired public RocketmqProducer(RocketMQTemplate rocketMQTemplate) { this.rocketMQTemplate = rocketMQTemplate; }
@Override public void run(String... args) throws Exception { Message<String> message = new Message<>(); message.setId(UUID.randomUUID().toString()); message.setContent("Hello, springboot-ac-rocketmq !"); rocketMQTemplate.convertAndSend("topic-queue-one", message); rocketMQTemplate.convertAndSend("topic-queue-two", "Hello, springboot-ac-rocketmq !"); } }
|
消费者
在消息的消费者方面,通过注解@RocketMQMessageListener
的方式很方便快捷的消费消息,只需指定相应的topic和group即可(该注解其他参数自行查看源码),如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Slf4j @Service public class RocketmqConsumer {
@Service @RocketMQMessageListener(topic = "topic-queue-one", consumerGroup = "consumer_topic-queue-one") public class ConsumerOne implements RocketMQListener<Message> { @Override public void onMessage(Message message) { log.info("consumer-one received message: {}", message); } }
@Service @RocketMQMessageListener(topic = "topic-queue-two", consumerGroup = "consumer_topic-queue-two") public class ConsumerTwo implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("consumer-two received message: {}", message); } } }
|
案例源码
案例源码地址: https://github.com/lazycece/springboot-actual-combat/tree/master/springboot-ac-rocketmq