
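Cluster Recovery Procedure

Overview

RabbitMQ cluster recovery is the process of bringing a cluster back to a working state after a failure. Depending on the failure, this can mean recovering a single node, recovering from a network partition, or rebuilding the entire cluster. This document covers the recovery procedures, methods, and best practices.

Recovery Scenario Categories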

┌─────────────────────────────────────────────────────────────┐
│                 Cluster Recovery Scenarios                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌───────────────┐  ┌───────────────┐  ┌───────────────┐    │
│  │ Single node   │  │ Multi-node    │  │ Full cluster  │    │
│  │               │  │               │  │               │    │
│  │ Low impact    │  │ Medium impact │  │ High impact   │    │
│  │ Fast recovery │  │ Needs         │  │ Complete      │    │
│  │               │  │ coordination  │  │ rebuild       │    │
│  └───────────────┘  └───────────────┘  └───────────────┘    │
│                                                             │
└─────────────────────────────────────────────────────────────┘
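
The diagram does not define where one scenario ends and the next begins. The helper below is a rough sketch that maps running and total node counts to a scenario. The thresholds are assumptions for illustration, not RabbitMQ rules: one node down means single-node, several down means multi-node, and none running means full cluster. `classify_scenario` is a hypothetical name.

```bash
# classify_scenario RUNNING TOTAL
# Hypothetical helper: prints healthy, single-node, multi-node or full-cluster.
# The thresholds are illustrative assumptions, not RabbitMQ behaviour.
classify_scenario() {
    local running=$1 total=$2
    local down=$(( total - running ))
    if [ "$down" -le 0 ]; then
        echo "healthy"
    elif [ "$running" -eq 0 ]; then
        echo "full-cluster"    # nothing answers; still try restarting nodes before any rebuild
    elif [ "$down" -eq 1 ]; then
        echo "single-node"
    else
        echo "multi-node"
    fi
}
```

Even in the `full-cluster` case, restarting all nodes is worth trying before a wipe-and-rebuild, because a rebuild discards messages.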

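Problem Manifestations and Symptoms

Common Symptoms

┌─────────────────────────────────────────────────────────────┐
│              Typical Cluster Failure Symptoms               │
├─────────────────────────────────────────────────────────────┤
│  1. The cluster fails to start                              │
│  2. Nodes cannot join the cluster                           │
│  3. Queues are inaccessible                                 │
│  4. Data is lost or corrupted                               │
│  5. Clients cannot connect                                  │
│  6. Mirror synchronization fails                            │
└─────────────────────────────────────────────────────────────┘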

Recovery Procedures

1. Single-Node Failure Recovery

bash
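#!/bin/bash
# single_node_recovery.sh
# Usage: ./single_node_recovery.sh rabbit@node2
# Run on a healthy cluster member that has SSH access to the failed host.

FAILED_NODE=$1

if [ -z "$FAILED_NODE" ]; then
    echo "Usage: $0 <failed_node>   (for example rabbit@node2)"
    exit 1
fi

# A node name is name@host; ssh needs only the host part
FAILED_HOST=${FAILED_NODE#*@}

echo "=== Single-node failure recovery ==="
echo "Failed node: $FAILED_NODE"

echo "[1] Check current cluster status"
rabbitmqctl cluster_status

echo "[2] Check the service status on the failed node"
ssh "$FAILED_HOST" "systemctl status rabbitmq-server"

echo "[3] Try restarting the failed node"
ssh "$FAILED_HOST" "systemctl restart rabbitmq-server"

echo "[4] Wait for the node to start"
sleep 30

echo "[5] Verify the node responds"
if ! rabbitmqctl -n "$FAILED_NODE" ping; then
    echo "Node $FAILED_NODE is not responding; check its logs"
    exit 1
fi

echo "[6] Check cluster membership (the node should be listed among running nodes)"
rabbitmqctl cluster_status | grep "$FAILED_NODE"

echo "[7] Check mirrored queue synchronization"
# Classic mirrored queues are deprecated in 3.x and removed in RabbitMQ 4.0
rabbitmqctl list_queues name policy synchronised_slave_nodes

echo "Single-node recovery complete"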
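
The fixed `sleep 30` in step [4] can be too short for a node with many queues, or longer than needed. A minimal polling helper can wait only as long as necessary, up to a timeout. This sketch assumes `rabbitmqctl ping` exits 0 once the node answers. `wait_until` is a hypothetical name.

```bash
# wait_until TIMEOUT INTERVAL COMMAND [ARGS...]
# Re-runs COMMAND every INTERVAL seconds until it exits 0 (returns 0)
# or until TIMEOUT seconds have been spent waiting (returns 1).
wait_until() {
    local timeout=$1 interval=$2
    shift 2
    local waited=0
    until "$@" > /dev/null 2>&1; do
        if [ "$waited" -ge "$timeout" ]; then
            return 1
        fi
        sleep "$interval"
        waited=$(( waited + interval ))
    done
    return 0
}
```

In the script, `wait_until 120 5 rabbitmqctl -n "$FAILED_NODE" ping || exit 1` could take the place of `sleep 30`.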

2. Multi-Node Failure Recovery

bash
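#!/bin/bash
# multi_node_recovery.sh
# Assumptions: RabbitMQ 3.8+ (JSON formatter), jq installed, run on a healthy
# cluster member with SSH access to the failed hosts.
# WARNING: `rabbitmqctl reset` wipes the node's local data, including messages
# in queues hosted on that node.

echo "=== Multi-node failure recovery ==="

echo "[1] Check current cluster status"
rabbitmqctl cluster_status

echo "[2] Identify failed nodes"
CLUSTER_JSON=$(rabbitmqctl -q cluster_status --formatter json) || {
    echo "Cannot query the local node; run this script on a healthy member"
    exit 1
}

# Members (disk + RAM nodes) that are missing from running_nodes
FAILED_NODES=$(echo "$CLUSTER_JSON" | jq -r '((.disk_nodes // []) + (.ram_nodes // [])) - (.running_nodes // []) | .[]')

if [ -z "$FAILED_NODES" ]; then
    echo "No failed nodes detected"
    exit 0
fi

echo "Failed nodes: $FAILED_NODES"

echo "[3] Pick a running node to join"
# RabbitMQ clusters have no master; any running member can be the join target
MASTER_NODE=$(echo "$CLUSTER_JSON" | jq -r '.running_nodes[0]')
echo "Join target: $MASTER_NODE"

echo "[4] Recover each failed node"
for node in $FAILED_NODES; do
    host=${node#*@}   # node name is name@host; ssh needs the host
    echo "--- Recovering node: $node ---"

    # If the node rejoins on its own after this start, the reset steps can be skipped
    echo "[4.$node] Start the service so the Erlang VM is running"
    ssh "$host" "systemctl start rabbitmq-server"

    echo "[4.$node] Stop the RabbitMQ application"
    ssh "$host" "rabbitmqctl stop_app"

    echo "[4.$node] Reset the node"
    ssh "$host" "rabbitmqctl reset"

    echo "[4.$node] Rejoin the cluster"
    ssh "$host" "rabbitmqctl join_cluster $MASTER_NODE"

    echo "[4.$node] Start the application"
    ssh "$host" "rabbitmqctl start_app"

    sleep 15
done

echo "[5] Wait for the cluster to stabilize"
sleep 30

echo "[6] Verify cluster status"
rabbitmqctl cluster_status

echo "[7] Check mirrored queue synchronization"
rabbitmqctl list_queues name policy synchronised_slave_nodes

echo "Multi-node recovery complete"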
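
Failed nodes can also be derived from the plain-text output of `rabbitmqctl cluster_status`, with no jq or JSON formatter needed. This sketch assumes the RabbitMQ 3.8+ text layout, which lists members under `Disk Nodes` (and `RAM Nodes`) and live nodes under `Running Nodes`. Older releases print an Erlang term instead and are not handled. `down_nodes` is a hypothetical helper.

```bash
# down_nodes < cluster_status_output
# Prints members listed under "Disk Nodes"/"RAM Nodes" that are absent from
# "Running Nodes", in the order they appear. Assumes the 3.8+ text layout.
down_nodes() {
    awk '
        /^Disk Nodes$/ || /^RAM Nodes$/ { section = "member"; next }
        /^Running Nodes$/               { section = "running"; next }
        /^[A-Z][A-Za-z ]*$/             { section = ""; next }   # any other section header
        section != "" && /^[^ :]+@[^ :]+$/ {
            if (section == "member" && !($0 in seen)) { order[++n] = $0; seen[$0] = 1 }
            if (section == "running") running[$0] = 1
        }
        END { for (i = 1; i <= n; i++) if (!(order[i] in running)) print order[i] }
    '
}
```

Usage: `FAILED_NODES=$(rabbitmqctl cluster_status | down_nodes)`.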

3. Full Cluster Rebuild

bash
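#!/bin/bash
# full_cluster_rebuild.sh
# Usage: ./full_cluster_rebuild.sh rabbit@node1 rabbit@node2 rabbit@node3
# Assumptions: run on one of the cluster hosts (rabbitmqctl targets the local
# node), SSH access to every host, and the same Erlang cookie on every node.
# Only definitions (users, vhosts, queues, exchanges, bindings, policies) are
# restored; messages are NOT.

NODES=("$@")

if [ ${#NODES[@]} -eq 0 ]; then
    echo "Usage: $0 <node1> <node2> <node3> ..."
    exit 1
fi

# The first node seeds the new cluster (RabbitMQ itself has no master node)
MASTER=${NODES[0]}

echo "=== Full cluster rebuild ==="
echo "WARNING: this wipes all data on every node!"

read -p "Continue? (yes/no): " confirm
if [ "$confirm" != "yes" ]; then
    echo "Cancelled"
    exit 1
fi

# Set BACKUP_FILE to an existing export to skip the export step
BACKUP_FILE=${BACKUP_FILE:-/tmp/rabbitmq_backup_$(date +%Y%m%d%H%M%S).json}

echo "[1] Back up definitions"
if [ -s "$BACKUP_FILE" ]; then
    echo "Using existing backup"
elif ! rabbitmqctl export_definitions "$BACKUP_FILE"; then
    echo "Export failed (is the cluster down?); rerun with BACKUP_FILE=<existing backup>"
    exit 1
fi
echo "Definitions backup: $BACKUP_FILE"

echo "[2] Stop all nodes"
for node in "${NODES[@]}"; do
    echo "Stopping node: $node"
    ssh "${node#*@}" "systemctl stop rabbitmq-server" 2>/dev/null || true
done

echo "[3] Wait for the nodes to stop"
sleep 10

echo "[4] Wipe data on all nodes"
# Logs (/var/log/rabbitmq by default) are kept for post-incident analysis
for node in "${NODES[@]}"; do
    echo "Wiping node: $node"
    ssh "${node#*@}" "rm -rf /var/lib/rabbitmq/mnesia/*"
done

echo "[5] Start the first node"
ssh "${MASTER#*@}" "systemctl start rabbitmq-server"
sleep 15

echo "[6] Join the remaining nodes to the cluster"
for node in "${NODES[@]:1}"; do
    host=${node#*@}
    echo "Joining node: $node"
    ssh "$host" "systemctl start rabbitmq-server"
    sleep 10
    ssh "$host" "rabbitmqctl stop_app"
    ssh "$host" "rabbitmqctl reset"
    ssh "$host" "rabbitmqctl join_cluster $MASTER"
    ssh "$host" "rabbitmqctl start_app"
    sleep 10
done

echo "[7] Wait for the cluster to stabilize"
sleep 30

echo "[8] Restore definitions"
rabbitmqctl import_definitions "$BACKUP_FILE"

echo "[9] Verify cluster status"
rabbitmqctl cluster_status

echo "[10] Verify policies (restored with the definitions)"
rabbitmqctl list_policies

echo "Full cluster rebuild complete"
echo "Backup file: $BACKUP_FILE"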
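
Step [4] wipes every node, so it is worth refusing to continue if the backup is missing or empty. The check below is a minimal sketch. It assumes a valid definitions export is a non-empty JSON file containing a `"queues"` key, and it does not fully parse the JSON. `backup_looks_valid` is a hypothetical name.

```bash
# backup_looks_valid FILE
# Returns 0 if FILE is non-empty and contains a "queues" key (assumed to be
# present in every definitions export); otherwise returns 1.
backup_looks_valid() {
    local file=$1
    [ -s "$file" ] || return 1
    grep -q '"queues"' "$file" || return 1
    return 0
}
```

Placed just before step [4], `backup_looks_valid "$BACKUP_FILE" || { echo "Backup looks invalid, aborting"; exit 1; }` stops the rebuild before any data is deleted.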

PHP Cluster Recovery Tool

php
<?php

class ClusterRecoveryTool
{
    private $nodes = [];
    private $masterNode;

    public function __construct(array $nodes)
    {
        $this->nodes = $nodes;
        $this->masterNode = $nodes[0] ?? null;
    }

    public function diagnose(): array
    {
        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'cluster_status' => 'unknown',
            'nodes' => [],
            'issues' => [],
            'recommendation' => '',
        ];

        $clusterStatus = $this->getClusterStatus();
        
        $result['cluster_status'] = $clusterStatus['status'];
        $result['nodes'] = $clusterStatus['nodes'];
        
        if ($clusterStatus['status'] === 'healthy') {
            $result['recommendation'] = 'Cluster is healthy; no recovery needed';
        } elseif ($clusterStatus['status'] === 'degraded') {
            $result['issues'] = $clusterStatus['issues'];
            $result['recommendation'] = $this->generateRecoveryPlan($clusterStatus);
        } else {
            $result['issues'][] = 'Cluster is completely unavailable';
            $result['recommendation'] = 'Start all nodes first; a full rebuild wipes all data and is a last resort';
        }
        
        return $result;
    }

    private function getClusterStatus(): array
    {
        $result = [
            'status' => 'unknown',
            'running_nodes' => [],
            'stopped_nodes' => [],
            'issues' => [],
            'nodes' => [],
        ];

        // The plain-text output puts node names and "Running Nodes" on separate lines,
        // so line-based regexes cannot tell running nodes from stopped ones.
        // Assumes RabbitMQ 3.8+ (JSON formatter) and that this runs on a cluster member.
        exec('rabbitmqctl -q cluster_status --formatter json 2>/dev/null', $output, $code);
        $status = $code === 0 ? json_decode(implode("\n", $output), true) : null;

        if (is_array($status)) {
            $members = array_merge($status['disk_nodes'] ?? [], $status['ram_nodes'] ?? []);
            $result['running_nodes'] = array_values($status['running_nodes'] ?? []);
            $result['stopped_nodes'] = array_values(array_diff($members, $result['running_nodes']));
        }

        foreach ($this->nodes as $node) {
            $status = [
                'name' => $node,
                'running' => in_array($node, $result['running_nodes']),
            ];
            $result['nodes'][] = $status;
        }

        if (empty($result['running_nodes']) && !empty($this->nodes)) {
            $result['status'] = 'failed';
            $result['issues'][] = 'No running nodes found';
        } elseif (count($result['running_nodes']) < count($this->nodes)) {
            $result['status'] = 'degraded';
            $result['issues'][] = sprintf(
                '%d/%d nodes running',
                count($result['running_nodes']),
                count($this->nodes)
            );
        } else {
            $result['status'] = 'healthy';
        }

        return $result;
    }

    private function generateRecoveryPlan(array $clusterStatus): string
    {
        $runningCount = count($clusterStatus['running_nodes']);
        $totalCount = count($this->nodes);
        
        if ($runningCount === 0) {
            return 'Rebuild the whole cluster';
        } elseif ($runningCount < $totalCount / 2) {
            return 'Consider reinitializing the cluster around a healthy node';
        } else {
            return 'Try restarting the failed nodes';
        }
    }

    public function recover(array $options = []): array
    {
        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'actions' => [],
            'success' => false,
        ];

        $diagnosis = $this->diagnose();

        if ($diagnosis['cluster_status'] === 'healthy') {
            $result['actions'][] = 'Cluster is already healthy; no recovery needed';
            $result['success'] = true;
            return $result;
        }

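        $strategy = $options['strategy'] ?? 'auto';

        switch ($strategy) {
            case 'rebuild':
                // Destructive: wipes all node data, so require explicit confirmation
                if (empty($options['confirm_wipe'])) {
                    $result['actions'][] = 'Rebuild refused: pass confirm_wipe => true to wipe all data';
                    return $result;
                }
                $result['actions'][] = $this->fullRebuild();
                break;
            case 'repair':
                $result['actions'][] = $this->repairNodes();
                break;
            case 'auto':
            default:
                // Never wipe data automatically: restart stopped nodes, even when the
                // whole cluster is down; a rebuild only happens on explicit request
                $result['actions'][] = $this->repairNodes();
                break;
        }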

        $finalDiagnosis = $this->diagnose();
        $result['success'] = $finalDiagnosis['cluster_status'] === 'healthy';
        $result['final_status'] = $finalDiagnosis['cluster_status'];

        return $result;
    }

    private function fullRebuild(): string
    {
        $this->stopAllNodes();
        $this->clearAllData();
        $this->initializeMasterNode();
        $this->joinOtherNodes();
        
        return 'Full cluster rebuild complete';
    }

    private function repairNodes(): string
    {
        $diagnosis = $this->diagnose();
        
        foreach ($diagnosis['nodes'] as $node) {
            if (!$node['running']) {
                $this->restartNode($node['name']);
            }
        }
        
        return 'Node repair complete';
    }

    private function stopAllNodes(): void
    {
        foreach ($this->nodes as $node) {
            $this->executeOnNode($node, 'systemctl stop rabbitmq-server');
        }
        sleep(10);
    }

    private function clearAllData(): void
    {
        foreach ($this->nodes as $node) {
            $this->executeOnNode($node, 'rm -rf /var/lib/rabbitmq/mnesia/*');
        }
    }

    private function initializeMasterNode(): void
    {
        $master = $this->masterNode;
        
        $this->executeOnNode($master, 'systemctl start rabbitmq-server');
        sleep(15);
        $this->executeOnNode($master, 'rabbitmqctl stop_app');
        $this->executeOnNode($master, 'rabbitmqctl reset');
        $this->executeOnNode($master, 'rabbitmqctl start_app');
        sleep(15);
    }

    private function joinOtherNodes(): void
    {
        $master = $this->masterNode;
        
        foreach ($this->nodes as $node) {
            if ($node === $master) {
                continue;
            }
            
            $this->executeOnNode($node, 'systemctl start rabbitmq-server');
            sleep(10);
            $this->executeOnNode($node, 'rabbitmqctl stop_app');
            $this->executeOnNode($node, 'rabbitmqctl reset');
            $this->executeOnNode($node, "rabbitmqctl join_cluster $master");
            $this->executeOnNode($node, 'rabbitmqctl start_app');
            sleep(10);
        }
    }

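    private function restartNode(string $node): void
    {
        $this->executeOnNode($node, 'systemctl restart rabbitmq-server');

        // Poll for up to about 60 seconds instead of a single fixed wait
        for ($attempt = 0; $attempt < 12; $attempt++) {
            sleep(5);
            if ($this->nodeResponds($node)) {
                echo "Node {$node} restarted successfully\n";
                return;
            }
        }

        echo "Node {$node} failed to restart\n";
    }

    private function nodeResponds(string $node): bool
    {
        // `rabbitmqctl ping` exits 0 when the node answers (requires the shared Erlang
        // cookie); the exit code is more reliable than matching its output text
        exec('rabbitmqctl -n ' . escapeshellarg($node) . ' ping 2>&1', $output, $code);

        return $code === 0;
    }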

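    private function executeOnNode(string $node, string $command): string
    {
        // A node name is name@host; "ssh rabbit@node1" would log in as user "rabbit",
        // so connect to the host part only
        $atPos = strpos($node, '@');
        $host = $atPos === false ? $node : substr($node, $atPos + 1);

        // escapeshellarg() keeps commands that contain quotes intact
        $sshCommand = sprintf(
            'ssh -o StrictHostKeyChecking=no %s %s 2>&1',
            escapeshellarg($host),
            escapeshellarg($command)
        );

        exec($sshCommand, $output, $return);

        return implode("\n", $output);
    }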
}

// Usage example
$recovery = new ClusterRecoveryTool([
    'rabbit@node1',
    'rabbit@node2',
    'rabbit@node3',
]);

echo "=== Cluster diagnosis ===\n";
$diagnosis = $recovery->diagnose();
echo json_encode($diagnosis, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";

if ($diagnosis['cluster_status'] !== 'healthy') {
    echo "\n=== Starting recovery ===\n";
    $result = $recovery->recover(['strategy' => 'auto']);
    echo json_encode($result, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";
}
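
Node names such as `rabbit@node1` are not SSH targets. `ssh rabbit@node1` logs in as user `rabbit` on host `node1`. Assuming the usual `name@host` form, a script that needs to reach a node's host can strip the name part first. `node_host` is a hypothetical helper.

```bash
# node_host NODE
# Prints the host part of a RabbitMQ node name (rabbit@node1 -> node1);
# input without "@" is treated as a bare hostname and printed unchanged.
node_host() {
    local node=$1
    case "$node" in
        *@*) echo "${node#*@}" ;;
        *)   echo "$node" ;;
    esac
}
```

Usage: `ssh "$(node_host rabbit@node1)" "systemctl status rabbitmq-server"`.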

Post-Recovery Verification

1. Cluster State Verification

bash
#!/bin/bash
# verify_cluster.sh
# Assumptions: RabbitMQ 3.8+, jq, and rabbitmqadmin (management plugin) available.

echo "=== Post-recovery cluster verification ==="

echo "[1] Verify cluster status"
rabbitmqctl cluster_status

echo "[2] Verify every member node responds"
for node in $(rabbitmqctl -q cluster_status --formatter json | jq -r '((.disk_nodes // []) + (.ram_nodes // []))[]'); do
    if rabbitmqctl -n "$node" ping > /dev/null 2>&1; then
        echo "Node $node: OK"
    else
        echo "Node $node: FAILED"
    fi
done

echo "[3] Verify queues are listed"
rabbitmqctl list_queues name

echo "[4] Verify exchanges"
rabbitmqctl list_exchanges

echo "[5] Verify bindings"
rabbitmqctl list_bindings

echo "[6] Verify users and permissions"
rabbitmqctl list_users
rabbitmqctl list_permissions

echo "[7] Verify mirrored queue synchronization"
rabbitmqctl list_queues name policy synchronised_slave_nodes

echo "[8] Test publishing"
# Publish a test message through the default exchange
rabbitmqadmin declare queue name=test.queue durable=false
rabbitmqadmin publish routing_key=test.queue payload="test message"

echo "[9] Test consuming"
rabbitmqadmin get queue=test.queue ackmode=ack_requeue_false

echo "[10] Delete the test queue"
rabbitmqadmin delete queue name=test.queue

echo "Verification complete"
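
Step [3] only lists queue names. It does not show whether any queues are missing. A minimal sketch, assuming the `rabbitmqctl list_queues name` output was saved before the incident (for example by a backup job) and captured again after recovery: the helper prints the queue names found only in the earlier list. Any header lines appear in both files and cancel out. `missing_queues` is a hypothetical helper. For non-default virtual hosts, add `-p <vhost>` to `list_queues`.

```bash
# missing_queues EXPECTED_FILE CURRENT_FILE
# Prints lines of EXPECTED_FILE that do not appear in CURRENT_FILE, in order.
# FILENAME == ARGV[1] (not NR == FNR) keeps this correct when CURRENT_FILE is empty.
missing_queues() {
    awk 'FILENAME == ARGV[1] { current[$0] = 1; next } !($0 in current)' "$2" "$1"
}
```

Usage: `missing_queues queues_before.txt queues_after.txt`.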

2. Performance Verification

bash
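#!/bin/bash
# performance_verify.sh
# A rough smoke test only: rabbitmqctl and rabbitmqadmin go through the CLI and
# the HTTP API, so these timings do not reflect AMQP client throughput.

echo "=== Cluster performance check ==="

echo "[1] Time CLI responsiveness (100 x rabbitmqctl status)"
time for i in {1..100}; do
    rabbitmqctl status > /dev/null 2>&1
done

echo "[2] Publish test messages"
rabbitmqadmin declare queue name=perf.test durable=false

# Bounded concurrency (10 parallel publishers) instead of 1000 background jobs at once
time (seq 1 1000 | xargs -P 10 -I{} rabbitmqadmin publish routing_key=perf.test payload="test-{}" > /dev/null)

echo "[3] Consume test messages"
rabbitmqadmin get queue=perf.test ackmode=ack_requeue_false count=1000 > /dev/null

echo "[4] Delete the test queue"
rabbitmqadmin delete queue name=perf.test

echo "Performance check complete"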
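
Converting the timed publish in step [2] into messages per second gives a number that can be compared across runs. This sketch assumes GNU `date` (for `%N` nanoseconds). The figure reflects the HTTP API path that rabbitmqadmin uses, not AMQP client throughput. `throughput` is a hypothetical helper.

```bash
# throughput COUNT START_NS END_NS
# Prints messages per second with one decimal place, or "n/a" if no time elapsed.
throughput() {
    awk -v c="$1" -v s="$2" -v e="$3" 'BEGIN {
        secs = (e - s) / 1000000000
        if (secs <= 0) { print "n/a"; exit }
        printf "%.1f\n", c / secs
    }'
}
```

To use it, capture `start=$(date +%s%N)` before the publish loop and `end=$(date +%s%N)` after it, then call `throughput 1000 "$start" "$end"`.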

Preventive Measures

1. Regular Backups

bash
#!/bin/bash
# backup_rabbitmq.sh

BACKUP_DIR="/var/backups/rabbitmq"
DATE=$(date +%Y%m%d%H%M%S)

mkdir -p $BACKUP_DIR

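echo "[1] Back up definitions"
# Definitions only (users, vhosts, queues, exchanges, bindings, policies); messages are not included
rabbitmqctl export_definitions $BACKUP_DIR/definitions_$DATE.json

echo "[2] Save cluster status"
rabbitmqctl cluster_status > $BACKUP_DIR/cluster_status_$DATE.txt

echo "[3] Save policies (default vhost only; all policies are also in the definitions export)"
rabbitmqctl list_policies > $BACKUP_DIR/policies_$DATE.txt

echo "[4] Delete old backups (keep 30 days)"
find $BACKUP_DIR -name "*.json" -mtime +30 -delete
find $BACKUP_DIR -name "*.txt" -mtime +30 -delete

echo "Backup complete: $BACKUP_DIR"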
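
A backup job that silently stops running is usually discovered only during a recovery. The sketch below assumes a `find` with `-maxdepth`/`-mmin` (GNU or BSD) and the `definitions_<timestamp>.json` naming used by backup_rabbitmq.sh. It succeeds only if a definitions export newer than a given number of minutes exists, so it could run from cron or a monitoring check. `recent_backup_exists` is hypothetical.

```bash
# recent_backup_exists DIR MAX_MINUTES
# Succeeds if DIR holds a definitions_*.json modified within the last MAX_MINUTES.
recent_backup_exists() {
    local dir=$1 max_minutes=$2
    local found
    found=$(find "$dir" -maxdepth 1 -name 'definitions_*.json' -mmin "-$max_minutes" 2>/dev/null | head -n 1)
    [ -n "$found" ]
}
```

Usage: `recent_backup_exists /var/backups/rabbitmq 1500 || echo "No definitions backup in the last 25 hours"`.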

2. Monitoring and Alerting

yaml
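# Prometheus alerting rules. Assumptions: every RabbitMQ node is a scrape target
# in a job named "rabbitmq", and the cluster is expected to have 3 nodes.
# `up` is the standard per-target scrape-health metric.
groups:
  - name: rabbitmq_cluster_recovery
    rules:
      - alert: ClusterUnhealthy
        expr: count(up{job="rabbitmq"} == 1) < 3
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Too few cluster nodes"
          description: "Fewer RabbitMQ nodes are up than expected (3)"

      - alert: ClusterNodeDown
        expr: up{job="rabbitmq"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Cluster node offline"
          description: "{{ $labels.instance }} has been unreachable for at least 1 minute"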

3. Recovery Drills

bash
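#!/bin/bash
# recovery_drill.sh

echo "=== Scheduled recovery drill ==="

BACKUP_DIR="/var/backups/rabbitmq"
LATEST_BACKUP=$(ls -t $BACKUP_DIR/definitions_*.json 2>/dev/null | head -1)

if [ -z "$LATEST_BACKUP" ]; then
    echo "No backup file found"
    exit 1
fi

echo "Using backup: $LATEST_BACKUP"

echo "[1] Restore in a test environment"
# Run the recovery procedure in the test environment

echo "[2] Verify the restore"
# Check data integrity

echo "[3] Performance test"
# Run the performance checks

echo "[4] Clean up the test environment"
# Tear down the test environment

echo "Drill complete"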
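
The drill steps above are placeholders. Timing each one produces a measured recovery time that can be compared from drill to drill. `timed_step` is a hypothetical wrapper that runs a command, prints its duration in whole seconds (assuming `date +%s` is available), and returns the command's exit status.

```bash
# timed_step NAME COMMAND [ARGS...]
# Runs COMMAND, prints "NAME: <seconds>s (exit <status>)", and returns COMMAND's status.
timed_step() {
    local step_name=$1
    shift
    local step_start
    step_start=$(date +%s)
    "$@"
    local step_rc=$?
    echo "$step_name: $(( $(date +%s) - step_start ))s (exit $step_rc)"
    return "$step_rc"
}
```

Usage: `timed_step "restore definitions" rabbitmqctl import_definitions "$LATEST_BACKUP"`.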

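Notes

  1. Back up before recovering: this protects against operator error.
  2. Choose the right strategy: match the recovery method to the failure.
  3. Verify thoroughly: confirm the service is fully restored.
  4. Record the recovery process: this supports root-cause analysis and improvement.
  5. Drill regularly: confirm the recovery procedure actually works.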
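
For point 4, a small helper that timestamps each action keeps the recovery record consistent. `log_step` and the `RECOVERY_LOG` variable are hypothetical names, and the log defaults to `./recovery.log`.

```bash
# log_step MESSAGE...
# Appends "YYYY-MM-DD HH:MM:SS [host] MESSAGE" to $RECOVERY_LOG and echoes it.
log_step() {
    local log_file="${RECOVERY_LOG:-./recovery.log}"
    local line
    line="$(date '+%Y-%m-%d %H:%M:%S') [$(uname -n)] $*"
    printf '%s\n' "$line" | tee -a "$log_file"
}
```

Usage: `log_step "reset and rejoined rabbit@node2 to rabbit@node1"`.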
