Skip to content

优先级队列

概述

优先级队列(Priority Queue)允许消息按照优先级顺序被消费。高优先级的消息会被优先处理,这在需要区分消息重要性的场景中非常有用。

核心原理

RabbitMQ 的优先级队列通过在队列声明时指定 x-max-priority 参数来实现。消息可以设置 0 到 max-priority 之间的优先级值,数值越大优先级越高。

mermaid
graph TD
    subgraph 生产者
        P1[发送优先级=5的消息]
        P2[发送优先级=1的消息]
        P3[发送优先级=10的消息]
    end
    
    subgraph 优先级队列
        Q[队列<br/>max-priority=10]
        Q --> M1[优先级=10]
        Q --> M2[优先级=5]
        Q --> M3[优先级=1]
    end
    
    subgraph 消费者
        C[消费者按优先级消费]
    end
    
    P1 --> Q
    P2 --> Q
    P3 --> Q
    M1 --> C
    M2 --> C
    M3 --> C

优先级规则

  • 优先级范围: 0 到 x-max-priority(最大支持 255,建议不超过 10)
  • 默认优先级: 0(最低)
  • 数值越大优先级越高
  • 相同优先级的消息按 FIFO 顺序

PHP 代码示例

声明优先级队列

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'priority-tasks';

// 声明优先级队列,最大优先级为 10
$channel->queue_declare(
    $queueName,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable(['x-max-priority' => 10])
);

echo "优先级队列已创建: {$queueName}\n";

$channel->close();
$connection->close();

发送不同优先级的消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'priority-tasks';
$channel->queue_declare(
    $queueName,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable(['x-max-priority' => 10])
);

// 发送不同优先级的消息
$messages = [
    ['body' => '普通任务', 'priority' => 1],
    ['body' => '紧急任务', 'priority' => 10],
    ['body' => '高优先级任务', 'priority' => 5],
    ['body' => '低优先级任务', 'priority' => 0],
    ['body' => 'VIP任务', 'priority' => 8],
];

foreach ($messages as $msgData) {
    $message = new AMQPMessage(
        json_encode(['task' => $msgData['body'], 'timestamp' => time()]),
        [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'priority' => $msgData['priority']
        ]
    );
    
    $channel->basic_publish($message, '', $queueName);
    
    echo "发送消息: {$msgData['body']} (优先级: {$msgData['priority']})\n";
}

$channel->close();
$connection->close();

消费优先级消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'priority-tasks';
$channel->queue_declare(
    $queueName,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable(['x-max-priority' => 10])
);

echo "等待优先级消息...\n";

$callback = function (AMQPMessage $msg) {
    $priority = $msg->get('priority');
    $data = json_decode($msg->getBody(), true);
    
    echo sprintf(
        "[优先级: %d] 处理任务: %s\n",
        $priority,
        $data['task']
    );
    
    $msg->ack();
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

封装的优先级队列类

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class PriorityQueue
{
    private $channel;
    private $queueName;
    private $maxPriority;
    
    const PRIORITY_LOW = 0;
    const PRIORITY_NORMAL = 5;
    const PRIORITY_HIGH = 8;
    const PRIORITY_URGENT = 10;
    
    public function __construct($channel, $queueName, $maxPriority = 10)
    {
        $this->channel = $channel;
        $this->queueName = $queueName;
        $this->maxPriority = $maxPriority;
        
        $this->declare();
    }
    
    private function declare()
    {
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(['x-max-priority' => $this->maxPriority])
        );
    }
    
    public function publish($data, $priority = self::PRIORITY_NORMAL)
    {
        $priority = max(0, min($this->maxPriority, $priority));
        
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'priority' => $priority
            ]
        );
        
        $this->channel->basic_publish($message, '', $this->queueName);
    }
    
    public function publishLow($data)
    {
        $this->publish($data, self::PRIORITY_LOW);
    }
    
    public function publishNormal($data)
    {
        $this->publish($data, self::PRIORITY_NORMAL);
    }
    
    public function publishHigh($data)
    {
        $this->publish($data, self::PRIORITY_HIGH);
    }
    
    public function publishUrgent($data)
    {
        $this->publish($data, self::PRIORITY_URGENT);
    }
    
    public function consume(callable $handler, $prefetch = 1)
    {
        $this->channel->basic_qos(null, $prefetch, null);
        
        $callback = function ($msg) use ($handler) {
            $priority = $msg->get('priority');
            $data = json_decode($msg->getBody(), true);
            
            $handler($data, $priority, $msg);
            $msg->ack();
        };
        
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
}

实际应用场景

1. 任务调度系统

php
<?php

class TaskScheduler
{
    private $queue;
    
    public function __construct($channel)
    {
        $this->queue = new PriorityQueue($channel, 'scheduled-tasks', 10);
    }
    
    public function scheduleTask($task, $priority = 'normal')
    {
        $priorityMap = [
            'low' => PriorityQueue::PRIORITY_LOW,
            'normal' => PriorityQueue::PRIORITY_NORMAL,
            'high' => PriorityQueue::PRIORITY_HIGH,
            'urgent' => PriorityQueue::PRIORITY_URGENT
        ];
        
        $this->queue->publish([
            'task' => $task,
            'scheduled_at' => time()
        ], $priorityMap[$priority] ?? PriorityQueue::PRIORITY_NORMAL);
    }
    
    public function scheduleUrgentTask($task)
    {
        $this->queue->publishUrgent([
            'task' => $task,
            'scheduled_at' => time(),
            'urgent' => true
        ]);
    }
    
    public function processTasks()
    {
        $this->queue->consume(function ($data, $priority) {
            echo sprintf(
                "[%s] 处理任务: %s (优先级: %d)\n",
                date('Y-m-d H:i:s'),
                $data['task'],
                $priority
            );
            
            // 执行任务
            $this->executeTask($data['task']);
        });
    }
    
    private function executeTask($task)
    {
        // 任务执行逻辑
    }
}

2. 客服工单系统

php
<?php

class TicketSystem
{
    private $queue;
    
    const PRIORITY_VIP = 10;
    const PRIORITY_HIGH = 7;
    const PRIORITY_NORMAL = 5;
    const PRIORITY_LOW = 2;
    
    public function __construct($channel)
    {
        $this->queue = new PriorityQueue($channel, 'support-tickets', 10);
    }
    
    public function createTicket($userId, $subject, $message, $userType = 'normal')
    {
        $priority = match($userType) {
            'vip' => self::PRIORITY_VIP,
            'premium' => self::PRIORITY_HIGH,
            'normal' => self::PRIORITY_NORMAL,
            'free' => self::PRIORITY_LOW,
            default => self::PRIORITY_NORMAL
        };
        
        $this->queue->publish([
            'ticket_id' => uniqid('TKT-'),
            'user_id' => $userId,
            'subject' => $subject,
            'message' => $message,
            'user_type' => $userType,
            'created_at' => time()
        ], $priority);
    }
    
    public function processTickets()
    {
        $this->queue->consume(function ($data, $priority) {
            echo sprintf(
                "处理工单 #%s (用户类型: %s, 优先级: %d)\n",
                $data['ticket_id'],
                $data['user_type'],
                $priority
            );
            
            // 处理工单
            $this->handleTicket($data);
        });
    }
    
    private function handleTicket($ticket)
    {
        // 工单处理逻辑
    }
}

3. 消息推送系统

php
<?php

class PushNotificationSystem
{
    private $queue;
    
    public function __construct($channel)
    {
        $this->queue = new PriorityQueue($channel, 'push-notifications', 5);
    }
    
    public function sendPush($userId, $title, $body, $priority = 'normal')
    {
        $priorityValue = match($priority) {
            'instant' => 5,
            'high' => 4,
            'normal' => 3,
            'low' => 1,
            default => 3
        };
        
        $this->queue->publish([
            'user_id' => $userId,
            'title' => $title,
            'body' => $body,
            'created_at' => time()
        ], $priorityValue);
    }
    
    public function sendInstantPush($userId, $title, $body)
    {
        $this->sendPush($userId, $title, $body, 'instant');
    }
    
    public function processPushes()
    {
        $this->queue->consume(function ($data, $priority) {
            // 高优先级消息立即发送
            if ($priority >= 4) {
                $this->sendImmediately($data);
            } else {
                $this->sendBatch($data);
            }
        });
    }
    
    private function sendImmediately($data)
    {
        // 立即发送推送
    }
    
    private function sendBatch($data)
    {
        // 批量发送推送
    }
}

常见问题与解决方案

问题 1: 优先级队列性能问题

症状: 优先级队列消费速度变慢

原因: 优先级队列需要维护优先级索引,内存开销更大

解决方案:

php
<?php

// 限制最大优先级数量
// 建议: 使用较小的优先级范围(如 1-5 或 1-10)

// 不推荐
$channel->queue_declare($queueName, false, true, false, false, false,
    new AMQPTable(['x-max-priority' => 255])
);

// 推荐
$channel->queue_declare($queueName, false, true, false, false, false,
    new AMQPTable(['x-max-priority' => 5])
);

问题 2: 消息优先级不生效

症状: 消息按 FIFO 顺序消费,优先级未生效

原因: 消费者预取了多条消息

解决方案:

php
<?php

// 设置 prefetch_count = 1,确保按优先级消费
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

问题 3: 队列声明冲突

症状: 声明优先级队列时报错

原因: 队列已存在但不是优先级队列

解决方案:

php
<?php

// 删除现有队列后重新声明
// 或者使用不同的队列名称

// 检查队列是否存在
try {
    $channel->queue_declare($queueName, true);
} catch (Exception $e) {
    // 队列不存在或参数不匹配
    echo "需要重新创建队列\n";
}

最佳实践建议

  1. 限制优先级数量: 建议使用 1-10 的优先级范围
  2. 设置合理的 prefetch: 使用 basic_qos(1) 确保优先级生效
  3. 避免过度使用: 优先级队列有额外开销,非必要不使用
  4. 统一优先级定义: 使用常量定义优先级级别
  5. 监控队列状态: 关注优先级队列的消息积压情况

相关链接