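RabbitMQ Data Migration Guide

Overview

Data migration is a routine RabbitMQ operations task. Typical cases include moving between clusters, moving across data centers, and growing a single node into a cluster. This guide covers the approach, tooling, and best practices for each scenario.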

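Key Concepts

Migration Types

Type | Description | Complexity | Downtime
Single node to cluster | Scale out from a single node to a cluster | | Can be seamless
Intra-cluster migration | Move data between nodes of one cluster | | None
Cross-cluster migration | Migrate between separate clusters | | May need a brief outage
Cross-data-center migration | Migrate to a data center at another site | | Depends on network latency
Cloud migration | Migrate from on-premises to the cloud | | Requires planning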

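Migration Tool Comparison

Tool | Use case | Pros | Cons
Shovel plugin | Live migration | Automated, reliable | Complex configuration
Federation plugin | Cross-cluster synchronization | Supports active-active setups | Higher latency
Definition export/import | Metadata migration | Simple and fast | Does not move messages
Custom scripts | Special requirements | Flexible, full control | High development cost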

Migration Architecture Diagram

Source cluster                 Target cluster
┌─────────────────┐            ┌─────────────────┐
│   Node A        │            │   Node X        │
│  ┌───────────┐  │            │  ┌───────────┐  │
│  │  Queue 1  │──┼────────────┼─▶│  Queue 1  │  │
│  └───────────┘  │   Shovel   │  └───────────┘  │
│  ┌───────────┐  │            │  ┌───────────┐  │
│  │  Queue 2  │──┼────────────┼─▶│  Queue 2  │  │
│  └───────────┘  │            │  └───────────┘  │
│   Node B        │            │   Node Y        │
└─────────────────┘            └─────────────────┘

Configuration Examples

Shovel Plugin Configuration

erlang
%% /etc/rabbitmq/advanced.config
%% Static shovels use Erlang term syntax, so they belong in advanced.config,
%% not rabbitmq.conf. Enable the plugin first:
%% rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management

%% Example shovel definition
[
  {rabbitmq_shovel, [
    {shovels, [
      {'queue-migration-shovel', [
        {source, [
          {protocol, amqp091},
          {uris, ["amqp://source-host:5672"]},
          %% Declare arguments must match the existing queue. Every queue is
          %% already bound to the default exchange, so no queue.bind is needed
          %% (the broker rejects explicit bindings to the default exchange).
          {declarations, [
            {'queue.declare', [{queue, <<"source-queue">>}]}
          ]},
          {queue, <<"source-queue">>},
          {prefetch_count, 1000}
        ]},
        {destination, [
          {protocol, amqp091},
          {uris, ["amqp://target-host:5672"]},
          {declarations, [
            {'queue.declare', [{queue, <<"target-queue">>}]}
          ]},
          {publish_properties, [{delivery_mode, 2}]},
          {publish_fields, [{exchange, <<"">>}, {routing_key, <<"target-queue">>}]}
        ]},
        {ack_mode, on_confirm},
        {reconnect_delay, 5}
      ]}
    ]}
  ]}
].
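The static definition above lives in a config file and needs a node restart to change. As a rough sketch of the dynamic alternative, the same source and destination can be expressed as a runtime parameter and passed to `rabbitmqctl set_parameter shovel`. The helper name `shovel_definition` is an assumption made for illustration, and the plain `printf` templating assumes none of the values contain double quotes or backslashes.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print a dynamic shovel definition as JSON.
# Assumes the arguments contain no characters that need JSON escaping.
shovel_definition() {
  local src_uri=$1 src_queue=$2 dest_uri=$3 dest_queue=$4
  printf '{"src-protocol":"amqp091","src-uri":"%s","src-queue":"%s",' "$src_uri" "$src_queue"
  printf '"dest-protocol":"amqp091","dest-uri":"%s","dest-queue":"%s",' "$dest_uri" "$dest_queue"
  printf '"ack-mode":"on-confirm"}\n'
}

# Usage (requires the rabbitmq_shovel plugin on the node that runs the shovel):
# rabbitmqctl set_parameter shovel queue-migration-shovel \
#   "$(shovel_definition amqp://source-host:5672 source-queue amqp://target-host:5672 target-queue)"
```

A dynamic shovel created this way can be removed again with `rabbitmqctl clear_parameter shovel queue-migration-shovel` once the source queue is drained.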

Federation Plugin Configuration

bash
# Enable the Federation plugin
rabbitmq-plugins enable rabbitmq_federation rabbitmq_federation_management

# Configure the upstream server
rabbitmqctl set_parameter federation-upstream source-upstream \
  '{"uri":"amqp://source-host:5672","ack-mode":"on-confirm"}'

# Apply a policy to exchanges whose names start with "federated."
rabbitmqctl set_policy --apply-to exchanges federation-policy "^federated\." \
  '{"federation-upstream":"source-upstream"}'

Docker Compose Migration Environment

yaml
version: '3.8'

services:
  source-rabbitmq:
    image: rabbitmq:3.12-management
    container_name: source-rabbitmq
    hostname: source-rabbitmq
    environment:
      RABBITMQ_ERLANG_COOKIE: 'migration_cookie'
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin123
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - source_data:/var/lib/rabbitmq

  target-rabbitmq:
    image: rabbitmq:3.12-management
    container_name: target-rabbitmq
    hostname: target-rabbitmq
    environment:
      RABBITMQ_ERLANG_COOKIE: 'migration_cookie'
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin123
    ports:
      - "5673:5672"
      - "15673:15672"
    volumes:
      - target_data:/var/lib/rabbitmq

volumes:
  source_data:
  target_data:

PHP Code Examples

RabbitMQ Migration Manager

php
<?php

namespace App\Services\RabbitMQ;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class RabbitMQMigrationManager
{
    private $sourceConnection;
    private $targetConnection;
    private $sourceChannel;
    private $targetChannel;
    private $sourceConfig;
    private $targetConfig;

    public function __construct(array $sourceConfig, array $targetConfig)
    {
        $this->sourceConfig = $sourceConfig;
        $this->targetConfig = $targetConfig;
        $this->connect();
    }

    private function connect(): void
    {
        $this->sourceConnection = new AMQPStreamConnection(
            $this->sourceConfig['host'],
            $this->sourceConfig['port'],
            $this->sourceConfig['user'],
            $this->sourceConfig['password'],
            $this->sourceConfig['vhost'] ?? '/'
        );
        $this->sourceChannel = $this->sourceConnection->channel();

        $this->targetConnection = new AMQPStreamConnection(
            $this->targetConfig['host'],
            $this->targetConfig['port'],
            $this->targetConfig['user'],
            $this->targetConfig['password'],
            $this->targetConfig['vhost'] ?? '/'
        );
        $this->targetChannel = $this->targetConnection->channel();
    }

    public function exportDefinitions(): array
    {
        $definitions = [
            'queues' => [],
            'exchanges' => [],
            'bindings' => [],
            'policies' => [],
            'users' => [],
            'vhosts' => [],
        ];

        $queues = $this->sourceApiRequest('GET', '/queues');
        foreach ($queues as $queue) {
            $definitions['queues'][] = [
                'name' => $queue['name'],
                'vhost' => $queue['vhost'],
                'durable' => $queue['durable'],
                'auto_delete' => $queue['auto_delete'],
                'arguments' => $queue['arguments'] ?? [],
            ];
        }

        $exchanges = $this->sourceApiRequest('GET', '/exchanges');
        foreach ($exchanges as $exchange) {
            if ($exchange['name'] === '' || strpos($exchange['name'], 'amq.') === 0) {
                continue;
            }
            $definitions['exchanges'][] = [
                'name' => $exchange['name'],
                'vhost' => $exchange['vhost'],
                'type' => $exchange['type'],
                'durable' => $exchange['durable'],
                'auto_delete' => $exchange['auto_delete'],
                'internal' => $exchange['internal'],
                'arguments' => $exchange['arguments'] ?? [],
            ];
        }

        $bindings = $this->sourceApiRequest('GET', '/bindings');
        foreach ($bindings as $binding) {
            // Skip implicit default-exchange bindings; they exist automatically and cannot be declared.
            if ($binding['source'] === '') {
                continue;
            }
            $definitions['bindings'][] = [
                'vhost' => $binding['vhost'],
                'source' => $binding['source'],
                'destination' => $binding['destination'],
                'destination_type' => $binding['destination_type'],
                'routing_key' => $binding['routing_key'],
                'arguments' => $binding['arguments'] ?? [],
            ];
        }

        $policies = $this->sourceApiRequest('GET', '/policies');
        foreach ($policies as $policy) {
            $definitions['policies'][] = [
                'name' => $policy['name'],
                'vhost' => $policy['vhost'],
                'pattern' => $policy['pattern'],
                'definition' => $policy['definition'],
                'priority' => $policy['priority'],
            ];
        }

        return $definitions;
    }

    public function importDefinitions(array $definitions): array
    {
        $results = [
            'queues' => ['success' => 0, 'failed' => 0],
            'exchanges' => ['success' => 0, 'failed' => 0],
            'bindings' => ['success' => 0, 'failed' => 0],
            'policies' => ['success' => 0, 'failed' => 0],
        ];

        // Order matters: exchanges and queues must exist before bindings reference them.
        $importers = [
            'exchanges' => function (array $item): void { $this->declareExchange($item); },
            'queues' => function (array $item): void { $this->declareQueue($item); },
            'bindings' => function (array $item): void { $this->createBinding($item); },
            'policies' => function (array $item): void { $this->createPolicy($item); },
        ];

        foreach ($importers as $type => $import) {
            foreach ($definitions[$type] ?? [] as $item) {
                try {
                    $import($item);
                    $results[$type]['success']++;
                } catch (\Exception $e) {
                    $results[$type]['failed']++;
                    // A channel-level AMQP error closes the channel; reopen it so
                    // one bad definition does not fail everything after it.
                    if (!$this->targetChannel->is_open()) {
                        $this->targetChannel = $this->targetConnection->channel();
                    }
                }
            }
        }

        return $results;
    }

    private function declareExchange(array $exchange): void
    {
        $this->targetChannel->exchange_declare(
            $exchange['name'],
            $exchange['type'],
            false,
            $exchange['durable'],
            $exchange['auto_delete'],
            $exchange['internal'],
            false,
            new AMQPTable($exchange['arguments'] ?? [])
        );
    }

    private function declareQueue(array $queue): void
    {
        $this->targetChannel->queue_declare(
            $queue['name'],
            false,
            $queue['durable'],
            false,
            $queue['auto_delete'],
            false,
            new AMQPTable($queue['arguments'] ?? [])
        );
    }

    private function createBinding(array $binding): void
    {
        if ($binding['destination_type'] === 'queue') {
            $this->targetChannel->queue_bind(
                $binding['destination'],
                $binding['source'],
                $binding['routing_key'],
                false,
                new AMQPTable($binding['arguments'] ?? [])
            );
        } else {
            $this->targetChannel->exchange_bind(
                $binding['destination'],
                $binding['source'],
                $binding['routing_key'],
                false,
                new AMQPTable($binding['arguments'] ?? [])
            );
        }
    }

    private function createPolicy(array $policy): void
    {
        // URL-encode path segments: the default vhost "/" must be sent as %2F.
        $path = '/policies/' . rawurlencode($policy['vhost']) . '/' . rawurlencode($policy['name']);
        $this->targetApiRequest('PUT', $path, [
            'pattern' => $policy['pattern'],
            'definition' => $policy['definition'],
            'priority' => $policy['priority'],
        ]);
    }

    public function migrateMessages(
        string $queueName,
        int $batchSize = 1000,
        ?callable $progressCallback = null
    ): array {
        $stats = [
            'total' => 0,
            'success' => 0,
            'failed' => 0,
            'start_time' => time(),
        ];

        // These arguments must match an existing target queue, or the broker rejects the declare.
        $this->targetChannel->queue_declare($queueName, false, true, false, false);
        // Publisher confirms: a source message is acked only after the target broker accepts the copy.
        $this->targetChannel->confirm_select();

        $continue = true;
        while ($continue) {
            $batchCount = 0;

            for ($i = 0; $i < $batchSize; $i++) {
                $message = $this->sourceChannel->basic_get($queueName, false);

                if (!$message) {
                    $continue = false;
                    break;
                }

                $stats['total']++;

                try {
                    $newMessage = new AMQPMessage(
                        $message->body,
                        $message->get_properties()
                    );

                    $this->targetChannel->basic_publish(
                        $newMessage,
                        '',
                        $queueName,
                        true
                    );
                    // Wait for the broker's confirm; a timeout throws and triggers the nack below.
                    $this->targetChannel->wait_for_pending_acks_returns(5.0);

                    $this->sourceChannel->basic_ack($message->getDeliveryTag());
                    $stats['success']++;
                    $batchCount++;
                } catch (\Exception $e) {
                    // Requeue on the source so the message is not lost.
                    $this->sourceChannel->basic_nack($message->getDeliveryTag(), false, true);
                    $stats['failed']++;
                }
            }

            if ($progressCallback && $batchCount > 0) {
                $progressCallback($stats);
            }

            if ($batchCount < $batchSize) {
                $continue = false;
            }
        }

        $stats['end_time'] = time();
        $stats['duration'] = $stats['end_time'] - $stats['start_time'];

        return $stats;
    }

    public function migrateWithTransformation(
        string $sourceQueue,
        string $targetQueue,
        callable $transformer,
        int $batchSize = 1000
    ): array {
        $stats = [
            'total' => 0,
            'transformed' => 0,
            'skipped' => 0,
            'failed' => 0,
        ];

        // Publisher confirms: a source message is acked only after the target broker accepts the copy.
        $this->targetChannel->confirm_select();

        $continue = true;
        while ($continue) {
            for ($i = 0; $i < $batchSize; $i++) {
                $message = $this->sourceChannel->basic_get($sourceQueue, false);

                if (!$message) {
                    $continue = false;
                    break;
                }

                $stats['total']++;

                try {
                    $transformed = $transformer($message);

                    // A null result means "drop this message": ack it without republishing.
                    if ($transformed === null) {
                        $this->sourceChannel->basic_ack($message->getDeliveryTag());
                        $stats['skipped']++;
                        continue;
                    }

                    $newMessage = new AMQPMessage(
                        $transformed['body'] ?? $message->body,
                        $transformed['properties'] ?? $message->get_properties()
                    );

                    $this->targetChannel->basic_publish(
                        $newMessage,
                        $transformed['exchange'] ?? '',
                        $transformed['routing_key'] ?? $targetQueue,
                        true
                    );
                    // Wait for the broker's confirm; a timeout throws and triggers the nack below.
                    $this->targetChannel->wait_for_pending_acks_returns(5.0);

                    $this->sourceChannel->basic_ack($message->getDeliveryTag());
                    $stats['transformed']++;
                } catch (\Exception $e) {
                    // Requeue on the source so the message is not lost.
                    $this->sourceChannel->basic_nack($message->getDeliveryTag(), false, true);
                    $stats['failed']++;
                }
            }
        }

        return $stats;
    }

    public function setupShovel(array $config): bool
    {
        $shovelName = $config['name'] ?? 'migration-shovel';
        $shovelConfig = [
            'src-uri' => $config['source_uri'],
            'src-queue' => $config['source_queue'],
            'dest-uri' => $config['target_uri'],
            'dest-queue' => $config['target_queue'],
            'ack-mode' => $config['ack_mode'] ?? 'on-confirm',
            'src-prefetch-count' => $config['prefetch_count'] ?? 1000,
        ];

        // The parameters API expects the definition under a "value" key;
        // apiRequest() throws on any HTTP error status.
        $this->targetApiRequest(
            'PUT',
            '/parameters/shovel/%2F/' . rawurlencode($shovelName),
            ['value' => $shovelConfig]
        );

        return true;
    }

    public function removeShovel(string $shovelName): bool
    {
        $this->targetApiRequest('DELETE', '/parameters/shovel/%2F/' . rawurlencode($shovelName));
        return true;
    }

    public function getMigrationStatus(): array
    {
        $sourceStats = $this->getSourceQueueStats();
        $targetStats = $this->getTargetQueueStats();

        return [
            'source' => $sourceStats,
            'target' => $targetStats,
            'progress' => $this->calculateProgress($sourceStats, $targetStats),
        ];
    }

    private function getSourceQueueStats(): array
    {
        $queues = $this->sourceApiRequest('GET', '/queues');
        $stats = ['total_messages' => 0, 'queues' => []];

        foreach ($queues as $queue) {
            $stats['queues'][$queue['name']] = [
                'messages' => $queue['messages'] ?? 0,
                'consumers' => $queue['consumers'] ?? 0,
            ];
            $stats['total_messages'] += $queue['messages'] ?? 0;
        }

        return $stats;
    }

    private function getTargetQueueStats(): array
    {
        $queues = $this->targetApiRequest('GET', '/queues');
        $stats = ['total_messages' => 0, 'queues' => []];

        foreach ($queues as $queue) {
            $stats['queues'][$queue['name']] = [
                'messages' => $queue['messages'] ?? 0,
                'consumers' => $queue['consumers'] ?? 0,
            ];
            $stats['total_messages'] += $queue['messages'] ?? 0;
        }

        return $stats;
    }

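    private function calculateProgress(array $source, array $target): array
    {
        $totalSource = $source['total_messages'];
        $totalTarget = $target['total_messages'];
        // Messages still on the source are pending and messages on the target are done,
        // so progress is target / (source + target). This assumes nothing else
        // publishes to or consumes from either broker during the migration.
        $overall = $totalSource + $totalTarget;

        return [
            'source_messages' => $totalSource,
            'target_messages' => $totalTarget,
            'percentage' => $overall > 0 ? round(($totalTarget / $overall) * 100, 2) : 0,
        ];
    }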

    private function sourceApiRequest(string $method, string $endpoint, ?array $data = null): array
    {
        return $this->apiRequest($this->sourceConfig, $method, $endpoint, $data);
    }

    private function targetApiRequest(string $method, string $endpoint, ?array $data = null): array
    {
        return $this->apiRequest($this->targetConfig, $method, $endpoint, $data);
    }

    private function apiRequest(array $config, string $method, string $endpoint, ?array $data = null): array
    {
        // 'api_port' is an optional config key for management ports other than the default 15672.
        $port = $config['api_port'] ?? 15672;
        $url = "http://{$config['host']}:{$port}/api{$endpoint}";

        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_USERPWD, $config['user'] . ':' . $config['password']);
        curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);

        if ($data !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
            curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
        }

        $response = curl_exec($ch);
        $error = curl_error($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        // curl_exec() returns false on transport errors (DNS failure, refused connection, ...).
        if ($response === false) {
            throw new \RuntimeException("API request failed: {$endpoint}, {$error}");
        }

        if ($httpCode >= 400) {
            throw new \RuntimeException("API request failed: {$endpoint}, HTTP {$httpCode}");
        }

        // Empty bodies (for example from PUT or DELETE) decode to an empty array.
        return json_decode($response, true) ?: [];
    }

    public function close(): void
    {
        if ($this->sourceChannel) {
            $this->sourceChannel->close();
        }
        if ($this->sourceConnection) {
            $this->sourceConnection->close();
        }
        if ($this->targetChannel) {
            $this->targetChannel->close();
        }
        if ($this->targetConnection) {
            $this->targetConnection->close();
        }
    }

    public function __destruct()
    {
        $this->close();
    }
}

Migration Command-Line Tool

php
<?php

namespace App\Console\Commands;

use App\Services\RabbitMQ\RabbitMQMigrationManager;
use Illuminate\Console\Command;

class RabbitMQMigrate extends Command
{
    protected $signature = 'rabbitmq:migrate 
                            {action : export|import|migrate|status|shovel}
                            {--source-host= : Source server host}
                            {--target-host= : Target server host}
                            {--queue= : Queue to operate on}
                            {--batch=1000 : Batch size}';

    protected $description = 'RabbitMQ data migration command';

    private $manager;

    public function handle()
    {
        $sourceConfig = [
            'host' => $this->option('source-host') ?? config('rabbitmq.source_host'),
            'port' => config('rabbitmq.source_port', 5672),
            'user' => config('rabbitmq.source_user', 'guest'),
            'password' => config('rabbitmq.source_password', 'guest'),
        ];

        $targetConfig = [
            'host' => $this->option('target-host') ?? config('rabbitmq.target_host'),
            'port' => config('rabbitmq.target_port', 5672),
            'user' => config('rabbitmq.target_user', 'guest'),
            'password' => config('rabbitmq.target_password', 'guest'),
        ];

        $this->manager = new RabbitMQMigrationManager($sourceConfig, $targetConfig);

        $action = $this->argument('action');

        switch ($action) {
            case 'export':
                $this->exportDefinitions();
                break;
            case 'import':
                $this->importDefinitions();
                break;
            case 'migrate':
                $this->migrateMessages();
                break;
            case 'status':
                $this->showStatus();
                break;
            case 'shovel':
                $this->setupShovel();
                break;
            default:
                $this->error("Unknown action: {$action}");
        }
    }

    private function exportDefinitions(): void
    {
        $this->info('Exporting definitions from the source server...');

        $definitions = $this->manager->exportDefinitions();

        $file = storage_path('rabbitmq_definitions_' . date('YmdHis') . '.json');
        file_put_contents($file, json_encode($definitions, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE));

        $this->info("Definitions exported to: {$file}");
        $this->table(
            ['Type', 'Count'],
            [
                ['Queues', count($definitions['queues'])],
                ['Exchanges', count($definitions['exchanges'])],
                ['Bindings', count($definitions['bindings'])],
                ['Policies', count($definitions['policies'])],
            ]
        );
    }

    private function importDefinitions(): void
    {
        $file = $this->ask('Enter the path to the definitions file');

        if (!file_exists($file)) {
            $this->error("File not found: {$file}");
            return;
        }

        $definitions = json_decode(file_get_contents($file), true);

        // json_decode() returns null for malformed JSON.
        if (!is_array($definitions)) {
            $this->error("Invalid JSON in definitions file: {$file}");
            return;
        }

        $this->info('Importing definitions into the target server...');

        $results = $this->manager->importDefinitions($definitions);

        $this->table(
            ['Type', 'Succeeded', 'Failed'],
            [
                ['Queues', $results['queues']['success'], $results['queues']['failed']],
                ['Exchanges', $results['exchanges']['success'], $results['exchanges']['failed']],
                ['Bindings', $results['bindings']['success'], $results['bindings']['failed']],
                ['Policies', $results['policies']['success'], $results['policies']['failed']],
            ]
        );
    }

    private function migrateMessages(): void
    {
        $queue = $this->option('queue');

        if (!$queue) {
            $this->error('Specify a queue name with --queue=');
            return;
        }

        $batchSize = (int) $this->option('batch');

        $this->info("Migrating queue: {$queue}");
        $this->info("Batch size: {$batchSize}");

        $progressBar = $this->output->createProgressBar();
        $progressBar->start();

        $stats = $this->manager->migrateMessages(
            $queue,
            $batchSize,
            function ($stats) use ($progressBar) {
                $progressBar->setProgress($stats['success']);
            }
        );

        $progressBar->finish();
        $this->newLine();

        $this->table(
            ['Metric', 'Value'],
            [
                ['Total messages', $stats['total']],
                ['Succeeded', $stats['success']],
                ['Failed', $stats['failed']],
                ['Duration', $stats['duration'] . ' s'],
                ['Rate', round($stats['success'] / max($stats['duration'], 1), 2) . ' msg/s'],
            ]
        );
    }

    private function showStatus(): void
    {
        $this->info('Fetching migration status...');

        $status = $this->manager->getMigrationStatus();

        $this->table(
            ['Metric', 'Source', 'Target'],
            [
                ['Total messages', $status['source']['total_messages'], $status['target']['total_messages']],
                ['Queues', count($status['source']['queues']), count($status['target']['queues'])],
            ]
        );

        $this->info("Migration progress: {$status['progress']['percentage']}%");
    }

    private function setupShovel(): void
    {
        $queue = $this->option('queue');

        if (!$queue) {
            $this->error('Specify a queue name with --queue=');
            return;
        }

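        // Honor --source-host/--target-host here too, as handle() does.
        $sourceHost = $this->option('source-host') ?? config('rabbitmq.source_host');
        $targetHost = $this->option('target-host') ?? config('rabbitmq.target_host');

        $config = [
            'name' => "migrate-{$queue}",
            'source_uri' => 'amqp://' . $sourceHost,
            'source_queue' => $queue,
            'target_uri' => 'amqp://' . $targetHost,
            'target_queue' => $queue,
            'prefetch_count' => (int) $this->option('batch'),
        ];

        $this->info("Creating Shovel: {$config['name']}");

        $this->manager->setupShovel($config);

        $this->info('Shovel created; messages will now be moved automatically.');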
    }
}

Real-World Scenarios

Scenario 1: Single Node to Cluster

bash
#!/bin/bash

SOURCE_HOST="single-node.example.com"
TARGET_HOSTS=("node1.cluster.example.com" "node2.cluster.example.com" "node3.cluster.example.com")

echo "Step 1: Export definitions from the single node"
curl -u admin:password http://${SOURCE_HOST}:15672/api/definitions \
    -o single_node_definitions.json

echo "Step 2: Create queues and exchanges on the target cluster"
# Definitions are shared cluster-wide, so importing through one node is enough.

curl -u admin:password -X POST \
    http://${TARGET_HOSTS[0]}:15672/api/definitions \
    -H "Content-Type: application/json" \
    -d @single_node_definitions.json

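echo "Step 3: Configure Shovels to move messages"
# The shovel URIs below carry no credentials; add user:password@ as needed.
for queue in $(curl -s -u admin:password http://${SOURCE_HOST}:15672/api/queues | jq -r '.[].name'); do
    echo "Migrating queue: $queue"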
    curl -u admin:password -X PUT \
        http://${SOURCE_HOST}:15672/api/parameters/shovel/%2F/migrate-${queue} \
        -H "Content-Type: application/json" \
        -d "{
            \"value\": {
                \"src-uri\": \"amqp://${SOURCE_HOST}\",
                \"src-queue\": \"${queue}\",
                \"dest-uri\": \"amqp://${TARGET_HOSTS[0]}\",
                \"dest-queue\": \"${queue}\",
                \"ack-mode\": \"on-confirm\"
            }
        }"
done

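echo "Step 4: Monitor migration progress"
while true; do
    # "add // 0" yields 0 instead of null when the source has no queues.
    pending=$(curl -s -u admin:password http://${SOURCE_HOST}:15672/api/queues | jq '[.[].messages // 0] | add // 0')
    echo "Messages pending: $pending"
    if [ "$pending" -eq 0 ]; then
        echo "Migration complete"
        break
    fi
    sleep 60
done

echo "Step 5: Switch application connections"
echo "Update the application configuration to point at the cluster's load balancer address"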
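Step 3 above hard-codes `%2F` for the default vhost and places the queue name in the URL unescaped, which breaks for other vhosts or for names containing characters such as `/` or spaces. A minimal sketch of a percent-encoding helper in pure bash follows; the function name `urlencode` is an assumption for illustration, and the sketch only handles ASCII input.

```bash
#!/usr/bin/env bash
# Percent-encode a string for use as one path segment of a management API URL.
# RFC 3986 unreserved characters pass through; everything else becomes %XX.
# Sketch only: assumes single-byte (ASCII) input.
urlencode() {
  local s=$1 out="" c i
  for ((i = 0; i < ${#s}; i++)); do
    c=${s:i:1}
    case $c in
      [a-zA-Z0-9._~-]) out+=$c ;;
      *) printf -v c '%%%02X' "'$c"; out+=$c ;;
    esac
  done
  printf '%s\n' "$out"
}

# Usage inside the shovel loop (vhost is a hypothetical variable):
# curl -u admin:password -X PUT \
#   "http://${SOURCE_HOST}:15672/api/parameters/shovel/$(urlencode "$vhost")/$(urlencode "migrate-${queue}")" ...
```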

Scenario 2: Cross-Data-Center Migration

bash
#!/bin/bash

SOURCE_DC="dc1.example.com"
TARGET_DC="dc2.example.com"

echo "Cross-data-center migration plan"

echo "Step 1: Set up a network tunnel (if needed)"
# ssh -L 5672:localhost:5672 user@${TARGET_DC} -N &

echo "Step 2: Configure Federation"
rabbitmqctl -n rabbit@${TARGET_DC} set_parameter federation-upstream dc1-upstream \
    "{
        \"uri\": \"amqp://${SOURCE_DC}\",
        \"ack-mode\": \"on-confirm\",
        \"max-hops\": 1
    }"

echo "Step 3: Apply the Federation policy"
rabbitmqctl -n rabbit@${TARGET_DC} set_policy --apply-to exchanges federation-all "^" \
    "{
        \"federation-upstream\": \"dc1-upstream\"
    }"

echo "Step 4: Verify data synchronization"
curl -s -u admin:password http://${SOURCE_DC}:15672/api/overview | jq '.queue_totals'
curl -s -u admin:password http://${TARGET_DC}:15672/api/overview | jq '.queue_totals'

Scenario 3: Cloud Migration (On-Premises to AWS)

bash
#!/bin/bash

LOCAL_HOST="localhost"
AWS_HOST="rabbitmq.xxx.region.aws.amazon.com"

echo "Migrating from on-premises to Amazon MQ"

echo "Step 1: Export local definitions"
curl -u admin:password http://${LOCAL_HOST}:15672/api/definitions \
    -o local_definitions.json

echo "Step 2: Strip incompatible configuration"
jq 'del(.users) | del(.permissions)' local_definitions.json > aws_definitions.json

echo "Step 3: Import into Amazon MQ"
curl -u aws_user:aws_password -X POST \
    https://${AWS_HOST}/api/definitions \
    -H "Content-Type: application/json" \
    -d @aws_definitions.json

echo "Step 4: Stage large message volumes through S3"
# Export messages to S3
# aws s3 cp messages.json s3://migration-bucket/

echo "Step 5: Verify the migration"
curl -s -u aws_user:aws_password https://${AWS_HOST}/api/overview | jq '.queue_totals'

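Common Problems and Solutions

Problem 1: Messages Lost During Migration

Symptom: after migration, the target queue holds fewer messages than the source queue did.

Cause: a network interruption, or a misconfigured acknowledgement mode.

Solution

php
// Use the on-confirm acknowledgement mode
$shovelConfig = [
    'ack-mode' => 'on-confirm',     // the source message is removed only after the target confirms it
    'src-prefetch-count' => 100,    // fewer in-flight messages means less to redeliver after a failure
];

// Record the source message count before migrating
// (getSourceQueueStats() is private, so go through getMigrationStatus())
$sourceCount = $manager->getMigrationStatus()['source']['total_messages'];
// ... run the migration ...
// Verify the target message count afterwards
$targetCount = $manager->getMigrationStatus()['target']['total_messages'];
if ($targetCount < $sourceCount) {
    // Raise an alert or retry
}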
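The before/after comparison can also be run from a shell script. The sketch below assumes the two counts were already read from the `messages` field of the management API's queue listing, and that nothing publishes to or consumes from either side while the migration runs. The function name `check_migrated` is hypothetical.

```bash
#!/usr/bin/env bash
# Compare the message count recorded on the source before migration with the
# count observed on the target afterwards. Prints the shortfall and returns
# non-zero when the target holds fewer messages than expected.
check_migrated() {
  local expected=$1 actual=$2
  if (( actual < expected )); then
    echo "missing $(( expected - actual )) of ${expected} messages"
    return 1
  fi
  echo "ok: ${actual} of ${expected} messages present"
}

# Usage:
# check_migrated "$source_count_before" "$target_count_after" || echo "alert or retry"
```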

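Problem 2: Slow Migration

Symptom: migrating a large backlog takes too long.

Cause: single-threaded processing, network latency, or a poorly chosen batch size.

Solution

php
// Tune the migration settings
$config = [
    'prefetch_count' => 5000,      // larger prefetch
    'ack_mode' => 'on-confirm',
    'batch_size' => 10000,         // larger batches
];

// Migrate several queues in parallel
$queues = ['queue1', 'queue2', 'queue3'];
foreach ($queues as $queue) {
    // Launch each migration as a background process so they run concurrently
    exec('php artisan rabbitmq:migrate migrate --queue=' . escapeshellarg($queue) . ' > /dev/null 2>&1 &');
}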
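Launching one background process per queue has no upper bound on concurrency. As a sketch of a bounded alternative, `xargs -P` (available in GNU and BSD `xargs`, though not in POSIX) can cap the number of migrations running at once. `MIGRATE_CMD` is an assumed override hook rather than part of the tool, and queue names are assumed to contain no whitespace or quotes.

```bash
#!/usr/bin/env bash
# Run one migration command per queue, with at most $1 running concurrently.
# MIGRATE_CMD is an assumed override hook; it defaults to the artisan command
# used in this guide. Queue names must not contain whitespace or quotes.
migrate_all() {
  local jobs=$1; shift
  local -a cmd
  read -r -a cmd <<< "${MIGRATE_CMD:-php artisan rabbitmq:migrate migrate}"
  # -I{} runs the command once per input line, substituting the queue name.
  printf '%s\n' "$@" | xargs -P "$jobs" -I{} "${cmd[@]}" --queue={}
}

# Usage: migrate three queues, two at a time.
# migrate_all 2 queue1 queue2 queue3
```

`xargs` exits non-zero if any invocation fails, so the caller can detect a partially failed run.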

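Problem 3: Definition Import Fails

Symptom: some queues or exchanges fail to import.

Cause: limits configured on the target environment, or incompatible definitions.

Solution

bash
# Inspect limits configured on the target environment
rabbitmqctl environment | grep limit

# Import in batches and skip failing entries.
# Check that your management API version honors ignore_errors before relying on it.
curl -u admin:password -X POST \
    "http://target-host:15672/api/definitions?ignore_errors=true" \
    -H "Content-Type: application/json" \
    -d @definitions.json

# Manually create the entries that failed
# (rabbit_amqqueue:declare is an internal API; its arity varies between versions)
rabbitmqctl eval 'rabbit_amqqueue:declare(rabbit_misc:r(<<"/">>, queue, <<"failed_queue">>), true, false, [], none, <<"guest">>).'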

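Problem 4: Cross-Version Compatibility

Symptom: definition formats are incompatible between versions.

Cause: API changes between RabbitMQ versions.

Solution

php
// Adapt definitions to the target version
public function adaptDefinitions(array $definitions, string $targetVersion): array
{
    $majorVersion = (int) explode('.', $targetVersion)[0];
    
    if ($majorVersion >= 4) {
        // Strip properties that 4.x does not support ('deprecated_property' is a placeholder)
        foreach ($definitions['queues'] as &$queue) {
            unset($queue['deprecated_property']);
        }
        unset($queue); // break the reference left behind by foreach
    }
    
    return $definitions;
}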
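Before adapting definitions it helps to know whether the migration crosses a major version at all. The sketch below compares two version strings in pure bash. The helper names `major_version` and `crosses_major` are hypothetical, and the versions are assumed to come from the `rabbitmq_version` field of each broker's `/api/overview` response.

```bash
#!/usr/bin/env bash
# Extract the major component of a version string such as "3.12.4".
major_version() {
  printf '%s\n' "${1%%.*}"
}

# Succeeds when moving from version $1 to version $2 crosses a major boundary.
crosses_major() {
  [ "$(major_version "$1")" != "$(major_version "$2")" ]
}

# Usage:
# src=$(curl -s -u admin:password http://source-host:15672/api/overview | jq -r '.rabbitmq_version')
# dst=$(curl -s -u admin:password http://target-host:15672/api/overview | jq -r '.rabbitmq_version')
# crosses_major "$src" "$dst" && echo "review definitions before importing"
```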

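Best Practices

Before Migration

  1. Full assessment

    • Estimate message volume and migration time
    • Confirm network bandwidth and stability
    • Prepare a rollback plan
  2. Data backup

    • Export the complete definitions
    • Back up configuration files
    • Record the current state
  3. Test and verify

    • Rehearse in a test environment
    • Verify application compatibility
    • Test the rollback procedure

During Migration

  1. Migrate in phases

    • Migrate definitions first
    • Then migrate messages
    • Switch traffic last
  2. Real-time monitoring

    • Track migration progress
    • Watch the error logs
    • Verify data consistency
  3. Traffic cutover

    • Notify the teams involved
    • Cut over during off-peak hours
    • Be ready to roll back quickly

After Migration

  1. Functional verification

    • Verify that messages can be published and consumed
    • Check queue status
    • Confirm that consumers are healthy
  2. Performance verification

    • Compare performance before and after
    • Check resource usage
    • Monitor changes in latency
  3. Cleanup

    • Remove migration tooling
    • Delete temporary resources
    • Update the documentation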