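Asynchronous Task Processing System
Overview
Asynchronous task processing is one of the classic use cases for RabbitMQ. A message queue moves time-consuming operations out of the main business flow, so requests return quickly while the work runs asynchronously, which raises system throughput and improves the user experience.
Business Background and Requirements
Scenario
An e-commerce platform needs to handle the following time-consuming operations:
- Order confirmation email: send a confirmation email after the user places an order
- SMS notification: send an SMS when the order status changes
- Report generation: generate sales reports and user behavior analysis reports on a schedule
- Image processing: compress and watermark images after the user uploads them
- Data synchronization: sync order data to the data warehouse
Requirements
| Requirement | Description |
|---|---|
| Response time | Main business API responds in < 200 ms |
| Reliability | Tasks are not lost; retries are supported |
| Scalability | Consumers can be added dynamically |
| Observability | Task execution status can be tracked |
| Priority | Tasks can be processed by priority |
Architecture Design
Overall Architecture Diagram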
mermaid
graph TB
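subgraph "Clients"
A[Web application]
B[API service]
end
subgraph "Message producer"
C[Task dispatcher]
end
subgraph "RabbitMQ"
D[Task exchange<br/>task.exchange]
E[High-priority queue<br/>task.high]
F[Normal queue<br/>task.normal]
G[Low-priority queue<br/>task.low]
H[Dead-letter queue<br/>task.dlq]
end
subgraph "Consumer cluster"
I[Email service]
J[SMS service]
K[Report service]
L[Image processing service]
end
subgraph "Monitoring"
M[Task status tracking]
N[Alerting system]
end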
A --> C
B --> C
C --> D
D --> E
D --> F
D --> G
E --> I
E --> J
F --> K
G --> L
E -.-> H
F -.-> H
G -.-> H
I --> M
J --> M
K --> M
L --> M
M --> N
Task Flow
mermaid
sequenceDiagram
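participant App as Application service
participant Producer as Task producer
participant MQ as RabbitMQ
participant Consumer as Task consumer
participant DB as Database
App->>Producer: Submit async task
Producer->>Producer: Serialize task data
Producer->>MQ: Publish message (with priority)
MQ-->>Producer: Confirm receipt
Producer-->>App: Return task ID
App-->>App: Respond to the user immediately
MQ->>Consumer: Deliver message
Consumer->>Consumer: Parse task data
Consumer->>DB: Execute task logic
DB-->>Consumer: Execution result
alt Success
Consumer->>MQ: Send ACK
Consumer->>DB: Update task status
else Failure
Consumer->>MQ: Send NACK
MQ->>MQ: Requeue the message or route it to the dead-letter queue
end
PHP Implementation
Task message structure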
php
<?php
namespace App\Messaging\AsyncTask;
class TaskMessage
{
public string $taskId;
public string $taskType;
public array $payload;
public int $priority;
public int $maxRetries;
public int $currentRetry;
public int $createdAt;
public int $expireAt;
public array $metadata;
public function __construct(
string $taskType,
array $payload,
int $priority = 5,
int $maxRetries = 3,
int $ttl = 86400
) {
$this->taskId = $this->generateTaskId();
$this->taskType = $taskType;
$this->payload = $payload;
$this->priority = min(10, max(1, $priority));
$this->maxRetries = $maxRetries;
$this->currentRetry = 0;
$this->createdAt = time();
$this->expireAt = time() + $ttl;
$this->metadata = [];
}
private function generateTaskId(): string
{
return sprintf(
'task_%s_%s',
date('YmdHis'),
bin2hex(random_bytes(8))
);
}
public function incrementRetry(): self
{
$this->currentRetry++;
return $this;
}
public function canRetry(): bool
{
return $this->currentRetry < $this->maxRetries;
}
public function isExpired(): bool
{
return time() > $this->expireAt;
}
public function toArray(): array
{
return [
'task_id' => $this->taskId,
'task_type' => $this->taskType,
'payload' => $this->payload,
'priority' => $this->priority,
'max_retries' => $this->maxRetries,
'current_retry' => $this->currentRetry,
'created_at' => $this->createdAt,
'expire_at' => $this->expireAt,
'metadata' => $this->metadata,
];
}
public static function fromArray(array $data): self
{
$task = new self(
$data['task_type'],
$data['payload'],
$data['priority'],
$data['max_retries']
);
$task->taskId = $data['task_id'];
$task->currentRetry = $data['current_retry'];
$task->createdAt = $data['created_at'];
$task->expireAt = $data['expire_at'];
$task->metadata = $data['metadata'] ?? [];
return $task;
}
}
Task producer implementation
php
<?php
namespace App\Messaging\AsyncTask;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class TaskProducer
{
private AMQPStreamConnection $connection;
private $channel;
private string $exchangeName = 'task.exchange';
private string $deadLetterExchange = 'task.dlx';
private const PRIORITY_QUEUES = [
'task.high' => ['min' => 8, 'max' => 10],
'task.normal' => ['min' => 4, 'max' => 7],
'task.low' => ['min' => 1, 'max' => 3],
];
public function __construct(AMQPStreamConnection $connection)
{
$this->connection = $connection;
$this->channel = $connection->channel();
$this->setupInfrastructure();
}
private function setupInfrastructure(): void
{
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::DIRECT,
false,
true,
false
);
$this->channel->exchange_declare(
$this->deadLetterExchange,
AMQPExchangeType::DIRECT,
false,
true,
false
);
$dlxQueueArgs = [
// x-message-ttl is expressed in milliseconds: 7 days = 604800000 ms
'x-message-ttl' => ['I', 604800000],
'x-max-length' => ['I', 100000],
];
$this->channel->queue_declare(
'task.dlq',
false,
true,
false,
false,
false,
$dlxQueueArgs
);
$this->channel->queue_bind('task.dlq', $this->deadLetterExchange, 'task.failed');
foreach (self::PRIORITY_QUEUES as $queueName => $priorityRange) {
$args = [
'x-dead-letter-exchange' => ['S', $this->deadLetterExchange],
'x-dead-letter-routing-key' => ['S', 'task.failed'],
'x-max-priority' => ['I', 10],
'x-message-ttl' => ['I', 86400000],
];
$this->channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
$args
);
$this->channel->queue_bind($queueName, $this->exchangeName, $queueName);
}
}
public function publish(TaskMessage $task): bool
{
$queueName = $this->getQueueByPriority($task->priority);
$message = new AMQPMessage(
json_encode($task->toArray()),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'priority' => $task->priority,
'message_id' => $task->taskId,
'timestamp' => time(),
// Clamp at 0 so an already-expired task never yields a negative TTL string
'expiration' => (string) (max(0, $task->expireAt - time()) * 1000),
]
);
$this->channel->basic_publish(
$message,
$this->exchangeName,
$queueName
);
return true;
}
private function getQueueByPriority(int $priority): string
{
foreach (self::PRIORITY_QUEUES as $queueName => $range) {
if ($priority >= $range['min'] && $priority <= $range['max']) {
return $queueName;
}
}
return 'task.normal';
}
public function publishBatch(array $tasks): int
{
$count = 0;
foreach ($tasks as $task) {
if ($task instanceof TaskMessage) {
$this->publish($task);
$count++;
}
}
return $count;
}
public function close(): void
{
if ($this->channel) {
$this->channel->close();
}
}
}
Task consumer implementation
php
<?php
namespace App\Messaging\AsyncTask;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use App\Services\TaskHandlerRegistry;
class TaskConsumer
{
private AMQPStreamConnection $connection;
private $channel;
private TaskHandlerRegistry $handlerRegistry;
private array $config;
private bool $running = true;
public function __construct(
AMQPStreamConnection $connection,
TaskHandlerRegistry $handlerRegistry,
array $config = []
) {
$this->connection = $connection;
$this->channel = $connection->channel();
$this->handlerRegistry = $handlerRegistry;
$this->config = array_merge([
'prefetch_count' => 10,
'queues' => ['task.high', 'task.normal', 'task.low'],
], $config);
$this->setup();
}
private function setup(): void
{
$this->channel->basic_qos(
null,
$this->config['prefetch_count'],
null
);
}
public function consume(string $queueName, ?callable $callback = null): void
{
$this->channel->basic_consume(
$queueName,
'',
false,
false,
false,
false,
function (AMQPMessage $message) use ($callback) {
$this->processMessage($message, $callback);
}
);
while ($this->running && count($this->channel->callbacks)) {
$this->channel->wait(null, true);
if (!$this->running) {
break;
}
usleep(100000);
}
}
public function consumeAll(): void
{
foreach ($this->config['queues'] as $queue) {
$this->consumeQueue($queue);
}
while ($this->running && count($this->channel->callbacks)) {
$this->channel->wait(null, true);
if (!$this->running) {
break;
}
usleep(100000);
}
}
private function consumeQueue(string $queueName): void
{
$this->channel->basic_consume(
$queueName,
'',
false,
false,
false,
false,
[$this, 'processMessage']
);
}
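public function processMessage(AMQPMessage $message, ?callable $customCallback = null): void
{
    // Decode defensively: a malformed body must not crash the consumer loop.
    try {
        $taskData = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
        $task = TaskMessage::fromArray($taskData);
    } catch (\Throwable $e) {
        error_log('[Task] Malformed message rejected: ' . $e->getMessage());
        // requeue = false, so the queue's dead-letter exchange receives the message.
        $message->nack(false);
        return;
    }
    try {
        $this->logTaskStart($task);
        if ($task->isExpired()) {
            $this->handleExpiredTask($message, $task);
            return;
        }
        if ($customCallback) {
            $result = $customCallback($task);
        } else {
            $handler = $this->handlerRegistry->get($task->taskType);
            $result = $handler->handle($task);
        }
        if ($result->isSuccess()) {
            $message->ack();
            $this->logTaskSuccess($task, $result);
        } else {
            // getError() is nullable, so fall back to a generic exception.
            $error = $result->getError() ?? new \RuntimeException('Task failed without error detail');
            $this->handleFailure($message, $task, $error);
        }
    } catch (\Exception $e) {
        $this->handleException($message, $task, $e);
    }
}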
private function handleFailure(AMQPMessage $message, TaskMessage $task, \Exception $error): void
{
    $task->incrementRetry();
    if ($task->canRetry()) {
        $this->logTaskRetry($task, $error);
        // sleep() blocks this consumer; a delay queue (TTL plus DLX) avoids that in production.
        sleep($this->calculateBackoff($task->currentRetry));
        // Republish a copy carrying the updated retry count, then ack the original.
        // A nack without requeue would also dead-letter it, because the queue has a DLX.
        $this->requeueTask($task);
        $message->ack();
    } else {
        $this->logTaskFailed($task, $error);
        // Publish the enriched failure record and ack the original,
        // so the broker does not dead-letter a second copy.
        $this->sendToDeadLetterQueue($task, $error);
        $message->ack();
    }
}
private function handleException(AMQPMessage $message, TaskMessage $task, \Exception $e): void
{
    $this->logTaskError($task, $e);
    // Requeueing the original delivery would discard the incremented retry count
    // and could loop forever, so exceptions follow the same path as failures.
    $this->handleFailure($message, $task, $e);
}
private function handleExpiredTask(AMQPMessage $message, TaskMessage $task): void
{
$this->logTaskExpired($task);
$message->ack();
}
private function calculateBackoff(int $retryCount): int
{
return min(60, pow(2, $retryCount));
}
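private function requeueTask(TaskMessage $task): void
{
    // Each TaskProducer opens its own channel; close it so channels do not accumulate.
    $producer = new TaskProducer($this->connection);
    try {
        $producer->publish($task);
    } finally {
        $producer->close();
    }
}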
private function sendToDeadLetterQueue(TaskMessage $task, \Exception $error): void
{
$dlqMessage = new AMQPMessage(
json_encode([
'original_task' => $task->toArray(),
'error' => [
'message' => $error->getMessage(),
'trace' => $error->getTraceAsString(),
'failed_at' => date('Y-m-d H:i:s'),
],
]),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]
);
$this->channel->basic_publish(
$dlqMessage,
'task.dlx',
'task.failed'
);
}
public function stop(): void
{
$this->running = false;
}
public function close(): void
{
$this->stop();
if ($this->channel) {
$this->channel->close();
}
}
private function logTaskStart(TaskMessage $task): void
{
error_log(sprintf('[Task] Starting: %s [%s]', $task->taskId, $task->taskType));
}
private function logTaskSuccess(TaskMessage $task, $result): void
{
error_log(sprintf('[Task] Success: %s', $task->taskId));
}
private function logTaskRetry(TaskMessage $task, \Exception $error): void
{
error_log(sprintf(
'[Task] Retry %d/%d: %s - %s',
$task->currentRetry,
$task->maxRetries,
$task->taskId,
$error->getMessage()
));
}
private function logTaskFailed(TaskMessage $task, \Exception $error): void
{
error_log(sprintf('[Task] Failed: %s - %s', $task->taskId, $error->getMessage()));
}
private function logTaskError(TaskMessage $task, \Exception $e): void
{
error_log(sprintf('[Task] Error: %s - %s', $task->taskId, $e->getMessage()));
}
private function logTaskExpired(TaskMessage $task): void
{
error_log(sprintf('[Task] Expired: %s', $task->taskId));
}
}
Task handler registry
php
<?php
namespace App\Services;
use App\Messaging\AsyncTask\TaskMessage;
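interface TaskHandlerInterface
{
    public function handle(TaskMessage $task): TaskResult;
    public function supports(string $taskType): bool;
    // TaskHandlerRegistry::register() calls this, so it belongs in the contract.
    public function getSupportedTypes(): array;
}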
class TaskResult
{
private bool $success;
private $data;
private ?\Exception $error;
private function __construct(bool $success, $data = null, ?\Exception $error = null)
{
$this->success = $success;
$this->data = $data;
$this->error = $error;
}
public static function success($data = null): self
{
return new self(true, $data);
}
public static function failure(\Exception $error): self
{
return new self(false, null, $error);
}
public function isSuccess(): bool
{
return $this->success;
}
public function getData()
{
return $this->data;
}
public function getError(): ?\Exception
{
return $this->error;
}
}
class TaskHandlerRegistry
{
private array $handlers = [];
public function register(TaskHandlerInterface $handler): void
{
$taskTypes = $handler->getSupportedTypes();
foreach ($taskTypes as $type) {
$this->handlers[$type] = $handler;
}
}
public function get(string $taskType): TaskHandlerInterface
{
if (!isset($this->handlers[$taskType])) {
throw new \InvalidArgumentException("No handler for task type: {$taskType}");
}
return $this->handlers[$taskType];
}
public function has(string $taskType): bool
{
return isset($this->handlers[$taskType]);
}
}
Concrete task handler examples
php
<?php
namespace App\Handlers;
use App\Services\{TaskHandlerInterface, TaskResult};
use App\Messaging\AsyncTask\TaskMessage;
use PHPMailer\PHPMailer\PHPMailer;
class EmailTaskHandler implements TaskHandlerInterface
{
private PHPMailer $mailer;
private array $config;
public function __construct(array $config)
{
$this->config = $config;
$this->mailer = new PHPMailer(true);
$this->setupMailer();
}
private function setupMailer(): void
{
$this->mailer->isSMTP();
$this->mailer->Host = $this->config['host'];
$this->mailer->SMTPAuth = true;
$this->mailer->Username = $this->config['username'];
$this->mailer->Password = $this->config['password'];
$this->mailer->SMTPSecure = PHPMailer::ENCRYPTION_STARTTLS;
$this->mailer->Port = $this->config['port'];
$this->mailer->setFrom($this->config['from_email'], $this->config['from_name']);
}
public function handle(TaskMessage $task): TaskResult
{
$payload = $task->payload;
try {
$this->mailer->clearAddresses();
$this->mailer->addAddress($payload['to_email'], $payload['to_name'] ?? '');
$this->mailer->isHTML(true);
$this->mailer->Subject = $payload['subject'];
$this->mailer->Body = $payload['body'];
if (isset($payload['attachments'])) {
foreach ($payload['attachments'] as $attachment) {
$this->mailer->addAttachment($attachment['path'], $attachment['name'] ?? '');
}
}
$this->mailer->send();
return TaskResult::success(['sent_at' => date('Y-m-d H:i:s')]);
} catch (\Exception $e) {
return TaskResult::failure($e);
}
}
public function supports(string $taskType): bool
{
return $taskType === 'email.send';
}
public function getSupportedTypes(): array
{
return ['email.send'];
}
}
class SmsTaskHandler implements TaskHandlerInterface
{
private array $config;
public function __construct(array $config)
{
$this->config = $config;
}
public function handle(TaskMessage $task): TaskResult
{
$payload = $task->payload;
try {
$result = $this->sendSms(
$payload['phone'],
$payload['content'],
$payload['template_id'] ?? null,
$payload['params'] ?? []
);
return TaskResult::success($result);
} catch (\Exception $e) {
return TaskResult::failure($e);
}
}
private function sendSms(string $phone, string $content, ?string $templateId, array $params): array
{
$ch = curl_init();
$postData = [
'phone' => $phone,
'content' => $content,
'template_id' => $templateId,
'params' => $params,
];
curl_setopt_array($ch, [
CURLOPT_URL => $this->config['api_url'],
CURLOPT_POST => true,
CURLOPT_POSTFIELDS => json_encode($postData),
CURLOPT_RETURNTRANSFER => true,
CURLOPT_HTTPHEADER => [
'Content-Type: application/json',
'Authorization: Bearer ' . $this->config['api_key'],
],
CURLOPT_TIMEOUT => 30,
]);
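$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
$curlError = curl_error($ch);
curl_close($ch);
// curl_exec() returns false on transport errors (DNS failure, timeout, ...).
if ($response === false) {
    throw new \RuntimeException("SMS API request failed: {$curlError}");
}
if ($httpCode !== 200) {
    throw new \RuntimeException("SMS API error: HTTP {$httpCode}");
}
// The declared return type is array, so reject bodies that are not JSON objects or arrays.
$decoded = json_decode($response, true);
if (!is_array($decoded)) {
    throw new \RuntimeException('SMS API returned a non-JSON response');
}
return $decoded;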
}
public function supports(string $taskType): bool
{
return $taskType === 'sms.send';
}
public function getSupportedTypes(): array
{
return ['sms.send'];
}
}
class ImageProcessHandler implements TaskHandlerInterface
{
private array $config;
public function __construct(array $config)
{
$this->config = $config;
}
public function handle(TaskMessage $task): TaskResult
{
$payload = $task->payload;
try {
$imagePath = $payload['image_path'];
$operations = $payload['operations'];
$result = $this->processImage($imagePath, $operations);
return TaskResult::success($result);
} catch (\Exception $e) {
return TaskResult::failure($e);
}
}
private function processImage(string $imagePath, array $operations): array
{
    $data = file_get_contents($imagePath);
    $image = $data === false ? false : imagecreatefromstring($data);
    if ($image === false) {
        throw new \RuntimeException("Cannot load image: {$imagePath}");
    }
    $results = [];
    $quality = 90; // default JPEG quality, overridden by a 'compress' operation
    foreach ($operations as $operation) {
        switch ($operation['type']) {
            case 'resize':
                $image = $this->resizeImage($image, $operation['width'], $operation['height']);
                $results[] = ['type' => 'resize', 'status' => 'completed'];
                break;
            case 'watermark':
                $image = $this->addWatermark($image, $operation['watermark_path'], $operation['position']);
                $results[] = ['type' => 'watermark', 'status' => 'completed'];
                break;
            case 'compress':
                // JPEG compression happens at write time through the quality argument.
                $quality = (int) $operation['quality'];
                $results[] = ['type' => 'compress', 'status' => 'completed'];
                break;
        }
    }
    $outputPath = $this->config['output_dir'] . '/' . basename($imagePath);
    imagejpeg($image, $outputPath, $quality);
imagedestroy($image);
return [
'output_path' => $outputPath,
'operations' => $results,
];
}
private function resizeImage($image, int $width, int $height)
{
$newImage = imagecreatetruecolor($width, $height);
imagecopyresampled($newImage, $image, 0, 0, 0, 0, $width, $height, imagesx($image), imagesy($image));
return $newImage;
}
private function addWatermark($image, string $watermarkPath, string $position)
{
$watermark = imagecreatefrompng($watermarkPath);
$wmWidth = imagesx($watermark);
$wmHeight = imagesy($watermark);
$destX = imagesx($image) - $wmWidth - 10;
$destY = imagesy($image) - $wmHeight - 10;
imagecopy($image, $watermark, $destX, $destY, 0, 0, $wmWidth, $wmHeight);
imagedestroy($watermark);
return $image;
}
private function compressImage($image, int $quality)
{
return $image;
}
public function supports(string $taskType): bool
{
return $taskType === 'image.process';
}
public function getSupportedTypes(): array
{
return ['image.process'];
}
}
Complete usage example
php
<?php
require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Messaging\AsyncTask\{TaskMessage, TaskProducer, TaskConsumer};
use App\Services\TaskHandlerRegistry;
use App\Handlers\{EmailTaskHandler, SmsTaskHandler, ImageProcessHandler};
$connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest',
'/',
false,
'AMQPLAIN',
null,
'en_US',
3.0,
3.0
);
$registry = new TaskHandlerRegistry();
$registry->register(new EmailTaskHandler([
'host' => 'smtp.example.com',
'username' => 'noreply@example.com',
'password' => 'password',
'port' => 587,
'from_email' => 'noreply@example.com',
'from_name' => 'System Notification',
]));
$registry->register(new SmsTaskHandler([
'api_url' => 'https://sms-api.example.com/send',
'api_key' => 'your-api-key',
]));
$registry->register(new ImageProcessHandler([
'output_dir' => '/var/www/storage/images/processed',
]));
$producer = new TaskProducer($connection);
$emailTask = new TaskMessage(
'email.send',
[
'to_email' => 'user@example.com',
'to_name' => 'Zhang San',
'subject' => 'Order confirmation',
'body' => '<h1>Your order is confirmed</h1><p>Order number: ORD123456</p>',
],
priority: 8
);
$producer->publish($emailTask);
$smsTask = new TaskMessage(
'sms.send',
[
'phone' => '13800138000',
'content' => 'Your order has shipped. Tracking number: SF123456789',
'template_id' => 'SMS_ORDER_SHIPPED',
],
priority: 9
);
$producer->publish($smsTask);
$imageTask = new TaskMessage(
'image.process',
[
'image_path' => '/var/www/storage/uploads/image.jpg',
'operations' => [
['type' => 'resize', 'width' => 800, 'height' => 600],
['type' => 'watermark', 'watermark_path' => '/var/www/watermark.png', 'position' => 'bottom-right'],
['type' => 'compress', 'quality' => 85],
],
],
priority: 3
);
$producer->publish($imageTask);
echo "Tasks submitted:\n";
echo "- Email task: {$emailTask->taskId}\n";
echo "- SMS task: {$smsTask->taskId}\n";
echo "- Image processing task: {$imageTask->taskId}\n";
$consumer = new TaskConsumer(
$connection,
$registry,
[
'prefetch_count' => 5,
'queues' => ['task.high', 'task.normal', 'task.low'],
]
);
echo "\nStarting to consume tasks...\n";
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use ($consumer) {
echo "Received SIGTERM, stopping the consumer...\n";
$consumer->stop();
});
pcntl_signal(SIGINT, function () use ($consumer) {
echo "Received SIGINT, stopping the consumer...\n";
$consumer->stop();
});
$consumer->consumeAll();
$producer->close();
$consumer->close();
$connection->close();
Key Technical Points
1. Priority queue design
php
$queueArgs = [
'x-max-priority' => ['I', 10],
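];
- RabbitMQ supports priority queues; x-max-priority accepts a value from 1 to 255
- A small range (1-10) is recommended, because every additional priority level costs the broker extra memory and CPU
- Higher-priority messages are delivered to consumers first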
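The priority bands used by the producer can be exercised in isolation. The following is a minimal sketch, assuming the same three bands as TaskProducer::PRIORITY_QUEUES; the constant PRIORITY_BANDS and the function queueForPriority() are illustrative names that do not appear in the original classes.

```php
<?php
// Hypothetical helper mirroring TaskProducer::PRIORITY_QUEUES; not part of the original code.
const PRIORITY_BANDS = [
    'task.high'   => ['min' => 8, 'max' => 10],
    'task.normal' => ['min' => 4, 'max' => 7],
    'task.low'    => ['min' => 1, 'max' => 3],
];

function queueForPriority(int $priority): string
{
    // Clamp to the 1-10 range, as TaskMessage does in its constructor.
    $priority = min(10, max(1, $priority));
    foreach (PRIORITY_BANDS as $queue => $band) {
        if ($priority >= $band['min'] && $priority <= $band['max']) {
            return $queue;
        }
    }
    return 'task.normal'; // not reached with contiguous bands, kept as a safe default
}

foreach ([0, 3, 5, 9, 42] as $p) {
    printf("priority %d -> %s\n", $p, queueForPriority($p));
}
```

Out-of-range values are clamped rather than rejected, so a caller passing 0 or 42 still lands in a valid queue.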
2. Dead-letter queue handling
php
$queueArgs = [
'x-dead-letter-exchange' => ['S', 'task.dlx'],
'x-dead-letter-routing-key' => ['S', 'task.failed'],
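];
- Messages that are rejected without requeue, that expire, or that overflow the queue length limit are routed to the dead-letter exchange automatically
- This makes later manual handling or automated retry easier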
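The choice between acknowledging, retrying, and dead-lettering can be written as a pure function, which makes the branches easy to test without a broker. This is a sketch under the assumption that the retry counters behave as in TaskMessage (the counter is incremented before canRetry() is checked); nextAction() and its string results are hypothetical labels used only here.

```php
<?php
// Hypothetical decision helper; the returned strings are labels for this sketch only.
function nextAction(bool $succeeded, int $currentRetry, int $maxRetries, int $expireAt, int $now): string
{
    if ($now > $expireAt) {
        return 'drop';   // expired tasks are acknowledged and skipped
    }
    if ($succeeded) {
        return 'ack';
    }
    // $currentRetry is the count before this failure is recorded,
    // so compare the incremented value against the limit.
    return ($currentRetry + 1 < $maxRetries) ? 'retry' : 'dead-letter';
}

foreach ([0, 1, 2] as $retry) {
    printf("failure with current_retry=%d -> %s\n", $retry, nextAction(false, $retry, 3, 200, 100));
}
```

With max_retries set to 3, the task is attempted three times in total before it is dead-lettered.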
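3. Message persistence
php
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
- Persistent messages survive a RabbitMQ restart
- The queue must also be declared durable, otherwise it disappears together with its messages on restart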
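As an illustration of how the message properties fit together, the sketch below builds the property array separately from the publish call. The helper buildMessageProperties() is a hypothetical name; the literal 2 is the AMQP 0-9-1 value for persistent delivery, which is what AMQPMessage::DELIVERY_MODE_PERSISTENT represents.

```php
<?php
// Hypothetical helper: builds the property array that would be passed to the AMQPMessage constructor.
function buildMessageProperties(string $taskId, int $priority, int $expireAt, int $now): array
{
    // Never produce a negative TTL for a task that has already expired.
    $remainingMs = max(0, $expireAt - $now) * 1000;
    return [
        'content_type'  => 'application/json',
        'delivery_mode' => 2,                          // 2 = persistent in AMQP 0-9-1
        'priority'      => min(10, max(1, $priority)),
        'message_id'    => $taskId,
        'timestamp'     => $now,
        'expiration'    => (string) $remainingMs,      // per-message TTL: milliseconds, as a string
    ];
}

$props = buildMessageProperties('task_demo', 12, 1700000060, 1700000000);
echo json_encode($props, JSON_PRETTY_PRINT), "\n";
```

Keeping this logic in a pure function makes the TTL arithmetic testable without a broker connection.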
4. Consumer prefetch
php
$this->channel->basic_qos(null, $prefetchCount, null);
- Limits how many unacknowledged messages a consumer holds at once
- Prevents a consumer from being overloaded
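Choosing a prefetch value is mostly an empirical exercise. As a rough starting point only, the sketch below keeps enough messages in flight to cover one network round trip; the formula and the function suggestPrefetch() are assumptions of this example, not something RabbitMQ prescribes, and the result should be tuned against real measurements.

```php
<?php
// Rule-of-thumb sketch (an assumption of this example, not a RabbitMQ formula):
// keep enough messages in flight to cover one network round trip.
function suggestPrefetch(float $avgProcessingMs, float $roundTripMs, int $max = 100): int
{
    if ($avgProcessingMs <= 0) {
        throw new InvalidArgumentException('avgProcessingMs must be positive');
    }
    $needed = (int) ceil($roundTripMs / $avgProcessingMs) + 1;
    return max(1, min($max, $needed));
}

echo suggestPrefetch(200.0, 5.0), "\n"; // slow tasks such as email: a small prefetch is enough
echo suggestPrefetch(2.0, 20.0), "\n";  // fast tasks over a slower link: a larger prefetch helps
```

Slow tasks benefit from a low value so that work is spread evenly across consumers, while very fast tasks need a larger window to avoid idling between deliveries.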
5. Exponential backoff retry
php
private function calculateBackoff(int $retryCount): int
{
return min(60, pow(2, $retryCount));
}
- Avoids putting pressure on the system with frequent retries
- The maximum wait is capped at a reasonable value (60 seconds here)
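The delays produced by this formula can be printed directly. The sketch below reuses the same calculation as calculateBackoff() in a standalone function and adds a randomized variant; the jitter is an addition for illustration, not something the original consumer does, and both function names are hypothetical.

```php
<?php
// Same formula as calculateBackoff(), written as a standalone function.
function backoffSeconds(int $retryCount, int $cap = 60): int
{
    return (int) min($cap, 2 ** $retryCount);
}

// Random jitter: pick a delay between 1 second and the exponential bound so that
// many failing tasks do not all retry at the same instant.
function backoffWithJitter(int $retryCount, int $cap = 60): int
{
    return random_int(1, backoffSeconds($retryCount, $cap));
}

foreach (range(1, 7) as $attempt) {
    printf("retry %d: wait up to %ds\n", $attempt, backoffSeconds($attempt));
}
```

The cap takes effect from the sixth retry onward, since 2^6 = 64 exceeds 60.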
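Performance Optimization Suggestions
Producer optimization
| Item | Recommendation | Notes |
|---|---|---|
| Batch publishing | Use batch_basic_publish() followed by publish_batch() | Fewer network round trips |
| Connection reuse | Keep long-lived connections | Avoids repeated connection setup |
| Asynchronous confirms | Enable publisher confirms and handle them asynchronously | Confirms delivery to the broker without blocking on every message |
| Message compression | gzip large messages | Less data sent over the network |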
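The compression row can be sketched as a pair of helpers that gzip only bodies above a size threshold and record the choice in a content_encoding value. The function names, the 1024-byte threshold, and the 'identity' marker are assumptions of this example; it relies on PHP's zlib extension for gzencode() and gzdecode().

```php
<?php
// Hypothetical helpers; they require the zlib extension (gzencode/gzdecode).
function encodeBody(array $task, int $thresholdBytes = 1024): array
{
    $json = json_encode($task, JSON_THROW_ON_ERROR);
    if (strlen($json) < $thresholdBytes) {
        // Small bodies are sent as-is: compression overhead would outweigh the gain.
        return ['body' => $json, 'content_encoding' => 'identity'];
    }
    return ['body' => gzencode($json, 6), 'content_encoding' => 'gzip'];
}

function decodeBody(string $body, string $contentEncoding): array
{
    $json = $contentEncoding === 'gzip' ? gzdecode($body) : $body;
    if ($json === false) {
        throw new RuntimeException('Corrupt gzip body');
    }
    return json_decode($json, true, 512, JSON_THROW_ON_ERROR);
}

$large = ['task_type' => 'report.generate', 'payload' => str_repeat('order-line;', 500)];
$encoded = encodeBody($large);
printf("%s: %d bytes on the wire\n", $encoded['content_encoding'], strlen($encoded['body']));
```

The consumer has to inspect the same encoding marker before calling json_decode(), so producer and consumer must be deployed with matching logic.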
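Consumer optimization
| Item | Recommendation | Notes |
|---|---|---|
| Prefetch count | Set it according to processing capacity | Balances throughput and latency |
| Multiple consumers | Run several consumer processes | More parallel processing capacity |
| Processing timeout | Set a reasonable timeout | Keeps one message from blocking the consumer for a long time |
| Resource pooling | Reuse database connections and similar resources | Less resource creation overhead |
Queue optimization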
bash
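# Limit queue length with a policy; by default the oldest messages are dropped or dead-lettered once a limit is reached
rabbitmqctl set_policy task-limits "^task\.(high|normal|low)$" \
  '{"max-length":100000,"max-length-bytes":1073741824}' \
  --apply-to queues
Common Problems and Solutions
1. Message backlog
Problem: messages pile up in the queue because consumption cannot keep up
Solution: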
php
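// Fork first, then connect: an AMQP socket must not be shared between processes.
// $registry is the TaskHandlerRegistry built in the complete usage example.
for ($i = 0; $i < 5; $i++) {
    $pid = pcntl_fork();
    if ($pid === 0) {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $consumer = new TaskConsumer($connection, $registry, [
            'prefetch_count' => 20,
        ]);
        $consumer->consumeAll();
        exit(0);
    }
}
// The parent waits for its children so they are not left as zombies.
while (pcntl_wait($status) > 0) {
}
2. Duplicate message consumption
Problem: the consumer finishes the task but the ACK never reaches the broker, so the message is redelivered
Solution: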
php
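abstract class IdempotentTaskHandler implements TaskHandlerInterface
{
    private $redis; // assumed to be a connected phpredis \Redis client
    public function __construct($redis)
    {
        $this->redis = $redis;
    }
    public function handle(TaskMessage $task): TaskResult
    {
        $lockKey = "task:lock:{$task->taskId}";
        // SET with NX and EX is atomic, unlike a separate exists() and setex().
        if (!$this->redis->set($lockKey, '1', ['nx', 'ex' => 86400])) {
            return TaskResult::success(['status' => 'already_processed']);
        }
        $result = $this->doHandle($task);
        if (!$result->isSuccess()) {
            // Release the marker so that a retry is not mistaken for a duplicate.
            $this->redis->del($lockKey);
        }
        return $result;
    }
    abstract protected function doHandle(TaskMessage $task): TaskResult;
}
3. Consumer crash
Problem: the consumer crashes while processing, leaving the message in the unacked state
Solution:
- Manage consumer processes with Supervisor
- Set a reasonable heartbeat timeout
- Implement a graceful shutdown mechanism
4. Memory leaks
Problem: the memory of a long-running consumer keeps growing
Solution: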
php
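// Note: isRunning() and process() are not defined on the TaskConsumer shown earlier;
// in this sketch they stand for "the loop is active" and "handle one message".
$processedCount = 0;
$maxProcessCount = 1000;
while ($consumer->isRunning()) {
    $consumer->process();
    $processedCount++;
    if ($processedCount >= $maxProcessCount) {
        // Exit cleanly and let Supervisor start a fresh process.
        echo "Reached the maximum number of processed tasks, restarting the consumer\n";
        break;
    }
}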
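A message counter alone does not catch a single task that leaks a lot of memory. As a possible refinement, the restart decision can also consider current memory usage; shouldRestart() and the 128 MiB threshold below are hypothetical choices for this sketch.

```php
<?php
// Hypothetical helper: decides when a long-running worker should exit so that
// a process manager such as Supervisor can start a fresh one.
function shouldRestart(int $processed, int $maxJobs, int $memoryBytes, int $memoryLimitBytes): bool
{
    return $processed >= $maxJobs || $memoryBytes >= $memoryLimitBytes;
}

$limit = 128 * 1024 * 1024; // 128 MiB, an arbitrary example threshold
var_dump(shouldRestart(10, 1000, memory_get_usage(true), $limit));
```

Calling the check after each processed message keeps the exit point between tasks, so no message is abandoned halfway through.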