- Message Queue 和 RabbitMQ 的入门学习,即部署与简单使用;
- SpringAMQP 的 api 学习
# 初识 MQ
# 1. 同步调用方案
响应耗时长,服务耦合度较高,不便于维护
# 2. 异步调用方案
异步调用方案实现就是事件驱动模式
- 优势:
- 服务解耦
- 性能提升,吞吐量增加
- 服务没有强依赖,不用担心级联失败问题
- 流量削峰
- 缺点:
- 依赖于 Broker 的可靠性、安全性、吞吐能力
- 架构复杂了,业务没有明显的流程线,不便于追踪管理
# 3. 什么是 MQ
MQ(Message Queue),消息队列,用来存放消息的队列,也就是事件驱动架构中的 Broker。
RabbitMQ | ActiveMQ | RocketMQ | Kafaka | |
---|---|---|---|---|
公司 / 社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala、Java |
协议支持 | AMQP、XMPP、SMTP、STOMP | OpenWire、STOMP、REST、XMPP、AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
# 4. RabbitMQ 快速入门
https://www.rabbitmq.com/
推荐 docker 进行安装
h docker pull rabbitmq
运行
h docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name rbmq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
其中:
- -p 15672:15672 设置的是管理界面的端口
- -p 5672:5672 设置的是消息通信的端口
如果不能打开管理页面,可能需要激活管理插件:
- docker exec -it rbmq bash
- rabbitmq-plugins enable rabbitmq_management
# 4.1 结构
- 几个概念
- channel:操作 MQ 的工具
- exchange:路由消息到队列中
- queue:缓存消息
- virtual host:虚拟主机,是对 queue、exchange 等资源的逻辑分组
# 4.2 常见消息模型
官网给出了 5 种消息模型,对应了几种不同的用法:
- 基本消息队列
- 工作消息队列
发布订阅,根据交换机类型不同,分为三种:
- 广播:Fanout Exchange
- 路由:Direct Exchange
- 主题:Topic Exchange
# 4.2.1 HelloWorld 案例 - 简单模型
官方的 Helloworld 是基于最基础的消息队列模型实现的,只包括三个角色:
- publisher:消息的发布者,将消息发送到队列 queue
- queue:消息队列,负责接收并缓存消息
- consumer:订阅队列,处理队列中的消息
消息发送流程:
- 建立 connection
- 创建 channel
- 利用 channel 声明队列
- 利用 channel 向队列发送消息
消息接收(消费)流程:
- 建立 connection
- 创建 channel
- 利用 channel 声明队列
- 定义 consumer 的消费行为 handleDelivery () 回调函数
- 利用 channel 将消费者与队列绑定
具体代码实现如下:
Publisher:
a @Test
public void testSendMessage() throws IOException, TimeoutException {
// 1. 建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1. 设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("10.255.125.164");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 1.2. 建立连接
Connection connection = factory.newConnection();
// 2. 创建通道 Channel
Channel channel = connection.createChannel();
// 3. 创建队列
String queueName = "first.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4. 发送消息
String message = "Hello, RabbitMQ!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.printf("发送消息成功 -> %s%n", message);
// 5. 关闭通道和连接
channel.close();
connection.close();
}
测试运行:
Consumer:
a @Test
public void testHandleMessage() throws IOException, TimeoutException {
// 1. 建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1 设置连接参数,分别是:主机名、端口号、virtual host、用户名、密码
factory.setHost("10.255.125.164");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 1.2 建立连接
Connection conn = factory.newConnection();
// 2. 创建通道 channel
Channel channel = conn.createChannel();
// 3. 创建消息队列
String queName = "first.queue";
channel.queueDeclare(queName, false, false, false, null);
// 4. 订阅消息
channel.basicConsume(queName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
// 5. 实现处理消息的行为
String message = new String(body);
System.out.printf("接收到的消息 -> %s%n", message);
}
});
System.out.println("等待接收消息......");
}
测试结果:
# 4.2.2 引入 SpringAMQP
# 什么是 SpringAMQP
官方介绍:https://spring.io/projects/spring-amqp
AMQP(Advanced Message Queue Protocol) 是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
而 SpringAMQP 是基于 AMQP 协议的一套 API 规范,提供了发送和接收处理消息的模板。包含两部分,其中 spring-amqp 是基础抽象,spring-rabbit 是底层的默认实现。
改写 Helloword 案例:
# 发布消息
- 引入依赖
<!-- spring amqp 依赖,包含了 rabbit mq --> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-amqp</artifactId> | |
</dependency> |
- 配置 mq 信息:
spring: | |
rabbitmq: | |
host: 10.255.125.164 # rabbitMQ 的 ip 地址 | |
port: 5672 # 端口 | |
username: admin | |
password: admin | |
virtual-host: / |
- 测试:
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.springframework.amqp.rabbit.core.RabbitTemplate; | |
import org.springframework.boot.test.context.SpringBootTest; | |
import org.springframework.test.context.junit4.SpringRunner; | |
import javax.annotation.Resource; | |
/** | |
* @author dubulingbo, 2023/3/27 19:57. | |
*/ | |
@RunWith(SpringRunner.class) | |
@SpringBootTest | |
public class SpringAMQPTest { | |
@Resource | |
RabbitTemplate rabbitTemplate; | |
@Test | |
public void testSend() { | |
String queName = "first.queue"; | |
String msg = "Hellooooo, spring amqp!"; | |
rabbitTemplate.convertAndSend(queName, msg); | |
} | |
} |
- 结果:
# 接收消息 / 处理消息
前两步与发布消息相同,这里不在赘述!
- 实现:
// 定义 RabbitListener | |
@Component | |
public class RabbitMQListener { | |
@RabbitListener(queues = {"first.queue"}) | |
public void listenFirstQueue(String msg) { | |
System.out.printf("消费者收到first.queue的消息 -> %s%n", msg); | |
} | |
} | |
// 主启动类 | |
@SpringBootApplication | |
public class ConsumerApplication { | |
public static void main(String[] args) { | |
SpringApplication.run(ConsumerApplication.class, args); | |
} | |
} |
- 结果:
# 4.2.3 Work queues 案例 - 工作队列模型
- 消费者:
public class RabbitMQListener { | |
@RabbitListener(queues = {"first.queue"}) | |
public void listenFirstQueue1(String msg) throws InterruptedException { | |
System.out.printf("消费者 1 收到first.queue的消息 -> [%s] %s%n", msg, LocalTime.now()); | |
Thread.sleep(20); | |
} | |
@RabbitListener(queues = {"first.queue"}) | |
public void listenFirstQueue2(String msg) throws InterruptedException { | |
System.err.printf("消费者 2 收到first.queue的消息 -> [%s] %s%n", msg, LocalTime.now()); | |
Thread.sleep(200); | |
} | |
} |
- 发布者模拟发送 10 条消息:
@Test | |
public void testSendMessage2WorkQueue() throws InterruptedException { | |
String queName = "first.queue"; | |
String msg = "Hello, message - "; | |
for (int i = 0; i < 50; ++i) { | |
rabbitTemplate.convertAndSend(queName, msg + (i + 1)); | |
Thread.sleep(20); | |
} | |
} |
- 测试结果:
有上述结果可看出,并没有实现 “能者多劳” 的现象,而是均等分配了消息,其原因是 SpringAMQP 默认消费者是均等预取的,即相当于 <u> 在消费前就将消息均等分配到每个消费者,而不管其执行时间的长短 </u> ;因此设置预取数量,即增加消费者配置项如下:
spring: | |
rabbitmq: | |
...... | |
listener: | |
simple: | |
prefetch: 1 # 每次最多取一条消息,消费完再取下一条 |
再次运行,结果如下:
# 4.2.4 发布、订阅模型
发布订阅模型与之前的模型区别就是允许同一个消息发送给多个消费者。实现方式是加入了 Exchange(交换机)。常见的 exchange 有以下三种:
- Fanout:广播
- Direct:路由
- Topic:话题
注意:exchange 只负责消息路由,而不是存储,路由失败则消息丢失。
# 4.2.4.1 Fanout - 广播
Fanout Exchange 会将接收到的消息路由到每一个与其绑定的 queue
P - Publisher
X - Exchange
C - Consumer
- 实现思路:
- 在 consumer 服务中,声明队列,交换机、并将两者绑定
- 在 consumer 服务中,编写两个消费者 consumer1、consumer2 方法,分别监听 fanout.queue1 和 fanout.queue2
- 在 publisher 中编写测试方法,向交换机 fanout.exchange 发消息
SpringAMQP 提供了声明交换机、队列的绑定关系 API 如下:
具体实现如:
import org.springframework.amqp.core.Binding; | |
import org.springframework.amqp.core.BindingBuilder; | |
import org.springframework.amqp.core.FanoutExchange; | |
import org.springframework.amqp.core.Queue; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
/** | |
* @author dubulingbo, 2023/3/27 23:27. | |
*/ | |
@Configuration | |
public class FanoutConfig { | |
// 声明交换机 fanout.exchange | |
@Bean | |
public FanoutExchange fanoutExchange() { | |
return new FanoutExchange("fanout.exchange"); | |
} | |
// 声明 fanout.queue1 | |
@Bean | |
public Queue fanoutQueue1() { | |
return new Queue("fanout.queue1"); | |
} | |
// 声明 fanout.queue2 | |
@Bean | |
public Queue fanoutQueue2() { | |
return new Queue("fanout.queue2"); | |
} | |
// 绑定 fanout.queue1 到交换机 | |
// 变量名必须一致,否则绑定会失败 | |
@Bean | |
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) { | |
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); | |
} | |
// 绑定 fanout.queue2 到交换机 | |
@Bean | |
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) { | |
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); | |
} | |
} |
运行后,进入 RabbitMQ 后台界面,绑定结果如下:
消费者消息监听:
@Component | |
public class RabbitMQListener { | |
@RabbitListener(queues = {"fanout.queue1"}) | |
public void listenFanoutQueue1(String msg) { | |
System.err.printf("消费者接收到fanout.queue1的消息 -> [%s]%n", msg); | |
} | |
@RabbitListener(queues = {"fanout.queue2"}) | |
public void listenFanoutQueue2(String msg) { | |
System.err.printf("消费者接收到fanout.queue2的消息 -> [%s]%n", msg); | |
} | |
} |
发布者发布消息:
@Test | |
public void testSendFanoutExchange() { | |
// 交换机名称 | |
String ex = "fanout.exchange"; | |
// 消息 | |
String msg = "Hello, every one!"; | |
// 发送消息 | |
rabbitTemplate.convertAndSend(ex, "", msg); | |
} |
测试结果:
小结:
交换机的作用:
- 接收 publisher 发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息、路由失败,消息丢失
- FanoutExchange 会将消息路由到每一个绑定的队列
声明交换机、队列、绑定关系的 Bean:
- org.springframework.amqp.core.FanoutExchange
- org.springframework.amqp.core.Queue
- org.springframework.amqp.core.Binding
- org.springframework.amqp.core.BindingBuilder::bind(queue)::to(exchange)
# 4.2.4.2 Direct - 路由
Direct Exchange 会将接收到的消息根据规则路由到指定的 Queue,因此称为路由模式(routes)。
- 每一个 Queue 都与 Exchange 设置一个 Binding Key
- 发布者发送消息时,指定消息的 Routing Key
- Exchange 将消息路由到 Binding Key 和 Routing Key 一致的队列
实现:
- 消费者(注解实现)
@Component | |
public class RabbitMQListener { | |
@RabbitListener(bindings = @QueueBinding( | |
value = @Queue(name = "direct.queue1"), | |
exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT), | |
key = "error" | |
)) | |
public void listenDirectQueue1(String msg) { | |
System.out.printf("消费者接收到direct.queue1的消息 -> %s%n", msg); | |
} | |
@RabbitListener(bindings = @QueueBinding( | |
value = @Queue(name = "direct.queue2"), | |
exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT), | |
key = {"info", "error", "warning"} | |
)) | |
public void listenDirectQueue2(String msg) { | |
System.out.printf("消费者接收到direct.queue2的消息 -> %s%n", msg); | |
} | |
} |
运行后,进入 RabbitMQ 后台界面,绑定结果如下:
发布者发布消息:
@Test | |
public void testSendDirectExchange() { | |
// 交换机名称 | |
String ex = "direct.exchange"; | |
// 发送消息 | |
rabbitTemplate.convertAndSend(ex, "info", "This is info message"); | |
rabbitTemplate.convertAndSend(ex, "error", "This is error message"); | |
rabbitTemplate.convertAndSend(ex, "warning", "This is warning message"); | |
// 这条消息会被丢弃,因为找不到匹配的队列 | |
rabbitTemplate.convertAndSend(ex, "success", "This is success message"); | |
} |
运行结果:
# 4.2.4.3 Topic - 话题
TopicExchange 与 DirectExchange 类似,区别在于 routing key 必须是多个单词的列表,并且以 .
分割。Queue 与 Exchange 指定 Binding Key 时,可以使用通配符:
- ‘#’:代表 0 个或多个单词
- ‘*’:代指一个单词
实现:
- 消费者:
@Component | |
public class RabbitMQListener { | |
@RabbitListener(bindings = @QueueBinding( | |
value = @Queue("topic.queue1"), | |
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC), | |
key = "*.orange.*" | |
)) | |
public void listenTopicQueue1(String msg) { | |
System.err.printf("消费者接收到topic.queue1的消息 -> %s%n", msg); | |
} | |
@RabbitListener(bindings = @QueueBinding( | |
value = @Queue("topic.queue2"), | |
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC), | |
key = {"*.*.rabbit", "lazy.#"} | |
)) | |
public void listenTopicQueue2(String msg) { | |
System.out.printf("消费者接收到topic.queue2的消息 -> %s%n", msg); | |
} | |
} |
运行后,进入 RabbitMQ 后台界面,绑定结果如下:
- 发布者发布如下消息:
@Test | |
public void testSendTopicExchange() { | |
// 交换机名称 | |
String ex = "topic.exchange"; | |
// 发送消息 | |
rabbitTemplate.convertAndSend(ex, "1.orange", "1.orange"); | |
rabbitTemplate.convertAndSend(ex, "1.orange.2", "1.orange.2"); | |
rabbitTemplate.convertAndSend(ex, "orange.2", "orange.2"); | |
rabbitTemplate.convertAndSend(ex, "orange.rabbit", "orange.rabbit"); | |
rabbitTemplate.convertAndSend(ex, ".rabbit", ".rabbit"); | |
rabbitTemplate.convertAndSend(ex, ".rabbit.", ".rabbit."); | |
rabbitTemplate.convertAndSend(ex, "..rabbit", "..rabbit"); | |
rabbitTemplate.convertAndSend(ex, ".orange.", ".orange."); | |
rabbitTemplate.convertAndSend(ex, "rabbit.2", "rabbit.2"); | |
rabbitTemplate.convertAndSend(ex, "orange.rabbit.2", "orange.rabbit.2"); | |
rabbitTemplate.convertAndSend(ex, "orange.orange.rabbit.2", "orange.orange.rabbit.2"); | |
rabbitTemplate.convertAndSend(ex, "orange.orange.rabbit", "orange.orange.rabbit"); | |
rabbitTemplate.convertAndSend(ex, "1.2.3.rabbit", "1.2.3.rabbit"); | |
rabbitTemplate.convertAndSend(ex, "lazy.", "lazy."); | |
rabbitTemplate.convertAndSend(ex, "lazy.1", "lazy.1"); | |
rabbitTemplate.convertAndSend(ex, "lazy.123", "lazy.123"); | |
rabbitTemplate.convertAndSend(ex, "lazy.1.2", "lazy.1.2"); | |
rabbitTemplate.convertAndSend(ex, ".lazy", ".lazy"); | |
rabbitTemplate.convertAndSend(ex, ".lazy.", ".lazy."); | |
rabbitTemplate.convertAndSend(ex, "lazy#123", "lazy#123"); | |
rabbitTemplate.convertAndSend(ex, "lazy.123.", "lazy.123."); | |
} |
- 运行结果如下:
# 4.3 消息转换器
在 SpringAMQP 的发送方法中,接收消息的类型是 Object,也就是说可以发送任意对象数据类型的消息,SpringAMQP 会将其序列化为字节进行发送。
默认使用实现序列化
发送的消息为:
a @Test
public void testSendObjectQueue() {
Map<String, Object> map = new HashMap<>();
map.put("name", "dubulingbo");
map.put("age", 26);
rabbitTemplate.convertAndSend("object.queue", map);
}
引入 jackson 序列化:
l <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
配置 MessageConverter
a @Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
接收消息:
a // 直接用 Map 接收,SpringAMQP 会自动完成反序列化
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String, Object> msg) {
System.out.printf("消费者接收到object.queue的消息 -> %s%n", msg);
}
测试:
SpringAMQP 中消息的序列化和反序列化是怎么实现的?
- 利用 MessageConverter 实现,默认是 JDK 的序列化
- 注意发送方与接收方必须使用相同的 MessageConverter
# 4.4 写在最后
队列中的消息一旦消费就会从队列中删除,RabbitMQ 没有消息回溯功能!