Skip to content

临时与持久化队列

概述

在 RabbitMQ 中,队列根据生命周期和持久性可以分为临时队列和持久化队列。理解这两种队列的特性对于设计可靠的消息系统至关重要。

核心原理

队列生命周期

mermaid
graph LR
    subgraph 临时队列
        T1[声明队列] --> T2[消费者连接]
        T2 --> T3[消费消息]
        T3 --> T4[消费者断开]
        T4 --> T5[队列自动删除]
    end
    
    subgraph 持久化队列
        P1[声明队列] --> P2[消息入队]
        P2 --> P3[RabbitMQ 重启]
        P3 --> P4[队列恢复]
        P4 --> P5[继续消费]
    end

队列属性对比

属性临时队列持久化队列
auto-deletetruefalse
durablefalsetrue
exclusive通常 truefalse
生命周期消费者断开后删除永久存在
重启后丢失保留
适用场景临时订阅、RPC核心业务队列

PHP 代码示例

临时队列

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明临时队列
// 参数: queue='', passive=false, durable=false, exclusive=true, auto_delete=true
list($queueName, ,) = $channel->queue_declare('', false, false, true, true);

echo "临时队列已创建: {$queueName}\n";

// 绑定到交换机
$exchangeName = 'notifications';
$channel->exchange_declare($exchangeName, 'fanout', false, true, false);
$channel->queue_bind($queueName, $exchangeName);

// 消费消息
$callback = function (AMQPMessage $msg) {
    echo "收到消息: " . $msg->getBody() . "\n";
    $msg->ack();
};

$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

持久化队列

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明持久化队列
$queueName = 'orders-queue';
$channel->queue_declare(
    $queueName,
    false,  // passive
    true,   // durable - 持久化
    false,  // exclusive
    false   // auto_delete
);

echo "持久化队列已创建: {$queueName}\n";

// 发送持久化消息
$message = new AMQPMessage(
    json_encode(['order_id' => 'ORD-001', 'amount' => 299.99]),
    [
        'content_type' => 'application/json',
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
    ]
);

$channel->basic_publish($message, '', $queueName);

echo "持久化消息已发送\n";

$channel->close();
$connection->close();

队列声明参数详解

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 完整的队列声明参数
$queueName = 'advanced-queue';

$channel->queue_declare(
    $queueName,     // 队列名称
    false,          // passive: 是否检查队列存在
    true,           // durable: 是否持久化
    false,          // exclusive: 是否独占
    false,          // auto_delete: 是否自动删除
    false,          // nowait: 是否等待服务器响应
    new AMQPTable([
        // 额外参数
        'x-message-ttl' => 60000,           // 消息 TTL (毫秒)
        'x-expires' => 3600000,             // 队列过期时间 (毫秒)
        'x-max-length' => 10000,            // 队列最大消息数
        'x-max-length-bytes' => 10485760,   // 队列最大字节数
        'x-dead-letter-exchange' => 'dlx',  // 死信交换机
        'x-dead-letter-routing-key' => 'dl' // 死信路由键
    ])
);

实际应用场景

1. 临时队列 - RPC 响应队列

php
<?php

class RpcClient
{
    private $channel;
    private $callbackQueue;
    private $response;
    private $corrId;
    private $responses = [];
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        
        // 创建临时回调队列
        list($this->callbackQueue,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,   // exclusive
            true    // auto_delete
        );
        
        $this->channel->basic_consume(
            $this->callbackQueue,
            '',
            false,
            true,
            false,
            false,
            [$this, 'onResponse']
        );
    }
    
    public function onResponse($msg)
    {
        $corrId = $msg->get('correlation_id');
        if (isset($this->responses[$corrId])) {
            $this->responses[$corrId] = json_decode($msg->getBody(), true);
        }
    }
    
    public function call($queueName, $data, $timeout = 30)
    {
        $corrId = uniqid();
        $this->responses[$corrId] = null;
        
        $message = new AMQPMessage(
            json_encode($data),
            [
                'correlation_id' => $corrId,
                'reply_to' => $this->callbackQueue
            ]
        );
        
        $this->channel->basic_publish($message, '', $queueName);
        
        $startTime = time();
        while ($this->responses[$corrId] === null) {
            if (time() - $startTime > $timeout) {
                throw new Exception('RPC timeout');
            }
            $this->channel->wait(null, false, 1);
        }
        
        $response = $this->responses[$corrId];
        unset($this->responses[$corrId]);
        
        return $response;
    }
}

2. 临时队列 - 事件订阅

php
<?php

class EventSubscriber
{
    private $channel;
    private $queueName;
    
    public function __construct($channel, $exchangeName, $bindingKeys = [])
    {
        $this->channel = $channel;
        
        // 创建临时队列
        list($this->queueName,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            true
        );
        
        // 绑定到交换机
        foreach ($bindingKeys as $key) {
            $this->channel->queue_bind($this->queueName, $exchangeName, $key);
        }
    }
    
    public function subscribe(callable $handler)
    {
        $callback = function ($msg) use ($handler) {
            $handler(json_decode($msg->getBody(), true), $msg);
            $msg->ack();
        };
        
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    public function getQueueName()
    {
        return $this->queueName;
    }
}

3. 持久化队列 - 核心业务队列

php
<?php

class PersistentQueueManager
{
    private $channel;
    
    // 预定义的持久化队列配置
    private $queues = [
        'orders' => [
            'durable' => true,
            'arguments' => [
                'x-message-ttl' => 86400000,        // 24小时
                'x-dead-letter-exchange' => 'dlx',
                'x-max-length' => 100000
            ]
        ],
        'payments' => [
            'durable' => true,
            'arguments' => [
                'x-message-ttl' => 604800000,       // 7天
                'x-dead-letter-exchange' => 'dlx'
            ]
        ],
        'notifications' => [
            'durable' => true,
            'arguments' => [
                'x-message-ttl' => 3600000,         // 1小时
                'x-max-length' => 10000
            ]
        ]
    ];
    
    public function __construct($channel)
    {
        $this->channel = $channel;
    }
    
    public function initializeQueues()
    {
        foreach ($this->queues as $queueName => $config) {
            $this->declareQueue($queueName, $config);
        }
    }
    
    private function declareQueue($queueName, $config)
    {
        $arguments = new AMQPTable($config['arguments'] ?? []);
        
        $this->channel->queue_declare(
            $queueName,
            false,
            $config['durable'] ?? true,
            false,
            false,
            false,
            $arguments
        );
        
        echo "持久化队列已初始化: {$queueName}\n";
    }
    
    public function sendMessage($queueName, $data)
    {
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]
        );
        
        $this->channel->basic_publish($message, '', $queueName);
    }
}

4. 混合使用场景

php
<?php

class QueueFactory
{
    private $channel;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
    }
    
    public function createTemporaryQueue($exchangeName, $bindingKey = '')
    {
        list($queueName,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            true
        );
        
        $this->channel->queue_bind($queueName, $exchangeName, $bindingKey);
        
        return $queueName;
    }
    
    public function createPersistentQueue($queueName, array $options = [])
    {
        $arguments = [];
        
        if (isset($options['ttl'])) {
            $arguments['x-message-ttl'] = $options['ttl'] * 1000;
        }
        
        if (isset($options['maxLength'])) {
            $arguments['x-max-length'] = $options['maxLength'];
        }
        
        if (isset($options['dlx'])) {
            $arguments['x-dead-letter-exchange'] = $options['dlx'];
        }
        
        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable($arguments)
        );
        
        return $queueName;
    }
    
    public function createExclusiveQueue($queueName)
    {
        $this->channel->queue_declare(
            $queueName,
            false,
            false,
            true,   // exclusive
            true    // auto_delete
        );
        
        return $queueName;
    }
}

常见问题与解决方案

问题 1: 队列声明冲突

症状: 声明队列时报错 PRECONDITION_FAILED

原因: 队列已存在但参数不匹配

解决方案:

php
<?php

// 方案一: 使用 passive 检查队列是否存在
try {
    $channel->queue_declare($queueName, true, false, false, false);
    echo "队列已存在\n";
} catch (Exception $e) {
    echo "队列不存在\n";
}

// 方案二: 统一队列声明配置
class QueueDeclarer
{
    private $channel;
    private $declaredQueues = [];
    
    public function declareOnce($queueName, $durable = true)
    {
        if (isset($this->declaredQueues[$queueName])) {
            return;
        }
        
        $this->channel->queue_declare($queueName, false, $durable, false, false);
        $this->declaredQueues[$queueName] = true;
    }
}

问题 2: 持久化消息丢失

症状: RabbitMQ 重启后消息丢失

原因: 只设置了队列持久化,未设置消息持久化

解决方案:

php
<?php

// 同时设置队列和消息持久化
$channel->queue_declare($queueName, false, true, false, false);

$message = new AMQPMessage($body, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);

问题 3: 临时队列堆积

症状: 临时队列消息堆积,消费者断开后消息丢失

解决方案:

php
<?php

// 对于重要消息,使用持久化队列
// 对于临时订阅,确保消费者正常处理消息

class SafeTemporaryQueue
{
    public function create($channel, $exchangeName, $bindingKey)
    {
        // 使用非自动删除的临时队列
        list($queueName,) = $channel->queue_declare(
            'temp_' . uniqid(),
            false,
            false,
            false,
            true    // auto_delete,但不会立即删除
        );
        
        $channel->queue_bind($queueName, $exchangeName, $bindingKey);
        
        return $queueName;
    }
}

最佳实践建议

  1. 核心业务使用持久化队列: 确保消息不丢失
  2. 临时订阅使用临时队列: 自动清理,避免资源浪费
  3. 统一队列声明: 在应用启动时统一声明所有队列
  4. 消息和队列都要持久化: 两者缺一不可
  5. 合理设置队列参数: TTL、最大长度等

相关链接