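Message Duplication Pitfalls

Overview

Duplicate delivery is a common problem in distributed messaging systems. In RabbitMQ, network jitter, service restarts, and retry mechanisms can all cause the same message to be delivered more than once. This document analyzes the causes of duplicate messages and the ways to handle them.

Message duplication scenarios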

┌─────────────────────────────────────────────────────────────────────────┐
│                      Message duplication scenarios                      │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Scenario 1: producer sends twice                                       │
│  ┌──────────┐                                                           │
│  │ Producer │──┬──▶ message 1 ──▶ Queue                                 │
│  └──────────┘  │                                                        │
│      │         └──▶ message 1 (duplicate) ──▶ Queue                     │
│      │              (retry after network timeout)                       │
│      ▼                                                                  │
│  Cause: the send timed out and was retried, but the original arrived    │
│                                                                         │
│  Scenario 2: consumer processes twice                                   │
│  ┌──────────┐                                                           │
│  │ Consumer │──▶ processed OK ──▶ ACK fails ──▶ message redelivered     │
│  └──────────┘                                                           │
│      │                                                                  │
│      ▼                                                                  │
│  Cause: processing succeeded but the ACK failed; message redelivered    │
│                                                                         │
│  Scenario 3: cluster failover                                           │
│  ┌──────────┐                                                           │
│  │  Node 1  │──▶ failure ──▶ message redelivered from Node 2            │
│  │ (Master) │                                                           │
│  └──────────┘                                                           │
│      │                                                                  │
│      ▼                                                                  │
│  Cause: the master node failed; message redelivered by the replica      │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
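
For scenario 1, consumers can only recognize the duplicate if both copies carry the same identifier. The sketch below shows one way to do that: the ID is generated once, when the message is built, and every retry reuses the same envelope. The names `OutgoingMessage` and `publishWithRetry` are hypothetical, and the `$send` callable is an assumed stand-in for the real publish call that throws `RuntimeException` on a timeout.

```php
<?php

/** Hypothetical envelope: the ID is fixed when the message is first created. */
final class OutgoingMessage
{
    private string $messageId;
    private array $payload;

    private function __construct(string $messageId, array $payload)
    {
        $this->messageId = $messageId;
        $this->payload = $payload;
    }

    public static function create(array $payload): self
    {
        // 128 random bits, hex-encoded; generated exactly once per logical message
        return new self(bin2hex(random_bytes(16)), $payload);
    }

    public function messageId(): string
    {
        return $this->messageId;
    }

    public function payload(): array
    {
        return $this->payload;
    }
}

/**
 * Sends the same envelope up to $maxAttempts times.
 * Returns the number of attempts used; rethrows the last failure.
 */
function publishWithRetry(OutgoingMessage $message, callable $send, int $maxAttempts = 3): int
{
    for ($attempt = 1; ; $attempt++) {
        try {
            $send($message);
            return $attempt;
        } catch (RuntimeException $e) {
            if ($attempt >= $maxAttempts) {
                throw $e;
            }
            // Retry with the SAME envelope, so the ID does not change
        }
    }
}
```

With php-amqplib, the ID would typically travel in the `message_id` property passed to the `AMQPMessage` constructor, which is where the consumer examples later in this document look for it first.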

Common pitfalls

Pitfall 1: business logic without idempotency protection

php
<?php

class NonIdempotentConsumer
{
    public function process($message): void
    {
        $data = json_decode($message->body, true);
        
        // Pitfall: runs the business logic directly, with no idempotency protection
        $this->deductInventory($data['order_id'], $data['sku'], $data['quantity']);
        $this->deductPayment($data['order_id'], $data['user_id'], $data['amount']);
        $this->sendNotification($data['user_id'], 'Order processed successfully');
        
        $message->ack();
    }
    
    private function deductInventory($orderId, $sku, $quantity): void
    {
        // Pitfall: a redelivered message deducts the stock again
        $inventory = Inventory::where('sku', $sku)->first();
        $inventory->stock -= $quantity;
        $inventory->save();
    }
    
    private function deductPayment($orderId, $userId, $amount): void
    {
        // Pitfall: a redelivered message charges the account again
        $account = Account::where('user_id', $userId)->first();
        $account->balance -= $amount;
        $account->save();
    }
}

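Problem analysis

  • No message ID check
  • No state check
  • Redelivery leaves the data inconsistent: stock and balance are deducted more than once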

Pitfall 2: acknowledging at the wrong time

php
<?php

class WrongAckTimingConsumer
{
    public function process($message): void
    {
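        // Pitfall: ACK first, process afterwards
        $message->ack();
        
        // Processing can still fail after the ACK
        $this->doProcess($message);
    }
    
    private function doProcess($message): void
    {
        // If this throws, the message is already acknowledged, so the broker
        // will not redeliver it and the work is lost. Acknowledging after
        // processing avoids the loss but makes redelivery (and therefore
        // duplicate processing) possible, so the handler must be idempotent.
        throw new \Exception('Processing failed');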
    }
}
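
The corrected ordering can be sketched as follows: run the handler first, acknowledge only when it returns, and requeue when it throws. `RecordingMessage` is a hypothetical stand-in for a delivered message that simply records which response was sent; it is not a php-amqplib class, and `consumeThenAck` is an illustrative name.

```php
<?php

/** Hypothetical stand-in for a delivered message; records the response sent to the broker. */
final class RecordingMessage
{
    public string $body;
    public array $calls = [];

    public function __construct(string $body)
    {
        $this->body = $body;
    }

    public function ack(): void
    {
        $this->calls[] = 'ack';
    }

    public function nack(bool $requeue = false): void
    {
        $this->calls[] = $requeue ? 'nack:requeue' : 'nack:drop';
    }
}

/**
 * Processes the message first and acknowledges only after the handler returns.
 * A failure requeues the message instead of silently losing it.
 */
function consumeThenAck(RecordingMessage $message, callable $handler): void
{
    try {
        $handler($message->body);
    } catch (Throwable $e) {
        // Not acknowledged yet, so the broker can redeliver the message
        $message->nack(true);
        return;
    }

    $message->ack();
}
```

Because a crash between the handler returning and the ACK reaching the broker still leads to redelivery, this ordering only trades message loss for possible duplicates; the handler itself has to be idempotent.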

Pitfall 3: retries that cause duplicates

php
<?php

class RetryWithoutDeduplication
{
    public function process($message): void
    {
        $data = json_decode($message->body, true);
        
        try {
            $this->callExternalService($data);
            $message->ack();
        } catch (\Exception $e) {
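            // Pitfall: requeues for retry with no deduplication.
            // Assuming php-amqplib's AMQPMessage::nack($requeue, $multiple),
            // the requeue flag is the first argument.
            $message->nack(true);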
        }
    }
    
    private function callExternalService($data): void
    {
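        // The external service may already have processed the request
        // but reported a failure because of a network problem,
        // so a retry processes the request a second time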
    }
}
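
One common mitigation, when the external service supports it, is to send a stable idempotency key with every attempt so the service can return the earlier result instead of repeating the work. The sketch below assumes such support; `FakePaymentGateway` is a hypothetical in-memory stand-in for the remote service, and `chargeOrder` is an illustrative name.

```php
<?php

/** Hypothetical in-memory stand-in for an external service that honors idempotency keys. */
final class FakePaymentGateway
{
    private array $chargesByKey = [];
    public int $chargeCount = 0;

    public function charge(string $idempotencyKey, int $amountCents): string
    {
        // A repeated key returns the earlier result instead of charging again
        if (isset($this->chargesByKey[$idempotencyKey])) {
            return $this->chargesByKey[$idempotencyKey];
        }

        $this->chargeCount++;
        $chargeId = 'charge-' . $this->chargeCount;
        $this->chargesByKey[$idempotencyKey] = $chargeId;

        return $chargeId;
    }
}

/**
 * Derives the key from the order ID, so every retry of the same order
 * sends the same key to the gateway.
 */
function chargeOrder(FakePaymentGateway $gateway, string $orderId, int $amountCents): string
{
    return $gateway->charge("payment:{$orderId}", $amountCents);
}
```

If the external service offers no such mechanism, the consumer has to record the outcome on its own side before retrying, as the examples in the following sections do.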

Examples of the correct approach

Complete idempotency protection

php
<?php

namespace App\Messaging\Idempotent;

use PhpAmqpLib\Message\AMQPMessage;
use Redis;

class IdempotentConsumer
{
    private Redis $redis;
    private $logger;
    private string $keyPrefix = 'msg:processed:';
    private int $ttl = 86400;
    
    public function __construct(Redis $redis, $logger)
    {
        $this->redis = $redis;
        $this->logger = $logger;
    }
    
    public function process(AMQPMessage $message): void
    {
        $messageId = $this->extractMessageId($message);
        
        if (empty($messageId)) {
            $this->logger->error('Message missing ID', [
                'body' => $message->body,
            ]);
            $message->ack();
            return;
        }
        
        if ($this->isProcessed($messageId)) {
            $this->logger->info('Message already processed, skipping', [
                'message_id' => $messageId,
            ]);
            $message->ack();
            return;
        }
        
        $lockAcquired = $this->acquireLock($messageId);
        
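        if (!$lockAcquired) {
            $this->logger->warning('Message being processed by another consumer', [
                'message_id' => $messageId,
            ]);
            // Requeue rather than return silently: an unanswered delivery stays
            // unacked and holds a prefetch slot until the channel closes. Once the
            // other consumer finishes, the isProcessed() check above skips the copy.
            $message->nack(true);
            return;
        }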
        
        try {
            $result = $this->doProcess($message);
            
            $this->markAsProcessed($messageId, $result);
            
            $message->ack();
            
            $this->logger->info('Message processed successfully', [
                'message_id' => $messageId,
            ]);
        } catch (\Exception $e) {
            $this->logger->error('Message processing failed', [
                'message_id' => $messageId,
                'error' => $e->getMessage(),
            ]);
            
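            $this->releaseLock($messageId);
            
            // Requeue for retry. AMQPMessage::nack($requeue = false, $multiple = false)
            // takes the requeue flag first.
            $message->nack(true);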
        }
    }
    
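    private function extractMessageId(AMQPMessage $message): string
    {
        // get() throws OutOfBoundsException when the property is absent, so check first
        if ($message->has('message_id')) {
            $messageId = (string) $message->get('message_id');
            
            if ($messageId !== '') {
                return $messageId;
            }
        }
        
        // Fall back to an ID carried in the JSON body
        $body = json_decode($message->body, true);
        return (string) ($body['message_id'] ?? $body['id'] ?? '');
    }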
    
    private function isProcessed(string $messageId): bool
    {
        $key = $this->keyPrefix . $messageId;
        return (bool) $this->redis->exists($key);
    }
    
    private function acquireLock(string $messageId): bool
    {
        $lockKey = $this->keyPrefix . $messageId . ':lock';
        $result = $this->redis->set($lockKey, 1, ['NX', 'EX' => 30]);
        return $result !== false;
    }
    
    private function releaseLock(string $messageId): void
    {
        $lockKey = $this->keyPrefix . $messageId . ':lock';
        $this->redis->del($lockKey);
    }
    
    private function markAsProcessed(string $messageId, $result = null): void
    {
        $key = $this->keyPrefix . $messageId;
        
        $this->redis->setex($key, $this->ttl, json_encode([
            'processed_at' => time(),
            'result' => $result,
        ]));
        
        $this->releaseLock($messageId);
    }
    
    private function doProcess(AMQPMessage $message)
    {
        $data = json_decode($message->body, true);
        
        // Business processing logic
        return $this->processBusinessLogic($data);
    }
    
    private function processBusinessLogic(array $data)
    {
        // The actual business processing goes here
        return true;
    }
}

Business-level idempotency design

php
<?php

namespace App\Messaging\Idempotent;

class OrderProcessingConsumer
{
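    private $orderService;
    private $inventoryService;
    private $paymentService;
    private $logger;
    
    // The services are application-specific collaborators supplied by the caller
    public function __construct($orderService, $inventoryService, $paymentService, $logger)
    {
        $this->orderService = $orderService;
        $this->inventoryService = $inventoryService;
        $this->paymentService = $paymentService;
        $this->logger = $logger;
    }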
    
    public function process($message): void
    {
        $data = json_decode($message->body, true);
        $orderId = $data['order_id'];
        
        // 1. Check the order status (business-level idempotency)
        $order = $this->orderService->find($orderId);
        
        if (!$order) {
            $this->logger->error('Order not found', ['order_id' => $orderId]);
            $message->ack();
            return;
        }
        
        if ($order->status === 'completed') {
            $this->logger->info('Order already completed', ['order_id' => $orderId]);
            $message->ack();
            return;
        }
        
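        if ($order->status === 'processing') {
            // Note: if an earlier attempt crashed mid-way, the order stays in
            // 'processing' and this message is dropped, so such orders need a
            // separate recovery path (for example a timeout sweep)
            $this->logger->info('Order being processed', ['order_id' => $orderId]);
            $message->ack();
            return;
        }
        
        // 2. Move the order to 'processing' under an optimistic lock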
        $updated = $this->orderService->updateStatusWithVersion(
            $orderId,
            'pending',
            'processing',
            $order->version
        );
        
        if (!$updated) {
            $this->logger->info('Order status changed by another process', [
                'order_id' => $orderId,
            ]);
            $message->ack();
            return;
        }
        
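        try {
            // 3. Run the business logic (every step must itself be idempotent)
            $this->processInventory($order, $data);
            $this->processPayment($order, $data);
            
            // 4. Set the final status
            $this->orderService->updateStatus($orderId, 'completed');
            
            $message->ack();
        } catch (\Exception $e) {
            // Put the order back to 'pending' so the requeued message passes the
            // pending -> processing transition above; with a 'failed' status the
            // retry would be acknowledged without doing any work
            $this->orderService->updateStatus($orderId, 'pending');
            $message->nack(true);
        }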
    }
    
    private function processInventory($order, $data): void
    {
        foreach ($order->items as $item) {
            // Use order ID + SKU as the idempotency key
            $deductionId = "deduct:{$order->id}:{$item->sku}";
            
            if ($this->inventoryService->isDeducted($deductionId)) {
                continue;
            }
            
            $this->inventoryService->deductWithId(
                $deductionId,
                $item->sku,
                $item->quantity
            );
        }
    }
    
    private function processPayment($order, $data): void
    {
        // Use the order ID as the idempotency key
        $paymentId = "payment:{$order->id}";
        
        if ($this->paymentService->isProcessed($paymentId)) {
            return;
        }
        
        $this->paymentService->processWithId(
            $paymentId,
            $order->user_id,
            $order->amount
        );
    }
}

Database-backed idempotency

php
<?php

namespace App\Messaging\Idempotent;

use PDO;

class DatabaseIdempotentHandler
{
    private PDO $pdo;
    
    public function __construct(PDO $pdo)
    {
        $this->pdo = $pdo;
    }
    
    public function processWithIdempotence(
        string $messageId,
        string $messageType,
        callable $handler
    ) {
        $this->pdo->beginTransaction();
        
        try {
            // Check whether the message has already been processed
            $stmt = $this->pdo->prepare("
                SELECT id, result FROM message_idempotence 
                WHERE message_id = ? AND message_type = ?
                FOR UPDATE
            ");
            $stmt->execute([$messageId, $messageType]);
            $existing = $stmt->fetch(PDO::FETCH_ASSOC);
            
            if ($existing) {
                $this->pdo->commit();
                return json_decode($existing['result'], true);
            }
            
            // Insert the processing record
            $stmt = $this->pdo->prepare("
                INSERT INTO message_idempotence 
                (message_id, message_type, status, created_at) 
                VALUES (?, ?, 'processing', NOW())
            ");
            $stmt->execute([$messageId, $messageType]);
            
            // Run the business logic
            $result = $handler();
            
            // Store the processing result
            $stmt = $this->pdo->prepare("
                UPDATE message_idempotence 
                SET status = 'completed', result = ?, completed_at = NOW() 
                WHERE message_id = ? AND message_type = ?
            ");
            $stmt->execute([json_encode($result), $messageId, $messageType]);
            
            $this->pdo->commit();
            
            return $result;
        } catch (\Throwable $e) {
            // Roll back on any failure, PHP Errors included, so no half-written record remains
            $this->pdo->rollBack();
            throw $e;
        }
    }
}

Deduplication table design

sql
CREATE TABLE message_idempotence (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    message_id VARCHAR(64) NOT NULL,
    message_type VARCHAR(64) NOT NULL,
    status ENUM('processing', 'completed', 'failed') NOT NULL DEFAULT 'processing',
    result TEXT,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP NULL,
    
    UNIQUE KEY uk_message (message_id, message_type),
    INDEX idx_status (status),
    INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
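
The unique key on `(message_id, message_type)` also allows an insert-first variant: try to insert the record and treat a unique-constraint violation as "already claimed". The sketch below is an illustration rather than the handler shown earlier. To stay self-contained it runs against an in-memory SQLite database (which assumes the `pdo_sqlite` extension is available) with a simplified table instead of the MySQL DDL above, and `claimMessage` is a hypothetical helper name.

```php
<?php

/**
 * Tries to claim a message by inserting its record.
 * Returns true if this call claimed it, false if the record already existed.
 */
function claimMessage(PDO $pdo, string $messageId, string $messageType): bool
{
    $stmt = $pdo->prepare(
        'INSERT INTO message_idempotence (message_id, message_type, status) VALUES (?, ?, ?)'
    );

    try {
        $stmt->execute([$messageId, $messageType, 'processing']);
        return true;
    } catch (PDOException $e) {
        // SQLSTATE class 23 means an integrity constraint violation,
        // which here is the unique key rejecting a duplicate
        $sqlState = (string) ($e->errorInfo[0] ?? $e->getCode());
        if (strpos($sqlState, '23') === 0) {
            return false;
        }
        throw $e;
    }
}

// Simplified SQLite stand-in for the table above (assumption: pdo_sqlite is installed)
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec(
    'CREATE TABLE message_idempotence (
        message_id TEXT NOT NULL,
        message_type TEXT NOT NULL,
        status TEXT NOT NULL,
        UNIQUE (message_id, message_type)
    )'
);
```

The design choice here is to let the database arbitrate between concurrent consumers: whichever insert succeeds owns the message, and no separate existence check is needed before it.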

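Comparison of deduplication strategies

| Strategy | Advantages | Disadvantages | Suitable for |
| --- | --- | --- | --- |
| Redis deduplication | High performance, simple to implement | Dedup records may be lost | High-concurrency workloads |
| Database deduplication | Strong consistency, reliable | Lower performance | Low-concurrency workloads |
| Business state check | No extra storage required | Depends on the business design | Stateful business flows |
| Unique index | Enforced by the database | Requires table design | Database writes |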

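Best-practice checklist

Message design

  • [ ] Every message carries a unique ID
  • [ ] The message format is standardized
  • [ ] The message includes a business identifier
  • [ ] A reasonable TTL is set

Idempotency implementation

  • [ ] Implement message-level idempotency
  • [ ] Implement business-level idempotency
  • [ ] Use a distributed lock
  • [ ] Design a deduplication table

Consumer design

  • [ ] Check for duplicates before processing
  • [ ] Acknowledge only after processing
  • [ ] Log processing outcomes
  • [ ] Handle exceptions correctly

Monitoring and alerting

  • [ ] Monitor the duplicate-message ratio
  • [ ] Monitor idempotency-store performance
  • [ ] Configure alerts for anomalies
  • [ ] Purge expired data regularly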

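Production considerations

  1. Idempotency key design

    • Use a business-unique identifier
    • Isolate keys by message type
    • Avoid key collisions
  2. Storage choice

    • Redis: high performance, suited to high concurrency
    • Database: strongly consistent, suited to critical business flows
    • Hybrid: Redis backed by database persistence
  3. Expiration policy

    • Set a reasonable TTL
    • Purge expired records regularly
    • Take the business validity window into account
  4. Performance optimization

    • Use batch operations
    • Index appropriately
    • Monitor store performance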
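
The key-design points in item 1 can be sketched as a small helper that namespaces the key by message type and escapes the separator so that two different (type, ID) pairs can never produce the same key. The function name `buildIdempotencyKey` and the `idem:` prefix are illustrative assumptions, not part of the consumers shown earlier.

```php
<?php

/**
 * Builds a namespaced idempotency key such as "idem:order.paid:42".
 * rawurlencode() escapes ":" inside either part, so the separator stays
 * unambiguous and ("a:b", "c") cannot collide with ("a", "b:c").
 */
function buildIdempotencyKey(string $messageType, string $businessId): string
{
    if ($messageType === '' || $businessId === '') {
        throw new InvalidArgumentException('Message type and business ID must be non-empty');
    }

    return sprintf('idem:%s:%s', rawurlencode($messageType), rawurlencode($businessId));
}
```

Keeping the message type in the key means the same business ID can be processed once per message type, which matches the `(message_id, message_type)` unique key used in the deduplication table.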

Related links