Appearance
惰性队列
概述
惰性队列(Lazy Queue)是 RabbitMQ 3.6.0 引入的一种队列模式。与默认的即时队列不同,惰性队列会将消息尽可能早地写入磁盘,只有在消费者请求时才加载到内存中。
核心原理
即时队列 vs 惰性队列
mermaid
graph TD
subgraph 即时队列
M1[消息] --> MQ1[内存队列]
MQ1 --> D1[磁盘]
MQ1 --> C1[消费者]
end
subgraph 惰性队列
M2[消息] --> D2[磁盘]
D2 --> MQ2[内存缓存]
MQ2 --> C2[消费者]
end
style MQ1 fill:#87CEEB
style D2 fill:#90EE90工作模式对比
| 特性 | 即时队列 | 惰性队列 |
|---|---|---|
| 消息存储 | 优先内存 | 优先磁盘 |
| 内存占用 | 高 | 低 |
| 吞吐量 | 高 | 较低 |
| 适用场景 | 实时处理 | 大量积压 |
| 消费延迟 | 低 | 较高 |
触发条件
即时队列在以下情况会将消息写入磁盘:
- 内存使用超过阈值
- 队列被标记为惰性
- 消息设置了持久化且内存压力大
PHP 代码示例
声明惰性队列
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'lazy-queue';
// 声明惰性队列
$channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable([
'x-queue-mode' => 'lazy'
])
);
echo "惰性队列已创建: {$queueName}\n";
$channel->close();
$connection->close();发送大量消息测试
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'lazy-queue';
$channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable(['x-queue-mode' => 'lazy'])
);
// 发送大量消息
$messageCount = 100000;
$batchSize = 1000;
for ($i = 1; $i <= $messageCount; $i++) {
$message = new AMQPMessage(
json_encode([
'id' => $i,
'data' => str_repeat('x', 1000), // 1KB 数据
'timestamp' => time()
]),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($message, '', $queueName);
if ($i % $batchSize === 0) {
echo "已发送 {$i} 条消息\n";
}
}
echo "完成发送 {$messageCount} 条消息\n";
$channel->close();
$connection->close();消费惰性队列消息
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'lazy-queue';
echo "开始消费惰性队列消息...\n";
$processedCount = 0;
$startTime = microtime(true);
$callback = function (AMQPMessage $msg) use (&$processedCount, $startTime) {
$data = json_decode($msg->getBody(), true);
$processedCount++;
if ($processedCount % 1000 === 0) {
$elapsed = microtime(true) - $startTime;
$rate = $processedCount / $elapsed;
echo sprintf(
"已处理 %d 条消息, 速率: %.2f 条/秒\n",
$processedCount,
$rate
);
}
$msg->ack();
};
$channel->basic_qos(null, 100, null); // 较大的 prefetch 提高吞吐
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();惰性队列管理类
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class LazyQueueManager
{
private $channel;
public function __construct($channel)
{
$this->channel = $channel;
}
public function declareLazyQueue($queueName, array $options = [])
{
$arguments = ['x-queue-mode' => 'lazy'];
if (isset($options['ttl'])) {
$arguments['x-message-ttl'] = $options['ttl'];
}
if (isset($options['maxLength'])) {
$arguments['x-max-length'] = $options['maxLength'];
}
if (isset($options['dlx'])) {
$arguments['x-dead-letter-exchange'] = $options['dlx'];
}
$this->channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable($arguments)
);
return $queueName;
}
public function getQueueStats($queueName)
{
list(, $messageCount, $consumerCount) = $this->channel->queue_declare(
$queueName,
true
);
return [
'queue_name' => $queueName,
'message_count' => $messageCount,
'consumer_count' => $consumerCount
];
}
public function purgeQueue($queueName)
{
$this->channel->queue_purge($queueName);
echo "队列 {$queueName} 已清空\n";
}
}实际应用场景
1. 日志收集系统
php
<?php
class LogCollector
{
private $channel;
private $queueName = 'logs_lazy';
public function __construct($channel)
{
$this->channel = $channel;
$this->setupQueue();
}
private function setupQueue()
{
$this->channel->queue_declare(
$this->queueName,
false,
true,
false,
false,
false,
new AMQPTable([
'x-queue-mode' => 'lazy',
'x-message-ttl' => 604800000, // 7天
'x-max-length' => 10000000 // 最多1000万条
])
);
}
public function collectLog($level, $service, $message, $context = [])
{
$log = new AMQPMessage(
json_encode([
'level' => $level,
'service' => $service,
'message' => $message,
'context' => $context,
'timestamp' => time(),
'hostname' => gethostname()
]),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$this->channel->basic_publish($log, '', $this->queueName);
}
public function processLogs(callable $handler, $batchSize = 100)
{
$this->channel->basic_qos(null, $batchSize, null);
$callback = function ($msg) use ($handler) {
$log = json_decode($msg->getBody(), true);
$handler($log);
$msg->ack();
};
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
}2. 批量数据处理
php
<?php
class BatchDataProcessor
{
private $channel;
private $queueName = 'batch_data';
public function __construct($channel)
{
$this->channel = $channel;
$this->setupQueue();
}
private function setupQueue()
{
$this->channel->queue_declare(
$this->queueName,
false,
true,
false,
false,
false,
new AMQPTable([
'x-queue-mode' => 'lazy',
'x-max-length-bytes' => 10737418240 // 10GB
])
);
}
public function enqueueData(array $data)
{
$message = new AMQPMessage(
json_encode($data),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$this->channel->basic_publish($message, '', $this->queueName);
}
public function processBatch($batchSize = 1000)
{
$batch = [];
$processedCount = 0;
while (true) {
$message = $this->channel->basic_get($this->queueName);
if (!$message) {
if (!empty($batch)) {
$this->processBatchData($batch);
$processedCount += count($batch);
}
break;
}
$batch[] = json_decode($message->getBody(), true);
$this->channel->basic_ack($message->getDeliveryTag());
if (count($batch) >= $batchSize) {
$this->processBatchData($batch);
$processedCount += count($batch);
$batch = [];
}
}
return $processedCount;
}
private function processBatchData(array $batch)
{
echo "处理批量数据: " . count($batch) . " 条\n";
// 批量处理逻辑
}
}3. 离线任务队列
php
<?php
class OfflineTaskQueue
{
private $channel;
private $queueName = 'offline_tasks';
public function __construct($channel)
{
$this->channel = $channel;
$this->setupQueue();
}
private function setupQueue()
{
$this->channel->queue_declare(
$this->queueName,
false,
true,
false,
false,
false,
new AMQPTable([
'x-queue-mode' => 'lazy',
'x-message-ttl' => 2592000000, // 30天
])
);
}
public function scheduleTask($taskType, $data, $scheduledTime = null)
{
$message = new AMQPMessage(
json_encode([
'task_type' => $taskType,
'data' => $data,
'scheduled_time' => $scheduledTime,
'created_at' => time()
]),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$this->channel->basic_publish($message, '', $this->queueName);
}
public function processTasks(callable $handler)
{
$this->channel->basic_qos(null, 10, null);
$callback = function ($msg) use ($handler) {
$task = json_decode($msg->getBody(), true);
// 检查是否到达计划执行时间
if ($task['scheduled_time'] && $task['scheduled_time'] > time()) {
// 重新入队
$msg->nack(true);
return;
}
try {
$handler($task);
$msg->ack();
} catch (Exception $e) {
error_log("任务执行失败: " . $e->getMessage());
$msg->nack(true);
}
};
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
}常见问题与解决方案
问题 1: 消费速度慢
症状: 惰性队列消费速度明显慢于即时队列
原因: 消息需要从磁盘加载到内存
解决方案:
php
<?php
// 增加 prefetch 数量
$channel->basic_qos(null, 100, null);
// 使用批量消费
$batchSize = 100;
$batch = [];
while (true) {
$message = $channel->basic_get($queueName);
if (!$message) break;
$batch[] = json_decode($message->getBody(), true);
$channel->basic_ack($message->getDeliveryTag());
if (count($batch) >= $batchSize) {
processBatch($batch);
$batch = [];
}
}问题 2: 磁盘空间不足
症状: 惰性队列占用大量磁盘空间
解决方案:
php
<?php
// 设置队列最大长度或 TTL
$channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable([
'x-queue-mode' => 'lazy',
'x-max-length' => 1000000, // 最多100万条
'x-max-length-bytes' => 10737418240, // 最多10GB
'x-message-ttl' => 604800000 // 7天过期
])
);问题 3: 队列模式切换
症状: 需要在即时模式和惰性模式之间切换
解决方案:
php
<?php
// 方案一:创建新队列迁移数据
// 方案二:使用策略动态切换
// 通过 RabbitMQ 管理界面或命令行设置策略
// rabbitmqctl set_policy lazy-queue "^lazy\." '{"queue-mode":"lazy"}' --apply-to queues
// PHP 代码中声明时指定模式
$channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable(['x-queue-mode' => 'lazy'])
);最佳实践建议
- 适用场景: 消息量大、消费者处理慢、允许较高延迟的场景
- 监控磁盘: 监控磁盘空间使用情况
- 设置限制: 配置队列最大长度和 TTL
- 合理 prefetch: 增大 prefetch 数量提高吞吐
- 批量处理: 使用批量消费提高效率
