Skip to content

幂等性设计

概述

幂等性是指同一操作执行多次与执行一次的效果相同。在消息队列系统中,由于网络抖动、服务重启、消息重试等原因,消息可能会被重复消费,因此幂等性设计至关重要。

核心概念

1. 为什么需要幂等性

消息重复场景:
├── 生产者重复发送
│   ├── 网络超时后重试
│   ├── ACK 丢失后重发
│   └── 事务回滚后重发
├── 消费者重复消费
│   ├── 处理成功但 ACK 失败
│   ├── 消费者重启后重新投递
│   └── 预取消息重新投递
└── 集群场景
    ├── 主从切换期间
    ├── 网络分区恢复后
    └── 镜像队列同步时

2. 幂等性原则

幂等性设计原则:
├── 唯一标识:每条消息有唯一 ID
├── 状态检查:处理前检查是否已处理
├── 原子操作:使用原子性操作保证一致性
├── 结果缓存:缓存处理结果供重复请求使用
└── 过期清理:定期清理过期的幂等记录

3. 幂等性级别

级别说明实现复杂度
弱幂等大概率避免重复,允许少量重复
强幂等严格保证不重复处理
精确一次消息精确处理一次

幂等性架构设计

┌─────────────────────────────────────────────────────────────────┐
│                        幂等性处理流程                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────┐                                                    │
│  │ 消息到达 │                                                    │
│  └────┬────┘                                                    │
│       │                                                         │
│       ▼                                                         │
│  ┌─────────────┐                                                │
│  │ 提取消息 ID  │                                                │
│  └──────┬──────┘                                                │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐     ┌─────────────┐                           │
│  │ 查询幂等记录 │────▶│ 已处理?     │                           │
│  └─────────────┘     └──────┬──────┘                           │
│                             │                                   │
│              ┌──────────────┼──────────────┐                    │
│              │ 是           │              │ 否                 │
│              ▼              │              ▼                    │
│        ┌───────────┐        │        ┌───────────┐              │
│        │ 返回缓存   │        │        │ 执行业务   │              │
│        │ 结果      │        │        │ 逻辑      │              │
│        └───────────┘        │        └─────┬─────┘              │
│                             │              │                    │
│                             │              ▼                    │
│                             │        ┌───────────┐              │
│                             │        │ 记录幂等   │              │
│                             │        │ 结果      │              │
│                             │        └───────────┘              │
│                             │                                   │
│                             └──────────────────▶ ACK            │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

PHP 代码示例

正确做法:完整的幂等性实现

php
<?php

namespace App\Messaging\Idempotence;

use Psr\Log\LoggerInterface;
use Redis;

class IdempotenceManager
{
    private Redis $redis;
    private LoggerInterface $logger;
    private string $keyPrefix = 'idempotence:';
    private int $ttl = 86400;
    
    public function __construct(Redis $redis, LoggerInterface $logger, array $config = [])
    {
        $this->redis = $redis;
        $this->logger = $logger;
        $this->keyPrefix = $config['key_prefix'] ?? 'idempotence:';
        $this->ttl = $config['ttl'] ?? 86400;
    }
    
    public function process(
        string $messageId,
        callable $handler,
        array $context = []
    ) {
        $key = $this->buildKey($messageId);
        
        $cached = $this->getCachedResult($key);
        if ($cached !== null) {
            $this->logger->info('Message already processed, returning cached result', [
                'message_id' => $messageId,
            ]);
            return $cached;
        }
        
        $lockAcquired = $this->acquireLock($key);
        if (!$lockAcquired) {
            $this->logger->warning('Failed to acquire lock, waiting for result', [
                'message_id' => $messageId,
            ]);
            return $this->waitForResult($key);
        }
        
        try {
            $result = $handler($context);
            
            $this->saveResult($key, $result);
            
            $this->logger->info('Message processed successfully', [
                'message_id' => $messageId,
            ]);
            
            return $result;
        } finally {
            $this->releaseLock($key);
        }
    }
    
    public function isProcessed(string $messageId): bool
    {
        $key = $this->buildKey($messageId);
        return $this->redis->exists($key) || $this->redis->exists($key . ':result');
    }
    
    public function markAsProcessed(string $messageId, $result = null): void
    {
        $key = $this->buildKey($messageId);
        $this->saveResult($key, $result);
    }
    
    private function buildKey(string $messageId): string
    {
        return $this->keyPrefix . $messageId;
    }
    
    private function getCachedResult(string $key)
    {
        $resultKey = $key . ':result';
        $cached = $this->redis->get($resultKey);
        
        if ($cached !== false) {
            return json_decode($cached, true);
        }
        
        return null;
    }
    
    private function acquireLock(string $key): bool
    {
        $lockKey = $key . ':lock';
        $acquired = $this->redis->set($lockKey, 1, ['NX', 'EX' => 30]);
        
        return $acquired !== false;
    }
    
    private function releaseLock(string $key): void
    {
        $lockKey = $key . ':lock';
        $this->redis->del($lockKey);
    }
    
    private function saveResult(string $key, $result): void
    {
        $resultKey = $key . ':result';
        $this->redis->setex(
            $resultKey,
            $this->ttl,
            json_encode([
                'status' => 'success',
                'result' => $result,
                'processed_at' => time(),
            ])
        );
        
        $this->redis->setex($key, $this->ttl, 1);
    }
    
    private function waitForResult(string $key, int $timeout = 30)
    {
        $startTime = time();
        $resultKey = $key . ':result';
        
        while (time() - $startTime < $timeout) {
            $cached = $this->redis->get($resultKey);
            
            if ($cached !== false) {
                return json_decode($cached, true);
            }
            
            usleep(100000);
        }
        
        throw new IdempotenceException('Timeout waiting for result');
    }
}

数据库幂等性实现

php
<?php

namespace App\Messaging\Idempotence;

use PDO;
use Psr\Log\LoggerInterface;

class DatabaseIdempotenceManager
{
    private PDO $pdo;
    private LoggerInterface $logger;
    private string $tableName = 'message_idempotence';
    
    public function __construct(PDO $pdo, LoggerInterface $logger, string $tableName = null)
    {
        $this->pdo = $pdo;
        $this->logger = $logger;
        $this->tableName = $tableName ?? $this->tableName;
    }
    
    public function process(
        string $messageId,
        string $messageType,
        callable $handler,
        array $context = []
    ) {
        $this->ensureTableExists();
        
        $existing = $this->findRecord($messageId, $messageType);
        
        if ($existing) {
            $this->logger->info('Message already processed', [
                'message_id' => $messageId,
                'message_type' => $messageType,
            ]);
            
            return $this->decodeResult($existing['result']);
        }
        
        try {
            $this->pdo->beginTransaction();
            
            $this->insertPendingRecord($messageId, $messageType);
            
            $result = $handler($context);
            
            $this->updateRecordWithResult($messageId, $messageType, $result);
            
            $this->pdo->commit();
            
            $this->logger->info('Message processed successfully', [
                'message_id' => $messageId,
                'message_type' => $messageType,
            ]);
            
            return $result;
        } catch (\Exception $e) {
            $this->pdo->rollBack();
            throw $e;
        }
    }
    
    private function findRecord(string $messageId, string $messageType): ?array
    {
        $sql = "SELECT * FROM {$this->tableName} 
                WHERE message_id = :message_id 
                AND message_type = :message_type 
                AND status = 'completed'";
        
        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([
            ':message_id' => $messageId,
            ':message_type' => $messageType,
        ]);
        
        return $stmt->fetch(PDO::FETCH_ASSOC) ?: null;
    }
    
    private function insertPendingRecord(string $messageId, string $messageType): void
    {
        $sql = "INSERT INTO {$this->tableName} 
                (message_id, message_type, status, created_at) 
                VALUES (:message_id, :message_type, 'pending', NOW())";
        
        try {
            $stmt = $this->pdo->prepare($sql);
            $stmt->execute([
                ':message_id' => $messageId,
                ':message_type' => $messageType,
            ]);
        } catch (\PDOException $e) {
            if ($this->isDuplicateKeyError($e)) {
                throw new IdempotenceException('Message is being processed by another consumer');
            }
            throw $e;
        }
    }
    
    private function updateRecordWithResult(
        string $messageId,
        string $messageType,
        $result
    ): void {
        $sql = "UPDATE {$this->tableName} 
                SET status = 'completed', 
                    result = :result, 
                    completed_at = NOW() 
                WHERE message_id = :message_id 
                AND message_type = :message_type";
        
        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([
            ':message_id' => $messageId,
            ':message_type' => $messageType,
            ':result' => json_encode($result),
        ]);
    }
    
    private function decodeResult(?string $result)
    {
        if ($result === null) {
            return null;
        }
        return json_decode($result, true);
    }
    
    private function isDuplicateKeyError(\PDOException $e): bool
    {
        return strpos($e->getMessage(), 'Duplicate entry') !== false
            || strpos($e->getMessage(), 'unique constraint') !== false;
    }
    
    private function ensureTableExists(): void
    {
        $sql = "CREATE TABLE IF NOT EXISTS {$this->tableName} (
            id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
            message_id VARCHAR(64) NOT NULL,
            message_type VARCHAR(64) NOT NULL,
            status ENUM('pending', 'completed', 'failed') NOT NULL DEFAULT 'pending',
            result TEXT,
            created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            completed_at TIMESTAMP NULL,
            UNIQUE KEY uk_message (message_id, message_type),
            INDEX idx_status (status),
            INDEX idx_created_at (created_at)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
        
        $this->pdo->exec($sql);
    }
    
    public function cleanup(int $daysToKeep = 7): int
    {
        $sql = "DELETE FROM {$this->tableName} 
                WHERE created_at < DATE_SUB(NOW(), INTERVAL :days DAY)
                AND status = 'completed'";
        
        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([':days' => $daysToKeep]);
        
        return $stmt->rowCount();
    }
}

消费者幂等性集成

php
<?php

namespace App\Messaging\Consumer;

use App\Messaging\Idempotence\IdempotenceManager;
use PhpAmqpLib\Message\AMQPMessage;

abstract class IdempotentConsumer
{
    protected IdempotenceManager $idempotenceManager;
    
    public function process(AMQPMessage $message): void
    {
        $messageId = $this->extractMessageId($message);
        $messageType = $this->getMessageType();
        
        if (empty($messageId)) {
            $this->handleMissingMessageId($message);
            return;
        }
        
        try {
            $result = $this->idempotenceManager->process(
                $messageId,
                $messageType,
                fn() => $this->handle($message),
                ['message' => $message]
            );
            
            $message->ack();
            
        } catch (IdempotenceException $e) {
            $this->logger->warning('Idempotence check failed', [
                'message_id' => $messageId,
                'error' => $e->getMessage(),
            ]);
            
            $message->ack();
        } catch (\Exception $e) {
            $this->handleError($message, $e);
        }
    }
    
    protected function extractMessageId(AMQPMessage $message): string
    {
        $messageId = $message->get('message_id');
        
        if ($messageId) {
            return $messageId;
        }
        
        $body = json_decode($message->body, true);
        return $body['message_id'] ?? $body['id'] ?? '';
    }
    
    abstract protected function handle(AMQPMessage $message);
    
    abstract protected function getMessageType(): string;
    
    protected function handleMissingMessageId(AMQPMessage $message): void
    {
        $this->logger->error('Message missing ID, cannot ensure idempotence', [
            'body' => $message->body,
        ]);
        
        $message->ack();
    }
    
    protected function handleError(AMQPMessage $message, \Exception $e): void
    {
        // 错误处理逻辑
    }
}

业务幂等性示例

php
<?php

namespace App\Messaging\Consumer;

use App\Messaging\Idempotence\IdempotenceManager;
use PhpAmqpLib\Message\AMQPMessage;

class OrderPaymentConsumer extends IdempotentConsumer
{
    private OrderService $orderService;
    private PaymentService $paymentService;
    private InventoryService $inventoryService;
    
    protected function getMessageType(): string
    {
        return 'order.payment';
    }
    
    protected function handle(AMQPMessage $message)
    {
        $data = json_decode($message->body, true);
        $orderData = $data['data'];
        
        $orderId = $orderData['order_id'];
        
        $order = $this->orderService->find($orderId);
        
        if (!$order) {
            throw new OrderNotFoundException("Order not found: {$orderId}");
        }
        
        if ($order->status === 'paid') {
            return ['status' => 'already_paid', 'order_id' => $orderId];
        }
        
        if ($order->status !== 'pending') {
            throw new InvalidOrderStatusException("Invalid order status: {$order->status}");
        }
        
        $this->orderService->updateStatus($orderId, 'processing');
        
        try {
            $this->processPayment($order, $orderData);
            $this->deductInventory($order);
            $this->orderService->updateStatus($orderId, 'paid');
            
            return ['status' => 'success', 'order_id' => $orderId];
        } catch (\Exception $e) {
            $this->orderService->updateStatus($orderId, 'payment_failed');
            throw $e;
        }
    }
    
    private function processPayment($order, array $orderData): void
    {
        $payment = $this->paymentService->process([
            'order_id' => $order->id,
            'amount' => $order->amount,
            'method' => $orderData['payment_method'],
        ]);
        
        if ($payment->status !== 'success') {
            throw new PaymentFailedException($payment->error_message);
        }
    }
    
    private function deductInventory($order): void
    {
        foreach ($order->items as $item) {
            $this->inventoryService->deduct($item->sku, $item->quantity);
        }
    }
}

错误做法:无幂等性保护

php
<?php

class NonIdempotentConsumer
{
    public function process($message): void
    {
        $data = json_decode($message->body, true);
        
        // 错误1:无消息ID检查
        // 错误2:无状态检查
        // 错误3:直接执行业务逻辑
        
        $this->processPayment($data);
        $this->deductInventory($data);
        $this->sendNotification($data);
        
        // 错误4:重复消费会导致重复扣款、重复扣库存
        $message->ack();
    }
    
    private function processPayment(array $data): void
    {
        // 无幂等性保护,重复调用会重复扣款
        $this->paymentService->charge($data['amount']);
    }
    
    private function deductInventory(array $data): void
    {
        // 无幂等性保护,重复调用会重复扣库存
        foreach ($data['items'] as $item) {
            $this->inventoryService->deduct($item['sku'], $item['quantity']);
        }
    }
}

高级幂等性模式

乐观锁幂等

php
<?php

namespace App\Messaging\Idempotence;

class OptimisticLockIdempotence
{
    private PDO $pdo;
    
    public function processWithOptimisticLock(
        string $resourceType,
        int $resourceId,
        int $expectedVersion,
        callable $handler
    ) {
        $this->pdo->beginTransaction();
        
        try {
            $currentVersion = $this->getResourceVersion($resourceType, $resourceId);
            
            if ($currentVersion !== $expectedVersion) {
                throw new ConcurrentModificationException(
                    "Resource version mismatch: expected {$expectedVersion}, got {$currentVersion}"
                );
            }
            
            $result = $handler();
            
            $this->incrementVersion($resourceType, $resourceId);
            
            $this->pdo->commit();
            
            return $result;
        } catch (\Exception $e) {
            $this->pdo->rollBack();
            throw $e;
        }
    }
    
    private function getResourceVersion(string $type, int $id): int
    {
        $sql = "SELECT version FROM resource_versions 
                WHERE resource_type = :type AND resource_id = :id 
                FOR UPDATE";
        
        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([':type' => $type, ':id' => $id]);
        
        return (int) $stmt->fetchColumn();
    }
    
    private function incrementVersion(string $type, int $id): void
    {
        $sql = "UPDATE resource_versions 
                SET version = version + 1, updated_at = NOW() 
                WHERE resource_type = :type AND resource_id = :id";
        
        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([':type' => $type, ':id' => $id]);
    }
}

分布式锁幂等

php
<?php

namespace App\Messaging\Idempotence;

use Redis;

class DistributedLockIdempotence
{
    private Redis $redis;
    private int $lockTtl = 30;
    
    public function processWithLock(
        string $messageId,
        callable $handler,
        int $timeout = 30
    ) {
        $lockKey = "lock:{$messageId}";
        $resultKey = "result:{$messageId}";
        
        $lock = $this->tryAcquireLock($lockKey);
        
        if (!$lock) {
            return $this->waitForResult($resultKey, $timeout);
        }
        
        try {
            $result = $handler();
            
            $this->saveResult($resultKey, $result);
            
            return $result;
        } finally {
            $this->releaseLock($lockKey);
        }
    }
    
    private function tryAcquireLock(string $key): bool
    {
        $token = bin2hex(random_bytes(16));
        $acquired = $this->redis->set($key, $token, ['NX', 'EX' => $this->lockTtl]);
        
        return $acquired !== false;
    }
    
    private function releaseLock(string $key): void
    {
        $script = "
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end
        ";
        
        $this->redis->eval($script, [$key], 1);
    }
    
    private function saveResult(string $key, $result): void
    {
        $this->redis->setex($key, 86400, json_encode([
            'result' => $result,
            'processed_at' => time(),
        ]));
    }
    
    private function waitForResult(string $key, int $timeout)
    {
        $startTime = time();
        
        while (time() - $startTime < $timeout) {
            $result = $this->redis->get($key);
            
            if ($result !== false) {
                return json_decode($result, true)['result'];
            }
            
            usleep(100000);
        }
        
        throw new TimeoutException('Timeout waiting for result');
    }
}

实际应用场景

场景一:订单支付幂等

php
<?php

class OrderPaymentIdempotence
{
    public function processPayment(string $orderId, array $paymentData): array
    {
        $idempotenceKey = "payment:{$orderId}:" . md5(json_encode($paymentData));
        
        return $this->idempotenceManager->process(
            $idempotenceKey,
            'payment',
            function () use ($orderId, $paymentData) {
                $order = Order::findOrFail($orderId);
                
                if ($order->status === 'paid') {
                    return ['status' => 'already_paid'];
                }
                
                $payment = $this->paymentService->process($paymentData);
                
                $order->update([
                    'status' => 'paid',
                    'paid_at' => now(),
                    'payment_id' => $payment->id,
                ]);
                
                return ['status' => 'success', 'payment_id' => $payment->id];
            }
        );
    }
}

场景二:库存扣减幂等

php
<?php

class InventoryDeductionIdempotence
{
    public function deduct(string $orderId, string $sku, int $quantity): bool
    {
        $idempotenceKey = "inventory:{$orderId}:{$sku}";
        
        return $this->idempotenceManager->process(
            $idempotenceKey,
            'inventory_deduction',
            function () use ($sku, $quantity, $orderId) {
                $inventory = Inventory::where('sku', $sku)->lockForUpdate()->first();
                
                if ($inventory->stock < $quantity) {
                    throw new InsufficientStockException("Insufficient stock for SKU: {$sku}");
                }
                
                $inventory->decrement('stock', $quantity);
                
                InventoryLog::create([
                    'order_id' => $orderId,
                    'sku' => $sku,
                    'quantity' => $quantity,
                    'type' => 'deduction',
                ]);
                
                return true;
            }
        );
    }
}

最佳实践建议清单

幂等性设计

  • [ ] 为每条消息分配唯一 ID
  • [ ] 选择合适的幂等性存储(Redis/数据库)
  • [ ] 设计合理的幂等性键格式
  • [ ] 设置合理的过期时间
  • [ ] 实现分布式锁保护

幂等性实现

  • [ ] 处理前检查是否已处理
  • [ ] 使用原子性操作
  • [ ] 正确处理并发场景
  • [ ] 缓存处理结果
  • [ ] 记录处理日志

幂等性维护

  • [ ] 定期清理过期记录
  • [ ] 监控幂等性命中率
  • [ ] 监控存储容量
  • [ ] 处理存储故障场景

生产环境注意事项

  1. 存储选择

    • Redis:高性能,适合高并发场景
    • 数据库:强一致性,适合低并发场景
    • 混合方案:Redis + 数据库持久化
  2. 过期策略

    • 设置合理的 TTL
    • 定期清理任务
    • 考虑业务时效性
  3. 故障处理

    • 幂等存储不可用时的降级策略
    • 缓存穿透保护
    • 热点数据处理
  4. 监控告警

    • 监控重复消息比例
    • 监控幂等存储性能
    • 监控存储容量

相关链接