幂等性设计
概述
幂等性是指同一操作执行多次与执行一次的效果相同。在消息队列系统中,由于网络抖动、服务重启、消息重试等原因,消息可能会被重复消费,因此幂等性设计至关重要。
核心概念
1. 为什么需要幂等性
消息重复场景:
├── 生产者重复发送
│ ├── 网络超时后重试
│ ├── ACK 丢失后重发
│ └── 事务回滚后重发
├── 消费者重复消费
│ ├── 处理成功但 ACK 失败
│ ├── 消费者重启后重新投递
│ └── 预取消息重新投递
└── 集群场景
├── 主从切换期间
├── 网络分区恢复后
└── 镜像队列同步时
2. 幂等性原则
幂等性设计原则:
├── 唯一标识:每条消息有唯一 ID
├── 状态检查:处理前检查是否已处理
├── 原子操作:使用原子性操作保证一致性
├── 结果缓存:缓存处理结果供重复请求使用
└── 过期清理:定期清理过期的幂等记录
3. 幂等性级别
| 级别 | 说明 | 实现复杂度 |
|---|---|---|
| 弱幂等 | 大概率避免重复,允许少量重复 | 低 |
| 强幂等 | 严格保证不重复处理 | 中 |
| 精确一次 | 消息精确处理一次 | 高 |
幂等性架构设计
┌─────────────────────────────────────────────────────────────────┐
│ 幂等性处理流程 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ │
│ │ 消息到达 │ │
│ └────┬────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ 提取消息 ID │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ 查询幂等记录 │────▶│ 已处理? │ │
│ └─────────────┘ └──────┬──────┘ │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ │ 是 │ │ 否 │
│ ▼ │ ▼ │
│ ┌───────────┐ │ ┌───────────┐ │
│ │ 返回缓存 │ │ │ 执行业务 │ │
│ │ 结果 │ │ │ 逻辑 │ │
│ └───────────┘ │ └─────┬─────┘ │
│ │ │ │
│ │ ▼ │
│ │ ┌───────────┐ │
│ │ │ 记录幂等 │ │
│ │ │ 结果 │ │
│ │ └───────────┘ │
│ │ │
│ └──────────────────▶ ACK │
│ │
└─────────────────────────────────────────────────────────────────┘
PHP 代码示例
正确做法:完整的幂等性实现
php
<?php
namespace App\Messaging\Idempotence;
use Psr\Log\LoggerInterface;
use Redis;
/**
 * Redis-backed idempotence guard for message handlers.
 *
 * Three keys are used per message ID, all under the configured prefix:
 *   - "<prefix><id>"         processed marker
 *   - "<prefix><id>:result"  JSON envelope {status, result, processed_at}
 *   - "<prefix><id>:lock"    short-lived lock held while the handler runs
 *
 * process() always returns the handler's own result (never the storage
 * envelope). Replayed results have been through a JSON round trip, so
 * objects come back as associative arrays.
 */
class IdempotenceManager
{
    /** Seconds after which Redis expires an abandoned processing lock. */
    private const LOCK_TTL = 30;

    /** Deletes the lock only when it still carries the caller's token. */
    private const RELEASE_LOCK_SCRIPT = <<<'LUA'
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
end
return 0
LUA;

    private Redis $redis;
    private LoggerInterface $logger;
    private string $keyPrefix = 'idempotence:';
    private int $ttl = 86400;

    /** @var array<string, string> Tokens of locks currently held by this instance, keyed by lock key. */
    private array $lockTokens = [];

    /**
     * @param array $config Optional settings: 'key_prefix' (string) and 'ttl' (seconds, int).
     */
    public function __construct(Redis $redis, LoggerInterface $logger, array $config = [])
    {
        $this->redis = $redis;
        $this->logger = $logger;
        $this->keyPrefix = (string) ($config['key_prefix'] ?? 'idempotence:');
        $this->ttl = (int) ($config['ttl'] ?? 86400);
    }

    /**
     * Runs $handler at most once per message ID and returns its result.
     *
     * @param string   $messageId Unique message identifier.
     * @param callable $handler   Business logic; receives $context.
     * @param array    $context   Passed to the handler unchanged.
     *
     * @return mixed The handler's result, or the stored result on a replay.
     *
     * @throws IdempotenceException When another consumer holds the lock and
     *                              no result appears before the wait times out.
     */
    public function process(
        string $messageId,
        callable $handler,
        array $context = []
    ) {
        $key = $this->buildKey($messageId);

        $envelope = $this->getCachedResult($key);
        if ($envelope !== null) {
            $this->logger->info('Message already processed, returning cached result', [
                'message_id' => $messageId,
            ]);

            return $envelope['result'] ?? null;
        }

        if (!$this->acquireLock($key)) {
            $this->logger->warning('Failed to acquire lock, waiting for result', [
                'message_id' => $messageId,
            ]);

            return $this->waitForResult($key);
        }

        try {
            // Re-check under the lock: another consumer may have finished
            // between the first lookup and the lock acquisition.
            $envelope = $this->getCachedResult($key);
            if ($envelope !== null) {
                $this->logger->info('Message already processed, returning cached result', [
                    'message_id' => $messageId,
                ]);

                return $envelope['result'] ?? null;
            }

            $result = $handler($context);
            $this->saveResult($key, $result);
            $this->logger->info('Message processed successfully', [
                'message_id' => $messageId,
            ]);

            return $result;
        } finally {
            $this->releaseLock($key);
        }
    }

    /**
     * Tells whether a processed marker or a stored result exists for the message.
     */
    public function isProcessed(string $messageId): bool
    {
        $key = $this->buildKey($messageId);

        return (bool) $this->redis->exists($key) || (bool) $this->redis->exists($key . ':result');
    }

    /**
     * Records a message as processed without running a handler.
     *
     * @param mixed $result Value to replay for later duplicates.
     */
    public function markAsProcessed(string $messageId, $result = null): void
    {
        $this->saveResult($this->buildKey($messageId), $result);
    }

    private function buildKey(string $messageId): string
    {
        return $this->keyPrefix . $messageId;
    }

    /**
     * Loads the stored envelope for a key.
     *
     * @return array|null The decoded envelope, or null when nothing usable is
     *                    stored. The envelope (not the bare result) is returned
     *                    so that a legitimately null handler result can still
     *                    be told apart from a cache miss.
     */
    private function getCachedResult(string $key): ?array
    {
        $raw = $this->redis->get($key . ':result');
        if ($raw === false) {
            return null;
        }

        $envelope = json_decode($raw, true);

        return is_array($envelope) ? $envelope : null;
    }

    /**
     * Takes the processing lock with SET NX EX and remembers the random token
     * so that only this holder can release it.
     */
    private function acquireLock(string $key): bool
    {
        $lockKey = $key . ':lock';
        $token = bin2hex(random_bytes(16));

        $acquired = $this->redis->set($lockKey, $token, ['NX', 'EX' => self::LOCK_TTL]);
        if ($acquired === false) {
            return false;
        }

        $this->lockTokens[$lockKey] = $token;

        return true;
    }

    /**
     * Releases the lock if this instance still owns it. A lock that expired
     * and was re-acquired by another consumer is left untouched.
     */
    private function releaseLock(string $key): void
    {
        $lockKey = $key . ':lock';
        if (!isset($this->lockTokens[$lockKey])) {
            return;
        }

        $token = $this->lockTokens[$lockKey];
        unset($this->lockTokens[$lockKey]);

        $this->redis->eval(self::RELEASE_LOCK_SCRIPT, [$lockKey, $token], 1);
    }

    /**
     * Stores the result envelope and the processed marker, both with the
     * configured TTL.
     *
     * @param mixed $result Handler result; must be JSON-encodable to be replayed.
     */
    private function saveResult(string $key, $result): void
    {
        $payload = json_encode([
            'status' => 'success',
            'result' => $result,
            'processed_at' => time(),
        ]);

        if ($payload === false) {
            // The handler has already run; failing here would cause it to be
            // re-executed on redelivery. Keep the marker and replay null.
            $this->logger->error('Failed to encode idempotent result, storing null result', [
                'key' => $key,
                'error' => json_last_error_msg(),
            ]);
            $payload = json_encode([
                'status' => 'success',
                'result' => null,
                'processed_at' => time(),
            ]);
        }

        $this->redis->setex($key . ':result', $this->ttl, $payload);
        $this->redis->setex($key, $this->ttl, 1);
    }

    /**
     * Polls every 100 ms for a result written by the consumer that holds the lock.
     *
     * @return mixed The stored handler result.
     *
     * @throws IdempotenceException When no result appears within $timeout seconds.
     */
    private function waitForResult(string $key, int $timeout = 30)
    {
        $deadline = time() + $timeout;

        while (time() < $deadline) {
            $envelope = $this->getCachedResult($key);
            if ($envelope !== null) {
                return $envelope['result'] ?? null;
            }
            usleep(100000);
        }

        throw new IdempotenceException('Timeout waiting for result');
    }
}
数据库幂等性实现
php
<?php
namespace App\Messaging\Idempotence;
use PDO;
use Psr\Log\LoggerInterface;
/**
 * Database-backed idempotence guard.
 *
 * One row per (message_id, message_type) is kept in the idempotence table;
 * the unique key on that pair is what rejects concurrent duplicates. The
 * handler runs inside the same PDO transaction as the bookkeeping row, so a
 * failing handler leaves no record behind.
 *
 * NOTE: the table name is interpolated into SQL and must come from trusted
 * configuration, never from user input.
 */
class DatabaseIdempotenceManager
{
    private PDO $pdo;
    private LoggerInterface $logger;
    private string $tableName = 'message_idempotence';

    /** Set once the CREATE TABLE IF NOT EXISTS statement has run for this instance. */
    private bool $tableEnsured = false;

    /**
     * @param string|null $tableName Overrides the default table name when given.
     */
    public function __construct(PDO $pdo, LoggerInterface $logger, ?string $tableName = null)
    {
        $this->pdo = $pdo;
        $this->logger = $logger;
        $this->tableName = $tableName ?? $this->tableName;
    }

    /**
     * Runs $handler at most once per (message ID, message type).
     *
     * @param callable $handler Business logic; receives $context.
     * @param array    $context Passed to the handler unchanged.
     *
     * @return mixed The handler's result on first execution, or the stored
     *               result (JSON-decoded to arrays/scalars) on a replay.
     *
     * @throws IdempotenceException When the insert hits the unique key, i.e.
     *                              another consumer already owns this message.
     * @throws \Throwable           Anything thrown by the handler, after rollback.
     */
    public function process(
        string $messageId,
        string $messageType,
        callable $handler,
        array $context = []
    ) {
        $this->ensureTableExists();

        $existing = $this->findRecord($messageId, $messageType);
        if ($existing !== null) {
            $this->logger->info('Message already processed', [
                'message_id' => $messageId,
                'message_type' => $messageType,
            ]);

            return $this->decodeResult($existing['result']);
        }

        $this->pdo->beginTransaction();

        try {
            $this->insertPendingRecord($messageId, $messageType);
            $result = $handler($context);
            $this->updateRecordWithResult($messageId, $messageType, $result);
            $this->pdo->commit();
        } catch (\Throwable $e) {
            // \Throwable so that \Error from the handler cannot leave the
            // transaction open; guard the rollback so it never masks $e.
            if ($this->pdo->inTransaction()) {
                $this->pdo->rollBack();
            }

            throw $e;
        }

        $this->logger->info('Message processed successfully', [
            'message_id' => $messageId,
            'message_type' => $messageType,
        ]);

        return $result;
    }

    /**
     * Looks up the completed record for a message, if any.
     */
    private function findRecord(string $messageId, string $messageType): ?array
    {
        $sql = "SELECT * FROM {$this->tableName}
            WHERE message_id = :message_id
            AND message_type = :message_type
            AND status = 'completed'";

        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([
            ':message_id' => $messageId,
            ':message_type' => $messageType,
        ]);

        $row = $stmt->fetch(PDO::FETCH_ASSOC);

        return $row === false ? null : $row;
    }

    /**
     * Claims the message by inserting a 'pending' row.
     *
     * @throws IdempotenceException When the unique key rejects the insert.
     */
    private function insertPendingRecord(string $messageId, string $messageType): void
    {
        $sql = "INSERT INTO {$this->tableName}
            (message_id, message_type, status, created_at)
            VALUES (:message_id, :message_type, 'pending', NOW())";

        try {
            $stmt = $this->pdo->prepare($sql);
            $stmt->execute([
                ':message_id' => $messageId,
                ':message_type' => $messageType,
            ]);
        } catch (\PDOException $e) {
            if ($this->isDuplicateKeyError($e)) {
                throw new IdempotenceException('Message is being processed by another consumer');
            }

            throw $e;
        }
    }

    /**
     * Marks the claimed row as completed and stores the JSON-encoded result.
     *
     * @param mixed $result Handler result.
     */
    private function updateRecordWithResult(
        string $messageId,
        string $messageType,
        $result
    ): void {
        $sql = "UPDATE {$this->tableName}
            SET status = 'completed',
            result = :result,
            completed_at = NOW()
            WHERE message_id = :message_id
            AND message_type = :message_type";

        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([
            ':message_id' => $messageId,
            ':message_type' => $messageType,
            ':result' => json_encode($result),
        ]);
    }

    /**
     * @return mixed The decoded stored result, or null when none was stored.
     */
    private function decodeResult(?string $result)
    {
        return $result === null ? null : json_decode($result, true);
    }

    /**
     * Detects a unique-key violation.
     *
     * Driver codes are checked first (MySQL error 1062, PostgreSQL SQLSTATE
     * 23505) because message text depends on driver and locale; the original
     * message probes are kept as a fallback.
     */
    private function isDuplicateKeyError(\PDOException $e): bool
    {
        $sqlState = (string) ($e->errorInfo[0] ?? $e->getCode());
        $driverCode = (int) ($e->errorInfo[1] ?? 0);

        if ($sqlState === '23505' || $driverCode === 1062) {
            return true;
        }

        $message = $e->getMessage();

        return strpos($message, 'Duplicate entry') !== false
            || strpos($message, 'unique constraint') !== false;
    }

    /**
     * Creates the idempotence table on first use. Runs once per instance
     * instead of on every message.
     */
    private function ensureTableExists(): void
    {
        if ($this->tableEnsured) {
            return;
        }

        $sql = "CREATE TABLE IF NOT EXISTS {$this->tableName} (
            id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
            message_id VARCHAR(64) NOT NULL,
            message_type VARCHAR(64) NOT NULL,
            status ENUM('pending', 'completed', 'failed') NOT NULL DEFAULT 'pending',
            result TEXT,
            created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            completed_at TIMESTAMP NULL,
            UNIQUE KEY uk_message (message_id, message_type),
            INDEX idx_status (status),
            INDEX idx_created_at (created_at)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";

        $this->pdo->exec($sql);
        $this->tableEnsured = true;
    }

    /**
     * Deletes completed records older than the retention window.
     *
     * @return int Number of rows removed.
     */
    public function cleanup(int $daysToKeep = 7): int
    {
        $sql = "DELETE FROM {$this->tableName}
            WHERE created_at < DATE_SUB(NOW(), INTERVAL :days DAY)
            AND status = 'completed'";

        $stmt = $this->pdo->prepare($sql);
        // Bind as an integer so the interval is never sent as a quoted string.
        $stmt->bindValue(':days', $daysToKeep, PDO::PARAM_INT);
        $stmt->execute();

        return $stmt->rowCount();
    }
}
消费者幂等性集成
php
<?php
namespace App\Messaging\Consumer;
use App\Messaging\Idempotence\IdempotenceManager;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Base class for AMQP consumers whose handler must run at most once per message.
 *
 * Subclasses provide handle() and getMessageType(); whoever constructs the
 * consumer is responsible for initialising $idempotenceManager and $logger.
 */
abstract class IdempotentConsumer
{
    protected IdempotenceManager $idempotenceManager;
    protected \Psr\Log\LoggerInterface $logger;

    /**
     * Processes one delivery through the idempotence manager and settles it.
     *
     * - Handled (or replayed from the idempotence store): ack.
     * - Idempotence layer could not decide (e.g. timed out waiting for the
     *   consumer that holds the lock): requeue, so the message is not lost
     *   if that consumer died. Redelivery is safe because it goes through
     *   the same idempotence check.
     * - Any other exception: delegated to handleError().
     */
    public function process(AMQPMessage $message): void
    {
        $messageId = $this->extractMessageId($message);

        // Strict comparison: empty() would also reject the valid ID "0".
        if ($messageId === '') {
            $this->handleMissingMessageId($message);

            return;
        }

        // IdempotenceManager::process() takes (id, handler, context); the
        // message type is folded into the ID so types cannot collide.
        $idempotenceKey = $this->getMessageType() . ':' . $messageId;

        try {
            $this->idempotenceManager->process(
                $idempotenceKey,
                fn() => $this->handle($message),
                ['message' => $message]
            );
            $message->ack();
        } catch (\App\Messaging\Idempotence\IdempotenceException $e) {
            // Fully qualified: the exception lives in the Idempotence
            // namespace, not in this one.
            $this->logger->warning('Idempotence check failed', [
                'message_id' => $messageId,
                'error' => $e->getMessage(),
            ]);
            $message->nack(true);
        } catch (\Exception $e) {
            $this->handleError($message, $e);
        }
    }

    /**
     * Reads the message ID from the AMQP "message_id" property, falling back
     * to "message_id" / "id" in the JSON body.
     *
     * @return string The ID, or '' when none can be found.
     */
    protected function extractMessageId(AMQPMessage $message): string
    {
        // get() throws when the property is absent, so probe with has() first.
        if ($message->has('message_id')) {
            $propertyId = (string) $message->get('message_id');
            if ($propertyId !== '') {
                return $propertyId;
            }
        }

        $body = json_decode($message->body, true);
        if (!is_array($body)) {
            return '';
        }

        $bodyId = $body['message_id'] ?? $body['id'] ?? '';

        return is_scalar($bodyId) ? (string) $bodyId : '';
    }

    /**
     * Executes the business logic for a message.
     *
     * @return mixed Result stored by the idempotence manager for replays.
     */
    abstract protected function handle(AMQPMessage $message);

    /**
     * Returns the logical message type used to namespace idempotence keys.
     */
    abstract protected function getMessageType(): string;

    /**
     * Logs and acknowledges (drops) a message that carries no usable ID.
     */
    protected function handleMissingMessageId(AMQPMessage $message): void
    {
        $this->logger->error('Message missing ID, cannot ensure idempotence', [
            'body' => $message->body,
        ]);
        $message->ack();
    }

    /**
     * Hook for failures raised by the handler; intentionally empty here.
     * NOTE(review): as written the message is neither acked nor rejected on
     * this path — subclasses are expected to settle it.
     */
    protected function handleError(AMQPMessage $message, \Exception $e): void
    {
        // Error handling logic
    }
}
业务幂等性示例
php
<?php
namespace App\Messaging\Consumer;
use App\Messaging\Idempotence\IdempotenceManager;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Consumes "order.payment" messages: charges the order, deducts inventory
 * and marks the order as paid, with a business-level status check on top of
 * the message-level idempotence provided by the base class.
 */
class OrderPaymentConsumer extends IdempotentConsumer
{
    private OrderService $orderService;
    private PaymentService $paymentService;
    private InventoryService $inventoryService;

    protected function getMessageType(): string
    {
        return 'order.payment';
    }

    /**
     * Handles one payment message.
     *
     * Expected body: {"data": {"order_id": ..., "payment_method": ...}}.
     *
     * @return array{status: string, order_id: mixed}
     *
     * @throws \InvalidArgumentException   When the body is not the expected shape.
     * @throws OrderNotFoundException      When the order does not exist.
     * @throws InvalidOrderStatusException When the order is neither pending nor paid.
     * @throws \Exception                  Any failure from payment or inventory,
     *                                     after the order is marked payment_failed.
     */
    protected function handle(AMQPMessage $message)
    {
        // Validate the whole payload before touching order state, so a
        // malformed message cannot leave an order stuck in "processing".
        $data = json_decode($message->body, true);
        $orderData = is_array($data) ? ($data['data'] ?? null) : null;
        if (
            !is_array($orderData)
            || !isset($orderData['order_id'], $orderData['payment_method'])
        ) {
            throw new \InvalidArgumentException(
                'Malformed order payment message: data.order_id and data.payment_method are required'
            );
        }

        $orderId = $orderData['order_id'];

        $order = $this->orderService->find($orderId);
        if (!$order) {
            throw new OrderNotFoundException("Order not found: {$orderId}");
        }

        // Business-level idempotence: a paid order is never charged again.
        if ($order->status === 'paid') {
            return ['status' => 'already_paid', 'order_id' => $orderId];
        }

        if ($order->status !== 'pending') {
            throw new InvalidOrderStatusException("Invalid order status: {$order->status}");
        }

        $this->orderService->updateStatus($orderId, 'processing');

        try {
            $this->processPayment($order, $orderData);
            $this->deductInventory($order);
            $this->orderService->updateStatus($orderId, 'paid');

            return ['status' => 'success', 'order_id' => $orderId];
        } catch (\Exception $e) {
            // NOTE(review): this also fires when inventory deduction fails
            // after a successful charge — confirm that "payment_failed" is
            // the intended status for that case.
            $this->orderService->updateStatus($orderId, 'payment_failed');

            throw $e;
        }
    }

    /**
     * Charges the order through the payment service.
     *
     * @throws PaymentFailedException When the payment status is not "success".
     */
    private function processPayment($order, array $orderData): void
    {
        $payment = $this->paymentService->process([
            'order_id' => $order->id,
            'amount' => $order->amount,
            'method' => $orderData['payment_method'],
        ]);

        if ($payment->status !== 'success') {
            throw new PaymentFailedException($payment->error_message);
        }
    }

    /**
     * Deducts stock for every item on the order.
     */
    private function deductInventory($order): void
    {
        foreach ($order->items as $item) {
            $this->inventoryService->deduct($item->sku, $item->quantity);
        }
    }
}
错误做法:无幂等性保护
php
<?php
/**
 * ANTI-EXAMPLE — deliberately shows a consumer WITHOUT idempotence protection.
 * Do not copy: every redelivery of the same message repeats all side effects.
 */
class NonIdempotentConsumer
{
/**
 * Decodes the body, runs payment, inventory deduction and notification
 * unconditionally, then acknowledges the message.
 */
public function process($message): void
{
$data = json_decode($message->body, true);
// Mistake 1: no message ID check
// Mistake 2: no state check
// Mistake 3: business logic is executed directly
$this->processPayment($data);
$this->deductInventory($data);
$this->sendNotification($data);
// Mistake 4: a duplicate delivery charges twice and deducts inventory twice
$message->ack();
}
/**
 * Charges $data['amount'] via the payment service.
 */
private function processPayment(array $data): void
{
// No idempotence protection: calling this again charges again
$this->paymentService->charge($data['amount']);
}
/**
 * Deducts stock for each entry of $data['items'] (sku, quantity).
 */
private function deductInventory(array $data): void
{
// No idempotence protection: calling this again deducts inventory again
foreach ($data['items'] as $item) {
$this->inventoryService->deduct($item['sku'], $item['quantity']);
}
}
}高级幂等性模式
乐观锁幂等
php
<?php
namespace App\Messaging\Idempotence;
/**
 * Version-checked execution of a handler against a row in resource_versions.
 *
 * The caller supplies the version it last saw; the handler runs only when
 * that version is still current, and the version is incremented in the same
 * transaction. The version row is read with SELECT ... FOR UPDATE, so the
 * row stays locked for the duration of the handler.
 */
class OptimisticLockIdempotence
{
    // Fully qualified: this file has no "use PDO" and sits inside a namespace.
    private \PDO $pdo;

    /**
     * Runs $handler if the resource is still at $expectedVersion.
     *
     * @param callable $handler Invoked without arguments inside the transaction.
     *
     * @return mixed The handler's result.
     *
     * @throws ConcurrentModificationException When the stored version differs.
     * @throws \RuntimeException               When no version row exists for the resource.
     * @throws \Throwable                      Anything thrown by the handler, after rollback.
     */
    public function processWithOptimisticLock(
        string $resourceType,
        int $resourceId,
        int $expectedVersion,
        callable $handler
    ) {
        $this->pdo->beginTransaction();

        try {
            $currentVersion = $this->getResourceVersion($resourceType, $resourceId);
            if ($currentVersion !== $expectedVersion) {
                throw new ConcurrentModificationException(
                    "Resource version mismatch: expected {$expectedVersion}, got {$currentVersion}"
                );
            }

            $result = $handler();
            $this->incrementVersion($resourceType, $resourceId);
            $this->pdo->commit();

            return $result;
        } catch (\Throwable $e) {
            // \Throwable so \Error from the handler also rolls back; guarded
            // so a failed rollback can never replace the original exception.
            if ($this->pdo->inTransaction()) {
                $this->pdo->rollBack();
            }

            throw $e;
        }
    }

    /**
     * Reads and row-locks the current version of a resource.
     *
     * @throws \RuntimeException When the resource has no version row. Without
     *                           a row there is nothing to lock, and silently
     *                           reading "0" would let concurrent callers with
     *                           expected version 0 all run the handler.
     */
    private function getResourceVersion(string $type, int $id): int
    {
        $sql = "SELECT version FROM resource_versions
            WHERE resource_type = :type AND resource_id = :id
            FOR UPDATE";

        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([':type' => $type, ':id' => $id]);

        $version = $stmt->fetchColumn();
        if ($version === false) {
            throw new \RuntimeException("No version record for resource {$type}#{$id}");
        }

        return (int) $version;
    }

    /**
     * Bumps the stored version by one and refreshes updated_at.
     */
    private function incrementVersion(string $type, int $id): void
    {
        $sql = "UPDATE resource_versions
            SET version = version + 1, updated_at = NOW()
            WHERE resource_type = :type AND resource_id = :id";

        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([':type' => $type, ':id' => $id]);
    }
}
分布式锁幂等
php
<?php
namespace App\Messaging\Idempotence;
use Redis;
/**
 * Idempotent execution guarded by a Redis lock.
 *
 * Keys per message: "lock:<id>" (token-owned lock with TTL) and
 * "result:<id>" (JSON {result, processed_at}, kept for 24 hours).
 */
class DistributedLockIdempotence
{
    /** Deletes the lock only when it still carries the caller's token. */
    private const RELEASE_LOCK_SCRIPT = "
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
    ";

    private Redis $redis;
    private int $lockTtl = 30;

    /**
     * Runs $handler at most once per message ID.
     *
     * @param callable $handler Invoked without arguments.
     * @param int      $timeout Seconds to wait for another holder's result.
     *
     * @return mixed The handler's result, or the stored result (JSON-decoded)
     *               when the message was already processed.
     *
     * @throws TimeoutException When another consumer holds the lock and no
     *                          result appears within $timeout seconds.
     */
    public function processWithLock(
        string $messageId,
        callable $handler,
        int $timeout = 30
    ) {
        $lockKey = "lock:{$messageId}";
        $resultKey = "result:{$messageId}";

        // A redelivery that arrives after the first run finished finds the
        // lock free; without this lookup the handler would run again.
        $stored = $this->readResult($resultKey);
        if ($stored !== null) {
            return $stored['result'] ?? null;
        }

        $token = $this->tryAcquireLock($lockKey);
        if ($token === null) {
            return $this->waitForResult($resultKey, $timeout);
        }

        try {
            // Re-check under the lock to close the gap between the first
            // lookup and the acquisition.
            $stored = $this->readResult($resultKey);
            if ($stored !== null) {
                return $stored['result'] ?? null;
            }

            $result = $handler();
            $this->saveResult($resultKey, $result);

            return $result;
        } finally {
            $this->releaseLock($lockKey, $token);
        }
    }

    /**
     * Tries to take the lock with SET NX EX.
     *
     * @return string|null The ownership token on success, null when the lock is held.
     */
    private function tryAcquireLock(string $key): ?string
    {
        $token = bin2hex(random_bytes(16));
        $acquired = $this->redis->set($key, $token, ['NX', 'EX' => $this->lockTtl]);

        return $acquired !== false ? $token : null;
    }

    /**
     * Releases the lock if it is still owned by $token.
     *
     * The key goes in KEYS[1] and the token in ARGV[1]; phpredis takes both
     * in one array, with the third argument giving the number of keys.
     */
    private function releaseLock(string $key, string $token): void
    {
        $this->redis->eval(self::RELEASE_LOCK_SCRIPT, [$key, $token], 1);
    }

    /**
     * Stores the handler result for 24 hours.
     *
     * @param mixed $result Handler result; must be JSON-encodable.
     */
    private function saveResult(string $key, $result): void
    {
        $this->redis->setex($key, 86400, json_encode([
            'result' => $result,
            'processed_at' => time(),
        ]));
    }

    /**
     * Loads the stored result record.
     *
     * @return array|null The decoded record, or null when nothing usable is stored.
     */
    private function readResult(string $key): ?array
    {
        $raw = $this->redis->get($key);
        if ($raw === false) {
            return null;
        }

        $record = json_decode($raw, true);

        return is_array($record) ? $record : null;
    }

    /**
     * Polls every 100 ms for the result written by the lock holder.
     *
     * @return mixed The stored handler result.
     *
     * @throws TimeoutException When no result appears within $timeout seconds.
     */
    private function waitForResult(string $key, int $timeout)
    {
        $deadline = time() + $timeout;

        while (time() < $deadline) {
            $stored = $this->readResult($key);
            if ($stored !== null) {
                return $stored['result'] ?? null;
            }
            usleep(100000);
        }

        throw new TimeoutException('Timeout waiting for result');
    }
}
实际应用场景
场景一:订单支付幂等
php
<?php
/**
 * Scenario: idempotent order payment.
 *
 * The idempotence key combines the order ID with a hash of the payment
 * payload, so the same request replayed for the same order is executed once.
 */
class OrderPaymentIdempotence
{
    /** @var object Manager exposing process(string $id, string $type, callable $handler). */
    protected $idempotenceManager;

    /** @var object Payment gateway exposing process(array $paymentData). */
    protected $paymentService;

    /**
     * Pays an order at most once per (order, payment payload).
     *
     * @return array On first execution ['status' => 'success', 'payment_id' => ...],
     *               or ['status' => 'already_paid'] when the order is already paid;
     *               replays return whatever was stored for the key.
     *
     * @throws \JsonException When $paymentData cannot be JSON-encoded.
     */
    public function processPayment(string $orderId, array $paymentData): array
    {
        // Hash a key-sorted copy so that the same payload with a different
        // key order maps to the same idempotence key. An unencodable payload
        // raises instead of silently hashing an empty string.
        // NOTE: keys of records written before this normalisation may differ
        // for payloads whose keys were not already sorted.
        $fingerprint = md5(json_encode($this->canonicalize($paymentData), JSON_THROW_ON_ERROR));
        $idempotenceKey = "payment:{$orderId}:" . $fingerprint;

        return $this->idempotenceManager->process(
            $idempotenceKey,
            'payment',
            function () use ($orderId, $paymentData) {
                $order = Order::findOrFail($orderId);

                // Business-level guard on top of the message-level key.
                if ($order->status === 'paid') {
                    return ['status' => 'already_paid'];
                }

                $payment = $this->paymentService->process($paymentData);
                $order->update([
                    'status' => 'paid',
                    'paid_at' => now(),
                    'payment_id' => $payment->id,
                ]);

                return ['status' => 'success', 'payment_id' => $payment->id];
            }
        );
    }

    /**
     * Returns a copy of $data with keys sorted recursively.
     */
    private function canonicalize(array $data): array
    {
        ksort($data);

        foreach ($data as $field => $value) {
            if (is_array($value)) {
                $data[$field] = $this->canonicalize($value);
            }
        }

        return $data;
    }
}
场景二:库存扣减幂等
php
<?php
/**
 * Scenario: idempotent inventory deduction, keyed by (order ID, SKU).
 */
class InventoryDeductionIdempotence
{
    /** @var object Manager exposing process(string $id, string $type, callable $handler). */
    protected $idempotenceManager;

    /**
     * Deducts $quantity of $sku for an order at most once.
     *
     * @return bool True once the deduction has been applied (or was applied earlier).
     *
     * @throws \InvalidArgumentException  When $quantity is not positive.
     * @throws InsufficientStockException When the SKU is unknown or stock is too low.
     */
    public function deduct(string $orderId, string $sku, int $quantity): bool
    {
        // A zero or negative quantity would pass the stock check and then
        // increase stock through decrement().
        if ($quantity <= 0) {
            throw new \InvalidArgumentException("Quantity must be positive, got {$quantity}");
        }

        $idempotenceKey = "inventory:{$orderId}:{$sku}";

        return (bool) $this->idempotenceManager->process(
            $idempotenceKey,
            'inventory_deduction',
            function () use ($sku, $quantity, $orderId) {
                $inventory = Inventory::where('sku', $sku)->lockForUpdate()->first();

                // first() yields null for an unknown SKU; treat it as having
                // no stock instead of dereferencing null.
                if ($inventory === null || $inventory->stock < $quantity) {
                    throw new InsufficientStockException("Insufficient stock for SKU: {$sku}");
                }

                $inventory->decrement('stock', $quantity);
                InventoryLog::create([
                    'order_id' => $orderId,
                    'sku' => $sku,
                    'quantity' => $quantity,
                    'type' => 'deduction',
                ]);

                return true;
            }
        );
    }
}
最佳实践建议清单
幂等性设计
- [ ] 为每条消息分配唯一 ID
- [ ] 选择合适的幂等性存储(Redis/数据库)
- [ ] 设计合理的幂等性键格式
- [ ] 设置合理的过期时间
- [ ] 实现分布式锁保护
幂等性实现
- [ ] 处理前检查是否已处理
- [ ] 使用原子性操作
- [ ] 正确处理并发场景
- [ ] 缓存处理结果
- [ ] 记录处理日志
幂等性维护
- [ ] 定期清理过期记录
- [ ] 监控幂等性命中率
- [ ] 监控存储容量
- [ ] 处理存储故障场景
生产环境注意事项
存储选择
- Redis:高性能,适合高并发场景
- 数据库:强一致性,适合低并发场景
- 混合方案:Redis + 数据库持久化
过期策略
- 设置合理的 TTL
- 定期清理任务
- 考虑业务时效性
故障处理
- 幂等存储不可用时的降级策略
- 缓存穿透保护
- 热点数据处理
监控告警
- 监控重复消息比例
- 监控幂等存储性能
- 监控存储容量
