Appearance
优先级队列
概述
优先级队列(Priority Queue)允许消息按照优先级顺序被消费。高优先级的消息会被优先处理,这在需要区分消息重要性的场景中非常有用。
核心原理
RabbitMQ 的优先级队列通过在队列声明时指定 x-max-priority 参数来实现。消息可以设置 0 到 max-priority 之间的优先级值,数值越大优先级越高。
mermaid
graph TD
subgraph 生产者
P1[发送优先级=5的消息]
P2[发送优先级=1的消息]
P3[发送优先级=10的消息]
end
subgraph 优先级队列
Q[队列<br/>max-priority=10]
Q --> M1[优先级=10]
Q --> M2[优先级=5]
Q --> M3[优先级=1]
end
subgraph 消费者
C[消费者按优先级消费]
end
P1 --> Q
P2 --> Q
P3 --> Q
M1 --> C
M2 --> C
M3 --> C优先级规则
- 优先级范围: 0 到
x-max-priority(最大支持 255,建议不超过 10) - 默认优先级: 0(最低)
- 数值越大优先级越高
- 相同优先级的消息按 FIFO 顺序
PHP 代码示例
声明优先级队列
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'priority-tasks';
// 声明优先级队列,最大优先级为 10
$channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable(['x-max-priority' => 10])
);
echo "优先级队列已创建: {$queueName}\n";
$channel->close();
$connection->close();发送不同优先级的消息
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'priority-tasks';
$channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable(['x-max-priority' => 10])
);
// 发送不同优先级的消息
$messages = [
['body' => '普通任务', 'priority' => 1],
['body' => '紧急任务', 'priority' => 10],
['body' => '高优先级任务', 'priority' => 5],
['body' => '低优先级任务', 'priority' => 0],
['body' => 'VIP任务', 'priority' => 8],
];
foreach ($messages as $msgData) {
$message = new AMQPMessage(
json_encode(['task' => $msgData['body'], 'timestamp' => time()]),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'priority' => $msgData['priority']
]
);
$channel->basic_publish($message, '', $queueName);
echo "发送消息: {$msgData['body']} (优先级: {$msgData['priority']})\n";
}
$channel->close();
$connection->close();消费优先级消息
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'priority-tasks';
$channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable(['x-max-priority' => 10])
);
echo "等待优先级消息...\n";
$callback = function (AMQPMessage $msg) {
$priority = $msg->get('priority');
$data = json_decode($msg->getBody(), true);
echo sprintf(
"[优先级: %d] 处理任务: %s\n",
$priority,
$data['task']
);
$msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();封装的优先级队列类
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class PriorityQueue
{
private $channel;
private $queueName;
private $maxPriority;
const PRIORITY_LOW = 0;
const PRIORITY_NORMAL = 5;
const PRIORITY_HIGH = 8;
const PRIORITY_URGENT = 10;
public function __construct($channel, $queueName, $maxPriority = 10)
{
$this->channel = $channel;
$this->queueName = $queueName;
$this->maxPriority = $maxPriority;
$this->declare();
}
private function declare()
{
$this->channel->queue_declare(
$this->queueName,
false,
true,
false,
false,
false,
new AMQPTable(['x-max-priority' => $this->maxPriority])
);
}
public function publish($data, $priority = self::PRIORITY_NORMAL)
{
$priority = max(0, min($this->maxPriority, $priority));
$message = new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'priority' => $priority
]
);
$this->channel->basic_publish($message, '', $this->queueName);
}
public function publishLow($data)
{
$this->publish($data, self::PRIORITY_LOW);
}
public function publishNormal($data)
{
$this->publish($data, self::PRIORITY_NORMAL);
}
public function publishHigh($data)
{
$this->publish($data, self::PRIORITY_HIGH);
}
public function publishUrgent($data)
{
$this->publish($data, self::PRIORITY_URGENT);
}
public function consume(callable $handler, $prefetch = 1)
{
$this->channel->basic_qos(null, $prefetch, null);
$callback = function ($msg) use ($handler) {
$priority = $msg->get('priority');
$data = json_decode($msg->getBody(), true);
$handler($data, $priority, $msg);
$msg->ack();
};
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
}实际应用场景
1. 任务调度系统
php
<?php
class TaskScheduler
{
private $queue;
public function __construct($channel)
{
$this->queue = new PriorityQueue($channel, 'scheduled-tasks', 10);
}
public function scheduleTask($task, $priority = 'normal')
{
$priorityMap = [
'low' => PriorityQueue::PRIORITY_LOW,
'normal' => PriorityQueue::PRIORITY_NORMAL,
'high' => PriorityQueue::PRIORITY_HIGH,
'urgent' => PriorityQueue::PRIORITY_URGENT
];
$this->queue->publish([
'task' => $task,
'scheduled_at' => time()
], $priorityMap[$priority] ?? PriorityQueue::PRIORITY_NORMAL);
}
public function scheduleUrgentTask($task)
{
$this->queue->publishUrgent([
'task' => $task,
'scheduled_at' => time(),
'urgent' => true
]);
}
public function processTasks()
{
$this->queue->consume(function ($data, $priority) {
echo sprintf(
"[%s] 处理任务: %s (优先级: %d)\n",
date('Y-m-d H:i:s'),
$data['task'],
$priority
);
// 执行任务
$this->executeTask($data['task']);
});
}
private function executeTask($task)
{
// 任务执行逻辑
}
}2. 客服工单系统
php
<?php
class TicketSystem
{
private $queue;
const PRIORITY_VIP = 10;
const PRIORITY_HIGH = 7;
const PRIORITY_NORMAL = 5;
const PRIORITY_LOW = 2;
public function __construct($channel)
{
$this->queue = new PriorityQueue($channel, 'support-tickets', 10);
}
public function createTicket($userId, $subject, $message, $userType = 'normal')
{
$priority = match($userType) {
'vip' => self::PRIORITY_VIP,
'premium' => self::PRIORITY_HIGH,
'normal' => self::PRIORITY_NORMAL,
'free' => self::PRIORITY_LOW,
default => self::PRIORITY_NORMAL
};
$this->queue->publish([
'ticket_id' => uniqid('TKT-'),
'user_id' => $userId,
'subject' => $subject,
'message' => $message,
'user_type' => $userType,
'created_at' => time()
], $priority);
}
public function processTickets()
{
$this->queue->consume(function ($data, $priority) {
echo sprintf(
"处理工单 #%s (用户类型: %s, 优先级: %d)\n",
$data['ticket_id'],
$data['user_type'],
$priority
);
// 处理工单
$this->handleTicket($data);
});
}
private function handleTicket($ticket)
{
// 工单处理逻辑
}
}3. 消息推送系统
php
<?php
class PushNotificationSystem
{
private $queue;
public function __construct($channel)
{
$this->queue = new PriorityQueue($channel, 'push-notifications', 5);
}
public function sendPush($userId, $title, $body, $priority = 'normal')
{
$priorityValue = match($priority) {
'instant' => 5,
'high' => 4,
'normal' => 3,
'low' => 1,
default => 3
};
$this->queue->publish([
'user_id' => $userId,
'title' => $title,
'body' => $body,
'created_at' => time()
], $priorityValue);
}
public function sendInstantPush($userId, $title, $body)
{
$this->sendPush($userId, $title, $body, 'instant');
}
public function processPushes()
{
$this->queue->consume(function ($data, $priority) {
// 高优先级消息立即发送
if ($priority >= 4) {
$this->sendImmediately($data);
} else {
$this->sendBatch($data);
}
});
}
private function sendImmediately($data)
{
// 立即发送推送
}
private function sendBatch($data)
{
// 批量发送推送
}
}常见问题与解决方案
问题 1: 优先级队列性能问题
症状: 优先级队列消费速度变慢
原因: 优先级队列需要维护优先级索引,内存开销更大
解决方案:
php
<?php
// 限制最大优先级数量
// 建议: 使用较小的优先级范围(如 1-5 或 1-10)
// 不推荐
$channel->queue_declare($queueName, false, true, false, false, false,
new AMQPTable(['x-max-priority' => 255])
);
// 推荐
$channel->queue_declare($queueName, false, true, false, false, false,
new AMQPTable(['x-max-priority' => 5])
);问题 2: 消息优先级不生效
症状: 消息按 FIFO 顺序消费,优先级未生效
原因: 消费者预取了多条消息
解决方案:
php
<?php
// 设置 prefetch_count = 1,确保按优先级消费
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);问题 3: 队列声明冲突
症状: 声明优先级队列时报错
原因: 队列已存在但不是优先级队列
解决方案:
php
<?php
// 删除现有队列后重新声明
// 或者使用不同的队列名称
// 检查队列是否存在
try {
$channel->queue_declare($queueName, true);
} catch (Exception $e) {
// 队列不存在或参数不匹配
echo "需要重新创建队列\n";
}最佳实践建议
- 限制优先级数量: 建议使用 1-10 的优先级范围
- 设置合理的 prefetch: 使用
basic_qos(1)确保优先级生效 - 避免过度使用: 优先级队列有额外开销,非必要不使用
- 统一优先级定义: 使用常量定义优先级级别
- 监控队列状态: 关注优先级队列的消息积压情况
