
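Error Handling Patterns

Overview

Error handling is central to the stability of a RabbitMQ-based system. A sound error-handling design ensures that messages are not lost, failures can be traced, and the system can recover. This document describes recommended practices for handling errors with RabbitMQ.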

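Core Error Types

1. Producer Errors

| Error type | Cause | Handling strategy |
|---|---|---|
| Connection failure | Network problems, broker unavailable | Reconnect |
| Channel closed | Resource limits, protocol errors | Recreate the channel |
| Publish timeout | Timed out waiting for a confirm | Retry or back up |
| NACK | Rejected by the broker | Log and retry |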
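
The "reconnect" and "retry" strategies in the table usually come down to retrying an operation with a growing delay. The following is a minimal sketch rather than a definitive implementation: `retryWithBackoff`, its parameters, and the default values are hypothetical names chosen for illustration, and the sleep function is injectable so that the helper can be exercised without waiting.

```php
<?php
declare(strict_types=1);

/**
 * Runs $operation, retrying on failure with exponential backoff.
 * Hypothetical helper: the name, parameters, and defaults are assumptions.
 *
 * @param callable      $operation receives the 1-based attempt number
 * @param callable|null $sleep     receives the delay in milliseconds
 * @return mixed whatever $operation returns
 */
function retryWithBackoff(
    callable $operation,
    int $maxAttempts = 5,
    int $baseDelayMs = 200,
    int $maxDelayMs = 5000,
    ?callable $sleep = null
) {
    $sleep ??= static function (int $ms): void {
        usleep($ms * 1000);
    };

    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation($attempt);
        } catch (\Throwable $e) {
            if ($attempt >= $maxAttempts) {
                // Out of attempts: let the caller decide (for example, back the message up).
                throw $e;
            }
            // The delay doubles after every failed attempt, capped at $maxDelayMs.
            $sleep(min($maxDelayMs, $baseDelayMs * (2 ** ($attempt - 1))));
        }
    }
}
```

A connection attempt could be wrapped as `retryWithBackoff(fn () => new AMQPStreamConnection($host, $port, $user, $password))`. Non-retryable failures such as bad credentials should be filtered out before retrying; this sketch retries everything.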

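2. Consumer Errors

| Error type | Cause | Handling strategy |
|---|---|---|
| Business exception | Data validation failure, business logic error | Log and ACK, or route to the dead-letter queue |
| Processing timeout | Processing takes too long | Extend the timeout or optimize the logic |
| Dependency unavailable | Downstream service failure | Retry or degrade |
| Resource exhaustion | Memory or connection pool exhausted | Throttle or scale out |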

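3. Infrastructure Errors

| Error type | Cause | Handling strategy |
|---|---|---|
| Queue full | Message backlog | Scale out or purge |
| Disk alarm | Insufficient disk space | Free space or add capacity |
| Memory alarm | Memory usage too high | Tune or add capacity |
| Network partition | Cluster split-brain | Automatic recovery or manual intervention |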

Error Handling Architecture

┌────────────────────────────────────────────────────────────────────────────────────┐
│                              Message processing flow                               │
├────────────────────────────────────────────────────────────────────────────────────┤
│                                                                                    │
│  ┌───────────────┐    ┌───────────────┐    ┌───────────────┐    ┌───────────────┐  │
│  │   Producer    │───▶│   Exchange    │───▶│     Queue     │───▶│   Consumer    │  │
│  └───────┬───────┘    └───────────────┘    └───────┬───────┘    └───────┬───────┘  │
│          │                                         │                    │          │
│          │                                         │                    │          │
│          ▼                                         ▼                    ▼          │
│  ┌───────────────┐                         ┌───────────────┐    ┌───────────────┐  │
│  │ Confirms      │                         │ Dead lettering│    │ Error handler │  │
│  │ Retries       │                         │ Message TTL   │    │ Retries       │  │
│  │ Backup queue  │                         │ Overflow      │    │ Fallback      │  │
│  └───────────────┘                         └───────────────┘    └───────────────┘  │
│                                                                                    │
└────────────────────────────────────────────────────────────────────────────────────┘
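
The queue-side protections in the diagram (dead lettering, message TTL, overflow handling) are configured through optional queue arguments. The sketch below only builds the argument array. The function name and its parameters are hypothetical, while the `x-*` keys are standard RabbitMQ queue arguments.

```php
<?php
declare(strict_types=1);

/**
 * Builds optional arguments for a work queue.
 * Hypothetical helper; only the x-* argument names come from RabbitMQ.
 *
 * - Rejected and expired messages are routed to $deadLetterExchange.
 * - Messages expire after $messageTtlMs milliseconds in the queue.
 * - Once $maxLength messages are queued, new publishes are rejected
 *   (publishers using confirms receive a nack).
 */
function buildWorkQueueArguments(
    string $deadLetterExchange,
    string $deadLetterRoutingKey,
    int $messageTtlMs,
    int $maxLength
): array {
    return [
        'x-dead-letter-exchange' => $deadLetterExchange,
        'x-dead-letter-routing-key' => $deadLetterRoutingKey,
        'x-message-ttl' => $messageTtlMs,
        'x-max-length' => $maxLength,
        'x-overflow' => 'reject-publish',
    ];
}

$arguments = buildWorkQueueArguments('dlx', 'orders.dead', 60000, 100000);
```

With php-amqplib, the array would typically be wrapped in an `AMQPTable` and passed as the `arguments` parameter of `queue_declare`. The exchange name, routing key, and limits above are placeholder values.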

PHP Code Examples

Recommended: Thorough Error Handling

php
<?php

namespace App\Messaging\Error;

use App\Messaging\Exception\DiscardableException;
use App\Messaging\Exception\NonRetryableException;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;

class ErrorHandler
{
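    private \PhpAmqpLib\Channel\AMQPChannel $channel;
    private LoggerInterface $logger;
    private array $config;
    
    // The channel is required to republish messages for retries and error records.
    public function __construct(
        \PhpAmqpLib\Channel\AMQPChannel $channel,
        LoggerInterface $logger,
        array $config = []
    ) {
        $this->channel = $channel;
        $this->logger = $logger;
        $this->config = array_merge([
            'max_retries' => 3,
            'retry_delay' => 1000, // base delay in milliseconds
            'delay_exchange' => 'delay',
            'dead_letter_exchange' => 'dlx',
            'error_queue_prefix' => 'error',
        ], $config);
    }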
    
    public function handleConsumerError(
        AMQPMessage $message,
        \Throwable $error
    ): void {
        $retryCount = $this->getRetryCount($message);
        
        $this->logError($message, $error, $retryCount);
        
        if ($this->shouldRetry($error, $retryCount)) {
            $this->retryMessage($message, $retryCount);
        } elseif ($this->shouldDiscard($error)) {
            $this->discardMessage($message);
        } else {
            $this->sendToDeadLetterQueue($message, $error);
        }
    }
    
    private function getRetryCount(AMQPMessage $message): int
    {
        // get() throws OutOfBoundsException when the property is absent, so check first.
        if (!$message->has('application_headers')) {
            return 0;
        }
        
        $data = $message->get('application_headers')->getNativeData();
        return (int) ($data['x-retry-count'] ?? 0);
    }
    
    private function shouldRetry(\Throwable $error, int $retryCount): bool
    {
        if ($retryCount >= $this->config['max_retries']) {
            return false;
        }
        
        if ($error instanceof NonRetryableException) {
            return false;
        }
        
        return true;
    }
    
    private function shouldDiscard(\Throwable $error): bool
    {
        return $error instanceof DiscardableException;
    }
    
    private function retryMessage(AMQPMessage $message, int $currentRetryCount): void
    {
        $delay = $this->calculateDelay($currentRetryCount);
        
        $this->publishWithDelay($message, $delay, $currentRetryCount + 1);
        
        $message->ack();
    }
    
    private function calculateDelay(int $retryCount): int
    {
        return $this->config['retry_delay'] * pow(2, $retryCount);
    }
    
    private function publishWithDelay(
        AMQPMessage $message,
        int $delayMs,
        int $newRetryCount
    ): void {
        $properties = $message->get_properties();
        
        $headers = [];
        if (isset($properties['application_headers'])) {
            $headers = $properties['application_headers']->getNativeData();
        }
        $headers['x-retry-count'] = $newRetryCount;
        $headers['x-first-failure-time'] = $headers['x-first-failure-time'] ?? time();
        $headers['x-last-failure-time'] = time();
        
        $properties['application_headers'] = new \PhpAmqpLib\Wire\AMQPTable($headers);
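        // Per-message TTL: the queue bound to the delay exchange is expected to
        // dead-letter expired messages back to the work exchange, which produces
        // the retry delay. Expiry only happens at the head of a queue, so mixing
        // different delays in one queue can hold short delays behind long ones.
        $properties['expiration'] = (string) $delayMs;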
        
        $newMessage = new AMQPMessage($message->body, $properties);
        
        $this->channel->basic_publish(
            $newMessage,
            $this->config['delay_exchange'] ?? 'delay',
            $message->getRoutingKey()
        );
    }
    
    private function sendToDeadLetterQueue(AMQPMessage $message, \Throwable $error): void
    {
        $errorRecord = [
            'original_message' => $message->body,
            'error' => [
                'class' => get_class($error),
                'message' => $error->getMessage(),
                'code' => $error->getCode(),
                'file' => $error->getFile(),
                'line' => $error->getLine(),
                'trace' => $error->getTraceAsString(),
            ],
            'metadata' => [
                'consumer_tag' => $message->getConsumerTag(),
                'routing_key' => $message->getRoutingKey(),
                'exchange' => $message->getExchange(),
                'message_id' => $message->has('message_id') ? $message->get('message_id') : null,
                'timestamp' => time(),
            ],
        ];
        
        $errorMessage = new AMQPMessage(
            json_encode($errorRecord, JSON_UNESCAPED_UNICODE),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $this->generateErrorId(),
                'timestamp' => time(),
            ]
        );
        
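        // Published through the default exchange, so a queue named
        // "<error_queue_prefix>.<routing key>" must already exist.
        $this->channel->basic_publish(
            $errorMessage,
            '',
            $this->config['error_queue_prefix'] . '.' . $message->getRoutingKey()
        );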
        
        $message->ack();
    }
    
    private function discardMessage(AMQPMessage $message): void
    {
        $this->logger->warning('Discarding message', [
            'message_id' => $message->has('message_id') ? $message->get('message_id') : null,
            'body' => $message->body,
        ]);
        
        $message->ack();
    }
    
    private function logError(
        AMQPMessage $message,
        \Throwable $error,
        int $retryCount
    ): void {
        $this->logger->error('Message processing failed', [
            'message_id' => $message->has('message_id') ? $message->get('message_id') : null,
            'error_class' => get_class($error),
            'error_message' => $error->getMessage(),
            'retry_count' => $retryCount,
            'body' => $message->body,
        ]);
    }
    
    private function generateErrorId(): string
    {
        return 'err_' . bin2hex(random_bytes(16));
    }
}
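
To make the retry timing concrete, the following worked example reproduces the formula used by `calculateDelay()` with the default configuration (`retry_delay` of 1000 ms, `max_retries` of 3). `retryDelayMs` and `retrySchedule` are hypothetical names introduced only for this illustration.

```php
<?php
declare(strict_types=1);

/** Delay before retry number $retryCount (0-based): base delay times 2^retryCount. */
function retryDelayMs(int $baseDelayMs, int $retryCount): int
{
    return $baseDelayMs * (2 ** $retryCount);
}

/** Delays for every retry permitted by $maxRetries, in order. */
function retrySchedule(int $baseDelayMs, int $maxRetries): array
{
    $schedule = [];
    for ($retry = 0; $retry < $maxRetries; $retry++) {
        $schedule[] = retryDelayMs($baseDelayMs, $retry);
    }
    return $schedule;
}

$schedule = retrySchedule(1000, 3);
echo implode(', ', $schedule), PHP_EOL; // 1000, 2000, 4000
echo array_sum($schedule), PHP_EOL;     // 7000
```

With these defaults a failing message waits 1 s, 2 s, and 4 s between attempts, so roughly 7 s of delay (plus processing time) pass before it is dead-lettered. Capping the delay or adding jitter are common refinements that the handler above does not implement.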

Producer Error Handling

php
<?php

namespace App\Messaging\Producer;

use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;

class ReliableProducer
{
    private $connection;
    private $channel;
    private $logger;
    private $pendingConfirms = [];
    private $maxRetries = 3;
    private $connectionTimeout = 5.0;
    
    public function __construct($connection, LoggerInterface $logger)
    {
        $this->connection = $connection;
        $this->logger = $logger;
        $this->setupChannel();
    }
    
    private function setupChannel(): void
    {
        $this->channel = $this->connection->channel();
        $this->channel->confirm_select();
        
        // php-amqplib passes the published AMQPMessage (not a delivery tag) to the
        // ack and nack handlers, so pending confirms are keyed by the message's object id.
        $this->channel->set_ack_handler(function (AMQPMessage $message) {
            $this->handleAck((string) spl_object_id($message));
        });
        
        $this->channel->set_nack_handler(function (AMQPMessage $message) {
            $this->handleNack((string) spl_object_id($message), false);
        });
        
        // Invoked for unroutable messages that were published with the mandatory flag.
        $this->channel->set_return_listener(function (
            $replyCode,
            $replyText,
            $exchange,
            $routingKey,
            AMQPMessage $message
        ) {
            $this->handleReturn($replyCode, $replyText, $exchange, $routingKey, $message);
        });
    }
    
    public function publish(
        string $exchange,
        string $routingKey,
        AMQPMessage $message,
        int $retryCount = 0
    ): bool {
        try {
            $this->ensureConnection();
            
            $this->channel->basic_publish(
                $message,
                $exchange,
                $routingKey,
                true,  // mandatory: return the message if it cannot be routed
                false  // immediate: not supported by RabbitMQ
            );
            
            // Same key that the ack/nack handlers derive from the message object.
            $confirmKey = (string) spl_object_id($message);
            $this->pendingConfirms[$confirmKey] = [
                'message' => $message,
                'exchange' => $exchange,
                'routing_key' => $routingKey,
                'retry_count' => $retryCount,
                'timestamp' => microtime(true),
            ];
            
            // A nacked or returned message is copied to the backup queue by its handler.
            return $this->waitForConfirm($confirmKey);
            
        } catch (\Exception $e) {
            return $this->handlePublishError(
                $e,
                $exchange,
                $routingKey,
                $message,
                $retryCount
            );
        }
    }
    
    private function ensureConnection(): void
    {
        if (!$this->connection->isConnected()) {
            $this->reconnect();
        }
        
        if (!$this->channel || !$this->channel->is_open()) {
            $this->setupChannel();
        }
    }
    
    private function reconnect(): void
    {
        $this->logger->info('Attempting to reconnect to RabbitMQ');
        
        try {
            $this->connection->reconnect();
            $this->logger->info('Successfully reconnected to RabbitMQ');
        } catch (\Exception $e) {
            $this->logger->error('Failed to reconnect to RabbitMQ', [
                'error' => $e->getMessage(),
            ]);
            throw $e;
        }
    }
    
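    private function waitForConfirm(string $deliveryTag): bool
    {
        try {
            // publish() sets the mandatory flag, so wait for basic.return frames
            // as well as acks and nacks.
            $this->channel->wait_for_pending_acks_returns($this->connectionTimeout);
            return true;
        } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
            $this->logger->error('Timeout waiting for confirm', [
                'delivery_tag' => $deliveryTag,
                'error' => $e->getMessage(),
            ]);
            unset($this->pendingConfirms[$deliveryTag]);
            return false;
        }
    }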
    
    private function handlePublishError(
        \Exception $e,
        string $exchange,
        string $routingKey,
        AMQPMessage $message,
        int $retryCount
    ): bool {
        $this->logger->error('Publish error', [
            'exchange' => $exchange,
            'routing_key' => $routingKey,
            'error' => $e->getMessage(),
            'retry_count' => $retryCount,
        ]);
        
        if ($retryCount < $this->maxRetries && $this->isRetryable($e)) {
            usleep(100000 * ($retryCount + 1));
            
            try {
                $this->reconnect();
            } catch (\Exception $reconnectError) {
                $this->sendToBackupQueue($message, $exchange, $routingKey);
                return false;
            }
            
            return $this->publish($exchange, $routingKey, $message, $retryCount + 1);
        }
        
        $this->sendToBackupQueue($message, $exchange, $routingKey);
        return false;
    }
    
    private function isRetryable(\Exception $e): bool
    {
        return $e instanceof \PhpAmqpLib\Exception\AMQPIOException
            || $e instanceof \PhpAmqpLib\Exception\AMQPRuntimeException
            || $e instanceof \PhpAmqpLib\Exception\AMQPConnectionClosedException;
    }
    
    private function handleAck(string $deliveryTag): void
    {
        unset($this->pendingConfirms[$deliveryTag]);
        $this->logger->debug('Message acknowledged', ['delivery_tag' => $deliveryTag]);
    }
    
    private function handleNack(string $deliveryTag, bool $multiple): void
    {
        $confirm = $this->pendingConfirms[$deliveryTag] ?? null;
        
        if ($confirm) {
            $this->logger->error('Message nacked by broker', [
                'delivery_tag' => $deliveryTag,
                'exchange' => $confirm['exchange'],
                'routing_key' => $confirm['routing_key'],
            ]);
            
            $this->sendToBackupQueue(
                $confirm['message'],
                $confirm['exchange'],
                $confirm['routing_key']
            );
        }
        
        unset($this->pendingConfirms[$deliveryTag]);
    }
    
    private function handleReturn(
        int $replyCode,
        string $replyText,
        string $exchange,
        string $routingKey,
        AMQPMessage $message
    ): void {
        $this->logger->warning('Message returned by broker', [
            'reply_code' => $replyCode,
            'reply_text' => $replyText,
            'exchange' => $exchange,
            'routing_key' => $routingKey,
            'message_id' => $message->has('message_id') ? $message->get('message_id') : null,
        ]);
        
        $this->sendToBackupQueue($message, $exchange, $routingKey);
    }
    
    private function sendToBackupQueue(
        AMQPMessage $message,
        string $exchange,
        string $routingKey
    ): void {
        $backupMessage = new AMQPMessage(
            json_encode([
                'original_message' => $message->body,
                'original_exchange' => $exchange,
                'original_routing_key' => $routingKey,
                'original_message_id' => $message->has('message_id') ? $message->get('message_id') : null,
                'backup_timestamp' => time(),
            ]),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );
        
        try {
            $this->channel->basic_publish($backupMessage, '', 'backup.queue');
        } catch (\Exception $e) {
            $this->logger->critical('Failed to send to backup queue', [
                'error' => $e->getMessage(),
                'original_message_id' => $message->has('message_id') ? $message->get('message_id') : null,
            ]);
        }
    }
}

Consumer Error Handling

php
<?php

namespace App\Messaging\Consumer;

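use App\Messaging\Error\ErrorHandler;
use App\Messaging\Exception\InvalidMessageException;
use App\Messaging\Exception\ProcessingException;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;

abstract class BaseConsumer
{
    protected ErrorHandler $errorHandler;
    protected LoggerInterface $logger;
    protected $channel;
    protected int $prefetchCount = 10;
    protected int $consumerTimeout = 30;
    
    public function __construct($channel, ErrorHandler $errorHandler, LoggerInterface $logger)
    {
        $this->channel = $channel;
        $this->errorHandler = $errorHandler;
        $this->logger = $logger;
    }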
    
    public function consume(string $queue): void
    {
        $this->setupConsumer($queue);
        
        while ($this->channel->is_consuming()) {
            try {
                $this->channel->wait(null, false, $this->consumerTimeout);
            } catch (\Exception $e) {
                $this->handleWaitError($e);
            }
        }
    }
    
    private function setupConsumer(string $queue): void
    {
        // Prefetch size 0 means no byte limit; the count applies per consumer, not globally.
        $this->channel->basic_qos(0, $this->prefetchCount, false);
        
        $this->channel->basic_consume(
            $queue,
            $this->getConsumerTag(),
            false,
            false,
            false,
            false,
            [$this, 'processMessage']
        );
    }
    
    public function processMessage(AMQPMessage $message): void
    {
        $startTime = microtime(true);
        
        try {
            $this->validateMessage($message);
            
            if ($this->handle($message)) {
                $message->ack();
                // Log only successful processing; failures are logged by the error handler.
                $this->logProcessing($message, $startTime);
            } else {
                $this->errorHandler->handleConsumerError(
                    $message,
                    new ProcessingException('Handler returned false')
                );
            }
            
        } catch (\Throwable $e) {
            $this->errorHandler->handleConsumerError($message, $e);
        }
    }
    
    abstract protected function handle(AMQPMessage $message): bool;
    
    protected function validateMessage(AMQPMessage $message): void
    {
        $body = json_decode($message->body, true);
        
        if (json_last_error() !== JSON_ERROR_NONE) {
            throw new InvalidMessageException('Invalid JSON: ' . json_last_error_msg());
        }
        
        if (empty($body['message_id'])) {
            throw new InvalidMessageException('Missing message_id');
        }
        
        if (empty($body['type'])) {
            throw new InvalidMessageException('Missing message type');
        }
    }
    
    protected function getConsumerTag(): string
    {
        return gethostname() . '-' . getmypid();
    }
    
    protected function handleWaitError(\Exception $e): void
    {
        if ($e instanceof \PhpAmqpLib\Exception\AMQPTimeoutException) {
            return;
        }
        
        $this->logger->error('Consumer wait error', [
            'error' => $e->getMessage(),
        ]);
        
        if ($this->shouldReconnect($e)) {
            $this->reconnect();
        }
    }
    
    protected function shouldReconnect(\Exception $e): bool
    {
        return $e instanceof \PhpAmqpLib\Exception\AMQPConnectionClosedException
            || $e instanceof \PhpAmqpLib\Exception\AMQPChannelClosedException;
    }
    
    protected function reconnect(): void
    {
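        // Reconnection logic (application-specific): reopen the connection and
        // channel, then register the consumer again.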
    }
    
    protected function logProcessing(AMQPMessage $message, float $startTime): void
    {
        $duration = (microtime(true) - $startTime) * 1000;
        
        $this->logger->info('Message processed', [
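            // get() throws when the property is absent; an exception here, after the
            // ack, would send an already acknowledged message to the error handler.
            'message_id' => $message->has('message_id') ? $message->get('message_id') : null,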
            'duration_ms' => round($duration, 2),
        ]);
    }
}

Anti-Pattern: Incomplete Error Handling

php
<?php

class BadConsumer
{
    public function consume(string $queue): void
    {
        $callback = function ($message) {
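            // Mistake 1: no exception handling
            $data = json_decode($message->body, true);
            
            // Mistake 2: processes the data directly, without validation
            $this->processOrder($data);
            
            // Mistake 3: always ACKs, even when processing failed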
            $message->ack();
        };
        
        $this->channel->basic_consume($queue, '', false, false, false, false, $callback);
        
        // Mistake 4: no timeout handling
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    public function processOrder(array $data): void
    {
        // Mistake 5: no error logging
        // Mistake 6: no retry mechanism
        // Mistake 7: no dead-letter queue
    }
}

class BadProducer
{
    public function publish(array $data): void
    {
        $message = new AMQPMessage(json_encode($data));
        
        // Mistake 8: no publisher confirms
        // Mistake 9: no exception handling
        // Mistake 10: no retry mechanism
        $this->channel->basic_publish($message, 'exchange', 'routing.key');
    }
}

Custom Exception Classes

php
<?php

namespace App\Messaging\Exception;

class MessageException extends \RuntimeException
{
    protected array $context = [];
    
    public function __construct(string $message, array $context = [], int $code = 0, ?\Throwable $previous = null)
    {
        parent::__construct($message, $code, $previous);
        $this->context = $context;
    }
    
    public function getContext(): array
    {
        return $this->context;
    }
}

class InvalidMessageException extends MessageException
{
}

class ProcessingException extends MessageException
{
}

class NonRetryableException extends MessageException
{
}

class DiscardableException extends MessageException
{
}

class DependencyException extends MessageException
{
}

class TimeoutException extends MessageException
{
}
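
The `context` array and the `$previous` argument are what make these exceptions useful in logs. The sketch below shows one way to flatten an exception chain into a log-friendly structure. `describeException` is a hypothetical helper, and `MessageException` is repeated in trimmed form only so that the snippet runs on its own.

```php
<?php
declare(strict_types=1);

// Trimmed copy of MessageException so the snippet is self-contained.
class MessageException extends \RuntimeException
{
    private array $context;

    public function __construct(string $message, array $context = [], int $code = 0, ?\Throwable $previous = null)
    {
        parent::__construct($message, $code, $previous);
        $this->context = $context;
    }

    public function getContext(): array
    {
        return $this->context;
    }
}

/**
 * Walks the exception chain from the outermost error to the root cause and
 * returns one entry per exception (hypothetical helper).
 */
function describeException(\Throwable $error): array
{
    $chain = [];
    for ($e = $error; $e !== null; $e = $e->getPrevious()) {
        $chain[] = [
            'class' => get_class($e),
            'message' => $e->getMessage(),
            'context' => $e instanceof MessageException ? $e->getContext() : [],
        ];
    }
    return $chain;
}

$cause = new \RuntimeException('Connection refused');
$error = new MessageException('Inventory service unavailable', ['order_id' => 42], 0, $cause);
$described = describeException($error);
```

Passing the resulting array as logger context keeps both the business identifiers (here `order_id`) and the root cause in a single log record.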

Real-World Scenarios

Scenario 1: Error Handling in Order Processing

php
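<?php

use App\Messaging\Consumer\BaseConsumer;
use App\Messaging\Exception\DiscardableException;
use App\Messaging\Exception\NonRetryableException;
use App\Messaging\Exception\ProcessingException;
use PhpAmqpLib\Message\AMQPMessage;

// InventoryServiceException, PaymentServiceException, and $this->orderService
// are application-specific and assumed to be defined elsewhere.
class OrderConsumer extends BaseConsumer
{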
    protected function handle(AMQPMessage $message): bool
    {
        $data = json_decode($message->body, true);
        $orderData = $data['data'];
        
        try {
            $order = $this->orderService->find($orderData['order_id']);
            
            if (!$order) {
                throw new NonRetryableException(
                    "Order not found: {$orderData['order_id']}"
                );
            }
            
            if ($order->status === 'cancelled') {
                throw new DiscardableException(
                    "Order already cancelled: {$orderData['order_id']}"
                );
            }
            
            $this->orderService->process($order);
            
            return true;
            
        } catch (InventoryServiceException $e) {
            throw new ProcessingException(
                'Inventory service unavailable',
                ['order_id' => $orderData['order_id']],
                0,
                $e
            );
        } catch (PaymentServiceException $e) {
            throw new ProcessingException(
                'Payment service unavailable',
                ['order_id' => $orderData['order_id']],
                0,
                $e
            );
        }
    }
}

Scenario 2: Graceful Degradation

php
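<?php

use App\Messaging\Consumer\BaseConsumer;
use PhpAmqpLib\Message\AMQPMessage;

// CircuitBreaker, FallbackHandler, and $this->inventoryService are
// application-specific and assumed to be defined elsewhere.
class ResilientConsumer extends BaseConsumer
{
    private CircuitBreaker $circuitBreaker;
    private FallbackHandler $fallbackHandler;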
    
    protected function handle(AMQPMessage $message): bool
    {
        $data = json_decode($message->body, true);
        
        if ($this->circuitBreaker->isOpen('inventory-service')) {
            return $this->fallbackHandler->handle($data);
        }
        
        try {
            $this->inventoryService->deduct($data);
            $this->circuitBreaker->recordSuccess('inventory-service');
            return true;
        } catch (\Exception $e) {
            $this->circuitBreaker->recordFailure('inventory-service');
            
            if ($this->circuitBreaker->isOpen('inventory-service')) {
                return $this->fallbackHandler->handle($data);
            }
            
            throw $e;
        }
    }
}
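
The consumer above relies on a `CircuitBreaker` exposing `isOpen`, `recordSuccess`, and `recordFailure`, but the class itself is not shown. One possible in-memory shape is sketched below. The failure threshold, the reset timeout, and the injectable clock are assumptions made for illustration, and state is per process, so several consumer processes would each track failures independently.

```php
<?php
declare(strict_types=1);

final class CircuitBreaker
{
    /** @var array<string, int> consecutive failures per service */
    private array $failures = [];
    /** @var array<string, float> time at which each circuit opened */
    private array $openedAt = [];
    private int $failureThreshold;
    private float $resetTimeoutSeconds;
    /** @var callable returns the current time in seconds as a float */
    private $clock;

    public function __construct(
        int $failureThreshold = 5,
        float $resetTimeoutSeconds = 30.0,
        ?callable $clock = null
    ) {
        $this->failureThreshold = $failureThreshold;
        $this->resetTimeoutSeconds = $resetTimeoutSeconds;
        $this->clock = $clock ?? static function (): float {
            return microtime(true);
        };
    }

    public function isOpen(string $service): bool
    {
        if (!isset($this->openedAt[$service])) {
            return false;
        }
        if (($this->clock)() - $this->openedAt[$service] >= $this->resetTimeoutSeconds) {
            // Half-open: allow calls again, but a single new failure re-opens the circuit.
            unset($this->openedAt[$service]);
            $this->failures[$service] = $this->failureThreshold - 1;
            return false;
        }
        return true;
    }

    public function recordSuccess(string $service): void
    {
        unset($this->failures[$service], $this->openedAt[$service]);
    }

    public function recordFailure(string $service): void
    {
        $this->failures[$service] = ($this->failures[$service] ?? 0) + 1;
        if ($this->failures[$service] >= $this->failureThreshold) {
            $this->openedAt[$service] = ($this->clock)();
        }
    }
}
```

Keying the state by service name matches the `'inventory-service'` argument used in the consumer. A production version would more likely keep its counters in shared storage so that all consumers see the same circuit state.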

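Best-Practice Checklist

Producer Error Handling

  • [ ] Enable publisher confirms
  • [ ] Handle NACKs
  • [ ] Handle returned messages
  • [ ] Implement reconnection
  • [ ] Set up a backup queue
  • [ ] Log in detail

Consumer Error Handling

  • [ ] Catch all exceptions
  • [ ] Distinguish retryable from non-retryable errors
  • [ ] Implement retries
  • [ ] Configure a dead-letter queue
  • [ ] Implement fallback handling
  • [ ] Record error context

Infrastructure Error Handling

  • [ ] Monitor connection state
  • [ ] Implement automatic reconnection
  • [ ] Configure resource alarms
  • [ ] Prepare an incident response plan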

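Production Considerations

  1. Error logging

    • Record the full stack trace
    • Include the message context
    • Support log aggregation and analysis

  2. Alerting

    • Alert on error rate
    • Alert on dead-letter queue growth
    • Alert on retry counts

  3. Metrics

    • Message processing success rate
    • Average retry count
    • Dead-letter queue depth

  4. Incident response

    • Prepare a message replay script
    • Prepare a dead-letter queue handling procedure
    • Prepare a degradation switch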
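
A replay script mostly needs to turn a stored record back into publish parameters. The sketch below decodes the JSON envelope written by `sendToBackupQueue()` in the producer example. `decodeBackupRecord` is a hypothetical helper, and actually republishing the message is left to the caller.

```php
<?php
declare(strict_types=1);

/**
 * Decodes one backup record into the pieces needed to publish the original
 * message again (hypothetical helper).
 *
 * @return array{exchange: string, routing_key: string, body: string, message_id: ?string}
 * @throws JsonException            when the record is not valid JSON
 * @throws InvalidArgumentException when a required field is missing
 */
function decodeBackupRecord(string $json): array
{
    $record = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($record)) {
        throw new InvalidArgumentException('Backup record must be a JSON object');
    }

    foreach (['original_message', 'original_exchange', 'original_routing_key'] as $field) {
        if (!isset($record[$field]) || !is_string($record[$field])) {
            throw new InvalidArgumentException("Backup record is missing field: {$field}");
        }
    }

    return [
        'exchange' => $record['original_exchange'],
        'routing_key' => $record['original_routing_key'],
        'body' => $record['original_message'],
        'message_id' => $record['original_message_id'] ?? null,
    ];
}
```

A replay loop would consume from `backup.queue`, call this function for each body, publish the result to the decoded exchange and routing key, and acknowledge the backup record only after the broker confirms the republished message.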
