Appearance
工作队列模式
概述
工作队列模式(Work Queue Pattern)也称为任务队列(Task Queue),用于在多个消费者之间分配耗时的任务。通过工作队列,可以将任务异步处理,避免在用户等待响应时执行耗时操作,提高系统的响应速度和吞吐量。
核心知识点
架构图
mermaid
graph TB
subgraph 生产者
P[Producer]
end
subgraph RabbitMQ
Q[Task Queue]
end
subgraph 消费者
C1[Worker 1]
C2[Worker 2]
C3[Worker 3]
end
P -->|发布任务| Q
Q -->|轮询分发| C1
Q -->|轮询分发| C2
Q -->|轮询分发| C3
style P fill:#e1f5fe
style Q fill:#fff3e0
style C1 fill:#e8f5e9
style C2 fill:#e8f5e9
style C3 fill:#e8f5e9
消息分发机制
mermaid
sequenceDiagram
participant P as 生产者
participant Q as 队列
participant W1 as Worker 1
participant W2 as Worker 2
P->>Q: 任务 A
P->>Q: 任务 B
P->>Q: 任务 C
P->>Q: 任务 D
Note over Q: 轮询分发
Q->>W1: 任务 A
Q->>W2: 任务 B
Q->>W1: 任务 C
Q->>W2: 任务 D
核心概念
| 概念 | 说明 |
|---|---|
| Round-robin | 轮询分发,每个消费者依次获取消息 |
| Fair dispatch | 公平分发,根据消费者处理能力分发 |
| Acknowledgment | 消息确认,确保消息被正确处理 |
| Durability | 持久化,确保消息不丢失 |
PHP 代码示例
基础工作队列生产者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class TaskPublisher
{
private $connection;
private $channel;
private $queueName = 'task_queue';
/**
 * Opens a connection to the broker on localhost:5672 with the guest
 * account and declares the task queue on a fresh channel.
 */
public function __construct()
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    // Flags after the queue name: passive, durable, exclusive, auto_delete.
    $channel->queue_declare($this->queueName, false, true, false, false);

    $this->connection = $connection;
    $this->channel = $channel;
}
/**
 * Publishes one task to the queue as a persistent JSON message.
 *
 * The message body is a JSON object with the keys `task`, `priority` and
 * `created_at` (current Unix time). It is sent through the default exchange
 * with the queue name as routing key.
 *
 * @param mixed $task     Task payload: a string or any JSON-encodable structure.
 * @param int   $priority Written into the message body only.
 *
 * @throws InvalidArgumentException When the payload cannot be JSON-encoded.
 */
public function publishTask($task, $priority = 0)
{
    $body = json_encode([
        'task' => $task,
        'priority' => $priority,
        'created_at' => time()
    ]);
    if ($body === false) {
        // Without this check a `false` body would be published silently.
        throw new InvalidArgumentException(
            'Task is not JSON-encodable: ' . json_last_error_msg()
        );
    }

    $message = new AMQPMessage(
        $body,
        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );
    $this->channel->basic_publish($message, '', $this->queueName);

    // Arrays and plain objects cannot be interpolated into a string, so
    // they are shown as JSON in the confirmation line.
    $needsJson = is_array($task)
        || (is_object($task) && !method_exists($task, '__toString'));
    $label = $needsJson
        ? json_encode($task, JSON_UNESCAPED_UNICODE)
        : (string) $task;
    echo " [x] 任务已发送: {$label}\n";
}
/**
 * Closes the channel first, then the connection it was opened on.
 */
public function close()
{
    foreach ([$this->channel, $this->connection] as $resource) {
        $resource->close();
    }
}
}
基础工作队列消费者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
class TaskWorker
{
private $connection;
private $channel;
private $queueName = 'task_queue';
/**
 * Connects to the broker on localhost:5672 as guest and declares the
 * queue this worker consumes from.
 */
public function __construct()
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    // Flags after the queue name: passive, durable, exclusive, auto_delete.
    $channel->queue_declare($this->queueName, false, true, false, false);

    $this->connection = $connection;
    $this->channel = $channel;
}
/**
 * Consumes tasks from the queue until the channel is closed.
 *
 * Each delivery is JSON-decoded, handed to processTask() and acknowledged
 * afterwards. Prefetch is limited to one unacknowledged message, and the
 * consumer is registered with manual acknowledgements.
 *
 * A body that does not decode to an array containing a `task` key is
 * rejected without requeue, so it is not redelivered indefinitely.
 */
public function start()
{
    $callback = function ($message) {
        $data = json_decode($message->body, true);
        if (!is_array($data) || !array_key_exists('task', $data)) {
            echo " [!] 无法解析任务消息,已丢弃\n";
            $message->nack(false);
            return;
        }

        $task = $data['task'];
        // Structured tasks (arrays) cannot be interpolated; print them as JSON.
        $label = is_scalar($task)
            ? (string) $task
            : json_encode($task, JSON_UNESCAPED_UNICODE);

        echo " [x] 开始处理任务: {$label}\n";
        $this->processTask($task);
        echo " [x] 任务完成\n";
        $message->ack();
    };

    // prefetch_count = 1: at most one unacknowledged message at a time.
    $this->channel->basic_qos(null, 1, null);
    $this->channel->basic_consume(
        $this->queueName,
        '',
        false,
        false, // no_ack = false, so the callback acknowledges explicitly
        false,
        false,
        $callback
    );
    echo " [*] 工作器已启动,等待任务...\n";
    while ($this->channel->is_open()) {
        $this->channel->wait();
    }
}
/**
 * Simulates work by sleeping one second per "." in the task string.
 *
 * @param mixed $task Task payload; non-string payloads take zero seconds.
 */
private function processTask($task)
{
    // substr_count() accepts strings only; an array task would raise a
    // TypeError on PHP 8.
    $seconds = is_string($task) ? substr_count($task, '.') : 0;
    sleep($seconds);
}
/**
 * Closes the channel first, then the connection it was opened on.
 */
public function close()
{
    foreach ([$this->channel, $this->connection] as $resource) {
        $resource->close();
    }
}
}
公平分发工作队列
php
<?php
class FairDispatchWorker
{
private $connection;
private $channel;
private $queueName = 'fair_task_queue';
/**
 * Connects to the broker on localhost:5672 as guest and declares the
 * fair-dispatch queue.
 */
public function __construct()
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    // Flags after the queue name: passive, durable, exclusive, auto_delete.
    $channel->queue_declare($this->queueName, false, true, false, false);

    $this->connection = $connection;
    $this->channel = $channel;
}
/**
 * Consumes tasks with a prefetch of one until the channel is closed.
 *
 * For every delivery the worker prints its process id and the task,
 * runs processTask(), prints the elapsed time in milliseconds and then
 * acknowledges the message.
 */
public function start()
{
    $onMessage = function ($message) {
        $payload = json_decode($message->body, true);
        printf(" [x] Worker %s 开始处理: %s\n", getmypid(), $payload['task']);

        $begunAt = microtime(true);
        $this->processTask($payload);
        $elapsedMs = round((microtime(true) - $begunAt) * 1000, 2);

        printf(" [x] Worker %s 完成,耗时: %sms\n", getmypid(), $elapsedMs);
        $message->ack();
    };

    $channel = $this->channel;
    // prefetch_count = 1: the next message arrives only after the current
    // one has been acknowledged.
    $channel->basic_qos(null, 1, null);
    $channel->basic_consume($this->queueName, '', false, false, false, false, $onMessage);

    printf(" [*] Worker %s 已启动\n", getmypid());

    while ($channel->is_open()) {
        $channel->wait();
    }
}
/**
 * Simulates work by sleeping for the task's `estimated_time` seconds.
 *
 * @param array $data Decoded message body; `estimated_time` defaults to 1.
 */
private function processTask($data)
{
    $estimatedTime = $data['estimated_time'] ?? 1;
    // The value comes straight from the message. sleep() needs a
    // non-negative integer and throws a ValueError for negatives on PHP 8.
    sleep(max(0, (int) $estimatedTime));
}
/**
 * Closes the channel first, then the connection it was opened on.
 */
public function close()
{
    foreach ([$this->channel, $this->connection] as $resource) {
        $resource->close();
    }
}
}
带重试机制的工作队列
php
<?php
class RetryableTaskWorker
{
private $connection;
private $channel;
private $queueName = 'retryable_task_queue';
private $maxRetries = 3;
private $retryDelay = 5;
/**
 * Connects to the broker on localhost:5672 as guest, opens a channel and
 * declares the main and retry queues.
 */
public function __construct()
{
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $this->connection = $connection;
    $this->channel = $connection->channel();
    $this->setupQueues();
}
/**
 * Declares the main queue and its companion "<name>_retry" queue.
 *
 * The two queues dead-letter into each other through the default exchange.
 * The retry queue carries a message TTL of retryDelay seconds, and its
 * dead-letter routing key is the main queue's name.
 */
private function setupQueues()
{
    $mainQueue = $this->queueName;
    $retryQueue = $mainQueue . '_retry';

    $mainArguments = new PhpAmqpLib\Wire\AMQPTable([
        'x-dead-letter-exchange' => '',
        'x-dead-letter-routing-key' => $retryQueue
    ]);
    $retryArguments = new PhpAmqpLib\Wire\AMQPTable([
        // TTL is expressed in milliseconds.
        'x-message-ttl' => $this->retryDelay * 1000,
        'x-dead-letter-exchange' => '',
        'x-dead-letter-routing-key' => $mainQueue
    ]);

    // Flags after the name: passive, durable, exclusive, auto_delete, nowait.
    $this->channel->queue_declare($mainQueue, false, true, false, false, false, $mainArguments);
    $this->channel->queue_declare($retryQueue, false, true, false, false, false, $retryArguments);
}
/**
 * Consumes tasks and retries failed ones through the retry queue.
 *
 * A task that throws an Exception is re-published to "<queue>_retry" with
 * an incremented `retry_count` and the error text in `last_error`, as long
 * as fewer than maxRetries retries have been scheduled. Otherwise it is
 * passed to handleFailedTask(). The original delivery is acknowledged in
 * every case.
 */
public function start()
{
    $callback = function ($message) {
        $data = json_decode($message->body, true);
        $retryCount = $data['retry_count'] ?? 0;
        try {
            $this->processTask($data);
            echo " [x] 任务成功\n";
            $message->ack();
        } catch (Exception $e) {
            if ($retryCount < $this->maxRetries) {
                // Report the retry being scheduled (1..maxRetries), not the
                // number already used, so the first retry reads 1/N.
                $attempt = $retryCount + 1;
                echo " [!] 任务失败,准备重试 ({$attempt}/{$this->maxRetries})\n";
                $data['retry_count'] = $attempt;
                $data['last_error'] = $e->getMessage();
                $retryMessage = new AMQPMessage(
                    json_encode($data),
                    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
                );
                $this->channel->basic_publish(
                    $retryMessage,
                    '',
                    $this->queueName . '_retry'
                );
            } else {
                echo " [x] 任务失败,已达最大重试次数\n";
                $this->handleFailedTask($data);
            }
            // The retry copy (or the failure record) now stands in for the
            // original delivery, so it can be acknowledged.
            $message->ack();
        }
    };

    // prefetch_count = 1: one unacknowledged message at a time.
    $this->channel->basic_qos(null, 1, null);
    $this->channel->basic_consume(
        $this->queueName,
        '',
        false,
        false,
        false,
        false,
        $callback
    );
    echo " [*] 重试工作器已启动\n";
    while ($this->channel->is_open()) {
        $this->channel->wait();
    }
}
/**
 * Placeholder for the actual task logic; the body is intentionally empty.
 *
 * An Exception thrown from here is caught by the consumer callback in
 * start(), which then schedules a retry or records the final failure.
 *
 * @param mixed $data Decoded JSON body of the message.
 */
private function processTask($data)
{
// Task-processing logic goes here.
}
/**
 * Writes a task that has exhausted its retries to the PHP error log.
 *
 * @param mixed $data Decoded task payload, logged as JSON.
 */
private function handleFailedTask($data)
{
    $encoded = json_encode($data);
    error_log("任务最终失败: " . $encoded);
}
/**
 * Closes the channel first, then the connection it was opened on.
 */
public function close()
{
    foreach ([$this->channel, $this->connection] as $resource) {
        $resource->close();
    }
}
}
完整示例:图片处理工作队列
php
<?php
/**
 * Producer-side facade that turns image operations into queued tasks.
 *
 * Every submit method publishes an associative array whose first key is
 * `type`, followed by the operation's parameters.
 */
class ImageProcessingQueue
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TaskPublisher();
    }

    /**
     * Queues a resize of the image to the given dimensions.
     */
    public function submitResize($imagePath, $width, $height)
    {
        $this->enqueue('resize', [
            'image_path' => $imagePath,
            'width' => $width,
            'height' => $height
        ]);
    }

    /**
     * Queues a conversion of the image to another format.
     */
    public function submitConvert($imagePath, $format)
    {
        $this->enqueue('convert', [
            'image_path' => $imagePath,
            'format' => $format
        ]);
    }

    /**
     * Queues a watermark to be applied at the given position.
     */
    public function submitWatermark($imagePath, $watermarkPath, $position)
    {
        $this->enqueue('watermark', [
            'image_path' => $imagePath,
            'watermark_path' => $watermarkPath,
            'position' => $position
        ]);
    }

    /**
     * Publishes a task with `type` as the leading key.
     */
    private function enqueue($type, array $fields)
    {
        // Array union keeps 'type' first and appends the fields in order.
        $this->publisher->publishTask(['type' => $type] + $fields);
    }
}
class ImageProcessingWorker
{
private $worker;
/**
 * Wraps a FairDispatchWorker, which performs the actual consumption.
 */
public function __construct()
{
    $consumer = new FairDispatchWorker;
    $this->worker = $consumer;
}
/**
 * Prints a start-up banner and delegates to the wrapped worker's start().
 */
public function start()
{
    print "图片处理工作器启动...\n";
    $this->worker->start();
}
/**
 * Dispatches an image task to the handler matching its `type`.
 *
 * Unknown types are ignored.
 * NOTE(review): nothing in the visible code calls this method — confirm
 * how it is meant to be wired into the wrapped worker.
 *
 * @param array $task Task with `type` plus the operation's parameters.
 */
private function processImage($task)
{
    $type = $task['type'];

    // Loose comparison on purpose: it mirrors switch/case semantics.
    if ($type == 'resize') {
        $this->resizeImage($task['image_path'], $task['width'], $task['height']);
    } elseif ($type == 'convert') {
        $this->convertImage($task['image_path'], $task['format']);
    } elseif ($type == 'watermark') {
        $this->addWatermark($task['image_path'], $task['watermark_path'], $task['position']);
    }
}
/**
 * Demo handler: only prints the requested resize.
 */
private function resizeImage($path, $width, $height)
{
    echo '调整图片大小: ' . $path . ' -> ' . $width . 'x' . $height . "\n";
}
/**
 * Demo handler: only prints the requested format conversion.
 */
private function convertImage($path, $format)
{
    echo '转换图片格式: ' . $path . ' -> ' . $format . "\n";
}
/**
 * Demo handler: only prints the image path. The watermark path and
 * position are accepted but not used.
 */
private function addWatermark($path, $watermark, $position)
{
    echo '添加水印: ' . $path . "\n";
}
}
实际应用场景
1. 邮件发送队列
php
<?php
/**
 * Producer-side helper that queues e-mails as `send_email` tasks.
 */
class EmailTaskQueue
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TaskPublisher();
    }

    /**
     * Queues a single e-mail.
     *
     * @param array $options Extra data passed through unchanged in the task.
     */
    public function queueEmail($to, $subject, $body, $options = [])
    {
        $task = [
            'type' => 'send_email',
            'to' => $to,
            'subject' => $subject,
            'body' => $body,
            'options' => $options
        ];
        $this->publisher->publishTask($task);
    }

    /**
     * Queues the same subject and body once per recipient.
     */
    public function queueBulkEmails($recipients, $subject, $body)
    {
        foreach ($recipients as $recipient) {
            $this->queueEmail($recipient, $subject, $body);
        }
    }
}
class EmailWorker
{
/**
 * Sends the e-mail described by a `send_email` task; other task types are
 * ignored.
 *
 * @param array $task Task with `type`, `to`, `subject`, `body`, `options`.
 */
public function processTask($task)
{
    if ($task['type'] === 'send_email') {
        $this->sendEmail($task['to'], $task['subject'], $task['body'], $task['options']);
    }
}
/**
 * Demo sender: only prints the recipient.
 */
private function sendEmail($to, $subject, $body, $options)
{
    echo '发送邮件到: ' . $to . "\n";
}
}
2. 报表生成队列
php
<?php
/**
 * Producer-side helper that queues `generate_report` tasks.
 */
class ReportGenerationQueue
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TaskPublisher();
    }

    /**
     * Queues a report request, stamped with the current Unix time.
     *
     * @param mixed  $reportType  Stored under `report_type`.
     * @param mixed  $params      Stored under `params`.
     * @param string $callbackUrl Stored under `callback_url`.
     */
    public function queueReport($reportType, $params, $callbackUrl)
    {
        $task = [
            'type' => 'generate_report',
            'report_type' => $reportType,
            'params' => $params,
            'callback_url' => $callbackUrl,
            'created_at' => time()
        ];
        $this->publisher->publishTask($task);
    }
}
class ReportGenerationWorker
{
/**
 * Generates the report for a `generate_report` task and passes it, with
 * the task's callback URL, to notifyCallback(); other task types are
 * ignored.
 *
 * @param array $task Task with `type`, `report_type`, `params`, `callback_url`.
 */
public function processTask($task)
{
    if ($task['type'] === 'generate_report') {
        $generated = $this->generateReport($task['report_type'], $task['params']);
        $this->notifyCallback($task['callback_url'], $generated);
    }
}
/**
 * Demo generator: prints the report type and returns a fresh id.
 *
 * @return array Array with a single `report_id` key produced by uniqid().
 */
private function generateReport($type, $params)
{
    echo '生成报表: ' . $type . "\n";
    $reportId = uniqid();
    return ['report_id' => $reportId];
}
/**
 * Demo notifier: only prints the callback URL; the report is not used.
 */
private function notifyCallback($url, $report)
{
    echo '回调通知: ' . $url . "\n";
}
}
3. 数据导入队列
php
<?php
/**
 * Producer-side helper that queues `data_import` tasks.
 */
class DataImportQueue
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TaskPublisher();
    }

    /**
     * Queues an import request, stamped with the current Unix time.
     *
     * @param mixed $file    Stored under `file`.
     * @param mixed $mapping Stored under `mapping`.
     * @param array $options Stored under `options`.
     */
    public function queueImport($file, $mapping, $options = [])
    {
        $task = [
            'type' => 'data_import',
            'file' => $file,
            'mapping' => $mapping,
            'options' => $options,
            'created_at' => time()
        ];
        $this->publisher->publishTask($task);
    }
}
class DataImportWorker
{
/**
 * Runs the import described by a `data_import` task; other task types are
 * ignored.
 *
 * @param array $task Task with `type`, `file` and `mapping`.
 */
public function processTask($task)
{
    if ($task['type'] === 'data_import') {
        $this->importData($task['file'], $task['mapping']);
    }
}
/**
 * Demo importer: only prints the file; the mapping is not used.
 */
private function importData($file, $mapping)
{
    echo '导入数据: ' . $file . "\n";
}
}
常见问题与解决方案
问题 1:任务处理不均衡
原因:轮询分发不考虑任务复杂度
解决方案:
php
<?php
class WeightedTaskDistributor
{
private $tasks = [];
/**
 * Registers a task together with its relative weight.
 *
 * @param mixed     $task   Opaque task value, returned by distributeToWorkers().
 * @param int|float $weight Load the task adds to the worker it is assigned to.
 */
public function addTask($task, $weight = 1)
{
    $entry = ['task' => $task, 'weight' => $weight];
    $this->tasks[] = $entry;
}
/**
 * Assigns the registered tasks to workers, heaviest first, always giving
 * the next task to the worker with the smallest accumulated load.
 *
 * The stored task list is sorted in place by descending weight. Ties
 * between equally loaded workers go to the lowest index.
 *
 * @param int $workerCount Number of workers; must be at least 1.
 *
 * @return array One entry per worker: ['tasks' => array, 'load' => number].
 *
 * @throws InvalidArgumentException When $workerCount is smaller than 1.
 */
public function distributeToWorkers($workerCount)
{
    $workerCount = (int) $workerCount;
    if ($workerCount < 1) {
        throw new InvalidArgumentException('workerCount must be at least 1');
    }

    // The spaceship operator returns -1/0/1. A plain subtraction is cast
    // to int by usort(), so fractional weight differences compare as equal.
    usort($this->tasks, function ($a, $b) {
        return $b['weight'] <=> $a['weight'];
    });

    $workers = array_fill(0, $workerCount, ['tasks' => [], 'load' => 0]);
    foreach ($this->tasks as $task) {
        $minLoadWorker = 0;
        $minLoad = $workers[0]['load'];
        for ($i = 1; $i < $workerCount; $i++) {
            if ($workers[$i]['load'] < $minLoad) {
                $minLoad = $workers[$i]['load'];
                $minLoadWorker = $i;
            }
        }
        $workers[$minLoadWorker]['tasks'][] = $task['task'];
        $workers[$minLoadWorker]['load'] += $task['weight'];
    }
    return $workers;
}
}
问题 2:消费者故障导致任务丢失
解决方案:
php
<?php
class ReliableTaskWorker
{
private $processingTimeout = 300;
/**
 * Consumes tasks with manual acknowledgements and requeues any task whose
 * processing fails.
 *
 * A message is acknowledged only after processTask() returns. If it throws
 * an Exception, the message is nack'ed with requeue. One shutdown handler,
 * registered once, requeues the message that was in flight when the script
 * terminated abnormally.
 *
 * NOTE(review): $this->channel, $this->queueName and processTask() are not
 * declared in the visible part of this class — confirm where they come from.
 */
public function start()
{
    // The delivery being processed right now; null while idle.
    $inFlight = null;

    // Registered once rather than per message: shutdown callbacks cannot be
    // unregistered, so a per-message registration grows without bound.
    register_shutdown_function(function () use (&$inFlight) {
        if ($inFlight !== null && $this->channel->is_open()) {
            $inFlight->nack(true);
        }
    });

    $callback = function ($message) use (&$inFlight) {
        $inFlight = $message;
        try {
            $this->processTask($message);
            $message->ack();
        } catch (Exception $e) {
            $message->nack(true);
        }
        // Reached only after the message was settled; a fatal error skips
        // this line and leaves the message for the shutdown handler.
        $inFlight = null;
    };

    // prefetch_count = 1: one unacknowledged message at a time.
    $this->channel->basic_qos(null, 1, null);
    $this->channel->basic_consume(
        $this->queueName,
        '',
        false,
        false,
        false,
        false,
        $callback
    );
    while ($this->channel->is_open()) {
        $this->channel->wait();
    }
}
}
问题 3:任务积压
解决方案:
php
<?php
class TaskMonitor
{
private $apiClient;
/**
 * Returns the number of messages the management API reports for a queue.
 *
 * The request goes to /api/queues/%2f/<queue>; %2f is the URL-encoded "/"
 * (presumably the default virtual host — confirm against the deployment).
 *
 * @param string $queueName Queue to inspect.
 *
 * @return int|mixed The `messages` field of the response, or 0 if absent.
 */
public function getQueueDepth($queueName)
{
    // Encode the name so characters such as "/", "?" or "#" cannot change
    // the path that is requested.
    $encodedName = rawurlencode($queueName);
    $response = $this->apiClient->get("/api/queues/%2f/{$encodedName}");
    return $response['messages'] ?? 0;
}
/**
 * Raises an alert and a scaling hint when a queue holds more messages
 * than the threshold.
 *
 * @param string $queueName Queue to check.
 * @param int    $threshold Depth that must be exceeded to trigger the alert.
 */
public function checkBacklog($queueName, $threshold = 1000)
{
    $depth = $this->getQueueDepth($queueName);
    if (!($depth > $threshold)) {
        return;
    }

    $this->alertBacklog($queueName, $depth);
    $this->scaleWorkers($queueName);
}
/**
 * Writes the backlog size of the queue to the PHP error log.
 */
private function alertBacklog($queueName, $depth)
{
    $line = '队列 ' . $queueName . ' 积压: ' . $depth . ' 条消息';
    error_log($line);
}
/**
 * Demo hook: only prints a hint to add workers; the queue name is unused.
 */
private function scaleWorkers($queueName)
{
    print "建议增加工作器数量\n";
}
}
最佳实践建议
1. 任务优先级
php
<?php
class PriorityTaskQueue
{
private $queues = [
'high' => 'task_queue_high',
'normal' => 'task_queue_normal',
'low' => 'task_queue_low'
];
/**
 * Publishes a task to the queue that belongs to the given priority level.
 *
 * Unknown priority names fall back to the `normal` queue. The message is
 * marked persistent, like the other publishers in this document.
 *
 * @param mixed  $task     JSON-encodable task payload.
 * @param string $priority One of `high`, `normal`, `low`.
 */
public function publishTask($task, $priority = 'normal')
{
    $queueName = $this->queues[$priority] ?? $this->queues['normal'];
    $message = new AMQPMessage(
        json_encode($task),
        // Without this flag the message would be published as transient.
        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );
    $this->channel->basic_publish($message, '', $queueName);
}
/**
 * Registers processMessage() as the consumer for the high, normal and low
 * queues, in that order, on the same channel.
 *
 * NOTE(review): registration order alone does not obviously make the
 * broker deliver high-priority messages first — confirm the intended
 * prioritisation. processMessage() is not defined in the visible code.
 */
public function consumeWithPriority()
{
    $handler = [$this, 'processMessage'];
    foreach (['high', 'normal', 'low'] as $level) {
        // no_ack is false, so deliveries require explicit acknowledgement.
        $this->channel->basic_consume($this->queues[$level], '', false, false, false, false, $handler);
    }
}
}
2. 任务进度跟踪
php
<?php
class TaskProgressTracker
{
private $redis;
/**
 * Connects to the Redis server at 127.0.0.1:6379.
 */
public function __construct()
{
    $client = new Redis();
    $client->connect('127.0.0.1', 6379);
    $this->redis = $client;
}
/**
 * Creates the hash "task:<id>" with status `processing`, progress 0, the
 * given total and the current Unix time as `started_at`.
 */
public function startTask($taskId, $total = 100)
{
    $key = "task:{$taskId}";
    $fields = [
        'status' => 'processing',
        'progress' => 0,
        'total' => $total,
        'started_at' => time()
    ];
    $this->redis->hMSet($key, $fields);
}
/**
 * Overwrites the `progress` field of the task's hash.
 */
public function updateProgress($taskId, $progress)
{
    $key = "task:{$taskId}";
    $this->redis->hSet($key, 'progress', $progress);
}
/**
 * Marks the task as completed with progress 100 and the current Unix time
 * as `completed_at`.
 *
 * NOTE(review): progress is set to 100 regardless of the `total` stored by
 * startTask() — confirm that progress is meant to be a percentage.
 */
public function completeTask($taskId)
{
    $key = "task:{$taskId}";
    $fields = [
        'status' => 'completed',
        'progress' => 100,
        'completed_at' => time()
    ];
    $this->redis->hMSet($key, $fields);
}
/**
 * Marks the task as failed, storing the error and the current Unix time as
 * `failed_at`.
 */
public function failTask($taskId, $error)
{
    $key = "task:{$taskId}";
    $fields = [
        'status' => 'failed',
        'error' => $error,
        'failed_at' => time()
    ];
    $this->redis->hMSet($key, $fields);
}
/**
 * Returns every field stored in the task's hash.
 */
public function getProgress($taskId)
{
    $key = "task:{$taskId}";
    return $this->redis->hGetAll($key);
}
}
3. 优雅关闭
php
<?php
/**
 * Worker loop that stops cleanly when it receives SIGTERM or SIGINT.
 *
 * NOTE(review): $this->channel is not declared or initialised in the
 * visible code, and nothing ever assigns $currentTask — confirm how both
 * are meant to be set.
 */
class GracefulTaskWorker
{
    private $running = true;
    private $currentTask = null;

    /**
     * Installs the signal handlers and waits for deliveries until shutdown()
     * clears the running flag.
     */
    public function start()
    {
        pcntl_async_signals(true);
        foreach ([SIGTERM, SIGINT] as $signal) {
            pcntl_signal($signal, [$this, 'shutdown']);
        }

        while ($this->running) {
            try {
                // One-second timeout so the running flag is re-checked
                // regularly even when no message arrives.
                $this->channel->wait(null, false, 1);
            } catch (PhpAmqpLib\Exception\AMQPTimeoutException $timeout) {
                // Nothing arrived in time; loop again.
            }
        }

        if ($this->currentTask) {
            echo "等待当前任务完成...\n";
        }
        echo "工作器已关闭\n";
    }

    /**
     * Signal handler: announces the signal and asks the loop to stop.
     */
    public function shutdown()
    {
        echo "收到关闭信号...\n";
        $this->running = false;
    }
}