G
N
I
D
A
O
L
  • Message Queue 和 RabbitMQ 的入门学习,即部署与简单使用;
  • SpringAMQP 的 api 学习

# 初识 MQ

# 1. 同步调用方案

响应耗时长,服务耦合度较高,不便于维护

# 2. 异步调用方案

异步调用方案实现就是事件驱动模式

image-20230327162041814

  • 优势:
    1. 服务解耦
    2. 性能提升,吞吐量增加
    3. 服务没有强依赖,不用担心级联失败问题
    4. 流量削峰
  • 缺点:
    1. 依赖于 Broker 的可靠性、安全性、吞吐能力
    2. 架构复杂了,业务没有明显的流程线,不便于追踪管理

# 3. 什么是 MQ

MQ(Message Queue),消息队列,用来存放消息的队列,也就是事件驱动架构中的 Broker。

RabbitMQActiveMQRocketMQKafaka
公司 / 社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala、Java
协议支持AMQP、XMPP、SMTP、STOMPOpenWire、STOMP、REST、XMPP、AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒内
消息可靠性一般一般

# 4. RabbitMQ 快速入门

https://www.rabbitmq.com/

  • 推荐 docker 进行安装

    h
    docker pull rabbitmq
  • 运行

    h
    docker run \
     -e RABBITMQ_DEFAULT_USER=admin \
     -e RABBITMQ_DEFAULT_PASS=admin \
     --name rbmq \
     -p 15672:15672 \
     -p 5672:5672 \
     -d \
     rabbitmq:3-management

    其中:

    • -p 15672:15672 设置的是管理界面的端口
    • -p 5672:5672 设置的是消息通信的端口

    image-20230327171249910

    如果不能打开管理页面,可能需要激活管理插件:

    • docker exec -it rbmq bash
    • rabbitmq-plugins enable rabbitmq_management

# 4.1 结构

image-20230327172022891

  • 几个概念
    • channel:操作 MQ 的工具
    • exchange:路由消息到队列中
    • queue:缓存消息
    • virtual host:虚拟主机,是对 queue、exchange 等资源的逻辑分组

# 4.2 常见消息模型

官网给出了 5 种消息模型,对应了几种不同的用法:

  • 基本消息队列
  • 工作消息队列

image-20230327173146036

  • 发布订阅,根据交换机类型不同,分为三种:

    • 广播:Fanout Exchange
    • 路由:Direct Exchange
    • 主题:Topic Exchange

    image-20230327173338853

# 4.2.1 HelloWorld 案例 - 简单模型

官方的 Helloworld 是基于最基础的消息队列模型实现的,只包括三个角色:

  • publisher:消息的发布者,将消息发送到队列 queue
  • queue:消息队列,负责接收并缓存消息
  • consumer:订阅队列,处理队列中的消息

Producer -> Queue -> Consuming: send and receive messages from a named queue.

消息发送流程:

  1. 建立 connection
  2. 创建 channel
  3. 利用 channel 声明队列
  4. 利用 channel 向队列发送消息

消息接收(消费)流程:

  1. 建立 connection
  2. 创建 channel
  3. 利用 channel 声明队列
  4. 定义 consumer 的消费行为 handleDelivery () 回调函数
  5. 利用 channel 将消费者与队列绑定

具体代码实现如下:

  • Publisher:

    a
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1. 建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1. 设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("10.255.125.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 1.2. 建立连接
        Connection connection = factory.newConnection();
        // 2. 创建通道 Channel
        Channel channel = connection.createChannel();
        // 3. 创建队列
        String queueName = "first.queue";
        channel.queueDeclare(queueName, false, false, false, null);
        // 4. 发送消息
        String message = "Hello, RabbitMQ!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.printf("发送消息成功 -> %s%n", message);
        // 5. 关闭通道和连接
        channel.close();
        connection.close();
    }
    • 测试运行:

      image-20230327193012339

  • Consumer:

    a
    @Test
    public void testHandleMessage() throws IOException, TimeoutException {
        // 1. 建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1 设置连接参数,分别是:主机名、端口号、virtual host、用户名、密码
        factory.setHost("10.255.125.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("admin");
        // 1.2 建立连接
        Connection conn = factory.newConnection();
        // 2. 创建通道 channel
        Channel channel = conn.createChannel();
        // 3. 创建消息队列
        String queName = "first.queue";
        channel.queueDeclare(queName, false, false, false, null);
        // 4. 订阅消息
        channel.basicConsume(queName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // 5. 实现处理消息的行为
                String message = new String(body);
                System.out.printf("接收到的消息 -> %s%n", message);
            }
        });
        System.out.println("等待接收消息......");
    }
    • 测试结果:

      image-20230327194123529

      image-20230327193325144

# 4.2.2 引入 SpringAMQP

# 什么是 SpringAMQP

官方介绍:https://spring.io/projects/spring-amqp

AMQP(Advanced Message Queue Protocol) 是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

SpringAMQP 是基于 AMQP 协议的一套 API 规范,提供了发送和接收处理消息的模板。包含两部分,其中 spring-amqp 是基础抽象,spring-rabbit 是底层的默认实现。

改写 Helloword 案例:

# 发布消息
  • 引入依赖
l
<!-- spring amqp 依赖,包含了 rabbit mq -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 配置 mq 信息:
l
spring:
  rabbitmq:
    host: 10.255.125.164 # rabbitMQ 的 ip 地址
    port: 5672 # 端口
    username: admin
    password: admin
    virtual-host: /
  • 测试:
a
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
/**
 * @author dubulingbo, 2023/3/27 19:57.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {
    @Resource
    RabbitTemplate rabbitTemplate;
    @Test
    public void testSend() {
        String queName = "first.queue";
        String msg = "Hellooooo, spring amqp!";
        rabbitTemplate.convertAndSend(queName, msg);
    }
}
  • 结果:

image-20230327201611535

# 接收消息 / 处理消息

前两步与发布消息相同,这里不在赘述!

  • 实现:
a
// 定义 RabbitListener
@Component
public class RabbitMQListener {
    @RabbitListener(queues = {"first.queue"})
    public void listenFirstQueue(String msg) {
        System.out.printf("消费者收到first.queue的消息 -> %s%n", msg);
    }
}
// 主启动类
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}
  • 结果:

image-20230327202340934

# 4.2.3 Work queues 案例 - 工作队列模型

Producer -> Queue -> Consuming: Work Queue used to distribute time-consuming tasks among multiple workers.

  • 消费者:
a
public class RabbitMQListener {
    @RabbitListener(queues = {"first.queue"})
    public void listenFirstQueue1(String msg) throws InterruptedException {
        System.out.printf("消费者 1 收到first.queue的消息 -> [%s] %s%n", msg, LocalTime.now());
        Thread.sleep(20);
    }
    @RabbitListener(queues = {"first.queue"})
    public void listenFirstQueue2(String msg) throws InterruptedException {
        System.err.printf("消费者 2 收到first.queue的消息 -> [%s] %s%n", msg, LocalTime.now());
        Thread.sleep(200);
    }
}
  • 发布者模拟发送 10 条消息:
a
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
    String queName = "first.queue";
    String msg = "Hello, message - ";
    for (int i = 0; i < 50; ++i) {
        rabbitTemplate.convertAndSend(queName, msg + (i + 1));
        Thread.sleep(20);
    }
}
  • 测试结果:

image-20230327223604712

有上述结果可看出,并没有实现 “能者多劳” 的现象,而是均等分配了消息,其原因是 SpringAMQP 默认消费者是均等预取的,即相当于 <u> 在消费前就将消息均等分配到每个消费者,而不管其执行时间的长短 </u> ;因此设置预取数量,即增加消费者配置项如下:

l
spring:
  rabbitmq:
    ......
    listener:
      simple:
        prefetch: 1 # 每次最多取一条消息,消费完再取下一条

再次运行,结果如下:

image-20230327224350094

# 4.2.4 发布、订阅模型

发布订阅模型与之前的模型区别就是允许同一个消息发送给多个消费者。实现方式是加入了 Exchange(交换机)。常见的 exchange 有以下三种:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

注意:exchange 只负责消息路由,而不是存储,路由失败则消息丢失。

# 4.2.4.1 Fanout - 广播

Fanout Exchange 会将接收到的消息路由到每一个与其绑定的 queue

Producer -> Queue -> Consuming: deliver a message to multiple consumers. This pattern is known as publish/subscribe

P - Publisher

X - Exchange

C - Consumer

  • 实现思路:
    1. 在 consumer 服务中,声明队列,交换机、并将两者绑定
    2. 在 consumer 服务中,编写两个消费者 consumer1、consumer2 方法,分别监听 fanout.queue1 和 fanout.queue2
    3. 在 publisher 中编写测试方法,向交换机 fanout.exchange 发消息

SpringAMQP 提供了声明交换机、队列的绑定关系 API 如下:

Exchange

具体实现如:

a
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @author dubulingbo, 2023/3/27 23:27.
 */
@Configuration
public class FanoutConfig {
    // 声明交换机 fanout.exchange
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange");
    }
    // 声明 fanout.queue1
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }
    // 声明 fanout.queue2
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }
    // 绑定 fanout.queue1 到交换机
    // 变量名必须一致,否则绑定会失败
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    // 绑定 fanout.queue2 到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

运行后,进入 RabbitMQ 后台界面,绑定结果如下:

image-20230327234002436

消费者消息监听:

a
@Component
public class RabbitMQListener {
    @RabbitListener(queues = {"fanout.queue1"})
    public void listenFanoutQueue1(String msg) {
        System.err.printf("消费者接收到fanout.queue1的消息 -> [%s]%n", msg);
    }
    @RabbitListener(queues = {"fanout.queue2"})
    public void listenFanoutQueue2(String msg) {
        System.err.printf("消费者接收到fanout.queue2的消息 -> [%s]%n", msg);
    }
}

发布者发布消息:

a
@Test
public void testSendFanoutExchange() {
    // 交换机名称
    String ex = "fanout.exchange";
    // 消息
    String msg = "Hello, every one!";
    // 发送消息
    rabbitTemplate.convertAndSend(ex, "", msg);
}

测试结果:

image-20230327234924896

小结:

交换机的作用:

  • 接收 publisher 发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息、路由失败,消息丢失
  • FanoutExchange 会将消息路由到每一个绑定的队列

声明交换机、队列、绑定关系的 Bean:

  • org.springframework.amqp.core.FanoutExchange
  • org.springframework.amqp.core.Queue
  • org.springframework.amqp.core.Binding
    • org.springframework.amqp.core.BindingBuilder::bind(queue)::to(exchange)
# 4.2.4.2 Direct - 路由

Direct Exchange 会将接收到的消息根据规则路由到指定的 Queue,因此称为路由模式(routes)。

  • 每一个 Queue 都与 Exchange 设置一个 Binding Key
  • 发布者发送消息时,指定消息的 Routing Key
  • Exchange 将消息路由到 Binding Key 和 Routing Key 一致的队列

Producer -> Queue -> Consuming: subscribe to a subset of the messages only.

实现:

  • 消费者(注解实现)
a
@Component
public class RabbitMQListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
            key = "error"
    ))
    public void listenDirectQueue1(String msg) {
        System.out.printf("消费者接收到direct.queue1的消息 -> %s%n", msg);
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
            key = {"info", "error", "warning"}
    ))
    public void listenDirectQueue2(String msg) {
        System.out.printf("消费者接收到direct.queue2的消息 -> %s%n", msg);
    }
}

运行后,进入 RabbitMQ 后台界面,绑定结果如下:

image-20230328002812661

发布者发布消息:

a
@Test
public void testSendDirectExchange() {
    // 交换机名称
    String ex = "direct.exchange";
    // 发送消息
    rabbitTemplate.convertAndSend(ex, "info", "This is info message");
    rabbitTemplate.convertAndSend(ex, "error", "This is error message");
    rabbitTemplate.convertAndSend(ex, "warning", "This is warning message");
    // 这条消息会被丢弃,因为找不到匹配的队列
    rabbitTemplate.convertAndSend(ex, "success", "This is success message");
}

运行结果:

image-20230328003818583

# 4.2.4.3 Topic - 话题

TopicExchange 与 DirectExchange 类似,区别在于 routing key 必须是多个单词的列表,并且以 . 分割。Queue 与 Exchange 指定 Binding Key 时,可以使用通配符:

  • ‘#’:代表 0 个或多个单词
  • ‘*’:代指一个单词

Producer -> Queue -> Consuming: receiving messages based on a pattern (topics).

实现:

  • 消费者:
a
@Component
public class RabbitMQListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue1"),
            exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
            key = "*.orange.*"
    ))
    public void listenTopicQueue1(String msg) {
        System.err.printf("消费者接收到topic.queue1的消息 -> %s%n", msg);
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("topic.queue2"),
            exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
            key = {"*.*.rabbit", "lazy.#"}
    ))
    public void listenTopicQueue2(String msg) {
        System.out.printf("消费者接收到topic.queue2的消息 -> %s%n", msg);
    }
}

运行后,进入 RabbitMQ 后台界面,绑定结果如下:

image-20230328005626095

  • 发布者发布如下消息:
a
@Test
public void testSendTopicExchange() {
    // 交换机名称
    String ex = "topic.exchange";
    // 发送消息
    rabbitTemplate.convertAndSend(ex, "1.orange", "1.orange");
    rabbitTemplate.convertAndSend(ex, "1.orange.2", "1.orange.2");
    rabbitTemplate.convertAndSend(ex, "orange.2", "orange.2");
    rabbitTemplate.convertAndSend(ex, "orange.rabbit", "orange.rabbit");
    rabbitTemplate.convertAndSend(ex, ".rabbit", ".rabbit");
    rabbitTemplate.convertAndSend(ex, ".rabbit.", ".rabbit.");
    rabbitTemplate.convertAndSend(ex, "..rabbit", "..rabbit");
    rabbitTemplate.convertAndSend(ex, ".orange.", ".orange.");
    rabbitTemplate.convertAndSend(ex, "rabbit.2", "rabbit.2");
    rabbitTemplate.convertAndSend(ex, "orange.rabbit.2", "orange.rabbit.2");
    rabbitTemplate.convertAndSend(ex, "orange.orange.rabbit.2", "orange.orange.rabbit.2");
    rabbitTemplate.convertAndSend(ex, "orange.orange.rabbit", "orange.orange.rabbit");
    rabbitTemplate.convertAndSend(ex, "1.2.3.rabbit", "1.2.3.rabbit");
    rabbitTemplate.convertAndSend(ex, "lazy.", "lazy.");
    rabbitTemplate.convertAndSend(ex, "lazy.1", "lazy.1");
    rabbitTemplate.convertAndSend(ex, "lazy.123", "lazy.123");
    rabbitTemplate.convertAndSend(ex, "lazy.1.2", "lazy.1.2");
    rabbitTemplate.convertAndSend(ex, ".lazy", ".lazy");
    rabbitTemplate.convertAndSend(ex, ".lazy.", ".lazy.");
    rabbitTemplate.convertAndSend(ex, "lazy#123", "lazy#123");
    rabbitTemplate.convertAndSend(ex, "lazy.123.", "lazy.123.");
}
  • 运行结果如下:

image-20230328011853715

# 4.3 消息转换器

在 SpringAMQP 的发送方法中,接收消息的类型是 Object,也就是说可以发送任意对象数据类型的消息,SpringAMQP 会将其序列化为字节进行发送。

  • 默认使用实现序列化

  • 发送的消息为:

    a
    @Test
    public void testSendObjectQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("name", "dubulingbo");
        map.put("age", 26);
        rabbitTemplate.convertAndSend("object.queue", map);
    }

    image-20230328104817211

  • 引入 jackson 序列化:

    l
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    • 配置 MessageConverter

      a
      @Bean
      public MessageConverter messageConverter() {
          return new Jackson2JsonMessageConverter();
      }

      image-20230328105510738

  • 接收消息:

    a
    // 直接用 Map 接收,SpringAMQP 会自动完成反序列化
    @RabbitListener(queues = "object.queue")
    public void listenObjectQueue(Map<String, Object> msg) {
        System.out.printf("消费者接收到object.queue的消息 -> %s%n", msg);
    }
  • 测试:

    image-20230328105934244

SpringAMQP 中消息的序列化和反序列化是怎么实现的?

  • 利用 MessageConverter 实现,默认是 JDK 的序列化
  • 注意发送方与接收方必须使用相同的 MessageConverter

# 4.4 写在最后

队列中的消息一旦消费就会从队列中删除,RabbitMQ 没有消息回溯功能!

更新于 阅读次数

😉觉得写的还不错,请我喝杯咖啡吧😁

独步凌波 微信支付

微信支付

独步凌波 支付宝

支付宝