1214 字
6 分钟
MQ消息重复消费的原因、解决方案对比
1. 介绍消息重复消费产生的原因
生产端为了保证消息发送成功,可能会重复推送(直到收到成功ACK),会产生重复消息。 一个成熟的MQ Server框架一般会想办法解决,避免存储重复消息(比如:空间换时间,存储已处理过的message_id),给生产端提供一个幂等性的发送消息接口。 消费端却无法根本解决这个问题,在高并发标准要求下,拉取消息+业务处理+提交消费位移需要做事务处理,另外消费端服务可能宕机,很可能会拉取到重复消息。 所以,只能业务端自己做控制,对于已经消费成功的消息,本地数据库表或Redis缓存业务标识,每次处理前先进行校验,保证幂等。
2. 消费端为什么会有消息重复消费问题
ACK 是在「消息处理完成后」才返回的 这就导致了: 如果消费过程中挂了、超时了、异常了…
- RabbitMQ 没有收到 ACK
- 它会认为这条消息「还没有被成功消费」
- 于是再次投递(可能给同一个消费者,也可能是另一个)
所以问题来了: 在消息“还没 ACK”之前,你的业务逻辑可能已经执行了 比如:
// 假设你在消费方法里:createOrder(orderNo); // 执行成功 ✅channel.basicAck(tag, false); // 正准备 ack,此时服务崩溃 ❌
RabbitMQ:哦,我没收到 ack,你没消费成功,我再发一次 ✅
结果:
- createOrder 又执行一次 (重复下单、重复扣款、重复发优惠券)
3. 初步解决消息重复消费问题
自己加幂等判断:Redis SETNX 或数据库唯一约束
Boolean ok = redis.setIfAbsent("order:created:" + orderNo, "1", 1, TimeUnit.DAYS);if (!ok) { // 已处理过,直接 ack 跳过逻辑 channel.basicAck(tag, false); return;}createOrder(orderNo); // 只执行一次channel.basicAck(tag, false); // 最后才 ack
正确的消费端顺序是:
- 消息进入监听器,提取 messageId 或业务 key
- 用 Redis/DB 检查这个 message 是否处理过(幂等性)
- 没处理过 ➜ 执行业务逻辑 ➜ 成功后 ack
- 如果处理过 ➜ 直接 ack,跳过逻辑
小结:ACK vs SETNX
场景 | 作用 |
---|---|
ACK | 确保 MQ 不重复推消息(避免重复投递) |
Redis SETNX/DB | 确保业务逻辑不重复处理(避免重复执行) |
4. Redis SETNX + MQ ACK,看似双保险,但实际上仍然存在两种风险:
情况一:SETNX 后宕机,业务未执行
- Redis 中已记录“已处理”状态;
- 但业务未真正执行(如扣款没发生);
- 后续 RabbitMQ 重发消息,消费端因为 Redis 中已标记,跳过处理;
- ❗导致消息丢失,业务永远不会执行
情况二:业务执行成功,但 ACK 之前宕机
- 业务逻辑已经执行成功(扣款/下单等);
- 但尚未发送 basicAck;
- RabbitMQ 以为失败,重发消息;
- 消费端再次处理,因为 Redis 中没有记录,还会重复执行业务逻辑;
- ❗导致业务重复执行,数据异常
使用 Redis SETNX + ACK 并不能完全规避消息重复消费和消息丢失的问题。它解决了一部分问题,但中间过程依然存在“非原子性风险”。
根本原因:三步操作不是原子事务
- SETNX
- 执行业务逻辑
- ACK
任意一个步骤失败,都会产生“重复 or 丢失”的风险。
5. 理想解决方案
理想解决方式是: 使用 “原子事务” 绑定 幂等记录、业务逻辑 和 ACK,一起成功或失败
方法一:使用数据库事务绑定幂等记录 + 业务逻辑 + ACK
@Transactionalpublic void consumeMessage(...) { if (messageExistsInDb(messageId)) { ack(); return; }
doBusiness(); // 下订单、扣款等 saveMessageRecord(messageId); // 记录已处理 ack(); // 只在事务最后才发送 ack}
- ✔️ 优点:所有动作在一个本地事务中提交或失败
- ✔️ 缺点:需要所有动作都走数据库,牺牲了 Redis 的性能优势
方法二:使用 Redis + 状态标记 + ACK 的最终一致性方案(推荐)
String key = "msg:" + messageId;
// 设置状态为处理中Boolean first = redis.setIfAbsent(key, "PROCESSING", 10, TimeUnit.MINUTES);if (!first) { // 如果状态是 DONE 或正在处理,说明已处理或处理中,跳过 return;}
try { doBusiness(); // 真正的业务逻辑 redis.set(key, "DONE", 1, TimeUnit.DAYS); // 标记业务完成 channel.basicAck(tag, false); // ack 最后再发} catch (Exception e) { redis.delete(key); // 处理失败清除 key,允许下次重试 channel.basicNack(tag, false, true); // 消息重回队列}
- ✔️ 优点:兼顾幂等性与可靠性
- ✔️ 避免业务未执行就标记“已处理”
- ✔️ ACK 始终在最后,确保业务成功才确认
方法三(高级):使用 RocketMQ/Kafka 的事务消息或 exactly-once 语义(RabbitMQ 不支持)
MQ消息重复消费的原因、解决方案对比
https://htglgithub.github.io/AstroBlog/posts/20250703/