Appearance
重试机制设计
概述
重试机制是分布式系统中处理临时故障的重要手段。合理的重试策略能够在保证系统稳定性的前提下,最大程度地提高消息处理成功率。本文档介绍 RabbitMQ 重试机制的设计与实现。
核心概念
1. 重试的必要性
临时故障类型:
├── 网络抖动
├── 服务短暂不可用
├── 数据库连接超时
├── 外部 API 限流
└── 资源临时不足
2. 重试策略要素
| 要素 | 说明 | 推荐值 |
|---|---|---|
| 重试次数 | 最大重试次数 | 3-5 次 |
| 重试间隔 | 两次重试之间的等待时间 | 指数退避 |
| 重试条件 | 哪些错误需要重试 | 可恢复错误 |
| 重试上限 | 最大重试总时间 | 根据业务确定 |
3. 退避策略
固定间隔:每次重试等待相同时间
线性递增:每次重试等待时间线性增加
指数退避:每次重试等待时间指数增加(推荐)
随机抖动:在退避时间上增加随机性
重试架构设计
延迟队列重试模式
┌─────────────────────────────────────────────────────────────────┐
│ 重试流程架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 消息队列 │───▶│ 消费者 │───▶│ 处理成功 │ │
│ └─────────┘ └────┬────┘ └─────────┘ │
│ │ │
│ │ 处理失败 │
│ ▼ │
│ ┌───────────┐ │
│ │ 重试判断 │ │
│ └─────┬─────┘ │
│ │ │
│ ┌───────────┼───────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 延迟队列 │ │ 死信队列 │ │ 直接丢弃 │ │
│ │ (重试) │ │ (放弃) │ │ (忽略) │ │
│ └────┬─────┘ └──────────┘ └──────────┘ │
│ │ │
│ │ 延迟到期 │
│ ▼ │
│ ┌─────────┐ │
│ │ 原队列 │ │
│ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
PHP 代码示例
正确做法:完整的重试机制
php
<?php
namespace App\Messaging\Retry;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use Psr\Log\LoggerInterface;
class RetryManager
{
private $channel;
private LoggerInterface $logger;
private array $config;
/**
 * Stores the collaborators and resolves the effective retry settings.
 *
 * Entries supplied in $config override the built-in defaults key by key;
 * keys that are not supplied keep their default value.
 *
 * @param mixed           $channel Channel object used for publishing (no type is declared here).
 * @param LoggerInterface $logger  Receives the retry / dead-letter log records.
 * @param array           $config  Optional overrides for the default settings.
 */
public function __construct($channel, LoggerInterface $logger, array $config = [])
{
    $defaults = [
        'max_retries' => 3,
        'initial_delay' => 1000,
        'multiplier' => 2,
        'max_delay' => 60000,
        'jitter' => true,
        'delay_exchange' => 'retry.delayed',
        'dead_letter_exchange' => 'dlx',
    ];

    $this->logger = $logger;
    $this->channel = $channel;
    $this->config = array_merge($defaults, $config);
}
/**
 * Entry point for a failed delivery: logs the failure, then either schedules
 * a delayed retry or hands the message to the dead-letter path.
 *
 * The previous version also computed an error classification into a local
 * that was never read; that dead assignment has been removed.
 *
 * @param AMQPMessage $message       The delivery that could not be processed.
 * @param \Throwable  $error         The failure raised while processing it.
 * @param string      $originalQueue Name of the queue the message was consumed from.
 */
public function handleFailure(
    AMQPMessage $message,
    \Throwable $error,
    string $originalQueue
): void {
    $retryCount = $this->getRetryCount($message);

    $this->logFailure($message, $error, $retryCount);

    if ($this->shouldRetry($error, $retryCount)) {
        $delay = $this->calculateDelay($retryCount);
        $this->retryWithDelay($message, $originalQueue, $retryCount + 1, $delay, $error);

        return;
    }

    $this->sendToDeadLetterQueue($message, $error, $originalQueue);
}
/**
 * Reads the number of retries already performed from the `x-retry-count`
 * application header.
 *
 * AMQPMessage::get() throws \OutOfBoundsException for a property that was
 * never set, so the presence of `application_headers` is checked with has()
 * first. A message failing for the first time normally carries no retry
 * header at all and must be reported as 0 rather than crash the handler.
 *
 * @param AMQPMessage $message The failed delivery.
 *
 * @return int Retry count recorded on the message, or 0 when none is present.
 */
private function getRetryCount(AMQPMessage $message): int
{
    if (!$message->has('application_headers')) {
        return 0;
    }

    $headers = $message->get('application_headers');
    if (!($headers instanceof AMQPTable)) {
        return 0;
    }

    $data = $headers->getNativeData();

    // Explicit cast: the header value comes from the wire and is not guaranteed to be an int.
    return (int) ($data['x-retry-count'] ?? 0);
}
/**
 * Maps a failure to a short category label.
 *
 * The table is checked top to bottom and the first matching exception type
 * wins; anything that matches no entry is reported as 'unknown'.
 *
 * @param \Throwable $error The failure to classify.
 *
 * @return string One of 'non_retryable', 'validation_error', 'timeout',
 *                'service_unavailable' or 'unknown'.
 */
private function classifyError(\Throwable $error): string
{
    $labelsByType = [
        NonRetryableException::class => 'non_retryable',
        ValidationException::class => 'validation_error',
        TimeoutException::class => 'timeout',
        ServiceUnavailableException::class => 'service_unavailable',
    ];

    foreach ($labelsByType as $type => $label) {
        if ($error instanceof $type) {
            return $label;
        }
    }

    return 'unknown';
}
/**
 * Decides whether a failed message is eligible for another attempt.
 *
 * Visibility is public (widened from private) because
 * BatchRetryHandler::processBatch() calls this method on a RetryManager
 * instance; with private visibility that call is a fatal error.
 *
 * @param \Throwable $error      The failure raised while processing the message.
 * @param int        $retryCount Number of retries already performed.
 *
 * @return bool False once `max_retries` is reached or when the error is a
 *              NonRetryableException / ValidationException; true otherwise.
 */
public function shouldRetry(\Throwable $error, int $retryCount): bool
{
    if ($retryCount >= $this->config['max_retries']) {
        return false;
    }

    if ($error instanceof NonRetryableException) {
        return false;
    }

    if ($error instanceof ValidationException) {
        return false;
    }

    return true;
}
/**
 * Computes the wait before the next attempt using exponential backoff.
 *
 * The delay is `initial_delay * multiplier ^ retryCount`, limited to
 * `max_delay`. When `jitter` is enabled, a random extra of 0-10% of that
 * limited value is added on top.
 *
 * @param int $retryCount Number of retries already performed.
 *
 * @return int Delay, truncated to an integer.
 */
private function calculateDelay(int $retryCount): int
{
    $initial = $this->config['initial_delay'];
    $factor = $this->config['multiplier'];
    $ceiling = $this->config['max_delay'];

    $backoff = min($initial * ($factor ** $retryCount), $ceiling);

    if (!$this->config['jitter']) {
        return (int) $backoff;
    }

    $extra = $backoff * 0.1 * (mt_rand(0, 100) / 100);

    return (int) ($backoff + $extra);
}
/**
 * Republishes the message to the delay exchange and acknowledges the
 * original delivery.
 *
 * Fixes over the previous version:
 *  - optional properties are read through has() before get(), because
 *    AMQPMessage::get() throws \OutOfBoundsException for a property that
 *    was never set (the `??` operator does not catch that);
 *  - `message_id`, when present, is copied to the republished message so
 *    later attempts and log records can still be correlated;
 *  - `x-original-queue` is filled with $originalQueue when the header
 *    builder left it null.
 *
 * @param AMQPMessage $message       The failed delivery.
 * @param string      $originalQueue Queue the message was consumed from.
 * @param int         $newRetryCount Retry count to stamp on the republished message.
 * @param int         $delayMs       Delay before redelivery, used as the message expiration.
 * @param \Throwable  $error         The failure that triggered this retry.
 */
private function retryWithDelay(
    AMQPMessage $message,
    string $originalQueue,
    int $newRetryCount,
    int $delayMs,
    \Throwable $error
): void {
    $headers = $this->buildRetryHeaders($message, $newRetryCount, $error);
    if (($headers['x-original-queue'] ?? null) === null) {
        $headers['x-original-queue'] = $originalQueue;
    }

    $contentType = $message->has('content_type')
        ? $message->get('content_type')
        : 'application/json';
    $messageId = $message->has('message_id') ? $message->get('message_id') : null;

    $properties = [
        'content_type' => $contentType,
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'expiration' => (string) $delayMs,
        'application_headers' => new AMQPTable($headers),
    ];
    if ($messageId !== null) {
        $properties['message_id'] = $messageId;
    }

    $retryMessage = new AMQPMessage($message->body, $properties);
    $routingKey = $this->getRetryRoutingKey($originalQueue, $delayMs);

    $this->channel->basic_publish(
        $retryMessage,
        $this->config['delay_exchange'],
        $routingKey
    );

    // Ack only after the retry copy has been handed to the channel.
    $message->ack();

    $this->logger->info('Message scheduled for retry', [
        'message_id' => $messageId,
        'retry_count' => $newRetryCount,
        'delay_ms' => $delayMs,
        'original_queue' => $originalQueue,
    ]);
}
/**
 * Builds the application headers for the republished (retry) message.
 *
 * Retry bookkeeping already present on the message (first failure time,
 * history, original queue) is carried forward; the current failure is
 * appended to the history.
 *
 * `application_headers` is probed with has() before get(): get() throws
 * \OutOfBoundsException when the property was never set, which is the
 * normal case for a message failing for the first time.
 *
 * @param AMQPMessage $message    The failed delivery.
 * @param int         $retryCount Retry count to record on the new message.
 * @param \Throwable  $error      The failure being recorded.
 *
 * @return array Header map ready to be wrapped in an AMQPTable.
 */
private function buildRetryHeaders(
    AMQPMessage $message,
    int $retryCount,
    \Throwable $error
): array {
    $existingHeaders = [];
    if ($message->has('application_headers')) {
        $oldHeaders = $message->get('application_headers');
        if ($oldHeaders instanceof AMQPTable) {
            $existingHeaders = $oldHeaders->getNativeData();
        }
    }

    $retryHistory = $existingHeaders['x-retry-history'] ?? [];
    $retryHistory[] = [
        'timestamp' => time(),
        'error' => get_class($error),
        'message' => $error->getMessage(),
    ];

    return [
        'x-retry-count' => $retryCount,
        'x-first-failure-time' => $existingHeaders['x-first-failure-time'] ?? time(),
        'x-last-failure-time' => time(),
        'x-last-error' => get_class($error),
        'x-last-error-message' => $error->getMessage(),
        'x-retry-history' => $retryHistory,
        'x-original-queue' => $existingHeaders['x-original-queue'] ?? null,
    ];
}
/**
 * Builds the routing key that selects the delay queue for a retry.
 *
 * The delay queues are bound with keys of the form `retry.<queue>.<tier>`,
 * where <tier> is one of a fixed geometric series (1000, 2000, 4000, ...).
 * The delay passed in may carry random jitter or be capped at `max_delay`,
 * so using it verbatim (e.g. `retry.orders.1043`) matches no binding; the
 * exchange would silently drop the message, which is then acked and lost.
 *
 * The delay is therefore snapped down to the largest tier
 * `initial_delay * multiplier^k` that does not exceed it.
 *
 * @param string $originalQueue Queue the message was consumed from.
 * @param int    $delayMs       Computed delay, possibly jittered or capped.
 *
 * @return string Routing key of the form `retry.<queue>.<tier>`.
 */
private function getRetryRoutingKey(string $originalQueue, int $delayMs): string
{
    $base = (int) $this->config['initial_delay'];
    $multiplier = $this->config['multiplier'];

    if ($base <= 0) {
        // No usable tier series configured; fall back to the raw delay.
        return sprintf('retry.%s.%d', $originalQueue, $delayMs);
    }

    $tier = $base;
    if ($multiplier > 1) {
        while ($tier * $multiplier <= $delayMs) {
            $next = (int) floor($tier * $multiplier);
            if ($next <= $tier) {
                // Multiplier too close to 1 to make integer progress; stop instead of looping forever.
                break;
            }
            $tier = $next;
        }
    }

    return sprintf('retry.%s.%d', $originalQueue, $tier);
}
/**
 * Publishes a JSON envelope describing the failed message to the
 * dead-letter exchange and acknowledges the original delivery.
 *
 * Fixes over the previous version:
 *  - optional properties are probed with has() before get(), since
 *    AMQPMessage::get() throws \OutOfBoundsException for unset properties;
 *  - the envelope is encoded with JSON_INVALID_UTF8_SUBSTITUTE and
 *    JSON_PARTIAL_OUTPUT_ON_ERROR so a binary or badly encoded body does
 *    not make json_encode() return false;
 *  - if encoding still fails, an exception is raised before the ack so
 *    the original message is not acknowledged without a dead-letter copy.
 *
 * @param AMQPMessage $message       The failed delivery.
 * @param \Throwable  $error         The failure that caused the message to be given up.
 * @param string      $originalQueue Queue the message was consumed from.
 *
 * @throws \RuntimeException When the dead-letter envelope cannot be encoded.
 */
private function sendToDeadLetterQueue(
    AMQPMessage $message,
    \Throwable $error,
    string $originalQueue
): void {
    $retryCount = 0;
    $retryHistory = [];
    if ($message->has('application_headers')) {
        $headers = $message->get('application_headers');
        if ($headers instanceof AMQPTable) {
            $data = $headers->getNativeData();
            $retryCount = $data['x-retry-count'] ?? 0;
            $retryHistory = $data['x-retry-history'] ?? [];
        }
    }
    $messageId = $message->has('message_id') ? $message->get('message_id') : null;

    $payload = json_encode(
        [
            'original_message' => $message->body,
            'original_queue' => $originalQueue,
            'error' => [
                'class' => get_class($error),
                'message' => $error->getMessage(),
                'trace' => $error->getTraceAsString(),
            ],
            'retry_count' => $retryCount,
            'retry_history' => $retryHistory,
            'failed_at' => date('c'),
        ],
        JSON_UNESCAPED_UNICODE | JSON_INVALID_UTF8_SUBSTITUTE | JSON_PARTIAL_OUTPUT_ON_ERROR
    );
    if ($payload === false) {
        throw new \RuntimeException('Unable to encode dead-letter payload: ' . json_last_error_msg());
    }

    $dlqMessage = new AMQPMessage($payload, [
        'content_type' => 'application/json',
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    ]);

    $this->channel->basic_publish(
        $dlqMessage,
        $this->config['dead_letter_exchange'],
        $originalQueue . '.dlq'
    );

    $message->ack();

    $this->logger->error('Message sent to dead letter queue', [
        'message_id' => $messageId,
        'retry_count' => $retryCount,
        'error' => $error->getMessage(),
    ]);
}
/**
 * Writes a warning-level record for a failed processing attempt.
 *
 * `message_id` is optional on an AMQP message and AMQPMessage::get()
 * throws \OutOfBoundsException when it is absent, so it is read only
 * after has() confirms it exists; otherwise null is logged.
 *
 * @param AMQPMessage $message    The failed delivery.
 * @param \Throwable  $error      The failure raised while processing it.
 * @param int         $retryCount Number of retries already performed.
 */
private function logFailure(
    AMQPMessage $message,
    \Throwable $error,
    int $retryCount
): void {
    $messageId = $message->has('message_id') ? $message->get('message_id') : null;

    $this->logger->warning('Message processing failed', [
        'message_id' => $messageId,
        'error_class' => get_class($error),
        'error_message' => $error->getMessage(),
        'retry_count' => $retryCount,
    ]);
}
}
延迟队列配置
php
<?php
namespace App\Messaging\Setup;
class RetryQueueSetup
{
private $channel;
/**
 * Declares everything the retry flow needs for one queue, in order:
 * the delay exchange, the per-delay queues, the dead-letter exchange
 * and queue, and finally the (currently empty) original-queue hook.
 *
 * NOTE(review): no constructor or setter for $channel is visible in this
 * class — confirm how it is initialised before this method is called.
 *
 * @param string $originalQueue Name of the queue that retries are set up for.
 */
public function setupRetryInfrastructure(string $originalQueue): void
{
$this->declareDelayExchange();
$this->declareDelayQueues($originalQueue);
$this->declareDeadLetterExchange($originalQueue);
$this->configureOriginalQueue($originalQueue);
}
/**
 * Declares the `retry.delayed` topic exchange that retry messages are
 * published to.
 *
 * The three flags are passed positionally after the exchange type; per the
 * php-amqplib signature they are passive, durable and auto_delete.
 */
private function declareDelayExchange(): void
{
    $passive = false;
    $durable = true;
    $autoDelete = false;

    $this->channel->exchange_declare('retry.delayed', 'topic', $passive, $durable, $autoDelete);
}
/**
 * Declares one delay queue per fixed delay tier and binds each to the
 * `retry.delayed` exchange.
 *
 * Every queue is named `retry.<queue>.delay.<tier>`, is bound with the key
 * `retry.<queue>.<tier>`, and carries an `x-message-ttl` equal to the tier
 * plus dead-letter settings that point at the default exchange with the
 * original queue name as routing key.
 *
 * @param string $originalQueue Queue that expired retry messages are routed back to.
 */
private function declareDelayQueues(string $originalQueue): void
{
    foreach ([1000, 2000, 4000, 8000, 16000, 32000] as $tier) {
        $queueName = "retry.{$originalQueue}.delay.{$tier}";
        $bindingKey = "retry.{$originalQueue}.{$tier}";

        $arguments = new \PhpAmqpLib\Wire\AMQPTable([
            'x-message-ttl' => $tier,
            'x-dead-letter-exchange' => '',
            'x-dead-letter-routing-key' => $originalQueue,
        ]);

        // Positional flags per php-amqplib: passive, durable, exclusive, auto_delete, nowait.
        $this->channel->queue_declare($queueName, false, true, false, false, false, $arguments);
        $this->channel->queue_bind($queueName, 'retry.delayed', $bindingKey);
    }
}
/**
 * Declares the `dlx` direct exchange and the `<queue>.dlq` queue, and binds
 * the queue to the exchange using the queue's own name as routing key.
 *
 * @param string $originalQueue Queue whose dead-letter queue is being created.
 */
private function declareDeadLetterExchange(string $originalQueue): void
{
    $exchange = 'dlx';
    $deadLetterQueue = "{$originalQueue}.dlq";

    // Positional flags per php-amqplib: passive, durable, auto_delete.
    $this->channel->exchange_declare($exchange, 'direct', false, true, false);

    // Positional flags per php-amqplib: passive, durable, exclusive, auto_delete.
    $this->channel->queue_declare($deadLetterQueue, false, true, false, false);

    $this->channel->queue_bind($deadLetterQueue, $exchange, $deadLetterQueue);
}
/**
 * Placeholder hook for adjusting the original queue; intentionally empty.
 *
 * @param string $originalQueue Name of the queue being configured (unused for now).
 */
private function configureOriginalQueue(string $originalQueue): void
{
// The original queue already exists; its dead-letter configuration could be updated here.
}
}
消费者集成
php
<?php
namespace App\Messaging\Consumer;
use App\Messaging\Retry\RetryManager;
use PhpAmqpLib\Message\AMQPMessage;
abstract class RetryableConsumer
{
protected RetryManager $retryManager;
protected string $queueName;
/**
 * Runs the handler for one delivery and routes the outcome.
 *
 * Only the handler call is guarded by the try/catch. Previously the ack
 * and the success logging ran inside the same try block, so an exception
 * raised there (after the message had been processed and possibly acked)
 * was treated as a processing failure: the message was republished for
 * retry and acknowledged a second time. Exceptions from the ack or from
 * the failure handler itself now propagate to the caller.
 *
 * @param AMQPMessage $message The delivery to process.
 */
public function process(AMQPMessage $message): void
{
    try {
        $result = $this->handle($message);
    } catch (\Throwable $e) {
        $this->retryManager->handleFailure($message, $e, $this->queueName);

        return;
    }

    if ($result !== true) {
        $this->retryManager->handleFailure(
            $message,
            new ProcessingException('Handler returned false'),
            $this->queueName
        );

        return;
    }

    $message->ack();
    $this->logSuccess($message);
}
/**
 * Business logic for a single message, supplied by the concrete consumer.
 *
 * process() acknowledges the message only when this returns exactly true;
 * any other return value, or any thrown \Throwable, is passed to the
 * retry manager as a failure.
 *
 * @param AMQPMessage $message The delivery to handle.
 *
 * @return bool True when the message was processed successfully.
 */
abstract protected function handle(AMQPMessage $message): bool;
/**
 * Logs an info record when a message succeeds after at least one retry.
 *
 * Messages that succeed on the first attempt usually carry no application
 * headers; AMQPMessage::get() throws \OutOfBoundsException for an unset
 * property, so each optional property is probed with has() first. The
 * headers value is also type-checked before getNativeData() is called.
 *
 * @param AMQPMessage $message The successfully processed delivery.
 */
protected function logSuccess(AMQPMessage $message): void
{
    $retryCount = 0;
    if ($message->has('application_headers')) {
        $headers = $message->get('application_headers');
        if ($headers instanceof \PhpAmqpLib\Wire\AMQPTable) {
            $retryCount = $headers->getNativeData()['x-retry-count'] ?? 0;
        }
    }

    if ($retryCount > 0) {
        logger()->info('Message processed after retry', [
            'message_id' => $message->has('message_id') ? $message->get('message_id') : null,
            'retry_count' => $retryCount,
        ]);
    }
}
}
错误做法:不当的重试实现
php
<?php
class BadRetryImplementation
{
/**
 * Anti-pattern example (deliberately wrong — do not copy).
 *
 * Loops until the handler succeeds, retrying immediately on every
 * \Exception with no upper bound and no pause between attempts.
 */
public function process($message): void
{
// Mistake 1: unbounded retries
while (true) {
try {
$this->handle($message);
$message->ack();
break;
} catch (\Exception $e) {
// Mistake 2: retries immediately, with no delay
continue;
}
}
}
/**
 * Anti-pattern example (deliberately wrong — do not copy).
 *
 * Tries the handler up to three times back to back; after the third
 * failure the message is simply acknowledged, so it is discarded.
 */
public function processWithFixedRetry($message): void
{
// Mistake 3: fixed retry count, no delay
for ($i = 0; $i < 3; $i++) {
try {
$this->handle($message);
$message->ack();
return;
} catch (\Exception $e) {
// Mistake 4: every exception is retried
if ($i === 2) {
// Mistake 5: nothing is done with the message after the final retry fails
$message->ack();
}
}
}
}
/**
 * Anti-pattern example (deliberately wrong — do not copy).
 *
 * Tries the handler up to three times, sleeping one second in the
 * consumer between attempts, then nacks the message without logging
 * anything about the failure.
 */
public function processWithSleep($message): void
{
// Mistake 6: synchronous, blocking wait
for ($i = 0; $i < 3; $i++) {
try {
$this->handle($message);
$message->ack();
return;
} catch (\Exception $e) {
// Mistake 7: fixed delay, no exponential backoff
sleep(1);
}
}
// Mistake 8: the final failure is not recorded
$message->nack();
}
}
高级重试模式
熔断器集成
php
<?php
namespace App\Messaging\Retry;
/**
 * Minimal in-memory circuit breaker, tracked per service name.
 *
 * A circuit opens after a configurable number of recorded failures and
 * lets calls through again (as 'half-open') once the reset timeout has
 * elapsed since the last failure. Any recorded success closes the circuit
 * and clears its failure count.
 *
 * The threshold and timeout used to be fixed at 5 failures / 60 seconds;
 * they are now constructor arguments with those same defaults, so existing
 * `new CircuitBreaker()` call sites behave exactly as before.
 */
class CircuitBreaker
{
    /** @var array<string, array{status: string, failures: int, last_failure: int}> */
    private array $circuits = [];

    private int $failureThreshold;

    private int $resetTimeout;

    /**
     * @param int $failureThreshold Recorded failures needed to open a circuit (at least 1).
     * @param int $resetTimeout     Seconds after the last failure before calls are let through again.
     *
     * @throws \InvalidArgumentException When $failureThreshold is below 1.
     */
    public function __construct(int $failureThreshold = 5, int $resetTimeout = 60)
    {
        if ($failureThreshold < 1) {
            throw new \InvalidArgumentException('failureThreshold must be at least 1');
        }

        $this->failureThreshold = $failureThreshold;
        $this->resetTimeout = $resetTimeout;
    }

    /**
     * Reports whether calls to the service should currently be rejected.
     *
     * An open circuit whose last failure is older than the reset timeout is
     * switched to 'half-open' and reported as not open.
     *
     * @param string $service Service identifier.
     *
     * @return bool True only while the circuit is open and the timeout has not elapsed.
     */
    public function isOpen(string $service): bool
    {
        $circuit = $this->circuits[$service] ?? null;
        if ($circuit === null || $circuit['status'] !== 'open') {
            return false;
        }

        if (time() - $circuit['last_failure'] > $this->resetTimeout) {
            $this->circuits[$service]['status'] = 'half-open';

            return false;
        }

        return true;
    }

    /**
     * Resets the circuit for the service to closed with zero failures.
     *
     * @param string $service Service identifier.
     */
    public function recordSuccess(string $service): void
    {
        $this->circuits[$service] = [
            'status' => 'closed',
            'failures' => 0,
            'last_failure' => 0,
        ];
    }

    /**
     * Counts a failure and opens the circuit once the threshold is reached.
     *
     * The failure count is not reset when a circuit turns half-open, so a
     * single failure in that state re-opens it immediately.
     *
     * @param string $service Service identifier.
     */
    public function recordFailure(string $service): void
    {
        $circuit = $this->circuits[$service] ?? [
            'status' => 'closed',
            'failures' => 0,
            'last_failure' => 0,
        ];

        $circuit['failures']++;
        $circuit['last_failure'] = time();

        if ($circuit['failures'] >= $this->failureThreshold) {
            $circuit['status'] = 'open';
        }

        $this->circuits[$service] = $circuit;
    }
}
class RetryWithCircuitBreaker
{
private RetryManager $retryManager;
private CircuitBreaker $circuitBreaker;
/**
 * Runs a handler behind the circuit breaker for the given service.
 *
 * The call is rejected with ServiceUnavailableException while the circuit
 * is open. Otherwise the handler runs; success and failure are reported
 * to the breaker and any failure is rethrown unchanged.
 *
 * The catch is on \Throwable rather than \Exception so that \Error
 * failures (for example a TypeError inside the handler) also count
 * against the circuit, matching the \Throwable handling used by the
 * consumers and the retry manager.
 *
 * @param AMQPMessage $message The message passed to the handler.
 * @param string      $service Service identifier used as the circuit key.
 * @param callable    $handler Callback invoked with the message.
 *
 * @return bool The handler's result.
 *
 * @throws ServiceUnavailableException When the circuit for the service is open.
 * @throws \Throwable                  Whatever the handler throws.
 */
public function handleWithCircuitBreaker(
    AMQPMessage $message,
    string $service,
    callable $handler
): bool {
    if ($this->circuitBreaker->isOpen($service)) {
        throw new ServiceUnavailableException(
            "Circuit breaker is open for service: {$service}"
        );
    }

    try {
        $result = $handler($message);
    } catch (\Throwable $e) {
        $this->circuitBreaker->recordFailure($service);

        throw $e;
    }

    $this->circuitBreaker->recordSuccess($service);

    return $result;
}
}
批量重试
php
<?php
namespace App\Messaging\Retry;
class BatchRetryHandler
{
private RetryManager $retryManager;
/**
 * Runs the handler over every message and sorts the outcomes into buckets.
 *
 * A message whose handler call completes goes to 'success'. When the
 * handler throws, the retry manager is asked (with a retry count of 0)
 * whether the error is retryable, and the message plus its error goes to
 * 'retry' or 'failed' accordingly.
 *
 * @param array    $messages Messages to process.
 * @param callable $handler  Callback invoked once per message.
 *
 * @return array Map with the keys 'success', 'failed' and 'retry'.
 */
public function processBatch(array $messages, callable $handler): array
{
    $succeeded = [];
    $failed = [];
    $retryable = [];

    foreach ($messages as $message) {
        try {
            $handler($message);
        } catch (\Throwable $e) {
            $entry = [
                'message' => $message,
                'error' => $e,
            ];

            if ($this->retryManager->shouldRetry($e, 0)) {
                $retryable[] = $entry;
            } else {
                $failed[] = $entry;
            }

            continue;
        }

        $succeeded[] = $message;
    }

    return [
        'success' => $succeeded,
        'failed' => $failed,
        'retry' => $retryable,
    ];
}
/**
 * Runs the handler once more for each previously collected retry item.
 *
 * Each item is expected to carry the message under the 'message' key.
 * Messages whose handler call completes go to 'success'; those that throw
 * go to 'failed' together with the new error.
 *
 * @param array    $retryItems Items holding a 'message' entry each.
 * @param callable $handler    Callback invoked with each item's message.
 *
 * @return array Map with the keys 'success' and 'failed'.
 */
public function retryBatch(array $retryItems, callable $handler): array
{
    $succeeded = [];
    $failed = [];

    foreach ($retryItems as $item) {
        try {
            $handler($item['message']);
            $succeeded[] = $item['message'];
        } catch (\Throwable $e) {
            $failed[] = [
                'message' => $item['message'],
                'error' => $e,
            ];
        }
    }

    return [
        'success' => $succeeded,
        'failed' => $failed,
    ];
}
}
实际应用场景
场景一:支付处理重试
php
<?php
class PaymentConsumer extends RetryableConsumer
{
protected string $queueName = 'order.payment.process';
/**
 * Processes one payment message.
 *
 * The payload is validated before the payment service is called: a body
 * that is not a JSON object, or that lacks `order_id`, `amount` or
 * `payment_method`, can never succeed on a later attempt, so it is
 * rejected with NonRetryableException instead of being passed on as
 * null values and retried.
 *
 * @param AMQPMessage $message Delivery whose body is a JSON-encoded payment request.
 *
 * @return bool True when the payment succeeded and the success event was published.
 *
 * @throws NonRetryableException For an invalid payload or insufficient funds.
 * @throws TimeoutException      When the payment is still pending.
 * @throws PaymentException      For any other payment failure.
 */
protected function handle(AMQPMessage $message): bool
{
    $data = json_decode($message->body, true);
    if (!is_array($data) || !isset($data['order_id'], $data['amount'], $data['payment_method'])) {
        throw new NonRetryableException('Invalid payment message payload');
    }

    $payment = $this->paymentService->process([
        'order_id' => $data['order_id'],
        'amount' => $data['amount'],
        'method' => $data['payment_method'],
    ]);

    if ($payment->status === 'success') {
        $this->publishPaymentSuccess($data['order_id'], $payment);

        return true;
    }

    if ($payment->status === 'pending') {
        throw new TimeoutException('Payment is still pending');
    }

    if ($payment->errorCode === 'INSUFFICIENT_FUNDS') {
        throw new NonRetryableException('Insufficient funds');
    }

    throw new PaymentException($payment->errorMessage);
}
}
场景二:API 调用重试
php
<?php
class ApiRetryClient
{
private RetryManager $retryManager;
private int $maxRetries = 3;
/**
 * Calls the API, retrying rate-limit (429) and server (5xx) responses.
 *
 * Fixes over the previous version:
 *  - no sleep is performed after the final attempt, since nothing follows
 *    it except rethrowing the last error;
 *  - `$response` is reset for every attempt, so the rate-limit handler
 *    never reads an undefined or stale response when doCall() itself
 *    throws;
 *  - a non-positive `maxRetries` yields a clear \LogicException instead
 *    of an attempt to throw null.
 *
 * NonRetryableException is not caught here and therefore propagates
 * immediately, as before.
 *
 * @param string $url    Endpoint to call.
 * @param array  $params Request parameters.
 *
 * @return array The `data` entry of the first response with status 200.
 *
 * @throws NonRetryableException For any status other than 200, 429 or 5xx.
 * @throws RateLimitException    When the last attempt was rate limited.
 * @throws ServerErrorException  When the last attempt hit a server error.
 * @throws \LogicException       When no attempt was made at all.
 */
public function call(string $url, array $params): array
{
    $lastError = null;

    for ($attempt = 0; $attempt < $this->maxRetries; $attempt++) {
        $response = [];

        try {
            $response = $this->doCall($url, $params);
            $status = $response['status'] ?? null;

            if ($status === 200) {
                return $response['data'];
            }
            if ($status === 429) {
                throw new RateLimitException('Rate limited');
            }
            if ($status >= 500) {
                throw new ServerErrorException('Server error');
            }

            throw new NonRetryableException("API error: {$status}");
        } catch (RateLimitException $e) {
            $lastError = $e;
            $delay = $this->getRetryAfter($response);
        } catch (ServerErrorException $e) {
            $lastError = $e;
            $delay = $this->calculateDelay($attempt);
        }

        // Delays are in milliseconds; usleep() takes microseconds.
        if ($attempt + 1 < $this->maxRetries) {
            usleep($delay * 1000);
        }
    }

    if ($lastError === null) {
        throw new \LogicException('maxRetries must be at least 1');
    }

    throw $lastError;
}
/**
 * Exponential backoff for server errors: 1000 doubled once per attempt.
 *
 * The caller multiplies the result by 1000 before passing it to usleep(),
 * i.e. it treats the value as milliseconds.
 *
 * @param int $attempt Zero-based attempt index.
 *
 * @return int Backoff delay for that attempt.
 */
private function calculateDelay(int $attempt): int
{
    $baseDelay = 1000;
    $growth = 2 ** $attempt;

    return $baseDelay * $growth;
}
/**
 * Derives the wait, in milliseconds, from a response's `retry-after` header.
 *
 * The HTTP Retry-After header carries either a number of seconds or an
 * HTTP-date. The previous version returned the raw header value, which
 * the caller then used as milliseconds (so `retry-after: 30` waited 30 ms)
 * and which raised a TypeError for a date string. Both forms are now
 * converted to milliseconds; a missing or unparseable header falls back
 * to 1000 ms, the same default as before.
 *
 * NOTE(review): assumes doCall() exposes the header verbatim under a
 * lower-case `retry-after` key — confirm against the real HTTP client.
 *
 * @param array $response Response as returned by doCall().
 *
 * @return int Non-negative wait in milliseconds.
 */
private function getRetryAfter(array $response): int
{
    $defaultMs = 1000;
    $value = $response['headers']['retry-after'] ?? null;

    if ($value === null) {
        return $defaultMs;
    }

    if (is_numeric($value)) {
        return max(0, (int) round((float) $value * 1000));
    }

    if (is_string($value)) {
        $timestamp = strtotime($value);
        if ($timestamp !== false) {
            return max(0, ($timestamp - time()) * 1000);
        }
    }

    return $defaultMs;
}
/**
 * Placeholder for the actual HTTP request; currently returns an empty array.
 *
 * call() reads the `status`, `data` and `headers` entries of the returned
 * array, so a real implementation is expected to provide them.
 *
 * @param string $url    Endpoint to call.
 * @param array  $params Request parameters.
 *
 * @return array Response data (empty in this stub).
 */
private function doCall(string $url, array $params): array
{
// HTTP call implementation goes here
return [];
}
}
最佳实践建议清单
重试策略设计
- [ ] 设置合理的最大重试次数
- [ ] 使用指数退避算法
- [ ] 添加随机抖动避免惊群
- [ ] 设置最大延迟上限
- [ ] 区分可重试和不可重试错误
重试实现
- [ ] 使用延迟队列实现异步重试
- [ ] 记录重试历史和上下文
- [ ] 实现熔断器保护
- [ ] 配置死信队列兜底
- [ ] 监控重试指标
错误分类
- [ ] 定义明确的异常类型
- [ ] 标记不可重试异常
- [ ] 处理业务校验错误
- [ ] 处理超时和限流错误
生产环境注意事项
重试配置
- 根据业务特点调整重试次数
- 设置合理的延迟范围
- 监控重试成功率
资源保护
- 避免重试风暴
- 使用熔断器保护下游
- 限制并发重试数量
监控告警
- 监控重试次数分布
- 监控重试成功率
- 配置异常告警
死信处理
- 定期检查死信队列
- 建立人工处理流程
- 分析失败原因
