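RabbitMQ Scale-Out and Scale-In
1. Overview
Scaling a cluster out and in is a routine part of operating RabbitMQ. Scaling out adds processing capacity, while scaling in frees underused resources. Done correctly, both operations preserve service continuity and keep message data safe.
Scaling workflows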
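```mermaid
graph TB
    subgraph "Scale-out workflow"
        A1[Assess demand] --> A2[Prepare new node]
        A2 --> A3[Install and configure]
        A3 --> A4[Join cluster]
        A4 --> A5[Synchronize data]
        A5 --> A6[Update load balancing]
    end
    subgraph "Scale-in workflow"
        B1[Assess impact] --> B2[Migrate queues]
        B2 --> B3[Stop service]
        B3 --> B4[Remove from cluster]
        B4 --> B5[Clean up resources]
    end
```
2. Core Concepts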
2.1 How Scale-Out Works
Effect of scaling out
```mermaid
graph LR
    subgraph "Before scale-out"
        N1A[Node 1<br/>Load 80%]
        N2A[Node 2<br/>Load 75%]
        N3A[Node 3<br/>Load 85%]
    end
    subgraph "After scale-out"
        N1B[Node 1<br/>Load 50%]
        N2B[Node 2<br/>Load 45%]
        N3B[Node 3<br/>Load 55%]
        N4B[Node 4<br/>Load 40%]
    end
    N1A <--> N2A <--> N3A
    N1B <--> N2B <--> N3B <--> N4B
```
Scale-out gains
| Metric | Before | After | Change |
|---|---|---|---|
| Node count | 3 | 4 | +33% |
| Total throughput | 30K TPS | 40K TPS | +33% |
| Peak per-node load | 85% | 55% | -35% |
| Fault tolerance | 1 node | 1 node | unchanged |
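
One way to read the unchanged fault-tolerance row: if the cluster relies on a majority of nodes staying up (as quorum queues and the `pause_minority` partition strategy do), it tolerates floor((n - 1) / 2) failures, so going from 3 to 4 nodes adds capacity but not resilience. The sketch below works through that arithmetic. The majority assumption and the helper name `fault_tolerance` are illustrative and do not come from the original text.

```shell
#!/usr/bin/env bash
# Hypothetical helper: number of node failures a majority-based
# cluster of N nodes can survive, i.e. floor((N - 1) / 2).
fault_tolerance() {
  local nodes=$1
  echo $(( (nodes - 1) / 2 ))
}

# Compare the cluster sizes from the table with the next size up.
for n in 3 4 5; do
  echo "nodes=$n tolerated_failures=$(fault_tolerance "$n")"
done
```

Under this assumption a fifth node is what raises the tolerance to two failures, which is one reason odd cluster sizes are commonly preferred.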
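2.2 How Scale-In Works
Effect of scaling in
```mermaid
graph LR
    subgraph "Before scale-in"
        N1A[Node 1]
        N2A[Node 2]
        N3A[Node 3]
        N4A[Node 4<br/>to be removed]
    end
    subgraph "After scale-in"
        N1B[Node 1]
        N2B[Node 2]
        N3B[Node 3]
    end
    N1A <--> N2A <--> N3A <--> N4A
    N1B <--> N2B <--> N3B
```
Scale-in considerations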
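- Queue migration: make sure no queue leader remains on the node being removed
- Data synchronization: wait until replicas are fully synchronized
- Connection transfer: client connections must move to the remaining nodes
- Load assessment: confirm that the remaining nodes can carry the load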
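
The load-assessment item can be estimated before touching the cluster. The sketch below assumes the total work is spread evenly over the remaining nodes and treats 70% as the safety ceiling (borrowed from the scale-out thresholds later in this document). Both assumptions and the function names `projected_load` and `safe_to_scale_in` are illustrative.

```shell
#!/usr/bin/env bash
# Hypothetical helper: average per-node load (percent, rounded up) after
# removing nodes, assuming load redistributes evenly over the rest.
projected_load() {
  local nodes=$1 avg_load=$2 removed=${3:-1}
  local remaining=$(( nodes - removed ))
  if (( remaining < 1 )); then
    echo "error: no nodes would remain" >&2
    return 1
  fi
  echo $(( (nodes * avg_load + remaining - 1) / remaining ))
}

# Succeeds only when the projected load stays below the assumed 70% ceiling.
safe_to_scale_in() {
  local projected
  projected=$(projected_load "$1" "$2" "${3:-1}") || return 1
  (( projected < 70 ))
}

# Example: a 4-node cluster averaging 45% load, removing one node.
echo "projected load: $(projected_load 4 45)%"
```

Real load rarely redistributes perfectly evenly, so the result is better treated as a lower bound than as a guarantee.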
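2.3 Queue Migration
Moving a queue leader
```mermaid
sequenceDiagram
    participant Admin as Administrator
    participant N1 as Node 1<br/>queue leader
    participant N2 as Node 2<br/>mirror
    participant N3 as Node 3
    Admin->>N1: Move queue leader
    N1->>N2: Synchronize data
    N2->>N2: Promote to leader
    N1->>N1: Delete local queue copy
    Note over N2: Becomes the new queue leader
```
Migration commands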
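```bash
# Show each queue with its pid; the pid contains the node hosting the leader
rabbitmqctl list_queues name pid
# Move the queue by pinning its replicas to the target node
# (requires classic queue mirroring, which was removed in RabbitMQ 4.0;
#  the pattern is a regular expression, so anchor it to match one queue)
rabbitmqctl set_policy ha-migrate '^queue_name$' \
    '{"ha-mode":"nodes","ha-params":["rabbit@node2"]}' --apply-to queues
# Trigger synchronization of the new mirror
rabbitmqctl sync_queue queue_name
# Cancel an in-progress synchronization if necessary
rabbitmqctl cancel_sync_queue queue_name
```
2.4 Scaling Strategy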
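When to scale out
| Metric | Threshold | Notes |
|---|---|---|
| CPU usage | > 70% | Sustained for 5 minutes |
| Memory usage | > 70% | Approaching the high watermark |
| Message backlog | > 10000 | Queue depth keeps growing |
| Connections | > 80% of limit | Approaching the connection limit |
| TPS | Near peak | Processing capacity is insufficient |
When to scale in
| Metric | Threshold | Notes |
|---|---|---|
| CPU usage | < 30% | Sustained for 30 minutes |
| Memory usage | < 40% | Well below the high watermark |
| Message backlog | < 1000 | Queue depth is stable |
| Connections | < 50% of limit | Few connections |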
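
The two threshold tables can be folded into a single decision rule. The sketch below recommends scaling out when any scale-out threshold is exceeded, and scaling in only when every scale-in threshold is met. It takes whole-number inputs and leaves out the "sustained for N minutes" conditions, which would need a time series from the monitoring system. The function name `scaling_decision` and the any/all combination are assumptions.

```shell
#!/usr/bin/env bash
# Hypothetical helper: map current metrics to a recommendation.
# Arguments: CPU %, memory %, message backlog, connection usage %.
scaling_decision() {
  local cpu=$1 mem=$2 backlog=$3 conn=$4
  if (( cpu > 70 || mem > 70 || backlog > 10000 || conn > 80 )); then
    echo scale_out      # any single overloaded metric is enough
  elif (( cpu < 30 && mem < 40 && backlog < 1000 && conn < 50 )); then
    echo scale_in       # every metric must be comfortably low
  else
    echo maintain
  fi
}

# Example: busy CPU, everything else moderate.
scaling_decision 85 60 500 40
```

Requiring all conditions for scale-in but only one for scale-out biases the rule toward keeping capacity, which is usually the cheaper mistake.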
3. Configuration Examples
3.1 Scale-Out Procedure
Prepare the new node
```bash
# Run on the new node
# 1. Set the hostname
hostnamectl set-hostname rabbitmq-node4
# 2. Add the cluster hosts
cat >> /etc/hosts << EOF
192.168.1.101 rabbitmq-node1
192.168.1.102 rabbitmq-node2
192.168.1.103 rabbitmq-node3
192.168.1.104 rabbitmq-node4
EOF
# 3. Install Erlang and RabbitMQ
# (repository locations have changed over time; check the current
#  RabbitMQ installation guide before relying on these scripts)
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
yum install -y erlang
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum install -y rabbitmq-server
# 4. Copy the Erlang cookie from an existing node before the first start
#    (done after installation so the rabbitmq user and data directory exist)
ssh root@rabbitmq-node1 'cat /var/lib/rabbitmq/.erlang.cookie' > /var/lib/rabbitmq/.erlang.cookie
chmod 600 /var/lib/rabbitmq/.erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
# 5. Enable the management plugin
rabbitmq-plugins enable rabbitmq_management
```
Join the cluster
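```bash
# Run on the new node
# 1. Start the service
systemctl start rabbitmq-server
# 2. Stop the RabbitMQ application (the Erlang VM keeps running)
rabbitmqctl stop_app
# 3. Join the cluster
rabbitmqctl join_cluster rabbit@rabbitmq-node1
# 4. Start the application
rabbitmqctl start_app
# 5. Verify cluster membership
rabbitmqctl cluster_status
```
Update the load balancer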
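```bash
# Update the HAProxy configuration
# (appending works only if the RabbitMQ backend is the last section of the file)
cat >> /etc/haproxy/haproxy.cfg << 'EOF'
    # New node
    server rabbitmq4 192.168.1.104:5672 check inter 5000 rise 2 fall 3
EOF
# Reload HAProxy
systemctl reload haproxy
```
3.2 Scale-In Procedure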
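Migrate queues
```bash
# Run on a management node
# 1. List queues whose leader is on the node being removed
rabbitmqctl -q list_queues name pid | grep rabbit@rabbitmq-node4
# 2. Move the queue leaders (classic mirrored queues only)
for queue in $(rabbitmqctl -q list_queues name pid | grep rabbit@rabbitmq-node4 | awk '{print $1}'); do
    echo "Migrating queue: $queue"
    # Automatic sync lets the new mirrors catch up so the leader can move
    rabbitmqctl set_policy "temp-migrate-$queue" "^$queue\$" \
        '{"ha-mode":"nodes","ha-params":["rabbit@rabbitmq-node1","rabbit@rabbitmq-node2","rabbit@rabbitmq-node3"],"ha-sync-mode":"automatic"}' \
        --apply-to queues
    # Fixed pause for synchronization; large queues may need longer
    sleep 5
    # Remove the temporary policy
    rabbitmqctl clear_policy "temp-migrate-$queue"
done
# 3. Verify the migration (no output means no queue is left on the node)
rabbitmqctl -q list_queues name pid | grep rabbit@rabbitmq-node4
```
Remove the node from the cluster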
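```bash
# Run on the node being removed
# 1. Stop the RabbitMQ application
rabbitmqctl stop_app
# 2. Reset the node (it leaves the cluster and its local data is wiped)
rabbitmqctl reset
# 3. Stop the service
systemctl stop rabbitmq-server
# 4. Disable the service
systemctl disable rabbitmq-server
```
Update the load balancer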
```bash
# Remove the node's server line
sed -i '/rabbitmq4/d' /etc/haproxy/haproxy.cfg
# Reload HAProxy
systemctl reload haproxy
```
3.3 Scaling Script
```bash
#!/bin/bash
# scale_cluster.sh
# Adds a node to, or removes a node from, a RabbitMQ cluster.
# Run on an existing cluster node as a user that can read the Erlang cookie.
ACTION=$1
NODE=$2
CLUSTER_NODE="rabbit@rabbitmq-node1"
COOKIE_FILE=/var/lib/rabbitmq/.erlang.cookie
if [ "$ACTION" != "status" ] && [ -z "$NODE" ]; then
    echo "Usage: $0 {add|remove|status} <node>"
    exit 1
fi
case "$ACTION" in
    add)
        echo "=== Adding node $NODE to the cluster ==="
        # Copy the local cookie file to the new node while its service is stopped
        ssh "root@$NODE" "systemctl stop rabbitmq-server"
        ssh "root@$NODE" "cat > $COOKIE_FILE && chmod 600 $COOKIE_FILE && chown rabbitmq:rabbitmq $COOKIE_FILE" < "$COOKIE_FILE"
        # Start the service, then join the cluster
        ssh "root@$NODE" "systemctl start rabbitmq-server && rabbitmqctl stop_app && rabbitmqctl join_cluster $CLUSTER_NODE && rabbitmqctl start_app"
        # Verify
        rabbitmqctl cluster_status
        ;;
    remove)
        echo "=== Removing node $NODE from the cluster ==="
        # Refuse to continue while queues still live on the node
        QUEUES=$(rabbitmqctl -q list_queues name pid | grep -c "rabbit@$NODE")
        if [ "$QUEUES" -gt 0 ]; then
            echo "Warning: node $NODE still hosts $QUEUES queue(s); migrate them first"
            exit 1
        fi
        # Stop the node, then make the cluster forget it
        rabbitmqctl -n "rabbit@$NODE" stop_app
        rabbitmqctl -n "$CLUSTER_NODE" forget_cluster_node "rabbit@$NODE"
        # Verify
        rabbitmqctl cluster_status
        ;;
    status)
        echo "=== Cluster status ==="
        rabbitmqctl cluster_status
        rabbitmqctl list_queues name pid
        ;;
    *)
        echo "Usage: $0 {add|remove|status} <node>"
        exit 1
        ;;
esac
```
4. PHP Code Examples
4.1 Scaling Manager
```php
<?php
class ClusterScaler
{
    private array $nodes;
    private string $apiHost;
    private int $apiPort;
    private string $user;
    private string $password;

    public function __construct(
        array $existingNodes,
        string $apiHost = 'localhost',
        int $apiPort = 15672,
        string $user = 'guest',
        string $password = 'guest'
    ) {
        $this->nodes = $existingNodes;
        $this->apiHost = $apiHost;
        $this->apiPort = $apiPort;
        $this->user = $user;
        $this->password = $password;
    }

    // Call the management HTTP API; returns the status code and the decoded body.
    private function request(string $endpoint, string $method = 'GET', ?array $data = null): array
    {
        $url = "http://{$this->apiHost}:{$this->apiPort}/api/{$endpoint}";
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->user}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_CUSTOMREQUEST => $method,
        ]);
        if ($data !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        }
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        return [
            'status' => $httpCode,
            // curl_exec() returns false on transport errors
            'data' => is_string($response) ? json_decode($response, true) : null,
        ];
    }

    // Usage ratio that tolerates missing or zero limits (for example on stopped nodes).
    private function ratio(array $node, string $usedKey, string $totalKey): float
    {
        $total = $node[$totalKey] ?? 0;
        return $total > 0 ? ($node[$usedKey] ?? 0) / $total : 0.0;
    }

    public function evaluateScalingNeed(): array
    {
        $nodes = $this->request('nodes');
        if ($nodes['status'] !== 200 || empty($nodes['data'])) {
            return ['error' => 'Failed to get node data'];
        }
        $evaluation = [
            'timestamp' => date('Y-m-d H:i:s'),
            'current_nodes' => count($nodes['data']),
            'metrics' => [],
            'recommendation' => null,
        ];
        // This code reads memory, file descriptor and socket figures only;
        // CPU thresholds have to be checked through external monitoring.
        $totalMemory = 0;
        $totalConnections = 0;
        $maxConnections = 0;
        foreach ($nodes['data'] as $node) {
            $memUsage = $this->ratio($node, 'mem_used', 'mem_limit');
            $fdUsage = $this->ratio($node, 'fd_used', 'fd_total');
            $socketUsage = $this->ratio($node, 'sockets_used', 'sockets_total');
            $totalMemory += $memUsage;
            $totalConnections += $node['sockets_used'] ?? 0;
            $maxConnections += $node['sockets_total'] ?? 0;
            $evaluation['metrics'][$node['name']] = [
                'memory_usage' => round($memUsage * 100, 2) . '%',
                'fd_usage' => round($fdUsage * 100, 2) . '%',
                'socket_usage' => round($socketUsage * 100, 2) . '%',
                'running' => $node['running'] ?? false,
            ];
        }
        $avgMemory = $totalMemory / count($nodes['data']);
        $connectionUsage = $maxConnections > 0 ? $totalConnections / $maxConnections : 0.0;
        $evaluation['summary'] = [
            'avg_memory_usage' => round($avgMemory * 100, 2) . '%',
            'connection_usage' => round($connectionUsage * 100, 2) . '%',
        ];
        if ($avgMemory > 0.7 || $connectionUsage > 0.8) {
            $evaluation['recommendation'] = 'scale_out';
            $evaluation['reason'] = 'Resource usage is high; scaling out is recommended';
        } elseif ($avgMemory < 0.3 && $connectionUsage < 0.3 && count($nodes['data']) > 3) {
            $evaluation['recommendation'] = 'scale_in';
            $evaluation['reason'] = 'Resource usage is low; scaling in can be considered';
        } else {
            $evaluation['recommendation'] = 'maintain';
            $evaluation['reason'] = 'Resource usage is normal';
        }
        return $evaluation;
    }

    // Build the scale-out plan as a list of shell commands (nothing is executed here).
    public function prepareScaleOut(string $newNode): array
    {
        return [
            'steps' => [
                [
                    'step' => 1,
                    'action' => 'configure_hostname',
                    'commands' => [
                        "hostnamectl set-hostname {$newNode}",
                    ],
                ],
                [
                    'step' => 2,
                    'action' => 'configure_hosts',
                    'commands' => $this->generateHostsCommands($newNode),
                ],
                [
                    'step' => 3,
                    'action' => 'sync_cookie',
                    // Run in one shell session on an existing node so $COOKIE stays set
                    'commands' => [
                        'COOKIE=$(cat /var/lib/rabbitmq/.erlang.cookie)',
                        "ssh root@{$newNode} \"echo \$COOKIE > /var/lib/rabbitmq/.erlang.cookie\"",
                        "ssh root@{$newNode} \"chmod 600 /var/lib/rabbitmq/.erlang.cookie\"",
                        "ssh root@{$newNode} \"chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie\"",
                    ],
                ],
                [
                    'step' => 4,
                    'action' => 'join_cluster',
                    'commands' => [
                        "ssh root@{$newNode} \"systemctl start rabbitmq-server\"",
                        "ssh root@{$newNode} \"rabbitmqctl stop_app\"",
                        "ssh root@{$newNode} \"rabbitmqctl join_cluster rabbit@{$this->nodes[0]}\"",
                        "ssh root@{$newNode} \"rabbitmqctl start_app\"",
                    ],
                ],
                [
                    'step' => 5,
                    'action' => 'verify',
                    'commands' => [
                        'rabbitmqctl cluster_status',
                    ],
                ],
            ],
        ];
    }

    // Build the scale-in plan; queues still hosted on the node are listed for migration.
    public function prepareScaleIn(string $nodeToRemove): array
    {
        $queuesOnNode = $this->getQueuesOnNode($nodeToRemove);
        return [
            'node_to_remove' => $nodeToRemove,
            'queues_to_migrate' => $queuesOnNode,
            'steps' => [
                [
                    'step' => 1,
                    'action' => 'check_queues',
                    'description' => 'Check the queues on the node being removed',
                    'queue_count' => count($queuesOnNode),
                ],
                [
                    'step' => 2,
                    'action' => 'migrate_queues',
                    'description' => 'Migrate the queues to other nodes',
                    'commands' => $this->generateMigrationCommands($queuesOnNode, $nodeToRemove),
                ],
                [
                    'step' => 3,
                    'action' => 'stop_app',
                    'commands' => [
                        "rabbitmqctl -n rabbit@{$nodeToRemove} stop_app",
                    ],
                ],
                [
                    'step' => 4,
                    'action' => 'forget_node',
                    'commands' => [
                        "rabbitmqctl -n rabbit@{$this->nodes[0]} forget_cluster_node rabbit@{$nodeToRemove}",
                    ],
                ],
                [
                    'step' => 5,
                    'action' => 'verify',
                    'commands' => [
                        'rabbitmqctl cluster_status',
                    ],
                ],
            ],
        ];
    }

    private function getQueuesOnNode(string $nodeName): array
    {
        $queues = $this->request('queues');
        if ($queues['status'] !== 200 || !is_array($queues['data'])) {
            return [];
        }
        $queuesOnNode = [];
        foreach ($queues['data'] as $queue) {
            // The HTTP API reports the hosting node in the "node" field
            if (($queue['node'] ?? '') === "rabbit@{$nodeName}") {
                $queuesOnNode[] = [
                    'name' => $queue['name'],
                    'vhost' => $queue['vhost'],
                    'messages' => $queue['messages'] ?? 0,
                ];
            }
        }
        return $queuesOnNode;
    }

    // Assumes the example addressing scheme 192.168.1.101, .102, ... in node order.
    private function generateHostsCommands(string $newNode): array
    {
        $commands = [];
        $newNodeIp = "192.168.1." . (100 + count($this->nodes) + 1);
        foreach ($this->nodes as $existingNode) {
            $commands[] = "ssh root@{$existingNode} \"echo '{$newNodeIp} {$newNode}' >> /etc/hosts\"";
        }
        foreach ($this->nodes as $index => $existingNode) {
            $ip = "192.168.1." . (101 + $index);
            $commands[] = "ssh root@{$newNode} \"echo '{$ip} {$existingNode}' >> /etc/hosts\"";
        }
        return $commands;
    }

    private function generateMigrationCommands(array $queues, string $fromNode): array
    {
        $commands = [];
        // Re-index after filtering so the candidate list has contiguous keys
        $targetNodes = array_values(array_filter($this->nodes, fn($n) => $n !== $fromNode));
        if ($targetNodes === []) {
            return $commands;
        }
        foreach ($queues as $queue) {
            $targetNode = $targetNodes[array_rand($targetNodes)];
            $definition = json_encode(['ha-mode' => 'nodes', 'ha-params' => ["rabbit@{$targetNode}"]]);
            // Quote every argument and anchor the pattern to this exact queue name
            $commands[] = 'rabbitmqctl set_policy -p ' . escapeshellarg($queue['vhost'])
                . ' ' . escapeshellarg("migrate-{$queue['name']}")
                . ' ' . escapeshellarg('^' . preg_quote($queue['name']) . '$')
                . ' ' . escapeshellarg($definition) . ' --apply-to queues';
        }
        return $commands;
    }
}

$scaler = new ClusterScaler(
    ['rabbitmq-node1', 'rabbitmq-node2', 'rabbitmq-node3'],
    'rabbitmq-node1',
    15672,
    'admin',
    'Admin@123456'
);
echo "=== Scaling evaluation ===\n";
$evaluation = $scaler->evaluateScalingNeed();
echo json_encode($evaluation, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";
echo "\n=== Scale-out plan ===\n";
$scaleOut = $scaler->prepareScaleOut('rabbitmq-node4');
echo json_encode($scaleOut, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";
```
4.2 Queue Migration Tool
```php
<?php
class QueueMigrator
{
    private string $apiHost;
    private int $apiPort;
    private string $user;
    private string $password;

    public function __construct(
        string $apiHost = 'localhost',
        int $apiPort = 15672,
        string $user = 'guest',
        string $password = 'guest'
    ) {
        $this->apiHost = $apiHost;
        $this->apiPort = $apiPort;
        $this->user = $user;
        $this->password = $password;
    }

    // Call the management HTTP API; returns the status code and the decoded body.
    private function request(string $endpoint, string $method = 'GET', ?array $data = null): array
    {
        $url = "http://{$this->apiHost}:{$this->apiPort}/api/{$endpoint}";
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->user}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_CUSTOMREQUEST => $method,
        ]);
        if ($data !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        }
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        return [
            'status' => $httpCode,
            // curl_exec() returns false on transport errors
            'data' => is_string($response) ? json_decode($response, true) : null,
        ];
    }

    public function listQueuesByNode(string $nodeName): array
    {
        $queues = $this->request('queues');
        if ($queues['status'] !== 200 || !is_array($queues['data'])) {
            return [];
        }
        $result = [];
        foreach ($queues['data'] as $queue) {
            // The HTTP API reports the hosting node in the "node" field
            $node = $queue['node'] ?? '';
            if ($node === "rabbit@{$nodeName}") {
                $result[] = [
                    'name' => $queue['name'],
                    'vhost' => $queue['vhost'],
                    'type' => $queue['type'] ?? 'classic',
                    'messages' => $queue['messages'] ?? 0,
                    'node' => $node,
                ];
            }
        }
        return $result;
    }

    // Pin a classic mirrored queue to the target node through a dedicated policy.
    public function migrateQueue(string $queueName, string $vhost, string $targetNode): array
    {
        $policyName = "migrate-{$queueName}";
        $result = $this->request(
            'policies/' . rawurlencode($vhost) . '/' . rawurlencode($policyName),
            'PUT',
            [
                'pattern' => '^' . preg_quote($queueName) . '$',
                'definition' => [
                    'ha-mode' => 'nodes',
                    'ha-params' => ["rabbit@{$targetNode}"],
                    // Let the new mirror catch up automatically so the leader can move
                    'ha-sync-mode' => 'automatic',
                ],
                'priority' => 100,
                'apply-to' => 'queues',
            ]
        );
        return [
            'queue' => $queueName,
            'target_node' => $targetNode,
            // The API answers 201 when the policy is created and 204 when it is updated
            'status' => in_array($result['status'], [201, 204], true) ? 'success' : 'failed',
            'http_code' => $result['status'],
        ];
    }

    // Poll until the queue leader is reported on the target node, or the timeout expires.
    public function waitForSync(string $queueName, string $vhost, string $targetNode, int $timeout = 60): bool
    {
        $startTime = time();
        $path = 'queues/' . rawurlencode($vhost) . '/' . rawurlencode($queueName);
        while (time() - $startTime < $timeout) {
            $result = $this->request($path);
            if ($result['status'] === 200 && ($result['data']['node'] ?? '') === "rabbit@{$targetNode}") {
                return true;
            }
            sleep(2);
        }
        return false;
    }

    public function migrateAllQueuesFromNode(string $sourceNode, string $targetNode): array
    {
        $queues = $this->listQueuesByNode($sourceNode);
        $results = [];
        foreach ($queues as $queue) {
            // ha-* policies only affect classic queues; other queue types are skipped
            if ($queue['type'] !== 'classic') {
                $results[] = [
                    'queue' => $queue['name'],
                    'target_node' => $targetNode,
                    'status' => 'skipped',
                    'messages' => $queue['messages'],
                ];
                continue;
            }
            $result = $this->migrateQueue($queue['name'], $queue['vhost'], $targetNode);
            $result['messages'] = $queue['messages'];
            if ($result['status'] === 'success') {
                $result['leader_moved'] = $this->waitForSync($queue['name'], $queue['vhost'], $targetNode);
            }
            $results[] = $result;
        }
        return [
            'source_node' => $sourceNode,
            'target_node' => $targetNode,
            'total_queues' => count($queues),
            'migrated' => count(array_filter($results, fn($r) => $r['status'] === 'success')),
            'details' => $results,
        ];
    }

    public function generateMigrationReport(string $sourceNode): string
    {
        $queues = $this->listQueuesByNode($sourceNode);
        $report = "=== Queue migration report ===\n";
        $report .= "Source node: {$sourceNode}\n";
        $report .= "Queue count: " . count($queues) . "\n\n";
        $totalMessages = 0;
        foreach ($queues as $queue) {
            $report .= "- {$queue['name']}\n";
            $report .= "  Type: {$queue['type']}\n";
            $report .= "  Messages: {$queue['messages']}\n";
            $totalMessages += $queue['messages'];
        }
        $report .= "\nTotal messages: {$totalMessages}\n";
        return $report;
    }
}

$migrator = new QueueMigrator('rabbitmq-node1', 15672, 'admin', 'Admin@123456');
echo $migrator->generateMigrationReport('rabbitmq-node4');
$result = $migrator->migrateAllQueuesFromNode('rabbitmq-node4', 'rabbitmq-node1');
echo "\n=== Migration result ===\n";
echo json_encode($result, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . "\n";
```
5. Real-World Scenarios
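5.1 Scaling Out for Peak Traffic
```mermaid
graph TB
    subgraph "Scale-out workflow"
        A["Monitoring alert<br/>CPU > 70%"] --> B[Assess scale-out need]
        B --> C[Prepare new node]
        C --> D[Join cluster]
        D --> E[Update load balancer]
        E --> F[Verify service]
    end
```
5.2 Scaling In to Reduce Cost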
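```mermaid
graph TB
    subgraph "Scale-in workflow"
        A["Low resource usage<br/>CPU < 30%"] --> B[Assess scale-in impact]
        B --> C[Migrate queues]
        C --> D[Remove from cluster]
        D --> E[Release resources]
        E --> F[Update configuration]
    end
```
6. Common Problems and Solutions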
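6.1 Uneven Queue Distribution After Scale-Out
Problem: after a new node joins, existing queues are not redistributed to it automatically.
Solution:
```bash
# Mirror classic queues onto every node (this adds replicas; it does not move leaders)
rabbitmqctl set_policy rebalance ".*" '{"ha-mode":"all"}' --apply-to queues
# Or rebalance queue leaders across the cluster (available since RabbitMQ 3.8)
rabbitmq-queues rebalance all
```
6.2 Queue Loss During Scale-In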
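Problem: queue data is lost when a node is removed.
Prevention:
```bash
# 1. Confirm where each queue's leader and mirrors are located
rabbitmqctl list_queues name pid slave_pids
# 2. Trigger synchronization of any unsynchronized mirrors
rabbitmqctl sync_queue queue_name
# 3. Verify that a policy applies and that synchronized mirrors exist
rabbitmqctl list_queues name policy synchronised_slave_pids
```
6.3 Performance Drop After Scale-Out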
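Problem: cluster performance gets worse after scaling out.
Likely causes:
- Insufficient network bandwidth
- Disk I/O bottleneck
- Excessive synchronization overhead
Solution:
```ini
# Tune the TCP buffer sizes (rabbitmq.conf)
tcp_listen_options.sndbuf = 196608
tcp_listen_options.recbuf = 196608
# Mirror synchronization batch size: ha-sync-batch-size is a policy key, not a
# rabbitmq.conf setting. Set it in the policy definition, e.g. {"ha-sync-batch-size":100}
```
7. Best Practices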
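7.1 Scale-Out Recommendations
- Plan ahead: reserve capacity and time for the expansion
- Monitor first: confirm the monitoring system is working before you start
- Scale gradually: avoid adding many nodes at once
- Verify afterwards: run functional checks once the new node is in service
7.2 Scale-In Recommendations
- Assess the impact: confirm that removing the node will not affect the service
- Migrate data: make sure all data has been moved off the node
- Notify users: inform the affected parties in advance
- Keep logs: record every scale-in operation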
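
Keeping a record of each operation can be as simple as wrapping the commands. The sketch below appends a UTC timestamp, the command line, and its exit status to a log file. The wrapper name `log_run` and the default path `./scale_operations.log` are assumptions made for illustration.

```shell
#!/usr/bin/env bash
# Log file location; override by exporting SCALE_LOG before running.
SCALE_LOG=${SCALE_LOG:-./scale_operations.log}

# Hypothetical wrapper: run a command, recording start, end and exit status.
log_run() {
  local status
  echo "$(date -u +%Y-%m-%dT%H:%M:%SZ) START $*" >> "$SCALE_LOG"
  "$@"
  status=$?
  echo "$(date -u +%Y-%m-%dT%H:%M:%SZ) END status=$status $*" >> "$SCALE_LOG"
  return "$status"
}

# Usage (not executed here):
#   log_run rabbitmqctl -n rabbit@rabbitmq-node4 stop_app
#   log_run rabbitmqctl forget_cluster_node rabbit@rabbitmq-node4
```

Because the wrapper returns the wrapped command's exit status, it can be dropped into existing scripts without changing their error handling.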
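7.3 Checklists
Scale-out checklist:
- [ ] New node hardware meets the requirements
- [ ] Network configuration is correct
- [ ] Erlang cookie is synchronized
- [ ] Firewall ports are open
- [ ] Load balancer is updated
- [ ] Monitoring is configured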
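
The cookie item on this checklist can be verified without printing the secret by comparing digests instead of contents. The sketch below assumes `sha256sum` is available, as on most Linux distributions. The helper names `cookie_digest` and `same_cookie` are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the SHA-256 digest of a cookie file.
cookie_digest() {
  [ -r "$1" ] || { echo "cannot read $1" >&2; return 1; }
  sha256sum < "$1" | cut -d' ' -f1
}

# Returns 0 when both files hold the same cookie, 1 when they differ,
# and 2 when either file cannot be read.
same_cookie() {
  local a b
  a=$(cookie_digest "$1") || return 2
  b=$(cookie_digest "$2") || return 2
  [ "$a" = "$b" ]
}

# Remote comparison (not executed here): compare the local digest with
#   ssh root@rabbitmq-node4 'sha256sum < /var/lib/rabbitmq/.erlang.cookie'
```

A mismatch here is one of the most common reasons for `join_cluster` to fail with an authentication error, so the check is worth running before the join step.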
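Scale-in checklist:
- [ ] Queues are migrated
- [ ] Data is synchronized
- [ ] Client connections are transferred
- [ ] Node is safely removed from the cluster
- [ ] Resources are released
- [ ] Configuration is updated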
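
The first checklist item can be checked mechanically by filtering the output of `rabbitmqctl -q list_queues name pid`. The sketch below assumes tab-separated output in which each pid starts with `<rabbit@HOST.`, so the exact format should be confirmed on the RabbitMQ version in use. Matching on the trailing dot stops `rabbitmq-node4` from also matching `rabbitmq-node40`. The function name `queues_on_node` is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read "name<TAB>pid" lines on stdin and print the
# names of queues whose pid belongs to the given host.
queues_on_node() {
  local node=$1
  awk -F'\t' -v prefix="<rabbit@${node}." 'index($2, prefix) == 1 { print $1 }'
}

# Example with sample data in the assumed format:
printf 'orders\t<rabbit@rabbitmq-node4.1.234.0>\nmail\t<rabbit@rabbitmq-node1.1.99.0>\n' \
  | queues_on_node rabbitmq-node4
# Against a live cluster (not executed here):
#   rabbitmqctl -q list_queues name pid | queues_on_node rabbitmq-node4
```

An empty result means no queue is left on the node, which is the condition to confirm before removing it.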
