Appearance
RabbitMQ 网络分区处理
一、概述
网络分区(Network Partition)是 RabbitMQ 集群中一个重要且复杂的问题。当集群节点之间的网络通信中断时,可能导致集群分裂成多个独立的子集群,这就是网络分区。正确理解和处理网络分区对于保证集群的高可用性至关重要。
网络分区示意图
mermaid
graph TB
subgraph "正常集群"
N1A[节点1] <--> N2A[节点2]
N2A <--> N3A[节点3]
N1A <--> N3A
end
subgraph "网络分区后"
subgraph "分区 A"
N1B[节点1]
N2B[节点2]
N1B <--> N2B
end
subgraph "分区 B"
N3B[节点3]
end
N1B -.->|网络中断| N3B
N2B -.->|网络中断| N3B
end二、核心知识点
2.1 什么是网络分区
定义
网络分区是指由于网络故障,集群节点之间无法正常通信,导致集群分裂成两个或多个独立子集群的状态。每个子集群都认为自己是唯一存活的集群。
产生原因
mermaid
graph LR
A[网络分区原因] --> B[网络设备故障]
A --> C[网络拥塞]
A --> D[防火墙阻断]
A --> E[节点过载]
A --> F[GC 暂停]
B --> B1[交换机/路由器故障]
C --> C1[带宽耗尽]
D --> D1[端口被封锁]
E --> E1[CPU/内存耗尽]
F --> F1[长时间 GC]分区检测机制
RabbitMQ 使用 Net Tick 机制检测网络分区:
mermaid
sequenceDiagram
participant N1 as 节点1
participant N2 as 节点2
loop 每 net_ticktime/4 秒
N1->>N2: 心跳包 (tick)
N2->>N1: 心跳响应
end
Note over N1,N2: 网络中断
loop 等待 net_ticktime * 4
N1-xN2: 心跳包 (失败)
N2-xN1: 心跳响应 (失败)
end
Note over N1: 判定 N2 故障<br/>触发分区处理
Note over N2: 判定 N1 故障<br/>触发分区处理2.2 分区检测参数
Net Tick 配置
ini
# /etc/rabbitmq/rabbitmq.conf
# 心跳间隔(秒),默认 60
# 超时时间 = net_ticktime * 4
net_ticktime = 60
# 计算示例
# net_ticktime = 60 时,超时时间 = 240 秒
# net_ticktime = 120 时,超时时间 = 480 秒分区检测时间线
时间轴:
|----|----|----|----|----|----|----|----|
0 15 30 45 60 75 90 105 120 ... 240秒
Tick 1 Tick 2 Tick 3 Tick 4 ...
| | | |
v v v v
[正常] [正常] [正常] [正常] ... [超时判定分区]
如果 240 秒内没有收到心跳响应,则判定为网络分区2.3 分区处理策略
RabbitMQ 提供三种分区处理策略:
策略对比
| 策略 | 行为 | 适用场景 | 数据风险 |
|---|---|---|---|
| ignore | 忽略分区,继续运行 | 开发测试 | 高,可能导致数据不一致 |
| pause_minority | 少数派节点暂停 | 偶数节点集群 | 中,少数派数据可能丢失 |
| autoheal | 自动恢复,重启少数派 | 生产环境推荐 | 低,自动选择主分区 |
策略详解
1. ignore(忽略分区)
mermaid
stateDiagram-v2
[*] --> Running
Running --> Partitioned: 检测到分区
Partitioned --> Running: 网络恢复
note right of Partitioned: 节点继续运行<br/>可能导致数据不一致特点:
- 不做任何处理
- 网络恢复后自动重新连接
- 可能导致消息重复或丢失
- 不推荐用于生产环境
2. pause_minority(暂停少数派)
mermaid
stateDiagram-v2
[*] --> Running
Running --> CheckPartition: 检测到分区
CheckPartition --> Majority: 多数派
CheckPartition --> Pause: 少数派
Majority --> Running: 网络恢复
Pause --> Running: 网络恢复
note right of Pause: 节点暂停服务<br/>等待网络恢复特点:
- 少数派节点自动暂停
- 多数派节点继续运行
- 网络恢复后自动恢复
- 适合偶数节点集群
3. autoheal(自动恢复)
mermaid
stateDiagram-v2
[*] --> Running
Running --> Partitioned: 检测到分区
Partitioned --> SelectWinner: 网络恢复
SelectWinner --> Winner: 获胜分区
SelectWinner --> Loser: 失败分区
Winner --> Running: 继续运行
Loser --> Restart: 重启节点
Restart --> Running: 重新加入集群特点:
- 网络恢复后自动选择获胜分区
- 失败分区的节点重启并重新加入
- 选择标准:客户端连接数最多的分区
- 生产环境推荐
2.4 分区场景分析
场景1:三节点集群,单节点隔离
mermaid
graph TB
subgraph "分区前"
A1[N1] <--> A2[N2]
A2 <--> A3[N3]
A1 <--> A3
end
subgraph "分区后 - autoheal"
subgraph "获胜分区(2节点)"
B1[N1]
B2[N2]
B1 <--> B2
end
subgraph "失败分区(1节点)"
B3[N3 - 重启]
end
B1 -.-> B3
B2 -.-> B3
end场景2:三节点集群,网络完全中断
mermaid
graph TB
subgraph "完全中断"
C1[N1 - 暂停]
C2[N2 - 暂停]
C3[N3 - 暂停]
C1 -.-> C2
C2 -.-> C3
C1 -.-> C3
end注意: 当所有节点都无法通信时,pause_minority 策略会导致所有节点暂停。
场景3:四节点集群,对等分区
mermaid
graph TB
subgraph "对等分区(2:2)"
subgraph "分区 A"
D1[N1]
D2[N2]
D1 <--> D2
end
subgraph "分区 B"
D3[N3]
D4[N4]
D3 <--> D4
end
D1 -.-> D3
D2 -.-> D4
end注意: 对等分区时,pause_minority 会导致所有节点暂停,因为没有多数派。
2.5 分区与队列
镜像队列在分区中的行为
mermaid
graph TB
subgraph "镜像队列分区前"
Q1[队列主: N1]
Q2[镜像: N2]
Q3[镜像: N3]
Q1 --> Q2
Q1 --> Q3
end
subgraph "分区后"
Q1A[队列主: N1<br/>继续服务]
Q2A[镜像: N2<br/>提升为主]
Q1A -.->|分区| Q2A
end问题:
- 分区恢复后可能出现两个主队列
- 可能导致消息重复或丢失
- 需要手动或自动处理
仲裁队列在分区中的行为
mermaid
graph TB
subgraph "仲裁队列(Raft)"
Q1[Leader: N1]
Q2[Follower: N2]
Q3[Follower: N3]
Q1 --> Q2
Q1 --> Q3
end优势:
- 基于 Raft 协议,自动处理分区
- 保证数据一致性
- 分区恢复后自动同步
三、配置示例
3.1 分区处理策略配置
ini
# /etc/rabbitmq/rabbitmq.conf
# 策略1: ignore(不推荐生产使用)
# cluster_partition_handling = ignore
# 策略2: pause_minority
# cluster_partition_handling = pause_minority
# 策略3: autoheal(推荐)
cluster_partition_handling = autoheal
# autoheal 参数
# 自动恢复的时间间隔(毫秒)
cluster_partition_handling.autoheal.heal_time = 60000
# 心跳配置
net_ticktime = 603.2 不同场景的推荐配置
生产环境(推荐)
ini
# /etc/rabbitmq/rabbitmq.conf
cluster_partition_handling = autoheal
net_ticktime = 60
# 配合仲裁队列使用高可用要求环境
ini
# /etc/rabbitmq/rabbitmq.conf
cluster_partition_handling = pause_minority
net_ticktime = 120
# 需要奇数节点开发测试环境
ini
# /etc/rabbitmq/rabbitmq.conf
cluster_partition_handling = ignore
net_ticktime = 303.3 Docker Compose 配置
yaml
version: '3.8'
services:
rabbitmq-node1:
image: rabbitmq:3.12-management
hostname: rabbitmq-node1
environment:
RABBITMQ_ERLANG_COOKIE: 'secret_cookie'
RABBITMQ_NODENAME: 'rabbit@rabbitmq-node1'
volumes:
- ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
ports:
- "5672:5672"
- "15672:15672"
networks:
- rabbitmq-cluster
rabbitmq-node2:
image: rabbitmq:3.12-management
hostname: rabbitmq-node2
environment:
RABBITMQ_ERLANG_COOKIE: 'secret_cookie'
RABBITMQ_NODENAME: 'rabbit@rabbitmq-node2'
volumes:
- ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
ports:
- "5673:5672"
- "15673:15672"
networks:
- rabbitmq-cluster
rabbitmq-node3:
image: rabbitmq:3.12-management
hostname: rabbitmq-node3
environment:
RABBITMQ_ERLANG_COOKIE: 'secret_cookie'
RABBITMQ_NODENAME: 'rabbit@rabbitmq-node3'
volumes:
- ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
ports:
- "5674:5672"
- "15674:15672"
networks:
- rabbitmq-cluster
networks:
rabbitmq-cluster:
driver: bridge3.4 分区检测脚本
bash
#!/bin/bash
# partition_check.sh
# 定期检查网络分区状态
NODE="rabbit@node1"
LOG_FILE="/var/log/rabbitmq/partition_check.log"
check_partition() {
local status=$(rabbitmqctl -n $NODE cluster_status 2>/dev/null | grep -A 10 "Partitions")
if echo "$status" | grep -q "none"; then
echo "$(date): No partitions detected" >> $LOG_FILE
return 0
else
echo "$(date): WARNING - Network partition detected!" >> $LOG_FILE
echo "$status" >> $LOG_FILE
# 发送告警
send_alert "$status"
return 1
fi
}
send_alert() {
local message="$1"
# 发送邮件或调用 Webhook
curl -X POST -H 'Content-Type: application/json' \
-d "{\"text\":\"RabbitMQ Network Partition Detected: $message\"}" \
"$WEBHOOK_URL"
}
check_partition四、PHP 代码示例
4.1 分区检测与监控
php
<?php
class NetworkPartitionMonitor
{
private string $host;
private int $port;
private string $user;
private string $password;
public function __construct(
string $host = 'localhost',
int $port = 15672,
string $user = 'guest',
string $password = 'guest'
) {
$this->host = $host;
$this->port = $port;
$this->user = $user;
$this->password = $password;
}
private function request(string $endpoint): array
{
$url = "http://{$this->host}:{$this->port}/api/{$endpoint}";
$ch = curl_init();
curl_setopt_array($ch, [
CURLOPT_URL => $url,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_USERPWD => "{$this->user}:{$this->password}",
CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
CURLOPT_TIMEOUT => 10,
]);
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
return [
'status' => $httpCode,
'data' => json_decode($response, true)
];
}
public function checkPartitions(): array
{
$result = $this->request('nodes');
if ($result['status'] !== 200) {
return ['error' => 'Failed to get node data', 'status' => $result['status']];
}
$partitions = [];
$hasPartition = false;
foreach ($result['data'] as $node) {
$nodePartitions = $node['partitions'] ?? [];
if (!empty($nodePartitions)) {
$hasPartition = true;
$partitions[$node['name']] = $nodePartitions;
}
}
return [
'has_partition' => $hasPartition,
'partition_details' => $partitions,
'timestamp' => date('Y-m-d H:i:s'),
'affected_nodes' => array_keys($partitions),
];
}
public function getClusterStatus(): array
{
$result = $this->request('overview');
if ($result['status'] !== 200) {
return ['error' => 'Failed to get overview'];
}
$nodes = $this->request('nodes');
return [
'cluster_name' => $result['data']['cluster_name'] ?? 'unknown',
'rabbitmq_version' => $result['data']['rabbitmq_version'] ?? 'unknown',
'erlang_version' => $result['data']['erlang_version'] ?? 'unknown',
'node_count' => count($nodes['data'] ?? []),
'running_nodes' => array_column($nodes['data'] ?? [], 'name'),
];
}
public function analyzePartitionImpact(): array
{
$partitionCheck = $this->checkPartitions();
$clusterStatus = $this->getClusterStatus();
if ($partitionCheck['has_partition']) {
$totalNodes = $clusterStatus['node_count'];
$affectedNodes = count($partitionCheck['affected_nodes']);
$healthyNodes = $totalNodes - $affectedNodes;
return [
'status' => 'critical',
'message' => 'Network partition detected',
'total_nodes' => $totalNodes,
'affected_nodes' => $affectedNodes,
'healthy_nodes' => $healthyNodes,
'partition_details' => $partitionCheck['partition_details'],
'recommendation' => $this->getRecommendation($healthyNodes, $totalNodes),
];
}
return [
'status' => 'healthy',
'message' => 'No network partitions detected',
'total_nodes' => $clusterStatus['node_count'],
];
}
private function getRecommendation(int $healthyNodes, int $totalNodes): string
{
if ($healthyNodes === 0) {
return '严重:所有节点都受影响,请立即检查网络连接';
}
if ($healthyNodes < $totalNodes / 2) {
return '警告:少数派节点存活,可能触发 pause_minority 策略';
}
return '多数派节点存活,autoheal 策略将自动恢复';
}
public function generateReport(): string
{
$analysis = $this->analyzePartitionImpact();
$report = "=== RabbitMQ 网络分区报告 ===\n";
$report .= "时间: " . date('Y-m-d H:i:s') . "\n";
$report .= "状态: {$analysis['status']}\n";
$report .= "消息: {$analysis['message']}\n";
$report .= "总节点数: {$analysis['total_nodes']}\n";
if (isset($analysis['affected_nodes'])) {
$report .= "受影响节点: {$analysis['affected_nodes']}\n";
$report .= "健康节点: {$analysis['healthy_nodes']}\n";
$report .= "\n分区详情:\n";
foreach ($analysis['partition_details'] as $node => $partitions) {
$report .= " {$node}: " . implode(', ', $partitions) . "\n";
}
$report .= "\n建议: {$analysis['recommendation']}\n";
}
return $report;
}
}
$monitor = new NetworkPartitionMonitor('localhost', 15672, 'admin', 'admin123');
echo $monitor->generateReport();
$check = $monitor->checkPartitions();
if ($check['has_partition']) {
echo "\n!!! 警告:检测到网络分区 !!!\n";
}4.2 分区恢复处理脚本
php
<?php
class PartitionRecoveryHandler
{
private array $nodes;
private string $sshUser;
public function __construct(array $nodes, string $sshUser = 'root')
{
$this->nodes = $nodes;
$this->sshUser = $sshUser;
}
public function diagnosePartition(): array
{
$diagnosis = [
'timestamp' => date('Y-m-d H:i:s'),
'nodes' => [],
];
foreach ($this->nodes as $node) {
$diagnosis['nodes'][$node] = $this->checkNodeStatus($node);
}
return $diagnosis;
}
private function checkNodeStatus(string $node): array
{
$status = [
'reachable' => $this->pingNode($node),
'rabbitmq_running' => false,
'cluster_status' => null,
'partitions' => [],
];
if ($status['reachable']) {
$clusterStatus = $this->executeCommand($node, 'rabbitmqctl cluster_status');
$status['rabbitmq_running'] = $clusterStatus !== null;
$status['cluster_status'] = $clusterStatus;
if (preg_match('/Partitions\s*\n\s*(.+?)\n/s', $clusterStatus ?? '', $matches)) {
$partitionInfo = trim($matches[1]);
if ($partitionInfo !== 'none') {
$status['partitions'] = explode("\n", $partitionInfo);
}
}
}
return $status;
}
private function pingNode(string $node): bool
{
$result = shell_exec("ping -c 1 -W 2 {$node}");
return $result !== null && strpos($result, '1 received') !== false;
}
private function executeCommand(string $node, string $command): ?string
{
$fullCommand = "ssh {$this->sshUser}@{$node} '{$command}' 2>/dev/null";
return shell_exec($fullCommand);
}
public function generateRecoveryCommands(string $strategy = 'autoheal'): array
{
$commands = [];
switch ($strategy) {
case 'autoheal':
$commands = $this->autohealCommands();
break;
case 'manual':
$commands = $this->manualRecoveryCommands();
break;
case 'full_restart':
$commands = $this->fullRestartCommands();
break;
}
return $commands;
}
private function autohealCommands(): array
{
return [
'# Autoheal 策略下,系统会自动处理',
'# 如果需要手动触发,可以执行以下命令:',
'',
'# 1. 查看当前分区状态',
'rabbitmqctl cluster_status | grep -A 10 "Partitions"',
'',
'# 2. 等待自动恢复(默认 60 秒)',
'# 或者手动重启受影响的节点',
'rabbitmqctl -n rabbit@<affected_node> stop_app',
'rabbitmqctl -n rabbit@<affected_node> start_app',
];
}
private function manualRecoveryCommands(): array
{
return [
'# 手动恢复步骤',
'',
'# 1. 确定要保留的分区(通常选择数据最新的)',
'rabbitmqctl cluster_status',
'',
'# 2. 停止要放弃的分区中的节点',
'rabbitmqctl -n rabbit@<node_to_stop> stop_app',
'',
'# 3. 重置该节点',
'rabbitmqctl -n rabbit@<node_to_stop> reset',
'',
'# 4. 重新加入集群',
'rabbitmqctl -n rabbit@<node_to_stop> join_cluster rabbit@<surviving_node>',
'',
'# 5. 启动节点',
'rabbitmqctl -n rabbit@<node_to_stop> start_app',
'',
'# 6. 验证集群状态',
'rabbitmqctl cluster_status',
];
}
private function fullRestartCommands(): array
{
$commands = [
'# 完全重启集群(最后手段)',
'',
'# 1. 停止所有节点',
];
foreach ($this->nodes as $node) {
$commands[] = "rabbitmqctl -n rabbit@{$node} stop_app";
}
$commands[] = '';
$commands[] = '# 2. 在第一个节点上启动并初始化';
$commands[] = "rabbitmqctl -n rabbit@{$this->nodes[0]} start_app";
$commands[] = '';
$commands[] = '# 3. 其他节点依次加入集群';
for ($i = 1; $i < count($this->nodes); $i++) {
$commands[] = "rabbitmqctl -n rabbit@{$this->nodes[$i]} join_cluster rabbit@{$this->nodes[0]}";
$commands[] = "rabbitmqctl -n rabbit@{$this->nodes[$i]} start_app";
}
$commands[] = '';
$commands[] = '# 4. 验证集群状态';
$commands[] = 'rabbitmqctl cluster_status';
return $commands;
}
public function getPartitionHandlingStrategy(): array
{
return [
'ignore' => [
'description' => '忽略分区,继续运行',
'pros' => ['服务不中断'],
'cons' => ['可能导致数据不一致', '消息可能重复或丢失'],
'suitable_for' => '开发测试环境',
],
'pause_minority' => [
'description' => '少数派节点暂停',
'pros' => ['保证数据一致性', '自动恢复'],
'cons' => ['服务可能中断', '需要奇数节点'],
'suitable_for' => '对数据一致性要求高的环境',
],
'autoheal' => [
'description' => '自动恢复,重启少数派',
'pros' => ['自动恢复', '服务连续性好'],
'cons' => ['少数派数据可能丢失'],
'suitable_for' => '生产环境(推荐)',
],
];
}
}
$handler = new PartitionRecoveryHandler(['node1', 'node2', 'node3']);
echo "=== 分区诊断 ===\n";
$diagnosis = $handler->diagnosePartition();
echo json_encode($diagnosis, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";
echo "\n=== 恢复命令 ===\n";
$commands = $handler->generateRecoveryCommands('manual');
echo implode("\n", $commands) . "\n";五、实际应用场景
5.1 生产环境分区处理流程
mermaid
flowchart TD
A[检测到网络分区] --> B{分区处理策略}
B -->|autoheal| C[等待自动恢复]
C --> D{是否自动恢复?}
D -->|是| E[验证集群状态]
D -->|否| F[人工介入]
B -->|pause_minority| G[少数派节点暂停]
G --> H[等待网络恢复]
H --> I[节点自动恢复]
B -->|ignore| J[继续运行]
J --> K[可能数据不一致]
F --> L[手动恢复]
E --> M[完成]
I --> M
K --> N[需要数据修复]
L --> M5.2 监控告警架构
mermaid
graph TB
subgraph "监控系统"
RM[RabbitMQ 节点] -->|指标| PM[Prometheus]
PM -->|数据| GR[Grafana]
PM -->|告警| AM[AlertManager]
end
subgraph "告警渠道"
AM --> EM[邮件]
AM --> SM[Slack/钉钉]
AM --> WH[Webhook]
end六、常见问题与解决方案
6.1 分区后消息丢失
问题: 网络分区恢复后发现消息丢失
原因分析:
- 镜像队列在分区时可能出现双主
- 分区恢复时选择了错误的分区
- 消息未持久化
解决方案:
bash
# 1. 使用仲裁队列替代镜像队列
rabbitmqctl set_policy ha-all ".*" '{"queue-type":"quorum"}'
# 2. 确保消息持久化
# 生产者发送消息时设置 delivery_mode=2
# 3. 选择正确的分区恢复
# 优先保留消息数量最多的分区6.2 分区频繁发生
问题: 集群频繁发生网络分区
排查方向:
bash
# 1. 检查网络稳定性
ping -c 100 node2
mtr node2
# 2. 检查系统负载
top
iostat -x 1
# 3. 检查 Erlang VM 状态
rabbitmqctl status | grep -A 20 "Memory"
# 4. 检查 GC 情况
rabbitmqctl status | grep -A 10 "GC"优化建议:
ini
# 增加心跳超时时间
net_ticktime = 120
# 优化内存使用
vm_memory_high_watermark.relative = 0.5
# 限制队列数量
max_queues = 100006.3 分区恢复后队列不可用
问题: 分区恢复后部分队列不可用
解决方案:
bash
# 1. 检查队列状态
rabbitmqctl list_queues name status
# 2. 查看队列所在节点
rabbitmqctl list_queues name pid
# 3. 如果队列主节点故障,重新同步
rabbitmqctl sync_queue <queue_name>
# 4. 如果无法恢复,删除重建
rabbitmqctl delete_queue <queue_name>七、最佳实践建议
7.1 预防措施
网络规划
- 使用专用网络
- 确保网络稳定性
- 配置网络监控
集群配置
- 使用奇数节点(推荐 3 或 5)
- 配置 autoheal 策略
- 合理设置 net_ticktime
队列选择
- 优先使用仲裁队列
- 配置消息持久化
- 避免大量临时队列
7.2 监控告警
关键指标
- 分区状态
- 节点间延迟
- 网络流量
- 心跳超时次数
告警规则
- 检测到分区立即告警
- 节点间延迟超过阈值告警
- 心跳超时次数异常告警
7.3 恢复流程
mermaid
flowchart LR
A[检测分区] --> B[评估影响]
B --> C[选择恢复策略]
C --> D[执行恢复]
D --> E[验证状态]
E --> F[数据修复]
F --> G[总结复盘]