
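RabbitMQ Cluster Architecture Overview

1. Overview

A RabbitMQ cluster is a logical unit made up of multiple RabbitMQ nodes that exchange data over the Erlang distribution protocol. Clustering provides high availability, scalability, and load distribution for message queues. High availability of queue contents, however, requires replicated queues (see 2.2 and 2.5).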

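Core benefits of clustering

```mermaid
graph TB
    A[RabbitMQ cluster] --> B[High availability]
    A --> C[Scalability]
    A --> D[Load balancing]
    A --> E[Data safety]
    
    B --> B1[Failover]
    B --> B2[Service continuity]
    
    C --> C1[Horizontal scaling]
    C --> C2[Elastic scaling]
    
    D --> D1[Message distribution]
    D --> D2[Consumer balancing]
    
    E --> E1[Data redundancy]
    E --> E2[Persistence guarantees]
```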

2. Core Concepts

2.1 Basic Cluster Concepts

What is a RabbitMQ cluster

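A RabbitMQ cluster is a logical grouping of Erlang nodes. These nodes:

  • share the same Erlang cookie (the shared secret nodes use to authenticate each other)
  • use EPMD (Erlang Port Mapper Daemon) to look up each other's inter-node ports
  • communicate over the Erlang distribution protocol
  • share metadata such as users, virtual hosts, exchanges, and bindings
  • do not share queue contents by default (unless queues are replicated, as quorum queues or, before RabbitMQ 4.0, as mirrored classic queues)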
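Since a cookie mismatch is the most common reason nodes refuse to talk to each other, a quick local check can help. The following is a minimal PHP sketch, assuming the default cookie path `/var/lib/rabbitmq/.erlang.cookie`; the function name `inspectErlangCookie` is hypothetical. It reports a SHA-256 fingerprint of the cookie, so the value can be compared across nodes without printing the secret, and flags permissions that grant group or other access, which the Erlang runtime rejects.

```php
<?php

// Hypothetical helper: inspects an Erlang cookie file without revealing the secret.
function inspectErlangCookie(string $path = '/var/lib/rabbitmq/.erlang.cookie'): array
{
    if (!is_readable($path)) {
        return ['ok' => false, 'problems' => ["Cookie file {$path} is missing or unreadable"]];
    }

    $problems = [];
    clearstatcache(true, $path);
    $mode = fileperms($path) & 0777;

    // Erlang refuses cookie files that are accessible by group or others
    if (($mode & 0077) !== 0) {
        $problems[] = sprintf('Permissions %o are too open; use 600 or 400', $mode);
    }

    $cookie = trim((string) file_get_contents($path));
    if ($cookie === '') {
        $problems[] = 'Cookie file is empty';
    }

    return [
        'ok' => empty($problems),
        // Compare this fingerprint across nodes instead of the raw cookie value
        'fingerprint' => hash('sha256', $cookie),
        'problems' => $problems,
    ];
}
```

Run it on every node; the fingerprints must be identical for the nodes to form a cluster.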

Cluster architecture diagram

```mermaid
graph TB
    subgraph "RabbitMQ cluster"
        N1[Node 1<br/>rabbit@node1]
        N2[Node 2<br/>rabbit@node2]
        N3[Node 3<br/>rabbit@node3]
        
        N1 <--> N2
        N2 <--> N3
        N1 <--> N3
    end
    
    subgraph "Client layer"
        P1[Producer]
        P2[Producer]
        C1[Consumer]
        C2[Consumer]
    end
    
    subgraph "Load balancing"
        LB[HAProxy/Nginx]
    end
    
    P1 --> LB
    P2 --> LB
    LB --> N1
    LB --> N2
    LB --> N3
    
    N1 --> C1
    N2 --> C2
    N3 --> C1
```

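2.2 Cluster Data Distribution Model

Metadata sharing

All nodes in the cluster share the following metadata:

| Metadata type | Contents | Synchronization |
|---|---|---|
| Users | Usernames, password hashes, permissions | Replicated automatically |
| Virtual hosts | vhost definitions | Replicated automatically |
| Exchanges | Exchange definitions and bindings | Replicated automatically |
| Queue metadata | Queue name, properties, hosting node | Replicated automatically |
| Policies | Policy definitions | Replicated automatically |
| Parameters | Runtime parameters | Replicated automatically |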

Queue data distribution

```mermaid
graph LR
    subgraph "Queue data distribution"
        N1[Node 1]
        N2[Node 2]
        N3[Node 3]
        
        Q1[Queue A<br/>Hosted on: node1]
        Q2[Queue B<br/>Hosted on: node2]
        Q3[Queue C<br/>Hosted on: node3]
    end
    
    N1 --> Q1
    N2 --> Q2
    N3 --> Q3
    
    style Q1 fill:#e1f5fe
    style Q2 fill:#f3e5f5
    style Q3 fill:#e8f5e9
```

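Important: by default (non-replicated classic queues), a queue's messages are stored only on the node that hosts the queue; the other nodes hold only its metadata. This means:

  • clients can reach the queue through any node
  • message operations are forwarded to the node that hosts the queue, which adds an inter-node hop
  • if the hosting node fails, the queue is unavailable until that node returns (unless the queue is replicated, e.g. as a quorum queue)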
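To see which node hosts each queue, the response of the management HTTP API's `GET /api/queues` can be grouped by node. The sketch below assumes each queue object carries `name` and `node` fields and uses an inline sample instead of a live call; `groupQueuesByNode` is a hypothetical name.

```php
<?php

// Groups queue descriptors (as returned by GET /api/queues) by the node that hosts them.
function groupQueuesByNode(array $queues): array
{
    $byNode = [];
    foreach ($queues as $queue) {
        $node = $queue['node'] ?? 'unknown';
        $byNode[$node][] = $queue['name'];
    }
    ksort($byNode);
    return $byNode;
}

// Sample data mirroring the diagram; a real call would decode the API's JSON response
$sample = [
    ['name' => 'queueA', 'node' => 'rabbit@node1'],
    ['name' => 'queueB', 'node' => 'rabbit@node2'],
    ['name' => 'queueC', 'node' => 'rabbit@node3'],
];

foreach (groupQueuesByNode($sample) as $node => $names) {
    echo $node . ': ' . implode(', ', $names) . "\n";
}
```

If one of these nodes goes down, only the queues listed under it become unavailable; the others keep working.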

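2.3 Cluster Communication

EPMD (Erlang Port Mapper Daemon)

EPMD is the port-mapping service of the Erlang distribution system:

  • Listening port: 4369
  • Purpose: maps the names of nodes running on a host to the ports their distribution listeners use
  • Each node registers its name and distribution port with the local EPMD at startup
  • Before connecting to another node, a node asks the EPMD on that node's host for its port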
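The lookup that `epmd -names` performs can be reproduced over a plain TCP socket. Per the Erlang distribution protocol documentation, a NAMES_REQ is a 2-byte big-endian length followed by the byte 110, and EPMD replies with its own port as a 4-byte integer followed by lines of the form `name <node> at port <port>`. The PHP sketch below, with hypothetical function names, builds the request and parses the reply; only `queryEpmdNames` touches the network.

```php
<?php

// Builds an EPMD NAMES_REQ packet: 2-byte big-endian length, then request code 110 ('n')
function buildEpmdNamesRequest(): string
{
    return pack('nC', 1, 110);
}

// Parses a NAMES response: 4-byte EPMD port, then "name <node> at port <port>\n" lines
function parseEpmdNamesResponse(string $response): array
{
    $epmdPort = unpack('N', substr($response, 0, 4))[1];
    $nodes = [];
    foreach (explode("\n", substr($response, 4)) as $line) {
        if (preg_match('/^name (\S+) at port (\d+)$/', trim($line), $m)) {
            $nodes[$m[1]] = (int) $m[2];
        }
    }
    return ['epmd_port' => $epmdPort, 'nodes' => $nodes];
}

// Sends the request to EPMD and reads until EPMD closes the connection
function queryEpmdNames(string $host, int $port = 4369, float $timeout = 3.0): ?array
{
    $sock = @fsockopen($host, $port, $errno, $errstr, $timeout);
    if ($sock === false) {
        return null;
    }
    fwrite($sock, buildEpmdNamesRequest());
    $response = stream_get_contents($sock);
    fclose($sock);
    return is_string($response) && strlen($response) >= 4 ? parseEpmdNamesResponse($response) : null;
}
```

EPMD registers the short node name (`rabbit`), not the full `rabbit@node1`; the host part is implied by the machine being queried.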

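Ports used by the cluster

| Port | Purpose | Description |
|---|---|---|
| 4369 | EPMD | Erlang port mapper |
| 25672 | Inter-node communication | Erlang distribution between cluster nodes and CLI tools (default: AMQP port + 20000) |
| 5672 | AMQP | Client connections (5671 with TLS) |
| 15672 | Management UI | Web management console and HTTP API |
| 15692 | Prometheus | Metrics endpoint (rabbitmq_prometheus plugin) |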
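Two details in this table can be checked programmatically: the default inter-node port is the AMQP port plus 20000 (5672 + 20000 = 25672), and each port must be reachable between nodes (and 5672/15672 from clients). Below is a PHP sketch with hypothetical function names that computes the default distribution port and probes a list of TCP ports with `fsockopen`.

```php
<?php

// Default Erlang distribution port used by RabbitMQ: AMQP port + 20000
function defaultDistributionPort(int $amqpPort = 5672): int
{
    return $amqpPort + 20000;
}

// Returns [port => bool] indicating whether a TCP connection to $host:$port succeeded
function probePorts(string $host, array $ports, float $timeout = 2.0): array
{
    $results = [];
    foreach ($ports as $port) {
        $conn = @fsockopen($host, $port, $errno, $errstr, $timeout);
        $results[$port] = $conn !== false;
        if ($conn !== false) {
            fclose($conn);
        }
    }
    return $results;
}

// Example (requires network access):
// print_r(probePorts('node1', [4369, defaultDistributionPort(), 5672, 15672, 15692]));
```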

Communication flow

```mermaid
sequenceDiagram
    participant N1 as Node 1
    participant EPMD as EPMD
    participant N2 as Node 2
    
    N1->>EPMD: Register node (rabbit@node1)
    EPMD-->>N1: Registered
    
    N2->>EPMD: Register node (rabbit@node2)
    EPMD-->>N2: Registered
    
    N1->>EPMD: Look up port of rabbit@node2
    EPMD-->>N1: Port 25672
    
    N1->>N2: Connect (Erlang distribution protocol)
    N2-->>N1: Connection established
    
    N1<<->>N2: Data sync and heartbeat (ticks)
```
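The heartbeat in the last step is the Erlang distribution tick, controlled by `net_ticktime` (60 seconds by default). According to the Erlang `kernel` documentation, ticks are sent every TickTime/4 seconds and an unresponsive peer is detected after a time T with TickTime - TickTime/4 < T < TickTime + TickTime/4. The small PHP sketch below (function name hypothetical) computes that window.

```php
<?php

// Computes when an unresponsive peer is declared down for a given net_ticktime (seconds),
// following the Erlang kernel documentation: MinT = T - T/4, MaxT = T + T/4.
function netTickWindow(int $netTickTime = 60): array
{
    return [
        'tick_interval' => $netTickTime / 4,
        'min_detection' => $netTickTime - $netTickTime / 4,
        'max_detection' => $netTickTime + $netTickTime / 4,
    ];
}

print_r(netTickWindow(60));
```

With the default of 60 seconds, a failed peer is noticed within 45 to 75 seconds; a lower value speeds up detection but makes false positives more likely on a congested network.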

2.4 Cluster State Management

Node states (simplified lifecycle)

```mermaid
stateDiagram-v2
    [*] --> Starting: start node
    Starting --> Joining: initialized
    Joining --> Running: joined cluster
    Running --> Partitioned: network partition
    Partitioned --> Running: partition healed
    Running --> Stopping: stop node
    Stopping --> [*]
```

Checking cluster status

```bash
# Show cluster status
rabbitmqctl cluster_status

# Example output
Cluster status of node rabbit@node1 ...
Basics

Cluster name: rabbit@node1

Disk Nodes

rabbit@node1
rabbit@node2
rabbit@node3

Running Nodes

rabbit@node1
rabbit@node2
rabbit@node3

Versions

rabbit@node1: RabbitMQ 3.12.0 on Erlang 25.3
rabbit@node2: RabbitMQ 3.12.0 on Erlang 25.3
rabbit@node3: RabbitMQ 3.12.0 on Erlang 25.3
```
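In this output, a node listed under Disk Nodes but missing from Running Nodes is down. The hypothetical PHP helper below parses the text layout shown above; because the human-readable format may change between versions, `rabbitmqctl cluster_status --formatter json` is the sturdier input for automation.

```php
<?php

// Parses the node sections of `rabbitmqctl cluster_status` text output
function parseClusterStatus(string $output): array
{
    $sections = [];
    $current = null;
    foreach (preg_split('/\R/', $output) as $line) {
        $line = trim($line);
        if ($line === '') {
            continue;
        }
        if (in_array($line, ['Disk Nodes', 'RAM Nodes', 'Running Nodes'], true)) {
            $current = $line;
            $sections[$current] = [];
        } elseif (preg_match('/^[A-Z][A-Za-z ]+$/', $line)) {
            // Any other section title (Basics, Versions, ...) ends the node lists
            $current = null;
        } elseif ($current !== null && str_contains($line, '@')) {
            $sections[$current][] = $line;
        }
    }

    $members = array_merge($sections['Disk Nodes'] ?? [], $sections['RAM Nodes'] ?? []);
    $running = $sections['Running Nodes'] ?? [];

    return [
        'members' => $members,
        'running' => $running,
        // Cluster members that are currently not running
        'down' => array_values(array_diff($members, $running)),
    ];
}
```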

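2.5 Comparison of Cluster Modes

| Mode | Description | Typical use | Pros | Cons |
|---|---|---|---|---|
| Plain cluster (classic queues) | Queue contents stored on a single node | Development, testing, non-critical workloads | Simple, low resource usage | Hosting node is a single point of failure for its queues |
| Mirrored classic queues | Queue contents replicated to several nodes | High-availability requirements (legacy) | Data redundancy, failover | High synchronization overhead; deprecated and removed in RabbitMQ 4.0 |
| Quorum queues | Replicated queues based on Raft | Recommended for production | Strong data consistency | Needs at least 3 nodes; an odd node count is recommended |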
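The odd-node recommendation follows from Raft's majority rule: a quorum queue with N replicas stays available only while a majority, floor(N/2) + 1, is online, so it tolerates floor((N - 1)/2) failures. The PHP sketch below (hypothetical function name) shows why a fourth node adds no fault tolerance over three.

```php
<?php

// Raft majority arithmetic for a quorum queue with $replicas members
function quorumTolerance(int $replicas): array
{
    $majority = intdiv($replicas, 2) + 1;
    return [
        'replicas' => $replicas,
        'majority' => $majority,
        'tolerated_failures' => $replicas - $majority,
    ];
}

foreach ([1, 2, 3, 4, 5] as $n) {
    $q = quorumTolerance($n);
    printf("%d replicas: majority %d, tolerates %d failure(s)\n", $n, $q['majority'], $q['tolerated_failures']);
}
```

A quorum queue itself is declared by passing the argument `x-queue-type` with the value `quorum` when the queue is created.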

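3. Configuration Examples

3.1 Cluster Configuration File

```ini
# /etc/rabbitmq/rabbitmq.conf

# Peer discovery backend: a static list of cluster nodes
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config

# Cluster node list
cluster_formation.classic_config.nodes.1 = rabbit@node1
cluster_formation.classic_config.nodes.2 = rabbit@node2
cluster_formation.classic_config.nodes.3 = rabbit@node3

# Type this node joins as (disc or ram)
cluster_formation.node_type = disc

# Network partition handling strategy
cluster_partition_handling = autoheal

# Inter-node communication port (pinned to a single port)
distribution.listener.port_range.min = 25672
distribution.listener.port_range.max = 25672

# Erlang distribution tick time in seconds (heartbeat between nodes)
net_ticktime = 60
```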
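When the node list comes from an inventory rather than being typed by hand, the peer discovery part of this file can be generated. This is a hypothetical PHP helper that assumes the `classic_config` backend and the keys used above; it accepts only the partition-handling strategies that need no extra settings (`ignore`, `autoheal`, `pause_minority`), since `pause_if_all_down` also requires a node list.

```php
<?php

// Hypothetical generator for the classic_config peer discovery section of rabbitmq.conf
function buildClassicConfig(array $nodes, string $partitionHandling = 'autoheal'): string
{
    $allowed = ['ignore', 'autoheal', 'pause_minority'];
    if (!in_array($partitionHandling, $allowed, true)) {
        throw new InvalidArgumentException("Unsupported partition handling: {$partitionHandling}");
    }

    $lines = ['cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config'];
    foreach (array_values($nodes) as $i => $node) {
        // Node names must have the form name@host
        if (!preg_match('/^[^@\s]+@[^@\s]+$/', $node)) {
            throw new InvalidArgumentException("Invalid node name: {$node}");
        }
        $lines[] = sprintf('cluster_formation.classic_config.nodes.%d = %s', $i + 1, $node);
    }
    $lines[] = "cluster_partition_handling = {$partitionHandling}";

    return implode("\n", $lines) . "\n";
}

echo buildClassicConfig(['rabbit@node1', 'rabbit@node2', 'rabbit@node3']);
```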

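3.2 Environment Variables

```bash
# /etc/rabbitmq/rabbitmq-env.conf
# Variables in this file omit the RABBITMQ_ prefix

# Node name (RABBITMQ_NODENAME)
NODENAME=rabbit@node1

# The Erlang cookie is read from the home directory of the user running the node,
# /var/lib/rabbitmq/.erlang.cookie with most Linux packages.

# The node type (disc or ram) is not an environment variable; set it in
# rabbitmq.conf with cluster_formation.node_type.
```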

4. PHP Code Examples

4.1 Fetching Cluster Status via the HTTP API

```php
<?php

class RabbitMQClusterManager
{
    private string $host;
    private int $port;
    private string $user;
    private string $password;
    
    public function __construct(
        string $host = 'localhost',
        int $port = 15672,
        string $user = 'guest',
        string $password = 'guest'
    ) {
        $this->host = $host;
        $this->port = $port;
        $this->user = $user;
        $this->password = $password;
    }
    
    // Calls the management HTTP API and returns the HTTP status code and decoded JSON body
    private function request(string $endpoint, string $method = 'GET', ?array $data = null): array
    {
        $url = "http://{$this->host}:{$this->port}/api/{$endpoint}";
        
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->user}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_CONNECTTIMEOUT => 5,
            CURLOPT_TIMEOUT => 10,
        ]);
        
        if ($data !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        }
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        return [
            'status' => $httpCode,
            // curl_exec() returns false on transport errors (refused connection, timeout)
            'data' => is_string($response) ? json_decode($response, true) : null,
        ];
    }
    
    // Returns $used / $total, or 0.0 when the total is unknown (e.g. a stopped node)
    private static function ratio($used, $total): float
    {
        return $total > 0 ? $used / $total : 0.0;
    }
    
    public function getClusterName(): array
    {
        return $this->request('cluster-name');
    }
    
    public function getNodes(): array
    {
        return $this->request('nodes');
    }
    
    public function getNodeInfo(string $nodeName): array
    {
        return $this->request("nodes/" . urlencode($nodeName));
    }
    
    public function getOverview(): array
    {
        return $this->request('overview');
    }
    
    public function getClusterStatus(): array
    {
        $overview = $this->getOverview();
        $nodes = $this->getNodes();
        
        if ($overview['status'] !== 200 || $nodes['status'] !== 200 || !is_array($nodes['data'])) {
            return [
                'status' => 'error',
                'message' => 'Failed to get cluster status'
            ];
        }
        
        $nodeList = [];
        foreach ($nodes['data'] as $node) {
            // Stopped nodes report only a few fields, so resource counters default to 0
            $nodeList[] = [
                'name' => $node['name'],
                'type' => $node['type'] ?? 'unknown',
                'running' => $node['running'] ?? false,
                'mem_used' => $node['mem_used'] ?? 0,
                'mem_limit' => $node['mem_limit'] ?? 0,
                'fd_used' => $node['fd_used'] ?? 0,
                'fd_total' => $node['fd_total'] ?? 0,
                'sockets_used' => $node['sockets_used'] ?? 0,
                'sockets_total' => $node['sockets_total'] ?? 0,
                'partitions' => $node['partitions'] ?? [],
            ];
        }
        
        return [
            'status' => 'success',
            'cluster_name' => $overview['data']['cluster_name'],
            'rabbitmq_version' => $overview['data']['rabbitmq_version'],
            'erlang_version' => $overview['data']['erlang_version'],
            'node_count' => count($nodeList),
            'nodes' => $nodeList,
        ];
    }
    
    public function checkClusterHealth(): array
    {
        $status = $this->getClusterStatus();
        
        if ($status['status'] !== 'success') {
            return $status;
        }
        
        $issues = [];
        
        foreach ($status['nodes'] as $node) {
            if (!$node['running']) {
                $issues[] = "Node {$node['name']} is not running";
                continue; // no resource metrics are available for a stopped node
            }
            
            if (!empty($node['partitions'])) {
                $issues[] = "Node {$node['name']} has network partitions: " . implode(', ', $node['partitions']);
            }
            
            $memUsage = self::ratio($node['mem_used'], $node['mem_limit']);
            if ($memUsage > 0.8) {
                $issues[] = "Node {$node['name']} memory usage is high: " . round($memUsage * 100, 2) . "%";
            }
            
            $fdUsage = self::ratio($node['fd_used'], $node['fd_total']);
            if ($fdUsage > 0.8) {
                $issues[] = "Node {$node['name']} file descriptor usage is high: " . round($fdUsage * 100, 2) . "%";
            }
        }
        
        return [
            'status' => empty($issues) ? 'healthy' : 'warning',
            'issues' => $issues,
            'node_count' => $status['node_count'],
        ];
    }
}

$manager = new RabbitMQClusterManager('localhost', 15672, 'admin', 'admin123');

$health = $manager->checkClusterHealth();
echo "Cluster Health: " . $health['status'] . "\n";
echo "Node Count: " . ($health['node_count'] ?? 0) . "\n";

if (!empty($health['issues'])) {
    echo "Issues:\n";
    foreach ($health['issues'] as $issue) {
        echo "  - {$issue}\n";
    }
}
```

4.2 Cluster Monitoring Script

```php
<?php

// Requires the RabbitMQClusterManager class from 4.1 (load it with require_once)

class ClusterMonitor
{
    private RabbitMQClusterManager $manager;
    private array $thresholds;
    
    public function __construct(RabbitMQClusterManager $manager, array $thresholds = [])
    {
        $this->manager = $manager;
        $this->thresholds = array_merge([
            'memory_warning' => 0.8,
            'memory_critical' => 0.9,
            'fd_warning' => 0.8,
            'fd_critical' => 0.9,
            'socket_warning' => 0.8,
            'socket_critical' => 0.9,
        ], $thresholds);
    }
    
    // Usage percentage rounded to 2 decimals; 0 when the total is unknown (e.g. a stopped node)
    private static function percent($used, $total): float
    {
        return $total > 0 ? round($used / $total * 100, 2) : 0.0;
    }
    
    public function collectMetrics(): array
    {
        $status = $this->manager->getClusterStatus();
        
        if ($status['status'] !== 'success') {
            return ['error' => 'Failed to collect metrics'];
        }
        
        $metrics = [
            'timestamp' => time(),
            'cluster_name' => $status['cluster_name'],
            'node_count' => $status['node_count'],
            'nodes' => [],
        ];
        
        foreach ($status['nodes'] as $node) {
            $metrics['nodes'][$node['name']] = [
                'memory_usage_percent' => self::percent($node['mem_used'] ?? 0, $node['mem_limit'] ?? 0),
                'fd_usage_percent' => self::percent($node['fd_used'] ?? 0, $node['fd_total'] ?? 0),
                'socket_usage_percent' => self::percent($node['sockets_used'] ?? 0, $node['sockets_total'] ?? 0),
                'running' => (bool) $node['running'],
                'partitions' => count($node['partitions'] ?? []),
            ];
        }
        
        return $metrics;
    }
    
    public function checkAlerts(): array
    {
        $metrics = $this->collectMetrics();
        $alerts = [];
        
        if (isset($metrics['error'])) {
            return ['critical' => [$metrics['error']]];
        }
        
        // Threshold prefix => [metric key, readable label]
        $checks = [
            'memory' => ['memory_usage_percent', 'memory usage'],
            'fd' => ['fd_usage_percent', 'file descriptor usage'],
            'socket' => ['socket_usage_percent', 'socket usage'],
        ];
        
        foreach ($metrics['nodes'] as $nodeName => $nodeMetrics) {
            if (!$nodeMetrics['running']) {
                $alerts['critical'][] = "Node {$nodeName} is not running";
                continue;
            }
            
            if ($nodeMetrics['partitions'] > 0) {
                $alerts['critical'][] = "Node {$nodeName} has {$nodeMetrics['partitions']} network partitions";
            }
            
            foreach ($checks as $prefix => [$key, $label]) {
                $value = $nodeMetrics[$key];
                if ($value > $this->thresholds["{$prefix}_critical"] * 100) {
                    $alerts['critical'][] = "Node {$nodeName} {$label} critical: {$value}%";
                } elseif ($value > $this->thresholds["{$prefix}_warning"] * 100) {
                    $alerts['warning'][] = "Node {$nodeName} {$label} warning: {$value}%";
                }
            }
        }
        
        return $alerts;
    }
}

$manager = new RabbitMQClusterManager('localhost', 15672, 'admin', 'admin123');
$monitor = new ClusterMonitor($manager);

$metrics = $monitor->collectMetrics();
echo "=== Cluster Metrics ===\n";
echo json_encode($metrics, JSON_PRETTY_PRINT) . "\n";

$alerts = $monitor->checkAlerts();
if (!empty($alerts)) {
    echo "\n=== Alerts ===\n";
    foreach ($alerts as $level => $messages) {
        echo "[{$level}]\n";
        foreach ($messages as $msg) {
            echo "  - {$msg}\n";
        }
    }
}
```

5. Practical Scenarios

5.1 Highly Available Messaging System

```mermaid
graph TB
    subgraph "Production environment"
        LB[Load balancer]
        
        subgraph "RabbitMQ cluster"
            N1[Node 1<br/>Disc Node]
            N2[Node 2<br/>Disc Node]
            N3[Node 3<br/>Disc Node]
        end
        
        subgraph "Application services"
            APP1[App instance 1]
            APP2[App instance 2]
            APP3[App instance 3]
        end
    end
    
    APP1 --> LB
    APP2 --> LB
    APP3 --> LB
    
    LB --> N1
    LB --> N2
    LB --> N3
```
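The load balancer is what hides node failures from applications here. Where no load balancer is deployed, a client can get a similar effect by trying nodes in turn. The following PHP sketch illustrates that alternative under stated assumptions: `pickAvailableNode` is hypothetical, and the reachability check is injected as a callable so it can be a TCP probe in production or a stub in tests.

```php
<?php

// Returns the first node that passes $isReachable, optionally shuffling first
// so that clients spread their connections across the cluster.
function pickAvailableNode(array $nodes, callable $isReachable, bool $shuffle = true): ?string
{
    if ($shuffle) {
        shuffle($nodes);
    }
    foreach ($nodes as $node) {
        if ($isReachable($node)) {
            return $node;
        }
    }
    return null; // every node is unreachable
}

// Example probe: TCP connect to the AMQP port (5672) with a short timeout
$tcpProbe = function (string $host): bool {
    $conn = @fsockopen($host, 5672, $errno, $errstr, 1.0);
    if ($conn === false) {
        return false;
    }
    fclose($conn);
    return true;
};
```

Many AMQP client libraries can also take a list of hosts directly, which is worth checking before hand-rolling this logic.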

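5.2 Multi-Data-Center Deployment

Each data center runs its own cluster. Because clustering assumes a low-latency LAN, the clusters are linked with Federation or Shovel rather than stretched across sites.

```mermaid
graph TB
    subgraph "Data center A"
        CA[Cluster A]
        CA1[Node A1]
        CA2[Node A2]
        CA3[Node A3]
        CA --> CA1
        CA --> CA2
        CA --> CA3
    end
    
    subgraph "Data center B"
        CB[Cluster B]
        CB1[Node B1]
        CB2[Node B2]
        CB3[Node B3]
        CB --> CB1
        CB --> CB2
        CB --> CB3
    end
    
    CA <-.->|Federation/Shovel| CB
```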
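As an example of the Shovel link between data centers, a dynamic shovel can be declared through the management HTTP API with `PUT /api/parameters/shovel/{vhost}/{name}`. The sketch below only builds the request, assuming the `rabbitmq_shovel` plugin is enabled; the function name, host names, queue names, and credentials are hypothetical. The default vhost `/` must be URL-encoded as `%2F`.

```php
<?php

// Builds (but does not send) a management API request declaring a dynamic shovel
function buildShovelRequest(string $vhost, string $name, string $srcUri, string $srcQueue,
                            string $destUri, string $destQueue): array
{
    return [
        'method' => 'PUT',
        // rawurlencode turns the default vhost "/" into "%2F"
        'path' => '/api/parameters/shovel/' . rawurlencode($vhost) . '/' . rawurlencode($name),
        'body' => json_encode([
            'value' => [
                'src-protocol' => 'amqp091',
                'src-uri' => $srcUri,
                'src-queue' => $srcQueue,
                'dest-protocol' => 'amqp091',
                'dest-uri' => $destUri,
                'dest-queue' => $destQueue,
            ],
        ], JSON_UNESCAPED_SLASHES),
    ];
}

// "amqp://" with no host refers to the local node; the remote URI is a placeholder
$req = buildShovelRequest('/', 'dc-a-to-dc-b', 'amqp://', 'orders',
    'amqp://user:password@dc-b-lb', 'orders');
echo $req['method'] . ' ' . $req['path'] . "\n" . $req['body'] . "\n";
```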

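6. Common Problems and Solutions

6.1 Node Cannot Join the Cluster

Problem: a new node fails to join an existing cluster.

Troubleshooting steps:

```bash
# 1. Check that the Erlang cookie is identical on every node
cat /var/lib/rabbitmq/.erlang.cookie

# 2. Check network connectivity (EPMD and inter-node ports)
ping node1
telnet node1 4369
telnet node1 25672

# 3. Check the names registered with the local EPMD
epmd -names

# 4. Follow the node log
tail -f /var/log/rabbitmq/rabbit@node2.log
```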

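Solution:

```bash
# Make the cookie identical on every node (same value everywhere),
# then fix its ownership and permissions
echo "MYSECRETCOOKIE" > /var/lib/rabbitmq/.erlang.cookie
chmod 600 /var/lib/rabbitmq/.erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie

# Restart the service so the new cookie takes effect
systemctl restart rabbitmq-server

# Join the cluster manually (run on the joining node, e.g. node2)
rabbitmqctl stop_app
rabbitmqctl reset    # wipes this node's local data before joining
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
```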

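6.2 Cluster Nodes Out of Sync

Problem: metadata differs between nodes.

Solution: metadata is replicated automatically, so nodes that disagree usually point to a network partition. Check the cluster status first.

```bash
# Check cluster status (look for network partitions)
rabbitmqctl cluster_status

# Synchronize the mirrors of a mirrored classic queue
# (classic mirroring only, removed in RabbitMQ 4.0; this does not sync users or other metadata)
rabbitmqctl sync_queue <queue_name>

# Reset the node and rejoin (use with care: deletes all data on this node)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
```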

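6.3 Degraded Cluster Performance

Problem: overall cluster performance drops.

What to check:

```bash
# Check network latency between nodes
ping -c 10 node2

# Check node load and resource usage
rabbitmqctl status

# Check for queue backlogs
rabbitmqctl list_queues name messages consumers

# Check connections
rabbitmqctl list_connections
```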
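The `list_queues` columns used above (name, messages, consumers) are also available from `GET /api/queues`. A backlog on a queue with zero consumers is a frequent culprit. The PHP sketch below, with a hypothetical function name and threshold, flags such queues in already-decoded queue data.

```php
<?php

// Flags queues that have messages but no consumers, or whose depth exceeds $maxMessages
function findBacklogs(array $queues, int $maxMessages = 10000): array
{
    $findings = [];
    foreach ($queues as $q) {
        $messages = $q['messages'] ?? 0;
        $consumers = $q['consumers'] ?? 0;
        if ($messages > 0 && $consumers === 0) {
            $findings[] = "{$q['name']}: {$messages} messages and no consumers";
        } elseif ($messages > $maxMessages) {
            $findings[] = "{$q['name']}: {$messages} messages exceed {$maxMessages}";
        }
    }
    return $findings;
}

$sample = [
    ['name' => 'orders', 'messages' => 25000, 'consumers' => 4],
    ['name' => 'emails', 'messages' => 120, 'consumers' => 0],
    ['name' => 'audit', 'messages' => 0, 'consumers' => 1],
];
print_r(findBacklogs($sample));
```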

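7. Best Practices

7.1 Cluster Planning

  1. Node count: use at least 3 nodes in production, preferably an odd number
  2. Node type: make every node a disc node; avoid RAM nodes
  3. Network: put cluster traffic on a dedicated network or VLAN to keep latency low
  4. Resources: run each node on its own host to avoid resource contention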

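7.2 Configuration Tuning

```ini
# Erlang distribution tick time (inter-node heartbeat), seconds
net_ticktime = 60

# Network partition handling
cluster_partition_handling = autoheal

# Memory thresholds (fraction of total RAM; paging ratio is a fraction of the watermark)
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.75

# Disk threshold (minimum free disk space relative to total RAM)
disk_free_limit.relative = 2.0

# Maximum number of client connections on this node
connection_max = 10000
```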
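The relative settings above translate into absolute values per host. As a worked example, assuming a host with 16 GiB of RAM: publishers are blocked at 0.6 × 16 = 9.6 GiB, paging to disk starts at 0.75 × 9.6 = 7.2 GiB, and the disk alarm fires when free space drops below 2.0 × 16 = 32 GiB. The hypothetical PHP helper below performs the same calculation.

```php
<?php

// Converts relative memory/disk settings from rabbitmq.conf into absolute byte values
function absoluteLimits(int $totalRamBytes, float $memWatermark = 0.6,
                        float $pagingRatio = 0.75, float $diskRelative = 2.0): array
{
    $highWatermark = $totalRamBytes * $memWatermark;
    return [
        'memory_high_watermark' => (int) round($highWatermark),
        'paging_starts_at' => (int) round($highWatermark * $pagingRatio),
        'disk_free_limit' => (int) round($totalRamBytes * $diskRelative),
    ];
}

$gib = 1024 ** 3;
foreach (absoluteLimits(16 * $gib) as $key => $bytes) {
    printf("%-22s %.1f GiB\n", $key, $bytes / $gib);
}
```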

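7.3 Monitoring and Alerting

  1. Monitor node status and inter-node heartbeats
  2. Monitor memory, disk, and file descriptor usage
  3. Monitor queue depth and consumer counts
  4. Monitor network partition events
  5. Set up Prometheus + Grafana dashboards (metrics on port 15692)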
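For the Prometheus item in this list: the `rabbitmq_prometheus` plugin serves metrics on port 15692 (path `/metrics`) in the Prometheus text format. Prometheus normally scrapes this endpoint itself; for quick scripts, a minimal parser such as the hypothetical PHP sketch below is enough. It skips comments, keeps labels as part of the series name, skips non-numeric values such as `NaN`, and uses illustrative sample metric names.

```php
<?php

// Minimal parser for the Prometheus text format: "<name>[{labels}] <value> [timestamp]"
function parsePrometheusText(string $text): array
{
    $series = [];
    foreach (preg_split('/\R/', $text) as $line) {
        $line = trim($line);
        if ($line === '' || $line[0] === '#') {
            continue; // blank lines and HELP/TYPE comments
        }
        // Metric name, optional {labels} block (may contain spaces), then the value
        if (preg_match('/^([^\s{]+(?:\{.*\})?)\s+(\S+)/', $line, $m) && is_numeric($m[2])) {
            $series[$m[1]] = (float) $m[2];
        }
    }
    return $series;
}

$sample = <<<TXT
# HELP rabbitmq_queue_messages_ready Messages ready to be delivered to consumers
# TYPE rabbitmq_queue_messages_ready gauge
rabbitmq_queue_messages_ready 42
rabbitmq_connections 17
TXT;
print_r(parsePrometheusText($sample));
```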

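7.4 Operations

  1. Back up definitions (users, vhosts, policies, topology) and configuration regularly, e.g. with `rabbitmqctl export_definitions`; message contents are not part of such backups
  2. Prepare a failure recovery plan
  3. Manage the cluster with infrastructure as code
  4. Run failure drills regularly
  5. Keep RabbitMQ and Erlang versions consistent across nodes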
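Definitions, unlike message contents, can be backed up reliably: the management API returns them as one JSON document from `GET /api/definitions`. Assuming that JSON has already been fetched and decoded, the hypothetical PHP helper below writes it to a timestamped file. The document contains password hashes, so the backup directory is created as owner-only.

```php
<?php

// Writes a decoded definitions document to "<dir>/definitions-YYYYmmdd-HHMMSS.json"
function saveDefinitionsBackup(array $definitions, string $dir, ?int $timestamp = null): string
{
    if (!is_dir($dir) && !mkdir($dir, 0700, true)) {
        throw new RuntimeException("Cannot create backup directory {$dir}");
    }
    $file = sprintf('%s/definitions-%s.json', rtrim($dir, '/'), date('Ymd-His', $timestamp ?? time()));
    $json = json_encode($definitions, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES | JSON_THROW_ON_ERROR);
    if (file_put_contents($file, $json, LOCK_EX) === false) {
        throw new RuntimeException("Cannot write {$file}");
    }
    return $file;
}
```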

8. Related Links