Appearance
消息重复陷阱
概述
消息重复是分布式消息系统中常见的问题。在 RabbitMQ 中,由于网络抖动、服务重启、重试机制等原因,消息可能被重复投递。本文档分析消息重复的原因和解决方案。
消息重复场景分析
┌─────────────────────────────────────────────────────────────────────────┐
│ 消息重复场景 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 场景1:生产者重复发送 │
│ ┌──────────┐ │
│ │ Producer │──┬──▶ 消息1 ──▶ Queue │
│ └──────────┘ │ │
│ │ └──▶ 消息1(重复) ──▶ Queue │
│ │ (网络超时重试) │
│ ▼ │
│ 原因:发送超时后重试,但原消息已到达 │
│ │
│ 场景2:消费者重复消费 │
│ ┌──────────┐ │
│ │ Consumer │──▶ 处理成功 ──▶ ACK失败 ──▶ 消息重新投递 │
│ └──────────┘ │
│ │ │
│ ▼ │
│ 原因:处理成功但 ACK 失败,消息被重新投递 │
│ │
│ 场景3:集群故障恢复 │
│ ┌──────────┐ │
│ │ Node 1 │──▶ 故障 ──▶ 消息在 Node 2 重新投递 │
│ │ (Master) │ │
│ └──────────┘ │
│ │ │
│ ▼ │
│ 原因:主节点故障,消息在从节点重新投递 │
│ │
└─────────────────────────────────────────────────────────────────────────┘常见陷阱场景
陷阱1:无幂等性保护的业务处理
php
<?php
class NonIdempotentConsumer
{
public function process($message): void
{
$data = json_decode($message->body, true);
// 陷阱:直接执行业务逻辑,无幂等性保护
$this->deductInventory($data['order_id'], $data['sku'], $data['quantity']);
$this->deductPayment($data['order_id'], $data['amount']);
$this->sendNotification($data['user_id'], '订单处理成功');
$message->ack();
}
private function deductInventory($orderId, $sku, $quantity): void
{
// 陷阱:重复消费会导致库存多次扣减
$inventory = Inventory::where('sku', $sku)->first();
$inventory->stock -= $quantity;
$inventory->save();
}
private function deductPayment($orderId, $amount): void
{
// 陷阱:重复消费会导致重复扣款
$account = Account::where('user_id', $userId)->first();
$account->balance -= $amount;
$account->save();
}
}问题分析:
- 无消息 ID 检查
- 无状态检查
- 重复消费导致数据不一致
陷阱2:ACK 时机错误
php
<?php
class WrongAckTimingConsumer
{
public function process($message): void
{
// 陷阱:先 ACK 后处理
$message->ack();
// 处理过程中可能失败
$this->doProcess($message);
}
private function doProcess($message): void
{
// 如果这里失败,消息已被确认,无法重试
// 但如果重新投递,会导致重复处理
throw new \Exception('Processing failed');
}
}陷阱3:重试机制导致重复
php
<?php
class RetryWithoutDeduplication
{
public function process($message): void
{
$data = json_decode($message->body, true);
try {
$this->callExternalService($data);
$message->ack();
} catch (\Exception $e) {
// 陷阱:重试时无去重机制
$message->nack(false, true);
}
}
private function callExternalService($data): void
{
// 外部服务可能已经处理成功
// 但由于网络问题返回失败
// 重试会导致重复处理
}
}正确做法示例
完整的幂等性保障
php
<?php
namespace App\Messaging\Idempotent;
use PhpAmqpLib\Message\AMQPMessage;
use Redis;
class IdempotentConsumer
{
private Redis $redis;
private $logger;
private string $keyPrefix = 'msg:processed:';
private int $ttl = 86400;
public function __construct(Redis $redis, $logger)
{
$this->redis = $redis;
$this->logger = $logger;
}
public function process(AMQPMessage $message): void
{
$messageId = $this->extractMessageId($message);
if (empty($messageId)) {
$this->logger->error('Message missing ID', [
'body' => $message->body,
]);
$message->ack();
return;
}
if ($this->isProcessed($messageId)) {
$this->logger->info('Message already processed, skipping', [
'message_id' => $messageId,
]);
$message->ack();
return;
}
$lockAcquired = $this->acquireLock($messageId);
if (!$lockAcquired) {
$this->logger->warning('Message being processed by another consumer', [
'message_id' => $messageId,
]);
return;
}
try {
$result = $this->doProcess($message);
$this->markAsProcessed($messageId, $result);
$message->ack();
$this->logger->info('Message processed successfully', [
'message_id' => $messageId,
]);
} catch (\Exception $e) {
$this->logger->error('Message processing failed', [
'message_id' => $messageId,
'error' => $e->getMessage(),
]);
$this->releaseLock($messageId);
$message->nack(false, true);
}
}
private function extractMessageId(AMQPMessage $message): string
{
$messageId = $message->get('message_id');
if ($messageId) {
return $messageId;
}
$body = json_decode($message->body, true);
return $body['message_id'] ?? $body['id'] ?? '';
}
private function isProcessed(string $messageId): bool
{
$key = $this->keyPrefix . $messageId;
return (bool) $this->redis->exists($key);
}
private function acquireLock(string $messageId): bool
{
$lockKey = $this->keyPrefix . $messageId . ':lock';
$result = $this->redis->set($lockKey, 1, ['NX', 'EX' => 30]);
return $result !== false;
}
private function releaseLock(string $messageId): void
{
$lockKey = $this->keyPrefix . $messageId . ':lock';
$this->redis->del($lockKey);
}
private function markAsProcessed(string $messageId, $result = null): void
{
$key = $this->keyPrefix . $messageId;
$this->redis->setex($key, $this->ttl, json_encode([
'processed_at' => time(),
'result' => $result,
]));
$this->releaseLock($messageId);
}
private function doProcess(AMQPMessage $message)
{
$data = json_decode($message->body, true);
// 业务处理逻辑
return $this->processBusinessLogic($data);
}
private function processBusinessLogic(array $data)
{
// 具体业务处理
return true;
}
}业务级幂等性设计
php
<?php
namespace App\Messaging\Idempotent;
class OrderProcessingConsumer
{
private $orderService;
private $inventoryService;
private $paymentService;
private $logger;
public function process($message): void
{
$data = json_decode($message->body, true);
$orderId = $data['order_id'];
// 1. 检查订单状态(业务级幂等)
$order = $this->orderService->find($orderId);
if (!$order) {
$this->logger->error('Order not found', ['order_id' => $orderId]);
$message->ack();
return;
}
if ($order->status === 'completed') {
$this->logger->info('Order already completed', ['order_id' => $orderId]);
$message->ack();
return;
}
if ($order->status === 'processing') {
$this->logger->info('Order being processed', ['order_id' => $orderId]);
$message->ack();
return;
}
// 2. 使用乐观锁更新状态
$updated = $this->orderService->updateStatusWithVersion(
$orderId,
'pending',
'processing',
$order->version
);
if (!$updated) {
$this->logger->info('Order status changed by another process', [
'order_id' => $orderId,
]);
$message->ack();
return;
}
try {
// 3. 执行业务逻辑(每个操作都需要幂等)
$this->processInventory($order, $data);
$this->processPayment($order, $data);
// 4. 更新最终状态
$this->orderService->updateStatus($orderId, 'completed');
$message->ack();
} catch (\Exception $e) {
$this->orderService->updateStatus($orderId, 'failed');
$message->nack(false, true);
}
}
private function processInventory($order, $data): void
{
foreach ($order->items as $item) {
// 使用订单ID+SKU作为幂等键
$deductionId = "deduct:{$order->id}:{$item->sku}";
if ($this->inventoryService->isDeducted($deductionId)) {
continue;
}
$this->inventoryService->deductWithId(
$deductionId,
$item->sku,
$item->quantity
);
}
}
private function processPayment($order, $data): void
{
// 使用订单ID作为幂等键
$paymentId = "payment:{$order->id}";
if ($this->paymentService->isProcessed($paymentId)) {
return;
}
$this->paymentService->processWithId(
$paymentId,
$order->user_id,
$order->amount
);
}
}数据库幂等性实现
php
<?php
namespace App\Messaging\Idempotent;
use PDO;
class DatabaseIdempotentHandler
{
private PDO $pdo;
public function __construct(PDO $pdo)
{
$this->pdo = $pdo;
}
public function processWithIdempotence(
string $messageId,
string $messageType,
callable $handler
) {
$this->pdo->beginTransaction();
try {
// 检查是否已处理
$stmt = $this->pdo->prepare("
SELECT id, result FROM message_idempotence
WHERE message_id = ? AND message_type = ?
FOR UPDATE
");
$stmt->execute([$messageId, $messageType]);
$existing = $stmt->fetch(PDO::FETCH_ASSOC);
if ($existing) {
$this->pdo->commit();
return json_decode($existing['result'], true);
}
// 插入处理记录
$stmt = $this->pdo->prepare("
INSERT INTO message_idempotence
(message_id, message_type, status, created_at)
VALUES (?, ?, 'processing', NOW())
");
$stmt->execute([$messageId, $messageType]);
// 执行业务逻辑
$result = $handler();
// 更新处理结果
$stmt = $this->pdo->prepare("
UPDATE message_idempotence
SET status = 'completed', result = ?, completed_at = NOW()
WHERE message_id = ? AND message_type = ?
");
$stmt->execute([json_encode($result), $messageId, $messageType]);
$this->pdo->commit();
return $result;
} catch (\Exception $e) {
$this->pdo->rollBack();
throw $e;
}
}
}去重表设计
sql
CREATE TABLE message_deduplication (
id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
message_id VARCHAR(64) NOT NULL,
message_type VARCHAR(64) NOT NULL,
status ENUM('processing', 'completed', 'failed') NOT NULL DEFAULT 'processing',
result TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP NULL,
UNIQUE KEY uk_message (message_id, message_type),
INDEX idx_status (status),
INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;消息去重策略对比
| 策略 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| Redis 去重 | 高性能,实现简单 | 数据可能丢失 | 高并发场景 |
| 数据库去重 | 强一致性,可靠 | 性能较低 | 低并发场景 |
| 业务状态检查 | 无需额外存储 | 依赖业务设计 | 有状态业务 |
| 唯一索引 | 数据库保证 | 需要设计表结构 | 数据库操作 |
最佳实践建议清单
消息设计
- [ ] 每条消息包含唯一 ID
- [ ] 消息格式标准化
- [ ] 包含业务标识符
- [ ] 设置合理的 TTL
幂等性实现
- [ ] 实现消息级幂等
- [ ] 实现业务级幂等
- [ ] 使用分布式锁
- [ ] 设计去重表
消费者设计
- [ ] 处理前检查重复
- [ ] 处理后再确认
- [ ] 记录处理日志
- [ ] 异常正确处理
监控告警
- [ ] 监控重复消息比例
- [ ] 监控幂等存储性能
- [ ] 配置异常告警
- [ ] 定期清理过期数据
生产环境注意事项
幂等键设计
- 使用业务唯一标识
- 考虑消息类型隔离
- 避免键冲突
存储选择
- Redis:高性能,适合高并发
- 数据库:强一致,适合关键业务
- 混合方案:Redis + 数据库持久化
过期策略
- 设置合理的 TTL
- 定期清理过期记录
- 考虑业务时效性
性能优化
- 使用批量操作
- 合理设置索引
- 监控存储性能
