1214 字
6 分钟
MQ消息重复消费的原因、解决方案对比

1. 介绍消息重复消费产生的原因#

生产端为了保证消息发送成功,可能会重复推送(直到收到成功ACK),会产生重复消息。 一个成熟的MQ Server框架一般会想办法解决,避免存储重复消息(比如:空间换时间,存储已处理过的message_id),给生产端提供一个幂等性的发送消息接口。 消费端却无法根本解决这个问题,在高并发标准要求下,拉取消息+业务处理+提交消费位移需要做事务处理,另外消费端服务可能宕机,很可能会拉取到重复消息。 所以,只能业务端自己做控制,对于已经消费成功的消息,本地数据库表或Redis缓存业务标识,每次处理前先进行校验,保证幂等。

2. 消费端为什么会有消息重复消费问题#

ACK 是在「消息处理完成后」才返回的 这就导致了: 如果消费过程中挂了、超时了、异常了…

  • RabbitMQ 没有收到 ACK
  • 它会认为这条消息「还没有被成功消费」
  • 于是再次投递(可能给同一个消费者,也可能是另一个)

所以问题来了: 在消息“还没 ACK”之前,你的业务逻辑可能已经执行了 比如:

// 假设你在消费方法里:
createOrder(orderNo); // 执行成功 ✅
channel.basicAck(tag, false); // 正准备 ack,此时服务崩溃 ❌

RabbitMQ:哦,我没收到 ack,你没消费成功,我再发一次 ✅

结果:

  • createOrder 又执行一次 (重复下单、重复扣款、重复发优惠券)

3. 初步解决消息重复消费问题#

自己加幂等判断:Redis SETNX 或数据库唯一约束

Boolean ok = redis.setIfAbsent("order:created:" + orderNo, "1", 1, TimeUnit.DAYS);
if (!ok) {
// 已处理过,直接 ack 跳过逻辑
channel.basicAck(tag, false);
return;
}
createOrder(orderNo); // 只执行一次
channel.basicAck(tag, false); // 最后才 ack

正确的消费端顺序是:

  1. 消息进入监听器,提取 messageId 或业务 key
  2. 用 Redis/DB 检查这个 message 是否处理过(幂等性)
  3. 没处理过 ➜ 执行业务逻辑 ➜ 成功后 ack
  4. 如果处理过 ➜ 直接 ack,跳过逻辑

小结:ACK vs SETNX

场景作用
ACK确保 MQ 不重复推消息(避免重复投递)
Redis SETNX/DB确保业务逻辑不重复处理(避免重复执行)

4. Redis SETNX + MQ ACK,看似双保险,但实际上仍然存在两种风险:#

情况一:SETNX 后宕机,业务未执行

  • Redis 中已记录“已处理”状态;
  • 但业务未真正执行(如扣款没发生);
  • 后续 RabbitMQ 重发消息,消费端因为 Redis 中已标记,跳过处理;
  • ❗导致消息丢失,业务永远不会执行

情况二:业务执行成功,但 ACK 之前宕机

  • 业务逻辑已经执行成功(扣款/下单等);
  • 但尚未发送 basicAck;
  • RabbitMQ 以为失败,重发消息;
  • 消费端再次处理,因为 Redis 中没有记录,还会重复执行业务逻辑;
  • ❗导致业务重复执行,数据异常

使用 Redis SETNX + ACK 并不能完全规避消息重复消费和消息丢失的问题。它解决了一部分问题,但中间过程依然存在“非原子性风险”。

根本原因:三步操作不是原子事务

  1. SETNX
  2. 执行业务逻辑
  3. ACK

任意一个步骤失败,都会产生“重复 or 丢失”的风险。

5. 理想解决方案#

理想解决方式是: 使用 “原子事务” 绑定 幂等记录、业务逻辑 和 ACK,一起成功或失败

方法一:使用数据库事务绑定幂等记录 + 业务逻辑 + ACK#

@Transactional
public void consumeMessage(...) {
if (messageExistsInDb(messageId)) {
ack(); return;
}
doBusiness(); // 下订单、扣款等
saveMessageRecord(messageId); // 记录已处理
ack(); // 只在事务最后才发送 ack
}
  • ✔️ 优点:所有动作在一个本地事务中提交或失败
  • ✔️ 缺点:需要所有动作都走数据库,牺牲了 Redis 的性能优势

方法二:使用 Redis + 状态标记 + ACK 的最终一致性方案(推荐)#

String key = "msg:" + messageId;
// 设置状态为处理中
Boolean first = redis.setIfAbsent(key, "PROCESSING", 10, TimeUnit.MINUTES);
if (!first) {
// 如果状态是 DONE 或正在处理,说明已处理或处理中,跳过
return;
}
try {
doBusiness(); // 真正的业务逻辑
redis.set(key, "DONE", 1, TimeUnit.DAYS); // 标记业务完成
channel.basicAck(tag, false); // ack 最后再发
} catch (Exception e) {
redis.delete(key); // 处理失败清除 key,允许下次重试
channel.basicNack(tag, false, true); // 消息重回队列
}
  • ✔️ 优点:兼顾幂等性与可靠性
  • ✔️ 避免业务未执行就标记“已处理”
  • ✔️ ACK 始终在最后,确保业务成功才确认

方法三(高级):使用 RocketMQ/Kafka 的事务消息或 exactly-once 语义(RabbitMQ 不支持)#

MQ消息重复消费的原因、解决方案对比
https://htglgithub.github.io/AstroBlog/posts/20250703/
作者
Wok
发布于
2025-07-03
许可协议
CC BY-NC-SA 4.0