Skip to content

异步任务处理系统

概述

异步任务处理是 RabbitMQ 最经典的应用场景之一。通过消息队列将耗时操作从主业务流程中剥离,实现请求的快速响应和任务的异步执行,大幅提升系统的吞吐量和用户体验。

业务背景与需求

场景描述

某电商平台需要处理以下耗时操作:

  • 订单确认邮件发送:用户下单后发送确认邮件
  • 短信通知:订单状态变更时发送短信通知
  • 报表生成:定时生成销售报表、用户行为分析报表
  • 图片处理:用户上传图片后进行压缩、水印处理
  • 数据同步:将订单数据同步到数据仓库

需求分析

需求项描述
响应时间主业务接口响应时间 < 200ms
可靠性任务不丢失,支持重试
可扩展支持动态增加消费者
可监控任务执行状态可追踪
优先级支持任务优先级处理

架构设计

整体架构图

mermaid
graph TB
    subgraph "客户端"
        A[Web应用]
        B[API服务]
    end
    
    subgraph "消息生产者"
        C[任务分发器]
    end
    
    subgraph "RabbitMQ"
        D[任务交换机<br/>task.exchange]
        E[高优先级队列<br/>task.high]
        F[普通队列<br/>task.normal]
        G[低优先级队列<br/>task.low]
        H[死信队列<br/>task.dlx]
    end
    
    subgraph "消费者集群"
        I[邮件服务]
        J[短信服务]
        K[报表服务]
        L[图片处理服务]
    end
    
    subgraph "监控"
        M[任务状态追踪]
        N[告警系统]
    end
    
    A --> C
    B --> C
    C --> D
    D --> E
    D --> F
    D --> G
    E --> I
    E --> J
    F --> K
    G --> L
    E -.-> H
    F -.-> H
    G -.-> H
    I --> M
    J --> M
    K --> M
    L --> M
    M --> N

任务流转流程

mermaid
sequenceDiagram
    participant App as 应用服务
    participant Producer as 任务生产者
    participant MQ as RabbitMQ
    participant Consumer as 任务消费者
    participant DB as 数据库
    
    App->>Producer: 提交异步任务
    Producer->>Producer: 序列化任务数据
    Producer->>MQ: 发布消息(带优先级)
    MQ-->>Producer: 确认接收
    Producer-->>App: 返回任务ID
    
    App-->>App: 立即响应用户
    
    MQ->>Consumer: 投递消息
    Consumer->>Consumer: 解析任务数据
    Consumer->>DB: 执行任务逻辑
    DB-->>Consumer: 执行结果
    
    alt 执行成功
        Consumer->>MQ: 发送ACK确认
        Consumer->>DB: 更新任务状态
    else 执行失败
        Consumer->>MQ: 发送NACK拒绝
        MQ->>MQ: 消息重新入队或进入死信队列
    end

PHP 代码实现

任务消息结构定义

php
<?php

namespace App\Messaging\AsyncTask;

class TaskMessage
{
    public string $taskId;
    public string $taskType;
    public array $payload;
    public int $priority;
    public int $maxRetries;
    public int $currentRetry;
    public int $createdAt;
    public int $expireAt;
    public array $metadata;

    public function __construct(
        string $taskType,
        array $payload,
        int $priority = 5,
        int $maxRetries = 3,
        int $ttl = 86400
    ) {
        $this->taskId = $this->generateTaskId();
        $this->taskType = $taskType;
        $this->payload = $payload;
        $this->priority = min(10, max(1, $priority));
        $this->maxRetries = $maxRetries;
        $this->currentRetry = 0;
        $this->createdAt = time();
        $this->expireAt = time() + $ttl;
        $this->metadata = [];
    }

    private function generateTaskId(): string
    {
        return sprintf(
            'task_%s_%s',
            date('YmdHis'),
            bin2hex(random_bytes(8))
        );
    }

    public function incrementRetry(): self
    {
        $this->currentRetry++;
        return $this;
    }

    public function canRetry(): bool
    {
        return $this->currentRetry < $this->maxRetries;
    }

    public function isExpired(): bool
    {
        return time() > $this->expireAt;
    }

    public function toArray(): array
    {
        return [
            'task_id' => $this->taskId,
            'task_type' => $this->taskType,
            'payload' => $this->payload,
            'priority' => $this->priority,
            'max_retries' => $this->maxRetries,
            'current_retry' => $this->currentRetry,
            'created_at' => $this->createdAt,
            'expire_at' => $this->expireAt,
            'metadata' => $this->metadata,
        ];
    }

    public static function fromArray(array $data): self
    {
        $task = new self(
            $data['task_type'],
            $data['payload'],
            $data['priority'],
            $data['max_retries']
        );
        $task->taskId = $data['task_id'];
        $task->currentRetry = $data['current_retry'];
        $task->createdAt = $data['created_at'];
        $task->expireAt = $data['expire_at'];
        $task->metadata = $data['metadata'] ?? [];
        return $task;
    }
}

任务生产者实现

php
<?php

namespace App\Messaging\AsyncTask;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class TaskProducer
{
    private AMQPStreamConnection $connection;
    private $channel;
    private string $exchangeName = 'task.exchange';
    private string $deadLetterExchange = 'task.dlx';
    
    private const PRIORITY_QUEUES = [
        'task.high' => ['min' => 8, 'max' => 10],
        'task.normal' => ['min' => 4, 'max' => 7],
        'task.low' => ['min' => 1, 'max' => 3],
    ];

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->setupInfrastructure();
    }

    private function setupInfrastructure(): void
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );

        $this->channel->exchange_declare(
            $this->deadLetterExchange,
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );

        $dlxQueueArgs = [
            'x-message-ttl' => ['I', 604800],
            'x-max-length' => ['I', 100000],
        ];
        
        $this->channel->queue_declare(
            'task.dlq',
            false,
            true,
            false,
            false,
            false,
            $dlxQueueArgs
        );
        
        $this->channel->queue_bind('task.dlq', $this->deadLetterExchange, 'task.failed');

        foreach (self::PRIORITY_QUEUES as $queueName => $priorityRange) {
            $args = [
                'x-dead-letter-exchange' => ['S', $this->deadLetterExchange],
                'x-dead-letter-routing-key' => ['S', 'task.failed'],
                'x-max-priority' => ['I', 10],
                'x-message-ttl' => ['I', 86400000],
            ];
            
            $this->channel->queue_declare(
                $queueName,
                false,
                true,
                false,
                false,
                false,
                $args
            );
            
            $this->channel->queue_bind($queueName, $this->exchangeName, $queueName);
        }
    }

    public function publish(TaskMessage $task): bool
    {
        $queueName = $this->getQueueByPriority($task->priority);
        
        $message = new AMQPMessage(
            json_encode($task->toArray()),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'priority' => $task->priority,
                'message_id' => $task->taskId,
                'timestamp' => time(),
                'expiration' => strval(($task->expireAt - time()) * 1000),
            ]
        );

        $this->channel->basic_publish(
            $message,
            $this->exchangeName,
            $queueName
        );

        return true;
    }

    private function getQueueByPriority(int $priority): string
    {
        foreach (self::PRIORITY_QUEUES as $queueName => $range) {
            if ($priority >= $range['min'] && $priority <= $range['max']) {
                return $queueName;
            }
        }
        return 'task.normal';
    }

    public function publishBatch(array $tasks): int
    {
        $count = 0;
        foreach ($tasks as $task) {
            if ($task instanceof TaskMessage) {
                $this->publish($task);
                $count++;
            }
        }
        return $count;
    }

    public function close(): void
    {
        if ($this->channel) {
            $this->channel->close();
        }
    }
}

任务消费者实现

php
<?php

namespace App\Messaging\AsyncTask;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use App\Services\TaskHandlerRegistry;

class TaskConsumer
{
    private AMQPStreamConnection $connection;
    private $channel;
    private TaskHandlerRegistry $handlerRegistry;
    private array $config;
    private bool $running = true;

    public function __construct(
        AMQPStreamConnection $connection,
        TaskHandlerRegistry $handlerRegistry,
        array $config = []
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->handlerRegistry = $handlerRegistry;
        $this->config = array_merge([
            'prefetch_count' => 10,
            'queues' => ['task.high', 'task.normal', 'task.low'],
        ], $config);
        
        $this->setup();
    }

    private function setup(): void
    {
        $this->channel->basic_qos(
            null,
            $this->config['prefetch_count'],
            null
        );
    }

    public function consume(string $queueName, callable $callback = null): void
    {
        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            false,
            false,
            false,
            function (AMQPMessage $message) use ($callback) {
                $this->processMessage($message, $callback);
            }
        );

        while ($this->running && count($this->channel->callbacks)) {
            $this->channel->wait(null, true);
            
            if (!$this->running) {
                break;
            }
            
            usleep(100000);
        }
    }

    public function consumeAll(): void
    {
        foreach ($this->config['queues'] as $queue) {
            $this->consumeQueue($queue);
        }

        while ($this->running && count($this->channel->callbacks)) {
            $this->channel->wait(null, true);
            
            if (!$this->running) {
                break;
            }
            
            usleep(100000);
        }
    }

    private function consumeQueue(string $queueName): void
    {
        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            false,
            false,
            false,
            [$this', 'processMessage']
        );
    }

    public function processMessage(AMQPMessage $message, callable $customCallback = null): void
    {
        $taskData = json_decode($message->body, true);
        $task = TaskMessage::fromArray($taskData);
        
        try {
            $this->logTaskStart($task);
            
            if ($task->isExpired()) {
                $this->handleExpiredTask($message, $task);
                return;
            }

            if ($customCallback) {
                $result = $customCallback($task);
            } else {
                $handler = $this->handlerRegistry->get($task->taskType);
                $result = $handler->handle($task);
            }

            if ($result->isSuccess()) {
                $message->ack();
                $this->logTaskSuccess($task, $result);
            } else {
                $this->handleFailure($message, $task, $result->getError());
            }
            
        } catch (\Exception $e) {
            $this->handleException($message, $task, $e);
        }
    }

    private function handleFailure(AMQPMessage $message, TaskMessage $task, \Exception $error): void
    {
        $task->incrementRetry();
        
        if ($task->canRetry()) {
            $this->logTaskRetry($task, $error);
            
            $newMessage = new AMQPMessage(
                json_encode($task->toArray()),
                $message->get_properties()
            );
            
            $message->nack(false, false);
            
            sleep($this->calculateBackoff($task->currentRetry));
            
            $this->requeueTask($task);
        } else {
            $this->logTaskFailed($task, $error);
            $message->nack(false, false);
            $this->sendToDeadLetterQueue($task, $error);
        }
    }

    private function handleException(AMQPMessage $message, TaskMessage $task, \Exception $e): void
    {
        $this->logTaskError($task, $e);
        
        $task->incrementRetry();
        
        if ($task->canRetry()) {
            $message->nack(false, true);
        } else {
            $message->nack(false, false);
            $this->sendToDeadLetterQueue($task, $e);
        }
    }

    private function handleExpiredTask(AMQPMessage $message, TaskMessage $task): void
    {
        $this->logTaskExpired($task);
        $message->ack();
    }

    private function calculateBackoff(int $retryCount): int
    {
        return min(60, pow(2, $retryCount));
    }

    private function requeueTask(TaskMessage $task): void
    {
        $producer = new TaskProducer($this->connection);
        $producer->publish($task);
    }

    private function sendToDeadLetterQueue(TaskMessage $task, \Exception $error): void
    {
        $dlqMessage = new AMQPMessage(
            json_encode([
                'original_task' => $task->toArray(),
                'error' => [
                    'message' => $error->getMessage(),
                    'trace' => $error->getTraceAsString(),
                    'failed_at' => date('Y-m-d H:i:s'),
                ],
            ]),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );

        $this->channel->basic_publish(
            $dlqMessage,
            'task.dlx',
            'task.failed'
        );
    }

    public function stop(): void
    {
        $this->running = false;
    }

    public function close(): void
    {
        $this->stop();
        if ($this->channel) {
            $this->channel->close();
        }
    }

    private function logTaskStart(TaskMessage $task): void
    {
        error_log(sprintf('[Task] Starting: %s [%s]', $task->taskId, $task->taskType));
    }

    private function logTaskSuccess(TaskMessage $task, $result): void
    {
        error_log(sprintf('[Task] Success: %s', $task->taskId));
    }

    private function logTaskRetry(TaskMessage $task, \Exception $error): void
    {
        error_log(sprintf(
            '[Task] Retry %d/%d: %s - %s',
            $task->currentRetry,
            $task->maxRetries,
            $task->taskId,
            $error->getMessage()
        ));
    }

    private function logTaskFailed(TaskMessage $task, \Exception $error): void
    {
        error_log(sprintf('[Task] Failed: %s - %s', $task->taskId, $error->getMessage()));
    }

    private function logTaskError(TaskMessage $task, \Exception $e): void
    {
        error_log(sprintf('[Task] Error: %s - %s', $task->taskId, $e->getMessage()));
    }

    private function logTaskExpired(TaskMessage $task): void
    {
        error_log(sprintf('[Task] Expired: %s', $task->taskId));
    }
}

任务处理器注册表

php
<?php

namespace App\Services;

use App\Messaging\AsyncTask\TaskMessage;

interface TaskHandlerInterface
{
    public function handle(TaskMessage $task): TaskResult;
    public function supports(string $taskType): bool;
}

class TaskResult
{
    private bool $success;
    private $data;
    private ?\Exception $error;

    private function __construct(bool $success, $data = null, ?\Exception $error = null)
    {
        $this->success = $success;
        $this->data = $data;
        $this->error = $error;
    }

    public static function success($data = null): self
    {
        return new self(true, $data);
    }

    public static function failure(\Exception $error): self
    {
        return new self(false, null, $error);
    }

    public function isSuccess(): bool
    {
        return $this->success;
    }

    public function getData()
    {
        return $this->data;
    }

    public function getError(): ?\Exception
    {
        return $this->error;
    }
}

class TaskHandlerRegistry
{
    private array $handlers = [];

    public function register(TaskHandlerInterface $handler): void
    {
        $taskTypes = $handler->getSupportedTypes();
        foreach ($taskTypes as $type) {
            $this->handlers[$type] = $handler;
        }
    }

    public function get(string $taskType): TaskHandlerInterface
    {
        if (!isset($this->handlers[$taskType])) {
            throw new \InvalidArgumentException("No handler for task type: {$taskType}");
        }
        return $this->handlers[$taskType];
    }

    public function has(string $taskType): bool
    {
        return isset($this->handlers[$taskType]);
    }
}

具体任务处理器示例

php
<?php

namespace App\Handlers;

use App\Services\{TaskHandlerInterface, TaskResult};
use App\Messaging\AsyncTask\TaskMessage;
use PHPMailer\PHPMailer\PHPMailer;

class EmailTaskHandler implements TaskHandlerInterface
{
    private PHPMailer $mailer;
    private array $config;

    public function __construct(array $config)
    {
        $this->config = $config;
        $this->mailer = new PHPMailer(true);
        $this->setupMailer();
    }

    private function setupMailer(): void
    {
        $this->mailer->isSMTP();
        $this->mailer->Host = $this->config['host'];
        $this->mailer->SMTPAuth = true;
        $this->mailer->Username = $this->config['username'];
        $this->mailer->Password = $this->config['password'];
        $this->mailer->SMTPSecure = PHPMailer::ENCRYPTION_STARTTLS;
        $this->mailer->Port = $this->config['port'];
        $this->mailer->setFrom($this->config['from_email'], $this->config['from_name']);
    }

    public function handle(TaskMessage $task): TaskResult
    {
        $payload = $task->payload;
        
        try {
            $this->mailer->clearAddresses();
            $this->mailer->addAddress($payload['to_email'], $payload['to_name'] ?? '');
            $this->mailer->isHTML(true);
            $this->mailer->Subject = $payload['subject'];
            $this->mailer->Body = $payload['body'];
            
            if (isset($payload['attachments'])) {
                foreach ($payload['attachments'] as $attachment) {
                    $this->mailer->addAttachment($attachment['path'], $attachment['name'] ?? '');
                }
            }
            
            $this->mailer->send();
            
            return TaskResult::success(['sent_at' => date('Y-m-d H:i:s')]);
            
        } catch (\Exception $e) {
            return TaskResult::failure($e);
        }
    }

    public function supports(string $taskType): bool
    {
        return $taskType === 'email.send';
    }

    public function getSupportedTypes(): array
    {
        return ['email.send'];
    }
}

class SmsTaskHandler implements TaskHandlerInterface
{
    private array $config;

    public function __construct(array $config)
    {
        $this->config = $config;
    }

    public function handle(TaskMessage $task): TaskResult
    {
        $payload = $task->payload;
        
        try {
            $result = $this->sendSms(
                $payload['phone'],
                $payload['content'],
                $payload['template_id'] ?? null,
                $payload['params'] ?? []
            );
            
            return TaskResult::success($result);
            
        } catch (\Exception $e) {
            return TaskResult::failure($e);
        }
    }

    private function sendSms(string $phone, string $content, ?string $templateId, array $params): array
    {
        $ch = curl_init();
        
        $postData = [
            'phone' => $phone,
            'content' => $content,
            'template_id' => $templateId,
            'params' => $params,
        ];
        
        curl_setopt_array($ch, [
            CURLOPT_URL => $this->config['api_url'],
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => json_encode($postData),
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_HTTPHEADER => [
                'Content-Type: application/json',
                'Authorization: Bearer ' . $this->config['api_key'],
            ],
            CURLOPT_TIMEOUT => 30,
        ]);
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        if ($httpCode !== 200) {
            throw new \RuntimeException("SMS API error: HTTP {$httpCode}");
        }
        
        return json_decode($response, true);
    }

    public function supports(string $taskType): bool
    {
        return $taskType === 'sms.send';
    }

    public function getSupportedTypes(): array
    {
        return ['sms.send'];
    }
}

class ImageProcessHandler implements TaskHandlerInterface
{
    private array $config;

    public function __construct(array $config)
    {
        $this->config = $config;
    }

    public function handle(TaskMessage $task): TaskResult
    {
        $payload = $task->payload;
        
        try {
            $imagePath = $payload['image_path'];
            $operations = $payload['operations'];
            
            $result = $this->processImage($imagePath, $operations);
            
            return TaskResult::success($result);
            
        } catch (\Exception $e) {
            return TaskResult::failure($e);
        }
    }

    private function processImage(string $imagePath, array $operations): array
    {
        $image = imagecreatefromstring(file_get_contents($imagePath));
        $results = [];
        
        foreach ($operations as $operation) {
            switch ($operation['type']) {
                case 'resize':
                    $image = $this->resizeImage($image, $operation['width'], $operation['height']);
                    $results[] = ['type' => 'resize', 'status' => 'completed'];
                    break;
                    
                case 'watermark':
                    $image = $this->addWatermark($image, $operation['watermark_path'], $operation['position']);
                    $results[] = ['type' => 'watermark', 'status' => 'completed'];
                    break;
                    
                case 'compress':
                    $image = $this->compressImage($image, $operation['quality']);
                    $results[] = ['type' => 'compress', 'status' => 'completed'];
                    break;
            }
        }
        
        $outputPath = $this->config['output_dir'] . '/' . basename($imagePath);
        imagejpeg($image, $outputPath, 90);
        imagedestroy($image);
        
        return [
            'output_path' => $outputPath,
            'operations' => $results,
        ];
    }

    private function resizeImage($image, int $width, int $height)
    {
        $newImage = imagecreatetruecolor($width, $height);
        imagecopyresampled($newImage, $image, 0, 0, 0, 0, $width, $height, imagesx($image), imagesy($image));
        return $newImage;
    }

    private function addWatermark($image, string $watermarkPath, string $position)
    {
        $watermark = imagecreatefrompng($watermarkPath);
        $wmWidth = imagesx($watermark);
        $wmHeight = imagesy($watermark);
        
        $destX = imagesx($image) - $wmWidth - 10;
        $destY = imagesy($image) - $wmHeight - 10;
        
        imagecopy($image, $watermark, $destX, $destY, 0, 0, $wmWidth, $wmHeight);
        imagedestroy($watermark);
        
        return $image;
    }

    private function compressImage($image, int $quality)
    {
        return $image;
    }

    public function supports(string $taskType): bool
    {
        return $taskType === 'image.process';
    }

    public function getSupportedTypes(): array
    {
        return ['image.process'];
    }
}

完整使用示例

php
<?php

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Messaging\AsyncTask\{TaskMessage, TaskProducer, TaskConsumer};
use App\Services\TaskHandlerRegistry;
use App\Handlers\{EmailTaskHandler, SmsTaskHandler, ImageProcessHandler};

$connection = new AMQPStreamConnection(
    'localhost',
    5672,
    'guest',
    'guest',
    '/',
    false,
    'AMQPLAIN',
    null,
    'en_US',
    3.0,
    3.0
);

$registry = new TaskHandlerRegistry();
$registry->register(new EmailTaskHandler([
    'host' => 'smtp.example.com',
    'username' => 'noreply@example.com',
    'password' => 'password',
    'port' => 587,
    'from_email' => 'noreply@example.com',
    'from_name' => '系统通知',
]));

$registry->register(new SmsTaskHandler([
    'api_url' => 'https://sms-api.example.com/send',
    'api_key' => 'your-api-key',
]));

$registry->register(new ImageProcessHandler([
    'output_dir' => '/var/www/storage/images/processed',
]));

$producer = new TaskProducer($connection);

$emailTask = new TaskMessage(
    'email.send',
    [
        'to_email' => 'user@example.com',
        'to_name' => '张三',
        'subject' => '订单确认通知',
        'body' => '<h1>您的订单已确认</h1><p>订单号: ORD123456</p>',
    ],
    priority: 8
);
$producer->publish($emailTask);

$smsTask = new TaskMessage(
    'sms.send',
    [
        'phone' => '13800138000',
        'content' => '您的订单已发货,快递单号: SF123456789',
        'template_id' => 'SMS_ORDER_SHIPPED',
    ],
    priority: 9
);
$producer->publish($smsTask);

$imageTask = new TaskMessage(
    'image.process',
    [
        'image_path' => '/var/www/storage/uploads/image.jpg',
        'operations' => [
            ['type' => 'resize', 'width' => 800, 'height' => 600],
            ['type' => 'watermark', 'watermark_path' => '/var/www/watermark.png', 'position' => 'bottom-right'],
            ['type' => 'compress', 'quality' => 85],
        ],
    ],
    priority: 3
);
$producer->publish($imageTask);

echo "任务已提交:\n";
echo "- 邮件任务: {$emailTask->taskId}\n";
echo "- 短信任务: {$smsTask->taskId}\n";
echo "- 图片处理任务: {$imageTask->taskId}\n";

$consumer = new TaskConsumer(
    $connection,
    $registry,
    [
        'prefetch_count' => 5,
        'queues' => ['task.high', 'task.normal', 'task.low'],
    ]
);

echo "\n开始消费任务...\n";

pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use ($consumer) {
    echo "收到终止信号,正在停止消费者...\n";
    $consumer->stop();
});
pcntl_signal(SIGINT, function () use ($consumer) {
    echo "收到中断信号,正在停止消费者...\n";
    $consumer->stop();
});

$consumer->consumeAll();

$producer->close();
$consumer->close();
$connection->close();

关键技术点解析

1. 优先级队列设计

php
$queueArgs = [
    'x-max-priority' => ['I', 10],
];
  • RabbitMQ 支持优先级队列,优先级范围 1-255
  • 建议使用较小的优先级范围(1-10)以获得更好的性能
  • 高优先级消息会被优先消费

2. 死信队列处理

php
$queueArgs = [
    'x-dead-letter-exchange' => ['S', 'task.dlx'],
    'x-dead-letter-routing-key' => ['S', 'task.failed'],
];
  • 消息被拒绝或过期后自动进入死信队列
  • 便于后续人工处理或自动重试

3. 消息持久化

php
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
  • 确保消息在 RabbitMQ 重启后不丢失
  • 队列也需要声明为持久化

4. 消费者预取设置

php
$this->channel->basic_qos(null, $prefetchCount, null);
  • 控制消费者同时处理的消息数量
  • 防止消费者过载

5. 指数退避重试

php
private function calculateBackoff(int $retryCount): int
{
    return min(60, pow(2, $retryCount));
}
  • 避免频繁重试对系统造成压力
  • 最大等待时间限制在合理范围内

性能优化建议

生产者优化

优化项建议说明
批量发送使用 batch_publish减少网络往返
连接复用保持长连接避免频繁创建连接
异步确认开启 publisher confirms提高吞吐量
消息压缩大消息使用 gzip减少网络传输

消费者优化

优化项建议说明
预取数量根据处理能力设置平衡吞吐和延迟
多消费者启动多个消费者进程提高并行处理能力
处理超时设置合理的超时时间避免消息长时间阻塞
资源池化复用数据库连接等资源减少资源创建开销

队列优化

bash
rabbitmqctl eval 'rabbit_amqqueue:set_queue_duration(
    <<"/", "task.normal">>, 
    {max_length, 100000}, 
    {max_length_bytes, 1073741824}
).'

常见问题与解决方案

1. 消息积压

问题: 队列中消息堆积过多,消费速度跟不上

解决方案:

php
$consumer = new TaskConsumer($connection, $registry, [
    'prefetch_count' => 20,
]);

for ($i = 0; $i < 5; $i++) {
    $pid = pcntl_fork();
    if ($pid === 0) {
        $consumer->consumeAll();
        exit(0);
    }
}

2. 消息重复消费

问题: 消费者处理成功但 ACK 失败,导致消息重新投递

解决方案:

php
class IdempotentTaskHandler implements TaskHandlerInterface
{
    private $redis;
    
    public function handle(TaskMessage $task): TaskResult
    {
        $lockKey = "task:lock:{$task->taskId}";
        
        if ($this->redis->exists($lockKey)) {
            return TaskResult::success(['status' => 'already_processed']);
        }
        
        $this->redis->setex($lockKey, 86400, '1');
        
        return $this->doHandle($task);
    }
}

3. 消费者崩溃

问题: 消费者处理过程中崩溃,消息处于 unacked 状态

解决方案:

  • 使用 Supervisor 管理消费者进程
  • 设置合理的心跳超时时间
  • 实现优雅关闭机制

4. 内存泄漏

问题: 长时间运行的消费者内存持续增长

解决方案:

php
$processedCount = 0;
$maxProcessCount = 1000;

while ($consumer->isRunning()) {
    $consumer->process();
    $processedCount++;
    
    if ($processedCount >= $maxProcessCount) {
        echo "达到最大处理数量,重启消费者\n";
        break;
    }
}

相关链接