Skip to content

节点故障处理

概述

RabbitMQ 集群中节点故障是常见运维场景,可能由硬件故障、网络问题、资源耗尽等原因引起。本文档将详细介绍节点故障的诊断方法、处理流程和恢复策略。

问题表现与症状

常见症状

┌─────────────────────────────────────────────────────────────┐
│                   节点故障典型症状                           │
├─────────────────────────────────────────────────────────────┤
│  1. 集群状态显示节点 down                                  │
│  2. 镜像队列同步停止                                       │
│  3. 部分客户端连接失败                                     │
│  4. 队列无法访问                                           │
│  5. 客户端报错 "node not running"                         │
│  6. 资源告警触发                                           │
└─────────────────────────────────────────────────────────────┘

故障类型分类

                    ┌─────────────────┐
                    │   节点故障类型   │
                    └────────┬────────┘

    ┌────────────┬───────────┼───────────┬────────────┐
    ▼            ▼           ▼           ▼            ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│临时故障│ │硬件故障│ │网络故障│ │资源耗尽│ │人为错误│
│可恢复  │ │需更换  │ │可恢复  │ │可恢复  │ │可恢复  │
└────────┘ └────────┘ └────────┘ └────────┘ └────────┘

问题原因分析

1. 临时故障

原因说明恢复方式
服务重启正常维护或崩溃重启自动恢复
短暂网络抖动网络瞬时中断自动恢复
资源瞬时耗尽内存/磁盘短暂告警自动恢复
OOM Killer内存不足被系统杀死重启服务

2. 硬件故障

原因说明恢复方式
磁盘损坏磁盘故障导致数据丢失更换磁盘,恢复数据
服务器宕机物理服务器故障更换服务器
网络设备故障交换机/路由器故障更换设备

3. 资源耗尽

原因说明恢复方式
内存耗尽消息堆积导致OOM清理消息,扩容
磁盘空间耗尽持久化消息过多清理数据,扩容
文件描述符耗尽连接数过多重启服务,配置优化

4. 软件故障

原因说明恢复方式
Erlang VM崩溃内部错误导致崩溃重启服务
插件故障插件导致服务异常禁用插件
配置错误配置文件错误修复配置

诊断步骤

步骤1:确认节点状态

bash
# 查看集群状态
rabbitmqctl cluster_status

# 查看特定节点状态
rabbitmqctl status -n rabbit@node1

# 查看节点详情
rabbitmqctl eval 'rabbit_nodes:lookup(rabbit@node1).'

# 查看节点健康检查
rabbitmq-diagnostics check_running

步骤2:分析故障原因

bash
# 查看节点日志
tail -100 /var/log/rabbitmq/rabbit@node1.log

# 查看系统日志
journalctl -u rabbitmq-server -n 100

# 查看OOM日志
dmesg | grep -i "rabbitmq\|oom"

# 查看资源使用
rabbitmqctl status -n rabbit@node1 | grep -A 10 memory

步骤3:检查资源状态

bash
# 检查磁盘空间
df -h /var/lib/rabbitmq

# 检查内存使用
free -h

# 检查CPU使用
top -p $(pgrep -d',' -f beam.smp)

# 检查文件描述符
cat /proc/$(pgrep -f beam.smp)/limits | grep "Max open files"

步骤4:评估影响范围

bash
# 查看镜像队列状态
rabbitmqctl list_policies

# 查看队列同步状态
rabbitmqctl eval 'rabbit_mirror_queue_misc:get_synced().'

# 查看连接分布
rabbitmqctl list_connections node

解决方案

1. 单节点故障恢复

php
<?php

class NodeFailureHandler
{
    private $nodes = [];
    private $connection;

    public function __construct()
    {
        $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
            'localhost', 5672, 'guest', 'guest'
        );
    }

    public function handleNodeDown(string $failedNode): array
    {
        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'failed_node' => $failedNode,
            'actions' => [],
            'status' => 'unknown',
        ];

        $clusterStatus = $this->getClusterStatus();
        
        if (!$this->isNodeInCluster($failedNode, $clusterStatus)) {
            $result['actions'][] = [
                'action' => 'detect_failure',
                'message' => "节点 {$failedNode} 已脱离集群",
            ];
            
            $result['actions'][] = [
                'action' => 'check_queues',
                'message' => $this->checkQueuesOnNode($failedNode),
            ];
            
            $result['actions'][] = [
                'action' => 'rebalance',
                'message' => $this->rebalanceConnections($failedNode),
            ];
            
            $result['status'] = 'completed';
        }
        
        return $result;
    }

    private function getClusterStatus(): array
    {
        exec('rabbitmqctl cluster_status', $output, $return);
        
        $status = [
            'running' => [],
            'stopped' => [],
            'partitions' => [],
        ];
        
        foreach ($output as $line) {
            if (preg_match('/Running Nodes/', $line)) {
                preg_match_all('/rabbit@[\w]+/', $line, $matches);
                $status['running'] = $matches[0];
            }
            if (preg_match('/Stopped Nodes/', $line)) {
                preg_match_all('/rabbit@[\w]+/', $line, $matches);
                $status['stopped'] = $matches[0];
            }
        }
        
        return $status;
    }

    private function isNodeInCluster(string $node, array $status): bool
    {
        return in_array($node, $status['running']) || 
               in_array($node, $status['stopped']);
    }

    private function checkQueuesOnNode(string $node): string
    {
        exec("rabbitmqctl list_queues name node", $output);
        
        $affectedQueues = [];
        foreach ($output as $line) {
            if (strpos($line, $node) !== false) {
                $affectedQueues[] = $line;
            }
        }
        
        return count($affectedQueues) . " 个队列受到影响";
    }

    private function rebalanceConnections(string $failedNode): string
    {
        return "建议重新连接客户端到其他节点";
    }

    public function close(): void
    {
        $this->connection->close();
    }
}

2. 节点重启流程

bash
#!/bin/bash
# node_restart.sh

NODE_NAME=$1

if [ -z "$NODE_NAME" ]; then
    echo "用法: $0 <node_name>"
    exit 1
fi

echo "=== 节点 {$NODE_NAME} 重启流程 ==="

echo "[1] 检查当前集群状态"
rabbitmqctl cluster_status

echo "[2] 停止节点"
ssh $NODE_NAME "systemctl stop rabbitmq-server"

echo "[3] 等待节点停止"
sleep 10

echo "[4] 启动节点"
ssh $NODE_NAME "systemctl start rabbitmq-server"

echo "[5] 等待节点加入集群"
sleep 30

echo "[6] 验证节点状态"
rabbitmqctl cluster_status

echo "[7] 检查队列同步状态"
rabbitmqctl eval 'rabbit_mirror_queue_misc:get_synced().'

echo "节点重启完成"

3. 节点替换流程

bash
#!/bin/bash
# node_replacement.sh

OLD_NODE=$1
NEW_NODE=$2

if [ -z "$OLD_NODE" ] || [ -z "$NEW_NODE" ]; then
    echo "用法: $0 <old_node> <new_node>"
    exit 1
fi

echo "=== 节点替换流程 ==="

echo "[1] 确认旧节点状态"
rabbitmqctl forget_cluster_node $OLD_NODE --offline

echo "[2] 在新节点上初始化"
ssh $NEW_NODE "rabbitmqctl stop_app"
ssh $NEW_NODE "rabbitmqctl reset"
ssh $NEW_NODE "rabbitmqctl join_cluster rabbit@$MASTER_NODE"
ssh $NEW_NODE "rabbitmqctl start_app"

echo "[3] 验证集群状态"
rabbitmqctl cluster_status

echo "[4] 同步镜像队列"
# 等待镜像队列同步完成

echo "[5] 恢复策略"
rabbitmqctl list_policies

echo "节点替换完成"

4. 故障节点数据恢复

php
<?php

class NodeDataRecovery
{
    private $backupDir = '/var/backups/rabbitmq';

    public function recoverFromBackup(string $nodeName, string $backupFile): bool
    {
        echo "开始从备份恢复节点: {$nodeName}\n";
        
        echo "[1] 停止节点服务\n";
        $this->executeCommand("ssh {$nodeName} 'systemctl stop rabbitmq-server'");
        
        echo "[2] 备份当前数据\n";
        $timestamp = date('YmdHis');
        $this->executeCommand("ssh {$nodeName} 'cp -r /var/lib/rabbitmq/mnesia /var/lib/rabbitmq/mnesia.backup.{$timestamp}'");
        
        echo "[3] 清理旧数据\n";
        $this->executeCommand("ssh {$nodeName} 'rm -rf /var/lib/rabbitmq/mnesia/*'");
        
        echo "[4] 解压备份\n";
        $this->executeCommand("tar -xzf {$backupFile} -C /var/lib/rabbitmq/");
        
        echo "[5] 启动节点\n";
        $this->executeCommand("ssh {$nodeName} 'systemctl start rabbitmq-server'");
        
        echo "[6] 验证恢复\n";
        $status = $this->executeCommand("rabbitmqctl -n {$nodeName} status");
        
        return strpos($status, 'running') !== false;
    }

    public function createBackup(string $nodeName): ?string
    {
        if (!is_dir($this->backupDir)) {
            mkdir($this->backupDir, 0755, true);
        }
        
        $backupFile = "{$this->backupDir}/rabbitmq_backup_{$nodeName}_" . date('YmdHis') . ".tar.gz";
        
        $command = "ssh {$nodeName} 'tar -czf - -C /var/lib/rabbitmq mnesia' > {$backupFile}";
        
        exec($command, $output, $return);
        
        if ($return === 0 && file_exists($backupFile)) {
            echo "备份创建成功: {$backupFile}\n";
            return $backupFile;
        }
        
        echo "备份创建失败\n";
        return null;
    }

    private function executeCommand(string $command): string
    {
        exec($command, $output, $return);
        
        if ($return !== 0) {
            echo "命令执行失败: {$command}\n";
            echo implode("\n", $output) . "\n";
        }
        
        return implode("\n", $output);
    }
}

预防措施

1. 高可用架构

┌─────────────────────────────────────────────────────────────┐
│                    高可用集群架构                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────┐     ┌─────────┐     ┌─────────┐              │
│  │ Node 1  │────▶│ Node 2  │────▶│ Node 3  │              │
│  │ (Master)│◀────│(Mirror) │◀────│(Mirror) │              │
│  └─────────┘     └─────────┘     └─────────┘              │
│       │               │               │                    │
│       └───────────────┴───────────────┘                    │
│                    │                                       │
│                    ▼                                       │
│              ┌─────────┐                                    │
│              │  Client │                                    │
│              │ (HAProxy)│                                   │
│              └─────────┘                                    │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2. 监控告警配置

yaml
# Prometheus 告警规则
groups:
  - name: rabbitmq_node
    rules:
      - alert: NodeDown
        expr: rabbitmq_resident_memory{job="rabbitmq"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ 节点宕机"
          description: "节点 {{ $labels.node }} 已经停止运行"

      - alert: NodeMemoryHigh
        expr: |
          rabbitmq_resident_memory{job="rabbitmq"} 
          / 
          rabbitmq_resident_memory_limit{job="rabbitmq"} 
          > 0.9
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "节点内存使用率过高"
          description: "节点 {{ $labels.node }} 内存使用率超过 90%"

      - alert: NodeDiskSpaceLow
        expr: rabbitmq_disk_space_available_bytes / rabbitmq_disk_space_available_limit < 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "节点磁盘空间不足"
          description: "节点 {{ $labels.node }} 磁盘空间不足"

3. 自动故障转移配置

bash
# 使用 HAProxy 实现自动故障转移
# haproxy.conf

listen rabbitmq_cluster
    bind 0.0.0.0:5672
    mode tcp
    balance roundrobin
    option tcplog
    option clitcpka
    
    server node1 rabbit@node1:5672 check inter 2000 rise 2 fall 3
    server node2 rabbit@node2:5672 check inter 2000 rise 2 fall 3 backup
    server node3 rabbit@node3:5672 check inter 2000 rise 2 fall 3 backup

4. 定期维护脚本

bash
#!/bin/bash
# node_maintenance.sh

echo "=== RabbitMQ 节点维护检查 ==="

echo "[1] 集群状态检查"
rabbitmqctl cluster_status

echo "[2] 节点健康检查"
for node in $(rabbitmqctl eval 'rabbit_nodes:names().' | grep -oP 'rabbit@[\w]+'); do
    echo "检查节点: $node"
    rabbitmqctl -n $node ping
done

echo "[3] 资源检查"
rabbitmqctl status | grep -A 5 "Memory\|Disk"

echo "[4] 队列状态检查"
rabbitmqctl list_queues name messages consumers | awk '$3 == 0 && $2 > 0 {print}'

echo "[5] 连接状态检查"
rabbitmqctl list_connections | wc -l

echo "维护检查完成"

注意事项

  1. 节点故障要及时发现:配置完善的监控告警
  2. 数据备份要定期执行:防止数据丢失
  3. 恢复流程要提前演练:确保恢复可行性
  4. 客户端要支持重连:自动切换到健康节点
  5. 日志要保留完整:便于故障分析

相关链接