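RabbitMQ Version Upgrade Guide
Overview
Upgrading RabbitMQ is a routine but high-impact maintenance task. A sound upgrade strategy preserves service continuity and protects data. This guide covers the full upgrade workflow, the points to watch for, and recommended practices.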
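Core concepts
Upgrade types
| Type | Description | Risk | Downtime |
|---|---|---|---|
| Patch upgrade | e.g. 3.11.x to 3.11.y | Low | None for a cluster (nodes restart one at a time) |
| Minor upgrade | e.g. 3.11.x to 3.12.x | Medium | A short outage may be needed |
| Major upgrade | e.g. 3.x to 4.x | High | Plan a maintenance window |
| Erlang upgrade | The Erlang/OTP version changes | High | Node restart required |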
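The first three rows of the table can be told apart mechanically from the two version strings. The sketch below is one way to do that in a pre-upgrade script; `classify_upgrade` is a hypothetical helper name, not a RabbitMQ command, and it assumes plain `MAJOR.MINOR.PATCH` version strings.

```bash
#!/bin/sh
# classify_upgrade FROM TO
# Prints "patch", "minor", or "major" depending on which version component changes.
# Hypothetical helper; assumes versions look like MAJOR.MINOR.PATCH.
classify_upgrade() {
  local from="$1" to="$2"
  local from_major to_major from_rest to_rest from_minor to_minor

  # Everything before the first dot is the major component.
  from_major=${from%%.*}
  to_major=${to%%.*}

  # Strip the major component, then take the next field as the minor component.
  from_rest=${from#*.}
  to_rest=${to#*.}
  from_minor=${from_rest%%.*}
  to_minor=${to_rest%%.*}

  if [ "$from_major" != "$to_major" ]; then
    echo "major"
  elif [ "$from_minor" != "$to_minor" ]; then
    echo "minor"
  else
    echo "patch"
  fi
}
```

A script could call `classify_upgrade "$(rabbitmqctl version)" "3.12.0"` and refuse to continue without an explicit maintenance window when the result is `major`.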
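Pre-upgrade checklist
Pre-upgrade checklist
├── Version compatibility
│   ├── RabbitMQ version compatibility
│   ├── Erlang version requirements
│   └── Plugin compatibility
├── Data backup
│   ├── Message data backup
│   ├── Configuration file backup
│   └── Metadata (definitions) backup
├── Cluster state
│   ├── Node health
│   ├── Queue synchronization status
│   └── Network partition check
└── Rollback plan
    ├── Documented rollback steps
    ├── Backup restore procedure
    └── Emergency contacts
Version compatibility matrix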
| RabbitMQ version | Minimum Erlang version | Recommended Erlang version |
|---|---|---|
| 3.12.x | 25.0 | 25.3 / 26.x |
| 3.11.x | 24.3 | 25.x |
| 3.10.x | 24.0 | 24.3 / 25.x |
| 3.9.x | 23.2 | 24.x |
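The matrix lends itself to an automated gate before any package is touched. The sketch below hard-codes the minimum versions exactly as listed in the table above; the function names are hypothetical, and `version_ge` assumes a `sort` that supports `-V` (GNU coreutils and BusyBox do).

```bash
#!/bin/sh
# min_erlang_for RABBITMQ_VERSION
# Prints the minimum Erlang version for the series, as listed in the matrix.
min_erlang_for() {
  case "$1" in
    3.12.*) echo "25.0" ;;
    3.11.*) echo "24.3" ;;
    3.10.*) echo "24.0" ;;
    3.9.*)  echo "23.2" ;;
    *)      return 1 ;;  # series not covered by the matrix
  esac
}

# version_ge A B: succeeds when version A is greater than or equal to version B.
# Relies on version-aware sorting (sort -V).
version_ge() {
  [ "$(printf '%s\n%s\n' "$1" "$2" | sort -V | head -n 1)" = "$2" ]
}

# erlang_ok RABBITMQ_VERSION ERLANG_VERSION
# Succeeds when the Erlang version meets the minimum for that RabbitMQ series.
erlang_ok() {
  local min
  min=$(min_erlang_for "$1") || { echo "unknown series: $1" >&2; return 2; }
  version_ge "$2" "$min"
}
```

For example, `erlang_ok 3.12.0 25.3` succeeds while `erlang_ok 3.12.0 24.3` fails. The installed Erlang version can typically be read with `rabbitmq-diagnostics -q erlang_version`.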
Configuration examples
Docker upgrade configuration
yaml
version: '3.8'
services:
  rabbitmq:
    # Upgrading means bumping this tag and recreating the container.
    image: rabbitmq:3.12-management
    container_name: rabbitmq
    # Keep the hostname stable: the node name and data directory are derived from it.
    hostname: rabbitmq-node1
    environment:
      RABBITMQ_ERLANG_COOKIE: 'secret_cookie_here'
      RABBITMQ_DEFAULT_USER: admin
      # Placeholder credentials; replace before use.
      RABBITMQ_DEFAULT_PASS: admin123
    ports:
      - "5672:5672"
      - "15672:15672"
      - "25672:25672"
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - ./config/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
      - ./config/enabled_plugins:/etc/rabbitmq/enabled_plugins:ro
    restart: unless-stopped
volumes:
  rabbitmq_data:
    driver: local
Kubernetes rolling upgrade configuration
yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
  namespace: messaging
spec:
  serviceName: rabbitmq-headless
  replicas: 3
  updateStrategy:
    type: RollingUpdate
    rollingUpdate:
      # Only pods with an ordinal >= partition are updated; 0 updates all of them.
      partition: 0
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      # Give the node time to shut down cleanly.
      terminationGracePeriodSeconds: 600
      containers:
        - name: rabbitmq
          image: rabbitmq:3.12-management
          ports:
            - containerPort: 5672
              name: amqp
            - containerPort: 15672
              name: management
            # 25672 is the inter-node and CLI tool port (epmd itself listens on 4369).
            - containerPort: 25672
              name: dist
          env:
            - name: RABBITMQ_ERLANG_COOKIE
              valueFrom:
                secretKeyRef:
                  name: rabbitmq-secret
                  key: erlang-cookie
            - name: RABBITMQ_DEFAULT_USER
              value: admin
            - name: RABBITMQ_DEFAULT_PASS
              valueFrom:
                secretKeyRef:
                  name: rabbitmq-secret
                  key: password
          volumeMounts:
            - name: rabbitmq-data
              mountPath: /var/lib/rabbitmq
            - name: config
              mountPath: /etc/rabbitmq
          readinessProbe:
            exec:
              command:
                - rabbitmq-diagnostics
                - check_running
            initialDelaySeconds: 60
            periodSeconds: 30
            timeoutSeconds: 10
          livenessProbe:
            exec:
              command:
                - rabbitmq-diagnostics
                - check_running
            initialDelaySeconds: 120
            periodSeconds: 60
            timeoutSeconds: 10
      volumes:
        - name: config
          configMap:
            name: rabbitmq-config
  volumeClaimTemplates:
    - metadata:
        name: rabbitmq-data
      spec:
        accessModes: [ "ReadWriteOnce" ]
        storageClassName: standard
        resources:
          requests:
            storage: 10Gi
PHP code examples
RabbitMQ upgrade manager
php
<?php
namespace App\Services\RabbitMQ;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RabbitMQUpgradeManager
{
private $connection;
private $channel;
private $config;
public function __construct(array $config)
{
$this->config = $config;
$this->connect();
}
private function connect(): void
{
$this->connection = new AMQPStreamConnection(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['password'],
$this->config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
}
public function preUpgradeCheck(): array
{
$checks = [];
$checks['version'] = $this->getCurrentVersion();
$checks['erlang_version'] = $this->getErlangVersion();
$checks['node_status'] = $this->checkNodeStatus();
$checks['queue_status'] = $this->checkQueueStatus();
$checks['connection_count'] = $this->getConnectionCount();
$checks['message_count'] = $this->getTotalMessageCount();
$checks['disk_space'] = $this->checkDiskSpace();
$checks['memory_usage'] = $this->getMemoryUsage();
$checks['ready'] = $this->evaluateReadiness($checks);
return $checks;
}
public function getCurrentVersion(): string
{
$response = $this->apiRequest('GET', '/overview');
return $response['rabbitmq_version'] ?? 'unknown';
}
public function getErlangVersion(): string
{
$response = $this->apiRequest('GET', '/overview');
return $response['erlang_version'] ?? 'unknown';
}
public function checkNodeStatus(): array
{
$nodes = $this->apiRequest('GET', '/nodes');
$status = [];
foreach ($nodes as $node) {
$status[$node['name']] = [
'running' => $node['running'] ?? false,
'type' => $node['type'] ?? 'unknown',
'uptime' => $node['uptime'] ?? 0,
];
}
return $status;
}
public function checkQueueStatus(): array
{
$queues = $this->apiRequest('GET', '/queues');
$status = [
'total' => count($queues),
'messages' => 0,
'unmirrored' => 0,
'idle' => 0,
];
foreach ($queues as $queue) {
$status['messages'] += $queue['messages'] ?? 0;
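// Classic mirrored queues list their mirrors in `slave_nodes`; quorum queues are replicated by design.
if (($queue['type'] ?? 'classic') === 'classic' && empty($queue['slave_nodes'])) {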
$status['unmirrored']++;
}
if (($queue['consumers'] ?? 0) === 0) {
$status['idle']++;
}
}
return $status;
}
public function getConnectionCount(): int
{
$connections = $this->apiRequest('GET', '/connections');
return count($connections);
}
public function getTotalMessageCount(): int
{
$overview = $this->apiRequest('GET', '/overview');
return $overview['queue_totals']['messages'] ?? 0;
}
public function checkDiskSpace(): array
{
$nodes = $this->apiRequest('GET', '/nodes');
$diskInfo = [];
foreach ($nodes as $node) {
$diskInfo[$node['name']] = [
'free' => $node['disk_free'] ?? 0,
'limit' => $node['disk_free_limit'] ?? 0,
'alarm' => $node['disk_free_alarm'] ?? false,
];
}
return $diskInfo;
}
public function getMemoryUsage(): array
{
$nodes = $this->apiRequest('GET', '/nodes');
$memoryInfo = [];
foreach ($nodes as $node) {
$memoryInfo[$node['name']] = [
'used' => $node['mem_used'] ?? 0,
'limit' => $node['mem_limit'] ?? 0,
'alarm' => $node['mem_alarm'] ?? false,
];
}
return $memoryInfo;
}
private function evaluateReadiness(array $checks): array
{
$issues = [];
if (isset($checks['disk_space'])) {
foreach ($checks['disk_space'] as $node => $info) {
if ($info['alarm']) {
$issues[] = "Disk alarm: {$node}";
}
}
}
if (isset($checks['memory_usage'])) {
foreach ($checks['memory_usage'] as $node => $info) {
if ($info['alarm']) {
$issues[] = "Memory alarm: {$node}";
}
}
}
if (isset($checks['node_status'])) {
foreach ($checks['node_status'] as $node => $info) {
if (!$info['running']) {
$issues[] = "Node not running: {$node}";
}
}
}
return [
'ready' => empty($issues),
'issues' => $issues,
];
}
public function backupDefinitions(): string
{
$definitions = $this->apiRequest('GET', '/definitions');
$backupFile = storage_path('rabbitmq_definitions_' . date('Y-m-d_His') . '.json');
file_put_contents($backupFile, json_encode($definitions, JSON_PRETTY_PRINT));
return $backupFile;
}
public function exportMessages(string $queueName, string $outputFile): int
{
$messages = [];
$count = 0;
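// basic_get() uses manual acknowledgement by default. Nothing is acked here,
// so the fetched messages return to the queue when the channel closes.
while (true) {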
$message = $this->channel->basic_get($queueName);
if (!$message) {
break;
}
$messages[] = [
'body' => $message->body,
'properties' => $message->get_properties(),
];
$count++;
}
file_put_contents($outputFile, json_encode($messages, JSON_PRETTY_PRINT));
return $count;
}
public function importMessages(string $queueName, string $inputFile): int
{
$messages = json_decode(file_get_contents($inputFile), true);
$count = 0;
foreach ($messages as $msgData) {
$message = new AMQPMessage(
$msgData['body'],
$msgData['properties'] ?? []
);
$this->channel->basic_publish($message, '', $queueName);
$count++;
}
return $count;
}
public function drainNode(string $nodeName): array
{
$result = [
'success' => false,
'migrated_queues' => 0,
'errors' => [],
];
try {
// The path segment after /queues is a vhost, not a node; list all queues and filter by node below.
$queues = $this->apiRequest('GET', '/queues');
foreach ($queues as $queue) {
if ($queue['node'] === $nodeName) {
$this->migrateQueue($queue['name'], $nodeName);
$result['migrated_queues']++;
}
}
$result['success'] = true;
} catch (\Exception $e) {
$result['errors'][] = $e->getMessage();
}
return $result;
}
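private function migrateQueue(string $queueName, string $fromNode): void
{
// NOTE: the stock management HTTP API does not document a per-queue "migrate" endpoint.
// Treat this path as a placeholder for whatever migration mechanism your deployment provides.
$this->apiRequest('POST', '/queues/%2F/' . rawurlencode($queueName) . '/migrate', [
'destination_node' => $this->getTargetNode($fromNode),
]);
}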
private function getTargetNode(string $excludeNode): ?string
{
$nodes = $this->apiRequest('GET', '/nodes');
foreach ($nodes as $node) {
if ($node['name'] !== $excludeNode && $node['running']) {
return $node['name'];
}
}
return null;
}
public function postUpgradeVerify(): array
{
$checks = [];
$checks['version'] = $this->getCurrentVersion();
$checks['erlang_version'] = $this->getErlangVersion();
$checks['node_status'] = $this->checkNodeStatus();
$checks['queue_status'] = $this->checkQueueStatus();
$checks['plugin_status'] = $this->checkPlugins();
$checks['policy_status'] = $this->checkPolicies();
$checks['healthy'] = $this->evaluateHealth($checks);
return $checks;
}
public function checkPlugins(): array
{
$plugins = $this->apiRequest('GET', '/plugins');
$status = [];
foreach ($plugins as $plugin) {
if ($plugin['enabled']) {
$status[$plugin['name']] = [
'enabled' => true,
'version' => $plugin['version'] ?? 'unknown',
];
}
}
return $status;
}
public function checkPolicies(): array
{
$policies = $this->apiRequest('GET', '/policies');
$status = [];
foreach ($policies as $policy) {
$status[$policy['name']] = [
'vhost' => $policy['vhost'],
'pattern' => $policy['pattern'],
'definition' => $policy['definition'],
];
}
return $status;
}
private function evaluateHealth(array $checks): array
{
$issues = [];
foreach ($checks['node_status'] ?? [] as $node => $info) {
if (!($info['running'] ?? false)) {
$issues[] = "Node not running: {$node}";
}
}
$version = $checks['version'] ?? '';
if (empty($version) || $version === 'unknown') {
$issues[] = "Unable to read version information";
}
return [
'healthy' => empty($issues),
'issues' => $issues,
];
}
private function apiRequest(string $method, string $endpoint, ?array $data = null): array
{
$url = "http://{$this->config['host']}:15672/api{$endpoint}";
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch, CURLOPT_USERPWD, $this->config['user'] . ':' . $this->config['password']);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
if ($data !== null) {
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
}
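$response = curl_exec($ch);
if ($response === false) {
// Transport-level failure (DNS, refused connection, timeout): no HTTP status is available.
$error = curl_error($ch);
curl_close($ch);
throw new \RuntimeException("API request failed: {$endpoint}, {$error}");
}
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
if ($httpCode >= 400) {
throw new \RuntimeException("API request failed: {$endpoint}, HTTP {$httpCode}");
}
return json_decode($response, true) ?: [];
}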
public function close(): void
{
if ($this->channel) {
$this->channel->close();
}
if ($this->connection) {
$this->connection->close();
}
}
public function __destruct()
{
$this->close();
}
}
Upgrade command script
php
<?php
namespace App\Console\Commands;
use App\Services\RabbitMQ\RabbitMQUpgradeManager;
use Illuminate\Console\Command;
class RabbitMQUpgrade extends Command
{
protected $signature = 'rabbitmq:upgrade
{action : pre-check|backup|verify|rollback}
{--target-version= : Target version}
{--node= : Node to operate on}';
protected $description = 'RabbitMQ upgrade management command';
private $manager;
public function handle()
{
$config = [
'host' => config('rabbitmq.host', 'localhost'),
'port' => config('rabbitmq.port', 5672),
'user' => config('rabbitmq.user', 'guest'),
'password' => config('rabbitmq.password', 'guest'),
'vhost' => config('rabbitmq.vhost', '/'),
];
$this->manager = new RabbitMQUpgradeManager($config);
$action = $this->argument('action');
switch ($action) {
case 'pre-check':
$this->preCheck();
break;
case 'backup':
$this->backup();
break;
case 'verify':
$this->verify();
break;
case 'rollback':
$this->rollback();
break;
default:
$this->error("Unknown action: {$action}");
}
}
private function preCheck(): void
{
$this->info('Running pre-upgrade checks...');
$checks = $this->manager->preUpgradeCheck();
$this->table(
['Check', 'Status', 'Details'],
[
['Current version', $checks['version'], '-'],
['Erlang version', $checks['erlang_version'], '-'],
['Connections', $checks['connection_count'], '-'],
['Total messages', $checks['message_count'], '-'],
['Queues', $checks['queue_status']['total'], '-'],
['Readiness', $checks['ready']['ready'] ? '✓ Ready' : '✗ Not ready', implode(', ', $checks['ready']['issues'])],
]
);
if (!$checks['ready']['ready']) {
$this->error('Pre-upgrade checks failed. Resolve the following issues:');
foreach ($checks['ready']['issues'] as $issue) {
$this->error(" - {$issue}");
}
return;
}
$this->info('Pre-upgrade checks passed. The upgrade can proceed.');
}
private function backup(): void
{
$this->info('Running backup...');
$backupFile = $this->manager->backupDefinitions();
$this->info("Definitions backup saved: {$backupFile}");
$this->info('Backup complete.');
}
private function verify(): void
{
$this->info('Running post-upgrade verification...');
$checks = $this->manager->postUpgradeVerify();
$this->table(
['Check', 'Status', 'Details'],
[
['Version', $checks['version'], '-'],
['Erlang version', $checks['erlang_version'], '-'],
['Health', $checks['healthy']['healthy'] ? '✓ Healthy' : '✗ Unhealthy', implode(', ', $checks['healthy']['issues'])],
]
);
if ($checks['healthy']['healthy']) {
$this->info('Post-upgrade verification passed.');
} else {
$this->error('Post-upgrade verification found problems:');
foreach ($checks['healthy']['issues'] as $issue) {
$this->error(" - {$issue}");
}
}
}
private function rollback(): void
{
$this->warn('Rollback must be performed manually. See the rollback documentation.');
}
}
Practical scenarios
Scenario 1: Single-node upgrade
bash
#!/bin/bash
set -euo pipefail
# On Debian/Ubuntu the pinned version usually needs the package revision as well, e.g. 3.12.0-1.
UPGRADE_VERSION="3.12.0"
# `rabbitmqctl version` prints only the version string.
CURRENT_VERSION=$(rabbitmqctl version)
echo "Current version: $CURRENT_VERSION"
echo "Target version: $UPGRADE_VERSION"
echo "Step 1: Back up configuration and definitions"
rabbitmqctl export_definitions "/backup/rabbitmq_definitions_$(date +%Y%m%d).json"
cp /etc/rabbitmq/rabbitmq.conf /backup/rabbitmq.conf.bak
echo "Step 2: Stop the service"
systemctl stop rabbitmq-server
echo "Step 3: Upgrade the package"
if command -v apt-get &> /dev/null; then
  apt-get update
  apt-get install "rabbitmq-server=${UPGRADE_VERSION}" -y
elif command -v yum &> /dev/null; then
  yum install "rabbitmq-server-${UPGRADE_VERSION}" -y
fi
echo "Step 4: Start the service"
systemctl start rabbitmq-server
echo "Step 5: Verify the upgrade"
rabbitmqctl status
rabbitmqctl version
echo "Upgrade complete"
Scenario 2: Cluster rolling upgrade
bash
#!/bin/bash
NODES=("node1" "node2" "node3")
UPGRADE_VERSION="3.12.0"
# Upgrade one node: stop the application, install the package, restart, wait for it to come back.
upgrade_node() {
  local node=$1
  echo "Upgrading node: $node"
  ssh "$node" "rabbitmqctl stop_app"
  ssh "$node" "apt-get update && apt-get install rabbitmq-server=$UPGRADE_VERSION -y"
  ssh "$node" "systemctl restart rabbitmq-server"
  ssh "$node" "rabbitmqctl start_app"
  ssh "$node" "rabbitmqctl wait /var/lib/rabbitmq/mnesia/rabbit@${node}.pid"
  echo "Node $node upgraded"
}
# Upgrade nodes one at a time so the rest of the cluster keeps serving traffic.
for node in "${NODES[@]}"; do
  upgrade_node "$node"
  echo "Waiting for the cluster to synchronize..."
  sleep 30
done
echo "Cluster upgrade complete"
rabbitmqctl cluster_status
Scenario 3: Docker upgrade
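bash
#!/bin/bash
OLD_IMAGE="rabbitmq:3.11-management"
NEW_IMAGE="rabbitmq:3.12-management"
CONTAINER_NAME="rabbitmq"
echo "Backing up current definitions"
docker exec "$CONTAINER_NAME" rabbitmqctl export_definitions /tmp/definitions.json
docker cp "$CONTAINER_NAME:/tmp/definitions.json" ./backup_definitions.json
echo "Stopping the current container"
docker stop "$CONTAINER_NAME"
docker rename "$CONTAINER_NAME" "${CONTAINER_NAME}_backup"
echo "Starting the new version"
# Use the same hostname as the old container: the node name, and with it the
# data directory inside the volume, is derived from the hostname.
docker run -d \
  --name "$CONTAINER_NAME" \
  --hostname rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -v rabbitmq_data:/var/lib/rabbitmq \
  "$NEW_IMAGE"
echo "Waiting for the service to start"
sleep 30
echo "Verifying the upgrade"
docker exec "$CONTAINER_NAME" rabbitmqctl status
echo "Importing definitions (if needed)"
docker cp ./backup_definitions.json "$CONTAINER_NAME:/tmp/definitions.json"
docker exec "$CONTAINER_NAME" rabbitmqctl import_definitions /tmp/definitions.json
echo "Removing the old container"
# Do this only after verification succeeds; the old container is the quickest rollback path.
docker rm "${CONTAINER_NAME}_backup"
echo "Upgrade complete"
Common problems and solutions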
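Problem 1: Queues unavailable after the upgrade
Symptom: some queues are reported as unavailable after the upgrade.
Cause: the queue data format changed, or the queue index is incompatible with the new version.
Solution:
bash
# Check queue state
rabbitmqctl list_queues name state
# Delete the problem queue so it can be redeclared (this discards its messages)
rabbitmqctl delete_queue problem_queue_name
# Or delete it through the management HTTP API before redeclaring it
curl -u admin:password -XDELETE http://localhost:15672/api/queues/%2F/problem_queue_name
Problem 2: Incompatible plugins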
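Symptom: some plugins cannot be enabled after the upgrade.
Cause: the plugin version is not compatible with the new RabbitMQ version.
Solution:
bash
# Check plugin status
rabbitmq-plugins list
# Disable the incompatible plugin
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
# After placing a compatible build of the plugin in the plugins directory, enable it again
rabbitmq-plugins enable rabbitmq_delayed_message_exchange --offline
Problem 3: A node cannot rejoin the cluster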
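Symptom: after the upgrade, a node cannot rejoin the cluster.
Cause: the Erlang cookie differs between nodes, or the node's data is incompatible.
Solution:
bash
# Check the Erlang cookie
cat /var/lib/rabbitmq/.erlang.cookie
# Synchronize the cookie (it must be identical on every node)
# Run on the primary node
scp /var/lib/rabbitmq/.erlang.cookie node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie node3:/var/lib/rabbitmq/.erlang.cookie
# Reset the problem node (reset wipes that node's local data) and rejoin the cluster
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
Problem 4: Message loss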
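Symptom: the message count is lower after the upgrade.
Cause: non-persistent messages are lost when a node restarts during the upgrade.
Solution:
bash
# Before upgrading, make sure every important message is published as persistent
# List queues with their durability flag
rabbitmqctl list_queues name durable
# For non-durable queues, export the messages manually before upgrading
# Fetch them through the management API; ack_requeue_false removes the fetched messages from the queue
curl -u admin:password http://localhost:15672/api/queues/%2F/queue_name/get \
  -H "Content-Type: application/json" \
  -d '{"count":1000,"ackmode":"ack_requeue_false","encoding":"auto"}'
Best practices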
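Before the upgrade
Take a full backup
- Export all definitions (queues, exchanges, bindings, policies)
- Back up configuration files
- Record the current version and configuration
Validate in a test environment
- Run the upgrade in a test environment first
- Verify application compatibility
- Rehearse the rollback procedure
Prepare a rollback plan
- Prepare rollback scripts
- Define the conditions that trigger a rollback
- Notify the teams involved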
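Part of this preparation can be scripted so that it runs the same way every time. The sketch below wraps each check in a small reporter; `run_check` and `pre_upgrade_checks` are hypothetical names, and the selection of `rabbitmq-diagnostics` health checks is an assumption about what a given deployment cares about.

```bash
#!/bin/sh
# Number of failed checks so far.
failures=0

# run_check LABEL COMMAND...
# Runs the command quietly and prints PASS or FAIL next to the label.
run_check() {
  local label="$1"
  shift
  if "$@" > /dev/null 2>&1; then
    echo "PASS  $label"
  else
    echo "FAIL  $label"
    failures=$((failures + 1))
  fi
}

# pre_upgrade_checks DEFINITIONS_FILE
# Succeeds only when every check passed. Must run on a RabbitMQ node.
pre_upgrade_checks() {
  run_check "node is running"      rabbitmq-diagnostics -q check_running
  run_check "no local alarms"      rabbitmq-diagnostics -q check_local_alarms
  run_check "listeners reachable"  rabbitmq-diagnostics -q check_port_connectivity
  run_check "definitions exported" rabbitmqctl export_definitions "$1"
  [ "$failures" -eq 0 ]
}
```

Calling `pre_upgrade_checks /backup/definitions.json || exit 1` at the top of an upgrade script stops the run before any package is changed.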
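During the upgrade
Pick an off-peak window
- Avoid upgrading during peak business hours
- Notify the teams involved in advance
- Have a contingency plan ready
Upgrade node by node
- In a cluster, upgrade one node at a time
- Confirm each node is stable before moving to the next
- Monitor cluster state
Monitor continuously
- Watch service status
- Watch the error logs
- Check performance metrics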
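Confirming that a node is stable before moving on is more reliable with a bounded poll than with a fixed `sleep`. The helper below is a minimal sketch of that idea; `wait_until` is a hypothetical name, and the timeout and interval values are placeholders to tune per environment.

```bash
#!/bin/sh
# wait_until TIMEOUT_SECONDS INTERVAL_SECONDS COMMAND...
# Re-runs the command until it succeeds. Fails once the timeout has elapsed.
wait_until() {
  local timeout="$1" interval="$2" waited=0
  shift 2
  until "$@" > /dev/null 2>&1; do
    if [ "$waited" -ge "$timeout" ]; then
      return 1
    fi
    sleep "$interval"
    waited=$((waited + interval))
  done
  return 0
}
```

In a rolling upgrade loop this could replace the fixed wait, for example `wait_until 300 5 ssh node1 rabbitmq-diagnostics -q check_running || exit 1`, so the script stops at the first node that does not come back instead of continuing to the next one.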
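After the upgrade
Functional verification
- Verify publishing and consuming
- Check queue state
- Confirm plugins work
Performance verification
- Compare performance before and after
- Check resource usage
- Watch for latency changes
Cleanup
- Remove backup files that are no longer needed
- Update documentation
- Record lessons learned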
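One concrete way to check queue state after the upgrade is to compare queue listings captured before and after. The sketch below assumes two text files with one queue name per line, produced for instance with `rabbitmqctl -q list_queues name --no-table-headers`; `missing_queues` is a hypothetical helper name.

```bash
#!/bin/sh
# missing_queues BEFORE_FILE AFTER_FILE
# Prints the queue names that appear in BEFORE_FILE but not in AFTER_FILE.
missing_queues() {
  local before_sorted after_sorted
  before_sorted=$(mktemp)
  after_sorted=$(mktemp)

  # comm requires sorted input; -u also drops duplicate names.
  sort -u "$1" > "$before_sorted"
  sort -u "$2" > "$after_sorted"

  # -23 suppresses lines unique to the second file and lines common to both.
  comm -23 "$before_sorted" "$after_sorted"

  rm -f "$before_sorted" "$after_sorted"
}
```

An empty result means every queue that existed before the upgrade is still declared. Any name printed is a candidate for re-import from the definitions backup.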
