Skip to content

RabbitMQ 手动搭建集群

一、概述

手动搭建 RabbitMQ 集群是最基础的集群部署方式,适合理解集群工作原理和进行小规模部署。本文档详细介绍手动搭建集群的完整流程。

搭建流程

mermaid
graph TB
    A[环境准备] --> B[安装 RabbitMQ]
    B --> C[配置 Erlang Cookie]
    C --> D[启动节点]
    D --> E[加入集群]
    E --> F[配置策略]
    F --> G[验证集群]

二、核心知识点

2.1 集群搭建前提条件

系统要求

要求说明
操作系统Linux(推荐 CentOS 7+/Ubuntu 18.04+)
内存至少 4GB,推荐 8GB+
磁盘至少 10GB 可用空间
网络节点间网络互通,延迟 <1ms
时间同步所有节点时间必须同步

端口要求

端口用途必须开放
4369EPMD
25672节点通信
5672AMQP
15672管理界面可选

Erlang Cookie 是 RabbitMQ 集群节点间的认证凭证,类似于共享密钥:

  • 所有节点必须使用相同的 Cookie
  • Cookie 存储在 /var/lib/rabbitmq/.erlang.cookie
  • 文件权限必须是 600
  • 文件所有者是 rabbitmq 用户
mermaid
sequenceDiagram
    participant N1 as 节点1
    participant N2 as 节点2
    
    N1->>N2: 连接请求 + Cookie
    N2->>N2: 验证 Cookie
    alt Cookie 匹配
        N2->>N1: 认证成功
        Note over N1,N2: 建立集群连接
    else Cookie 不匹配
        N2->>N1: 认证失败
        Note over N1,N2: 拒绝连接
    end

2.3 集群加入方式

join_cluster 命令

bash
# 基本语法
rabbitmqctl join_cluster [--ram] <node>

# 参数说明
# --ram: 作为 RAM 节点加入(默认为 Disc 节点)
# <node>: 已存在的集群节点,格式为 rabbit@hostname

加入流程

mermaid
stateDiagram-v2
    [*] --> StopApp: 停止应用
    StopApp --> Reset: 重置节点(可选)
    Reset --> JoinCluster: 加入集群
    JoinCluster --> StartApp: 启动应用
    StartApp --> [*]: 集群成员

三、配置示例

3.1 环境准备

主机名配置

bash
# 节点1 (192.168.1.101)
hostnamectl set-hostname rabbitmq-node1
echo "192.168.1.101 rabbitmq-node1" >> /etc/hosts
echo "192.168.1.102 rabbitmq-node2" >> /etc/hosts
echo "192.168.1.103 rabbitmq-node3" >> /etc/hosts

# 节点2 (192.168.1.102)
hostnamectl set-hostname rabbitmq-node2
echo "192.168.1.101 rabbitmq-node1" >> /etc/hosts
echo "192.168.1.102 rabbitmq-node2" >> /etc/hosts
echo "192.168.1.103 rabbitmq-node3" >> /etc/hosts

# 节点3 (192.168.1.103)
hostnamectl set-hostname rabbitmq-node3
echo "192.168.1.101 rabbitmq-node1" >> /etc/hosts
echo "192.168.1.102 rabbitmq-node2" >> /etc/hosts
echo "192.168.1.103 rabbitmq-node3" >> /etc/hosts

时间同步

bash
# 安装 chrony
yum install -y chrony

# 配置时间服务器
cat > /etc/chrony.conf << 'EOF'
server ntp.aliyun.com iburst
server ntp.tencent.com iburst
driftfile /var/lib/chrony/drift
makestep 1.0 3
rtcsync
logdir /var/log/chrony
EOF

# 启动服务
systemctl enable chronyd
systemctl start chronyd

# 验证时间同步
chronyc sources

防火墙配置

bash
# 开放必要端口
firewall-cmd --permanent --add-port=4369/tcp
firewall-cmd --permanent --add-port=25672/tcp
firewall-cmd --permanent --add-port=5672/tcp
firewall-cmd --permanent --add-port=15672/tcp
firewall-cmd --reload

# 验证
firewall-cmd --list-ports

3.2 安装 RabbitMQ

CentOS 安装

bash
# 安装 Erlang
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum install -y erlang

# 安装 RabbitMQ
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum install -y rabbitmq-server

# 启动服务
systemctl enable rabbitmq-server
systemctl start rabbitmq-server

# 启用管理插件
rabbitmq-plugins enable rabbitmq_management

Ubuntu 安装

bash
# 安装依赖
apt-get update
apt-get install -y curl gnupg apt-transport-https

# 添加 Erlang 仓库
curl -1sLf 'https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA' | sudo gpg --dearmor -o /usr/share/keyrings/com.rabbitmq.team.gpg
curl -1sLf 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq.E495BB49CC4BBE5B.key' | sudo gpg --dearmor -o /usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg

# 添加仓库
cat << EOF > /etc/apt/sources.list.d/rabbitmq.list
deb [arch=amd64 signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://ppa1.novemberain.net/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
deb [arch=amd64 signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.net/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
EOF

# 安装
apt-get update
apt-get install -y rabbitmq-server

# 启动服务
systemctl enable rabbitmq-server
systemctl start rabbitmq-server

# 启用管理插件
rabbitmq-plugins enable rabbitmq_management
bash
# 在第一个节点生成 Cookie
COOKIE=$(openssl rand -base64 32)
echo $COOKIE > /var/lib/rabbitmq/.erlang.cookie
chmod 600 /var/lib/rabbitmq/.erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie

# 复制到其他节点(在所有节点执行)
# 方法1: 手动复制
scp root@rabbitmq-node1:/var/lib/rabbitmq/.erlang.cookie /var/lib/rabbitmq/.erlang.cookie
chmod 600 /var/lib/rabbitmq/.erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie

# 方法2: 使用共享存储
# 将 Cookie 文件放在 NFS 或其他共享存储上

# 重启 RabbitMQ 服务
systemctl restart rabbitmq-server

3.4 搭建集群

完整搭建脚本

bash
#!/bin/bash

set -e

NODES=("rabbitmq-node1" "rabbitmq-node2" "rabbitmq-node3")
CURRENT_NODE=$(hostname)
FIRST_NODE="rabbitmq-node1"

echo "=== 开始搭建 RabbitMQ 集群 ==="
echo "当前节点: $CURRENT_NODE"
echo "集群节点: ${NODES[@]}"

if [ "$CURRENT_NODE" == "$FIRST_NODE" ]; then
    echo "当前是第一个节点,跳过加入集群步骤"
    rabbitmqctl status
else
    echo "停止 RabbitMQ 应用..."
    rabbitmqctl stop_app
    
    echo "加入集群..."
    rabbitmqctl join_cluster rabbit@$FIRST_NODE
    
    echo "启动 RabbitMQ 应用..."
    rabbitmqctl start_app
fi

echo "查看集群状态..."
rabbitmqctl cluster_status

echo "=== 集群搭建完成 ==="

手动执行步骤

bash
# === 在节点2上执行 ===
# 停止应用
rabbitmqctl stop_app

# 加入集群(作为 Disc 节点)
rabbitmqctl join_cluster rabbit@rabbitmq-node1

# 启动应用
rabbitmqctl start_app

# 查看状态
rabbitmqctl cluster_status

# === 在节点3上执行 ===
# 停止应用
rabbitmqctl stop_app

# 加入集群
rabbitmqctl join_cluster rabbit@rabbitmq-node1

# 启动应用
rabbitmqctl start_app

# 查看状态
rabbitmqctl cluster_status

3.5 配置集群策略

高可用策略

bash
# 设置仲裁队列策略(推荐)
rabbitmqctl set_policy ha-quorum "^(?!amq\\.).*" \
    '{"queue-type":"quorum"}' \
    --apply-to queues

# 设置镜像队列策略(传统方式)
rabbitmqctl set_policy ha-mirror "^(?!amq\\.).*" \
    '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' \
    --apply-to queues

# 查看策略
rabbitmqctl list_policies

用户和权限配置

bash
# 创建管理员用户
rabbitmqctl add_user admin Admin@123456
rabbitmqctl set_user_tags admin administrator

# 授予所有权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

# 创建应用用户
rabbitmqctl add_user app_user App@123456
rabbitmqctl set_permissions -p / app_user ".*" ".*" ".*"

# 删除默认用户(可选)
rabbitmqctl delete_user guest

# 查看用户列表
rabbitmqctl list_users

3.6 验证集群

bash
# 查看集群状态
rabbitmqctl cluster_status

# 查看节点列表
rabbitmqctl list_nodes

# 查看队列状态
rabbitmqctl list_queues name pid

# 测试消息收发
# 在节点1创建队列
rabbitmqadmin declare queue name=test_queue durable=true

# 在节点2查看队列
rabbitmqctl -n rabbit@rabbitmq-node2 list_queues name

# 发送测试消息
rabbitmqadmin publish routing_key=test_queue payload="Hello Cluster"

# 消费测试消息
rabbitmqadmin get queue=test_queue

四、PHP 代码示例

4.1 集群搭建自动化脚本

php
<?php

class ClusterSetupManager
{
    private array $nodes;
    private string $cookie;
    private string $adminUser;
    private string $adminPassword;
    
    public function __construct(
        array $nodes,
        string $cookie = null,
        string $adminUser = 'admin',
        string $adminPassword = 'Admin@123456'
    ) {
        $this->nodes = $nodes;
        $this->cookie = $cookie ?? $this->generateCookie();
        $this->adminUser = $adminUser;
        $this->adminPassword = $adminPassword;
    }
    
    private function generateCookie(): string
    {
        return base64_encode(random_bytes(32));
    }
    
    public function generateSetupScripts(): array
    {
        $scripts = [];
        
        foreach ($this->nodes as $index => $node) {
            $scripts[$node] = $this->generateNodeScript($node, $index === 0);
        }
        
        return $scripts;
    }
    
    private function generateNodeScript(string $node, bool $isFirst): string
    {
        $script = "#!/bin/bash\n\n";
        $script .= "# RabbitMQ 集群节点配置脚本\n";
        $script .= "# 节点: {$node}\n";
        $script .= "# 生成时间: " . date('Y-m-d H:i:s') . "\n\n";
        
        $script .= "set -e\n\n";
        
        $script .= "echo '=== 配置 Erlang Cookie ==='\n";
        $script .= "echo '{$this->cookie}' > /var/lib/rabbitmq/.erlang.cookie\n";
        $script .= "chmod 600 /var/lib/rabbitmq/.erlang.cookie\n";
        $script .= "chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie\n\n";
        
        $script .= "echo '=== 重启 RabbitMQ 服务 ==='\n";
        $script .= "systemctl restart rabbitmq-server\n";
        $script .= "sleep 10\n\n";
        
        if (!$isFirst) {
            $firstNode = $this->nodes[0];
            $script .= "echo '=== 加入集群 ==='\n";
            $script .= "rabbitmqctl stop_app\n";
            $script .= "rabbitmqctl join_cluster rabbit@{$firstNode}\n";
            $script .= "rabbitmqctl start_app\n\n";
        }
        
        if ($isFirst) {
            $script .= "echo '=== 配置用户和策略 ==='\n";
            $script .= "rabbitmqctl add_user {$this->adminUser} {$this->adminPassword}\n";
            $script .= "rabbitmqctl set_user_tags {$this->adminUser} administrator\n";
            $script .= "rabbitmqctl set_permissions -p / {$this->adminUser} \".*\" \".*\" \".*\"\n";
            $script .= "rabbitmqctl set_policy ha-quorum '^(?!amq\\\\.).*' '{\"queue-type\":\"quorum\"}' --apply-to queues\n\n";
        }
        
        $script .= "echo '=== 验证集群状态 ==='\n";
        $script .= "rabbitmqctl cluster_status\n";
        
        return $script;
    }
    
    public function generateHostsConfig(): string
    {
        $config = "# RabbitMQ 集群 hosts 配置\n";
        foreach ($this->nodes as $index => $node) {
            $ip = "192.168.1." . (101 + $index);
            $config .= "{$ip} {$node}\n";
        }
        return $config;
    }
    
    public function generateFirewallCommands(): string
    {
        return <<<'BASH'
# 开放 RabbitMQ 端口
firewall-cmd --permanent --add-port=4369/tcp
firewall-cmd --permanent --add-port=25672/tcp
firewall-cmd --permanent --add-port=5672/tcp
firewall-cmd --permanent --add-port=15672/tcp
firewall-cmd --reload
BASH;
    }
    
    public function generateVerificationCommands(): array
    {
        return [
            'cluster_status' => 'rabbitmqctl cluster_status',
            'list_nodes' => 'rabbitmqctl list_nodes',
            'list_queues' => 'rabbitmqctl list_queues name pid',
            'list_users' => 'rabbitmqctl list_users',
            'list_policies' => 'rabbitmqctl list_policies',
        ];
    }
    
    public function getClusterInfo(): array
    {
        return [
            'nodes' => $this->nodes,
            'node_count' => count($this->nodes),
            'cookie' => $this->cookie,
            'admin_user' => $this->adminUser,
            'expected_status' => [
                'cluster_name' => 'rabbit@' . $this->nodes[0],
                'running_nodes' => array_map(fn($n) => "rabbit@{$n}", $this->nodes),
                'partition_handling' => 'autoheal',
            ],
        ];
    }
}

$nodes = ['rabbitmq-node1', 'rabbitmq-node2', 'rabbitmq-node3'];
$setup = new ClusterSetupManager($nodes);

echo "=== 集群信息 ===\n";
echo json_encode($setup->getClusterInfo(), JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";

echo "\n=== Hosts 配置 ===\n";
echo $setup->generateHostsConfig() . "\n";

echo "\n=== 节点1 脚本 ===\n";
$scripts = $setup->generateSetupScripts();
echo $scripts['rabbitmq-node1'] . "\n";

4.2 集群状态验证

php
<?php

class ClusterStatusVerifier
{
    private array $apiEndpoints;
    private string $user;
    private string $password;
    
    public function __construct(array $nodes, string $user = 'guest', string $password = 'guest')
    {
        $this->apiEndpoints = array_map(fn($n) => "http://{$n}:15672/api", $nodes);
        $this->user = $user;
        $this->password = $password;
    }
    
    private function request(string $url): array
    {
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->user}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
        ]);
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        return [
            'status' => $httpCode,
            'data' => json_decode($response, true)
        ];
    }
    
    public function verifyCluster(): array
    {
        $results = [
            'timestamp' => date('Y-m-d H:i:s'),
            'nodes' => [],
            'cluster_consistent' => true,
            'issues' => [],
        ];
        
        $clusterNames = [];
        $runningNodes = [];
        
        foreach ($this->apiEndpoints as $endpoint) {
            $overview = $this->request("{$endpoint}/overview");
            $nodes = $this->request("{$endpoint}/nodes");
            
            if ($overview['status'] !== 200) {
                $results['issues'][] = "无法连接到 {$endpoint}";
                continue;
            }
            
            $clusterName = $overview['data']['cluster_name'] ?? 'unknown';
            $clusterNames[] = $clusterName;
            
            $nodeList = [];
            foreach ($nodes['data'] ?? [] as $node) {
                $nodeList[] = [
                    'name' => $node['name'],
                    'running' => $node['running'],
                    'type' => $node['type'],
                    'partitions' => $node['partitions'] ?? [],
                ];
                $runningNodes[] = $node['name'];
            }
            
            $results['nodes'][$endpoint] = [
                'cluster_name' => $clusterName,
                'node_count' => count($nodeList),
                'nodes' => $nodeList,
            ];
        }
        
        $uniqueClusterNames = array_unique($clusterNames);
        if (count($uniqueClusterNames) > 1) {
            $results['cluster_consistent'] = false;
            $results['issues'][] = '集群名称不一致: ' . implode(', ', $uniqueClusterNames);
        }
        
        $uniqueRunningNodes = array_unique($runningNodes);
        $expectedNodeCount = count($this->apiEndpoints);
        if (count($uniqueRunningNodes) !== $expectedNodeCount) {
            $results['cluster_consistent'] = false;
            $results['issues'][] = "节点数量不一致,期望 {$expectedNodeCount},实际 " . count($uniqueRunningNodes);
        }
        
        return $results;
    }
    
    public function testMessageRouting(): array
    {
        $testQueue = 'test_cluster_queue_' . time();
        $testMessage = 'Cluster test message at ' . date('Y-m-d H:i:s');
        
        $results = [
            'queue_created' => false,
            'message_sent' => false,
            'message_received' => false,
            'queue_deleted' => false,
        ];
        
        $endpoint = $this->apiEndpoints[0];
        
        $createQueue = $this->request("{$endpoint}/queues/%2f/{$testQueue}");
        if ($createQueue['status'] === 201 || $createQueue['status'] === 204) {
            $results['queue_created'] = true;
        }
        
        $publish = $this->request("{$endpoint}/exchanges/%2f/amq.default/publish");
        $results['message_sent'] = true;
        
        $get = $this->request("{$endpoint}/queues/%2f/{$testQueue}/get");
        if ($get['status'] === 200 && !empty($get['data'])) {
            $results['message_received'] = true;
        }
        
        $delete = $this->request("{$endpoint}/queues/%2f/{$testQueue}");
        if ($delete['status'] === 204) {
            $results['queue_deleted'] = true;
        }
        
        return $results;
    }
    
    public function generateReport(): string
    {
        $verification = $this->verifyCluster();
        
        $report = "=== RabbitMQ 集群验证报告 ===\n";
        $report .= "时间: {$verification['timestamp']}\n";
        $report .= "集群一致性: " . ($verification['cluster_consistent'] ? '正常' : '异常') . "\n\n";
        
        foreach ($verification['nodes'] as $endpoint => $info) {
            $report .= "节点: {$endpoint}\n";
            $report .= "  集群名称: {$info['cluster_name']}\n";
            $report .= "  节点数量: {$info['node_count']}\n";
            foreach ($info['nodes'] as $node) {
                $status = $node['running'] ? '运行中' : '已停止';
                $report .= "    - {$node['name']} ({$node['type']}): {$status}\n";
            }
            $report .= "\n";
        }
        
        if (!empty($verification['issues'])) {
            $report .= "问题:\n";
            foreach ($verification['issues'] as $issue) {
                $report .= "  - {$issue}\n";
            }
        }
        
        return $report;
    }
}

$verifier = new ClusterStatusVerifier(
    ['rabbitmq-node1', 'rabbitmq-node2', 'rabbitmq-node3'],
    'admin',
    'Admin@123456'
);

echo $verifier->generateReport();

五、实际应用场景

5.1 三节点生产集群搭建

mermaid
graph TB
    subgraph "三节点集群架构"
        N1[节点1<br/>192.168.1.101<br/>Disc Node]
        N2[节点2<br/>192.168.1.102<br/>Disc Node]
        N3[节点3<br/>192.168.1.103<br/>Disc Node]
        
        N1 <--> N2
        N2 <--> N3
        N1 <--> N3
    end
    
    subgraph "客户端"
        P[生产者]
        C[消费者]
    end
    
    subgraph "负载均衡"
        LB[HAProxy]
    end
    
    P --> LB
    C --> LB
    LB --> N1
    LB --> N2
    LB --> N3

5.2 搭建检查清单

markdown
## 集群搭建检查清单

### 环境准备
- [ ] 所有节点主机名已配置
- [ ] 所有节点 hosts 文件已更新
- [ ] 所有节点时间已同步
- [ ] 防火墙端口已开放
- [ ] SELinux 已配置或关闭

### 软件安装
- [ ] Erlang 已安装(版本兼容)
- [ ] RabbitMQ 已安装
- [ ] 管理插件已启用

### 集群配置
- [ ] Erlang Cookie 已同步
- [ ] 所有节点已加入集群
- [ ] 集群状态正常

### 用户和权限
- [ ] 管理员用户已创建
- [ ] 应用用户已创建
- [ ] 权限已配置

### 高可用配置
- [ ] 队列策略已配置
- [ ] 镜像/仲裁队列已启用

### 验证测试
- [ ] 节点间通信正常
- [ ] 消息收发正常
- [ ] 故障转移测试通过

六、常见问题与解决方案

6.1 节点无法加入集群

问题: 执行 join_cluster 报错

排查步骤:

bash
# 1. 检查 Cookie 是否一致
cat /var/lib/rabbitmq/.erlang.cookie

# 2. 检查网络连通性
ping rabbitmq-node1
telnet rabbitmq-node1 4369
telnet rabbitmq-node1 25672

# 3. 检查 EPMD
epmd -names

# 4. 查看日志
tail -f /var/log/rabbitmq/rabbit@$(hostname).log

解决方案:

bash
# 确保 Cookie 一致
COOKIE=$(ssh root@rabbitmq-node1 'cat /var/lib/rabbitmq/.erlang.cookie')
echo $COOKIE > /var/lib/rabbitmq/.erlang.cookie
chmod 600 /var/lib/rabbitmq/.erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
systemctl restart rabbitmq-server

6.2 集群状态异常

问题: cluster_status 显示节点状态异常

解决方案:

bash
# 重置问题节点
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbitmq-node1
rabbitmqctl start_app

6.3 节点重启后无法加入

问题: 节点重启后无法自动加入集群

解决方案:

bash
# 检查节点类型
rabbitmqctl cluster_status | grep -A 5 "Disk Nodes"

# 如果是 RAM 节点,确保 Disc 节点先启动
# 启动顺序:Disc 节点 -> RAM 节点

七、最佳实践建议

7.1 搭建建议

  1. 统一 Cookie: 使用脚本确保所有节点 Cookie 一致
  2. 时间同步: 所有节点必须时间同步
  3. 网络规划: 使用独立网络,确保低延迟
  4. 监控部署: 搭建完成后立即部署监控

7.2 安全建议

  1. 修改默认用户: 删除或修改 guest 用户
  2. 强密码策略: 使用复杂密码
  3. 最小权限原则: 按需分配权限
  4. 启用 TLS: 生产环境启用加密通信

7.3 运维建议

  1. 备份 Cookie: 妥善保存 Erlang Cookie
  2. 记录配置: 保存所有配置文件
  3. 定期演练: 定期进行故障演练
  4. 版本管理: 保持所有节点版本一致

八、相关链接