
RabbitMQ Version Upgrade Guide

Overview

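Upgrading RabbitMQ is a routine but high-stakes operations task: a sound upgrade strategy keeps the service available and protects message data. This document walks through the complete upgrade workflow, the points to watch out for, and recommended practices.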

Core Concepts

Upgrade Types

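| Type | Example | Downtime |
|---|---|---|
| Patch upgrade | 3.11.x to 3.11.y | None (in a cluster, nodes are restarted one at a time) |
| Minor upgrade | 3.11.x to 3.12.x | A brief outage may be needed |
| Major upgrade | 3.x to 4.x | Planned downtime required |
| Erlang upgrade | Erlang/OTP version change | Service restart required |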
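To decide which row of the table applies before booking a change window, the version pair can be classified mechanically. The following is a minimal sketch with a hypothetical helper, `classifyUpgrade()`. It compares only the numeric `major.minor.patch` prefix and does not cover Erlang upgrades, which depend on the Erlang/OTP version rather than the RabbitMQ one.

```php
<?php

/**
 * Hypothetical helper: classify a RabbitMQ version change using the
 * categories from the upgrade-type table (patch, minor, major).
 * Also reports "none" for identical versions and "downgrade".
 */
function classifyUpgrade(string $from, string $to): string
{
    // Keep only the numeric prefix, e.g. "3.12.0-1" -> [3, 12, 0]
    $parse = static function (string $version): array {
        if (!preg_match('/^(\d+)\.(\d+)(?:\.(\d+))?/', trim($version), $m)) {
            throw new InvalidArgumentException("Unrecognised version: {$version}");
        }
        return [(int) $m[1], (int) $m[2], (int) ($m[3] ?? 0)];
    };

    $fromParts = $parse($from);
    $toParts = $parse($to);

    // Equal-length arrays compare element by element, i.e. lexicographically
    $cmp = $toParts <=> $fromParts;
    if ($cmp < 0) {
        return 'downgrade';
    }
    if ($cmp === 0) {
        return 'none';
    }
    if ($toParts[0] !== $fromParts[0]) {
        return 'major';
    }
    if ($toParts[1] !== $fromParts[1]) {
        return 'minor';
    }
    return 'patch';
}
```

For example, `classifyUpgrade('3.11.20', '3.12.0')` falls in the "minor upgrade" row.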

Pre-Upgrade Checklist

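Pre-upgrade checklist
├── Version compatibility
│   ├── RabbitMQ version compatibility
│   ├── Erlang version requirements
│   └── Plugin compatibility
├── Data backup
│   ├── Message data backup
│   ├── Configuration file backup
│   └── Metadata (definitions) backup
├── Cluster state
│   ├── Node health
│   ├── Queue synchronization status
│   └── Network partition check
└── Rollback plan
    ├── Documented rollback steps
    ├── Backup restore procedure
    └── Emergency contacts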

Version Compatibility Matrix

| RabbitMQ version | Minimum Erlang | Recommended Erlang |
|---|---|---|
| 3.12.x | 25.0 | 25.3 / 26.x |
| 3.11.x | 24.3 | 25.x |
| 3.10.x | 24.0 | 24.3 / 25.x |
| 3.9.x | 23.2 | 24.x |
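The matrix can also be enforced in a pre-upgrade script, for example against the `rabbitmq_version` and `erlang_version` fields that the upgrade manager further down reads from `/api/overview`. This sketch hard-codes the minimums from the table above in a hypothetical `MIN_ERLANG` constant and relies on PHP's built-in `version_compare()`. Treat the values as a snapshot to be confirmed against the official RabbitMQ Erlang version requirements before relying on them.

```php
<?php

// Minimum Erlang/OTP versions copied from the compatibility matrix in this guide
const MIN_ERLANG = [
    '3.12' => '25.0',
    '3.11' => '24.3',
    '3.10' => '24.0',
    '3.9'  => '23.2',
];

/** True if $erlangVersion meets the matrix minimum for the given RabbitMQ version. */
function erlangSatisfiesMinimum(string $rabbitVersion, string $erlangVersion): bool
{
    // Reduce "3.12.4" to the "3.12" series used as the matrix key
    if (!preg_match('/^(\d+\.\d+)/', $rabbitVersion, $m) || !array_key_exists($m[1], MIN_ERLANG)) {
        throw new InvalidArgumentException("No matrix entry for RabbitMQ {$rabbitVersion}");
    }

    // version_compare() understands dotted OTP versions such as "25.3.2.7"
    return version_compare($erlangVersion, MIN_ERLANG[$m[1]], '>=');
}
```

Throwing on an unknown series, instead of returning `false`, keeps an outdated table from silently passing or failing a check.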

Configuration Examples

Docker Upgrade Configuration

yaml
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3.12-management
    container_name: rabbitmq
    hostname: rabbitmq-node1
    environment:
      RABBITMQ_ERLANG_COOKIE: 'secret_cookie_here'
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin123
    ports:
      - "5672:5672"
      - "15672:15672"
      - "25672:25672"
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - ./config/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
      - ./config/enabled_plugins:/etc/rabbitmq/enabled_plugins:ro
    restart: unless-stopped

volumes:
  rabbitmq_data:
    driver: local

Kubernetes Rolling Upgrade Configuration

yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
  namespace: messaging
spec:
  serviceName: rabbitmq-headless
  replicas: 3
  updateStrategy:
    type: RollingUpdate
    rollingUpdate:
      partition: 0
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      terminationGracePeriodSeconds: 600
      containers:
      - name: rabbitmq
        image: rabbitmq:3.12-management
        ports:
        - containerPort: 5672
          name: amqp
        - containerPort: 15672
          name: management
        - containerPort: 25672
          # 25672 is the Erlang distribution (inter-node and CLI) port; EPMD itself listens on 4369
          name: dist
        env:
        - name: RABBITMQ_ERLANG_COOKIE
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: erlang-cookie
        - name: RABBITMQ_DEFAULT_USER
          value: admin
        - name: RABBITMQ_DEFAULT_PASS
          valueFrom:
            secretKeyRef:
              name: rabbitmq-secret
              key: password
        volumeMounts:
        - name: rabbitmq-data
          mountPath: /var/lib/rabbitmq
        - name: config
          mountPath: /etc/rabbitmq
        readinessProbe:
          exec:
            command:
            - rabbitmq-diagnostics
            - check_running
          initialDelaySeconds: 60
          periodSeconds: 30
          timeoutSeconds: 10
        livenessProbe:
          exec:
            command:
            - rabbitmq-diagnostics
            - check_running
          initialDelaySeconds: 120
          periodSeconds: 60
          timeoutSeconds: 10
      volumes:
      - name: config
        configMap:
          name: rabbitmq-config
  volumeClaimTemplates:
  - metadata:
      name: rabbitmq-data
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: standard
      resources:
        requests:
          storage: 10Gi
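With `partition: 0`, every pod receives the new template as soon as it changes. In Kubernetes, a `RollingUpdate` StatefulSet updates only pods whose ordinal is greater than or equal to `partition`, one at a time from the highest ordinal down. Raising `partition` to `replicas - 1` therefore turns the first rollout into a single-pod canary. The hypothetical helper below only models that rule, to show which pods a given setting would touch and in what order.

```php
<?php

/**
 * Hypothetical model of StatefulSet RollingUpdate with a partition:
 * returns the pods that get the new template, in update order
 * (highest ordinal first).
 */
function podsToUpdate(string $statefulSet, int $replicas, int $partition): array
{
    $pods = [];
    // Ordinals below the partition keep the old template
    for ($ordinal = $replicas - 1; $ordinal >= max(0, $partition); $ordinal--) {
        $pods[] = "{$statefulSet}-{$ordinal}";
    }
    return $pods;
}
```

For the manifest above, `podsToUpdate('rabbitmq', 3, 2)` yields only `rabbitmq-2`; lowering `partition` back to 0 afterwards lets the remaining pods follow.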

PHP Code Examples

RabbitMQ Upgrade Manager

php
<?php

namespace App\Services\RabbitMQ;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitMQUpgradeManager
{
    private $connection;
    private $channel;
    private $config;

    public function __construct(array $config)
    {
        $this->config = $config;
        $this->connect();
    }

    private function connect(): void
    {
        $this->connection = new AMQPStreamConnection(
            $this->config['host'],
            $this->config['port'],
            $this->config['user'],
            $this->config['password'],
            $this->config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
    }

    public function preUpgradeCheck(): array
    {
        $checks = [];

        $checks['version'] = $this->getCurrentVersion();
        $checks['erlang_version'] = $this->getErlangVersion();
        $checks['node_status'] = $this->checkNodeStatus();
        $checks['queue_status'] = $this->checkQueueStatus();
        $checks['connection_count'] = $this->getConnectionCount();
        $checks['message_count'] = $this->getTotalMessageCount();
        $checks['disk_space'] = $this->checkDiskSpace();
        $checks['memory_usage'] = $this->getMemoryUsage();

        $checks['ready'] = $this->evaluateReadiness($checks);

        return $checks;
    }

    public function getCurrentVersion(): string
    {
        $response = $this->apiRequest('GET', '/overview');
        return $response['rabbitmq_version'] ?? 'unknown';
    }

    public function getErlangVersion(): string
    {
        $response = $this->apiRequest('GET', '/overview');
        return $response['erlang_version'] ?? 'unknown';
    }

    public function checkNodeStatus(): array
    {
        $nodes = $this->apiRequest('GET', '/nodes');
        $status = [];

        foreach ($nodes as $node) {
            $status[$node['name']] = [
                'running' => $node['running'] ?? false,
                'type' => $node['type'] ?? 'unknown',
                'uptime' => $node['uptime'] ?? 0,
            ];
        }

        return $status;
    }

    public function checkQueueStatus(): array
    {
        $queues = $this->apiRequest('GET', '/queues');
        $status = [
            'total' => count($queues),
            'messages' => 0,
            'unmirrored' => 0,
            'idle' => 0,
        ];

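        foreach ($queues as $queue) {
            $status['messages'] += $queue['messages'] ?? 0;

            // Quorum queues and streams are replicated by design; a classic queue is
            // replicated only when a mirroring policy has given it mirrors (slave_nodes).
            // backing_queue_status.mode says nothing about replication.
            $replicated = ($queue['type'] ?? 'classic') !== 'classic'
                || !empty($queue['slave_nodes']);
            if (!$replicated) {
                $status['unmirrored']++;
            }

            if (($queue['consumers'] ?? 0) === 0) {
                $status['idle']++;
            }
        }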

        return $status;
    }

    public function getConnectionCount(): int
    {
        $connections = $this->apiRequest('GET', '/connections');
        return count($connections);
    }

    public function getTotalMessageCount(): int
    {
        $overview = $this->apiRequest('GET', '/overview');
        return $overview['queue_totals']['messages'] ?? 0;
    }

    public function checkDiskSpace(): array
    {
        $nodes = $this->apiRequest('GET', '/nodes');
        $diskInfo = [];

        foreach ($nodes as $node) {
            $diskInfo[$node['name']] = [
                'free' => $node['disk_free'] ?? 0,
                'limit' => $node['disk_free_limit'] ?? 0,
                'alarm' => $node['disk_free_alarm'] ?? false,
            ];
        }

        return $diskInfo;
    }

    public function getMemoryUsage(): array
    {
        $nodes = $this->apiRequest('GET', '/nodes');
        $memoryInfo = [];

        foreach ($nodes as $node) {
            $memoryInfo[$node['name']] = [
                'used' => $node['mem_used'] ?? 0,
                'limit' => $node['mem_limit'] ?? 0,
                'alarm' => $node['mem_alarm'] ?? false,
            ];
        }

        return $memoryInfo;
    }

    private function evaluateReadiness(array $checks): array
    {
        $issues = [];

        if (isset($checks['disk_space'])) {
            foreach ($checks['disk_space'] as $node => $info) {
                if ($info['alarm']) {
                    $issues[] = "Disk alarm: {$node}";
                }
            }
        }

        if (isset($checks['memory_usage'])) {
            foreach ($checks['memory_usage'] as $node => $info) {
                if ($info['alarm']) {
                    $issues[] = "Memory alarm: {$node}";
                }
            }
        }

        if (isset($checks['node_status'])) {
            foreach ($checks['node_status'] as $node => $info) {
                if (!$info['running']) {
                    $issues[] = "Node not running: {$node}";
                }
            }
        }

        return [
            'ready' => empty($issues),
            'issues' => $issues,
        ];
    }

    public function backupDefinitions(): string
    {
        $definitions = $this->apiRequest('GET', '/definitions');
        $backupFile = storage_path('rabbitmq_definitions_' . date('Y-m-d_His') . '.json');
        file_put_contents($backupFile, json_encode($definitions, JSON_PRETTY_PRINT));
        return $backupFile;
    }

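    public function exportMessages(string $queueName, string $outputFile): int
    {
        $messages = [];
        $lastDeliveryTag = null;

        // basic_get without auto-ack: messages stay unacknowledged until the file is written,
        // so a failure before the ack below leaves them in the queue (requeued on channel close).
        // Loads the whole queue into memory; only suitable for small queues.
        while (($message = $this->channel->basic_get($queueName)) !== null) {
            $properties = $message->get_properties();
            // AMQPTable headers are not JSON-serialisable as-is; store them as a plain array
            if (($properties['application_headers'] ?? null) instanceof \PhpAmqpLib\Wire\AMQPTable) {
                $properties['application_headers'] = $properties['application_headers']->getNativeData();
            }

            $messages[] = [
                // Bodies may be binary, so store them base64-encoded
                'body' => base64_encode($message->getBody()),
                'properties' => $properties,
            ];
            $lastDeliveryTag = $message->getDeliveryTag();
        }

        if (file_put_contents($outputFile, json_encode($messages, JSON_PRETTY_PRINT)) === false) {
            throw new \RuntimeException("Cannot write {$outputFile}");
        }

        // The export moves the messages: acknowledge everything fetched in one call (multiple = true)
        if ($lastDeliveryTag !== null) {
            $this->channel->basic_ack($lastDeliveryTag, true);
        }

        return count($messages);
    }

    public function importMessages(string $queueName, string $inputFile): int
    {
        $messages = json_decode(file_get_contents($inputFile), true);
        if (!is_array($messages)) {
            throw new \RuntimeException("Invalid message file: {$inputFile}");
        }

        // Publisher confirms, so the method only returns once the broker has acknowledged the publishes
        $this->channel->confirm_select();

        foreach ($messages as $msgData) {
            $properties = $msgData['properties'] ?? [];
            if (isset($properties['application_headers'])) {
                $properties['application_headers'] = new \PhpAmqpLib\Wire\AMQPTable($properties['application_headers']);
            }

            $message = new AMQPMessage(base64_decode($msgData['body']), $properties);
            // Default exchange: the routing key is the queue name
            $this->channel->basic_publish($message, '', $queueName);
        }

        $this->channel->wait_for_pending_acks(30);

        return count($messages);
    }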

    public function drainNode(string $nodeName): array
    {
        $result = [
            'success' => false,
            'queues_on_node' => 0,
            'errors' => [],
        ];

        try {
            // GET /api/queues lists queues in all vhosts; 'node' is the node hosting each queue.
            // (The original GET /queues/{node} treated the node name as a vhost.)
            $queues = $this->apiRequest('GET', '/queues');
            foreach ($queues as $queue) {
                if (($queue['node'] ?? null) === $nodeName) {
                    $result['queues_on_node']++;
                }
            }

            // The management HTTP API has no queue-migration endpoint. Maintenance mode
            // (`rabbitmq-upgrade drain`) moves queue leadership away from the node and closes
            // its client connections; non-replicated classic queues stay on the node and are
            // unavailable while it is down. Assumes the RabbitMQ CLI tools are installed on
            // this host and share the cluster's Erlang cookie.
            $output = [];
            $exitCode = 0;
            exec('rabbitmq-upgrade -n ' . escapeshellarg($nodeName) . ' drain 2>&1', $output, $exitCode);

            if ($exitCode !== 0) {
                $result['errors'][] = implode("\n", $output);
                return $result;
            }

            $result['success'] = true;
        } catch (\Exception $e) {
            $result['errors'][] = $e->getMessage();
        }

        return $result;
    }

    public function postUpgradeVerify(): array
    {
        $checks = [];

        $checks['version'] = $this->getCurrentVersion();
        $checks['erlang_version'] = $this->getErlangVersion();
        $checks['node_status'] = $this->checkNodeStatus();
        $checks['queue_status'] = $this->checkQueueStatus();
        $checks['plugin_status'] = $this->checkPlugins();
        $checks['policy_status'] = $this->checkPolicies();

        $checks['healthy'] = $this->evaluateHealth($checks);

        return $checks;
    }

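    public function checkPlugins(): array
    {
        // The management HTTP API does not provide a plugin listing endpoint; each entry in
        // GET /api/nodes lists the node's enabled plugins and its running applications.
        $nodes = $this->apiRequest('GET', '/nodes');
        $status = [];

        foreach ($nodes as $node) {
            $versions = [];
            foreach ($node['applications'] ?? [] as $app) {
                $versions[$app['name']] = $app['version'] ?? 'unknown';
            }

            foreach ($node['enabled_plugins'] ?? [] as $plugin) {
                $status[$node['name']][$plugin] = [
                    // An enabled plugin that is not a running application failed to start
                    'running' => isset($versions[$plugin]),
                    'version' => $versions[$plugin] ?? 'unknown',
                ];
            }
        }

        return $status;
    }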

    public function checkPolicies(): array
    {
        $policies = $this->apiRequest('GET', '/policies');
        $status = [];

        foreach ($policies as $policy) {
            $status[$policy['name']] = [
                'vhost' => $policy['vhost'],
                'pattern' => $policy['pattern'],
                'definition' => $policy['definition'],
            ];
        }

        return $status;
    }

    private function evaluateHealth(array $checks): array
    {
        $issues = [];

        foreach ($checks['node_status'] ?? [] as $node => $info) {
            if (!($info['running'] ?? false)) {
                $issues[] = "Node not running: {$node}";
            }
        }

        $version = $checks['version'] ?? '';
        if (empty($version) || $version === 'unknown') {
            $issues[] = "Unable to determine the RabbitMQ version";
        }

        return [
            'healthy' => empty($issues),
            'issues' => $issues,
        ];
    }

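    private function apiRequest(string $method, string $endpoint, ?array $data = null): array
    {
        // Management HTTP API; the port defaults to 15672 unless 'management_port' is configured
        $port = $this->config['management_port'] ?? 15672;
        $url = "http://{$this->config['host']}:{$port}/api{$endpoint}";

        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_USERPWD, $this->config['user'] . ':' . $this->config['password']);
        curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
        curl_setopt($ch, CURLOPT_TIMEOUT, 30);

        if ($data !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
            curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
        }

        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        $curlError = curl_error($ch);
        curl_close($ch);

        // curl_exec() returns false on transport errors (connection refused, timeout, ...)
        if ($response === false) {
            throw new \RuntimeException("API request failed: {$endpoint}, {$curlError}");
        }
        if ($httpCode >= 400) {
            throw new \RuntimeException("API request failed: {$endpoint}, HTTP {$httpCode}");
        }

        return json_decode($response, true) ?: [];
    }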

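    public function close(): void
    {
        // Null the handles so a later __destruct() does not close them a second time
        if ($this->channel) {
            $this->channel->close();
            $this->channel = null;
        }
        if ($this->connection) {
            $this->connection->close();
            $this->connection = null;
        }
    }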

    public function __destruct()
    {
        $this->close();
    }
}
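After an upgrade it is worth confirming that no queues, exchanges or policies went missing. As a sketch, two definitions exports (for example the file written by `backupDefinitions()` before the upgrade and a fresh export afterwards) can be compared by `vhost` and `name`. The helper `missingDefinitions()` is hypothetical, ignores bindings, and does not compare object arguments.

```php
<?php

/**
 * Hypothetical helper: report queues, exchanges and policies present in the
 * pre-upgrade definitions export but absent from the post-upgrade one.
 * Both arguments are decoded definitions JSON (associative arrays).
 */
function missingDefinitions(array $before, array $after): array
{
    // Objects are identified by "vhost|name"
    $key = static fn (array $item): string => ($item['vhost'] ?? '/') . '|' . $item['name'];

    $missing = [];
    foreach (['queues', 'exchanges', 'policies'] as $section) {
        $afterKeys = array_flip(array_map($key, $after[$section] ?? []));
        foreach ($before[$section] ?? [] as $item) {
            if (!isset($afterKeys[$key($item)])) {
                $missing[$section][] = $key($item);
            }
        }
    }
    return $missing;
}
```

An empty result means every named object survived; any entry is a reason to re-import the backup before reopening traffic.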

Upgrade Execution Command

php
<?php

namespace App\Console\Commands;

use App\Services\RabbitMQ\RabbitMQUpgradeManager;
use Illuminate\Console\Command;

class RabbitMQUpgrade extends Command
{
    protected $signature = 'rabbitmq:upgrade 
                            {action : pre-check|backup|verify|rollback}
                            {--target-version= : Target version}
                            {--node= : Target node}';

    protected $description = 'RabbitMQ upgrade management command';

    private $manager;

    public function handle(): int
    {
        $config = [
            'host' => config('rabbitmq.host', 'localhost'),
            'port' => config('rabbitmq.port', 5672),
            'management_port' => config('rabbitmq.management_port', 15672),
            'user' => config('rabbitmq.user', 'guest'),
            'password' => config('rabbitmq.password', 'guest'),
            'vhost' => config('rabbitmq.vhost', '/'),
        ];

        $this->manager = new RabbitMQUpgradeManager($config);

        $action = $this->argument('action');

        switch ($action) {
            case 'pre-check':
                $this->preCheck();
                break;
            case 'backup':
                $this->backup();
                break;
            case 'verify':
                $this->verify();
                break;
            case 'rollback':
                $this->rollback();
                break;
            default:
                $this->error("Unknown action: {$action}");
                // Non-zero exit code so wrapper scripts can detect the failure
                return self::FAILURE;
        }

        return self::SUCCESS;
    }

    private function preCheck(): void
    {
        $this->info('Running pre-upgrade checks...');

        $checks = $this->manager->preUpgradeCheck();

        $this->table(
            ['Check', 'Status', 'Details'],
            [
                ['Current version', $checks['version'], '-'],
                ['Erlang version', $checks['erlang_version'], '-'],
                ['Connections', $checks['connection_count'], '-'],
                ['Total messages', $checks['message_count'], '-'],
                ['Queues', $checks['queue_status']['total'], '-'],
                ['Readiness', $checks['ready']['ready'] ? '✓ Ready' : '✗ Not ready', implode(', ', $checks['ready']['issues'])],
            ]
        );

        if (!$checks['ready']['ready']) {
            $this->error('Pre-upgrade checks failed; resolve the following issues:');
            foreach ($checks['ready']['issues'] as $issue) {
                $this->error("  - {$issue}");
            }
            return;
        }

        $this->info('Pre-upgrade checks passed; the upgrade can proceed.');
    }

    private function backup(): void
    {
        $this->info('Running backup...');

        $backupFile = $this->manager->backupDefinitions();
        $this->info("Definitions backup saved: {$backupFile}");

        $this->info('Backup complete.');
    }

    private function verify(): void
    {
        $this->info('Running post-upgrade verification...');

        $checks = $this->manager->postUpgradeVerify();

        $this->table(
            ['Check', 'Status', 'Details'],
            [
                ['Version', $checks['version'], '-'],
                ['Erlang version', $checks['erlang_version'], '-'],
                ['Health', $checks['healthy']['healthy'] ? '✓ Healthy' : '✗ Unhealthy', implode(', ', $checks['healthy']['issues'])],
            ]
        );

        if ($checks['healthy']['healthy']) {
            $this->info('Post-upgrade verification passed.');
        } else {
            $this->error('Post-upgrade verification found problems:');
            foreach ($checks['healthy']['issues'] as $issue) {
                $this->error("  - {$issue}");
            }
        }
    }

    private function rollback(): void
    {
        $this->warn('Rollback must be performed manually; follow the rollback documentation.');
    }
}

Practical Scenarios

Scenario 1: In-Place Upgrade of a Single Node

bash
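#!/bin/bash
set -euo pipefail

UPGRADE_VERSION="3.12.0"
# `rabbitmqctl version` prints the CLI tools version only; ask the running node instead
CURRENT_VERSION=$(rabbitmq-diagnostics -q server_version)

echo "Current version: $CURRENT_VERSION"
echo "Target version: $UPGRADE_VERSION"

echo "Step 1: back up configuration and definitions"
mkdir -p /backup
rabbitmqctl export_definitions "/backup/rabbitmq_definitions_$(date +%Y%m%d).json"
cp /etc/rabbitmq/rabbitmq.conf /backup/rabbitmq.conf.bak

echo "Step 2: enable all feature flags (3.12 requires every 3.11 feature flag to be enabled)"
rabbitmqctl enable_feature_flag all

echo "Step 3: stop the service"
systemctl stop rabbitmq-server

echo "Step 4: upgrade the package"
if command -v apt-get &> /dev/null; then
    apt-get update
    # Debian package versions carry a revision suffix; confirm with: apt-cache policy rabbitmq-server
    apt-get install -y "rabbitmq-server=${UPGRADE_VERSION}-1"
elif command -v yum &> /dev/null; then
    yum install -y "rabbitmq-server-${UPGRADE_VERSION}"
fi

echo "Step 5: start the service"
systemctl start rabbitmq-server
rabbitmqctl await_startup

echo "Step 6: verify the upgrade"
rabbitmqctl status
rabbitmq-diagnostics -q server_version

echo "Upgrade complete"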
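The final verification step can be made explicit instead of read by eye. A minimal sketch, assuming the output of `rabbitmq-diagnostics -q server_version` has been captured as a string (for example with `shell_exec()`); `upgradeApplied()` is a hypothetical name.

```php
<?php

/**
 * Hypothetical check: does the captured output of
 * `rabbitmq-diagnostics -q server_version` match the target version exactly?
 */
function upgradeApplied(string $serverVersionOutput, string $targetVersion): bool
{
    // Strip the trailing newline and any surrounding whitespace
    $reported = trim($serverVersionOutput);

    // Empty output usually means the node is down or the command failed
    return $reported !== '' && version_compare($reported, $targetVersion, '==');
}
```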

Scenario 2: Cluster Rolling Upgrade

bash
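#!/bin/bash
set -euo pipefail

NODES=("node1" "node2" "node3")
# Debian package version including the revision suffix; confirm with: apt-cache policy rabbitmq-server
PKG_VERSION="3.12.0-1"

upgrade_node() {
    local node=$1
    echo "Upgrading node: $node"

    # Maintenance mode: move queue leadership off this node and close its client connections
    ssh "$node" "rabbitmq-upgrade drain"

    ssh "$node" "systemctl stop rabbitmq-server"
    ssh "$node" "apt-get update && apt-get install -y rabbitmq-server=$PKG_VERSION"
    ssh "$node" "systemctl start rabbitmq-server"

    # Block until the node has fully booted, then take it out of maintenance mode
    ssh "$node" "rabbitmqctl await_startup"
    ssh "$node" "rabbitmq-upgrade revive"

    echo "Node $node upgraded"
}

echo "Enable all feature flags on the current version first (3.12 requires all 3.11 feature flags)"
ssh "${NODES[0]}" "rabbitmqctl enable_feature_flag all"

for node in "${NODES[@]}"; do
    upgrade_node "$node"
    echo "Waiting for the cluster to settle..."
    sleep 30
done

echo "Cluster upgrade complete"
rabbitmqctl cluster_status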
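The loop in the script above moves on after a fixed 30-second sleep even if a node came back unhealthy. A safer orchestration stops at the first node that fails its health check, so at most one node is ever left in a bad state. This is a sketch: `rollingUpgrade()` is hypothetical, and `$upgrade` / `$isHealthy` stand for whatever wraps the ssh commands and a check such as `rabbitmq-diagnostics check_running`.

```php
<?php

/**
 * Hypothetical orchestration: upgrade nodes strictly one at a time and halt
 * at the first node whose post-upgrade health check fails.
 *
 * @param string[] $nodes     nodes in upgrade order
 * @param callable $upgrade   performs the upgrade of one node
 * @param callable $isHealthy returns true when the node is healthy again
 */
function rollingUpgrade(array $nodes, callable $upgrade, callable $isHealthy): array
{
    $done = [];
    foreach ($nodes as $node) {
        $upgrade($node);
        if (!$isHealthy($node)) {
            // Leave the remaining nodes on the old version for investigation or rollback
            return ['upgraded' => $done, 'failed' => $node];
        }
        $done[] = $node;
    }
    return ['upgraded' => $done, 'failed' => null];
}
```

Returning the list of already-upgraded nodes tells the operator exactly which nodes a rollback would have to touch.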

Scenario 3: Docker Upgrade

bash
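#!/bin/bash
set -euo pipefail

OLD_IMAGE="rabbitmq:3.11-management"   # kept for rolling back to the previous image
NEW_IMAGE="rabbitmq:3.12-management"
CONTAINER_NAME="rabbitmq"

echo "Back up the current definitions"
docker exec "$CONTAINER_NAME" rabbitmqctl export_definitions /tmp/definitions.json
docker cp "$CONTAINER_NAME":/tmp/definitions.json ./backup_definitions.json

echo "Enable all feature flags before upgrading (required for 3.11 to 3.12)"
docker exec "$CONTAINER_NAME" rabbitmqctl enable_feature_flag all

echo "Stop the current container"
docker stop -t 60 "$CONTAINER_NAME"
docker rename "$CONTAINER_NAME" "${CONTAINER_NAME}_backup"

echo "Start the new version"
# The hostname must match the old container: the data directory is named after the node (rabbit@<hostname>)
docker run -d \
    --name "$CONTAINER_NAME" \
    --hostname rabbitmq \
    -p 5672:5672 \
    -p 15672:15672 \
    -v rabbitmq_data:/var/lib/rabbitmq \
    "$NEW_IMAGE"

echo "Wait for the service to start"
sleep 30
docker exec "$CONTAINER_NAME" rabbitmqctl await_startup

echo "Verify the upgrade"
docker exec "$CONTAINER_NAME" rabbitmqctl status

echo "Import definitions (only if needed)"
docker cp ./backup_definitions.json "$CONTAINER_NAME":/tmp/definitions.json
docker exec "$CONTAINER_NAME" rabbitmqctl import_definitions /tmp/definitions.json

echo "Remove the old container"
docker rm "${CONTAINER_NAME}_backup"

echo "Upgrade complete"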
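A fixed `sleep 30` either wastes time or is too short when the node starts slowly. Polling a readiness probe is more robust; the sketch below retries a caller-supplied probe up to a limit. `waitUntilReady()` and the probe are hypothetical; a probe could, for instance, run `docker exec rabbitmq rabbitmq-diagnostics -q check_running` and return true when the exit code is 0.

```php
<?php

/**
 * Hypothetical helper: call $probe until it returns true, sleeping
 * $delaySeconds between attempts. Returns the number of attempts used
 * and throws once $maxAttempts is exhausted.
 */
function waitUntilReady(callable $probe, int $maxAttempts, int $delaySeconds): int
{
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        if ($probe()) {
            return $attempt;
        }
        // No point sleeping after the last failed attempt
        if ($attempt < $maxAttempts) {
            sleep($delaySeconds);
        }
    }
    throw new RuntimeException("Service not ready after {$maxAttempts} attempts");
}
```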

Common Problems and Solutions

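Problem 1: Queues unavailable after the upgrade

Symptom: some queues are shown as unavailable after the upgrade

Cause: a change in the queue data format, or an incompatible queue index

Solution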

bash
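# Check queue states
rabbitmq list_queues name state

# Recreate a problem queue. Deleting a queue discards all of its messages: export them
# first if they matter, then redeclare the queue (from the application or by
# re-importing definitions)
rabbitmqctl delete_queue problem_queue_name

# Or delete it through the management API
curl -u admin:password -XDELETE http://localhost:15672/api/queues/%2F/problem_queue_name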
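Before deleting anything, the affected queues can be listed from the management API. This sketch filters a decoded `GET /api/queues` response by its `state` field. Treating only `running`, `idle` and `flow` as healthy is an assumption, and `unhealthyQueues()` is a hypothetical helper.

```php
<?php

/**
 * Hypothetical helper: return vhost/name/state for queues whose state is not
 * in the assumed healthy set. A missing state is reported as "unknown".
 */
function unhealthyQueues(array $queues): array
{
    $healthy = ['running', 'idle', 'flow'];
    $result = [];
    foreach ($queues as $queue) {
        $state = $queue['state'] ?? 'unknown';
        if (!in_array($state, $healthy, true)) {
            $result[] = ['vhost' => $queue['vhost'] ?? '/', 'name' => $queue['name'], 'state' => $state];
        }
    }
    return $result;
}
```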

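Problem 2: Incompatible plugins

Symptom: some plugins cannot be enabled after the upgrade

Cause: the plugin version is not compatible with the new RabbitMQ version

Solution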

bash
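# Check plugin status
rabbitmq-plugins list

# Disable the incompatible plugin
rabbitmq-plugins disable rabbitmq_delayed_message_exchange

# Enabling does not download anything: first place the plugin build that matches the new
# RabbitMQ version in a plugins directory (list them with `rabbitmq-plugins directories`),
# then enable it
rabbitmq-plugins enable rabbitmq_delayed_message_exchange --offline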
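To catch plugins that silently failed to start, the enabled plugin list recorded before the upgrade can be compared with what is running afterwards. Each node object returned by `GET /api/nodes` carries `enabled_plugins` and `applications` (name and version) fields; the sketch below assumes those shapes and uses a hypothetical `pluginsNotRunning()` helper.

```php
<?php

/**
 * Hypothetical helper: plugins that were enabled before the upgrade but are not
 * among the node's running applications afterwards.
 *
 * @param string[] $enabledBefore     e.g. a node's enabled_plugins before the upgrade
 * @param array    $applicationsAfter e.g. the same node's applications after the upgrade
 */
function pluginsNotRunning(array $enabledBefore, array $applicationsAfter): array
{
    $running = array_column($applicationsAfter, 'name');
    // array_values() reindexes the result of array_diff()
    return array_values(array_diff($enabledBefore, $running));
}
```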

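Problem 3: Cluster nodes cannot rejoin

Symptom: after the upgrade a node cannot rejoin the cluster

Cause: mismatched Erlang cookies or incompatible node data

Solution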

bash
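# Check the Erlang cookie
cat /var/lib/rabbitmq/.erlang.cookie

# Sync the cookie (it must be identical on every node); run on the main node
scp /var/lib/rabbitmq/.erlang.cookie node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie node3:/var/lib/rabbitmq/.erlang.cookie
# On each receiving node the file must be owned by rabbitmq and readable only by it:
#   chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
#   chmod 400 /var/lib/rabbitmq/.erlang.cookie
# then restart rabbitmq-server there

# Reset the problem node (run on that node; reset wipes its local data)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app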
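Cookie mismatches can be spotted without printing the secret: collect a hash of each node's cookie file (for example with `sha256sum /var/lib/rabbitmq/.erlang.cookie` over ssh) and compare the hashes. A sketch with a hypothetical `nodesWithMismatchedCookie()` helper, using PHP's `hash_equals()` for the comparison.

```php
<?php

/**
 * Hypothetical helper: given cookie hashes keyed by node, return the nodes
 * whose hash differs from the reference node's.
 */
function nodesWithMismatchedCookie(array $cookieHashByNode, string $referenceNode): array
{
    if (!isset($cookieHashByNode[$referenceNode])) {
        throw new InvalidArgumentException("No hash for {$referenceNode}");
    }

    $reference = $cookieHashByNode[$referenceNode];
    $mismatched = [];
    foreach ($cookieHashByNode as $node => $hash) {
        if (!hash_equals($reference, $hash)) {
            $mismatched[] = $node;
        }
    }
    return $mismatched;
}
```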

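Problem 4: Message loss

Symptom: fewer messages than expected after the upgrade

Cause: transient (non-persistent) messages, and every message in a non-durable queue, do not survive the node restart during the upgrade

Solution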

bash
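# Before upgrading, make sure important messages are persistent:
# the queue must be durable and messages published with delivery_mode=2
# Check which queues are durable
rabbitmqctl list_queues name durable

# For non-durable queues, export the messages manually before upgrading
# via the management API. ack_requeue_false removes the fetched messages
# from the queue, so keep the saved response
curl -u admin:password http://localhost:15672/api/queues/%2F/queue_name/get \
    -H "Content-Type: application/json" \
    -d '{"count":1000,"ackmode":"ack_requeue_false","encoding":"auto"}' \
    -o queue_name_messages.json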
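`list_queues name durable` shows only half of the picture: a durable queue can still hold transient messages. The management API reports `messages` and, for classic queues, `messages_persistent`, so the number of messages that would not survive a restart can be estimated. A sketch under these assumptions: quorum queues and streams are skipped because they always store messages on disk, a missing `messages_persistent` is treated as 0, and `messagesAtRisk()` is hypothetical.

```php
<?php

/**
 * Hypothetical helper: estimate, per queue, how many messages would be lost
 * on a broker restart, from a decoded GET /api/queues response.
 */
function messagesAtRisk(array $queues): array
{
    $atRisk = [];
    foreach ($queues as $queue) {
        // Quorum queues and streams always store messages on disk
        if (($queue['type'] ?? 'classic') !== 'classic') {
            continue;
        }
        $total = (int) ($queue['messages'] ?? 0);
        // A non-durable queue disappears on restart with all its messages;
        // in a durable queue only the non-persistent messages are lost
        $transient = !empty($queue['durable'])
            ? $total - (int) ($queue['messages_persistent'] ?? 0)
            : $total;
        if ($transient > 0) {
            $atRisk[$queue['name']] = $transient;
        }
    }
    return $atRisk;
}
```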

Best Practices

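Before the Upgrade

  1. Full backup

    • Export all definitions (queues, exchanges, bindings, policies)
    • Back up configuration files
    • Record the current version and configuration
  2. Validate in a test environment

    • Run the upgrade in a test environment first
    • Verify application compatibility
    • Test the rollback procedure
  3. Prepare a rollback plan

    • Prepare rollback scripts
    • Define the conditions that trigger a rollback
    • Notify the teams involved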

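During the Upgrade

  1. Choose an off-peak window

    • Avoid business peak hours
    • Notify the teams involved in advance
    • Prepare a contingency plan
  2. Upgrade in rolling fashion

    • In a cluster, upgrade one node at a time
    • Make sure each node is stable before moving to the next
    • Monitor cluster status
  3. Monitor continuously

    • Monitor service status
    • Watch the error logs
    • Check performance metrics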

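After the Upgrade

  1. Functional checks

    • Verify publishing and consuming
    • Check queue states
    • Confirm plugins are working
  2. Performance checks

    • Compare performance before and after the upgrade
    • Check resource usage
    • Monitor latency changes
  3. Cleanup

    • Remove backup files once the rollback window has passed
    • Update documentation
    • Write up lessons learned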
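The before/after performance comparison is easier to act on with an explicit tolerance. This sketch compares a baseline captured before the upgrade with the same metrics afterwards and flags anything that got worse by more than the tolerance. The metric names are hypothetical, and `$lowerIsBetter` marks metrics such as latency, where an increase is the regression.

```php
<?php

/**
 * Hypothetical helper: return metric => percentage change for every metric
 * that regressed by more than $tolerance (a fraction, e.g. 0.10 for 10%).
 */
function regressions(array $before, array $after, float $tolerance, array $lowerIsBetter = []): array
{
    $flagged = [];
    foreach ($before as $metric => $baseline) {
        // Skip metrics missing afterwards or with a zero baseline (no meaningful ratio)
        if (!isset($after[$metric]) || $baseline == 0) {
            continue;
        }
        $change = ($after[$metric] - $baseline) / $baseline;
        // Positive $worse always means "got worse", whichever direction is bad
        $worse = in_array($metric, $lowerIsBetter, true) ? $change : -$change;
        if ($worse > $tolerance) {
            $flagged[$metric] = round($change * 100, 1);
        }
    }
    return $flagged;
}
```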
