RabbitMQ是一个基于AMQP协议的轻量级,可靠,可扩展且可移植的消息代理。Spring的一个springAMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。SpringBoot则是将springAMQP包装了一层,提供了pring-boot-starter-amqp“Starter”来为通过RabbitMQ使用AMQP提供了便利。
rabbitmq的安装可参考《ubuntu下安装rabbitmq》
springboot集成rabbitmq
引入依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
spring-boot-starter-amqp
在springboot中,spring-boot-starter-amqp为我们提供了与rabbtmq的交互方式,其主要引入了如下依赖:
- org.springframework.amqp:spring-rabbit
- org.springframework.amqp:spring-amqp
- com.rabbitmq:amqp-client
我们在使用的时候主要是spring-amqp和spring-rabbit里面的类及方法。
org.springframework.amqp:spring-amqp提供了amqp协议的交换器(exchange)、绑定(bind)、queue(队列)、message(消息)、template(模板)等的定义与封装,如下:
此处仅列出部分类的类名,有关类详情和更多类信息可自行查看源码
- exchange(交换器)
- AbstractExchange 交换器的抽象
- DirectExchange direct交换器的定义
- FanoutExchange fanout交换器的定义
- HeadersExchange headers交换器的定义
- TopicExchange topic交换器的定义
- CustomExchange 自定义交换器的定义
- ExchangeBuilder 交换器的builder构造器
- bind(绑定)
- Binding 绑定的定义
- BindingBuilder 绑定的builder构造器
- message(消息)
- Message 消息的定义
- MessageBuilder 消息的builder构造器
- queue(队列)
- Queue 队列的定义
- QueueBuilder 队列构造器
- template(模板)
- AmqpTemplate amqp协议消息同步发送/接收等操作模板接口
- AmqpAdmin amqp协议的绑定、交换器、队列等申明/删除等操作的接口
- AsyncAmqpTemplate amqp协议消息异步发送/接收等操作模板接口
org.springframework.amqp:spring-rabbit主要提供了注解,方便我们基于注解使用,如下:
- @EnableRabbit 用于配置rabbitmq信息时使用
- @Exchange 交换器定义
- @Queue 队列定义
- @QueueBinding 队列绑定定义
- @RabbitHandler rabbitmq默认消息处理器
- @RabbitListeners、@RabbitListeners rabbitmq消息监听定义
同时还实现了AmqpTemplatej接口的BatchingRabbitTemplate、RabbitTemplate,实现了RabbitAdmin接口的RabbitAdmin,实现了AsyncAmqpTemplate的AsyncRabbitTemplate。
springboot中rabbitmq的使用
创建一个springboot项目,引入“spring-boot-starter-amqp”依赖,在application.yml文件中添加rabbitmq信息,如下所示:
1 2 3 4 5 6 7
| spring: rabbitmq: virtual-host: sbac-rabbitmq host: 127.0.0.1 port: 5672 username: admin password: admin
|
这里没有用默认的“/”vhost,而使用了新创建的vhost “sbac-rabbitmq”,并设置账号admin对该vhost的权限信息,如下:
1 2 3
| sudo rabbitmqctl add_vhost sbac-rabbitmq sudo rabbitmqctl list_vhosts sudo rabbitmqctl set_permissions -p sbac-rabbitmq admin ".*" ".*" ".*"
|
下面将来看一下springboot项目中,rabbitmq的,direct、fanout、topic、headers等四种交换器模式的使用。在此之前,先定义一个junit测试类,用于测试我们的消息发送,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package com.lazycece.sbac.rabbitmq.producer;
import com.lazycece.sbac.rabbitmq.entity.Message; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource; import java.util.UUID;
@SpringBootTest @RunWith(SpringRunner.class) public class RabbitMqProducer {
@Resource private RabbitTemplate rabbitTemplate; private Message<String> message;
@Before public void buildMessage() { message = new Message<>(); message.setId(UUID.randomUUID().toString()); message.setContent("Hello, springboot-ac-rabbitmq !"); }
}
|
这里在测试类中只先给出了消息的定义,而至于几种模式的测试方法,将在介绍交换器模式使用的时候详细说明。
Meesage类的定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| package com.lazycece.sbac.rabbitmq.entity;
import lombok.Data;
import java.io.Serializable;
@Data public class Message<T> implements Serializable { private String id; private T content; }
|
direct模式
direct为直连模式,通过direct交换器下发的消息是严格发送到按照指定路由键绑定的队列上。
下面是direct交换器模式的配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| package com.lazycece.sbac.rabbitmq.config;
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class DirectConfig {
@Bean public Queue directQueue() { return QueueBuilder.durable("direct.queue").build(); }
@Bean public DirectExchange directExchange() { return (DirectExchange) ExchangeBuilder.directExchange("direct.exchange").build(); }
@Bean public Binding directBinding() { return BindingBuilder.bind(directQueue()).to(directExchange()).withQueueName(); } }
|
如交换器的绑定设置所示,这里设置队列名为绑定路由键。下面给队列监听代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.lazycece.sbac.rabbitmq.consumer;
import com.lazycece.sbac.rabbitmq.entity.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class DirectConsumer {
@RabbitListener(queues = {"direct.queue"}) public void directQueueConsumer(Message message) { log.info("direct.queue -> {} ", message.toString()); } }
|
现在来测试一下direct交换器模式的消息发送,测试方法代码如下:
1 2 3 4
| @Test public void directProducer() { rabbitTemplate.convertAndSend("direct.exchange", "direct.queue", message); }
|
此处向命名“direct.exchange”的交换器中发送消息,并设置路由键为“direct.queue”。方法运行之后,便可以看见队列“direct.queue”监听到了消息,并打印出了如下信息:
1
| direct.queue -> Message(id=25b23bac-6c89-4350-8743-bbb274da89e4, content=Hello, springboot-ac-rabbitmq !)
|
如果发送消息的时候换一个路由键,比如“direct.queue.one”,那么就不会收到消息。
fanout模式
fanout交换器为分发交换器,或者叫广播模式更为合适,因为,其不会根据路由键去区分消息到底该下发到哪儿一个队列,绑定在该交换器上的队列都会收到下发致fanout交换器的消息。
fanout交换器的配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| package com.lazycece.sbac.rabbitmq.config;
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class FanoutConfig {
@Bean public Queue fanoutQueueOne() { return QueueBuilder.durable("fanout.queue.one").build(); }
@Bean public Queue fanoutQueueTwo() { return QueueBuilder.durable("fanout.queue.two").build(); }
@Bean public Queue fanoutQueueThree() { return QueueBuilder.durable("fanout.queue.three").build(); }
@Bean public FanoutExchange fanoutExchange() { return (FanoutExchange) ExchangeBuilder.fanoutExchange("fanout.exchange").build(); }
@Bean public Binding fanoutBindingOne() { return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange()); }
@Bean public Binding fanoutBindingTwo() { return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange()); }
@Bean public Binding fanoutBindingThree() { return BindingBuilder.bind(fanoutQueueThree()).to(fanoutExchange()); } }
|
这里为此fanout交换器取名为“fanout.exchange”,并为其绑定了三个队列。下面给出三个队列的监听:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.lazycece.sbac.rabbitmq.consumer;
import com.lazycece.sbac.rabbitmq.entity.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class FanoutConsumer {
@RabbitListener(queues = {"fanout.queue.one"}) public void fanoutQueueOneConsumer(Message message) { log.info("fanout.queue.one -> {} ", message.toString()); }
@RabbitListener(queues = {"fanout.queue.two"}) public void fanoutQueueTwoConsumer(Message message) { log.info("fanout.queue.two-> {} ", message.toString()); }
@RabbitListener(queues = {"fanout.queue.three"}) public void fanoutQueueThreeConsumer(Message message) { log.info("fanout.queue.three -> {} ", message.toString()); } }
|
现在来测试一下fanout交换器模式的消息发送,测试方法代码如下:
1 2 3 4
| @Test public void fanoutProducer() { rabbitTemplate.convertAndSend("fanout.exchange", "", message); }
|
此处向命名“fanout.exchange”的交换器中发送消息,方法运行之后,便可以看见其三个监听队列均收到了消息,并打印出了如下信息:
1 2 3
| fanout.queue.three -> Message(id=66c08bda-654a-4c16-a66b-8537a96ff2ee, content=Hello, springboot-ac-rabbitmq !) fanout.queue.one -> Message(id=66c08bda-654a-4c16-a66b-8537a96ff2ee, content=Hello, springboot-ac-rabbitmq !) fanout.queue.two-> Message(id=66c08bda-654a-4c16-a66b-8537a96ff2ee, content=Hello, springboot-ac-rabbitmq !)
|
topic模式
topic交换器是一个灵活的交换器,其可以根据路由键的规则,灵活的将消息发送到想要发送的队列中去。
topic交换器的配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package com.lazycece.sbac.rabbitmq.config;
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class TopicConfig {
@Bean public Queue topicQueueOne() { return QueueBuilder.durable("topic.queue.one").build(); }
@Bean public Queue topicQueueTwo() { return QueueBuilder.durable("topic.queue.two").build(); }
@Bean public TopicExchange topicExchange() { return (TopicExchange) ExchangeBuilder.topicExchange("topic.exchange").build(); }
@Bean public Binding topicBindingOne() { return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).with("routing-key"); }
@Bean public Binding topicBindingTwo() { return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with("#"); } }
|
这里为此topic交换器取名为“topic.exchange”,并为其绑定了两个队列,一个路由规则为“routing-key”,另一个为“#”。下面给出两个队列的监听:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package com.lazycece.sbac.rabbitmq.consumer;
import com.lazycece.sbac.rabbitmq.entity.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class TopicConsumer {
@RabbitListener(queues = {"topic.queue.one"}) public void topicQueueOneConsumer(Message message) { log.info("topic.queue.one -> {} ", message.toString()); }
@RabbitListener(queues = {"topic.queue.two"}) public void topicQueueTwoConsumer(Message message) { log.info("topic.queue.two -> {} ", message.toString()); } }
|
现在来测试一下topic交换器模式的消息发送,测试方法代码如下:
1 2 3 4
| @Test public void topicProducer() { rabbitTemplate.convertAndSend("topic.exchange", "routing-key", message); }
|
此处向命名“topic.exchange”的交换器中发送消息,并指定路由规则为“routing-key”,方法运行之后,便可以看见其了两个监听队列均收到了消息,并打印出了如下信息:
1 2
| topic.queue.two -> Message(id=dc79c754-2ea5-4941-b8e0-c41511d7b328, content=Hello, springboot-ac-rabbitmq !) topic.queue.one -> Message(id=dc79c754-2ea5-4941-b8e0-c41511d7b328, content=Hello, springboot-ac-rabbitmq !)
|
为什么呢?因为“#”为通配符,可以匹配任意路由键。如果在发送消息的时候换路由规则为“routing”时,就会发现只有“topic.queue.two”队列收到消息了,因为“routing-key”无法和“routing”匹配,而“#”可以。
headers交换器亦可以说是灵活的交换器,因为其是根据消息的headers中的信息来判断是否分发消息致某一个队列的。
headers交换器的配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| package com.lazycece.sbac.rabbitmq.config;
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class HeadersConfig {
@Bean public Queue headersQueue() { return QueueBuilder.durable("headers.queue").build(); }
@Bean public HeadersExchange headersExchange() { return (HeadersExchange) ExchangeBuilder.headersExchange("headers.exchange").build(); }
@Bean public Binding headersBinding() { return BindingBuilder.bind(headersQueue()).to(headersExchange()).where("headers-key").exists(); } }
|
这里为此headers交换器取名为“headers.exchange”,并为其绑定了一个队列“headers.queue”,然后设置只要headers中存在一个key名为“headers-key”时,便可以被分发消息。下面给出队列的监听:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.lazycece.sbac.rabbitmq.consumer;
import com.lazycece.sbac.rabbitmq.entity.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class HeadersConsumer {
@RabbitListener(queues = {"headers.queue"}) public void headersQueueConsumer(Message message) { log.info("headers.queue.one -> {} ", message.toString()); } }
|
现在来测试一下headers交换器模式的消息发送,测试方法代码如下:
1 2 3 4 5 6 7 8
| @Test public void headersProducer() { rabbitTemplate.convertAndSend("headers.exchange", "", message, m -> { m.getMessageProperties().getHeaders().put("headers-key", null); return m; }); }
|
此处向命名“headers.exchange”的交换器中发送消息,并为消息的headers中加入“headers-key”键,方法运行之后,便可以看见其监听队列收到了消息,并打印出了如下信息:
1
| headers.queue.one -> Message(id=02d3d652-6fcc-4d3c-a18e-7fe90abe066e, content=Hello, springboot-ac-rabbitmq !)
|
注解使用
上面都是先给出rabbitmq的exchange、queue、binding相关的配置信息,再来监听消息,这里来看一下直接使用注解@RabbitListener来绑定和简单的用法,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.lazycece.sbac.rabbitmq.consumer;
import com.lazycece.sbac.rabbitmq.entity.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class AnnotationConsumer {
@RabbitListener( bindings = { @QueueBinding( value = @Queue(name = "topic.queue.annotation"), exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC), key = {"routing"} ) } ) public void topicQueueOneConsumer(Message message) { log.info("topic.queue.annotation -> {} ", message.toString()); }
}
|
这里仍然是创建topic类型的交换器“topic.exchange”,然后创建队列“topic.queue.annotation”,再进行绑定路由键“routing”。这里用上文提到的topic模式的测试方法,毫无意外,“topic.queue.two”和“topic.queue.annotation”收到了消息,而“topic.queue.one”没有收到消息。
1 2
| topic.queue.two -> Message(id=7fe62e14-8a78-43bf-a18a-422880e30c99, content=Hello, springboot-ac-rabbitmq !) topic.queue.annotation -> Message(id=7fe62e14-8a78-43bf-a18a-422880e30c99, content=Hello, springboot-ac-rabbitmq !)
|
案例源码
案例源码地址:https://github.com/lazycece/springboot-actual-combat/tree/master/springboot-ac-rabbitmq
参考文档