Appearance
RabbitMQ 手动搭建集群
一、概述
手动搭建 RabbitMQ 集群是最基础的集群部署方式,适合理解集群工作原理和进行小规模部署。本文档详细介绍手动搭建集群的完整流程。
搭建流程
mermaid
graph TB
A[环境准备] --> B[安装 RabbitMQ]
B --> C[配置 Erlang Cookie]
C --> D[启动节点]
D --> E[加入集群]
E --> F[配置策略]
F --> G[验证集群]二、核心知识点
2.1 集群搭建前提条件
系统要求
| 要求 | 说明 |
|---|---|
| 操作系统 | Linux(推荐 CentOS 7+/Ubuntu 18.04+) |
| 内存 | 至少 4GB,推荐 8GB+ |
| 磁盘 | 至少 10GB 可用空间 |
| 网络 | 节点间网络互通,延迟 <1ms |
| 时间同步 | 所有节点时间必须同步 |
端口要求
| 端口 | 用途 | 必须开放 |
|---|---|---|
| 4369 | EPMD | 是 |
| 25672 | 节点通信 | 是 |
| 5672 | AMQP | 是 |
| 15672 | 管理界面 | 可选 |
2.2 Erlang Cookie 机制
什么是 Erlang Cookie
Erlang Cookie 是 RabbitMQ 集群节点间的认证凭证,类似于共享密钥:
- 所有节点必须使用相同的 Cookie
- Cookie 存储在
/var/lib/rabbitmq/.erlang.cookie - 文件权限必须是
600 - 文件所有者是
rabbitmq用户
Cookie 认证流程
mermaid
sequenceDiagram
participant N1 as 节点1
participant N2 as 节点2
N1->>N2: 连接请求 + Cookie
N2->>N2: 验证 Cookie
alt Cookie 匹配
N2->>N1: 认证成功
Note over N1,N2: 建立集群连接
else Cookie 不匹配
N2->>N1: 认证失败
Note over N1,N2: 拒绝连接
end2.3 集群加入方式
join_cluster 命令
bash
# 基本语法
rabbitmqctl join_cluster [--ram] <node>
# 参数说明
# --ram: 作为 RAM 节点加入(默认为 Disc 节点)
# <node>: 已存在的集群节点,格式为 rabbit@hostname加入流程
mermaid
stateDiagram-v2
[*] --> StopApp: 停止应用
StopApp --> Reset: 重置节点(可选)
Reset --> JoinCluster: 加入集群
JoinCluster --> StartApp: 启动应用
StartApp --> [*]: 集群成员三、配置示例
3.1 环境准备
主机名配置
bash
# 节点1 (192.168.1.101)
hostnamectl set-hostname rabbitmq-node1
echo "192.168.1.101 rabbitmq-node1" >> /etc/hosts
echo "192.168.1.102 rabbitmq-node2" >> /etc/hosts
echo "192.168.1.103 rabbitmq-node3" >> /etc/hosts
# 节点2 (192.168.1.102)
hostnamectl set-hostname rabbitmq-node2
echo "192.168.1.101 rabbitmq-node1" >> /etc/hosts
echo "192.168.1.102 rabbitmq-node2" >> /etc/hosts
echo "192.168.1.103 rabbitmq-node3" >> /etc/hosts
# 节点3 (192.168.1.103)
hostnamectl set-hostname rabbitmq-node3
echo "192.168.1.101 rabbitmq-node1" >> /etc/hosts
echo "192.168.1.102 rabbitmq-node2" >> /etc/hosts
echo "192.168.1.103 rabbitmq-node3" >> /etc/hosts时间同步
bash
# 安装 chrony
yum install -y chrony
# 配置时间服务器
cat > /etc/chrony.conf << 'EOF'
server ntp.aliyun.com iburst
server ntp.tencent.com iburst
driftfile /var/lib/chrony/drift
makestep 1.0 3
rtcsync
logdir /var/log/chrony
EOF
# 启动服务
systemctl enable chronyd
systemctl start chronyd
# 验证时间同步
chronyc sources防火墙配置
bash
# 开放必要端口
firewall-cmd --permanent --add-port=4369/tcp
firewall-cmd --permanent --add-port=25672/tcp
firewall-cmd --permanent --add-port=5672/tcp
firewall-cmd --permanent --add-port=15672/tcp
firewall-cmd --reload
# 验证
firewall-cmd --list-ports3.2 安装 RabbitMQ
CentOS 安装
bash
# 安装 Erlang
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum install -y erlang
# 安装 RabbitMQ
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum install -y rabbitmq-server
# 启动服务
systemctl enable rabbitmq-server
systemctl start rabbitmq-server
# 启用管理插件
rabbitmq-plugins enable rabbitmq_managementUbuntu 安装
bash
# 安装依赖
apt-get update
apt-get install -y curl gnupg apt-transport-https
# 添加 Erlang 仓库
curl -1sLf 'https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA' | sudo gpg --dearmor -o /usr/share/keyrings/com.rabbitmq.team.gpg
curl -1sLf 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq.E495BB49CC4BBE5B.key' | sudo gpg --dearmor -o /usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg
# 添加仓库
cat << EOF > /etc/apt/sources.list.d/rabbitmq.list
deb [arch=amd64 signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://ppa1.novemberain.net/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
deb [arch=amd64 signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.net/rabbitmq/rabbitmq-server/deb/ubuntu jammy main
EOF
# 安装
apt-get update
apt-get install -y rabbitmq-server
# 启动服务
systemctl enable rabbitmq-server
systemctl start rabbitmq-server
# 启用管理插件
rabbitmq-plugins enable rabbitmq_management3.3 配置 Erlang Cookie
bash
# 在第一个节点生成 Cookie
COOKIE=$(openssl rand -base64 32)
echo $COOKIE > /var/lib/rabbitmq/.erlang.cookie
chmod 600 /var/lib/rabbitmq/.erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
# 复制到其他节点(在所有节点执行)
# 方法1: 手动复制
scp root@rabbitmq-node1:/var/lib/rabbitmq/.erlang.cookie /var/lib/rabbitmq/.erlang.cookie
chmod 600 /var/lib/rabbitmq/.erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
# 方法2: 使用共享存储
# 将 Cookie 文件放在 NFS 或其他共享存储上
# 重启 RabbitMQ 服务
systemctl restart rabbitmq-server3.4 搭建集群
完整搭建脚本
bash
#!/bin/bash
set -e
NODES=("rabbitmq-node1" "rabbitmq-node2" "rabbitmq-node3")
CURRENT_NODE=$(hostname)
FIRST_NODE="rabbitmq-node1"
echo "=== 开始搭建 RabbitMQ 集群 ==="
echo "当前节点: $CURRENT_NODE"
echo "集群节点: ${NODES[@]}"
if [ "$CURRENT_NODE" == "$FIRST_NODE" ]; then
echo "当前是第一个节点,跳过加入集群步骤"
rabbitmqctl status
else
echo "停止 RabbitMQ 应用..."
rabbitmqctl stop_app
echo "加入集群..."
rabbitmqctl join_cluster rabbit@$FIRST_NODE
echo "启动 RabbitMQ 应用..."
rabbitmqctl start_app
fi
echo "查看集群状态..."
rabbitmqctl cluster_status
echo "=== 集群搭建完成 ==="手动执行步骤
bash
# === 在节点2上执行 ===
# 停止应用
rabbitmqctl stop_app
# 加入集群(作为 Disc 节点)
rabbitmqctl join_cluster rabbit@rabbitmq-node1
# 启动应用
rabbitmqctl start_app
# 查看状态
rabbitmqctl cluster_status
# === 在节点3上执行 ===
# 停止应用
rabbitmqctl stop_app
# 加入集群
rabbitmqctl join_cluster rabbit@rabbitmq-node1
# 启动应用
rabbitmqctl start_app
# 查看状态
rabbitmqctl cluster_status3.5 配置集群策略
高可用策略
bash
# 设置仲裁队列策略(推荐)
rabbitmqctl set_policy ha-quorum "^(?!amq\\.).*" \
'{"queue-type":"quorum"}' \
--apply-to queues
# 设置镜像队列策略(传统方式)
rabbitmqctl set_policy ha-mirror "^(?!amq\\.).*" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' \
--apply-to queues
# 查看策略
rabbitmqctl list_policies用户和权限配置
bash
# 创建管理员用户
rabbitmqctl add_user admin Admin@123456
rabbitmqctl set_user_tags admin administrator
# 授予所有权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# 创建应用用户
rabbitmqctl add_user app_user App@123456
rabbitmqctl set_permissions -p / app_user ".*" ".*" ".*"
# 删除默认用户(可选)
rabbitmqctl delete_user guest
# 查看用户列表
rabbitmqctl list_users3.6 验证集群
bash
# 查看集群状态
rabbitmqctl cluster_status
# 查看节点列表
rabbitmqctl list_nodes
# 查看队列状态
rabbitmqctl list_queues name pid
# 测试消息收发
# 在节点1创建队列
rabbitmqadmin declare queue name=test_queue durable=true
# 在节点2查看队列
rabbitmqctl -n rabbit@rabbitmq-node2 list_queues name
# 发送测试消息
rabbitmqadmin publish routing_key=test_queue payload="Hello Cluster"
# 消费测试消息
rabbitmqadmin get queue=test_queue四、PHP 代码示例
4.1 集群搭建自动化脚本
php
<?php
class ClusterSetupManager
{
private array $nodes;
private string $cookie;
private string $adminUser;
private string $adminPassword;
public function __construct(
array $nodes,
string $cookie = null,
string $adminUser = 'admin',
string $adminPassword = 'Admin@123456'
) {
$this->nodes = $nodes;
$this->cookie = $cookie ?? $this->generateCookie();
$this->adminUser = $adminUser;
$this->adminPassword = $adminPassword;
}
private function generateCookie(): string
{
return base64_encode(random_bytes(32));
}
public function generateSetupScripts(): array
{
$scripts = [];
foreach ($this->nodes as $index => $node) {
$scripts[$node] = $this->generateNodeScript($node, $index === 0);
}
return $scripts;
}
private function generateNodeScript(string $node, bool $isFirst): string
{
$script = "#!/bin/bash\n\n";
$script .= "# RabbitMQ 集群节点配置脚本\n";
$script .= "# 节点: {$node}\n";
$script .= "# 生成时间: " . date('Y-m-d H:i:s') . "\n\n";
$script .= "set -e\n\n";
$script .= "echo '=== 配置 Erlang Cookie ==='\n";
$script .= "echo '{$this->cookie}' > /var/lib/rabbitmq/.erlang.cookie\n";
$script .= "chmod 600 /var/lib/rabbitmq/.erlang.cookie\n";
$script .= "chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie\n\n";
$script .= "echo '=== 重启 RabbitMQ 服务 ==='\n";
$script .= "systemctl restart rabbitmq-server\n";
$script .= "sleep 10\n\n";
if (!$isFirst) {
$firstNode = $this->nodes[0];
$script .= "echo '=== 加入集群 ==='\n";
$script .= "rabbitmqctl stop_app\n";
$script .= "rabbitmqctl join_cluster rabbit@{$firstNode}\n";
$script .= "rabbitmqctl start_app\n\n";
}
if ($isFirst) {
$script .= "echo '=== 配置用户和策略 ==='\n";
$script .= "rabbitmqctl add_user {$this->adminUser} {$this->adminPassword}\n";
$script .= "rabbitmqctl set_user_tags {$this->adminUser} administrator\n";
$script .= "rabbitmqctl set_permissions -p / {$this->adminUser} \".*\" \".*\" \".*\"\n";
$script .= "rabbitmqctl set_policy ha-quorum '^(?!amq\\\\.).*' '{\"queue-type\":\"quorum\"}' --apply-to queues\n\n";
}
$script .= "echo '=== 验证集群状态 ==='\n";
$script .= "rabbitmqctl cluster_status\n";
return $script;
}
public function generateHostsConfig(): string
{
$config = "# RabbitMQ 集群 hosts 配置\n";
foreach ($this->nodes as $index => $node) {
$ip = "192.168.1." . (101 + $index);
$config .= "{$ip} {$node}\n";
}
return $config;
}
public function generateFirewallCommands(): string
{
return <<<'BASH'
# 开放 RabbitMQ 端口
firewall-cmd --permanent --add-port=4369/tcp
firewall-cmd --permanent --add-port=25672/tcp
firewall-cmd --permanent --add-port=5672/tcp
firewall-cmd --permanent --add-port=15672/tcp
firewall-cmd --reload
BASH;
}
public function generateVerificationCommands(): array
{
return [
'cluster_status' => 'rabbitmqctl cluster_status',
'list_nodes' => 'rabbitmqctl list_nodes',
'list_queues' => 'rabbitmqctl list_queues name pid',
'list_users' => 'rabbitmqctl list_users',
'list_policies' => 'rabbitmqctl list_policies',
];
}
public function getClusterInfo(): array
{
return [
'nodes' => $this->nodes,
'node_count' => count($this->nodes),
'cookie' => $this->cookie,
'admin_user' => $this->adminUser,
'expected_status' => [
'cluster_name' => 'rabbit@' . $this->nodes[0],
'running_nodes' => array_map(fn($n) => "rabbit@{$n}", $this->nodes),
'partition_handling' => 'autoheal',
],
];
}
}
$nodes = ['rabbitmq-node1', 'rabbitmq-node2', 'rabbitmq-node3'];
$setup = new ClusterSetupManager($nodes);
echo "=== 集群信息 ===\n";
echo json_encode($setup->getClusterInfo(), JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";
echo "\n=== Hosts 配置 ===\n";
echo $setup->generateHostsConfig() . "\n";
echo "\n=== 节点1 脚本 ===\n";
$scripts = $setup->generateSetupScripts();
echo $scripts['rabbitmq-node1'] . "\n";4.2 集群状态验证
php
<?php
class ClusterStatusVerifier
{
private array $apiEndpoints;
private string $user;
private string $password;
public function __construct(array $nodes, string $user = 'guest', string $password = 'guest')
{
$this->apiEndpoints = array_map(fn($n) => "http://{$n}:15672/api", $nodes);
$this->user = $user;
$this->password = $password;
}
private function request(string $url): array
{
$ch = curl_init();
curl_setopt_array($ch, [
CURLOPT_URL => $url,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_USERPWD => "{$this->user}:{$this->password}",
CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
CURLOPT_TIMEOUT => 10,
]);
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
return [
'status' => $httpCode,
'data' => json_decode($response, true)
];
}
public function verifyCluster(): array
{
$results = [
'timestamp' => date('Y-m-d H:i:s'),
'nodes' => [],
'cluster_consistent' => true,
'issues' => [],
];
$clusterNames = [];
$runningNodes = [];
foreach ($this->apiEndpoints as $endpoint) {
$overview = $this->request("{$endpoint}/overview");
$nodes = $this->request("{$endpoint}/nodes");
if ($overview['status'] !== 200) {
$results['issues'][] = "无法连接到 {$endpoint}";
continue;
}
$clusterName = $overview['data']['cluster_name'] ?? 'unknown';
$clusterNames[] = $clusterName;
$nodeList = [];
foreach ($nodes['data'] ?? [] as $node) {
$nodeList[] = [
'name' => $node['name'],
'running' => $node['running'],
'type' => $node['type'],
'partitions' => $node['partitions'] ?? [],
];
$runningNodes[] = $node['name'];
}
$results['nodes'][$endpoint] = [
'cluster_name' => $clusterName,
'node_count' => count($nodeList),
'nodes' => $nodeList,
];
}
$uniqueClusterNames = array_unique($clusterNames);
if (count($uniqueClusterNames) > 1) {
$results['cluster_consistent'] = false;
$results['issues'][] = '集群名称不一致: ' . implode(', ', $uniqueClusterNames);
}
$uniqueRunningNodes = array_unique($runningNodes);
$expectedNodeCount = count($this->apiEndpoints);
if (count($uniqueRunningNodes) !== $expectedNodeCount) {
$results['cluster_consistent'] = false;
$results['issues'][] = "节点数量不一致,期望 {$expectedNodeCount},实际 " . count($uniqueRunningNodes);
}
return $results;
}
public function testMessageRouting(): array
{
$testQueue = 'test_cluster_queue_' . time();
$testMessage = 'Cluster test message at ' . date('Y-m-d H:i:s');
$results = [
'queue_created' => false,
'message_sent' => false,
'message_received' => false,
'queue_deleted' => false,
];
$endpoint = $this->apiEndpoints[0];
$createQueue = $this->request("{$endpoint}/queues/%2f/{$testQueue}");
if ($createQueue['status'] === 201 || $createQueue['status'] === 204) {
$results['queue_created'] = true;
}
$publish = $this->request("{$endpoint}/exchanges/%2f/amq.default/publish");
$results['message_sent'] = true;
$get = $this->request("{$endpoint}/queues/%2f/{$testQueue}/get");
if ($get['status'] === 200 && !empty($get['data'])) {
$results['message_received'] = true;
}
$delete = $this->request("{$endpoint}/queues/%2f/{$testQueue}");
if ($delete['status'] === 204) {
$results['queue_deleted'] = true;
}
return $results;
}
public function generateReport(): string
{
$verification = $this->verifyCluster();
$report = "=== RabbitMQ 集群验证报告 ===\n";
$report .= "时间: {$verification['timestamp']}\n";
$report .= "集群一致性: " . ($verification['cluster_consistent'] ? '正常' : '异常') . "\n\n";
foreach ($verification['nodes'] as $endpoint => $info) {
$report .= "节点: {$endpoint}\n";
$report .= " 集群名称: {$info['cluster_name']}\n";
$report .= " 节点数量: {$info['node_count']}\n";
foreach ($info['nodes'] as $node) {
$status = $node['running'] ? '运行中' : '已停止';
$report .= " - {$node['name']} ({$node['type']}): {$status}\n";
}
$report .= "\n";
}
if (!empty($verification['issues'])) {
$report .= "问题:\n";
foreach ($verification['issues'] as $issue) {
$report .= " - {$issue}\n";
}
}
return $report;
}
}
$verifier = new ClusterStatusVerifier(
['rabbitmq-node1', 'rabbitmq-node2', 'rabbitmq-node3'],
'admin',
'Admin@123456'
);
echo $verifier->generateReport();五、实际应用场景
5.1 三节点生产集群搭建
mermaid
graph TB
subgraph "三节点集群架构"
N1[节点1<br/>192.168.1.101<br/>Disc Node]
N2[节点2<br/>192.168.1.102<br/>Disc Node]
N3[节点3<br/>192.168.1.103<br/>Disc Node]
N1 <--> N2
N2 <--> N3
N1 <--> N3
end
subgraph "客户端"
P[生产者]
C[消费者]
end
subgraph "负载均衡"
LB[HAProxy]
end
P --> LB
C --> LB
LB --> N1
LB --> N2
LB --> N35.2 搭建检查清单
markdown
## 集群搭建检查清单
### 环境准备
- [ ] 所有节点主机名已配置
- [ ] 所有节点 hosts 文件已更新
- [ ] 所有节点时间已同步
- [ ] 防火墙端口已开放
- [ ] SELinux 已配置或关闭
### 软件安装
- [ ] Erlang 已安装(版本兼容)
- [ ] RabbitMQ 已安装
- [ ] 管理插件已启用
### 集群配置
- [ ] Erlang Cookie 已同步
- [ ] 所有节点已加入集群
- [ ] 集群状态正常
### 用户和权限
- [ ] 管理员用户已创建
- [ ] 应用用户已创建
- [ ] 权限已配置
### 高可用配置
- [ ] 队列策略已配置
- [ ] 镜像/仲裁队列已启用
### 验证测试
- [ ] 节点间通信正常
- [ ] 消息收发正常
- [ ] 故障转移测试通过六、常见问题与解决方案
6.1 节点无法加入集群
问题: 执行 join_cluster 报错
排查步骤:
bash
# 1. 检查 Cookie 是否一致
cat /var/lib/rabbitmq/.erlang.cookie
# 2. 检查网络连通性
ping rabbitmq-node1
telnet rabbitmq-node1 4369
telnet rabbitmq-node1 25672
# 3. 检查 EPMD
epmd -names
# 4. 查看日志
tail -f /var/log/rabbitmq/rabbit@$(hostname).log解决方案:
bash
# 确保 Cookie 一致
COOKIE=$(ssh root@rabbitmq-node1 'cat /var/lib/rabbitmq/.erlang.cookie')
echo $COOKIE > /var/lib/rabbitmq/.erlang.cookie
chmod 600 /var/lib/rabbitmq/.erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
systemctl restart rabbitmq-server6.2 集群状态异常
问题: cluster_status 显示节点状态异常
解决方案:
bash
# 重置问题节点
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbitmq-node1
rabbitmqctl start_app6.3 节点重启后无法加入
问题: 节点重启后无法自动加入集群
解决方案:
bash
# 检查节点类型
rabbitmqctl cluster_status | grep -A 5 "Disk Nodes"
# 如果是 RAM 节点,确保 Disc 节点先启动
# 启动顺序:Disc 节点 -> RAM 节点七、最佳实践建议
7.1 搭建建议
- 统一 Cookie: 使用脚本确保所有节点 Cookie 一致
- 时间同步: 所有节点必须时间同步
- 网络规划: 使用独立网络,确保低延迟
- 监控部署: 搭建完成后立即部署监控
7.2 安全建议
- 修改默认用户: 删除或修改 guest 用户
- 强密码策略: 使用复杂密码
- 最小权限原则: 按需分配权限
- 启用 TLS: 生产环境启用加密通信
7.3 运维建议
- 备份 Cookie: 妥善保存 Erlang Cookie
- 记录配置: 保存所有配置文件
- 定期演练: 定期进行故障演练
- 版本管理: 保持所有节点版本一致
