Appearance
优先级队列插件
概述
优先级队列是 RabbitMQ 原生支持的功能,允许消息根据优先级进行排序消费。高优先级的消息会被优先消费,这对于需要处理紧急消息或重要任务的场景非常有用。
核心知识点
优先级队列原理
mermaid
graph TB
subgraph 生产者
P1[高优先级消息]
P2[普通优先级消息]
P3[低优先级消息]
end
subgraph RabbitMQ
Q[优先级队列<br/>x-max-priority=10]
subgraph 内部排序
H[优先级 10]
M[优先级 5]
L[优先级 1]
end
end
subgraph 消费者
C[消费者]
end
P1 -->|priority=10| Q
P2 -->|priority=5| Q
P3 -->|priority=1| Q
Q --> H
Q --> M
Q --> L
H -->|先消费| C
M -->|后消费| C
L -->|最后消费| C
style H fill:#ffcdd2
style M fill:#fff9c4
style L fill:#c8e6c9工作流程
mermaid
sequenceDiagram
participant P as 生产者
participant Q as 优先级队列
participant C as 消费者
Note over Q: 队列配置 x-max-priority=10
P->>Q: 发送消息(priority=5)
P->>Q: 发送消息(priority=10)
P->>Q: 发送消息(priority=1)
Note over Q: 内部按优先级排序<br/>[10, 5, 1]
Q->>C: 投递消息(priority=10)
Q->>C: 投递消息(priority=5)
Q->>C: 投递消息(priority=1)核心配置参数
| 参数 | 说明 | 默认值 |
|---|---|---|
| x-max-priority | 队列支持的最大优先级 | 无 |
| priority | 消息优先级(0 到 x-max-priority) | 0 |
PHP 代码示例
创建优先级队列
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class PriorityQueueCreator
{
private $connection;
private $channel;
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
}
public function createQueue($queueName, $maxPriority = 10)
{
$args = new AMQPTable([
'x-max-priority' => $maxPriority
]);
$this->channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
$args
);
echo "优先级队列已创建: {$queueName} (最大优先级: {$maxPriority})\n";
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}优先级消息生产者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PriorityMessagePublisher
{
private $connection;
private $channel;
private $queueName;
const PRIORITY_LOW = 1;
const PRIORITY_NORMAL = 5;
const PRIORITY_HIGH = 8;
const PRIORITY_CRITICAL = 10;
public function __construct($queueName = 'priority_queue')
{
$this->queueName = $queueName;
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->ensureQueueExists();
}
private function ensureQueueExists()
{
$creator = new PriorityQueueCreator();
$creator->createQueue($this->queueName, 10);
$creator->close();
}
public function publish($message, $priority = self::PRIORITY_NORMAL)
{
$msg = new AMQPMessage(
json_encode([
'content' => $message,
'priority' => $priority,
'timestamp' => time()
]),
[
'content_type' => 'application/json',
'priority' => $priority
]
);
$this->channel->basic_publish($msg, '', $this->queueName);
echo sprintf(
"消息已发送,优先级: %d\n",
$priority
);
}
public function publishCritical($message)
{
$this->publish($message, self::PRIORITY_CRITICAL);
}
public function publishHigh($message)
{
$this->publish($message, self::PRIORITY_HIGH);
}
public function publishNormal($message)
{
$this->publish($message, self::PRIORITY_NORMAL);
}
public function publishLow($message)
{
$this->publish($message, self::PRIORITY_LOW);
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}优先级消息消费者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class PriorityMessageConsumer
{
private $connection;
private $channel;
private $queueName;
public function __construct($queueName = 'priority_queue')
{
$this->queueName = $queueName;
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
}
public function consume()
{
$callback = function (AMQPMessage $message) {
$body = json_decode($message->body, true);
$priority = $message->get('priority');
echo sprintf(
"[%s] 收到消息,优先级: %d, 内容: %s\n",
date('H:i:s'),
$priority,
json_encode($body['content'], JSON_UNESCAPED_UNICODE)
);
$this->processByPriority($body, $priority);
$message->ack();
};
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
echo "优先级消费者已启动\n";
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
private function processByPriority($body, $priority)
{
if ($priority >= PriorityMessagePublisher::PRIORITY_CRITICAL) {
echo " -> 紧急处理\n";
} elseif ($priority >= PriorityMessagePublisher::PRIORITY_HIGH) {
echo " -> 优先处理\n";
} elseif ($priority >= PriorityMessagePublisher::PRIORITY_NORMAL) {
echo " -> 正常处理\n";
} else {
echo " -> 低优先级处理\n";
}
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}完整示例:任务优先级系统
php
<?php
class TaskPrioritySystem
{
private $publisher;
const TASK_TYPE_IMMEDIATE = 'immediate';
const TASK_TYPE_URGENT = 'urgent';
const TASK_TYPE_NORMAL = 'normal';
const TASK_TYPE_BACKGROUND = 'background';
private $priorityMap = [
self::TASK_TYPE_IMMEDIATE => PriorityMessagePublisher::PRIORITY_CRITICAL,
self::TASK_TYPE_URGENT => PriorityMessagePublisher::PRIORITY_HIGH,
self::TASK_TYPE_NORMAL => PriorityMessagePublisher::PRIORITY_NORMAL,
self::TASK_TYPE_BACKGROUND => PriorityMessagePublisher::PRIORITY_LOW
];
public function __construct()
{
$this->publisher = new PriorityMessagePublisher('task_queue');
}
public function submitTask($taskId, $taskType, $payload)
{
$priority = $this->priorityMap[$taskType] ?? PriorityMessagePublisher::PRIORITY_NORMAL;
$this->publisher->publish([
'task_id' => $taskId,
'task_type' => $taskType,
'payload' => $payload,
'submitted_at' => time()
], $priority);
echo "任务 {$taskId} 已提交,类型: {$taskType}\n";
}
public function submitImmediateTask($taskId, $payload)
{
$this->submitTask($taskId, self::TASK_TYPE_IMMEDIATE, $payload);
}
public function submitUrgentTask($taskId, $payload)
{
$this->submitTask($taskId, self::TASK_TYPE_URGENT, $payload);
}
public function submitNormalTask($taskId, $payload)
{
$this->submitTask($taskId, self::TASK_TYPE_NORMAL, $payload);
}
public function submitBackgroundTask($taskId, $payload)
{
$this->submitTask($taskId, self::TASK_TYPE_BACKGROUND, $payload);
}
public function close()
{
$this->publisher->close();
}
}
class TaskWorker
{
private $consumer;
private $handlers = [];
public function __construct()
{
$this->consumer = new PriorityMessageConsumer('task_queue');
}
public function registerHandler($taskType, callable $handler)
{
$this->handlers[$taskType] = $handler;
}
public function start()
{
echo "任务工作器已启动\n";
$this->consumer->consume();
}
}动态优先级调整
php
<?php
class DynamicPriorityManager
{
private $connection;
private $channel;
private $queueName;
public function __construct($queueName = 'dynamic_priority_queue')
{
$this->queueName = $queueName;
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
}
public function publishWithAging($message, $basePriority, $agingInterval = 60)
{
$messageData = [
'content' => $message,
'base_priority' => $basePriority,
'created_at' => time(),
'aging_interval' => $agingInterval
];
$msg = new AMQPMessage(
json_encode($messageData),
[
'content_type' => 'application/json',
'priority' => $basePriority,
'headers' => [
'base_priority' => $basePriority,
'aging_interval' => $agingInterval
]
]
);
$this->channel->basic_publish($msg, '', $this->queueName);
}
public function reprioritizeMessage($message, $waitTime)
{
$data = json_decode($message->body, true);
$basePriority = $data['base_priority'];
$agingInterval = $data['aging_interval'];
$age = time() - $data['created_at'];
$agingBoost = floor($age / $agingInterval);
$newPriority = min(10, $basePriority + $agingBoost);
if ($newPriority > $message->get('priority')) {
echo sprintf(
"消息优先级提升: %d -> %d (等待时间: %d秒)\n",
$message->get('priority'),
$newPriority,
$age
);
$this->requeueWithPriority($message, $newPriority);
}
}
private function requeueWithPriority($message, $newPriority)
{
$body = json_decode($message->body, true);
$body['priority_boosted'] = true;
$body['new_priority'] = $newPriority;
$newMessage = new AMQPMessage(
json_encode($body),
[
'content_type' => 'application/json',
'priority' => $newPriority
]
);
$this->channel->basic_publish($newMessage, '', $this->queueName);
$message->ack();
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}实际应用场景
1. 客服工单系统
php
<?php
class SupportTicketSystem
{
private $publisher;
const PRIORITY_VIP = 10;
const PRIORITY_URGENT = 8;
const PRIORITY_HIGH = 6;
const PRIORITY_NORMAL = 4;
const PRIORITY_LOW = 2;
public function __construct()
{
$this->publisher = new PriorityMessagePublisher('support_tickets');
}
public function createTicket($ticketId, $customerId, $issue, $customerType = 'normal')
{
$priority = $this->determinePriority($customerType);
$this->publisher->publish([
'ticket_id' => $ticketId,
'customer_id' => $customerId,
'customer_type' => $customerType,
'issue' => $issue,
'created_at' => time()
], $priority);
}
private function determinePriority($customerType)
{
$priorities = [
'vip' => self::PRIORITY_VIP,
'premium' => self::PRIORITY_URGENT,
'enterprise' => self::PRIORITY_HIGH,
'normal' => self::PRIORITY_NORMAL,
'free' => self::PRIORITY_LOW
];
return $priorities[$customerType] ?? self::PRIORITY_NORMAL;
}
public function escalateTicket($ticketId, $reason)
{
$this->publisher->publish([
'ticket_id' => $ticketId,
'action' => 'escalate',
'reason' => $reason,
'escalated_at' => time()
], self::PRIORITY_URGENT);
}
}2. 订单处理系统
php
<?php
class OrderPrioritySystem
{
private $publisher;
public function __construct()
{
$this->publisher = new PriorityMessagePublisher('order_processing');
}
public function submitOrder($orderId, $orderData, $isExpress = false, $isVip = false)
{
$priority = $this->calculateOrderPriority($isExpress, $isVip);
$this->publisher->publish([
'order_id' => $orderId,
'order_data' => $orderData,
'is_express' => $isExpress,
'is_vip' => $isVip,
'submitted_at' => time()
], $priority);
}
private function calculateOrderPriority($isExpress, $isVip)
{
if ($isVip && $isExpress) {
return PriorityMessagePublisher::PRIORITY_CRITICAL;
} elseif ($isVip || $isExpress) {
return PriorityMessagePublisher::PRIORITY_HIGH;
} else {
return PriorityMessagePublisher::PRIORITY_NORMAL;
}
}
public function processExpiredOrder($orderId)
{
$this->publisher->publish([
'order_id' => $orderId,
'action' => 'expiry_check',
'checked_at' => time()
], PriorityMessagePublisher::PRIORITY_HIGH);
}
}3. 系统告警处理
php
<?php
class AlertPrioritySystem
{
private $publisher;
const ALERT_CRITICAL = 10;
const ALERT_ERROR = 8;
const ALERT_WARNING = 5;
const ALERT_INFO = 3;
public function __construct()
{
$this->publisher = new PriorityMessagePublisher('system_alerts');
}
public function sendAlert($level, $source, $message, $context = [])
{
$priority = $this->getAlertPriority($level);
$this->publisher->publish([
'level' => $level,
'source' => $source,
'message' => $message,
'context' => $context,
'timestamp' => time()
], $priority);
}
private function getAlertPriority($level)
{
$priorities = [
'critical' => self::ALERT_CRITICAL,
'error' => self::ALERT_ERROR,
'warning' => self::ALERT_WARNING,
'info' => self::ALERT_INFO
];
return $priorities[$level] ?? self::ALERT_INFO;
}
public function critical($source, $message, $context = [])
{
$this->sendAlert('critical', $source, $message, $context);
}
public function error($source, $message, $context = [])
{
$this->sendAlert('error', $source, $message, $context);
}
public function warning($source, $message, $context = [])
{
$this->sendAlert('warning', $source, $message, $context);
}
public function info($source, $message, $context = [])
{
$this->sendAlert('info', $source, $message, $context);
}
}常见问题与解决方案
问题 1:优先级队列性能问题
原因:优先级排序需要额外开销
解决方案:
php
<?php
class OptimizedPriorityQueue
{
public function createOptimalQueue($queueName, $expectedPriorityLevels = 5)
{
$args = new AMQPTable([
'x-max-priority' => min($expectedPriorityLevels, 10)
]);
$this->channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
$args
);
}
public function recommendPriorityLevels($messageRate)
{
if ($messageRate > 10000) {
return 3;
} elseif ($messageRate > 1000) {
return 5;
} else {
return 10;
}
}
}问题 2:消费者饥饿问题
原因:高优先级消息过多,低优先级消息无法消费
解决方案:
php
<?php
class FairPriorityConsumer
{
private $minProcessedPerPriority = [
10 => 1,
8 => 2,
5 => 3,
1 => 5
];
private $processedCount = [];
public function shouldProcessPriority($priority)
{
$processed = $this->processedCount[$priority] ?? 0;
$minRequired = $this->minProcessedPerPriority[$priority] ?? 1;
$lowerPrioritiesProcessed = $this->getLowerPrioritiesProcessed($priority);
if ($lowerPrioritiesProcessed >= $minRequired) {
return true;
}
$this->processedCount[$priority] = $processed + 1;
return true;
}
private function getLowerPrioritiesProcessed($currentPriority)
{
$total = 0;
foreach ($this->processedCount as $priority => $count) {
if ($priority < $currentPriority) {
$total += $count;
}
}
return $total;
}
public function resetCounters()
{
$this->processedCount = [];
}
}问题 3:优先级配置错误
原因:消息优先级超出队列配置范围
解决方案:
php
<?php
class PriorityValidator
{
private $maxPriority;
public function __construct($maxPriority = 10)
{
$this->maxPriority = $maxPriority;
}
public function validate($priority)
{
if (!is_int($priority)) {
throw new InvalidArgumentException("优先级必须是整数");
}
if ($priority < 0 || $priority > $this->maxPriority) {
throw new InvalidArgumentException(
"优先级必须在 0 到 {$this->maxPriority} 之间"
);
}
return true;
}
public function normalize($priority)
{
return max(0, min($this->maxPriority, (int)$priority));
}
}
class SafePriorityPublisher
{
private $publisher;
private $validator;
public function __construct($queueName, $maxPriority = 10)
{
$this->publisher = new PriorityMessagePublisher($queueName);
$this->validator = new PriorityValidator($maxPriority);
}
public function publish($message, $priority)
{
$normalizedPriority = $this->validator->normalize($priority);
$this->publisher->publish($message, $normalizedPriority);
}
}最佳实践建议
1. 合理设置优先级级别
php
<?php
class PriorityLevelConfig
{
const LEVELS = [
'critical' => [
'value' => 10,
'description' => '紧急:系统崩溃、数据丢失风险',
'sla_seconds' => 60
],
'high' => [
'value' => 8,
'description' => '高:重要业务中断',
'sla_seconds' => 300
],
'normal' => [
'value' => 5,
'description' => '普通:常规业务请求',
'sla_seconds' => 3600
],
'low' => [
'value' => 2,
'description' => '低:后台任务、批量处理',
'sla_seconds' => 86400
]
];
public static function getPriorityValue($level)
{
return self::LEVELS[$level]['value'] ?? 5;
}
public static function getSla($level)
{
return self::LEVELS[$level]['sla_seconds'] ?? 3600;
}
}2. 优先级监控
php
<?php
class PriorityMonitor
{
private $stats = [];
public function recordMessage($priority, $processed = true)
{
if (!isset($this->stats[$priority])) {
$this->stats[$priority] = [
'total' => 0,
'processed' => 0,
'pending' => 0
];
}
$this->stats[$priority]['total']++;
if ($processed) {
$this->stats[$priority]['processed']++;
} else {
$this->stats[$priority]['pending']++;
}
}
public function getStats()
{
return $this->stats;
}
public function getBacklogByPriority()
{
$backlog = [];
foreach ($this->stats as $priority => $data) {
$backlog[$priority] = $data['pending'];
}
return $backlog;
}
public function checkSlaViolation($priority, $waitTime)
{
$sla = PriorityLevelConfig::getSla($priority);
return $waitTime > $sla;
}
}3. 优先级动态调整
php
<?php
class DynamicPriorityAdjuster
{
public function adjustPriorityBasedOnAge($message, $currentTime)
{
$data = json_decode($message->body, true);
$createdAt = $data['created_at'];
$basePriority = $data['base_priority'] ?? 5;
$age = $currentTime - $createdAt;
$sla = PriorityLevelConfig::getSla('normal');
if ($age > $sla * 2) {
return min(10, $basePriority + 3);
} elseif ($age > $sla) {
return min(10, $basePriority + 1);
}
return $basePriority;
}
public function adjustPriorityBasedOnLoad($basePriority, $queueDepth)
{
if ($queueDepth > 10000) {
return min(10, $basePriority + 2);
} elseif ($queueDepth > 5000) {
return min(10, $basePriority + 1);
}
return $basePriority;
}
}