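Cluster recovery procedures
Overview
RabbitMQ cluster recovery is the process of rebuilding a cluster after a failure. Depending on the failure, it may mean recovering a single failed node, recovering from a network partition, or rebuilding the entire cluster. This document describes the recovery workflow, the available methods, and best practices.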
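One way to tell the recovery scenarios apart is to count how many of the expected nodes are still running. The function below is a minimal sketch of that decision in bash. `classify_scenario` is a hypothetical helper, not part of any RabbitMQ tool. Its thresholds are an assumption rather than a RabbitMQ rule: exactly one node down, several nodes down, or no node running.

```bash
#!/bin/bash
# classify_scenario: map running/expected node counts to a recovery scenario.
# Hypothetical helper; the thresholds are an assumption, not a RabbitMQ rule.
# Usage: classify_scenario <running_nodes> <expected_nodes>
classify_scenario() {
    local running=$1 expected=$2
    local down=$((expected - running))
    if [ "$down" -le 0 ]; then
        echo "healthy"
    elif [ "$running" -eq 0 ]; then
        echo "full-rebuild"      # nothing left to rejoin
    elif [ "$down" -eq 1 ]; then
        echo "single-node"
    else
        echo "multi-node"
    fi
}

classify_scenario 2 3   # prints: single-node
```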
Recovery scenarios
```text
┌────────────────────────────────────────────────────────────────────┐
│ Cluster recovery scenarios                                         │
├──────────────────────┬──────────────────────┬──────────────────────┤
│ Single-node recovery │ Multi-node recovery  │ Full cluster rebuild │
├──────────────────────┼──────────────────────┼──────────────────────┤
│ Low impact           │ Medium impact        │ High impact          │
│ Fast to recover      │ Needs coordination   │ Complete rebuild     │
└──────────────────────┴──────────────────────┴──────────────────────┘
```
Symptoms
Common symptoms
```text
┌────────────────────────────────────────────┐
│ Typical symptoms of a cluster failure      │
├────────────────────────────────────────────┤
│ 1. The cluster fails to start              │
│ 2. Nodes cannot join the cluster           │
│ 3. Queues are inaccessible                 │
│ 4. Data is lost or corrupted               │
│ 5. Clients cannot connect                  │
│ 6. Mirrored queues fail to synchronize     │
└────────────────────────────────────────────┘
```
Recovery procedures
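The scripts in this section wait with fixed `sleep` intervals, for example `sleep 30` after a restart. Depending on the node, a fixed wait is either longer than needed or too short. An alternative is a polling loop that retries a health check until it passes or a retry budget runs out. The sketch below assumes bash. `wait_until` is a hypothetical helper name, and the retry budget in the usage comment is arbitrary.

```bash
#!/bin/bash
# wait_until: retry a command until it succeeds or the attempts run out.
# Hypothetical helper, intended as a replacement for fixed "sleep N" waits.
# Usage: wait_until <max_attempts> <interval_seconds> <command...>
wait_until() {
    local max_attempts=$1 interval=$2 attempt
    shift 2
    for ((attempt = 1; attempt <= max_attempts; attempt++)); do
        if "$@" >/dev/null 2>&1; then
            return 0
        fi
        # Do not sleep after the final failed attempt
        if [ "$attempt" -lt "$max_attempts" ]; then
            sleep "$interval"
        fi
    done
    return 1
}

# Example: give the node up to about 5 minutes to answer
#   wait_until 30 10 rabbitmqctl -n rabbit@node2 ping || exit 1
```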
1. Single-node failure recovery
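The scripts in this section identify nodes by Erlang node name, such as `rabbit@node2`. `rabbitmqctl -n` expects that full name. `ssh rabbit@node2`, however, logs in as the user `rabbit` on host `node2`, which is usually not what is intended. The helpers below are a minimal sketch that separates the two. They assume that the host part of the node name is also its SSH host name and that the remote login shell is bash. All three function names are hypothetical.

```bash
#!/bin/bash
# node_host: strip the "name@" prefix from an Erlang node name.
# Assumes the host part of the node name is reachable over SSH under that name.
node_host() {
    echo "${1#*@}"
}

# remote_command: quote every argument so it survives the extra round of
# shell parsing that ssh applies on the remote side.
remote_command() {
    printf '%q ' "$@"
}

# run_on_node: run a command on the host that carries the given node.
# Usage: run_on_node rabbit@node2 systemctl restart rabbitmq-server
run_on_node() {
    local host
    host=$(node_host "$1")
    shift
    ssh "$host" "$(remote_command "$@")"
}
```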
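```bash
#!/bin/bash
# single_node_recovery.sh
# Usage: single_node_recovery.sh rabbit@<host>
FAILED_NODE=$1
if [ -z "$FAILED_NODE" ]; then
    echo "Usage: $0 <failed_node>   (for example rabbit@node2)"
    exit 1
fi
# rabbitmqctl -n needs the full node name; ssh needs only the host part
FAILED_HOST=${FAILED_NODE#*@}
echo "=== Single-node failure recovery ==="
echo "Failed node: $FAILED_NODE"
echo "[1] Check the current cluster status"
rabbitmqctl cluster_status
echo "[2] Check the service status on the failed node"
ssh "$FAILED_HOST" "systemctl status rabbitmq-server"
echo "[3] Try restarting the failed node"
ssh "$FAILED_HOST" "systemctl restart rabbitmq-server"
echo "[4] Wait for the node to start"
sleep 30
echo "[5] Check that the node responds"
rabbitmqctl -n "$FAILED_NODE" ping
echo "[6] Check that the node appears in the cluster status"
# -w -F: match the exact node name, so rabbit@node1 does not match rabbit@node10
rabbitmqctl cluster_status | grep -wF "$FAILED_NODE"
echo "[7] Check mirrored queue synchronization"
rabbitmqctl list_queues name policy synchronised_slave_nodes
echo "Single-node recovery complete"
```
2. Multi-node failure recovery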
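Step [4] of the multi-node script below resets each failed node, which discards that node's local data. A preview mode can therefore catch a wrong node list before anything runs. One way to get a preview is a wrapper that only prints commands when a flag is set. In this sketch, `run` and `DRY_RUN` are hypothetical names and are not part of RabbitMQ tooling.

```bash
#!/bin/bash
# run: execute a command, or only print it (shell-quoted) when DRY_RUN=1.
# run and DRY_RUN are hypothetical names, not part of RabbitMQ tooling.
run() {
    if [ "${DRY_RUN:-0}" = "1" ]; then
        printf '[dry-run]'
        printf ' %q' "$@"
        printf '\n'
        return 0
    fi
    "$@"
}

# Example: wrap destructive calls in the script, then preview with
#   DRY_RUN=1 ./multi_node_recovery.sh rabbit@node1 rabbit@node2 rabbit@node3
# where the script contains lines such as:
#   run ssh node2 "rabbitmqctl reset"
```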
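```bash
#!/bin/bash
# multi_node_recovery.sh
# Usage: multi_node_recovery.sh rabbit@node1 rabbit@node2 rabbit@node3 ...
NODES=("$@")
if [ ${#NODES[@]} -eq 0 ]; then
    echo "Usage: $0 <node1> <node2> ..."
    exit 1
fi
echo "=== Multi-node failure recovery ==="
echo "[1] Check the current cluster status"
rabbitmqctl cluster_status
echo "[2] Identify failed nodes"
FAILED_NODES=()
RUNNING_NODES=()
for node in "${NODES[@]}"; do
    # check_running exits non-zero if the RabbitMQ app on that node is not running
    if rabbitmq-diagnostics -q -n "$node" check_running >/dev/null 2>&1; then
        RUNNING_NODES+=("$node")
    else
        FAILED_NODES+=("$node")
    fi
done
if [ ${#FAILED_NODES[@]} -eq 0 ]; then
    echo "No failed nodes detected"
    exit 0
fi
echo "Failed nodes: ${FAILED_NODES[*]}"
echo "[3] Choose the node to rejoin"
# RabbitMQ clusters have no master; the first running node is simply the join target
MASTER_NODE=${RUNNING_NODES[0]}
if [ -z "$MASTER_NODE" ]; then
    echo "No running node left; use the full cluster rebuild procedure instead"
    exit 1
fi
echo "Join target: $MASTER_NODE"
echo "[4] Recover each failed node"
for node in "${FAILED_NODES[@]}"; do
    host=${node#*@}
    echo "--- Recovering node: $node ---"
    echo "[4.$node] Make sure the Erlang VM is up, then stop the app"
    ssh "$host" "systemctl start rabbitmq-server 2>/dev/null; rabbitmqctl stop_app"
    echo "[4.$node] Reset the node (discards its local data)"
    ssh "$host" "rabbitmqctl reset"
    echo "[4.$node] Rejoin the cluster"
    ssh "$host" "rabbitmqctl join_cluster $MASTER_NODE"
    echo "[4.$node] Start the app"
    ssh "$host" "rabbitmqctl start_app"
    sleep 15
done
echo "[5] Wait for the cluster to stabilize"
sleep 30
echo "[6] Verify the cluster status"
rabbitmqctl cluster_status
echo "[7] Check queue synchronization"
rabbitmqctl list_queues name policy synchronised_slave_nodes
echo "Multi-node recovery complete"
```
3. Full cluster rebuild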
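Step [4] of the rebuild script below deletes the data directory on every node. After that, the definitions export from step [1] is the only copy of users, vhosts, queues, and policies. It may be worth checking that the export produced a plausible file before wiping anything. The guard below is a hypothetical helper, and it only performs a shallow check: the file must be non-empty and must start with a JSON object. It does not validate the definitions schema.

```bash
#!/bin/bash
# require_backup: refuse to continue unless a definitions export looks usable.
# Hypothetical guard; it only checks that the file exists, is non-empty and
# starts with a JSON object. It does not validate the definitions schema.
require_backup() {
    local file=$1 first_char
    if [ ! -s "$file" ]; then
        echo "Backup $file is missing or empty; aborting" >&2
        return 1
    fi
    # First non-whitespace character of the file
    first_char=$(tr -d ' \t\r\n' < "$file" | head -c 1)
    if [ "$first_char" != "{" ]; then
        echo "Backup $file does not look like JSON; aborting" >&2
        return 1
    fi
    return 0
}

# Example, placed between step [1] and step [2]:
#   require_backup "$BACKUP_FILE" || exit 1
```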
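```bash
#!/bin/bash
# full_cluster_rebuild.sh
# Usage: full_cluster_rebuild.sh rabbit@node1 rabbit@node2 rabbit@node3 ...
NODES=("$@")
if [ ${#NODES[@]} -eq 0 ]; then
    echo "Usage: $0 <node1> <node2> <node3> ..."
    exit 1
fi
MASTER=${NODES[0]}
MASTER_HOST=${MASTER#*@}
echo "=== Full cluster rebuild ==="
echo "WARNING: this operation deletes all data on every node!"
read -p "Continue? (yes/no): " confirm
if [ "$confirm" != "yes" ]; then
    echo "Operation cancelled"
    exit 1
fi
BACKUP_FILE="/tmp/rabbitmq_backup_$(date +%Y%m%d%H%M%S).json"
echo "[1] Back up the current definitions"
# export_definitions needs a running node; do not wipe anything if it fails
if ! rabbitmqctl export_definitions "$BACKUP_FILE"; then
    echo "Definitions export failed (is a node running?); aborting before any data is deleted"
    exit 1
fi
echo "Definitions backed up to: $BACKUP_FILE"
echo "[2] Stop all nodes"
for node in "${NODES[@]}"; do
    echo "Stopping node: $node"
    ssh "${node#*@}" "systemctl stop rabbitmq-server" 2>/dev/null || true
done
echo "[3] Wait for the nodes to stop"
sleep 10
echo "[4] Delete the data directory on every node"
for node in "${NODES[@]}"; do
    echo "Cleaning node: $node"
    ssh "${node#*@}" "rm -rf /var/lib/rabbitmq/mnesia/*"
    # Logs are kept for post-incident analysis
done
echo "[5] Initialize the first node"
# systemctl keeps file ownership consistent with the service user
ssh "$MASTER_HOST" "systemctl start rabbitmq-server"
sleep 15
ssh "$MASTER_HOST" "rabbitmqctl stop_app"
ssh "$MASTER_HOST" "rabbitmqctl reset"
ssh "$MASTER_HOST" "rabbitmqctl start_app"
echo "[6] Join the remaining nodes to the cluster"
for node in "${NODES[@]:1}"; do
    host=${node#*@}
    echo "Joining node: $node"
    ssh "$host" "systemctl start rabbitmq-server"
    sleep 10
    ssh "$host" "rabbitmqctl stop_app"
    ssh "$host" "rabbitmqctl reset"
    ssh "$host" "rabbitmqctl join_cluster $MASTER"
    ssh "$host" "rabbitmqctl start_app"
    sleep 10
done
echo "[7] Wait for the cluster to stabilize"
sleep 30
echo "[8] Restore definitions"
rabbitmqctl import_definitions "$BACKUP_FILE"
echo "[9] Verify the cluster status"
rabbitmqctl cluster_status
echo "[10] Check that mirrored queue policies were restored"
rabbitmqctl list_policies
echo "Full cluster rebuild complete"
echo "Backup file: $BACKUP_FILE"
```
PHP cluster recovery tool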
```php
<?php

class ClusterRecoveryTool
{
    private $nodes = [];
    private $masterNode;

    public function __construct(array $nodes)
    {
        $this->nodes = $nodes;
        $this->masterNode = $nodes[0] ?? null;
    }

    public function diagnose(): array
    {
        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'cluster_status' => 'unknown',
            'nodes' => [],
            'issues' => [],
            'recommendation' => '',
        ];
        $clusterStatus = $this->getClusterStatus();
        $result['cluster_status'] = $clusterStatus['status'];
        $result['nodes'] = $clusterStatus['nodes'];
        if ($clusterStatus['status'] === 'healthy') {
            $result['recommendation'] = 'Cluster is healthy; no recovery needed';
        } elseif ($clusterStatus['status'] === 'degraded') {
            $result['issues'] = $clusterStatus['issues'];
            $result['recommendation'] = $this->generateRecoveryPlan($clusterStatus);
        } else {
            $result['issues'][] = 'Cluster is completely unavailable';
            $result['recommendation'] = 'Full cluster rebuild recommended';
        }
        return $result;
    }

    private function getClusterStatus(): array
    {
        $result = [
            'status' => 'unknown',
            'running_nodes' => [],
            'stopped_nodes' => [],
            'issues' => [],
            'nodes' => [],
        ];
        // Probe each configured node: check_running exits non-zero when the
        // RabbitMQ application on that node is not running
        foreach ($this->nodes as $node) {
            $probeOutput = [];
            exec('rabbitmq-diagnostics -q -n ' . escapeshellarg($node) . ' check_running 2>&1', $probeOutput, $exitCode);
            $running = ($exitCode === 0);
            $result[$running ? 'running_nodes' : 'stopped_nodes'][] = $node;
            $result['nodes'][] = ['name' => $node, 'running' => $running];
        }
        if (empty($result['running_nodes']) && !empty($this->nodes)) {
            $result['status'] = 'failed';
            $result['issues'][] = 'All nodes are stopped';
        } elseif (count($result['running_nodes']) < count($this->nodes)) {
            $result['status'] = 'degraded';
            $result['issues'][] = sprintf(
                '%d/%d nodes running',
                count($result['running_nodes']),
                count($this->nodes)
            );
        } else {
            $result['status'] = 'healthy';
        }
        return $result;
    }

    private function generateRecoveryPlan(array $clusterStatus): string
    {
        $runningCount = count($clusterStatus['running_nodes']);
        $totalCount = count($this->nodes);
        if ($runningCount === 0) {
            return 'Perform a full cluster rebuild';
        } elseif ($runningCount < $totalCount / 2) {
            return 'Re-initialize the cluster, using a healthy node as the node the others join';
        } else {
            return 'Try restarting the failed nodes';
        }
    }

    public function recover(array $options = []): array
    {
        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'actions' => [],
            'success' => false,
        ];
        $diagnosis = $this->diagnose();
        if ($diagnosis['cluster_status'] === 'healthy') {
            $result['actions'][] = 'Cluster is already healthy; no recovery needed';
            $result['success'] = true;
            return $result;
        }
        $strategy = $options['strategy'] ?? 'auto';
        switch ($strategy) {
            case 'rebuild':
                $result['actions'][] = $this->fullRebuild();
                break;
            case 'repair':
                $result['actions'][] = $this->repairNodes();
                break;
            case 'auto':
            default:
                if ($diagnosis['cluster_status'] === 'failed') {
                    $result['actions'][] = $this->fullRebuild();
                } else {
                    $result['actions'][] = $this->repairNodes();
                }
                break;
        }
        $finalDiagnosis = $this->diagnose();
        $result['success'] = $finalDiagnosis['cluster_status'] === 'healthy';
        $result['final_status'] = $finalDiagnosis['cluster_status'];
        return $result;
    }

    private function fullRebuild(): string
    {
        $this->stopAllNodes();
        $this->clearAllData();
        $this->initializeMasterNode();
        $this->joinOtherNodes();
        return 'Full cluster rebuild complete';
    }

    private function repairNodes(): string
    {
        $diagnosis = $this->diagnose();
        foreach ($diagnosis['nodes'] as $node) {
            if (!$node['running']) {
                $this->restartNode($node['name']);
            }
        }
        return 'Node repair complete';
    }

    private function stopAllNodes(): void
    {
        foreach ($this->nodes as $node) {
            $this->executeOnNode($node, 'systemctl stop rabbitmq-server');
        }
        sleep(10);
    }

    private function clearAllData(): void
    {
        foreach ($this->nodes as $node) {
            $this->executeOnNode($node, 'rm -rf /var/lib/rabbitmq/mnesia/*');
        }
    }

    private function initializeMasterNode(): void
    {
        $master = $this->masterNode;
        $this->executeOnNode($master, 'systemctl start rabbitmq-server');
        sleep(15);
        $this->executeOnNode($master, 'rabbitmqctl stop_app');
        $this->executeOnNode($master, 'rabbitmqctl reset');
        $this->executeOnNode($master, 'rabbitmqctl start_app');
        sleep(15);
    }

    private function joinOtherNodes(): void
    {
        $master = $this->masterNode;
        foreach ($this->nodes as $node) {
            if ($node === $master) {
                continue;
            }
            $this->executeOnNode($node, 'systemctl start rabbitmq-server');
            sleep(10);
            $this->executeOnNode($node, 'rabbitmqctl stop_app');
            $this->executeOnNode($node, 'rabbitmqctl reset');
            $this->executeOnNode($node, "rabbitmqctl join_cluster $master");
            $this->executeOnNode($node, 'rabbitmqctl start_app');
            sleep(10);
        }
    }

    private function restartNode(string $node): void
    {
        $this->executeOnNode($node, 'systemctl restart rabbitmq-server');
        sleep(15);
        // Rely on ping's exit code rather than matching its output text
        $this->executeOnNode($node, 'rabbitmqctl ping', $exitCode);
        if ($exitCode === 0) {
            echo "Node {$node} restarted successfully\n";
        } else {
            echo "Node {$node} failed to restart\n";
        }
    }

    private function executeOnNode(string $node, string $command, ?int &$exitCode = null): string
    {
        // "rabbit@node1" is an Erlang node name; ssh needs only the host part
        $host = explode('@', $node, 2)[1] ?? $node;
        $sshCommand = sprintf(
            'ssh -o StrictHostKeyChecking=no %s %s 2>&1',
            escapeshellarg($host),
            escapeshellarg($command)
        );
        $output = [];
        exec($sshCommand, $output, $exitCode);
        return implode("\n", $output);
    }
}

// Usage example
$recovery = new ClusterRecoveryTool([
    'rabbit@node1',
    'rabbit@node2',
    'rabbit@node3',
]);
echo "=== Cluster diagnosis ===\n";
$diagnosis = $recovery->diagnose();
echo json_encode($diagnosis, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";
if ($diagnosis['cluster_status'] !== 'healthy') {
    echo "\n=== Starting recovery ===\n";
    // With 'auto', a cluster with no running node triggers fullRebuild(),
    // which deletes the data directory on every node
    $result = $recovery->recover(['strategy' => 'auto']);
    echo json_encode($result, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";
}
```
Post-recovery verification
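The verification scripts below print their results but always exit with status 0. A scheduler or CI job running them therefore cannot tell whether recovery actually succeeded. A counting wrapper is one way to fix this. The sketch below assumes bash, and `check`, `summarize`, and `FAILURES` are hypothetical names.

```bash
#!/bin/bash
# check: run one verification step, report OK/FAIL, and count failures.
# check, summarize and FAILURES are hypothetical names for this sketch.
FAILURES=0
check() {
    local description=$1
    shift
    if "$@" >/dev/null 2>&1; then
        echo "[OK]   $description"
    else
        echo "[FAIL] $description"
        FAILURES=$((FAILURES + 1))
    fi
}

# summarize: print the failure count; return non-zero if any check failed.
summarize() {
    echo "$FAILURES check(s) failed"
    [ "$FAILURES" -eq 0 ]
}

# Example:
#   check "rabbit@node2 running" rabbitmq-diagnostics -q -n rabbit@node2 check_running
#   check "queues listable" rabbitmqctl list_queues name
#   summarize || exit 1
```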
1. Cluster status verification
```bash
#!/bin/bash
# verify_cluster.sh
# Usage: verify_cluster.sh rabbit@node1 rabbit@node2 ...
if [ $# -eq 0 ]; then
    echo "Usage: $0 <node1> <node2> ..."
    exit 1
fi
echo "=== Post-recovery verification ==="
echo "[1] Check the cluster status"
rabbitmqctl cluster_status
echo "[2] Check that every node is running"
for node in "$@"; do
    if rabbitmq-diagnostics -q -n "$node" check_running >/dev/null 2>&1; then
        echo "Node $node: OK"
    else
        echo "Node $node: FAILED"
    fi
done
echo "[3] Check that queues are accessible"
rabbitmqctl list_queues name
echo "[4] Check exchanges"
rabbitmqctl list_exchanges
echo "[5] Check bindings"
rabbitmqctl list_bindings
echo "[6] Check users and permissions"
rabbitmqctl list_users
rabbitmqctl list_permissions
echo "[7] Check mirrored queue synchronization"
rabbitmqctl list_queues name policy synchronised_slave_nodes
echo "[8] Publish a test message"
# rabbitmqadmin talks to the HTTP API, so the management plugin must be enabled
rabbitmqadmin declare queue name=test.queue durable=false
rabbitmqadmin publish routing_key=test.queue payload="test message"
echo "[9] Consume the test message"
rabbitmqadmin get queue=test.queue ackmode=ack_requeue_false
echo "[10] Delete the test queue"
rabbitmqadmin delete queue name=test.queue
echo "Verification complete"
```
2. Performance verification
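The performance script below reports timing only through `time` in step [1]. When comparing runs before and after an incident, a per-second figure is easier to read. The following is a minimal sketch that assumes GNU `date`, since `%N` (nanoseconds) is a GNU extension. The helper names are hypothetical. Each `rabbitmqadmin` call starts a new process, so the result mostly reflects CLI overhead rather than broker throughput. A dedicated load generator such as RabbitMQ PerfTest is better suited to real benchmarks.

```bash
#!/bin/bash
# now_ms: wall-clock time in milliseconds (requires GNU date for %N).
now_ms() {
    echo $(( $(date +%s%N) / 1000000 ))
}

# msgs_per_sec: integer rate from a count and elapsed milliseconds.
msgs_per_sec() {
    local count=$1 elapsed_ms=$2
    if [ "$elapsed_ms" -lt 1 ]; then
        elapsed_ms=1   # avoid division by zero on very fast runs
    fi
    echo $(( count * 1000 / elapsed_ms ))
}

# time_runs: run a command <count> times and report the rate; stops on failure.
# Usage: time_runs 1000 rabbitmqadmin publish routing_key=perf.test payload=test
time_runs() {
    local count=$1 i start elapsed
    shift
    start=$(now_ms)
    for ((i = 1; i <= count; i++)); do
        "$@" >/dev/null 2>&1 || return 1
    done
    elapsed=$(( $(now_ms) - start ))
    echo "$count runs in ${elapsed} ms ($(msgs_per_sec "$count" "$elapsed") per second)"
}
```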
```bash
#!/bin/bash
# performance_verify.sh
echo "=== Cluster performance check ==="
echo "[1] Measure CLI round-trip time (100 x rabbitmqctl status)"
time for i in {1..100}; do
    rabbitmqctl status > /dev/null 2>&1
done
echo "[2] Publish test messages"
rabbitmqadmin declare queue name=perf.test durable=false
# At most 10 publisher processes at a time instead of 1000 background jobs at once
seq 1 1000 | xargs -P 10 -I{} rabbitmqadmin publish routing_key=perf.test payload="test-{}" > /dev/null
echo "[3] Consume test messages"
rabbitmqadmin get queue=perf.test ackmode=ack_requeue_false count=1000 > /dev/null
echo "[4] Delete the test queue"
rabbitmqadmin delete queue name=perf.test
echo "Performance check complete"
```
Preventive measures
1. Regular backups
```bash
#!/bin/bash
# backup_rabbitmq.sh
BACKUP_DIR="/var/backups/rabbitmq"
DATE=$(date +%Y%m%d%H%M%S)
mkdir -p "$BACKUP_DIR"
echo "[1] Back up definitions"
rabbitmqctl export_definitions "$BACKUP_DIR/definitions_$DATE.json"
echo "[2] Save the cluster status"
rabbitmqctl cluster_status > "$BACKUP_DIR/cluster_status_$DATE.txt"
echo "[3] Save the policy list"
# Default vhost only; the definitions export above covers all vhosts
rabbitmqctl list_policies > "$BACKUP_DIR/policies_$DATE.txt"
echo "[4] Delete backups older than 30 days"
find "$BACKUP_DIR" -name "*.json" -mtime +30 -delete
find "$BACKUP_DIR" -name "*.txt" -mtime +30 -delete
echo "Backup complete: $BACKUP_DIR"
```
2. Monitoring and alerting
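```yaml
groups:
  - name: rabbitmq_cluster_recovery
    rules:
      # Metric names depend on the exporter in use; adjust them to the
      # names your RabbitMQ exporter actually exposes.
      - alert: ClusterUnhealthy
        expr: rabbitmq_cluster_nodes < 3   # 3 = expected cluster size
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Not enough cluster nodes"
          description: "The cluster has fewer nodes than expected"
      - alert: ClusterNodeDown
        expr: rate(rabbitmq_cluster_node_down[5m]) > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Cluster node offline"
          description: "A cluster node has been detected as offline"
```
3. Recovery drills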
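The drill script below picks the newest backup with `ls -t`, which sorts by modification time. A copied or restored file can carry an unrelated timestamp, so that order is not always reliable. `backup_rabbitmq.sh` embeds a `%Y%m%d%H%M%S` timestamp in each file name, so the newest backup can also be found from the names alone. The sketch below uses a hypothetical helper name.

```bash
#!/bin/bash
# latest_backup: print the newest definitions_<timestamp>.json in a directory.
# Relies on the YYYYmmddHHMMSS timestamp in the file name, so string order
# equals chronological order. Returns 1 if no backup exists.
latest_backup() {
    local dir=$1 newest="" file
    for file in "$dir"/definitions_*.json; do
        [ -e "$file" ] || continue   # the glob matched nothing
        if [[ "$file" > "$newest" ]]; then
            newest=$file
        fi
    done
    [ -n "$newest" ] || return 1
    echo "$newest"
}

# Example: LATEST_BACKUP=$(latest_backup /var/backups/rabbitmq) || exit 1
```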
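```bash
#!/bin/bash
# recovery_drill.sh
echo "=== Scheduled recovery drill ==="
BACKUP_DIR="/var/backups/rabbitmq"
LATEST_BACKUP=$(ls -t "$BACKUP_DIR"/definitions_*.json 2>/dev/null | head -1)
if [ -z "$LATEST_BACKUP" ]; then
    echo "No backup file found"
    exit 1
fi
echo "Using backup: $LATEST_BACKUP"
echo "[1] Restore in the test environment"
# Run the recovery procedure in the test environment
echo "[2] Verify the restore"
# Check data integrity
echo "[3] Performance test"
# Run the performance tests
echo "[4] Clean up the test environment"
# Tear down the test environment
echo "Drill complete"
```
Notes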
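- Back up before recovering: protects against operator mistakes
- Choose the right strategy: pick the recovery method that matches the failure
- Verify thoroughly: make sure the service is fully restored
- Record the recovery process: supports root-cause analysis and process improvement
- Rehearse regularly: make sure the recovery procedure actually works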
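To record the recovery process, a low-effort option is to timestamp every step and append it to a log file while still printing it. The sketch below assumes bash. `log_step` and the default `RECOVERY_LOG` path are hypothetical choices.

```bash
#!/bin/bash
# log_step: print a timestamped step description and append it to a log file.
# log_step and the RECOVERY_LOG default path are hypothetical choices.
RECOVERY_LOG=${RECOVERY_LOG:-/var/log/rabbitmq-recovery.log}
log_step() {
    local line
    line="$(date '+%Y-%m-%d %H:%M:%S') $*"
    echo "$line" | tee -a "$RECOVERY_LOG"
}

# Example:
#   log_step "[4] rabbit@node2: reset and rejoin rabbit@node1"
```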
