前言
我们的电商平台履约环节,在整个电商链路中,处于核心的下游位置:用户在订单系统下单,订单系统通过 RocketMQ 发送消息给我们,履约系统消费消息,进行库存锁定、物流单生成、仓库下发等复杂逻辑。
如果说订单系统是收银台,那我们就是后厨。如果 RocketMQ 出现消息积压,仓库就无法及时看到订单,导致发货延迟,直接引发用户的退款和投诉。
graph LR
A[用户下单] --> B[订单系统]
B -- 发送MQ消息 --> C{RocketMQ Cluster}
C -- 消费消息 --> D[履约系统]
D --> E[生成物流单]
D --> F[仓库WMS作业]
style C fill:#f9f,stroke:#333,stroke-width:2px
style D fill:#bbf,stroke:#333,stroke-width:2px
第一次 - 代码逻辑
事故现场
大促活动到来,单量逐步上涨。某天中午高峰期,仓库反馈大量订单在系统中刷不出来,导致打包作业停滞。
排查过程
我们第一时间查看监控,发现 RocketMQ 出现了明显的消息积压。通常消息积压无非两个原因:生产者太快,或者消费者太慢。本次流量在预期内,问题出在消费者。
查看消费者日志,发现单条消息的处理耗时异常高。经过代码走查,我们在处理逻辑中发现了一个典型的 N+1 问题。
业务逻辑需要查询订单明细中的商品信息,当时的写法是在一个 for 循环中,逐个去查询数据库。
// 优化前:循环查库,效率极低
public void processOrder(List<Long> orderIds) {
for (Long id : orderIds) {
// 每次循环发起一次DB调用,包含网络IO和解析开销
User user = userMapper.getUserById(id);
// ... 业务逻辑
}
}
如果有 50 个订单,就要进行 50 次数据库网络交互。这在低并发时无感知,但高并发下就是灾难。
解决方案
批量查询优化
将循环查库改为批量查询,利用 SQL 的 IN 语句,一次交互查出所有数据。
// 优化后:批量查询,一次IO
public List<User> queryUsers(List<User> searchList) {
if (CollectionUtils.isEmpty(searchList)) return Collections.emptyList();
List<Long> ids = searchList.stream()
.map(User::getId)
.collect(Collectors.toList());
// Select * from user where id in ...
return userMapper.getUsersByIds(ids);
}
索引优化
同时我们发现某些多条件查询的 SQL 缺少联合索引,导致扫描行数过多。我们针对高频查询条件补充了联合索引。
上线后,消费速度提升显著,积压问题瞬间解决。
第二次 - MySQL 优化器
事故现场
系统偶尔会出现短暂的消息积压,虽然时间不长,但 DBA 发来了慢查询报警。
排查过程
我查看了履约表的数据量,大约在几百万行,对于 MySQL 来说并不算大。诡异的是,相同的 SQL 语句,WHERE 条件结构一样,只是参数值不同,走的索引却完全不同。
- order_type 为 1 时,走了正确的索引 A,速度快。
- order_type 为 2 时,走了错误的索引 B,速度慢。
根因分析
MySQL 优化器在选择索引时,会参考扫描行数预估、是否需要回表、是否需要排序等因素。
由于多版本并发控制机制,数据库中存在大量已删除或更新的历史版本数据,也就是脏页,导致 MySQL 的采样统计出现偏差。优化器误以为走索引 B 扫描行数更少,结果选错索引,导致全表扫描或低效扫描。
解决方案
我们没有选择重建统计信息这种治标不治本的方法,而是采用了更强硬的手段:Force Index。
在 SQL 中显式强制使用正确的索引,不给优化器胡乱发挥的机会。
SELECT * FROM fulfillment_order FORCE INDEX (idx_create_time_status)
WHERE create_time > ... AND status = ...
强制索引后,慢查询消失,偶发性积压被根除。
第三次 - 数据膨胀
事故现场
由于没有大促,大家比较放松。突然仓库再次投诉,订单下发极其缓慢,延迟高达几分钟。检查 RocketMQ 再次积压,且 SQL 执行计划正常,索引没选错,但查询和写入依然很慢。
排查过程
再次检查履约主表,发现了一个惊人的数字:3000 万行。短短半年,由于业务扩展,单表数据量已突破 MySQL 的舒适区。B+ 树层级变深,无论是磁盘 IO 还是 Buffer Pool 的命中率都急剧下降。
解决方案
虽然分库分表是终极方案,但当时业务不允许长时间停机改造。我们选择了成本最低、见效最快的方案:冷热数据分离。
策略如下
定义热数据为最近 7 天甚至 24 小时的数据。开发一个定时任务,将 create_time 超过 30 天的已完成订单,迁移到历史表中,并从主表删除。
Code snippet
graph TD
A[履约主表] -->|每日定时任务| B{判断日期大于30天}
B -- 是 --> C[写入历史表]
C --> D[从主表物理删除]
B -- 否 --> E[保留]
经过清理,主表数据常年维持在几百万行级别。轻装上阵后,数据库性能恢复如初,积压问题解决。
第四次 - 上游系统
事故现场
某一次非高峰期 RocketMQ 积压报警突然炸了,监控显示积压量瞬间飙升到几十万。
排查过程
这次的时间点比较特殊,是下午非高峰期。订单组半小时前执行了一个批量修复数据的 Job,一次性修正了几万个订单的状态。每个状态变更都会触发 MQ 消息。短时间内数万条消息涌入,我们的消费者根本来不及处理。
尝试方案与局限
通常提升消费速度有两个方案:增加队列数量或者增加消费者节点。但 RocketMQ 的队列数量调整需要重新分配,远水解不了近渴。而单纯加机器受限于队列数量,一个队列只能被一个消费者处理,加了也没用。
最终解决方案 本地线程池并发消费
既然无法横向扩展加机器,那就纵向挖掘单机的性能。
我们改造了消费者的代码结构,将拉取消息和处理消息解耦。主线程只负责从 RocketMQ 拉取消息,不处理业务逻辑。工作线程池负责接收主线程提交的任务并发处理。
动态调整策略,我们将线程池的核心线程数和最大线程数临时调大到 50。
// 线程池消费模型
executorService.submit(() -> {
processBusinessLogic(message);
// 注意处理ACK和Offset提交顺序
});
积压的几十万消息在 20 分钟内被吞噬殆尽。待积压消除后,我们将线程数回调至合理的 8 到 10 个,以避免长期占用过多 CPU 资源。
需要注意的是,使用本地线程池消费并非万能。如果业务严格依赖消息顺序,例如先创建单再取消单,线程池并发处理可能导致乱序。此外,线程过多会导致 CPU 飙升,甚至导致依赖的第三方接口被打挂。
总结与思考
MQ 消息积压是后端开发中绕不开的难题。虽然根本原因是生产速度大于消费速度,但具体的诱因千奇百怪。
我们在实战中总结了一套排查方案:
- 看代码应用层 是否存在 N+1 查询,是否存在复杂的串行逻辑。
- 看数据库存储层 索引对不对,Force Index 此时或许是救命稻草;数据量是否过大,是否需要归档。
- 看架构并发层 是否遇到了突发流量,是否需要引入本地线程池换取时间。