Appearance
节点故障处理
概述
RabbitMQ 集群中节点故障是常见运维场景,可能由硬件故障、网络问题、资源耗尽等原因引起。本文档将详细介绍节点故障的诊断方法、处理流程和恢复策略。
问题表现与症状
常见症状
┌─────────────────────────────────────────────────────────────┐
│ 节点故障典型症状 │
├─────────────────────────────────────────────────────────────┤
│ 1. 集群状态显示节点 down │
│ 2. 镜像队列同步停止 │
│ 3. 部分客户端连接失败 │
│ 4. 队列无法访问 │
│ 5. 客户端报错 "node not running" │
│ 6. 资源告警触发 │
└─────────────────────────────────────────────────────────────┘故障类型分类
┌─────────────────┐
│ 节点故障类型 │
└────────┬────────┘
│
┌────────────┬───────────┼───────────┬────────────┐
▼ ▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│临时故障│ │硬件故障│ │网络故障│ │资源耗尽│ │人为错误│
│可恢复 │ │需更换 │ │可恢复 │ │可恢复 │ │可恢复 │
└────────┘ └────────┘ └────────┘ └────────┘ └────────┘问题原因分析
1. 临时故障
| 原因 | 说明 | 恢复方式 |
|---|---|---|
| 服务重启 | 正常维护或崩溃重启 | 自动恢复 |
| 短暂网络抖动 | 网络瞬时中断 | 自动恢复 |
| 资源瞬时耗尽 | 内存/磁盘短暂告警 | 自动恢复 |
| OOM Killer | 内存不足被系统杀死 | 重启服务 |
2. 硬件故障
| 原因 | 说明 | 恢复方式 |
|---|---|---|
| 磁盘损坏 | 磁盘故障导致数据丢失 | 更换磁盘,恢复数据 |
| 服务器宕机 | 物理服务器故障 | 更换服务器 |
| 网络设备故障 | 交换机/路由器故障 | 更换设备 |
3. 资源耗尽
| 原因 | 说明 | 恢复方式 |
|---|---|---|
| 内存耗尽 | 消息堆积导致OOM | 清理消息,扩容 |
| 磁盘空间耗尽 | 持久化消息过多 | 清理数据,扩容 |
| 文件描述符耗尽 | 连接数过多 | 重启服务,配置优化 |
4. 软件故障
| 原因 | 说明 | 恢复方式 |
|---|---|---|
| Erlang VM崩溃 | 内部错误导致崩溃 | 重启服务 |
| 插件故障 | 插件导致服务异常 | 禁用插件 |
| 配置错误 | 配置文件错误 | 修复配置 |
诊断步骤
步骤1:确认节点状态
bash
# 查看集群状态
rabbitmqctl cluster_status
# 查看特定节点状态
rabbitmqctl status -n rabbit@node1
# 查看节点详情
rabbitmqctl eval 'rabbit_nodes:lookup(rabbit@node1).'
# 查看节点健康检查
rabbitmq-diagnostics check_running步骤2:分析故障原因
bash
# 查看节点日志
tail -100 /var/log/rabbitmq/rabbit@node1.log
# 查看系统日志
journalctl -u rabbitmq-server -n 100
# 查看OOM日志
dmesg | grep -i "rabbitmq\|oom"
# 查看资源使用
rabbitmqctl status -n rabbit@node1 | grep -A 10 memory步骤3:检查资源状态
bash
# 检查磁盘空间
df -h /var/lib/rabbitmq
# 检查内存使用
free -h
# 检查CPU使用
top -p $(pgrep -d',' -f beam.smp)
# 检查文件描述符
cat /proc/$(pgrep -f beam.smp)/limits | grep "Max open files"步骤4:评估影响范围
bash
# 查看镜像队列状态
rabbitmqctl list_policies
# 查看队列同步状态
rabbitmqctl eval 'rabbit_mirror_queue_misc:get_synced().'
# 查看连接分布
rabbitmqctl list_connections node解决方案
1. 单节点故障恢复
php
<?php
class NodeFailureHandler
{
private $nodes = [];
private $connection;
public function __construct()
{
$this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
'localhost', 5672, 'guest', 'guest'
);
}
public function handleNodeDown(string $failedNode): array
{
$result = [
'timestamp' => date('Y-m-d H:i:s'),
'failed_node' => $failedNode,
'actions' => [],
'status' => 'unknown',
];
$clusterStatus = $this->getClusterStatus();
if (!$this->isNodeInCluster($failedNode, $clusterStatus)) {
$result['actions'][] = [
'action' => 'detect_failure',
'message' => "节点 {$failedNode} 已脱离集群",
];
$result['actions'][] = [
'action' => 'check_queues',
'message' => $this->checkQueuesOnNode($failedNode),
];
$result['actions'][] = [
'action' => 'rebalance',
'message' => $this->rebalanceConnections($failedNode),
];
$result['status'] = 'completed';
}
return $result;
}
private function getClusterStatus(): array
{
exec('rabbitmqctl cluster_status', $output, $return);
$status = [
'running' => [],
'stopped' => [],
'partitions' => [],
];
foreach ($output as $line) {
if (preg_match('/Running Nodes/', $line)) {
preg_match_all('/rabbit@[\w]+/', $line, $matches);
$status['running'] = $matches[0];
}
if (preg_match('/Stopped Nodes/', $line)) {
preg_match_all('/rabbit@[\w]+/', $line, $matches);
$status['stopped'] = $matches[0];
}
}
return $status;
}
private function isNodeInCluster(string $node, array $status): bool
{
return in_array($node, $status['running']) ||
in_array($node, $status['stopped']);
}
private function checkQueuesOnNode(string $node): string
{
exec("rabbitmqctl list_queues name node", $output);
$affectedQueues = [];
foreach ($output as $line) {
if (strpos($line, $node) !== false) {
$affectedQueues[] = $line;
}
}
return count($affectedQueues) . " 个队列受到影响";
}
private function rebalanceConnections(string $failedNode): string
{
return "建议重新连接客户端到其他节点";
}
public function close(): void
{
$this->connection->close();
}
}2. 节点重启流程
bash
#!/bin/bash
# node_restart.sh
NODE_NAME=$1
if [ -z "$NODE_NAME" ]; then
echo "用法: $0 <node_name>"
exit 1
fi
echo "=== 节点 {$NODE_NAME} 重启流程 ==="
echo "[1] 检查当前集群状态"
rabbitmqctl cluster_status
echo "[2] 停止节点"
ssh $NODE_NAME "systemctl stop rabbitmq-server"
echo "[3] 等待节点停止"
sleep 10
echo "[4] 启动节点"
ssh $NODE_NAME "systemctl start rabbitmq-server"
echo "[5] 等待节点加入集群"
sleep 30
echo "[6] 验证节点状态"
rabbitmqctl cluster_status
echo "[7] 检查队列同步状态"
rabbitmqctl eval 'rabbit_mirror_queue_misc:get_synced().'
echo "节点重启完成"3. 节点替换流程
bash
#!/bin/bash
# node_replacement.sh
OLD_NODE=$1
NEW_NODE=$2
if [ -z "$OLD_NODE" ] || [ -z "$NEW_NODE" ]; then
echo "用法: $0 <old_node> <new_node>"
exit 1
fi
echo "=== 节点替换流程 ==="
echo "[1] 确认旧节点状态"
rabbitmqctl forget_cluster_node $OLD_NODE --offline
echo "[2] 在新节点上初始化"
ssh $NEW_NODE "rabbitmqctl stop_app"
ssh $NEW_NODE "rabbitmqctl reset"
ssh $NEW_NODE "rabbitmqctl join_cluster rabbit@$MASTER_NODE"
ssh $NEW_NODE "rabbitmqctl start_app"
echo "[3] 验证集群状态"
rabbitmqctl cluster_status
echo "[4] 同步镜像队列"
# 等待镜像队列同步完成
echo "[5] 恢复策略"
rabbitmqctl list_policies
echo "节点替换完成"4. 故障节点数据恢复
php
<?php
class NodeDataRecovery
{
private $backupDir = '/var/backups/rabbitmq';
public function recoverFromBackup(string $nodeName, string $backupFile): bool
{
echo "开始从备份恢复节点: {$nodeName}\n";
echo "[1] 停止节点服务\n";
$this->executeCommand("ssh {$nodeName} 'systemctl stop rabbitmq-server'");
echo "[2] 备份当前数据\n";
$timestamp = date('YmdHis');
$this->executeCommand("ssh {$nodeName} 'cp -r /var/lib/rabbitmq/mnesia /var/lib/rabbitmq/mnesia.backup.{$timestamp}'");
echo "[3] 清理旧数据\n";
$this->executeCommand("ssh {$nodeName} 'rm -rf /var/lib/rabbitmq/mnesia/*'");
echo "[4] 解压备份\n";
$this->executeCommand("tar -xzf {$backupFile} -C /var/lib/rabbitmq/");
echo "[5] 启动节点\n";
$this->executeCommand("ssh {$nodeName} 'systemctl start rabbitmq-server'");
echo "[6] 验证恢复\n";
$status = $this->executeCommand("rabbitmqctl -n {$nodeName} status");
return strpos($status, 'running') !== false;
}
public function createBackup(string $nodeName): ?string
{
if (!is_dir($this->backupDir)) {
mkdir($this->backupDir, 0755, true);
}
$backupFile = "{$this->backupDir}/rabbitmq_backup_{$nodeName}_" . date('YmdHis') . ".tar.gz";
$command = "ssh {$nodeName} 'tar -czf - -C /var/lib/rabbitmq mnesia' > {$backupFile}";
exec($command, $output, $return);
if ($return === 0 && file_exists($backupFile)) {
echo "备份创建成功: {$backupFile}\n";
return $backupFile;
}
echo "备份创建失败\n";
return null;
}
private function executeCommand(string $command): string
{
exec($command, $output, $return);
if ($return !== 0) {
echo "命令执行失败: {$command}\n";
echo implode("\n", $output) . "\n";
}
return implode("\n", $output);
}
}预防措施
1. 高可用架构
┌─────────────────────────────────────────────────────────────┐
│ 高可用集群架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Node 1 │────▶│ Node 2 │────▶│ Node 3 │ │
│ │ (Master)│◀────│(Mirror) │◀────│(Mirror) │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │
│ └───────────────┴───────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │ Client │ │
│ │ (HAProxy)│ │
│ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘2. 监控告警配置
yaml
# Prometheus 告警规则
groups:
- name: rabbitmq_node
rules:
- alert: NodeDown
expr: rabbitmq_resident_memory{job="rabbitmq"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "RabbitMQ 节点宕机"
description: "节点 {{ $labels.node }} 已经停止运行"
- alert: NodeMemoryHigh
expr: |
rabbitmq_resident_memory{job="rabbitmq"}
/
rabbitmq_resident_memory_limit{job="rabbitmq"}
> 0.9
for: 5m
labels:
severity: warning
annotations:
summary: "节点内存使用率过高"
description: "节点 {{ $labels.node }} 内存使用率超过 90%"
- alert: NodeDiskSpaceLow
expr: rabbitmq_disk_space_available_bytes / rabbitmq_disk_space_available_limit < 2
for: 5m
labels:
severity: warning
annotations:
summary: "节点磁盘空间不足"
description: "节点 {{ $labels.node }} 磁盘空间不足"3. 自动故障转移配置
bash
# 使用 HAProxy 实现自动故障转移
# haproxy.conf
listen rabbitmq_cluster
bind 0.0.0.0:5672
mode tcp
balance roundrobin
option tcplog
option clitcpka
server node1 rabbit@node1:5672 check inter 2000 rise 2 fall 3
server node2 rabbit@node2:5672 check inter 2000 rise 2 fall 3 backup
server node3 rabbit@node3:5672 check inter 2000 rise 2 fall 3 backup4. 定期维护脚本
bash
#!/bin/bash
# node_maintenance.sh
echo "=== RabbitMQ 节点维护检查 ==="
echo "[1] 集群状态检查"
rabbitmqctl cluster_status
echo "[2] 节点健康检查"
for node in $(rabbitmqctl eval 'rabbit_nodes:names().' | grep -oP 'rabbit@[\w]+'); do
echo "检查节点: $node"
rabbitmqctl -n $node ping
done
echo "[3] 资源检查"
rabbitmqctl status | grep -A 5 "Memory\|Disk"
echo "[4] 队列状态检查"
rabbitmqctl list_queues name messages consumers | awk '$3 == 0 && $2 > 0 {print}'
echo "[5] 连接状态检查"
rabbitmqctl list_connections | wc -l
echo "维护检查完成"注意事项
- 节点故障要及时发现:配置完善的监控告警
- 数据备份要定期执行:防止数据丢失
- 恢复流程要提前演练:确保恢复可行性
- 客户端要支持重连:自动切换到健康节点
- 日志要保留完整:便于故障分析
