Skip to content

惰性队列

概述

惰性队列(Lazy Queue)是 RabbitMQ 3.6.0 引入的一种队列模式。与默认的即时队列不同,惰性队列会将消息尽可能早地写入磁盘,只有在消费者请求时才加载到内存中。

核心原理

即时队列 vs 惰性队列

mermaid
graph TD
    subgraph 即时队列
        M1[消息] --> MQ1[内存队列]
        MQ1 --> D1[磁盘]
        MQ1 --> C1[消费者]
    end
    
    subgraph 惰性队列
        M2[消息] --> D2[磁盘]
        D2 --> MQ2[内存缓存]
        MQ2 --> C2[消费者]
    end
    
    style MQ1 fill:#87CEEB
    style D2 fill:#90EE90

工作模式对比

特性即时队列惰性队列
消息存储优先内存优先磁盘
内存占用
吞吐量较低
适用场景实时处理大量积压
消费延迟较高

触发条件

即时队列在以下情况会将消息写入磁盘:

  • 内存使用超过阈值
  • 队列被标记为惰性
  • 消息设置了持久化且内存压力大

PHP 代码示例

声明惰性队列

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'lazy-queue';

// 声明惰性队列
$channel->queue_declare(
    $queueName,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable([
        'x-queue-mode' => 'lazy'
    ])
);

echo "惰性队列已创建: {$queueName}\n";

$channel->close();
$connection->close();

发送大量消息测试

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'lazy-queue';
$channel->queue_declare(
    $queueName,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable(['x-queue-mode' => 'lazy'])
);

// 发送大量消息
$messageCount = 100000;
$batchSize = 1000;

for ($i = 1; $i <= $messageCount; $i++) {
    $message = new AMQPMessage(
        json_encode([
            'id' => $i,
            'data' => str_repeat('x', 1000),  // 1KB 数据
            'timestamp' => time()
        ]),
        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );
    
    $channel->basic_publish($message, '', $queueName);
    
    if ($i % $batchSize === 0) {
        echo "已发送 {$i} 条消息\n";
    }
}

echo "完成发送 {$messageCount} 条消息\n";

$channel->close();
$connection->close();

消费惰性队列消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'lazy-queue';

echo "开始消费惰性队列消息...\n";

$processedCount = 0;
$startTime = microtime(true);

$callback = function (AMQPMessage $msg) use (&$processedCount, $startTime) {
    $data = json_decode($msg->getBody(), true);
    $processedCount++;
    
    if ($processedCount % 1000 === 0) {
        $elapsed = microtime(true) - $startTime;
        $rate = $processedCount / $elapsed;
        echo sprintf(
            "已处理 %d 条消息, 速率: %.2f 条/秒\n",
            $processedCount,
            $rate
        );
    }
    
    $msg->ack();
};

$channel->basic_qos(null, 100, null);  // 较大的 prefetch 提高吞吐
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

惰性队列管理类

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class LazyQueueManager
{
    private $channel;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
    }
    
    public function declareLazyQueue($queueName, array $options = [])
    {
        $arguments = ['x-queue-mode' => 'lazy'];
        
        if (isset($options['ttl'])) {
            $arguments['x-message-ttl'] = $options['ttl'];
        }
        
        if (isset($options['maxLength'])) {
            $arguments['x-max-length'] = $options['maxLength'];
        }
        
        if (isset($options['dlx'])) {
            $arguments['x-dead-letter-exchange'] = $options['dlx'];
        }
        
        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable($arguments)
        );
        
        return $queueName;
    }
    
    public function getQueueStats($queueName)
    {
        list(, $messageCount, $consumerCount) = $this->channel->queue_declare(
            $queueName,
            true
        );
        
        return [
            'queue_name' => $queueName,
            'message_count' => $messageCount,
            'consumer_count' => $consumerCount
        ];
    }
    
    public function purgeQueue($queueName)
    {
        $this->channel->queue_purge($queueName);
        echo "队列 {$queueName} 已清空\n";
    }
}

实际应用场景

1. 日志收集系统

php
<?php

class LogCollector
{
    private $channel;
    private $queueName = 'logs_lazy';
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->setupQueue();
    }
    
    private function setupQueue()
    {
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-queue-mode' => 'lazy',
                'x-message-ttl' => 604800000,  // 7天
                'x-max-length' => 10000000     // 最多1000万条
            ])
        );
    }
    
    public function collectLog($level, $service, $message, $context = [])
    {
        $log = new AMQPMessage(
            json_encode([
                'level' => $level,
                'service' => $service,
                'message' => $message,
                'context' => $context,
                'timestamp' => time(),
                'hostname' => gethostname()
            ]),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        
        $this->channel->basic_publish($log, '', $this->queueName);
    }
    
    public function processLogs(callable $handler, $batchSize = 100)
    {
        $this->channel->basic_qos(null, $batchSize, null);
        
        $callback = function ($msg) use ($handler) {
            $log = json_decode($msg->getBody(), true);
            $handler($log);
            $msg->ack();
        };
        
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
}

2. 批量数据处理

php
<?php

class BatchDataProcessor
{
    private $channel;
    private $queueName = 'batch_data';
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->setupQueue();
    }
    
    private function setupQueue()
    {
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-queue-mode' => 'lazy',
                'x-max-length-bytes' => 10737418240  // 10GB
            ])
        );
    }
    
    public function enqueueData(array $data)
    {
        $message = new AMQPMessage(
            json_encode($data),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        
        $this->channel->basic_publish($message, '', $this->queueName);
    }
    
    public function processBatch($batchSize = 1000)
    {
        $batch = [];
        $processedCount = 0;
        
        while (true) {
            $message = $this->channel->basic_get($this->queueName);
            
            if (!$message) {
                if (!empty($batch)) {
                    $this->processBatchData($batch);
                    $processedCount += count($batch);
                }
                break;
            }
            
            $batch[] = json_decode($message->getBody(), true);
            $this->channel->basic_ack($message->getDeliveryTag());
            
            if (count($batch) >= $batchSize) {
                $this->processBatchData($batch);
                $processedCount += count($batch);
                $batch = [];
            }
        }
        
        return $processedCount;
    }
    
    private function processBatchData(array $batch)
    {
        echo "处理批量数据: " . count($batch) . " 条\n";
        // 批量处理逻辑
    }
}

3. 离线任务队列

php
<?php

class OfflineTaskQueue
{
    private $channel;
    private $queueName = 'offline_tasks';
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->setupQueue();
    }
    
    private function setupQueue()
    {
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-queue-mode' => 'lazy',
                'x-message-ttl' => 2592000000,  // 30天
            ])
        );
    }
    
    public function scheduleTask($taskType, $data, $scheduledTime = null)
    {
        $message = new AMQPMessage(
            json_encode([
                'task_type' => $taskType,
                'data' => $data,
                'scheduled_time' => $scheduledTime,
                'created_at' => time()
            ]),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        
        $this->channel->basic_publish($message, '', $this->queueName);
    }
    
    public function processTasks(callable $handler)
    {
        $this->channel->basic_qos(null, 10, null);
        
        $callback = function ($msg) use ($handler) {
            $task = json_decode($msg->getBody(), true);
            
            // 检查是否到达计划执行时间
            if ($task['scheduled_time'] && $task['scheduled_time'] > time()) {
                // 重新入队
                $msg->nack(true);
                return;
            }
            
            try {
                $handler($task);
                $msg->ack();
            } catch (Exception $e) {
                error_log("任务执行失败: " . $e->getMessage());
                $msg->nack(true);
            }
        };
        
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
}

常见问题与解决方案

问题 1: 消费速度慢

症状: 惰性队列消费速度明显慢于即时队列

原因: 消息需要从磁盘加载到内存

解决方案:

php
<?php

// 增加 prefetch 数量
$channel->basic_qos(null, 100, null);

// 使用批量消费
$batchSize = 100;
$batch = [];

while (true) {
    $message = $channel->basic_get($queueName);
    if (!$message) break;
    
    $batch[] = json_decode($message->getBody(), true);
    $channel->basic_ack($message->getDeliveryTag());
    
    if (count($batch) >= $batchSize) {
        processBatch($batch);
        $batch = [];
    }
}

问题 2: 磁盘空间不足

症状: 惰性队列占用大量磁盘空间

解决方案:

php
<?php

// 设置队列最大长度或 TTL
$channel->queue_declare(
    $queueName,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable([
        'x-queue-mode' => 'lazy',
        'x-max-length' => 1000000,           // 最多100万条
        'x-max-length-bytes' => 10737418240, // 最多10GB
        'x-message-ttl' => 604800000         // 7天过期
    ])
);

问题 3: 队列模式切换

症状: 需要在即时模式和惰性模式之间切换

解决方案:

php
<?php

// 方案一:创建新队列迁移数据
// 方案二:使用策略动态切换

// 通过 RabbitMQ 管理界面或命令行设置策略
// rabbitmqctl set_policy lazy-queue "^lazy\." '{"queue-mode":"lazy"}' --apply-to queues

// PHP 代码中声明时指定模式
$channel->queue_declare(
    $queueName,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable(['x-queue-mode' => 'lazy'])
);

最佳实践建议

  1. 适用场景: 消息量大、消费者处理慢、允许较高延迟的场景
  2. 监控磁盘: 监控磁盘空间使用情况
  3. 设置限制: 配置队列最大长度和 TTL
  4. 合理 prefetch: 增大 prefetch 数量提高吞吐
  5. 批量处理: 使用批量消费提高效率

相关链接