Skip to content

RabbitMQ 备份与恢复

概述

备份是保障数据安全的重要手段。RabbitMQ 的备份包括消息数据、配置信息和元数据。完善的备份策略可以在故障发生时快速恢复服务,最小化业务影响。本文将详细介绍 RabbitMQ 的备份与恢复策略、方法和最佳实践。

核心知识点

备份类型

| 类型 | 内容 | 频率 | 恢复时间 |
| --- | --- | --- | --- |
| 全量备份 | 所有数据和配置 | 每天 | 较长 |
| 增量备份 | 变化的数据 | 每小时 | 中等 |
| 配置备份 | 用户、权限、策略 | 每次变更 | 较短 |
| 元数据备份 | 队列、交换机定义 | 每小时 | 较短 |

备份内容

| 内容 | 位置 | 说明 |
| --- | --- | --- |
| 消息数据 | /var/lib/rabbitmq/mnesia | 持久化消息 |
| 配置文件 | /etc/rabbitmq | rabbitmq.conf, enabled_plugins |
| 定义文件 | HTTP API 导出 | 队列、交换机、绑定 |
| 用户权限 | HTTP API 导出 | 用户、权限、策略 |

恢复策略

| 场景 | 恢复方法 | RTO |
| --- | --- | --- |
| 单节点故障 | 节点重建 | 分钟级 |
| 数据损坏 | 数据恢复 | 小时级 |
| 误删资源 | 定义恢复 | 分钟级 |
| 灾难恢复 | 全量恢复 | 小时级 |

配置示例

PHP 备份管理类

php
<?php

/**
 * Creates, lists and restores file-based backups of a RabbitMQ node.
 *
 * A full backup is a directory (optionally packed into a .tar.gz) holding:
 *   - definitions.json, users.json, permissions.json, policies.json,
 *     fetched through the injected management API client;
 *   - config/ : copies of the configuration files found under /etc/rabbitmq;
 *   - mnesia/ : a copy of /var/lib/rabbitmq/mnesia;
 *   - manifest.json describing the above.
 *
 * The API client is any object exposing request($method, $path, $body = null)
 * that returns an array with a 'code' (HTTP status) and a 'data' entry.
 */
class RabbitMQBackupManager
{
    /** @var object Management API client (see class comment for its contract). */
    private $apiClient;

    /** @var array Effective settings: backup_dir, retention_days, compress. */
    private $config;

    /** @var object|null Optional logger; only info($message, $context) is called. */
    private $logger;

    /**
     * @param object      $apiClient Management API client.
     * @param array       $config    Overrides for backup_dir, retention_days, compress.
     * @param object|null $logger    Optional logger exposing info().
     */
    public function __construct($apiClient, $config = [], $logger = null)
    {
        $this->apiClient = $apiClient;
        $this->config = array_merge([
            'backup_dir' => '/var/backups/rabbitmq',
            'retention_days' => 30,
            'compress' => true,
        ], $config);
        $this->logger = $logger;
    }

    /**
     * Runs every export/copy step into a fresh "full_<timestamp>" directory,
     * writes the manifest, optionally compresses it and prunes old archives.
     *
     * @return array Per-step results plus 'timestamp', 'backup_dir', 'manifest'
     *               and, when compression is enabled, 'archive'.
     * @throws Exception When an API export or a file write fails.
     */
    public function createFullBackup()
    {
        $timestamp = date('Ymd_His');
        $backupDir = $this->config['backup_dir'] . '/full_' . $timestamp;

        $this->ensureDir($backupDir);

        $results = [
            'timestamp' => $timestamp,
            'backup_dir' => $backupDir,
            'definitions' => $this->exportDefinitions($backupDir),
            'users' => $this->exportUsers($backupDir),
            'permissions' => $this->exportPermissions($backupDir),
            'policies' => $this->exportPolicies($backupDir),
            'config' => $this->backupConfig($backupDir),
            'data' => $this->backupData($backupDir),
        ];

        $results['manifest'] = $this->createManifest($backupDir, $results);

        if ($this->config['compress']) {
            $results['archive'] = $this->compressBackup($backupDir);
        }

        $this->cleanupOldBackups();

        $this->log('Full backup completed', $results);

        return $results;
    }

    /**
     * Saves the output of GET definitions to <backupDir>/definitions.json.
     *
     * @param string $backupDir Target directory (created if missing).
     * @return array 'file', 'size' and the queue/exchange/binding counts.
     * @throws Exception When the API does not answer 200 or the file cannot be written.
     */
    public function exportDefinitions($backupDir)
    {
        $data = $this->fetchOrFail('definitions');

        $file = $backupDir . '/definitions.json';
        $this->writeJsonFile($file, $data);

        return [
            'file' => $file,
            'size' => filesize($file),
            'queues' => count($data['queues'] ?? []),
            'exchanges' => count($data['exchanges'] ?? []),
            'bindings' => count($data['bindings'] ?? []),
        ];
    }

    /**
     * Saves name, tags and password hash of every user to <backupDir>/users.json.
     *
     * @param string $backupDir Target directory (created if missing).
     * @return array 'file' and 'count'.
     * @throws Exception When the API does not answer 200 or the file cannot be written.
     */
    public function exportUsers($backupDir)
    {
        $users = [];
        foreach ($this->fetchOrFail('users') as $user) {
            $users[] = [
                'name' => $user['name'],
                'tags' => $user['tags'],
                'password_hash' => $user['password_hash'] ?? null,
                'hashing_algorithm' => $user['hashing_algorithm'] ?? 'rabbit_password_hashing_sha256',
            ];
        }

        $file = $backupDir . '/users.json';
        $this->writeJsonFile($file, $users);

        return [
            'file' => $file,
            'count' => count($users),
        ];
    }

    /**
     * Saves the output of GET permissions to <backupDir>/permissions.json.
     *
     * @param string $backupDir Target directory (created if missing).
     * @return array 'file' and 'count'.
     * @throws Exception When the API does not answer 200 or the file cannot be written.
     */
    public function exportPermissions($backupDir)
    {
        $data = $this->fetchOrFail('permissions');

        $file = $backupDir . '/permissions.json';
        $this->writeJsonFile($file, $data);

        return [
            'file' => $file,
            'count' => count($data),
        ];
    }

    /**
     * Saves the output of GET policies to <backupDir>/policies.json.
     *
     * @param string $backupDir Target directory (created if missing).
     * @return array 'file' and 'count'.
     * @throws Exception When the API does not answer 200 or the file cannot be written.
     */
    public function exportPolicies($backupDir)
    {
        $data = $this->fetchOrFail('policies');

        $file = $backupDir . '/policies.json';
        $this->writeJsonFile($file, $data);

        return [
            'file' => $file,
            'count' => count($data),
        ];
    }

    /**
     * Copies the known RabbitMQ configuration files that exist into <backupDir>/config.
     *
     * @param string $backupDir Target directory.
     * @return array 'dir' and 'files' (base names that were actually copied).
     */
    public function backupConfig($backupDir)
    {
        $configDir = $backupDir . '/config';
        $this->ensureDir($configDir);

        $configFiles = [
            '/etc/rabbitmq/rabbitmq.conf',
            '/etc/rabbitmq/enabled_plugins',
            '/etc/rabbitmq/rabbitmq-env.conf',
            '/etc/rabbitmq/advanced.config',
        ];

        $backedUp = [];

        foreach ($configFiles as $file) {
            // Only report a file as backed up when the copy really succeeded.
            if (file_exists($file) && copy($file, $configDir . '/' . basename($file))) {
                $backedUp[] = basename($file);
            }
        }

        return [
            'dir' => $configDir,
            'files' => $backedUp,
        ];
    }

    /**
     * Copies the Mnesia data directory into <backupDir>/mnesia.
     *
     * @param string $backupDir Target directory.
     * @return array 'source', 'dest', 'success'; or 'success' => false with a
     *               'message' when the Mnesia directory does not exist.
     */
    public function backupData($backupDir)
    {
        $dataDir = $backupDir . '/mnesia';
        $this->ensureDir($dataDir);

        $mnesiaDir = '/var/lib/rabbitmq/mnesia';

        if (!is_dir($mnesiaDir)) {
            return ['success' => false, 'message' => 'Mnesia directory not found'];
        }

        // "src/." instead of "src/*": also copies dotfiles and does not fail
        // with an unmatched glob when the directory is empty.
        $command = sprintf(
            'cp -r %s/. %s/',
            escapeshellarg($mnesiaDir),
            escapeshellarg($dataDir)
        );

        exec($command, $output, $returnCode);

        return [
            'source' => $mnesiaDir,
            'dest' => $dataDir,
            'success' => $returnCode === 0,
        ];
    }

    /**
     * Writes <backupDir>/manifest.json summarising a full backup.
     *
     * @param string $backupDir Backup directory.
     * @param array  $results   Step results as assembled by createFullBackup().
     * @return string Path of the manifest file.
     * @throws Exception When the manifest cannot be written.
     */
    public function createManifest($backupDir, $results)
    {
        $manifest = [
            'version' => '1.0',
            'created_at' => date('Y-m-d H:i:s'),
            'hostname' => gethostname(),
            'type' => 'full',
            'contents' => [
                'definitions' => $results['definitions']['file'] ?? null,
                'users' => $results['users']['file'] ?? null,
                'permissions' => $results['permissions']['file'] ?? null,
                'policies' => $results['policies']['file'] ?? null,
                'config' => $results['config']['dir'] ?? null,
                'data' => $results['data']['dest'] ?? null,
            ],
            'statistics' => [
                'queues' => $results['definitions']['queues'] ?? 0,
                'exchanges' => $results['definitions']['exchanges'] ?? 0,
                'users' => $results['users']['count'] ?? 0,
            ],
        ];

        $file = $backupDir . '/manifest.json';
        $this->writeJsonFile($file, $manifest);

        return $file;
    }

    /**
     * Packs the backup directory into "<backupDir>.tar.gz" and removes the
     * directory once the archive has been written.
     *
     * @param string $backupDir Directory to archive.
     * @return array 'success', and on success 'file' and 'size'.
     */
    public function compressBackup($backupDir)
    {
        $archive = $backupDir . '.tar.gz';

        $command = sprintf(
            'tar -czf %s -C %s .',
            escapeshellarg($archive),
            escapeshellarg($backupDir)
        );

        exec($command, $output, $returnCode);

        if ($returnCode !== 0) {
            // Keep the uncompressed directory so the backup is not lost.
            return ['success' => false];
        }

        exec('rm -rf ' . escapeshellarg($backupDir));

        return [
            'success' => true,
            'file' => $archive,
            'size' => filesize($archive),
        ];
    }

    /**
     * Extracts an archive and replays the selected parts of it.
     *
     * @param string $backupFile Path to a .tar.gz produced by compressBackup().
     * @param array  $options    restore_* switches; definitions, users,
     *                           permissions and policies default to true,
     *                           config and data default to false.
     * @return array 'backup_file', 'extract_dir' and one entry per restored part.
     * @throws Exception When the archive cannot be extracted.
     */
    public function restoreFromBackup($backupFile, $options = [])
    {
        $options = array_merge([
            'restore_definitions' => true,
            'restore_users' => true,
            'restore_permissions' => true,
            'restore_policies' => true,
            'restore_config' => false,
            'restore_data' => false,
        ], $options);

        $extractDir = $this->extractBackup($backupFile);

        $results = [
            'backup_file' => $backupFile,
            'extract_dir' => $extractDir,
        ];

        if ($options['restore_definitions']) {
            $results['definitions'] = $this->restoreDefinitions($extractDir);
        }

        if ($options['restore_users']) {
            $results['users'] = $this->restoreUsers($extractDir);
        }

        if ($options['restore_permissions']) {
            $results['permissions'] = $this->restorePermissions($extractDir);
        }

        if ($options['restore_policies']) {
            $results['policies'] = $this->restorePolicies($extractDir);
        }

        if ($options['restore_config']) {
            $results['config'] = $this->restoreConfig($extractDir);
        }

        if ($options['restore_data']) {
            $results['data'] = $this->restoreData($extractDir);
        }

        $this->log('Restore completed', $results);

        return $results;
    }

    /**
     * Unpacks the archive into a fresh temporary directory.
     *
     * @param string $backupFile Archive path.
     * @return string Directory the archive was extracted to.
     * @throws Exception When the archive is unreadable or tar fails.
     */
    private function extractBackup($backupFile)
    {
        if (!is_file($backupFile) || !is_readable($backupFile)) {
            throw new Exception('Failed to extract backup');
        }

        // uniqid() rather than time(): two restores started within the same
        // second must not extract into the same directory.
        $extractDir = sys_get_temp_dir() . '/rabbitmq_restore_' . uniqid('', true);
        $this->ensureDir($extractDir);

        $command = sprintf(
            'tar -xzf %s -C %s',
            escapeshellarg($backupFile),
            escapeshellarg($extractDir)
        );

        exec($command, $output, $returnCode);

        if ($returnCode !== 0) {
            throw new Exception('Failed to extract backup');
        }

        return $extractDir;
    }

    /**
     * POSTs definitions.json back to the definitions endpoint.
     *
     * @param string $extractDir Extracted backup directory.
     * @return array 'success' plus 'http_code', or 'success' => false with a 'message'.
     */
    private function restoreDefinitions($extractDir)
    {
        $file = $extractDir . '/definitions.json';

        if (!file_exists($file)) {
            return ['success' => false, 'message' => 'Definitions file not found'];
        }

        $definitions = $this->readJsonFile($file);

        if ($definitions === null) {
            // Do not POST a null body when the file is corrupt.
            return ['success' => false, 'message' => 'Definitions file is not valid JSON'];
        }

        $response = $this->apiClient->request('POST', 'definitions', $definitions);

        return [
            'success' => $this->isSuccessCode($response['code']),
            'http_code' => $response['code'],
        ];
    }

    /**
     * Re-creates every user from users.json with PUT users/<name>.
     *
     * @param string $extractDir Extracted backup directory.
     * @return array 'success' and 'results' (user name => bool), or a failure message.
     */
    private function restoreUsers($extractDir)
    {
        $file = $extractDir . '/users.json';

        if (!file_exists($file)) {
            return ['success' => false, 'message' => 'Users file not found'];
        }

        $users = $this->readJsonFile($file);

        if ($users === null) {
            return ['success' => false, 'message' => 'Users file is not valid JSON'];
        }

        $results = [];

        foreach ($users as $user) {
            // Names are path segments and must be percent-encoded.
            $response = $this->apiClient->request('PUT', 'users/' . rawurlencode($user['name']), [
                'password_hash' => $user['password_hash'],
                'tags' => $user['tags'],
                'hashing_algorithm' => $user['hashing_algorithm'],
            ]);

            $results[$user['name']] = $this->isSuccessCode($response['code']);
        }

        return ['success' => !in_array(false, $results, true), 'results' => $results];
    }

    /**
     * Re-applies every entry of permissions.json with PUT permissions/<vhost>/<user>.
     *
     * @param string $extractDir Extracted backup directory.
     * @return array 'success' and 'results' ("vhost/user" => bool), or a failure message.
     */
    private function restorePermissions($extractDir)
    {
        $file = $extractDir . '/permissions.json';

        if (!file_exists($file)) {
            return ['success' => false, 'message' => 'Permissions file not found'];
        }

        $permissions = $this->readJsonFile($file);

        if ($permissions === null) {
            return ['success' => false, 'message' => 'Permissions file is not valid JSON'];
        }

        $results = [];

        foreach ($permissions as $perm) {
            // The default vhost is "/", which has to travel as %2F; unencoded
            // it would produce "permissions///user".
            $path = 'permissions/' . rawurlencode($perm['vhost']) . '/' . rawurlencode($perm['user']);

            $response = $this->apiClient->request('PUT', $path, [
                'configure' => $perm['configure'],
                'write' => $perm['write'],
                'read' => $perm['read'],
            ]);

            $results["{$perm['vhost']}/{$perm['user']}"] = $this->isSuccessCode($response['code']);
        }

        return ['success' => !in_array(false, $results, true), 'results' => $results];
    }

    /**
     * Re-applies every entry of policies.json with PUT policies/<vhost>/<name>.
     *
     * @param string $extractDir Extracted backup directory.
     * @return array 'success' and 'results' ("vhost/name" => bool), or a failure message.
     */
    private function restorePolicies($extractDir)
    {
        $file = $extractDir . '/policies.json';

        if (!file_exists($file)) {
            return ['success' => false, 'message' => 'Policies file not found'];
        }

        $policies = $this->readJsonFile($file);

        if ($policies === null) {
            return ['success' => false, 'message' => 'Policies file is not valid JSON'];
        }

        $results = [];

        foreach ($policies as $policy) {
            $path = 'policies/' . rawurlencode($policy['vhost']) . '/' . rawurlencode($policy['name']);

            $response = $this->apiClient->request('PUT', $path, [
                'pattern' => $policy['pattern'],
                'definition' => $policy['definition'],
                'priority' => $policy['priority'] ?? 0,
                'apply-to' => $policy['apply-to'] ?? 'all',
            ]);

            // Keyed by vhost and name: the same policy name may exist in
            // several vhosts and must not overwrite another entry's outcome.
            $results["{$policy['vhost']}/{$policy['name']}"] = $this->isSuccessCode($response['code']);
        }

        return ['success' => !in_array(false, $results, true), 'results' => $results];
    }

    /**
     * Copies every file from the backup's config/ directory into /etc/rabbitmq.
     *
     * @param string $extractDir Extracted backup directory.
     * @return array 'success' and 'files' (file name => copy outcome), or a failure message.
     */
    private function restoreConfig($extractDir)
    {
        $configDir = $extractDir . '/config';

        if (!is_dir($configDir)) {
            return ['success' => false, 'message' => 'Config directory not found'];
        }

        $results = [];

        foreach (new DirectoryIterator($configDir) as $entry) {
            if (!$entry->isFile()) {
                continue;
            }

            $results[$entry->getFilename()] = copy(
                $entry->getPathname(),
                '/etc/rabbitmq/' . $entry->getFilename()
            );
        }

        return ['success' => !in_array(false, $results, true), 'files' => $results];
    }

    /**
     * Stops the RabbitMQ application, copies the backed-up Mnesia files over
     * /var/lib/rabbitmq/mnesia and starts the application again.
     *
     * @param string $extractDir Extracted backup directory.
     * @return array 'success', with a 'message' when a precondition fails.
     */
    private function restoreData($extractDir)
    {
        $dataDir = $extractDir . '/mnesia';

        if (!is_dir($dataDir)) {
            return ['success' => false, 'message' => 'Mnesia directory not found'];
        }

        exec('rabbitmqctl stop_app', $stopOutput, $stopCode);

        if ($stopCode !== 0) {
            // Never overwrite the data files underneath a running application.
            return ['success' => false, 'message' => 'Failed to stop RabbitMQ application'];
        }

        $command = sprintf(
            'cp -r %s/. /var/lib/rabbitmq/mnesia/',
            escapeshellarg($dataDir)
        );

        exec($command, $copyOutput, $copyCode);

        // Always try to bring the application back, even after a failed copy.
        exec('rabbitmqctl start_app', $startOutput, $startCode);

        return ['success' => $copyCode === 0 && $startCode === 0];
    }

    /**
     * Lists the .tar.gz archives in the backup directory, newest first.
     *
     * @return array List of entries with 'file', 'path', 'size', 'modified', 'manifest'.
     */
    public function listBackups()
    {
        $backups = [];
        $backupDir = $this->config['backup_dir'];

        if (!is_dir($backupDir)) {
            return $backups;
        }

        foreach (new DirectoryIterator($backupDir) as $file) {
            if ($file->isFile() && preg_match('/\.tar\.gz$/', $file->getFilename())) {
                $backups[] = [
                    'file' => $file->getFilename(),
                    'path' => $file->getPathname(),
                    'size' => $file->getSize(),
                    'modified' => date('Y-m-d H:i:s', $file->getMTime()),
                    'manifest' => $this->readManifest($file->getPathname()),
                ];
            }
        }

        // 'Y-m-d H:i:s' strings sort chronologically, so strcmp is sufficient.
        usort($backups, function ($a, $b) {
            return strcmp($b['modified'], $a['modified']);
        });

        return $backups;
    }

    /**
     * Reads manifest.json straight out of an archive without unpacking it.
     *
     * Public because BackupVerifier calls it on the manager instance.
     *
     * @param string $backupFile Archive path.
     * @return array|null Decoded manifest, or null when it is missing or invalid.
     */
    public function readManifest($backupFile)
    {
        // compressBackup() archives "." so the member is stored as
        // "./manifest.json"; tar matches member names literally, hence the
        // fallback to the bare name for archives created differently.
        $command = sprintf(
            'tar -xzf %1$s -O ./manifest.json 2>/dev/null || tar -xzf %1$s -O manifest.json 2>/dev/null',
            escapeshellarg($backupFile)
        );

        $output = [];
        exec($command, $output);

        return json_decode(implode("\n", $output), true);
    }

    /**
     * Deletes archives in the backup directory older than retention_days.
     */
    private function cleanupOldBackups()
    {
        $backupDir = $this->config['backup_dir'];

        if (!is_dir($backupDir)) {
            return;
        }

        $threshold = time() - ($this->config['retention_days'] * 86400);

        // Only file names and mtimes are needed here, so the archives are not
        // opened (listBackups() would run tar on each of them).
        $expired = [];
        foreach (new DirectoryIterator($backupDir) as $entry) {
            if ($entry->isFile()
                && preg_match('/\.tar\.gz$/', $entry->getFilename())
                && $entry->getMTime() < $threshold
            ) {
                $expired[] = $entry->getPathname();
            }
        }

        foreach ($expired as $path) {
            if (unlink($path)) {
                $this->log('Deleted old backup: ' . basename($path));
            }
        }
    }

    /**
     * Performs a GET on the API and returns its 'data' payload.
     *
     * @param string $endpoint API path, e.g. 'users'.
     * @return mixed The response's 'data' entry.
     * @throws Exception When the status code is not 200.
     */
    private function fetchOrFail($endpoint)
    {
        $response = $this->apiClient->request('GET', $endpoint);

        if ($response['code'] !== 200) {
            throw new Exception('Failed to export ' . $endpoint);
        }

        return $response['data'];
    }

    /**
     * Pretty-prints $data as JSON into $file, creating the parent directory.
     *
     * @param string $file Destination path.
     * @param mixed  $data Value to encode.
     * @throws Exception When encoding or writing fails.
     */
    private function writeJsonFile($file, $data)
    {
        $this->ensureDir(dirname($file));

        $json = json_encode($data, JSON_PRETTY_PRINT);

        if ($json === false) {
            throw new Exception('Failed to encode ' . basename($file) . ': ' . json_last_error_msg());
        }

        if (file_put_contents($file, $json) === false) {
            throw new Exception('Failed to write ' . $file);
        }
    }

    /**
     * Decodes a JSON file that is expected to contain an array or object.
     *
     * @param string $file Source path.
     * @return array|null Decoded content, or null when unreadable or not an array.
     */
    private function readJsonFile($file)
    {
        $raw = file_get_contents($file);

        if ($raw === false) {
            return null;
        }

        $decoded = json_decode($raw, true);

        return is_array($decoded) ? $decoded : null;
    }

    /**
     * Tells whether a write request was accepted (201 or 204).
     *
     * @param int $code HTTP status code.
     * @return bool
     */
    private function isSuccessCode($code)
    {
        return $code === 204 || $code === 201;
    }

    /**
     * Creates the directory (recursively) when it does not exist yet.
     *
     * @param string $dir Directory path.
     * @throws Exception When the directory cannot be created.
     */
    private function ensureDir($dir)
    {
        // The trailing is_dir() tolerates another process creating it first.
        if (!is_dir($dir) && !mkdir($dir, 0755, true) && !is_dir($dir)) {
            throw new Exception('Failed to create directory ' . $dir);
        }
    }

    /**
     * Forwards a message to the logger when one was injected.
     *
     * @param string $message Log message.
     * @param array  $context Structured context.
     */
    private function log($message, $context = [])
    {
        if ($this->logger) {
            $this->logger->info($message, $context);
        }
    }
}

Shell 备份脚本

bash
#!/bin/bash
# /opt/rabbitmq/scripts/backup.sh
#
# Exports definitions, users, permissions and policies through the RabbitMQ
# management API, copies the main config files, writes a manifest, packs
# everything into ${BACKUP_DIR}/rabbitmq_backup_<timestamp>.tar.gz and deletes
# archives older than the retention period.
#
# Every setting below can be overridden from the environment; the defaults
# match the values used throughout this guide.

# Abort on the first failing command, unset variable or failing pipeline stage
# so that a half-written backup is never archived as if it were complete.
set -euo pipefail

BACKUP_DIR="${BACKUP_DIR:-/var/backups/rabbitmq}"
RABBITMQ_API="${RABBITMQ_API:-http://localhost:15672/api}"
RABBITMQ_USER="${RABBITMQ_USER:-admin}"
RABBITMQ_PASS="${RABBITMQ_PASS:-admin123}"
RETENTION_DAYS="${RETENTION_DAYS:-30}"

TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_NAME="rabbitmq_backup_${TIMESTAMP}"
BACKUP_PATH="${BACKUP_DIR}/${BACKUP_NAME}"

# Fetch one API collection into ${BACKUP_PATH}/<endpoint>.json.
# -f makes curl exit non-zero on HTTP errors instead of saving the error body.
export_endpoint() {
    local endpoint="$1"

    echo "Exporting ${endpoint}..."
    curl -sf -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \
        "${RABBITMQ_API}/${endpoint}" \
        > "${BACKUP_PATH}/${endpoint}.json"
}

mkdir -p "${BACKUP_PATH}"

echo "Starting RabbitMQ backup at ${TIMESTAMP}"

export_endpoint definitions
export_endpoint users
export_endpoint permissions
export_endpoint policies

echo "Backing up config files..."
mkdir -p "${BACKUP_PATH}/config"
for conf_file in /etc/rabbitmq/rabbitmq.conf /etc/rabbitmq/enabled_plugins; do
    # Either file may legitimately be absent; skip it instead of failing.
    if [ -f "${conf_file}" ]; then
        cp "${conf_file}" "${BACKUP_PATH}/config/"
    fi
done

echo "Creating manifest..."
cat > "${BACKUP_PATH}/manifest.json" << EOF
{
    "version": "1.0",
    "created_at": "$(date -Iseconds)",
    "hostname": "$(hostname)",
    "type": "full"
}
EOF

echo "Compressing backup..."
cd "${BACKUP_DIR}"
tar -czf "${BACKUP_NAME}.tar.gz" "${BACKUP_NAME}"
rm -rf "${BACKUP_NAME}"

echo "Cleaning old backups..."
find "${BACKUP_DIR}" -name "*.tar.gz" -mtime "+${RETENTION_DAYS}" -delete

echo "Backup completed: ${BACKUP_NAME}.tar.gz"
echo "Size: $(du -h "${BACKUP_NAME}.tar.gz" | cut -f1)"

恢复脚本

bash
#!/bin/bash
# /opt/rabbitmq/scripts/restore.sh
#
# Restores definitions, users, permissions and policies from an archive that
# contains a single top-level directory with definitions.json, users.json,
# permissions.json and policies.json.
#
# Usage: restore.sh <backup_file.tar.gz>
# Requires: curl, jq, tar.

set -uo pipefail

RABBITMQ_API="${RABBITMQ_API:-http://localhost:15672/api}"
RABBITMQ_USER="${RABBITMQ_USER:-admin}"
RABBITMQ_PASS="${RABBITMQ_PASS:-admin123}"

BACKUP_FILE="${1:-}"

if [ -z "$BACKUP_FILE" ]; then
    echo "Usage: $0 <backup_file.tar.gz>"
    exit 1
fi

if [ ! -r "$BACKUP_FILE" ]; then
    echo "Backup file not found or not readable: ${BACKUP_FILE}" >&2
    exit 1
fi

EXTRACT_DIR=$(mktemp -d)
# Remove the temporary directory on every exit path, including failures.
trap 'rm -rf "${EXTRACT_DIR}"' EXIT

# Set to 1 by api_send when any request fails; decides the exit status.
FAILED=0

# Percent-encode one URL path segment (the default vhost "/" becomes %2F).
urlencode() {
    jq -rn --arg v "$1" '$v | @uri'
}

# Send a JSON body to the management API and record failures.
# $1 = HTTP method, $2 = path below the API root, $3 = body (or @file).
api_send() {
    local method="$1" path="$2" body="$3"

    if ! curl -sf -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" \
            -X "${method}" \
            -H "Content-Type: application/json" \
            -d "${body}" \
            "${RABBITMQ_API}/${path}" > /dev/null; then
        echo "WARNING: ${method} ${path} failed" >&2
        FAILED=1
    fi
}

echo "Extracting backup..."
if ! tar -xzf "${BACKUP_FILE}" -C "${EXTRACT_DIR}"; then
    echo "Failed to extract ${BACKUP_FILE}" >&2
    exit 1
fi

BACKUP_DIR=$(find "${EXTRACT_DIR}" -mindepth 1 -maxdepth 1 -type d | head -n 1)

if [ -z "${BACKUP_DIR}" ] || [ ! -f "${BACKUP_DIR}/definitions.json" ]; then
    echo "Archive does not contain a backup directory with definitions.json" >&2
    exit 1
fi

echo "Restoring definitions..."
api_send POST "definitions" "@${BACKUP_DIR}/definitions.json"

# Each loop reads one compact JSON object per line, so names containing
# spaces are handled and every entry is processed exactly once.
echo "Restoring users..."
while IFS= read -r user_data; do
    user=$(jq -r '.name' <<< "${user_data}")
    api_send PUT "users/$(urlencode "${user}")" "${user_data}"
done < <(jq -c '.[]' "${BACKUP_DIR}/users.json")

echo "Restoring permissions..."
while IFS= read -r perm_data; do
    vhost=$(jq -r '.vhost' <<< "${perm_data}")
    user=$(jq -r '.user' <<< "${perm_data}")
    api_send PUT "permissions/$(urlencode "${vhost}")/$(urlencode "${user}")" "${perm_data}"
done < <(jq -c '.[]' "${BACKUP_DIR}/permissions.json")

echo "Restoring policies..."
while IFS= read -r policy_data; do
    vhost=$(jq -r '.vhost' <<< "${policy_data}")
    policy=$(jq -r '.name' <<< "${policy_data}")
    api_send PUT "policies/$(urlencode "${vhost}")/$(urlencode "${policy}")" "${policy_data}"
done < <(jq -c '.[]' "${BACKUP_DIR}/policies.json")

if [ "${FAILED}" -ne 0 ]; then
    echo "Restore completed with errors" >&2
    exit 1
fi

echo "Restore completed"

实际应用场景

场景一:定时自动备份

php
<?php

/**
 * Entry point for cron-driven backups: dispatches a backup type to the
 * corresponding RabbitMQBackupManager operation.
 */
class ScheduledBackup
{
    /** @var RabbitMQBackupManager Performs the actual backup work. */
    private $backupManager;

    /** @var array Effective settings: backup_dir, full_backup_cron, incremental_cron. */
    private $config;

    /**
     * @param RabbitMQBackupManager $backupManager Backup implementation.
     * @param array                 $config        Overrides for the defaults below.
     */
    public function __construct(RabbitMQBackupManager $backupManager, $config = [])
    {
        $this->backupManager = $backupManager;
        $this->config = array_merge([
            // getBackupDir() reads this key, so it needs a default.
            'backup_dir' => '/var/backups/rabbitmq',
            'full_backup_cron' => '0 2 * * *',
            'incremental_cron' => '0 * * * *',
        ], $config);
    }

    /**
     * Runs one backup of the requested type.
     *
     * @param string $type 'full' or 'definitions'.
     * @return array Result reported by the backup manager.
     * @throws InvalidArgumentException For an unknown type.
     * @throws RuntimeException When the target directory cannot be created.
     */
    public function runScheduledBackup($type = 'full')
    {
        switch ($type) {
            case 'full':
                return $this->backupManager->createFullBackup();
            case 'definitions':
                $targetDir = $this->getBackupDir('definitions');
                $this->ensureDir($targetDir);

                return $this->backupManager->exportDefinitions($targetDir);
            default:
                throw new InvalidArgumentException("Unknown backup type: {$type}");
        }
    }

    /**
     * Builds the timestamped directory path for a backup type.
     *
     * @param string $type Backup type used as directory prefix.
     * @return string "<backup_dir>/<type>_<YYYYmmdd_HHMMSS>".
     */
    private function getBackupDir($type)
    {
        $baseDir = $this->config['backup_dir'];
        $timestamp = date('Ymd_His');

        return "{$baseDir}/{$type}_{$timestamp}";
    }

    /**
     * Creates the directory (recursively) when it does not exist yet, so the
     * export has somewhere to write its file.
     *
     * @param string $dir Directory path.
     * @throws RuntimeException When the directory cannot be created.
     */
    private function ensureDir($dir)
    {
        if (!is_dir($dir) && !mkdir($dir, 0755, true) && !is_dir($dir)) {
            throw new RuntimeException("Failed to create backup directory: {$dir}");
        }
    }
}

场景二:备份验证

php
<?php

/**
 * Sanity-checks backup archives by inspecting their manifest.
 *
 * Relies on the manager exposing readManifest() and listBackups() as
 * callable methods.
 */
class BackupVerifier
{
    /** @var RabbitMQBackupManager Source of manifests and the archive listing. */
    private $backupManager;

    /**
     * @param RabbitMQBackupManager $backupManager Manager used to read archives.
     */
    public function __construct(RabbitMQBackupManager $backupManager)
    {
        $this->backupManager = $backupManager;
    }

    /**
     * Checks a single archive.
     *
     * @param string $backupFile Path of the archive.
     * @return array Report with 'file', 'exists', 'readable', 'checks', 'valid'
     *               and, when the file could be opened, 'manifest'.
     */
    public function verifyBackup($backupFile)
    {
        $isPresent = file_exists($backupFile);
        $canRead = is_readable($backupFile);

        $report = [
            'file' => $backupFile,
            'exists' => $isPresent,
            'readable' => $canRead,
            'checks' => [],
        ];

        // Nothing further can be inspected without a readable file.
        if (!($isPresent && $canRead)) {
            $report['valid'] = false;

            return $report;
        }

        $manifest = $this->backupManager->readManifest($backupFile);

        $report['manifest'] = $manifest;
        $report['checks'] = [
            'manifest_valid' => !empty($manifest),
            'has_definitions' => isset($manifest['contents']['definitions']),
            'has_users' => isset($manifest['contents']['users']),
        ];

        // Valid only when no individual check came back false.
        $report['valid'] = !in_array(false, $report['checks'], true);

        return $report;
    }

    /**
     * Checks every archive the manager lists.
     *
     * @return array Map of archive file name => report from verifyBackup().
     */
    public function verifyAllBackups()
    {
        $reports = [];

        foreach ($this->backupManager->listBackups() as $entry) {
            $reports[$entry['file']] = $this->verifyBackup($entry['path']);
        }

        return $reports;
    }
}

常见问题与解决方案

问题一:备份文件过大

现象:备份文件占用过多存储空间。

解决方案

php
/**
 * Outline of an incremental backup that stores only what changed since the
 * previous run, to keep archive sizes down.
 *
 * getChangesSince() and backupOnlyChanges() are not defined in this snippet
 * and have to be supplied by the implementation.
 */
class IncrementalBackup
{
    /**
     * Backs up only the changes made after the given point in time.
     *
     * @param mixed $lastBackupTime Time of the previous backup, passed through
     *                              unchanged to getChangesSince().
     * @return mixed Whatever backupOnlyChanges() returns.
     */
    public function createIncrementalBackup($lastBackupTime)
    {
        return $this->backupOnlyChanges(
            $this->getChangesSince($lastBackupTime)
        );
    }
}

问题二:恢复后数据不一致

现象:恢复后队列和消息状态不正确。

解决方案

php
/**
 * Compares the object counts recorded in a backup manifest with the current
 * broker state after a restore.
 *
 * readManifest() and getCurrentState() are not defined in this snippet and
 * have to be supplied by the implementation.
 */
class ConsistencyChecker
{
    /**
     * Reports whether queue and exchange counts match the backup.
     *
     * @param string $backupFile Archive the restore was made from.
     * @return array 'queues_match' and 'exchanges_match' booleans; a value is
     *               false when the manifest does not record that count.
     */
    public function checkAfterRestore($backupFile)
    {
        $manifest = $this->readManifest($backupFile);
        $current = $this->getCurrentState();

        // The backup manifest stores its counts under 'statistics', not at
        // the top level.
        // NOTE(review): assumes getCurrentState() returns counts under
        // 'queues' / 'exchanges' — confirm against its implementation.
        $expectedQueues = $manifest['statistics']['queues'] ?? null;
        $expectedExchanges = $manifest['statistics']['exchanges'] ?? null;

        return [
            // A missing expectation must not count as a match (null === null).
            'queues_match' => $expectedQueues !== null
                && $expectedQueues === ($current['queues'] ?? null),
            'exchanges_match' => $expectedExchanges !== null
                && $expectedExchanges === ($current['exchanges'] ?? null),
        ];
    }
}

问题三:备份期间服务不可用

现象:备份过程中服务响应变慢。

解决方案

php
/**
 * Runs the full backup in a forked child process so the caller is not
 * blocked while it is in progress.
 *
 * createFullBackup() is not defined in this snippet and has to be supplied
 * by the implementation. Requires the pcntl and posix extensions.
 */
class NonBlockingBackup
{
    /**
     * Forks and performs the backup in the child.
     *
     * The parent returns immediately. The child never returns from this
     * method: it terminates itself once the backup has finished or failed.
     * The parent is responsible for reaping the child (e.g. pcntl_waitpid).
     *
     * @return array In the parent: 'status' => 'started' and the child 'pid'.
     * @throws Exception When the process cannot be forked.
     */
    public function createBackupAsync()
    {
        $pid = pcntl_fork();

        if ($pid === -1) {
            throw new Exception('Could not fork process');
        }

        if ($pid) {
            // Parent: hand the child's PID back to the caller.
            return ['status' => 'started', 'pid' => $pid];
        }

        // Child from here on. An exception must not escape, otherwise the
        // child would unwind into the caller's code and run it a second time.
        try {
            $this->createFullBackup();
        } catch (Throwable $e) {
            error_log('Background backup failed: ' . $e->getMessage());
        }

        posix_kill(getmypid(), SIGTERM);

        // Only reached when SIGTERM is blocked or handled in this process.
        exit(1);
    }
}

最佳实践

1. 备份策略建议

| 数据类型 | 备份频率 | 保留时间 |
| --- | --- | --- |
| 定义文件 | 每小时 | 7 天 |
| 配置文件 | 每次变更 | 30 天 |
| 消息数据 | 每天 | 7 天 |
| 全量备份 | 每天 | 30 天 |

2. 备份存储位置

  • 本地存储:快速恢复
  • 远程存储:灾难恢复
  • 云存储:异地备份

3. 恢复测试

  • 每月进行恢复演练
  • 验证备份数据完整性
  • 记录恢复时间

相关链接