Skip to content

工作队列模式

概述

工作队列模式(Work Queue Pattern)也称为任务队列(Task Queue),用于在多个消费者之间分配耗时的任务。通过工作队列,可以将任务异步处理,避免在用户等待响应时执行耗时操作,提高系统的响应速度和吞吐量。

核心知识点

架构图

```mermaid
graph TB
    subgraph 生产者
        P[Producer]
    end

    subgraph RabbitMQ
        Q[Task Queue]
    end

    subgraph 消费者
        C1[Worker 1]
        C2[Worker 2]
        C3[Worker 3]
    end

    P -->|发布任务| Q
    Q -->|轮询分发| C1
    Q -->|轮询分发| C2
    Q -->|轮询分发| C3

    style P fill:#e1f5fe
    style Q fill:#fff3e0
    style C1 fill:#e8f5e9
    style C2 fill:#e8f5e9
    style C3 fill:#e8f5e9
```

消息分发机制

```mermaid
sequenceDiagram
    participant P as 生产者
    participant Q as 队列
    participant W1 as Worker 1
    participant W2 as Worker 2

    P->>Q: 任务 A
    P->>Q: 任务 B
    P->>Q: 任务 C
    P->>Q: 任务 D

    Note over Q: 轮询分发
    Q->>W1: 任务 A
    Q->>W2: 任务 B
    Q->>W1: 任务 C
    Q->>W2: 任务 D
```

核心概念

| 概念 | 说明 |
| --- | --- |
| Round-robin | 轮询分发,每个消费者依次获取消息 |
| Fair dispatch | 公平分发,根据消费者处理能力分发 |
| Acknowledgment | 消息确认,确保消息被正确处理 |
| Durability | 持久化,确保消息不丢失 |

PHP 代码示例

基础工作队列生产者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Publishes persistent task messages to the durable `task_queue` queue.
 *
 * A connection and channel are opened in the constructor and stay open until
 * close() is called.
 */
class TaskPublisher
{
    private $connection;
    private $channel;
    private $queueName = 'task_queue';

    /**
     * Connects to the broker at localhost:5672 as guest/guest and declares
     * the task queue.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        // Arguments after the name: passive=false, durable=true,
        // exclusive=false, auto_delete=false.
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );
    }

    /**
     * Wraps the task in a JSON envelope and publishes it to the task queue
     * through the default exchange.
     *
     * @param mixed $task     Task payload; a string or any JSON-serialisable
     *                        structure (other classes in this file pass arrays).
     * @param int   $priority Recorded in the message body only; it is not set
     *                        as an AMQP message property here.
     *
     * @throws \RuntimeException If the envelope cannot be JSON-encoded.
     */
    public function publishTask($task, $priority = 0)
    {
        $body = json_encode([
            'task' => $task,
            'priority' => $priority,
            'created_at' => time()
        ]);

        // Fail loudly instead of publishing an empty body the workers cannot decode.
        if ($body === false) {
            throw new \RuntimeException(
                'Unable to encode task as JSON: ' . json_last_error_msg()
            );
        }

        $message = new AMQPMessage(
            $body,
            [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]
        );

        $this->channel->basic_publish($message, '', $this->queueName);

        // Array/object tasks cannot be interpolated into a string directly,
        // so render them as JSON for the log line.
        $label = (is_scalar($task) || $task === null)
            ? (string) $task
            : json_encode($task, JSON_UNESCAPED_UNICODE);

        echo " [x] 任务已发送: {$label}\n";
    }

    /**
     * Closes the channel and then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

基础工作队列消费者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

/**
 * Consumes messages from the durable `task_queue` queue and acknowledges each
 * one after it has been processed.
 */
class TaskWorker
{
    private $connection;
    private $channel;
    private $queueName = 'task_queue';

    /**
     * Connects to the broker at localhost:5672 as guest/guest and declares
     * the task queue with the same flags the publisher uses.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        // passive=false, durable=true, exclusive=false, auto_delete=false.
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );
    }

    /**
     * Registers the consumer and blocks, handling messages until the channel
     * is closed.
     */
    public function start()
    {
        $callback = function ($message) {
            $data = json_decode($message->body, true);

            // A body that is not a JSON object with a "task" entry can never
            // be processed; reject it without requeueing so it does not loop.
            if (!is_array($data) || !array_key_exists('task', $data)) {
                echo " [!] 消息格式无效,已丢弃\n";
                $message->nack(false);

                return;
            }

            $task = $data['task'];

            // Tasks may be arrays (see the queue wrappers built on
            // TaskPublisher); render those as JSON for display.
            $label = (is_scalar($task) || $task === null)
                ? (string) $task
                : json_encode($task, JSON_UNESCAPED_UNICODE);

            echo " [x] 开始处理任务: {$label}\n";

            $this->processTask($task);

            echo " [x] 任务完成\n";

            $message->ack();
        };

        // prefetch_count = 1: at most one unacknowledged message per consumer.
        $this->channel->basic_qos(null, 1, null);

        // Fourth argument (no_ack) is false, so the callback must ack explicitly.
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        echo " [*] 工作器已启动,等待任务...\n";

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Simulates work: sleeps one second for every "." in a string task.
     * Non-string tasks have no dots to count and return immediately.
     *
     * @param mixed $task Decoded "task" entry of the message body.
     */
    private function processTask($task)
    {
        if (is_string($task)) {
            sleep(substr_count($task, '.'));
        }
    }

    /**
     * Closes the channel and then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

公平分发工作队列

php
<?php

/**
 * Worker for the durable `fair_task_queue` queue that takes one message at a
 * time (prefetch 1) and reports how long each task took.
 */
class FairDispatchWorker
{
    private $connection;
    private $channel;
    private $queueName = 'fair_task_queue';

    /**
     * Connects to the broker at localhost:5672 as guest/guest and declares
     * the queue.
     */
    public function __construct()
    {
        // Fully qualified: this snippet carries no `use` statements of its own.
        $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        // passive=false, durable=true, exclusive=false, auto_delete=false.
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );
    }

    /**
     * Registers the consumer and blocks, handling messages until the channel
     * is closed.
     */
    public function start()
    {
        $callback = function ($message) {
            $data = json_decode($message->body, true);

            // Undecodable bodies cannot be processed; reject without requeue.
            if (!is_array($data)) {
                echo " [!] 消息格式无效,已丢弃\n";
                $message->nack(false);

                return;
            }

            $task = $data['task'] ?? null;

            // Array tasks cannot be passed to %s directly; show them as JSON.
            $label = (is_scalar($task) || $task === null)
                ? (string) $task
                : json_encode($task, JSON_UNESCAPED_UNICODE);

            echo sprintf(
                " [x] Worker %s 开始处理: %s\n",
                getmypid(),
                $label
            );

            $startTime = microtime(true);

            $this->processTask($data);

            // Elapsed wall-clock time in milliseconds, two decimals.
            $duration = round((microtime(true) - $startTime) * 1000, 2);

            echo sprintf(
                " [x] Worker %s 完成,耗时: %sms\n",
                getmypid(),
                $duration
            );

            $message->ack();
        };

        // prefetch_count = 1: the broker holds further messages back until
        // this worker has acknowledged the current one.
        $this->channel->basic_qos(null, 1, null);

        // Fourth argument (no_ack) is false, so the callback must ack explicitly.
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        echo sprintf(
            " [*] Worker %s 已启动\n",
            getmypid()
        );

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Simulates work by sleeping for the message's "estimated_time" seconds
     * (default 1).
     *
     * @param array $data Decoded message body.
     */
    private function processTask($data)
    {
        // sleep() needs a non-negative integer; the value comes from the
        // message, so normalise it instead of trusting it.
        $estimatedTime = max(0, (int) ($data['estimated_time'] ?? 1));
        sleep($estimatedTime);
    }

    /**
     * Closes the channel and then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

带重试机制的工作队列

php
<?php

/**
 * Worker that retries failed tasks through a TTL "retry" queue.
 *
 * A failed task is republished to `<queue>_retry`. That queue has a message
 * TTL of $retryDelay seconds and dead-letters expired messages back to the
 * main queue. The attempt counter travels in the body as "retry_count".
 */
class RetryableTaskWorker
{
    private $connection;
    private $channel;
    private $queueName = 'retryable_task_queue';
    private $maxRetries = 3;
    private $retryDelay = 5;

    /**
     * Connects to the broker at localhost:5672 as guest/guest and declares
     * the main and retry queues.
     */
    public function __construct()
    {
        // Fully qualified: this snippet carries no `use` statements of its own.
        $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->setupQueues();
    }

    /**
     * Declares the main queue and its companion retry queue, both durable.
     */
    private function setupQueues()
    {
        $retryQueue = $this->queueName . '_retry';

        // Main queue: dead-lettered messages go to the retry queue through
        // the default exchange ('').
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false,
            false,
            new \PhpAmqpLib\Wire\AMQPTable([
                'x-dead-letter-exchange' => '',
                'x-dead-letter-routing-key' => $retryQueue
            ])
        );

        // Retry queue: messages expire after $retryDelay seconds (TTL is in
        // milliseconds) and are dead-lettered back to the main queue.
        $this->channel->queue_declare(
            $retryQueue,
            false,
            true,
            false,
            false,
            false,
            new \PhpAmqpLib\Wire\AMQPTable([
                'x-message-ttl' => $this->retryDelay * 1000,
                'x-dead-letter-exchange' => '',
                'x-dead-letter-routing-key' => $this->queueName
            ])
        );
    }

    /**
     * Registers the consumer and blocks, handling messages until the channel
     * is closed.
     */
    public function start()
    {
        $callback = function ($message) {
            $data = json_decode($message->body, true);

            // A body that does not decode to an array cannot carry a retry
            // counter; record it as a final failure instead of retrying.
            if (!is_array($data)) {
                $this->handleFailedTask([
                    'raw_body' => $message->body,
                    'last_error' => 'payload is not a JSON object'
                ]);
                $message->ack();

                return;
            }

            $retryCount = $data['retry_count'] ?? 0;

            try {
                $this->processTask($data);

                echo " [x] 任务成功\n";
            } catch (\Throwable $e) {
                // Throwable rather than Exception: a TypeError inside task
                // code should count as a failed attempt, not kill the worker
                // and leave the same message to crash the next one.
                $data['last_error'] = $e->getMessage();

                if ($retryCount < $this->maxRetries) {
                    echo " [!] 任务失败,准备重试 ({$retryCount}/{$this->maxRetries})\n";

                    $data['retry_count'] = $retryCount + 1;

                    $retryMessage = new \PhpAmqpLib\Message\AMQPMessage(
                        json_encode($data),
                        ['delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT]
                    );

                    $this->channel->basic_publish(
                        $retryMessage,
                        '',
                        $this->queueName . '_retry'
                    );
                } else {
                    echo " [x] 任务失败,已达最大重试次数\n";

                    $this->handleFailedTask($data);
                }
            }

            // The original delivery is finished in every branch above: it
            // succeeded, was republished for retry, or was logged as failed.
            $message->ack();
        };

        // prefetch_count = 1: at most one unacknowledged message at a time.
        $this->channel->basic_qos(null, 1, null);

        // Fourth argument (no_ack) is false, so the callback must ack explicitly.
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        echo " [*] 重试工作器已启动\n";

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Placeholder for the real work; throwing from here triggers the retry path.
     *
     * @param array $data Decoded message body.
     */
    private function processTask($data)
    {
        // Task processing logic goes here.
    }

    /**
     * Records a task that will not be retried again.
     *
     * @param array $data Task data, including "last_error" when available.
     */
    private function handleFailedTask($data)
    {
        error_log("任务最终失败: " . json_encode($data));
    }

    /**
     * Closes the channel and then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

完整示例:图片处理工作队列

php
<?php

/**
 * Producer-side facade that turns image operations into task payloads and
 * hands them to a TaskPublisher.
 */
class ImageProcessingQueue
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TaskPublisher();
    }

    /**
     * Queues a "resize" task.
     *
     * @param string $imagePath Image to resize.
     * @param int    $width     Target width.
     * @param int    $height    Target height.
     */
    public function submitResize($imagePath, $width, $height)
    {
        $this->enqueue('resize', $imagePath, [
            'width' => $width,
            'height' => $height
        ]);
    }

    /**
     * Queues a "convert" task.
     *
     * @param string $imagePath Image to convert.
     * @param string $format    Target format.
     */
    public function submitConvert($imagePath, $format)
    {
        $this->enqueue('convert', $imagePath, [
            'format' => $format
        ]);
    }

    /**
     * Queues a "watermark" task.
     *
     * @param string $imagePath     Image to watermark.
     * @param string $watermarkPath Watermark image.
     * @param mixed  $position      Placement of the watermark.
     */
    public function submitWatermark($imagePath, $watermarkPath, $position)
    {
        $this->enqueue('watermark', $imagePath, [
            'watermark_path' => $watermarkPath,
            'position' => $position
        ]);
    }

    /**
     * Builds the payload (type and image path first, then the
     * operation-specific fields) and publishes it.
     */
    private function enqueue($type, $imagePath, array $extra)
    {
        $payload = ['type' => $type, 'image_path' => $imagePath] + $extra;

        $this->publisher->publishTask($payload);
    }
}

/**
 * Consumes image tasks and dispatches them to the matching image operation.
 *
 * Tasks are read from `task_queue`, the queue TaskPublisher (and therefore
 * ImageProcessingQueue) publishes to. Each message body is the publisher's
 * JSON envelope, with the image task under the "task" key.
 */
class ImageProcessingWorker
{
    private $connection;
    private $channel;
    private $queueName = 'task_queue';

    /**
     * Connects to the broker at localhost:5672 as guest/guest and declares
     * the task queue with the same flags the publisher uses.
     */
    public function __construct()
    {
        $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        // passive=false, durable=true, exclusive=false, auto_delete=false.
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );
    }

    /**
     * Registers the consumer and blocks, handling image tasks until the
     * channel is closed.
     */
    public function start()
    {
        echo "图片处理工作器启动...\n";

        $callback = function ($message) {
            $data = json_decode($message->body, true);
            $task = is_array($data) ? ($data['task'] ?? null) : null;

            // Without a typed task array there is nothing to dispatch on;
            // reject without requeue so the message does not loop forever.
            if (!is_array($task) || !isset($task['type'])) {
                echo " [!] 消息格式无效,已丢弃\n";
                $message->nack(false);

                return;
            }

            $this->processImage($task);

            $message->ack();
        };

        // prefetch_count = 1: take the next image only after acking the current one.
        $this->channel->basic_qos(null, 1, null);

        // Fourth argument (no_ack) is false, so the callback must ack explicitly.
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Routes a task to the operation named by its "type".
     *
     * @param array $task Image task as built by ImageProcessingQueue.
     */
    private function processImage($task)
    {
        switch ($task['type']) {
            case 'resize':
                $this->resizeImage(
                    $task['image_path'],
                    $task['width'],
                    $task['height']
                );
                break;

            case 'convert':
                $this->convertImage(
                    $task['image_path'],
                    $task['format']
                );
                break;

            case 'watermark':
                $this->addWatermark(
                    $task['image_path'],
                    $task['watermark_path'],
                    $task['position']
                );
                break;

            default:
                // Leave a trace for unknown types instead of dropping them silently.
                error_log('未知的图片任务类型: ' . json_encode($task['type']));
                break;
        }
    }

    /** Placeholder: reports the resize that would be performed. */
    private function resizeImage($path, $width, $height)
    {
        echo "调整图片大小: {$path} -> {$width}x{$height}\n";
    }

    /** Placeholder: reports the format conversion that would be performed. */
    private function convertImage($path, $format)
    {
        echo "转换图片格式: {$path} -> {$format}\n";
    }

    /** Placeholder: reports the watermarking that would be performed. */
    private function addWatermark($path, $watermark, $position)
    {
        echo "添加水印: {$path}\n";
    }
}

实际应用场景

1. 邮件发送队列

php
<?php

/**
 * Producer-side helper that queues "send_email" tasks through a TaskPublisher.
 */
class EmailTaskQueue
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TaskPublisher();
    }

    /**
     * Queues a single e-mail.
     *
     * @param string $to      Recipient address.
     * @param string $subject Subject line.
     * @param string $body    Message body.
     * @param array  $options Extra options, passed through untouched.
     */
    public function queueEmail($to, $subject, $body, $options = [])
    {
        $payload = [
            'type' => 'send_email',
            'to' => $to,
            'subject' => $subject,
            'body' => $body,
            'options' => $options
        ];

        $this->publisher->publishTask($payload);
    }

    /**
     * Queues the same subject and body once per recipient, with default options.
     *
     * @param iterable $recipients Recipient addresses.
     * @param string   $subject    Subject line.
     * @param string   $body       Message body.
     */
    public function queueBulkEmails($recipients, $subject, $body)
    {
        foreach ($recipients as $recipient) {
            $this->queueEmail($recipient, $subject, $body);
        }
    }
}

/**
 * Consumer-side handler for "send_email" tasks.
 */
class EmailWorker
{
    /**
     * Sends the e-mail described by the task; tasks of any other type are ignored.
     *
     * @param array $task Task with "type", "to", "subject", "body" and "options".
     */
    public function processTask($task)
    {
        if ($task['type'] === 'send_email') {
            $recipient = $task['to'];
            $subject = $task['subject'];
            $content = $task['body'];
            $extras = $task['options'];

            $this->sendEmail($recipient, $subject, $content, $extras);
        }
    }

    /**
     * Placeholder delivery: only prints the recipient.
     */
    private function sendEmail($to, $subject, $body, $options)
    {
        echo "发送邮件到: {$to}\n";
    }
}

2. 报表生成队列

php
<?php

/**
 * Producer-side helper that queues "generate_report" tasks through a TaskPublisher.
 */
class ReportGenerationQueue
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TaskPublisher();
    }

    /**
     * Queues a report-generation request.
     *
     * @param string $reportType  Kind of report to build.
     * @param mixed  $params      Report parameters, passed through untouched.
     * @param string $callbackUrl URL to notify once the report is ready.
     */
    public function queueReport($reportType, $params, $callbackUrl)
    {
        $payload = [
            'type' => 'generate_report',
            'report_type' => $reportType,
            'params' => $params,
            'callback_url' => $callbackUrl,
            'created_at' => time()
        ];

        $this->publisher->publishTask($payload);
    }
}

/**
 * Consumer-side handler for "generate_report" tasks.
 */
class ReportGenerationWorker
{
    /**
     * Generates the requested report and notifies the callback URL; tasks of
     * any other type are ignored.
     *
     * @param array $task Task with "type", "report_type", "params" and "callback_url".
     */
    public function processTask($task)
    {
        if ($task['type'] === 'generate_report') {
            $generated = $this->generateReport($task['report_type'], $task['params']);

            $this->notifyCallback($task['callback_url'], $generated);
        }
    }

    /**
     * Placeholder generation: prints the report type and returns a fresh report id.
     *
     * @return array Array with a single "report_id" entry.
     */
    private function generateReport($type, $params)
    {
        echo "生成报表: {$type}\n";

        $reportId = uniqid();

        return ['report_id' => $reportId];
    }

    /**
     * Placeholder notification: only prints the callback URL.
     */
    private function notifyCallback($url, $report)
    {
        echo "回调通知: {$url}\n";
    }
}

3. 数据导入队列

php
<?php

/**
 * Producer-side helper that queues "data_import" tasks through a TaskPublisher.
 */
class DataImportQueue
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TaskPublisher();
    }

    /**
     * Queues an import job.
     *
     * @param string $file    File to import.
     * @param mixed  $mapping Column/field mapping, passed through untouched.
     * @param array  $options Extra options, passed through untouched.
     */
    public function queueImport($file, $mapping, $options = [])
    {
        $payload = [
            'type' => 'data_import',
            'file' => $file,
            'mapping' => $mapping,
            'options' => $options,
            'created_at' => time()
        ];

        $this->publisher->publishTask($payload);
    }
}

/**
 * Consumer-side handler for "data_import" tasks.
 */
class DataImportWorker
{
    /**
     * Runs the import described by the task; tasks of any other type are ignored.
     *
     * @param array $task Task with "type", "file" and "mapping".
     */
    public function processTask($task)
    {
        if ($task['type'] === 'data_import') {
            $this->importData($task['file'], $task['mapping']);
        }
    }

    /**
     * Placeholder import: only prints the file being imported.
     */
    private function importData($file, $mapping)
    {
        echo "导入数据: {$file}\n";
    }
}

常见问题与解决方案

问题 1:任务处理不均衡

原因:轮询分发不考虑任务复杂度

解决方案

php
<?php

/**
 * Splits weighted tasks across a fixed number of workers so that total load
 * stays balanced: heaviest tasks are placed first, each on the worker with
 * the least accumulated load.
 */
class WeightedTaskDistributor
{
    private $tasks = [];

    /**
     * Registers a task for the next distribution.
     *
     * @param mixed     $task   Task payload, returned untouched in the result.
     * @param int|float $weight Relative cost of the task.
     */
    public function addTask($task, $weight = 1)
    {
        $this->tasks[] = [
            'task' => $task,
            'weight' => $weight
        ];
    }

    /**
     * Assigns every registered task to one of $workerCount workers.
     *
     * @param int $workerCount Number of workers; must be at least 1.
     *
     * @return array One entry per worker, each with "tasks" (list of payloads)
     *               and "load" (sum of the assigned weights).
     *
     * @throws \InvalidArgumentException If $workerCount is less than 1.
     */
    public function distributeToWorkers($workerCount)
    {
        $workerCount = (int) $workerCount;

        // With no workers there is no slot to assign to.
        if ($workerCount < 1) {
            throw new \InvalidArgumentException('workerCount must be at least 1');
        }

        // Heaviest first. The spaceship operator keeps fractional weights
        // ordered; returning a subtraction would be truncated to an integer,
        // so differences below 1 would compare as equal.
        usort($this->tasks, function ($a, $b) {
            return $b['weight'] <=> $a['weight'];
        });

        $workers = array_fill(0, $workerCount, ['tasks' => [], 'load' => 0]);

        foreach ($this->tasks as $task) {
            // Pick the least-loaded worker; ties go to the lowest index.
            $minLoadWorker = 0;
            $minLoad = $workers[0]['load'];

            for ($i = 1; $i < $workerCount; $i++) {
                if ($workers[$i]['load'] < $minLoad) {
                    $minLoad = $workers[$i]['load'];
                    $minLoadWorker = $i;
                }
            }

            $workers[$minLoadWorker]['tasks'][] = $task['task'];
            $workers[$minLoadWorker]['load'] += $task['weight'];
        }

        return $workers;
    }
}

问题 2:消费者故障导致任务丢失

解决方案

php
<?php

/**
 * Worker that returns a task to the queue whenever processing does not
 * finish cleanly, so a failing consumer does not lose it.
 *
 * NOTE(review): $channel, $queueName and processTask() are used below but not
 * defined in this snippet; they are assumed to be supplied by the full class.
 */
class ReliableTaskWorker
{
    private $processingTimeout = 300;

    /** Message currently being processed, or null between messages. */
    private $inFlightMessage = null;

    /**
     * Registers the consumer and blocks, handling messages until the channel
     * is closed.
     */
    public function start()
    {
        // One shutdown hook for the whole process. Registering a hook per
        // message would pile up closures (each pinning its message) for as
        // long as the worker runs. connection_aborted() is not consulted: it
        // reports an HTTP client disconnect, not the state of a consumer.
        register_shutdown_function(function () {
            if ($this->inFlightMessage === null) {
                return;
            }

            try {
                // The process is dying mid-task: hand the message back.
                $this->inFlightMessage->nack(true);
            } catch (\Throwable $e) {
                // The channel may already be unusable. The broker requeues
                // unacknowledged deliveries when the connection closes.
            }
        });

        $callback = function ($message) {
            $this->inFlightMessage = $message;

            try {
                $this->processTask($message);
                $message->ack();
            } catch (\Exception $e) {
                // Requeue so this or another worker can try again.
                $message->nack(true);
            }

            // Reached only once the message was acked or nacked. An uncaught
            // Error skips this line, leaving the message for the shutdown hook.
            $this->inFlightMessage = null;
        };

        // prefetch_count = 1: at most one unacknowledged message at a time.
        $this->channel->basic_qos(null, 1, null);

        // Fourth argument (no_ack) is false, so the callback must ack explicitly.
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }
}

问题 3:任务积压

解决方案

php
<?php

/**
 * Watches queue depth through an HTTP API client and raises an alert when a
 * queue backs up.
 */
class TaskMonitor
{
    private $apiClient;

    /**
     * @param object|null $apiClient Client exposing get($path) that returns the
     *                               decoded response as an array. Optional so
     *                               that `new TaskMonitor()` keeps working, but
     *                               required before any depth can be read.
     */
    public function __construct($apiClient = null)
    {
        $this->apiClient = $apiClient;
    }

    /**
     * Returns the number of messages the API reports for the queue.
     *
     * @param string $queueName Queue to inspect.
     *
     * @return int|mixed The response's "messages" entry, or 0 when absent.
     *
     * @throws \LogicException If no API client was supplied.
     */
    public function getQueueDepth($queueName)
    {
        if ($this->apiClient === null) {
            throw new \LogicException('TaskMonitor requires an API client to query queue depth');
        }

        // %2f is the URL-encoded default vhost "/". The queue name is encoded
        // too, so names containing "/", spaces or "?" stay one path segment.
        $response = $this->apiClient->get('/api/queues/%2f/' . rawurlencode($queueName));

        return $response['messages'] ?? 0;
    }

    /**
     * Alerts and suggests scaling when the queue holds more than $threshold messages.
     *
     * @param string $queueName Queue to check.
     * @param int    $threshold Depth above which the queue counts as backed up.
     */
    public function checkBacklog($queueName, $threshold = 1000)
    {
        $depth = $this->getQueueDepth($queueName);

        if ($depth > $threshold) {
            $this->alertBacklog($queueName, $depth);
            $this->scaleWorkers($queueName);
        }
    }

    /** Writes the backlog size to the error log. */
    private function alertBacklog($queueName, $depth)
    {
        error_log("队列 {$queueName} 积压: {$depth} 条消息");
    }

    /** Placeholder: only prints a suggestion to add workers. */
    private function scaleWorkers($queueName)
    {
        echo "建议增加工作器数量\n";
    }
}

最佳实践建议

1. 任务优先级

php
<?php

/**
 * Routes tasks to one of three queues ("high", "normal", "low") and consumes
 * from all of them.
 *
 * NOTE(review): $channel and processMessage() are used below but not defined
 * in this snippet; they are assumed to be supplied by the full class.
 */
class PriorityTaskQueue
{
    private $queues = [
        'high' => 'task_queue_high',
        'normal' => 'task_queue_normal',
        'low' => 'task_queue_low'
    ];

    /**
     * Publishes the task to the queue for the given priority.
     *
     * @param mixed  $task     JSON-serialisable task payload.
     * @param string $priority "high", "normal" or "low"; any other value
     *                         falls back to "normal".
     */
    public function publishTask($task, $priority = 'normal')
    {
        $queueName = $this->queues[$priority] ?? $this->queues['normal'];

        // Persistent delivery, matching the other publishers in this file,
        // so queued tasks are not dropped when the broker restarts.
        // Fully qualified: this snippet carries no `use` statements of its own.
        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($task),
            ['delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );

        $this->channel->basic_publish($message, '', $queueName);
    }

    /**
     * Registers processMessage() as a consumer on each queue, in the order
     * high, normal, low.
     *
     * NOTE(review): registration order alone presumably does not make the
     * broker drain "high" before the others; confirm whether strict priority
     * is required here.
     */
    public function consumeWithPriority()
    {
        foreach (['high', 'normal', 'low'] as $priority) {
            $queueName = $this->queues[$priority];

            // Fourth argument (no_ack) is false, so processMessage() is
            // expected to acknowledge each message.
            $this->channel->basic_consume(
                $queueName,
                '',
                false,
                false,
                false,
                false,
                [$this, 'processMessage']
            );
        }
    }
}

2. 任务进度跟踪

php
<?php

/**
 * Stores per-task status and progress in Redis hashes named "task:<id>".
 *
 * NOTE(review): the hashes are written without an expiry, and completeTask()
 * stores progress 100 whatever "total" was given to startTask(); confirm
 * both are intended.
 */
class TaskProgressTracker
{
    private $redis;

    /**
     * Connects to Redis at 127.0.0.1:6379.
     */
    public function __construct()
    {
        $client = new Redis();
        $client->connect('127.0.0.1', 6379);

        $this->redis = $client;
    }

    /**
     * Marks the task as processing, with zero progress and the start time.
     *
     * @param mixed $taskId Task identifier.
     * @param int   $total  Value stored in the "total" field.
     */
    public function startTask($taskId, $total = 100)
    {
        $fields = [
            'status' => 'processing',
            'progress' => 0,
            'total' => $total,
            'started_at' => time()
        ];

        $this->redis->hMSet($this->hashKey($taskId), $fields);
    }

    /**
     * Overwrites the task's "progress" field.
     */
    public function updateProgress($taskId, $progress)
    {
        $this->redis->hSet($this->hashKey($taskId), 'progress', $progress);
    }

    /**
     * Marks the task as completed with progress 100 and the completion time.
     */
    public function completeTask($taskId)
    {
        $fields = [
            'status' => 'completed',
            'progress' => 100,
            'completed_at' => time()
        ];

        $this->redis->hMSet($this->hashKey($taskId), $fields);
    }

    /**
     * Marks the task as failed and records the error and the failure time.
     */
    public function failTask($taskId, $error)
    {
        $fields = [
            'status' => 'failed',
            'error' => $error,
            'failed_at' => time()
        ];

        $this->redis->hMSet($this->hashKey($taskId), $fields);
    }

    /**
     * Returns every field stored for the task.
     */
    public function getProgress($taskId)
    {
        return $this->redis->hGetAll($this->hashKey($taskId));
    }

    /**
     * Builds the Redis key of the hash that holds the task's state.
     */
    private function hashKey($taskId)
    {
        return "task:{$taskId}";
    }
}

3. 优雅关闭

php
<?php

/**
 * Worker loop that stops after SIGTERM or SIGINT instead of being killed
 * mid-iteration.
 *
 * NOTE(review): $channel is used below but not defined in this snippet, and
 * nothing visible here ever assigns $currentTask, so the "waiting for the
 * current task" message cannot be printed as written.
 */
class GracefulTaskWorker
{
    private $running = true;
    private $currentTask = null;

    /**
     * Installs the signal handlers and polls the channel until shutdown()
     * has been called.
     */
    public function start()
    {
        // Deliver signals asynchronously, without explicit dispatch calls.
        pcntl_async_signals(true);

        foreach ([SIGTERM, SIGINT] as $signal) {
            pcntl_signal($signal, [$this, 'shutdown']);
        }

        while ($this->running) {
            try {
                // Third argument (1) is presumably a timeout in seconds,
                // given the timeout exception handled below.
                $this->channel->wait(null, false, 1);
            } catch (PhpAmqpLib\Exception\AMQPTimeoutException $timeout) {
                // Nothing arrived in time; loop again to re-check the flag.
            }
        }

        if ($this->currentTask) {
            echo "等待当前任务完成...\n";
        }

        echo "工作器已关闭\n";
    }

    /**
     * Signal handler: asks the loop in start() to stop after its current iteration.
     */
    public function shutdown()
    {
        echo "收到关闭信号...\n";
        $this->running = false;
    }
}

相关链接