Appearance
错误处理模式
概述
错误处理是 RabbitMQ 系统稳定性的关键保障。良好的错误处理机制能够确保消息不丢失、异常可追踪、系统可恢复。本文档介绍 RabbitMQ 错误处理的最佳实践。
核心错误类型
1. 生产者错误
| 错误类型 | 原因 | 处理策略 |
|---|---|---|
| 连接失败 | 网络问题、服务不可用 | 重连机制 |
| 通道关闭 | 资源限制、协议错误 | 重建通道 |
| 发布超时 | 确认等待超时 | 重试或备份 |
| NACK | 服务器拒绝 | 记录并重试 |
2. 消费者错误
| 错误类型 | 原因 | 处理策略 |
|---|---|---|
| 业务异常 | 数据校验失败、业务逻辑错误 | 记录并ACK或转入死信 |
| 处理超时 | 处理时间过长 | 延长超时或优化逻辑 |
| 依赖服务不可用 | 下游服务故障 | 重试或降级 |
| 资源不足 | 内存、连接池耗尽 | 限流或扩容 |
3. 基础设施错误
| 错误类型 | 原因 | 处理策略 |
|---|---|---|
| 队列满 | 消息积压 | 扩容或清理 |
| 磁盘告警 | 磁盘空间不足 | 清理或扩容 |
| 内存告警 | 内存使用过高 | 优化或扩容 |
| 网络分区 | 集群脑裂 | 自动恢复或人工介入 |
错误处理架构
┌─────────────────────────────────────────────────────────────┐
│ 消息处理流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Producer │───▶│ Exchange│───▶│ Queue │───▶│Consumer │ │
│ └────┬────┘ └─────────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 发布确认 │ │ 死信队列 │ │ 错误处理 │ │
│ │ 重试机制 │ │ 消息TTL │ │ 重试机制 │ │
│ │ 备份队列 │ │ 溢出处理 │ │ 降级处理 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
PHP 代码示例
正确做法:完善的错误处理
php
<?php
namespace App\Messaging\Error;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Psr\Log\LoggerInterface;
class ErrorHandler
{
/** Logger that receives every failure and discard record. */
private LoggerInterface $logger;

/** Effective settings: caller-supplied values merged over the defaults set in the constructor. */
private array $config;

/**
 * Channel used by publishWithDelay() and sendToDeadLetterQueue() to republish messages.
 * Those methods call $this->channel->basic_publish(), so it must be provided before
 * any error is handled.
 */
private ?\PhpAmqpLib\Channel\AMQPChannel $channel = null;

/**
 * @param LoggerInterface                      $logger  Destination for error/warning records.
 * @param array                                $config  Overrides for max_retries, retry_delay,
 *                                                      dead_letter_exchange, error_queue_prefix.
 * @param \PhpAmqpLib\Channel\AMQPChannel|null $channel Channel used for retry/error publishing.
 *                                                      Optional only to keep the two-argument
 *                                                      call form working.
 */
public function __construct(
    LoggerInterface $logger,
    array $config = [],
    ?\PhpAmqpLib\Channel\AMQPChannel $channel = null
) {
    $this->logger = $logger;
    $this->channel = $channel;
    $this->config = array_merge([
        'max_retries' => 3,
        'retry_delay' => 1000,
        'dead_letter_exchange' => 'dlx',
        'error_queue_prefix' => 'error',
    ], $config);
}
/**
 * Entry point for a failed delivery: logs the failure, then discards, retries,
 * or dead-letters the message.
 *
 * @param AMQPMessage $message The delivery whose processing failed.
 * @param \Throwable  $error   The failure raised while processing it.
 */
public function handleConsumerError(
    AMQPMessage $message,
    \Throwable $error
): void {
    $retryCount = $this->getRetryCount($message);
    $this->logError($message, $error, $retryCount);

    // Discard must be decided before retry: a discardable error is not excluded by
    // shouldRetry(), so checking retry first would republish the message up to
    // max_retries times before it is finally dropped.
    if ($this->shouldDiscard($error)) {
        $this->discardMessage($message);
        return;
    }

    if ($this->shouldRetry($error, $retryCount)) {
        $this->retryMessage($message, $retryCount);
        return;
    }

    $this->sendToDeadLetterQueue($message, $error);
}
/**
 * Reads the retry counter stored in the `x-retry-count` application header.
 *
 * @param AMQPMessage $message Delivery to inspect.
 * @return int The stored counter, or 0 when the message carries no such header.
 */
private function getRetryCount(AMQPMessage $message): int
{
    // AMQPMessage::get() throws OutOfBoundsException for a property that was never
    // set, which is the normal case for a first delivery, so probe with has() first.
    if (!$message->has('application_headers')) {
        return 0;
    }

    $headers = $message->get('application_headers');
    if (!$headers instanceof \PhpAmqpLib\Wire\AMQPTable) {
        return 0;
    }

    $data = $headers->getNativeData();

    // Cast so a counter that arrives as a numeric string still satisfies the int return type.
    return (int) ($data['x-retry-count'] ?? 0);
}
/**
 * Decides whether a failed message gets another attempt.
 *
 * @param \Throwable $error      The failure being handled.
 * @param int        $retryCount Attempts already recorded on the message.
 * @return bool False once max_retries is reached or the error is a NonRetryableException.
 */
private function shouldRetry(\Throwable $error, int $retryCount): bool
{
    if ($retryCount >= $this->config['max_retries']) {
        return false;
    }

    // Fully qualified on purpose: this class lives in App\Messaging\Error while the
    // exception is declared in App\Messaging\Exception. The bare name resolved to a
    // class that does not exist, and instanceof silently returned false.
    if ($error instanceof \App\Messaging\Exception\NonRetryableException) {
        return false;
    }

    return true;
}
/**
 * Tells whether the failure marks the message as safe to drop.
 *
 * @param \Throwable $error The failure being handled.
 * @return bool True only for DiscardableException instances.
 */
private function shouldDiscard(\Throwable $error): bool
{
    // Fully qualified: the exception is declared in App\Messaging\Exception, not in
    // this class's namespace, so the bare name never matched.
    return $error instanceof \App\Messaging\Exception\DiscardableException;
}
/**
 * Republishes the message with a delay and an incremented retry counter, then
 * acknowledges the original delivery.
 *
 * @param AMQPMessage $message           Delivery being retried.
 * @param int         $currentRetryCount Attempts recorded so far.
 */
private function retryMessage(AMQPMessage $message, int $currentRetryCount): void
{
    $this->publishWithDelay(
        $message,
        $this->calculateDelay($currentRetryCount),
        $currentRetryCount + 1
    );

    // Ack only after the copy has been handed to the channel.
    $message->ack();
}
/**
 * Exponential back-off: the configured retry_delay doubled once per previous attempt.
 *
 * @param int $retryCount Attempts recorded so far.
 * @return int Delay computed from config['retry_delay'].
 */
private function calculateDelay(int $retryCount): int
{
    $baseDelay = $this->config['retry_delay'];
    $multiplier = 2 ** $retryCount;

    return $baseDelay * $multiplier;
}
/**
 * Publishes a copy of the message to the delay exchange, stamped with retry
 * bookkeeping headers and a per-message expiration.
 *
 * @param AMQPMessage $message       Original delivery; its body and properties are copied.
 * @param int         $delayMs       Value written to the `expiration` property.
 * @param int         $newRetryCount Value written to the `x-retry-count` header.
 */
private function publishWithDelay(
    AMQPMessage $message,
    int $delayMs,
    int $newRetryCount
): void {
    $props = $message->get_properties();

    // Start from the headers already on the message, if there are any.
    $existingTable = $props['application_headers'] ?? null;
    $headers = $existingTable !== null ? $existingTable->getNativeData() : [];

    $headers['x-retry-count'] = $newRetryCount;
    // The first-failure stamp is written once and then carried through later retries.
    $headers['x-first-failure-time'] ??= time();
    $headers['x-last-failure-time'] = time();

    $props['application_headers'] = new \PhpAmqpLib\Wire\AMQPTable($headers);
    $props['expiration'] = (string) $delayMs;

    $targetExchange = $this->config['delay_exchange'] ?? 'delay';
    $retryCopy = new AMQPMessage($message->body, $props);

    $this->channel->basic_publish($retryCopy, $targetExchange, $message->getRoutingKey());
}
/**
 * Wraps the failed message and its error details in a JSON record, publishes the
 * record to the per-routing-key error queue via the default exchange, and
 * acknowledges the original delivery.
 *
 * @param AMQPMessage $message Delivery that could not be processed.
 * @param \Throwable  $error   Failure to record alongside it.
 */
private function sendToDeadLetterQueue(AMQPMessage $message, \Throwable $error): void
{
    // get() throws OutOfBoundsException when the producer set no message_id.
    $messageId = $message->has('message_id') ? $message->get('message_id') : null;

    $errorRecord = [
        'original_message' => $message->body,
        'error' => [
            'class' => get_class($error),
            'message' => $error->getMessage(),
            'code' => $error->getCode(),
            'file' => $error->getFile(),
            'line' => $error->getLine(),
            'trace' => $error->getTraceAsString(),
        ],
        'metadata' => [
            // NOTE(review): this key is named "queue" but holds the consumer tag.
            // Kept as-is so existing readers of the record keep working.
            'queue' => $message->getConsumerTag(),
            'routing_key' => $message->getRoutingKey(),
            'exchange' => $message->getExchange(),
            'message_id' => $messageId,
            'timestamp' => time(),
        ],
    ];

    // Without JSON_INVALID_UTF8_SUBSTITUTE a body that is not valid UTF-8 makes
    // json_encode() return false. The record would then be published empty and the
    // original acked, losing the message.
    $payload = json_encode($errorRecord, JSON_UNESCAPED_UNICODE | JSON_INVALID_UTF8_SUBSTITUTE);

    $errorMessage = new AMQPMessage(
        $payload,
        [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'message_id' => $this->generateErrorId(),
            'timestamp' => time(),
        ]
    );

    $this->channel->basic_publish(
        $errorMessage,
        '',
        $this->config['error_queue_prefix'] . '.' . $message->getRoutingKey()
    );

    // Reached only when the publish call did not throw.
    $message->ack();
}
/**
 * Logs a warning with the message id and body, then acknowledges the delivery
 * so it is not redelivered.
 *
 * @param AMQPMessage $message Delivery being dropped.
 */
private function discardMessage(AMQPMessage $message): void
{
    $this->logger->warning('Discarding message', [
        // get() throws OutOfBoundsException for an unset property, so probe first.
        'message_id' => $message->has('message_id') ? $message->get('message_id') : null,
        'body' => $message->body,
    ]);

    $message->ack();
}
/**
 * Writes one error-level log record describing the failed delivery.
 *
 * @param AMQPMessage $message    Delivery that failed.
 * @param \Throwable  $error      Failure raised while processing it.
 * @param int         $retryCount Attempts recorded so far.
 */
private function logError(
    AMQPMessage $message,
    \Throwable $error,
    int $retryCount
): void {
    $this->logger->error('Message processing failed', [
        // get() throws OutOfBoundsException when message_id was never set. Logging
        // must not turn a handled failure into a new exception.
        'message_id' => $message->has('message_id') ? $message->get('message_id') : null,
        'error_class' => get_class($error),
        'error_message' => $error->getMessage(),
        'retry_count' => $retryCount,
        'body' => $message->body,
    ]);
}
/**
 * Builds an identifier for an error record: the prefix `err_` followed by the
 * hex encoding of 16 random bytes.
 *
 * @return string
 */
private function generateErrorId(): string
{
    $randomHex = bin2hex(random_bytes(16));

    return "err_{$randomHex}";
}
}
生产者错误处理
php
<?php
namespace App\Messaging\Producer;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
class ReliableProducer
{
/** Connection passed to the constructor; used to open channels and to reconnect. */
private $connection;

/** Channel in publisher-confirm mode, created by setupChannel(). */
private $channel;

private $logger;

/**
 * Publishes still awaiting a broker confirm, keyed by spl_object_hash() of the
 * published AMQPMessage. The confirm handlers are invoked with that same message
 * object, so the hash is enough to find the record again.
 */
private $pendingConfirms = [];

private $maxRetries = 3;

/** Seconds to wait for a confirm, and the value passed to the channel wait call. */
private $connectionTimeout = 5.0;

/** Set by the nack/return handlers while publish() is waiting for its confirm. */
private $publishFailed = false;

/**
 * @param mixed           $connection Object offering channel(), isConnected() and reconnect().
 * @param LoggerInterface $logger     Destination for diagnostics.
 */
public function __construct($connection, LoggerInterface $logger)
{
    $this->connection = $connection;
    $this->logger = $logger;
    $this->setupChannel();
}

/**
 * Opens a channel, switches it to confirm mode and installs the ack, nack and
 * return callbacks.
 */
private function setupChannel(): void
{
    $this->channel = $this->connection->channel();
    $this->channel->confirm_select();

    // php-amqplib calls the ack/nack handlers with the AMQPMessage that was
    // published, one call per message, not with (deliveryTag, multiple).
    $this->channel->set_ack_handler(function (AMQPMessage $message): void {
        $this->handleAck($message);
    });
    $this->channel->set_nack_handler(function (AMQPMessage $message): void {
        $this->handleNack($message);
    });
    $this->channel->set_return_listener(function (
        $replyCode,
        $replyText,
        $exchange,
        $routingKey,
        AMQPMessage $message
    ): void {
        $this->handleReturn($replyCode, $replyText, $exchange, $routingKey, $message);
    });
}

/**
 * Publishes a message with the mandatory flag and waits for the broker's verdict.
 *
 * @param string      $exchange   Target exchange.
 * @param string      $routingKey Routing key.
 * @param AMQPMessage $message    Message to publish.
 * @param int         $retryCount Attempts already made; callers normally omit it.
 * @return bool True only when the broker acked the message and did not return it.
 *              On false the message was either handed to sendToBackupQueue() or
 *              timed out waiting for a confirm.
 */
public function publish(
    string $exchange,
    string $routingKey,
    AMQPMessage $message,
    int $retryCount = 0
): bool {
    $key = spl_object_hash($message);

    try {
        $this->ensureConnection();
        $this->publishFailed = false;

        // Registered before publishing so the handlers always find the record.
        $this->pendingConfirms[$key] = [
            'message' => $message,
            'exchange' => $exchange,
            'routing_key' => $routingKey,
            'retry_count' => $retryCount,
            'timestamp' => microtime(true),
        ];

        $this->channel->basic_publish(
            $message,
            $exchange,
            $routingKey,
            true,
            false
        );

        return $this->waitForConfirm($key);
    } catch (\Exception $e) {
        unset($this->pendingConfirms[$key]);

        return $this->handlePublishError(
            $e,
            $exchange,
            $routingKey,
            $message,
            $retryCount
        );
    }
}

/**
 * Reconnects when the connection reports it is down, then recreates the channel
 * if it is missing or closed.
 */
private function ensureConnection(): void
{
    if (!$this->connection->isConnected()) {
        $this->reconnect();
    }
    if (!$this->channel || !$this->channel->is_open()) {
        $this->setupChannel();
    }
}

/**
 * Asks the connection to reconnect, logging the attempt and its outcome.
 *
 * @throws \Exception Rethrows whatever the connection raised.
 */
private function reconnect(): void
{
    $this->logger->info('Attempting to reconnect to RabbitMQ');
    try {
        $this->connection->reconnect();
        $this->logger->info('Successfully reconnected to RabbitMQ');
    } catch (\Exception $e) {
        $this->logger->error('Failed to reconnect to RabbitMQ', [
            'error' => $e->getMessage(),
        ]);
        throw $e;
    }
}

/**
 * Blocks until the broker has answered every outstanding publish on the channel.
 *
 * @param string $key Key of the pending record created by publish().
 * @return bool False on timeout/error, or when a nack or return arrived meanwhile.
 */
private function waitForConfirm(string $key): bool
{
    try {
        // The *_returns variant also dispatches basic.return frames. Without it
        // the return listener installed for mandatory publishes is not invoked
        // while waiting.
        $this->channel->wait_for_pending_acks_returns($this->connectionTimeout);
    } catch (\Exception $e) {
        $pending = $this->pendingConfirms[$key] ?? [];
        unset($this->pendingConfirms[$key]);

        $this->logger->error('Timeout waiting for confirm', [
            'exchange' => $pending['exchange'] ?? null,
            'routing_key' => $pending['routing_key'] ?? null,
            'error' => $e->getMessage(),
        ]);

        return false;
    }

    return !$this->publishFailed;
}

/**
 * Logs a failed publish, then either retries after a short linear back-off or
 * hands the message to the backup queue.
 *
 * @return bool Result of the retry, or false once the message went to backup.
 */
private function handlePublishError(
    \Exception $e,
    string $exchange,
    string $routingKey,
    AMQPMessage $message,
    int $retryCount
): bool {
    $this->logger->error('Publish error', [
        'exchange' => $exchange,
        'routing_key' => $routingKey,
        'error' => $e->getMessage(),
        'retry_count' => $retryCount,
    ]);

    if ($retryCount < $this->maxRetries && $this->isRetryable($e)) {
        // 100 ms, 200 ms, 300 ms ... between attempts.
        usleep(100000 * ($retryCount + 1));
        try {
            $this->reconnect();
        } catch (\Exception $reconnectError) {
            $this->sendToBackupQueue($message, $exchange, $routingKey);
            return false;
        }

        return $this->publish($exchange, $routingKey, $message, $retryCount + 1);
    }

    $this->sendToBackupQueue($message, $exchange, $routingKey);

    return false;
}

/**
 * @return bool True for the php-amqplib I/O, runtime and connection-closed exceptions.
 */
private function isRetryable(\Exception $e): bool
{
    return $e instanceof \PhpAmqpLib\Exception\AMQPIOException
        || $e instanceof \PhpAmqpLib\Exception\AMQPRuntimeException
        || $e instanceof \PhpAmqpLib\Exception\AMQPConnectionClosedException;
}

/**
 * Ack callback: forgets the pending record of the confirmed message.
 */
private function handleAck(AMQPMessage $message): void
{
    unset($this->pendingConfirms[spl_object_hash($message)]);
    $this->logger->debug('Message acknowledged', [
        'message_id' => $this->messageId($message),
    ]);
}

/**
 * Nack callback: marks the current publish as failed and backs the message up.
 */
private function handleNack(AMQPMessage $message): void
{
    $key = spl_object_hash($message);
    $confirm = $this->pendingConfirms[$key] ?? null;
    unset($this->pendingConfirms[$key]);

    if ($confirm === null) {
        // Not one of publish()'s messages (for example a backup copy). Backing it
        // up again could loop, so only report it.
        $this->logger->critical('Untracked message nacked by broker', [
            'message_id' => $this->messageId($message),
        ]);
        return;
    }

    $this->publishFailed = true;
    $this->logger->error('Message nacked by broker', [
        'message_id' => $this->messageId($message),
        'exchange' => $confirm['exchange'],
        'routing_key' => $confirm['routing_key'],
    ]);
    $this->sendToBackupQueue(
        $confirm['message'],
        $confirm['exchange'],
        $confirm['routing_key']
    );
}

/**
 * Return callback for mandatory publishes the broker handed back: marks the
 * current publish as failed and backs the message up.
 */
private function handleReturn(
    int $replyCode,
    string $replyText,
    string $exchange,
    string $routingKey,
    AMQPMessage $message
): void {
    // The broker still acks a returned message, so the failure has to be recorded
    // here for publish() to report it.
    $this->publishFailed = true;

    $this->logger->warning('Message returned by broker', [
        'reply_code' => $replyCode,
        'reply_text' => $replyText,
        'exchange' => $exchange,
        'routing_key' => $routingKey,
        'message_id' => $this->messageId($message),
    ]);
    $this->sendToBackupQueue($message, $exchange, $routingKey);
}

/**
 * Publishes a JSON envelope describing the undeliverable message to
 * `backup.queue` through the default exchange. A failure here is logged at
 * critical level and not rethrown.
 */
private function sendToBackupQueue(
    AMQPMessage $message,
    string $exchange,
    string $routingKey
): void {
    $messageId = $this->messageId($message);

    $backupMessage = new AMQPMessage(
        // Substitute invalid UTF-8 instead of letting json_encode() return false,
        // which would store an empty backup record.
        json_encode([
            'original_message' => $message->body,
            'original_exchange' => $exchange,
            'original_routing_key' => $routingKey,
            'original_message_id' => $messageId,
            'backup_timestamp' => time(),
        ], JSON_INVALID_UTF8_SUBSTITUTE),
        [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ]
    );

    try {
        $this->channel->basic_publish($backupMessage, '', 'backup.queue');
    } catch (\Exception $e) {
        $this->logger->critical('Failed to send to backup queue', [
            'error' => $e->getMessage(),
            'original_message_id' => $messageId,
        ]);
    }
}

/**
 * Returns the message_id property, or null when the message has none.
 * AMQPMessage::get() throws OutOfBoundsException for an unset property.
 */
private function messageId(AMQPMessage $message)
{
    return $message->has('message_id') ? $message->get('message_id') : null;
}
}
消费者错误处理
php
<?php
namespace App\Messaging\Consumer;
use App\Messaging\Error\ErrorHandler;
use PhpAmqpLib\Message\AMQPMessage;
abstract class BaseConsumer
{
/** Receives every delivery whose processing failed. */
protected ErrorHandler $errorHandler;

/** Channel the consumer is registered on. */
protected $channel;

/**
 * Logger used by handleWaitError() and logProcessing(). It was referenced there
 * without being declared. Subclasses are expected to assign it.
 *
 * @var \Psr\Log\LoggerInterface
 */
protected $logger;

/** Passed to basic_qos() as the prefetch count. */
protected int $prefetchCount = 10;

/** Timeout handed to each channel wait() call in consume(). */
protected int $consumerTimeout = 30;
/**
 * Registers this consumer on the queue and then waits for deliveries for as long
 * as the channel reports it is consuming.
 *
 * @param string $queue Queue to consume from.
 */
public function consume(string $queue): void
{
    $this->setupConsumer($queue);

    while (true) {
        if (!$this->channel->is_consuming()) {
            return;
        }

        try {
            $this->channel->wait(null, false, $this->consumerTimeout);
        } catch (\Exception $e) {
            // Anything raised while waiting is delegated; the loop keeps running.
            $this->handleWaitError($e);
        }
    }
}
/**
 * Applies the prefetch setting and subscribes processMessage() to the queue
 * under this consumer's tag.
 *
 * @param string $queue Queue to subscribe to.
 */
private function setupConsumer(string $queue): void
{
    $channel = $this->channel;
    $channel->basic_qos(null, $this->prefetchCount, null);

    $consumerTag = $this->getConsumerTag();
    $onDelivery = [$this, 'processMessage'];

    // The four flags after the tag are all passed as false.
    $channel->basic_consume($queue, $consumerTag, false, false, false, false, $onDelivery);
}
/**
 * Delivery callback: validates the message, runs handle(), then acknowledges it
 * or passes it to the error handler.
 *
 * @param AMQPMessage $message Delivery received from the broker.
 */
public function processMessage(AMQPMessage $message): void
{
    $startTime = microtime(true);

    try {
        $this->validateMessage($message);
        $succeeded = $this->handle($message);
    } catch (\Throwable $e) {
        $this->errorHandler->handleConsumerError($message, $e);
        return;
    }

    // Settling the delivery and logging happen outside the try block. A failure
    // raised after ack() must not reach the error handler, which would ack or
    // republish a delivery that has already been acknowledged.
    if ($succeeded) {
        $message->ack();
    } else {
        $this->errorHandler->handleConsumerError(
            $message,
            // Fully qualified: the exception is declared in App\Messaging\Exception,
            // so the bare name did not resolve from this namespace.
            new \App\Messaging\Exception\ProcessingException('Handler returned false')
        );
    }

    $this->logProcessing($message, $startTime);
}
/**
 * Business logic for one delivery, supplied by the concrete consumer.
 *
 * processMessage() acknowledges the delivery when this returns true, passes it to
 * the error handler when it returns false, and also passes it there when this
 * method throws.
 *
 * @param AMQPMessage $message Delivery that already passed validateMessage().
 * @return bool True on success, false on failure.
 */
abstract protected function handle(AMQPMessage $message): bool;
/**
 * Checks that the body is a JSON object carrying non-empty `message_id` and
 * `type` fields.
 *
 * @param AMQPMessage $message Delivery to check.
 * @throws \App\Messaging\Exception\InvalidMessageException When a check fails.
 */
protected function validateMessage(AMQPMessage $message): void
{
    $body = json_decode($message->body, true);

    // The exception class is fully qualified because it is declared in
    // App\Messaging\Exception; the bare name did not resolve from this namespace.
    if (json_last_error() !== JSON_ERROR_NONE) {
        throw new \App\Messaging\Exception\InvalidMessageException(
            'Invalid JSON: ' . json_last_error_msg()
        );
    }

    // Valid JSON that is a scalar or null has no fields to inspect. Treat it as
    // having none so the checks below report the missing field.
    if (!is_array($body)) {
        $body = [];
    }

    if (empty($body['message_id'])) {
        throw new \App\Messaging\Exception\InvalidMessageException('Missing message_id');
    }

    if (empty($body['type'])) {
        throw new \App\Messaging\Exception\InvalidMessageException('Missing message type');
    }
}
/**
 * Builds the consumer tag from the host name and the current process id,
 * joined by a hyphen.
 *
 * @return string
 */
protected function getConsumerTag(): string
{
    $host = gethostname();
    $pid = getmypid();

    return "{$host}-{$pid}";
}
/**
 * Reacts to an exception raised while waiting for deliveries. Timeouts are
 * ignored; everything else is logged and may trigger a reconnect.
 *
 * @param \Exception $e Exception caught by the consume loop.
 */
protected function handleWaitError(\Exception $e): void
{
    $isWaitTimeout = $e instanceof \PhpAmqpLib\Exception\AMQPTimeoutException;

    if (!$isWaitTimeout) {
        $this->logger->error('Consumer wait error', [
            'error' => $e->getMessage(),
        ]);

        if ($this->shouldReconnect($e)) {
            $this->reconnect();
        }
    }
}
/**
 * Tells whether the exception means the connection or channel was closed.
 *
 * @param \Exception $e Exception caught by the consume loop.
 * @return bool True for connection-closed and channel-closed exceptions.
 */
protected function shouldReconnect(\Exception $e): bool
{
    if ($e instanceof \PhpAmqpLib\Exception\AMQPConnectionClosedException) {
        return true;
    }

    if ($e instanceof \PhpAmqpLib\Exception\AMQPChannelClosedException) {
        return true;
    }

    return false;
}
/**
 * Hook called by handleWaitError() when shouldReconnect() returns true.
 * The body is an empty placeholder in this example; a concrete consumer has to
 * supply the actual reconnect logic.
 */
protected function reconnect(): void
{
// Reconnect logic goes here
}
/**
 * Logs how long the delivery took to process.
 *
 * @param AMQPMessage $message   Delivery that was processed.
 * @param float       $startTime microtime(true) value taken when processing began.
 */
protected function logProcessing(AMQPMessage $message, float $startTime): void
{
    $duration = (microtime(true) - $startTime) * 1000;

    $this->logger->info('Message processed', [
        // get() throws OutOfBoundsException when the AMQP message_id property is
        // unset. validateMessage() only checks the message_id field inside the
        // JSON body, so the property can legitimately be missing.
        'message_id' => $message->has('message_id') ? $message->get('message_id') : null,
        'duration_ms' => round($duration, 2),
    ]);
}
}
错误做法:不完善的错误处理
php
<?php
/**
 * Deliberate anti-pattern: a consumer with incomplete error handling.
 * The numbered comments mark each mistake; the code is intentionally left unfixed.
 */
class BadConsumer
{
/**
 * Registers an inline callback on the queue and blocks in a wait loop.
 */
public function consume(string $queue): void
{
$callback = function ($message) {
// Mistake 1: no exception handling
$data = json_decode($message->body, true);
// Mistake 2: processes the data directly, without validation
$this->processOrder($data);
// Mistake 3: always ACKs, even when processing failed
$message->ack();
};
// NOTE(review): the fourth argument is true here but false in BaseConsumer's
// basic_consume call - presumably the no_ack flag; confirm against php-amqplib.
$this->channel->basic_consume($queue, '', false, true, false, false, $callback);
// Mistake 4: no timeout handling
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
/**
 * Empty placeholder for the order logic; lists what a real handler is missing.
 */
public function processOrder(array $data): void
{
// Mistake 5: no error logging
// Mistake 6: no retry mechanism
// Mistake 7: no dead-letter queue
}
}
class BadProducer
{
/**
 * Deliberate anti-pattern: JSON-encodes the data and publishes it once to a
 * fixed exchange and routing key, with none of the safeguards listed below.
 */
public function publish(array $data): void
{
$message = new AMQPMessage(json_encode($data));
// Mistake 8: no publisher confirms
// Mistake 9: no exception handling
// Mistake 10: no retry mechanism
$this->channel->basic_publish($message, 'exchange', 'routing.key');
}
}
自定义异常类
php
<?php
namespace App\Messaging\Exception;
/**
 * Base class for the messaging exceptions: a RuntimeException that additionally
 * carries an arbitrary context array.
 */
class MessageException extends \RuntimeException
{
    /** Extra data supplied by the code that raised the exception. */
    protected array $context = [];

    /**
     * @param string          $message  Human-readable description.
     * @param array           $context  Extra data to attach.
     * @param int             $code     Exception code.
     * @param \Throwable|null $previous Underlying cause, if any.
     */
    public function __construct(
        string $message,
        array $context = [],
        int $code = 0,
        ?\Throwable $previous = null
    ) {
        $this->context = $context;

        parent::__construct($message, $code, $previous);
    }

    /**
     * @return array The context passed to the constructor.
     */
    public function getContext(): array
    {
        return $this->context;
    }
}
/**
 * Raised by BaseConsumer::validateMessage() when the body is not valid JSON or
 * lacks the message_id or type field.
 */
class InvalidMessageException extends MessageException
{
}
/**
 * General processing failure. The examples raise it when a handler returns false
 * and when wrapping inventory or payment service errors.
 */
class ProcessingException extends MessageException
{
}
/**
 * Marks a failure that must not be retried: ErrorHandler::shouldRetry() checks
 * for this type and returns false.
 */
class NonRetryableException extends MessageException
{
}
/**
 * Marks a message that may be dropped: ErrorHandler::shouldDiscard() checks for
 * this type, and a match leads to discardMessage().
 */
class DiscardableException extends MessageException
{
}
/**
 * Exception type for failures of a dependency.
 * NOTE(review): not referenced by the other examples in this document - confirm
 * the intended use (presumably downstream-service outages).
 */
class DependencyException extends MessageException
{
}
class TimeoutException extends MessageException
{
}
实际应用场景
场景一:订单处理错误处理
php
<?php
class OrderConsumer extends BaseConsumer
{
/**
 * Processes one order message.
 *
 * @param AMQPMessage $message Delivery whose JSON body holds the order id under data.order_id.
 * @return bool True once the order has been processed.
 * @throws NonRetryableException When the payload has no order id or the order does not exist.
 * @throws DiscardableException  When the order is already cancelled.
 * @throws ProcessingException   When the inventory or payment service fails.
 */
protected function handle(AMQPMessage $message): bool
{
    $data = json_decode($message->body, true);

    // Reject a payload without an order id up front. Otherwise the lookup runs
    // with null and fails later with a misleading "Order not found: " message.
    $orderId = is_array($data) ? ($data['data']['order_id'] ?? null) : null;
    if ($orderId === null || $orderId === '') {
        throw new NonRetryableException('Missing data.order_id in message payload');
    }

    try {
        $order = $this->orderService->find($orderId);

        if (!$order) {
            throw new NonRetryableException("Order not found: {$orderId}");
        }

        if ($order->status === 'cancelled') {
            throw new DiscardableException("Order already cancelled: {$orderId}");
        }

        $this->orderService->process($order);

        return true;
    } catch (InventoryServiceException $e) {
        // Wrapped as ProcessingException with the original kept as the previous exception.
        throw new ProcessingException(
            'Inventory service unavailable',
            ['order_id' => $orderId],
            0,
            $e
        );
    } catch (PaymentServiceException $e) {
        throw new ProcessingException(
            'Payment service unavailable',
            ['order_id' => $orderId],
            0,
            $e
        );
    }
}
}
场景二:降级处理
php
<?php
class ResilientConsumer extends BaseConsumer
{
private CircuitBreaker $circuitBreaker;
private FallbackHandler $fallbackHandler;
/**
 * Deducts inventory for the message, switching to the fallback handler while the
 * `inventory-service` circuit is open.
 *
 * @param AMQPMessage $message Delivery whose JSON body is passed to the inventory service.
 * @return bool True after a successful deduction, otherwise the fallback handler's result.
 * @throws \Exception Rethrows the service failure while the circuit is still closed.
 */
protected function handle(AMQPMessage $message): bool
{
    $service = 'inventory-service';
    $data = json_decode($message->body, true);

    if ($this->circuitBreaker->isOpen($service)) {
        return $this->fallbackHandler->handle($data);
    }

    try {
        // The return value of deduct() is not needed; success is the absence of an exception.
        $this->inventoryService->deduct($data);
        $this->circuitBreaker->recordSuccess($service);

        return true;
    } catch (\Exception $e) {
        $this->circuitBreaker->recordFailure($service);

        // Fall back only if this failure opened the circuit; otherwise let the
        // exception propagate to the caller.
        if ($this->circuitBreaker->isOpen($service)) {
            return $this->fallbackHandler->handle($data);
        }

        throw $e;
    }
}
}
最佳实践建议清单
生产者错误处理
- [ ] 实现发布确认机制
- [ ] 处理 NACK 情况
- [ ] 处理消息返回
- [ ] 实现重连机制
- [ ] 设置备份队列
- [ ] 记录详细日志
消费者错误处理
- [ ] 捕获所有异常
- [ ] 区分可重试/不可重试错误
- [ ] 实现重试机制
- [ ] 配置死信队列
- [ ] 实现降级处理
- [ ] 记录错误上下文
基础设施错误处理
- [ ] 监控连接状态
- [ ] 实现自动重连
- [ ] 配置资源告警
- [ ] 准备应急预案
生产环境注意事项
错误日志
- 记录完整的错误堆栈
- 包含消息上下文
- 支持日志聚合分析
告警配置
- 配置错误率告警
- 配置死信队列告警
- 配置重试次数告警
监控指标
- 消息处理成功率
- 平均重试次数
- 死信队列深度
应急处理
- 准备消息重放脚本
- 准备死信队列处理流程
- 准备降级开关
