为什么要用MQ?

  1. 应用解耦

假设有支付、订单、短信三个服务,当有一笔订单需要调用这三个服务时,需要在代码中写死这三个服务的调用接口,如果需要新增一个新的服务积分服务,又要在代码中新增调用接口,耦合度太高,不利于管理

此时引入RocketMQ,通过发布不同的topic,需要的服务只需要订阅它就可以了,这样一来,生产者不需要知道我需要调用哪些接口,只管发送消息即可。未来再需要新增服务的时候,只需要在这个服务中订阅消息就可以了。扩展性提高,维护成本降低

生产者不再直接调用多个下游服务,只负责发消息;
谁想处理这条消息,就订阅对应的 Topic。
新增/删除下游服务不需要修改生产者代码。

  1. 异步提速

假设生产者向RocketMQ中发送一条用户下单的消息需要 50ms,生产者向数据库中发送一条消息需要 50ms,支付、订单、短信三个服务接收消息后处理需要 200ms

那么用户得到反馈的时间是多少呢? 50ms + 50ms = 100ms ,前端只需要将消息发送成功,数据库消息写入成功后就可以返回了,后端服务异步处理消息就行了,减少用户等待的时间

把耗时操作(发短信、扣库存、记积分)交给 MQ 异步执行,主流程只做核心步骤,然后立刻返回用户。

  1. 削峰填谷

在订单高峰期可能达到 10万/s 的qps,使用RocketMQ可以将这些消息存入消息队列中,再通过后端服务依次处理,将高峰期挤压的消息按照匀给其他时间处理,可以提高系统稳定性

image-20251121090958962

高峰期把 10 万 QPS 的请求写入 MQ,后端以自身处理能力匀速消费。


Rocket-MQ的优点

  1. 开发语言:Rocket-MQ使用Java开发,更容易上手阅读体验和受众
  2. 社区氛围活跃:RocketMQ是阿里巴巴开源且内部在大量使用的消息队列,说明RocketMQ是经得起考验的,能够针对线上复杂环境的需求场景提供相应的解决方案
  3. 特性丰富:根据RocketMQ官方文档的举例,其高级特性达到了12种,比如顺序消息、事务消息、定时消息等;能够在复杂的业务下尽可能多的提供思路及解决方案

RocketMQ组件

image-20251121091323745

生产者、Topic、消费者之间的关系

image-20251121091355383

  1. 生产者:消息的发送方,负责指定主题向MQ发送消息

生产者如何将消息发送给broker?生产者需要指定topic,和NameServer建立一个tcp长连接,拉取最新的路由信息;通过路由信息找到自己要投递信息的topic分布在哪几台broker上,根据负载均衡向broker发送信息


  1. 消费者:消息的接收方,从主题订阅消息并消费

消费者组是一个具有相同消费逻辑的集合,一个主题可以由多个消费者组进行订阅和消费,作用是实现负载均衡和容错能力

拉取消息时,消费者可能拉主也可能拉从,根据负载决定


  1. Broker:MQ的中间件,负责存储和转发

一个Broker中可以有多个topic

心跳机制

消费者必须每隔30s向Broker发送心跳,如果Broker在120s没有接收到心跳,就会将这个消费者踢出消费者组

生产者非必须可以暂时不发送心跳


  1. NameServer:管理维护

NameServer 完全是 独立的多个节点

管理:生产者、Broker、消费者都需要向NameServer进行注册;生产者通过NameServer知道,自己的topic需要发送到哪一个Broker中;消费者通过NameServer知道,从哪一个Broker中订阅Topic消息

心跳机制:只有Broker每隔30s就会向NameServer发送一次心跳;如果超出时间没有发送,该节点就会被移除,不再参与MQ的任何事件;当心跳恢复后,NameServer会将该节点加入MQ中参与事件

心跳包含什么?

1
2
3
4
5
6
- BrokerName (名字)
- BrokerId (0=master, 1+=slave)
- Broker 地址
- Broker 的读写权限(Perm)
- Broker 的 Topic 列表(TopicConfig)
- Broker 配置版本号(configVersion)

  1. messageQueue:消息的存储单元
  • 唯一标识:每个队列有唯一标识,由topic名称和队列编号组成
  • 消息顺序性:FIFO
  • 负载均衡:动态调整消息分配,消息分配到不同队列中

消息模型

消息发送模式

  1. 同步发送

生产者发送消息后会阻塞当前线程,等待Broker返回结果,只有收到ack确认,才表示消息发送成功

优点:可靠性高;缺点:吞吐量低

使用场景:交易信息,核心订单创建,保证消息发送成功

  1. 异步发送

生产者发送消息后,立即返回,不会阻塞,Broker返回结果时会异步回调用户提供的回调函数

优点:吞吐量高;缺点:需要实现回调接口,消息无法保证成功

使用场景:能接受消息丢失,关注速度

  1. 单向发送

生产者发送消息后,立即返回,不关心发送结果

优点:吞吐量高;缺点:不知道消息是否发送成功,可靠性低

使用场景:日志收集


消费模式

  1. 消费方式

pull:消费者不断检查消息是否发送,如果有了就将消息拉回来

push:消费者提供一个监视器来监视消费者消费的topic,当生产者发送消息到Broker,会立即将该消息推送到消费者

优缺点

  • pull拉:完全掌握数量和速度,避免消息堆积;但不断进行轮询,对中间件有一定压力
  • push推:消息是实时的;但上传速度 > 消费速度,会造成消息堆积
  • 推适合实时性高的场景;拉适合实时性不高的场景
  1. 消息分发

集群:一条消息只能被同一个消费者组中任意一个消费者消费。通过负载均衡可以分配给不同的消费者消费

  • 目的:实现水平扩展和负载均衡,保证消息不会被重复消费
  • 场景:绝大部分解耦,微服务之间的异步通信

广播:一条消息只能被同一个消费者组消费一次

  • 目标:将消息通知给所有订阅者
  • 场景:全局处理,更新全局配置

消费机制

  1. 顺序消费

场景:需要保证创建订单 -> 支付订单 -> 发货 -> 确认收货,这些消息必须按顺序处理

实现

  • 全局顺序消费:我们知道一个topic可以有多个messageQueue,以提高系统的吞吐量;为了保证全局顺序消费,可以将队列数量设置为1,这样下单场景下各个消息一定会按照发送的顺序执行;但是这样设置极大降低了系统的吞吐量,不符合MQ的设计初衷
  • 局部顺序消费:然后我们又引入了局部顺序消费的概念,第一要保证消息顺序发送,对应到MQ中就需要使用同步发送,异步发送无法保证顺序性;第二要保证消息顺序存储,MQ的Topic下会存在多个messageQueue,要保证消息顺序存储,同一个业务编号信息需要存储在一个MessageQueue中,这里可以使用messageQueueSelector来选择需要发送的messageQueue,原理就是对业务编号进行hash,根据队列数量对hash取余,将消息发送到一个messageQueue中;第三要保证消息顺序消费,由于前两步已经保证了消息顺序存储了,在这里只需要考虑顺序处理,RocketMQ会对队列加分布式锁,即同一时刻,一个messageQueue中的消息只能被一个消费者中的一个消费线程消费
  1. 延迟消费

消息发送给Broker后不会立即处理,而是经过一个等待时间再发送给消费者,消费者是无感的

场景:每天5点执行文件清理,每隔2分钟触发一次消息推送等需求

实现:生产者设置message.setDelayTimeLevel(3)指定延迟级别为3; 之后Broker会将消息转移到延迟的Topic中,根据延迟级别存入特定队列,时间到了才会投递到目标Topic中

1
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  1. 批量消费

消费者一次拉取处理多条消息,性能提升很大,但如果某条消息处理失败,需要整批消息一起重试

场景:日志收集和处理,数据同步迁移

实现:生产者使用 producer.send(Collection msgs); 方法批量发送消息;消费者实现 MessageListenerBatchly 接口或使用 DefaultMQPushConsumerBatch 来消费批量消息。消费者可以设置 consumerMessageBatchMaxSize参数来控制单次拉取的最大消息条数

  1. 事务消息

把 RocketMQ 里的“发消息”这件事,和你本地数据库事务绑在一起,要么都成功,要么都失败,避免出现“DB 成功了,但消息没发出去 / 多发一次”的操作。

实现:见【怎么实现分布式消息事务的?半消息?


RocketMQ怎么实现延迟消费?

消息先发到 Broker,但不是马上被消费者看到,而是等一段时间后再投递给真正的业务 Topic,让消费者正常消费。

RocketMQ 的延迟消息是通过一个内部延迟 Topic(SCHEDULE_TOPIC_XXXX)和延迟级别(delayLevel)来实现的。消息先进入延迟队列,Broker 的定时扫描线程会在延迟时间到了后,重新投递到真实 Topic,消费者才会看到。

image-20251121093023514


指数退避

指数退避是一种重试策略,每次重试失败后等待时间按指数级增长,比如 1s、2s、4s、8s、16s,而不是固定间隔反复重试。

这样可以在对端服务异常或高负载时,避免大量请求短时间内反复冲击,降低雪崩风险。实际工程里一般还会在基础的指数退避上增加随机抖动,把不同客户端的重试时间打散


RocketMQ如何保证消息的可用性/可靠性/不丢失呢?

消息可能在这三个阶段发生丢失:生产阶段、存储阶段、消费阶段

生产阶段

Producer自己发送失败,解决方法:同步发送(不使用单向发送) + 重试机制(默认2次重试)

1
producer.setRetryTimesWhenSendFailed(3);

同步发送中:

  • Producer会等待Broker确认“写入成功”
  • 如果失败会自动重试
  • Producer最后会返回发送失败的异常

存储阶段

消息存储到Broker有两种刷盘方式

  1. 同步刷盘(最可靠)
1
2
3
写入 CommitLog
→ 落盘(fsync)
→ 返回发送成功

只有磁盘真正写成功之后才算发送成功

优点:绝对可靠;缺点:慢

  1. 异步刷盘
1
2
3
4
写入 CommitLog(写到内存)
→ 写到内存,立刻返回成功
→ RocketMQ 会启动一个专门的线程池
→ 异步线程刷盘

如果 Broker 突然宕机,可能丢最近几十毫秒的数据。

image-20251204132805273

Broker 的两种复制机制

  1. 同步复制

消息必须 Master 和 Slave 都写成功 才算成功,同步复制模式可以保证即使Master宕机,消息肯定在Slave中有备份,保证了消息不回丢失。

  1. 异步复制

Master 写完消息立即返回成功,不等待 Slave 写入。

结论】:真正不丢消息的方式,使用同步刷盘,同步复制

消费阶段

Consumer保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。


如何解决重复消费消息的情况?

RocketMQ只能确保至少一次发送,保证了消息不丢失,但是可能造成消息重复消费。处理消息重复消费的问题,我们只能在Consumer,也就是业务端处理,主要使用的是幂等操作。有一下两种幂等操作,Redis、数据库唯一键

  1. Redis

消费消息时执行SETNX messageId 1:仅当键 messageId 不存在时,才将其值设为 1;如果 messageId 已存在,则不做任何操作。

如果这个命令返回0说明messageId存在,已经处理

返回1表示messageId不存在,是第一次处理,可以执行业务逻辑

优点:性能快,不会入侵业务表结构,使用简单

缺点:Redis数据会过期,不能做永久幂等;Redis会丢失数据导致幂等失败


  1. 数据库唯一键

比如在处理订单支付回调时:

1
2
INSERT INTO pay_log(trans_id, ...) VALUES(...)
-- trans_id 是唯一键

重复插入直接报错,程序捕获后忽略即可。

这是经常使用的幂等操作,真正强幂等只可能落在数据库中。

优点:持久可靠,绝对不会重复执行

缺点:需要一张日志表,写库操作相比较于Redis慢


如何解决消息积压问题?

一般来说,消息积压本质上是消费速度跟不上生产速度,大多是消费者处理不及时导致的。

解决时从消费端入手

  • 先排查慢 SQL 和慢接口,因为一条消息如果要执行复杂查询或者远程调用,消费耗时会变长,线程被长期占用,后面的消息就会在队列里排队,形成积压;
  • 在业务逻辑优化之后,再适当增加消费线程数,提高单实例的并发消费能力;
  • 如果单机已经到瓶颈,会考虑横向扩容,增加多个 consumer 实例加入同一个 consumer group,让更多实例一起分摊队列,提升整体消费速度。

如何实现消息过滤?

有两种方案:

  1. 一种是在 Broker 端按照 Consumer 的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂。
  2. 另一种是在 Consumer 端过滤,比如按照消息设置的 tag 去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。

一般采用Cosumer端过滤,如果希望提高吞吐量,可以采用Broker过滤。


怎么实现分布式消息事务的?半消息?

在 RocketMQ 里,分布式消息事务是通过 半消息 + 本地事务 + 事务回查 来实现的。
具体流程是:

  1. Producer 先发送一条 半消息 到 Broker,这条消息已经持久化,但对消费者不可见;
  2. 半消息发送成功后,Producer 执行本地事务,比如写订单库;
  3. 本地事务成功则向 Broker 发送 Commit,Broker 会把这条半消息转成正常消息投递给消费者;如果本地事务失败则发送 Rollback,Broker 会丢弃这条半消息;
  4. 对于长时间未提交的半消息,Broker 会定时发起 事务回查,让 Producer 根据本地事务状态返回 Commit 或 Rollback,从而保证最终一致性。
  5. 这样可以保证“本地事务”和“消息发送”要么都成功,要么都失败,避免数据不一致。

image-20251204143435868

死信队列(DLQ)是什么?

死信队列:Dead-Letter Queue

当一条消息:

  • 多次重试仍然失败
  • 或者消费超时
  • 或者消息格式异常无法处理

RocketMQ 不会让这条消息一直卡在正常队列里影响其他消息消费,而是:

👉 把它移到这个专门的“死信队列 DLQ”里隔离起来。

这样的目的有两个:

  1. 把有问题的消息从正常队列里隔离出来,避免继续重试占用资源、阻塞正常消息;
  2. 保留这些失败消息,方便运维和开发后续排查问题、手工修复数据,必要时还能对死信队列中的消息进行人工补偿或重放,而不是直接丢失。

如何保证RocketMQ的高可用?

  1. NameServer高可用

NameServer 是无状态的,可以部署多个实例。Broker 会定时向所有 NameServer 上报路由信息,客户端可以连接任意 NameServer,因此 NameServer 本身不存在单点故障问题,当某一个NameServer挂掉后,其余的NameServer任然可以使用

  1. Broker高可用

Broker 可以采用 Master–Slave 模式部署,Slave 通过同步或异步复制跟随 Master。当 Master 故障时,可以人工切换到 Slave,或者使用 DLedger 支持自动选主,实现更强的一致性和可用性。

了解)DLedger 是 Distributed Ledger(分布式账本) 的缩写,是由阿里巴巴开源的一款轻量级、高性能的分布式日志存储与共识组件,核心定位是为分布式系统提供高可靠、强一致的日志存储,同时内置成熟的 Raft 共识算法实现,解决分布式场景下的日志复制、选主、数据一致性问题。

  1. Consumer 高可用

Consumer 以集群模式消费,一个 consumer group 下可以部署多个实例(服务器),RocketMQ 会对队列做队列重新分配(Rebalance),当某个实例故障时,其队列会被分配给其他实例(服务器)继续消费,从而保证消费端高可用。


为什么RocketMQ不使用Zookeeper作为注册中心呢?


Broker是怎么保存数据的呢?

RocketMQ 的存储是基于:

  • CommitLog(顺序写的大文件,所有消息数据)

Broker 上来的消息,不管哪个 Topic、哪个队列,统统按到达顺序写进 CommitLog,CommitLog 是一组固定大小的大文件(比如每个 1GB,写满了就滚动下一个),写入模式:追加写(append),不做随机修改

  • ConsumeQueue(按 Topic/Queue 维度的“索引文件”)

Consumer 是按 Topic + 队列 来拉消息的,如果只靠 CommitLog 这一条大日志,每次消费需要从文件头扫描到文件尾才能找到数据,于是RocketMQ又搞了一个逻辑结构:ConsumeQueue,它按照Topic + queueId维度拆分

比如:TopicA-Queue0 有一个 ConsumeQueue 文件;TopicA-Queue1 又是一个

ConsumeQueue就像是记录:这个队列的第 N 条消息,在 CommitLog 的 xx 位置,长度是 xx

  • IndexFile(建立哈希索引)

如果只想查找Topic中某一个消息,通过IndexFile可以将key -> CommitLog offset映射起来

.

总结】:Broker 把所有消息顺序写到一条大日志里(CommitLog),再用 ConsumeQueue 给每个 Topic/Queue 建“索引”,核心消费流程还是 CommitLog + ConsumeQueue,有需要的话再用 IndexFile 做按 key 查找。


说说RocketMQ怎么对文件进行读写的?

RocketMQ对文件的读写巧妙地利用了操作系统的一些高效文件读写方式——PageCache顺序读写零拷贝

  1. PageCache + 顺序写入:写几乎和写内存一样快

RocketMQ 写 CommitLog 时采用顺序追加,操作系统会把数据先写入 PageCache,然后后台异步刷盘,因此写入性能接近内存速度。

读取 ConsumeQueue(逻辑队列)时也是顺序访问,PageCache 的预读机制可以让读取基本命中内存,即使消息积压也不会影响性能。

  1. 随机读优化:CommitLog 的读取依赖 PageCache + IO 调度

CommitLog 读取是随机访问,但如果磁盘使用 SSD 并配合 Deadline 调度算法,可以显著降低随机读延迟。

  1. 零拷贝(mmap):把文件“变成内存”直接读写

在传统的方式中,操作系统将数据从磁盘文件发送到网卡,需要经历文件 → 内核态 → 用户态 → 内核态 → 网卡

image-20251205092551320

可以通过零拷贝的方式,减少用户态与内核态的上下文切换和内存拷贝的次数,用来提升I/O的性能。零拷贝比较常见的实现方式是mmap,这种机制在Java中是通过MappedByteBuffer实现的,就可以将它经历优化为文件 → 内核态 → 网卡

image-20251205092735268

RocketMQ消息长轮询了解吗?

所谓的长轮询,就是消费者pull拉消息的时候,Broker检查是否有消息,如果没有消息就挂起PulRequest,等待30s,在此期间如果有新消息写入CommitLog中,会触发回调函数通知PullRequestHoldService,按照Topic + MessageQueue精确唤醒pull请求任务,唤醒后 Broker 重新检查可拉取的消息并返回;如果超过30s就返回空结果,让消费者重试pull