
Retry Mechanism Design

Overview

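Retries are an important tool for handling transient failures in distributed systems. A well-designed retry strategy maximizes the message-processing success rate without compromising system stability. This document describes the design and implementation of a retry mechanism for RabbitMQ.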

Core Concepts

1. Why Retries Are Needed

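Types of transient failure:
├── Network jitter
├── Brief service unavailability
├── Database connection timeouts
├── Rate limiting by external APIs
└── Temporary resource shortages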

2. Elements of a Retry Strategy

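| Element | Description | Recommended value |
|---------|-------------|-------------------|
| Retry count | Maximum number of retries | 3-5 |
| Retry interval | Wait time between two consecutive attempts | Exponential backoff |
| Retry conditions | Which errors should be retried | Recoverable (transient) errors |
| Retry ceiling | Maximum total time spent retrying | Set per business requirements |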
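The first and last rows interact: a retry should be allowed only while both the count limit and the total-time limit still hold. A minimal sketch, assuming the time of the first failure is known (for example from the `x-first-failure-time` header that the RetryManager below writes); `retryAllowed` and its default limits are illustrative, not recommendations:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: decide whether another attempt is allowed.
 * Both limits from the table apply: a maximum number of retries and a
 * maximum total time spent retrying since the first failure.
 */
function retryAllowed(
    int $retriesSoFar,
    int $firstFailureTs,
    int $nowTs,
    int $maxRetries = 5,
    int $maxTotalSeconds = 300
): bool {
    if ($retriesSoFar >= $maxRetries) {
        return false; // count limit reached
    }
    // Time limit: stop once the retry window since the first failure is used up.
    return ($nowTs - $firstFailureTs) < $maxTotalSeconds;
}
```

Whichever limit is hit first ends the retries, so a slow-failing dependency cannot keep a message cycling for hours even when the count limit is generous.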

3. Backoff Strategies

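Fixed interval: wait the same amount of time before every retry
Linear backoff: the wait grows linearly with each retry
Exponential backoff: the wait grows exponentially with each retry (recommended)
Random jitter: add randomness to the backoff time so that clients that failed together do not retry in lockstep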
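The four strategies differ only in how the wait for attempt n is computed. A sketch with an assumed 1 s base and 60 s cap; for jitter it uses the "full jitter" variant (a random value between 0 and the exponential delay), which is a different formula from the +10% jitter in the RetryManager below:

```php
<?php

declare(strict_types=1);

/**
 * Illustrative delay (in ms) before retry number $attempt (0-based)
 * for each backoff strategy listed above. Base and cap values are assumptions.
 */
function backoffDelay(string $strategy, int $attempt, int $baseMs = 1000, int $capMs = 60000): int
{
    $delay = match ($strategy) {
        'fixed' => $baseMs,                          // same wait every time
        'linear' => $baseMs * ($attempt + 1),        // 1s, 2s, 3s, ...
        'exponential' => $baseMs * (2 ** $attempt),  // 1s, 2s, 4s, ...
        default => throw new InvalidArgumentException("Unknown strategy: {$strategy}"),
    };
    return min($delay, $capMs); // never wait longer than the cap
}

/**
 * "Full jitter": pick a random delay between 0 and the capped exponential value,
 * which spreads out retries from many clients that failed at the same moment.
 */
function exponentialWithJitter(int $attempt, int $baseMs = 1000, int $capMs = 60000): int
{
    return random_int(0, backoffDelay('exponential', $attempt, $baseMs, $capMs));
}
```

For attempt 3 this yields 1000 ms (fixed), 4000 ms (linear) and 8000 ms (exponential); from attempt 6 onward the exponential value hits the 60 s cap.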

Retry Architecture Design

Delay-Queue Retry Pattern

┌─────────────────────────────────────────────────────────────────┐
│                     Retry Flow Architecture                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌───────────┐    ┌──────────┐    ┌───────────┐                 │
│  │   Queue   │───▶│ Consumer │───▶│  Success  │                 │
│  └───────────┘    └────┬─────┘    └───────────┘                 │
│                        │                                        │
│                        │ processing failed                      │
│                        ▼                                        │
│                 ┌─────────────┐                                 │
│                 │ Retry check │                                 │
│                 └──────┬──────┘                                 │
│                        │                                        │
│        ┌───────────────┼───────────────┐                        │
│        │               │               │                        │
│        ▼               ▼               ▼                        │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐                 │
│ │ Delay queue │ │ Dead-letter │ │   Discard   │                 │
│ │   (retry)   │ │  (give up)  │ │  (ignore)   │                 │
│ └──────┬──────┘ └─────────────┘ └─────────────┘                 │
│        │                                                        │
│        │ delay expires                                          │
│        ▼                                                        │
│ ┌────────────────┐                                              │
│ │ Original queue │                                              │
│ └────────────────┘                                              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
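The "retry check" box in the diagram can be expressed as a pure function. A sketch assuming the naming used by the code below (`retry.{queue}.{delay}` routing keys on the delay exchange, `{queue}.dlq` on the dead-letter exchange); `routeFailure`, its parameters and its return shape are hypothetical:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch of the "retry check" step in the diagram.
 * Returns [target, routingKey] where target is 'delay', 'dead_letter' or 'discard'.
 */
function routeFailure(
    string $queue,
    int $retryCount,
    bool $retryable,
    bool $worthKeeping = true,
    int $maxRetries = 3,
    int $initialDelayMs = 1000
): array {
    if (!$worthKeeping) {
        return ['discard', null];                   // e.g. duplicate or obsolete message
    }
    if (!$retryable || $retryCount >= $maxRetries) {
        return ['dead_letter', "{$queue}.dlq"];     // give up, keep it for inspection
    }
    $delay = $initialDelayMs * (2 ** $retryCount);  // 1000, 2000, 4000, ...
    return ['delay', "retry.{$queue}.{$delay}"];    // comes back after the TTL expires
}
```

Keeping the decision free of broker calls makes the policy easy to unit-test separately from the publishing code.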

PHP Code Examples

Correct approach: a complete retry mechanism

php
<?php

namespace App\Messaging\Retry;

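use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use Psr\Log\LoggerInterface;

// NonRetryableException, ValidationException, TimeoutException and
// ServiceUnavailableException are application exceptions assumed to be
// defined in this namespace.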

class RetryManager
{
    private $channel;
    private LoggerInterface $logger;
    private array $config;
    
    public function __construct($channel, LoggerInterface $logger, array $config = [])
    {
        $this->channel = $channel;
        $this->logger = $logger;
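        $this->config = array_merge([
            'max_retries' => 3,
            'initial_delay' => 1000,   // ms
            'multiplier' => 2,
            // Every computed delay must match a declared delay queue
            // (1000 ... 32000 ms in RetryQueueSetup); otherwise the routing key
            // has no binding and the message is silently dropped.
            'max_delay' => 32000,      // ms
            // Jitter yields delays such as 1073 ms that match no fixed-TTL
            // queue, so keep it off unless per-message delays are supported.
            'jitter' => false,
            'delay_exchange' => 'retry.delayed',
            'dead_letter_exchange' => 'dlx',
        ], $config);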
    }
    
    public function handleFailure(
        AMQPMessage $message,
        \Throwable $error,
        string $originalQueue
    ): void {
        $retryCount = $this->getRetryCount($message);
        $errorType = $this->classifyError($error);
        
        $this->logFailure($message, $error, $retryCount);
        
        if (!$this->shouldRetry($error, $retryCount)) {
            $this->sendToDeadLetterQueue($message, $error, $originalQueue);
            return;
        }
        
        $delay = $this->calculateDelay($retryCount);
        
        $this->retryWithDelay($message, $originalQueue, $retryCount + 1, $delay, $error);
    }
    
    private function getRetryCount(AMQPMessage $message): int
    {
        // get() throws OutOfBoundsException when a property is absent, so check first.
        if (!$message->has('application_headers')) {
            return 0;
        }
        $data = $message->get('application_headers')->getNativeData();
        return (int) ($data['x-retry-count'] ?? 0);
    }
    
    private function classifyError(\Throwable $error): string
    {
        if ($error instanceof NonRetryableException) {
            return 'non_retryable';
        }
        
        if ($error instanceof ValidationException) {
            return 'validation_error';
        }
        
        if ($error instanceof TimeoutException) {
            return 'timeout';
        }
        
        if ($error instanceof ServiceUnavailableException) {
            return 'service_unavailable';
        }
        
        return 'unknown';
    }
    
    // Public so that other components (e.g. BatchRetryHandler) can reuse it.
    public function shouldRetry(\Throwable $error, int $retryCount): bool
    {
        if ($retryCount >= $this->config['max_retries']) {
            return false;
        }
        
        if ($error instanceof NonRetryableException) {
            return false;
        }
        
        if ($error instanceof ValidationException) {
            return false;
        }
        
        return true;
    }
    
    private function calculateDelay(int $retryCount): int
    {
        $baseDelay = $this->config['initial_delay'];
        $multiplier = $this->config['multiplier'];
        $maxDelay = $this->config['max_delay'];
        
        $delay = $baseDelay * pow($multiplier, $retryCount);
        $delay = min($delay, $maxDelay);
        
        if ($this->config['jitter']) {
            $jitter = $delay * 0.1 * (mt_rand(0, 100) / 100);
            $delay = (int) ($delay + $jitter);
        }
        
        return (int) $delay;
    }
    
    private function retryWithDelay(
        AMQPMessage $message,
        string $originalQueue,
        int $newRetryCount,
        int $delayMs,
        \Throwable $error
    ): void {
        $headers = $this->buildRetryHeaders($message, $newRetryCount, $error);
        
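        // Start from the original properties so message_id, correlation_id, etc.
        // survive the retry; default content_type to JSON if it was not set.
        $properties = array_merge(
            ['content_type' => 'application/json'],
            $message->get_properties(),
            [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'expiration' => (string) $delayMs,
                'application_headers' => new AMQPTable($headers),
            ]
        );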
        
        $retryMessage = new AMQPMessage($message->body, $properties);
        
        $routingKey = $this->getRetryRoutingKey($originalQueue, $delayMs);
        
        $this->channel->basic_publish(
            $retryMessage,
            $this->config['delay_exchange'],
            $routingKey
        );
        
        $message->ack();
        
        $this->logger->info('Message scheduled for retry', [
            'message_id' => $message->has('message_id') ? $message->get('message_id') : null,
            'retry_count' => $newRetryCount,
            'delay_ms' => $delayMs,
            'original_queue' => $originalQueue,
        ]);
    }
    
    private function buildRetryHeaders(
        AMQPMessage $message,
        int $retryCount,
        \Throwable $error
    ): array {
        $existingHeaders = [];
        if ($message->has('application_headers')) {
            $existingHeaders = $message->get('application_headers')->getNativeData();
        }
        
        $retryHistory = $existingHeaders['x-retry-history'] ?? [];
        $retryHistory[] = [
            'timestamp' => time(),
            'error' => get_class($error),
            'message' => $error->getMessage(),
        ];
        
        // Keep any existing headers (e.g. tracing IDs) and update the retry fields.
        return array_merge($existingHeaders, [
            'x-retry-count' => $retryCount,
            'x-first-failure-time' => $existingHeaders['x-first-failure-time'] ?? time(),
            'x-last-failure-time' => time(),
            'x-last-error' => get_class($error),
            'x-last-error-message' => $error->getMessage(),
            'x-retry-history' => $retryHistory,
        ]);
    }
    
    private function getRetryRoutingKey(string $originalQueue, int $delayMs): string
    {
        return sprintf('retry.%s.%d', $originalQueue, $delayMs);
    }
    
    private function sendToDeadLetterQueue(
        AMQPMessage $message,
        \Throwable $error,
        string $originalQueue
    ): void {
        $retryCount = 0;
        $retryHistory = [];

        if ($message->has('application_headers')) {
            $data = $message->get('application_headers')->getNativeData();
            $retryCount = $data['x-retry-count'] ?? 0;
            $retryHistory = $data['x-retry-history'] ?? [];
        }
        
        $dlqMessage = new AMQPMessage(
            json_encode([
                'original_message' => $message->body,
                'original_queue' => $originalQueue,
                'error' => [
                    'class' => get_class($error),
                    'message' => $error->getMessage(),
                    'trace' => $error->getTraceAsString(),
                ],
                'retry_count' => $retryCount,
                'retry_history' => $retryHistory,
                'failed_at' => date('c'),
            ], JSON_UNESCAPED_UNICODE),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        
        $this->channel->basic_publish(
            $dlqMessage,
            $this->config['dead_letter_exchange'],
            $originalQueue . '.dlq'
        );
        
        $message->ack();
        
        $this->logger->error('Message sent to dead letter queue', [
            'message_id' => $message->has('message_id') ? $message->get('message_id') : null,
            'retry_count' => $retryCount,
            'error' => $error->getMessage(),
        ]);
    }
    
    private function logFailure(
        AMQPMessage $message,
        \Throwable $error,
        int $retryCount
    ): void {
        $this->logger->warning('Message processing failed', [
            'message_id' => $message->has('message_id') ? $message->get('message_id') : null,
            'error_class' => get_class($error),
            'error_message' => $error->getMessage(),
            'retry_count' => $retryCount,
        ]);
    }
}
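One caveat when combining this class with the fixed-TTL delay queues configured below: the routing key embeds the exact delay, so with `jitter` enabled `calculateDelay()` can return a value such as 1073 ms that matches no binding, and a topic exchange drops unroutable messages. A sketch of one possible fix, snapping the computed delay onto a declared bucket before building the key; `snapToBucket`, `retryRoutingKey` and the bucket list are assumptions mirroring `RetryQueueSetup`:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: map a computed (possibly jittered) delay onto one of the
 * fixed-TTL delay queues so the routing key always matches a binding.
 * Picks the largest bucket not exceeding the delay, or the smallest bucket.
 */
function snapToBucket(int $delayMs, array $bucketsMs): int
{
    sort($bucketsMs);
    $chosen = $bucketsMs[0];
    foreach ($bucketsMs as $bucket) {
        if ($bucket <= $delayMs) {
            $chosen = $bucket; // keep the largest bucket that still fits
        }
    }
    return $chosen;
}

/** Same key format as RetryManager::getRetryRoutingKey(), but bucket-safe. */
function retryRoutingKey(string $queue, int $delayMs, array $bucketsMs): string
{
    return sprintf('retry.%s.%d', $queue, snapToBucket($delayMs, $bucketsMs));
}
```

Snapping discards the jitter, so with TTL buckets the randomness has to come from elsewhere, for example the per-message `x-delay` header of the rabbitmq_delayed_message_exchange plugin.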

Delay queue configuration

php
<?php

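namespace App\Messaging\Setup;

use PhpAmqpLib\Channel\AMQPChannel;

class RetryQueueSetup
{
    private AMQPChannel $channel;

    public function __construct(AMQPChannel $channel)
    {
        $this->channel = $channel;
    }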
    
    public function setupRetryInfrastructure(string $originalQueue): void
    {
        $this->declareDelayExchange();
        $this->declareDelayQueues($originalQueue);
        $this->declareDeadLetterExchange($originalQueue);
        $this->configureOriginalQueue($originalQueue);
    }
    
    private function declareDelayExchange(): void
    {
        $this->channel->exchange_declare(
            'retry.delayed',
            'topic',
            false,
            true,
            false
        );
    }
    
    private function declareDelayQueues(string $originalQueue): void
    {
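        // Must match the delays RetryManager can produce
        // (initial_delay * multiplier^n, capped at max_delay).
        $delays = [1000, 2000, 4000, 8000, 16000, 32000];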
        
        foreach ($delays as $delay) {
            $delayQueue = "retry.{$originalQueue}.delay.{$delay}";
            
            $this->channel->queue_declare(
                $delayQueue,
                false,
                true,
                false,
                false,
                false,
                new \PhpAmqpLib\Wire\AMQPTable([
                    'x-message-ttl' => $delay,
                    'x-dead-letter-exchange' => '',
                    'x-dead-letter-routing-key' => $originalQueue,
                ])
            );
            
            $this->channel->queue_bind(
                $delayQueue,
                'retry.delayed',
                "retry.{$originalQueue}.{$delay}"
            );
        }
    }
    
    private function declareDeadLetterExchange(string $originalQueue): void
    {
        $this->channel->exchange_declare(
            'dlx',
            'direct',
            false,
            true,
            false
        );
        
        $dlq = "{$originalQueue}.dlq";
        
        $this->channel->queue_declare(
            $dlq,
            false,
            true,
            false,
            false
        );
        
        $this->channel->queue_bind($dlq, 'dlx', "{$originalQueue}.dlq");
    }
    
    private function configureOriginalQueue(string $originalQueue): void
    {
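        // The original queue already exists. Its dead-letter settings cannot be
        // changed by redeclaring it with different arguments (RabbitMQ replies
        // with PRECONDITION_FAILED); apply them through a policy instead.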
    }
}
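Because the binding keys here must equal the routing keys produced by `RetryManager::getRetryRoutingKey()`, it can help to derive the declarations from one place and check them in a unit test without a broker. A sketch that only builds the queue names, binding keys and arguments as plain arrays; `delayQueueSpecs` and its defaults are hypothetical:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: describe the delay queues for one source queue.
 * Each entry holds the queue name, the binding key on the 'retry.delayed'
 * exchange, and the arguments to pass to queue_declare().
 */
function delayQueueSpecs(string $originalQueue, int $initialMs = 1000, int $multiplier = 2, int $count = 6): array
{
    $specs = [];
    for ($i = 0; $i < $count; $i++) {
        $delay = $initialMs * ($multiplier ** $i); // 1000, 2000, 4000, ...
        $specs[] = [
            'queue' => "retry.{$originalQueue}.delay.{$delay}",
            'binding_key' => "retry.{$originalQueue}.{$delay}",
            'arguments' => [
                'x-message-ttl' => $delay,                     // hold the message this long
                'x-dead-letter-exchange' => '',                // then dead-letter via the default exchange
                'x-dead-letter-routing-key' => $originalQueue, // straight back to the original queue
            ],
        ];
    }
    return $specs;
}
```

Each entry can then be passed to `queue_declare()` (wrapping `arguments` in an `AMQPTable`) and `queue_bind()` exactly as in `declareDelayQueues()`.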

Consumer integration

php
<?php

namespace App\Messaging\Consumer;

use App\Messaging\Retry\RetryManager;
use PhpAmqpLib\Message\AMQPMessage;

// ProcessingException is an application exception assumed to be defined in this namespace.

abstract class RetryableConsumer
{
    protected RetryManager $retryManager;
    protected string $queueName;
    
    public function process(AMQPMessage $message): void
    {
        try {
            $result = $this->handle($message);
            
            if ($result === true) {
                $message->ack();
                $this->logSuccess($message);
            } else {
                $this->retryManager->handleFailure(
                    $message,
                    new ProcessingException('Handler returned false'),
                    $this->queueName
                );
            }
        } catch (\Throwable $e) {
            $this->retryManager->handleFailure($message, $e, $this->queueName);
        }
    }
    
    abstract protected function handle(AMQPMessage $message): bool;
    
    protected function logSuccess(AMQPMessage $message): void
    {
        $retryCount = 0;

        if ($message->has('application_headers')) {
            $retryCount = $message->get('application_headers')->getNativeData()['x-retry-count'] ?? 0;
        }
        
        if ($retryCount > 0) {
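            // logger() is Laravel's global helper; outside Laravel, inject a PSR-3 LoggerInterface.
            logger()->info('Message processed after retry', [
                'message_id' => $message->has('message_id') ? $message->get('message_id') : null,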
                'retry_count' => $retryCount,
            ]);
        }
    }
}
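To see how `x-retry-count` evolves across redeliveries without a running broker, here is an in-memory simulation; it stands in for the consumer, `RetryManager` and the delay queues together, and every name in it is hypothetical:

```php
<?php

declare(strict_types=1);

/**
 * In-memory simulation (no RabbitMQ): a handler that fails $failuresBeforeSuccess
 * times is redelivered with an incremented retry count until it succeeds or the
 * retry budget is exhausted. Returns the outcome and the delays that were scheduled.
 */
function simulateDeliveries(int $failuresBeforeSuccess, int $maxRetries = 3, int $initialDelayMs = 1000): array
{
    $attempts = 0;
    $handler = function () use (&$attempts, $failuresBeforeSuccess): void {
        $attempts++;
        if ($attempts <= $failuresBeforeSuccess) {
            throw new RuntimeException("transient failure #{$attempts}");
        }
    };

    $headers = ['x-retry-count' => 0]; // the first delivery carries no retries
    $scheduledDelays = [];

    while (true) {
        try {
            $handler();
            return ['outcome' => 'acked', 'retry_count' => $headers['x-retry-count'], 'delays' => $scheduledDelays];
        } catch (RuntimeException $e) {
            $count = $headers['x-retry-count'];
            if ($count >= $maxRetries) {
                return ['outcome' => 'dead_lettered', 'retry_count' => $count, 'delays' => $scheduledDelays];
            }
            // "Publish to the delay queue": record the delay, redeliver with count + 1.
            $scheduledDelays[] = $initialDelayMs * (2 ** $count);
            $headers['x-retry-count'] = $count + 1;
        }
    }
}
```

With `max_retries = 3` the handler runs at most four times (one original delivery plus three retries) before the message is dead-lettered.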

Wrong approach: flawed retry implementations

php
<?php

class BadRetryImplementation
{
    public function process($message): void
    {
        // Mistake 1: unbounded retries
        while (true) {
            try {
                $this->handle($message);
                $message->ack();
                break;
            } catch (\Exception $e) {
                // Mistake 2: retrying immediately, with no delay
                continue;
            }
        }
    }
    
    public function processWithFixedRetry($message): void
    {
        // Mistake 3: fixed retry count and no delay
        for ($i = 0; $i < 3; $i++) {
            try {
                $this->handle($message);
                $message->ack();
                return;
            } catch (\Exception $e) {
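                // Mistake 4: every exception is retried, including permanent ones
                if ($i === 2) {
                    // Mistake 5: after the last failure the message is acked and silently lost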
                    $message->ack();
                }
            }
        }
    }
    
    public function processWithSleep($message): void
    {
        // Mistake 6: blocking synchronous wait that stalls the consumer
        for ($i = 0; $i < 3; $i++) {
            try {
                $this->handle($message);
                $message->ack();
                return;
            } catch (\Exception $e) {
                // Mistake 7: fixed delay, no exponential backoff
                sleep(1);
            }
        }
        
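        // Mistake 8: the final failure is not recorded, and nack() without
        // requeue discards the message unless a dead-letter exchange is set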
        $message->nack();
    }
}

Advanced Retry Patterns

Circuit breaker integration

php
<?php

namespace App\Messaging\Retry;

use PhpAmqpLib\Message\AMQPMessage;

class CircuitBreaker
{
    private array $circuits = [];
    private int $failureThreshold = 5;
    private int $resetTimeout = 60;
    
    public function isOpen(string $service): bool
    {
        $circuit = $this->circuits[$service] ?? null;
        
        if (!$circuit) {
            return false;
        }
        
        if ($circuit['status'] === 'open') {
            if (time() - $circuit['last_failure'] > $this->resetTimeout) {
                $this->circuits[$service]['status'] = 'half-open';
                return false;
            }
            return true;
        }
        
        return false;
    }
    
    public function recordSuccess(string $service): void
    {
        $this->circuits[$service] = [
            'status' => 'closed',
            'failures' => 0,
            'last_failure' => 0,
        ];
    }
    
    public function recordFailure(string $service): void
    {
        $circuit = $this->circuits[$service] ?? [
            'status' => 'closed',
            'failures' => 0,
            'last_failure' => 0,
        ];
        
        $circuit['failures']++;
        $circuit['last_failure'] = time();
        
        if ($circuit['failures'] >= $this->failureThreshold) {
            $circuit['status'] = 'open';
        }
        
        $this->circuits[$service] = $circuit;
    }
}

class RetryWithCircuitBreaker
{
    private RetryManager $retryManager;
    private CircuitBreaker $circuitBreaker;
    
    public function handleWithCircuitBreaker(
        AMQPMessage $message,
        string $service,
        callable $handler
    ): bool {
        if ($this->circuitBreaker->isOpen($service)) {
            throw new ServiceUnavailableException(
                "Circuit breaker is open for service: {$service}"
            );
        }
        
        try {
            $result = $handler($message);
            $this->circuitBreaker->recordSuccess($service);
            return $result;
        } catch (\Exception $e) {
            $this->circuitBreaker->recordFailure($service);
            throw $e;
        }
    }
}
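The `CircuitBreaker` above reads `time()` directly and keeps its state in process memory, so its transitions are hard to test and each consumer process trips independently. A sketch of the same closed/open/half-open state machine with an injectable clock; the `ClockedCircuitBreaker` name and constructor parameters are hypothetical:

```php
<?php

declare(strict_types=1);

/**
 * Sketch: closed -> open after N failures; open -> half-open once the reset
 * timeout has elapsed; a success in half-open closes the circuit again and a
 * failure re-opens it. The clock is injected so tests can control time.
 */
final class ClockedCircuitBreaker
{
    private string $state = 'closed';
    private int $failures = 0;
    private int $openedAt = 0;

    /** @param callable(): int $clock returns the current Unix time in seconds */
    public function __construct(
        private $clock,
        private int $failureThreshold = 5,
        private int $resetTimeout = 60
    ) {}

    public function allowsRequest(): bool
    {
        if ($this->state === 'open' && ($this->clock)() - $this->openedAt >= $this->resetTimeout) {
            $this->state = 'half-open'; // let a trial request through
        }
        return $this->state !== 'open';
    }

    public function recordSuccess(): void
    {
        $this->state = 'closed';
        $this->failures = 0;
    }

    public function recordFailure(): void
    {
        $this->failures++;
        if ($this->state === 'half-open' || $this->failures >= $this->failureThreshold) {
            $this->state = 'open';
            $this->openedAt = ($this->clock)(); // restart the reset timer
        }
    }

    public function state(): string
    {
        return $this->state;
    }
}
```

To share the state across consumer processes, the three fields would have to live in an external store such as Redis instead of object properties.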

Batch retries

php
<?php

namespace App\Messaging\Retry;

class BatchRetryHandler
{
    private RetryManager $retryManager;
    
    public function processBatch(array $messages, callable $handler): array
    {
        $results = [
            'success' => [],
            'failed' => [],
            'retry' => [],
        ];
        
        foreach ($messages as $message) {
            try {
                $handler($message);
                $results['success'][] = $message;
            } catch (\Throwable $e) {
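                // Only the error type is checked here (retry count 0);
                // this requires RetryManager::shouldRetry() to be public.
                if ($this->retryManager->shouldRetry($e, 0)) {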
                    $results['retry'][] = [
                        'message' => $message,
                        'error' => $e,
                    ];
                } else {
                    $results['failed'][] = [
                        'message' => $message,
                        'error' => $e,
                    ];
                }
            }
        }
        
        return $results;
    }
    
    public function retryBatch(array $retryItems, callable $handler): array
    {
        $results = [
            'success' => [],
            'failed' => [],
        ];
        
        foreach ($retryItems as $item) {
            try {
                $handler($item['message']);
                $results['success'][] = $item['message'];
            } catch (\Throwable $e) {
                $results['failed'][] = [
                    'message' => $item['message'],
                    'error' => $e,
                ];
            }
        }
        
        return $results;
    }
}

Real-World Scenarios

Scenario 1: Payment processing retries

php
<?php

use App\Messaging\Consumer\RetryableConsumer;
use PhpAmqpLib\Message\AMQPMessage;

class PaymentConsumer extends RetryableConsumer
{
    protected string $queueName = 'order.payment.process';
    
    protected function handle(AMQPMessage $message): bool
    {
        $data = json_decode($message->body, true);
        
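        // $this->paymentService is assumed to be injected. Because a retry repeats
        // this call, the payment service should be idempotent per order_id.
        $payment = $this->paymentService->process([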
            'order_id' => $data['order_id'],
            'amount' => $data['amount'],
            'method' => $data['payment_method'],
        ]);
        
        if ($payment->status === 'success') {
            $this->publishPaymentSuccess($data['order_id'], $payment);
            return true;
        }
        
        if ($payment->status === 'pending') {
            throw new TimeoutException('Payment is still pending');
        }
        
        if ($payment->errorCode === 'INSUFFICIENT_FUNDS') {
            throw new NonRetryableException('Insufficient funds');
        }
        
        throw new PaymentException($payment->errorMessage);
    }
}
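The branching in `handle()` maps gateway results onto the retry manager's error classes. Written as a pure lookup, the policy is easier to review and test; the status strings and `INSUFFICIENT_FUNDS` come from the example above, while `paymentDisposition` and its return labels are hypothetical:

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch: decide what should happen to a payment message.
 * 'ack'         payment succeeded
 * 'retry'       transient state, try again later via the delay queue
 * 'dead_letter' permanent business failure, retrying cannot help
 */
function paymentDisposition(string $status, ?string $errorCode = null): string
{
    if ($status === 'success') {
        return 'ack';
    }
    if ($status === 'pending') {
        return 'retry';
    }
    $permanent = ['INSUFFICIENT_FUNDS']; // extend with other non-retryable codes
    // Unknown failures are retried, matching RetryManager's default for 'unknown' errors.
    return in_array($errorCode, $permanent, true) ? 'dead_letter' : 'retry';
}
```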

Scenario 2: API call retries

php
<?php

class ApiRetryClient
{
    private RetryManager $retryManager;
    private int $maxRetries = 3;
    
    public function call(string $url, array $params): array
    {
        $attempt = 0;
        $lastError = null;
        
        while ($attempt < $this->maxRetries) {
            try {
                $response = $this->doCall($url, $params);
                
                if ($response['status'] === 200) {
                    return $response['data'];
                }
                
                if ($response['status'] === 429) {
                    throw new RateLimitException('Rate limited');
                }
                
                if ($response['status'] >= 500) {
                    throw new ServerErrorException('Server error');
                }
                
                throw new NonRetryableException("API error: {$response['status']}");
                
            } catch (RateLimitException $e) {
                $lastError = $e;
                $delay = $this->getRetryAfter($response);
                usleep($delay * 1000);
            } catch (ServerErrorException $e) {
                $lastError = $e;
                $delay = $this->calculateDelay($attempt);
                usleep($delay * 1000);
            } catch (NonRetryableException $e) {
                throw $e;
            }
            
            $attempt++;
        }
        
        throw $lastError;
    }
    
    private function calculateDelay(int $attempt): int
    {
        return 1000 * pow(2, $attempt);
    }
    
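    private function getRetryAfter(array $response): int
    {
        // Retry-After is given in seconds (or as an HTTP-date); convert to ms.
        // Only the numeric form is handled here; default to 1 second.
        $value = $response['headers']['retry-after'] ?? '1';
        return is_numeric($value) ? (int) $value * 1000 : 1000;
    }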
    
    private function doCall(string $url, array $params): array
    {
        // HTTP call implementation (stub)
        return [];
    }
}
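Note that the HTTP `Retry-After` header is not in milliseconds: it is either a number of seconds or an HTTP-date. A sketch of a parser that handles both forms before the value is used as a sleep time; `retryAfterMs`, its default and its cap are assumptions:

```php
<?php

declare(strict_types=1);

/**
 * Convert a Retry-After header value into a wait time in milliseconds.
 * Accepts delay-seconds ("120") or an HTTP-date ("Wed, 21 Oct 2015 07:28:00 GMT").
 * Falls back to $defaultMs when the header is missing or cannot be parsed.
 */
function retryAfterMs(?string $header, int $nowTs, int $defaultMs = 1000, int $capMs = 60000): int
{
    if ($header === null || trim($header) === '') {
        return $defaultMs;
    }
    $header = trim($header);
    if (ctype_digit($header)) {
        $ms = (int) $header * 1000;        // delay-seconds form
    } else {
        $ts = strtotime($header);          // HTTP-date form
        if ($ts === false) {
            return $defaultMs;
        }
        $ms = max(0, $ts - $nowTs) * 1000; // a date in the past means "retry now"
    }
    return min($ms, $capMs);               // never sleep longer than the cap
}
```

Capping the value protects the client from a server that asks it to wait for an unreasonably long time.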

Best-Practice Checklist

Retry strategy design

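  • [ ] Set a sensible maximum number of retries
  • [ ] Use exponential backoff
  • [ ] Add random jitter to avoid a thundering herd
  • [ ] Cap the maximum delay
  • [ ] Distinguish retryable from non-retryable errors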

Retry implementation

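  • [ ] Use delay queues for asynchronous retries
  • [ ] Record retry history and context
  • [ ] Protect downstream services with a circuit breaker
  • [ ] Configure a dead-letter queue as a fallback
  • [ ] Monitor retry metrics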

Error classification

  • [ ] Define explicit exception types
  • [ ] Mark non-retryable exceptions
  • [ ] Handle business validation errors
  • [ ] Handle timeouts and rate-limit errors

Production Considerations

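  1. Retry configuration

    • Tune the retry count to the characteristics of each business flow
    • Set a sensible delay range
    • Monitor the retry success rate
  2. Resource protection

    • Avoid retry storms
    • Protect downstream services with circuit breakers
    • Limit the number of concurrent retries
  3. Monitoring and alerting

    • Track the distribution of retry counts
    • Track the retry success rate
    • Configure alerts for anomalies
  4. Dead-letter handling

    • Inspect dead-letter queues regularly
    • Establish a manual handling process
    • Analyze the causes of failure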

Related Links