Skip to content

优先级队列插件

概述

优先级队列是 RabbitMQ 原生支持的功能,允许消息根据优先级进行排序消费。高优先级的消息会被优先消费,这对于需要处理紧急消息或重要任务的场景非常有用。

核心知识点

优先级队列原理

mermaid
graph TB
    subgraph 生产者
        P1[高优先级消息]
        P2[普通优先级消息]
        P3[低优先级消息]
    end

    subgraph RabbitMQ
        Q[优先级队列<br/>x-max-priority=10]
        subgraph 内部排序
            H[优先级 10]
            M[优先级 5]
            L[优先级 1]
        end
    end

    subgraph 消费者
        C[消费者]
    end

    P1 -->|priority=10| Q
    P2 -->|priority=5| Q
    P3 -->|priority=1| Q

    Q --> H
    Q --> M
    Q --> L

    H -->|先消费| C
    M -->|后消费| C
    L -->|最后消费| C

    style H fill:#ffcdd2
    style M fill:#fff9c4
    style L fill:#c8e6c9

工作流程

mermaid
sequenceDiagram
    participant P as 生产者
    participant Q as 优先级队列
    participant C as 消费者

    Note over Q: 队列配置 x-max-priority=10

    P->>Q: 发送消息(priority=5)
    P->>Q: 发送消息(priority=10)
    P->>Q: 发送消息(priority=1)

    Note over Q: 内部按优先级排序<br/>[10, 5, 1]

    Q->>C: 投递消息(priority=10)
    Q->>C: 投递消息(priority=5)
    Q->>C: 投递消息(priority=1)

核心配置参数

参数说明默认值
x-max-priority队列支持的最大优先级
priority消息优先级(0 到 x-max-priority)0

PHP 代码示例

创建优先级队列

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class PriorityQueueCreator
{
    private $connection;
    private $channel;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();
    }

    public function createQueue($queueName, $maxPriority = 10)
    {
        $args = new AMQPTable([
            'x-max-priority' => $maxPriority
        ]);

        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            $args
        );

        echo "优先级队列已创建: {$queueName} (最大优先级: {$maxPriority})\n";
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

优先级消息生产者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PriorityMessagePublisher
{
    private $connection;
    private $channel;
    private $queueName;

    const PRIORITY_LOW = 1;
    const PRIORITY_NORMAL = 5;
    const PRIORITY_HIGH = 8;
    const PRIORITY_CRITICAL = 10;

    public function __construct($queueName = 'priority_queue')
    {
        $this->queueName = $queueName;

        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->ensureQueueExists();
    }

    private function ensureQueueExists()
    {
        $creator = new PriorityQueueCreator();
        $creator->createQueue($this->queueName, 10);
        $creator->close();
    }

    public function publish($message, $priority = self::PRIORITY_NORMAL)
    {
        $msg = new AMQPMessage(
            json_encode([
                'content' => $message,
                'priority' => $priority,
                'timestamp' => time()
            ]),
            [
                'content_type' => 'application/json',
                'priority' => $priority
            ]
        );

        $this->channel->basic_publish($msg, '', $this->queueName);

        echo sprintf(
            "消息已发送,优先级: %d\n",
            $priority
        );
    }

    public function publishCritical($message)
    {
        $this->publish($message, self::PRIORITY_CRITICAL);
    }

    public function publishHigh($message)
    {
        $this->publish($message, self::PRIORITY_HIGH);
    }

    public function publishNormal($message)
    {
        $this->publish($message, self::PRIORITY_NORMAL);
    }

    public function publishLow($message)
    {
        $this->publish($message, self::PRIORITY_LOW);
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

优先级消息消费者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PriorityMessageConsumer
{
    private $connection;
    private $channel;
    private $queueName;

    public function __construct($queueName = 'priority_queue')
    {
        $this->queueName = $queueName;

        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();
    }

    public function consume()
    {
        $callback = function (AMQPMessage $message) {
            $body = json_decode($message->body, true);
            $priority = $message->get('priority');

            echo sprintf(
                "[%s] 收到消息,优先级: %d, 内容: %s\n",
                date('H:i:s'),
                $priority,
                json_encode($body['content'], JSON_UNESCAPED_UNICODE)
            );

            $this->processByPriority($body, $priority);

            $message->ack();
        };

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        echo "优先级消费者已启动\n";

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    private function processByPriority($body, $priority)
    {
        if ($priority >= PriorityMessagePublisher::PRIORITY_CRITICAL) {
            echo "  -> 紧急处理\n";
        } elseif ($priority >= PriorityMessagePublisher::PRIORITY_HIGH) {
            echo "  -> 优先处理\n";
        } elseif ($priority >= PriorityMessagePublisher::PRIORITY_NORMAL) {
            echo "  -> 正常处理\n";
        } else {
            echo "  -> 低优先级处理\n";
        }
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

完整示例:任务优先级系统

php
<?php

class TaskPrioritySystem
{
    private $publisher;

    const TASK_TYPE_IMMEDIATE = 'immediate';
    const TASK_TYPE_URGENT = 'urgent';
    const TASK_TYPE_NORMAL = 'normal';
    const TASK_TYPE_BACKGROUND = 'background';

    private $priorityMap = [
        self::TASK_TYPE_IMMEDIATE => PriorityMessagePublisher::PRIORITY_CRITICAL,
        self::TASK_TYPE_URGENT => PriorityMessagePublisher::PRIORITY_HIGH,
        self::TASK_TYPE_NORMAL => PriorityMessagePublisher::PRIORITY_NORMAL,
        self::TASK_TYPE_BACKGROUND => PriorityMessagePublisher::PRIORITY_LOW
    ];

    public function __construct()
    {
        $this->publisher = new PriorityMessagePublisher('task_queue');
    }

    public function submitTask($taskId, $taskType, $payload)
    {
        $priority = $this->priorityMap[$taskType] ?? PriorityMessagePublisher::PRIORITY_NORMAL;

        $this->publisher->publish([
            'task_id' => $taskId,
            'task_type' => $taskType,
            'payload' => $payload,
            'submitted_at' => time()
        ], $priority);

        echo "任务 {$taskId} 已提交,类型: {$taskType}\n";
    }

    public function submitImmediateTask($taskId, $payload)
    {
        $this->submitTask($taskId, self::TASK_TYPE_IMMEDIATE, $payload);
    }

    public function submitUrgentTask($taskId, $payload)
    {
        $this->submitTask($taskId, self::TASK_TYPE_URGENT, $payload);
    }

    public function submitNormalTask($taskId, $payload)
    {
        $this->submitTask($taskId, self::TASK_TYPE_NORMAL, $payload);
    }

    public function submitBackgroundTask($taskId, $payload)
    {
        $this->submitTask($taskId, self::TASK_TYPE_BACKGROUND, $payload);
    }

    public function close()
    {
        $this->publisher->close();
    }
}

class TaskWorker
{
    private $consumer;
    private $handlers = [];

    public function __construct()
    {
        $this->consumer = new PriorityMessageConsumer('task_queue');
    }

    public function registerHandler($taskType, callable $handler)
    {
        $this->handlers[$taskType] = $handler;
    }

    public function start()
    {
        echo "任务工作器已启动\n";

        $this->consumer->consume();
    }
}

动态优先级调整

php
<?php

class DynamicPriorityManager
{
    private $connection;
    private $channel;
    private $queueName;

    public function __construct($queueName = 'dynamic_priority_queue')
    {
        $this->queueName = $queueName;

        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();
    }

    public function publishWithAging($message, $basePriority, $agingInterval = 60)
    {
        $messageData = [
            'content' => $message,
            'base_priority' => $basePriority,
            'created_at' => time(),
            'aging_interval' => $agingInterval
        ];

        $msg = new AMQPMessage(
            json_encode($messageData),
            [
                'content_type' => 'application/json',
                'priority' => $basePriority,
                'headers' => [
                    'base_priority' => $basePriority,
                    'aging_interval' => $agingInterval
                ]
            ]
        );

        $this->channel->basic_publish($msg, '', $this->queueName);
    }

    public function reprioritizeMessage($message, $waitTime)
    {
        $data = json_decode($message->body, true);
        $basePriority = $data['base_priority'];
        $agingInterval = $data['aging_interval'];

        $age = time() - $data['created_at'];
        $agingBoost = floor($age / $agingInterval);

        $newPriority = min(10, $basePriority + $agingBoost);

        if ($newPriority > $message->get('priority')) {
            echo sprintf(
                "消息优先级提升: %d -> %d (等待时间: %d秒)\n",
                $message->get('priority'),
                $newPriority,
                $age
            );

            $this->requeueWithPriority($message, $newPriority);
        }
    }

    private function requeueWithPriority($message, $newPriority)
    {
        $body = json_decode($message->body, true);
        $body['priority_boosted'] = true;
        $body['new_priority'] = $newPriority;

        $newMessage = new AMQPMessage(
            json_encode($body),
            [
                'content_type' => 'application/json',
                'priority' => $newPriority
            ]
        );

        $this->channel->basic_publish($newMessage, '', $this->queueName);
        $message->ack();
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

实际应用场景

1. 客服工单系统

php
<?php

class SupportTicketSystem
{
    private $publisher;

    const PRIORITY_VIP = 10;
    const PRIORITY_URGENT = 8;
    const PRIORITY_HIGH = 6;
    const PRIORITY_NORMAL = 4;
    const PRIORITY_LOW = 2;

    public function __construct()
    {
        $this->publisher = new PriorityMessagePublisher('support_tickets');
    }

    public function createTicket($ticketId, $customerId, $issue, $customerType = 'normal')
    {
        $priority = $this->determinePriority($customerType);

        $this->publisher->publish([
            'ticket_id' => $ticketId,
            'customer_id' => $customerId,
            'customer_type' => $customerType,
            'issue' => $issue,
            'created_at' => time()
        ], $priority);
    }

    private function determinePriority($customerType)
    {
        $priorities = [
            'vip' => self::PRIORITY_VIP,
            'premium' => self::PRIORITY_URGENT,
            'enterprise' => self::PRIORITY_HIGH,
            'normal' => self::PRIORITY_NORMAL,
            'free' => self::PRIORITY_LOW
        ];

        return $priorities[$customerType] ?? self::PRIORITY_NORMAL;
    }

    public function escalateTicket($ticketId, $reason)
    {
        $this->publisher->publish([
            'ticket_id' => $ticketId,
            'action' => 'escalate',
            'reason' => $reason,
            'escalated_at' => time()
        ], self::PRIORITY_URGENT);
    }
}

2. 订单处理系统

php
<?php

class OrderPrioritySystem
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new PriorityMessagePublisher('order_processing');
    }

    public function submitOrder($orderId, $orderData, $isExpress = false, $isVip = false)
    {
        $priority = $this->calculateOrderPriority($isExpress, $isVip);

        $this->publisher->publish([
            'order_id' => $orderId,
            'order_data' => $orderData,
            'is_express' => $isExpress,
            'is_vip' => $isVip,
            'submitted_at' => time()
        ], $priority);
    }

    private function calculateOrderPriority($isExpress, $isVip)
    {
        if ($isVip && $isExpress) {
            return PriorityMessagePublisher::PRIORITY_CRITICAL;
        } elseif ($isVip || $isExpress) {
            return PriorityMessagePublisher::PRIORITY_HIGH;
        } else {
            return PriorityMessagePublisher::PRIORITY_NORMAL;
        }
    }

    public function processExpiredOrder($orderId)
    {
        $this->publisher->publish([
            'order_id' => $orderId,
            'action' => 'expiry_check',
            'checked_at' => time()
        ], PriorityMessagePublisher::PRIORITY_HIGH);
    }
}

3. 系统告警处理

php
<?php

class AlertPrioritySystem
{
    private $publisher;

    const ALERT_CRITICAL = 10;
    const ALERT_ERROR = 8;
    const ALERT_WARNING = 5;
    const ALERT_INFO = 3;

    public function __construct()
    {
        $this->publisher = new PriorityMessagePublisher('system_alerts');
    }

    public function sendAlert($level, $source, $message, $context = [])
    {
        $priority = $this->getAlertPriority($level);

        $this->publisher->publish([
            'level' => $level,
            'source' => $source,
            'message' => $message,
            'context' => $context,
            'timestamp' => time()
        ], $priority);
    }

    private function getAlertPriority($level)
    {
        $priorities = [
            'critical' => self::ALERT_CRITICAL,
            'error' => self::ALERT_ERROR,
            'warning' => self::ALERT_WARNING,
            'info' => self::ALERT_INFO
        ];

        return $priorities[$level] ?? self::ALERT_INFO;
    }

    public function critical($source, $message, $context = [])
    {
        $this->sendAlert('critical', $source, $message, $context);
    }

    public function error($source, $message, $context = [])
    {
        $this->sendAlert('error', $source, $message, $context);
    }

    public function warning($source, $message, $context = [])
    {
        $this->sendAlert('warning', $source, $message, $context);
    }

    public function info($source, $message, $context = [])
    {
        $this->sendAlert('info', $source, $message, $context);
    }
}

常见问题与解决方案

问题 1:优先级队列性能问题

原因:优先级排序需要额外开销

解决方案

php
<?php

class OptimizedPriorityQueue
{
    public function createOptimalQueue($queueName, $expectedPriorityLevels = 5)
    {
        $args = new AMQPTable([
            'x-max-priority' => min($expectedPriorityLevels, 10)
        ]);

        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            $args
        );
    }

    public function recommendPriorityLevels($messageRate)
    {
        if ($messageRate > 10000) {
            return 3;
        } elseif ($messageRate > 1000) {
            return 5;
        } else {
            return 10;
        }
    }
}

问题 2:消费者饥饿问题

原因:高优先级消息过多,低优先级消息无法消费

解决方案

php
<?php

class FairPriorityConsumer
{
    private $minProcessedPerPriority = [
        10 => 1,
        8 => 2,
        5 => 3,
        1 => 5
    ];

    private $processedCount = [];

    public function shouldProcessPriority($priority)
    {
        $processed = $this->processedCount[$priority] ?? 0;
        $minRequired = $this->minProcessedPerPriority[$priority] ?? 1;

        $lowerPrioritiesProcessed = $this->getLowerPrioritiesProcessed($priority);

        if ($lowerPrioritiesProcessed >= $minRequired) {
            return true;
        }

        $this->processedCount[$priority] = $processed + 1;
        return true;
    }

    private function getLowerPrioritiesProcessed($currentPriority)
    {
        $total = 0;

        foreach ($this->processedCount as $priority => $count) {
            if ($priority < $currentPriority) {
                $total += $count;
            }
        }

        return $total;
    }

    public function resetCounters()
    {
        $this->processedCount = [];
    }
}

问题 3:优先级配置错误

原因:消息优先级超出队列配置范围

解决方案

php
<?php

class PriorityValidator
{
    private $maxPriority;

    public function __construct($maxPriority = 10)
    {
        $this->maxPriority = $maxPriority;
    }

    public function validate($priority)
    {
        if (!is_int($priority)) {
            throw new InvalidArgumentException("优先级必须是整数");
        }

        if ($priority < 0 || $priority > $this->maxPriority) {
            throw new InvalidArgumentException(
                "优先级必须在 0 到 {$this->maxPriority} 之间"
            );
        }

        return true;
    }

    public function normalize($priority)
    {
        return max(0, min($this->maxPriority, (int)$priority));
    }
}

class SafePriorityPublisher
{
    private $publisher;
    private $validator;

    public function __construct($queueName, $maxPriority = 10)
    {
        $this->publisher = new PriorityMessagePublisher($queueName);
        $this->validator = new PriorityValidator($maxPriority);
    }

    public function publish($message, $priority)
    {
        $normalizedPriority = $this->validator->normalize($priority);

        $this->publisher->publish($message, $normalizedPriority);
    }
}

最佳实践建议

1. 合理设置优先级级别

php
<?php

class PriorityLevelConfig
{
    const LEVELS = [
        'critical' => [
            'value' => 10,
            'description' => '紧急:系统崩溃、数据丢失风险',
            'sla_seconds' => 60
        ],
        'high' => [
            'value' => 8,
            'description' => '高:重要业务中断',
            'sla_seconds' => 300
        ],
        'normal' => [
            'value' => 5,
            'description' => '普通:常规业务请求',
            'sla_seconds' => 3600
        ],
        'low' => [
            'value' => 2,
            'description' => '低:后台任务、批量处理',
            'sla_seconds' => 86400
        ]
    ];

    public static function getPriorityValue($level)
    {
        return self::LEVELS[$level]['value'] ?? 5;
    }

    public static function getSla($level)
    {
        return self::LEVELS[$level]['sla_seconds'] ?? 3600;
    }
}

2. 优先级监控

php
<?php

class PriorityMonitor
{
    private $stats = [];

    public function recordMessage($priority, $processed = true)
    {
        if (!isset($this->stats[$priority])) {
            $this->stats[$priority] = [
                'total' => 0,
                'processed' => 0,
                'pending' => 0
            ];
        }

        $this->stats[$priority]['total']++;

        if ($processed) {
            $this->stats[$priority]['processed']++;
        } else {
            $this->stats[$priority]['pending']++;
        }
    }

    public function getStats()
    {
        return $this->stats;
    }

    public function getBacklogByPriority()
    {
        $backlog = [];

        foreach ($this->stats as $priority => $data) {
            $backlog[$priority] = $data['pending'];
        }

        return $backlog;
    }

    public function checkSlaViolation($priority, $waitTime)
    {
        $sla = PriorityLevelConfig::getSla($priority);

        return $waitTime > $sla;
    }
}

3. 优先级动态调整

php
<?php

class DynamicPriorityAdjuster
{
    public function adjustPriorityBasedOnAge($message, $currentTime)
    {
        $data = json_decode($message->body, true);
        $createdAt = $data['created_at'];
        $basePriority = $data['base_priority'] ?? 5;

        $age = $currentTime - $createdAt;

        $sla = PriorityLevelConfig::getSla('normal');

        if ($age > $sla * 2) {
            return min(10, $basePriority + 3);
        } elseif ($age > $sla) {
            return min(10, $basePriority + 1);
        }

        return $basePriority;
    }

    public function adjustPriorityBasedOnLoad($basePriority, $queueDepth)
    {
        if ($queueDepth > 10000) {
            return min(10, $basePriority + 2);
        } elseif ($queueDepth > 5000) {
            return min(10, $basePriority + 1);
        }

        return $basePriority;
    }
}

相关链接