Skip to content

消息持久化

概述

消息持久化是确保消息在 RabbitMQ 服务器重启后不丢失的重要机制。要实现完整的消息持久化,需要同时满足三个条件:交换机持久化、队列持久化和消息持久化。

核心原理

持久化三要素

mermaid
graph TD
    subgraph 持久化三要素
        E[交换机持久化] --> |durable=true| S[存储到磁盘]
        Q[队列持久化] --> |durable=true| S
        M[消息持久化] --> |delivery_mode=2| S
    end
    
    subgraph 非持久化风险
        E2[交换机非持久化] --> |重启后丢失| L1[元数据丢失]
        Q2[队列非持久化] --> |重启后丢失| L2[队列和消息丢失]
        M2[消息非持久化] --> |重启后丢失| L3[消息丢失]
    end
    
    style S fill:#90EE90
    style L1 fill:#FF6B6B
    style L2 fill:#FF6B6B
    style L3 fill:#FF6B6B

持久化流程

mermaid
sequenceDiagram
    participant P as 生产者
    participant B as Broker
    participant D as 磁盘
    
    P->>B: 发送持久化消息
    B->>D: 写入消息到磁盘
    D-->>B: 确认写入
    B-->>P: 确认接收
    
    Note over B,D: 消息已持久化
    
    B->>C: 投递给消费者
    C->>B: 确认消费
    B->>D: 删除消息

持久化级别

级别说明性能影响
非持久化消息只在内存中无影响
持久化消息写入磁盘有影响
事务确保消息写入磁盘较大影响
发布确认异步确认消息持久化中等影响

PHP 代码示例

完整持久化配置

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 1. 声明持久化交换机
$exchangeName = 'persistent_exchange';
$channel->exchange_declare(
    $exchangeName,
    AMQPExchangeType::DIRECT,
    false,
    true,   // durable = true,持久化
    false
);

// 2. 声明持久化队列
$queueName = 'persistent_queue';
$channel->queue_declare(
    $queueName,
    false,
    true,   // durable = true,持久化
    false,
    false
);

// 3. 绑定队列到交换机
$channel->queue_bind($queueName, $exchangeName, 'routing.key');

// 4. 发送持久化消息
$messageBody = json_encode([
    'order_id' => 'ORD-001',
    'amount' => 299.99,
    'timestamp' => time()
]);

$message = new AMQPMessage(
    $messageBody,
    [
        'content_type' => 'application/json',
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT  // 持久化消息
    ]
);

$channel->basic_publish($message, $exchangeName, 'routing.key');

echo "持久化消息已发送\n";

$channel->close();
$connection->close();

持久化工具类

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class PersistentMessenger
{
    private $channel;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
    }
    
    public function declarePersistentExchange($exchangeName, $type = AMQPExchangeType::DIRECT)
    {
        $this->channel->exchange_declare(
            $exchangeName,
            $type,
            false,
            true,   // durable
            false
        );
        
        return $exchangeName;
    }
    
    public function declarePersistentQueue($queueName, array $arguments = [])
    {
        $this->channel->queue_declare(
            $queueName,
            false,
            true,   // durable
            false,
            false,
            false,
            new \PhpAmqpLib\Wire\AMQPTable($arguments)
        );
        
        return $queueName;
    }
    
    public function sendPersistentMessage($exchangeName, $routingKey, $data)
    {
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'timestamp' => time()
            ]
        );
        
        $this->channel->basic_publish($message, $exchangeName, $routingKey);
    }
    
    public function sendTransientMessage($exchangeName, $routingKey, $data)
    {
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT
            ]
        );
        
        $this->channel->basic_publish($message, $exchangeName, $routingKey);
    }
}

带发布确认的持久化

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 启用发布确认模式
$channel->confirm_select();

$queueName = 'confirmed_queue';
$channel->queue_declare($queueName, false, true, false, false);

$message = new AMQPMessage(
    json_encode(['data' => 'important']),
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);

$channel->basic_publish($message, '', $queueName);

// 等待确认
try {
    $channel->wait_for_pending_acks();
    echo "消息已确认持久化\n";
} catch (Exception $e) {
    echo "消息持久化失败: " . $e->getMessage() . "\n";
}

$channel->close();
$connection->close();

异步确认持久化

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->confirm_select();

$queueName = 'async_confirmed_queue';
$channel->queue_declare($queueName, false, true, false, false);

// 设置异步确认回调
$channel->set_ack_handler(function ($deliveryTag) {
    echo "消息 {$deliveryTag} 已确认\n";
});

$channel->set_nack_handler(function ($deliveryTag) {
    echo "消息 {$deliveryTag} 被拒绝\n";
});

// 发送多条消息
for ($i = 1; $i <= 10; $i++) {
    $message = new AMQPMessage(
        json_encode(['id' => $i]),
        ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
    );
    
    $channel->basic_publish($message, '', $queueName);
}

// 等待所有确认
$channel->wait_for_pending_acks();

$channel->close();
$connection->close();

实际应用场景

1. 订单系统持久化

php
<?php

class OrderPersistenceService
{
    private $messenger;
    private $exchangeName = 'orders';
    private $queueName = 'order_processing';
    
    public function __construct($channel)
    {
        $this->messenger = new PersistentMessenger($channel);
        $this->setupInfrastructure();
    }
    
    private function setupInfrastructure()
    {
        // 声明持久化交换机
        $this->messenger->declarePersistentExchange($this->exchangeName, 'topic');
        
        // 声明持久化队列
        $this->messenger->declarePersistentQueue($this->queueName, [
            'x-message-ttl' => 86400000,  // 24小时
            'x-dead-letter-exchange' => 'orders_dlx'
        ]);
        
        // 绑定
        $this->messenger->bindQueue($this->queueName, $this->exchangeName, 'order.#');
    }
    
    public function submitOrder($order)
    {
        $this->messenger->sendPersistentMessage(
            $this->exchangeName,
            'order.created',
            [
                'order_id' => $order['id'],
                'customer_id' => $order['customer_id'],
                'items' => $order['items'],
                'total' => $order['total'],
                'created_at' => time()
            ]
        );
    }
    
    public function updateOrderStatus($orderId, $status)
    {
        $this->messenger->sendPersistentMessage(
            $this->exchangeName,
            'order.status_changed',
            [
                'order_id' => $orderId,
                'status' => $status,
                'updated_at' => time()
            ]
        );
    }
}

2. 金融交易持久化

php
<?php

class TransactionPersistenceService
{
    private $channel;
    private $queueName = 'transactions';
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->setupQueue();
    }
    
    private function setupQueue()
    {
        // 启用发布确认
        $this->channel->confirm_select();
        
        // 声明持久化队列
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );
    }
    
    public function recordTransaction($transaction)
    {
        $message = new AMQPMessage(
            json_encode([
                'transaction_id' => $transaction['id'],
                'type' => $transaction['type'],
                'amount' => $transaction['amount'],
                'from_account' => $transaction['from'],
                'to_account' => $transaction['to'],
                'timestamp' => time()
            ]),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $transaction['id'],
                'timestamp' => time()
            ]
        );
        
        $this->channel->basic_publish($message, '', $this->queueName);
        
        // 同步等待确认
        $this->channel->wait_for_pending_acks();
        
        return true;
    }
    
    public function recordTransactionAsync($transaction, callable $onSuccess, callable $onFailure)
    {
        $message = new AMQPMessage(
            json_encode($transaction),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        
        $this->channel->basic_publish($message, '', $this->queueName);
        
        // 异步确认
        $this->channel->set_ack_handler(function () use ($onSuccess, $transaction) {
            $onSuccess($transaction);
        });
        
        $this->channel->set_nack_handler(function () use ($onFailure, $transaction) {
            $onFailure($transaction);
        });
    }
}

3. 日志持久化

php
<?php

class LogPersistenceService
{
    private $channel;
    private $exchangeName = 'logs';
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->setupExchange();
    }
    
    private function setupExchange()
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            'topic',
            false,
            true,   // 持久化
            false
        );
    }
    
    public function log($level, $service, $message, $context = [])
    {
        $routingKey = "{$service}.{$level}";
        
        $logMessage = new AMQPMessage(
            json_encode([
                'level' => $level,
                'service' => $service,
                'message' => $message,
                'context' => $context,
                'hostname' => gethostname(),
                'timestamp' => time()
            ]),
            [
                'content_type' => 'application/json',
                'delivery_mode' => $level === 'error' 
                    ? AMQPMessage::DELIVERY_MODE_PERSISTENT 
                    : AMQPMessage::DELIVERY_MODE_NON_PERSISTENT
            ]
        );
        
        $this->channel->basic_publish($logMessage, $this->exchangeName, $routingKey);
    }
    
    public function logError($service, $message, $context = [])
    {
        $this->log('error', $service, $message, $context);
    }
    
    public function logInfo($service, $message, $context = [])
    {
        $this->log('info', $service, $message, $context);
    }
}

常见问题与解决方案

问题 1: 消息仍然丢失

症状: 配置了持久化但消息仍然丢失

原因: 只配置了消息持久化,未配置队列或交换机持久化

解决方案:

php
<?php

// 三者都必须持久化
// 1. 交换机持久化
$channel->exchange_declare($exchange, $type, false, true, false);

// 2. 队列持久化
$channel->queue_declare($queue, false, true, false, false);

// 3. 消息持久化
$message = new AMQPMessage($body, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);

问题 2: 性能下降

症状: 启用持久化后性能明显下降

解决方案:

php
<?php

// 使用批量确认提高性能
$channel->confirm_select();

for ($i = 0; $i < 100; $i++) {
    $channel->basic_publish($message, '', $queue);
}

// 批量等待确认
$channel->wait_for_pending_acks();

// 或者使用异步确认
$channel->set_ack_handler($ackCallback);
$channel->set_nack_handler($nackCallback);

问题 3: 磁盘空间不足

症状: 持久化消息占用大量磁盘空间

解决方案:

php
<?php

// 设置队列 TTL 和最大长度
$channel->queue_declare(
    $queueName,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable([
        'x-message-ttl' => 86400000,      // 24小时后过期
        'x-max-length' => 100000,          // 最多10万条消息
        'x-max-length-bytes' => 1073741824 // 最多1GB
    ])
);

最佳实践建议

  1. 三要素齐全: 交换机、队列、消息都要持久化
  2. 使用发布确认: 确保消息真正持久化
  3. 合理设置 TTL: 避免磁盘空间耗尽
  4. 监控磁盘: 监控磁盘使用情况
  5. 批量确认: 提高持久化性能

相关链接