Skip to content

RabbitMQ 集群运维管理

一、概述

RabbitMQ 集群的运维管理是确保消息系统稳定运行的关键。本文档涵盖日常运维、监控告警、故障处理、备份恢复等核心运维操作。

运维管理体系

mermaid
graph TB
    A[运维管理] --> B[日常运维]
    A --> C[监控告警]
    A --> D[故障处理]
    A --> E[备份恢复]
    A --> F[性能优化]
    
    B --> B1[状态检查]
    B --> B2[日志管理]
    B --> B3[用户管理]
    
    C --> C1[指标采集]
    C --> C2[告警配置]
    C --> C3[可视化面板]
    
    D --> D1[故障诊断]
    D --> D2[故障恢复]
    D --> D3[故障复盘]
    
    E --> E1[数据备份]
    E --> E2[配置备份]
    E --> E3[恢复演练]
    
    F --> F1[参数调优]
    F --> F2[资源优化]
    F --> F3[架构优化]

二、核心知识点

2.1 日常运维操作

状态检查命令

bash
# 集群状态
rabbitmqctl cluster_status

# 节点状态
rabbitmqctl status

# 运行环境
rabbitmqctl environment

# 队列状态
rabbitmqctl list_queues name messages consumers

# 连接状态
rabbitmqctl list_connections

# 通道状态
rabbitmqctl list_channels

# 用户列表
rabbitmqctl list_users

# 虚拟主机
rabbitmqctl list_vhosts

# 策略列表
rabbitmqctl list_policies

常用运维命令

bash
# 重置节点
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

# 关闭节点
rabbitmqctl stop

# 滚动重启(逐个节点重启)
# 在每个节点上执行
rabbitmqctl stop_app
systemctl restart rabbitmq-server
rabbitmqctl start_app

# 清空队列
rabbitmqctl purge_queue queue_name

# 删除队列
rabbitmqctl delete_queue queue_name

# 同步队列
rabbitmqctl sync_queue queue_name

# 取消同步
rabbitmqctl cancel_sync_queue queue_name

2.2 监控指标

关键监控指标

类别指标说明告警阈值
节点运行状态节点是否存活不可用
节点内存使用内存占用率> 80%
节点磁盘空间磁盘可用空间< 20%
节点文件描述符fd 使用率> 80%
节点Socket 连接socket 使用率> 80%
队列消息数量队列深度> 10000
队列消费者数消费者数量= 0
集群网络分区分区状态存在分区
集群节点数量运行节点数< 预期

Prometheus 指标

promql
# 节点状态
rabbitmq_up{cluster="rabbitmq-cluster"}

# 内存使用
rabbitmq_node_mem_used_bytes / rabbitmq_node_mem_limit_bytes

# 磁盘空间
rabbitmq_node_disk_free_bytes / rabbitmq_node_disk_free_limit_bytes

# 队列深度
rabbitmq_queue_messages

# 消息速率
rate(rabbitmq_queue_messages_ready_total[5m])

# 连接数
rabbitmq_connections

# 通道数
rabbitmq_channels

2.3 日志管理

日志文件位置

/var/log/rabbitmq/
├── rabbit@hostname.log          # 主日志
├── rabbit@hostname_upgrade.log  # 升级日志
├── rabbit@hostname-sasl.log     # SASL 日志
└── crash.log                     # 崩溃日志

日志级别配置

ini
# /etc/rabbitmq/rabbitmq.conf

# 控制台日志级别
log.console.level = info

# 文件日志级别
log.file.level = info
log.file = /var/log/rabbitmq/rabbit@hostname.log

# 日志轮转
log.file.max_size = 100MB
log.file.max_age = 7 days
log.file.max_count = 10

# JSON 格式日志
log.file.formatter = json

日志查看

bash
# 实时查看日志
tail -f /var/log/rabbitmq/rabbit@$(hostname).log

# 搜索错误
grep -i error /var/log/rabbitmq/rabbit@$(hostname).log

# 搜索警告
grep -i warning /var/log/rabbitmq/rabbit@$(hostname).log

# 搜索特定队列
grep "queue_name" /var/log/rabbitmq/rabbit@$(hostname).log

2.4 备份与恢复

备份内容

mermaid
graph TB
    A[备份内容] --> B[配置文件]
    A --> C[消息数据]
    A --> D[元数据]
    
    B --> B1[rabbitmq.conf]
    B --> B2[enabled_plugins]
    B --> B3[advanced.config]
    
    C --> C1[持久化消息]
    C --> C2[队列定义]
    
    D --> D1[用户信息]
    D --> D2[虚拟主机]
    D --> D3[权限配置]
    D --> D4[策略定义]

备份命令

bash
# 导出定义(用户、队列、交换器、绑定等)
rabbitmqctl export_definitions /backup/definitions.json

# 导入定义
rabbitmqctl import_definitions /backup/definitions.json

# 备份数据目录
tar -czvf /backup/rabbitmq-data-$(date +%Y%m%d).tar.gz /var/lib/rabbitmq/mnesia/

# 备份配置文件
tar -czvf /backup/rabbitmq-config-$(date +%Y%m%d).tar.gz /etc/rabbitmq/

2.5 性能优化

内存优化

ini
# /etc/rabbitmq/rabbitmq.conf

# 内存高水位线
vm_memory_high_watermark.relative = 0.6

# 内存分页比例
vm_memory_high_watermark_paging_ratio = 0.75

# 总内存限制(绝对值)
# vm_memory_high_watermark.absolute = 4GB

磁盘优化

ini
# /etc/rabbitmq/rabbitmq.conf

# 磁盘自由空间限制
disk_free_limit.relative = 2.0
# 或绝对值
# disk_free_limit.absolute = 10GB

# 消息存储模式
# queue_default_type = quorum

连接优化

ini
# /etc/rabbitmq/rabbitmq.conf

# 最大连接数
connection_max = 50000

# 最大通道数
channel_max = 2047

# 心跳间隔
heartbeat = 60

# TCP 缓冲区
tcp_listen_options.backlog = 4096
tcp_listen_options.nodelay = true
tcp_listen_options.sndbuf = 196608
tcp_listen_options.recbuf = 196608

三、配置示例

3.1 监控配置

Prometheus 配置

yaml
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets:
          - 'rabbitmq-node1:15692'
          - 'rabbitmq-node2:15692'
          - 'rabbitmq-node3:15692'
    metrics_path: /metrics

告警规则

yaml
# rabbitmq_alerts.yml
groups:
  - name: rabbitmq
    rules:
      - alert: RabbitMQNodeDown
        expr: rabbitmq_up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ 节点不可用"
          description: "节点 {{ $labels.instance }} 已停止"

      - alert: RabbitMQHighMemory
        expr: rabbitmq_node_mem_used_bytes / rabbitmq_node_mem_limit_bytes > 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "RabbitMQ 内存使用过高"
          description: "节点 {{ $labels.instance }} 内存使用率过高"

      - alert: RabbitMQDiskSpaceLow
        expr: rabbitmq_node_disk_free_bytes / rabbitmq_node_disk_free_limit_bytes < 1.5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "RabbitMQ 磁盘空间不足"
          description: "节点 {{ $labels.instance }} 磁盘空间不足"

      - alert: RabbitMQQueueDepth
        expr: rabbitmq_queue_messages > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "RabbitMQ 队列深度过高"
          description: "队列 {{ $labels.queue }} 深度过高"

      - alert: RabbitMQNetworkPartition
        expr: count(rabbitmq_partitions) > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ 网络分区"
          description: "检测到网络分区"

3.2 备份脚本

bash
#!/bin/bash

# backup_rabbitmq.sh
# RabbitMQ 备份脚本

BACKUP_DIR="/backup/rabbitmq"
DATE=$(date +%Y%m%d_%H%M%S)
RETENTION_DAYS=7

mkdir -p $BACKUP_DIR/$DATE

echo "=== 开始备份 RabbitMQ ==="
echo "时间: $(date)"

# 1. 导出定义
echo "导出定义..."
rabbitmqctl export_definitions $BACKUP_DIR/$DATE/definitions.json

# 2. 备份配置
echo "备份配置..."
tar -czvf $BACKUP_DIR/$DATE/config.tar.gz /etc/rabbitmq/

# 3. 备份数据目录(可选,需要停止服务)
# echo "备份数据..."
# tar -czvf $BACKUP_DIR/$DATE/data.tar.gz /var/lib/rabbitmq/mnesia/

# 4. 记录集群状态
echo "记录集群状态..."
rabbitmqctl cluster_status > $BACKUP_DIR/$DATE/cluster_status.txt
rabbitmqctl list_queues name messages consumers > $BACKUP_DIR/$DATE/queues.txt
rabbitmqctl list_users > $BACKUP_DIR/$DATE/users.txt

# 5. 清理旧备份
echo "清理旧备份..."
find $BACKUP_DIR -type d -mtime +$RETENTION_DAYS -exec rm -rf {} \;

echo "=== 备份完成 ==="
echo "备份位置: $BACKUP_DIR/$DATE"

3.3 健康检查脚本

bash
#!/bin/bash

# health_check.sh
# RabbitMQ 健康检查脚本

NODE="rabbit@$(hostname)"
LOG_FILE="/var/log/rabbitmq/health_check.log"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a $LOG_FILE
}

check_node_status() {
    if rabbitmqctl status > /dev/null 2>&1; then
        log "节点状态: 正常"
        return 0
    else
        log "节点状态: 异常"
        return 1
    fi
}

check_memory() {
    MEMORY_USAGE=$(rabbitmqctl status | grep -A 5 "Memory" | grep "total" | awk '{print $3}' | tr -d ',')
    MEMORY_LIMIT=$(rabbitmqctl status | grep -A 5 "Memory" | grep "high watermark" | awk '{print $NF}' | tr -d ',')
    
    if [ -n "$MEMORY_USAGE" ] && [ -n "$MEMORY_LIMIT" ]; then
        RATIO=$(echo "scale=2; $MEMORY_USAGE / $MEMORY_LIMIT" | bc)
        log "内存使用: ${RATIO}%"
        
        if (( $(echo "$RATIO > 0.8" | bc -l) )); then
            log "警告: 内存使用过高"
            return 1
        fi
    fi
    return 0
}

check_disk() {
    DISK_FREE=$(rabbitmqctl status | grep -A 3 "Free Disk Space" | grep "free" | awk '{print $3}')
    DISK_LIMIT=$(rabbitmqctl status | grep -A 3 "Free Disk Space" | grep "limit" | awk '{print $3}')
    
    log "磁盘空间: ${DISK_FREE} / ${DISK_LIMIT}"
    return 0
}

check_queues() {
    QUEUES=$(rabbitmqctl list_queues name messages | grep -v "Timeout" | grep -v "Listing" | wc -l)
    TOTAL_MESSAGES=$(rabbitmqctl list_queues messages | grep -v "Timeout" | grep -v "Listing" | awk '{sum+=$1} END {print sum}')
    
    log "队列数量: $QUEUES"
    log "总消息数: ${TOTAL_MESSAGES:-0}"
    
    if [ "${TOTAL_MESSAGES:-0}" -gt 100000 ]; then
        log "警告: 消息积压过多"
        return 1
    fi
    return 0
}

check_connections() {
    CONNECTIONS=$(rabbitmqctl list_connections | grep -v "Timeout" | grep -v "Listing" | wc -l)
    log "连接数: $CONNECTIONS"
    return 0
}

check_partitions() {
    PARTITIONS=$(rabbitmqctl cluster_status | grep -A 5 "Partitions" | grep -v "none" | grep -v "Partitions" | wc -l)
    
    if [ "$PARTITIONS" -gt 0 ]; then
        log "警告: 检测到网络分区"
        return 1
    fi
    log "网络分区: 无"
    return 0
}

main() {
    log "=== 开始健康检查 ==="
    
    ERRORS=0
    
    check_node_status || ((ERRORS++))
    check_memory || ((ERRORS++))
    check_disk || ((ERRORS++))
    check_queues || ((ERRORS++))
    check_connections || ((ERRORS++))
    check_partitions || ((ERRORS++))
    
    log "=== 检查完成,错误数: $ERRORS ==="
    
    exit $ERRORS
}

main

四、PHP 代码示例

4.1 运维管理工具

php
<?php

class RabbitMQOperations
{
    private string $host;
    private int $port;
    private string $user;
    private string $password;
    
    public function __construct(
        string $host = 'localhost',
        int $port = 15672,
        string $user = 'guest',
        string $password = 'guest'
    ) {
        $this->host = $host;
        $this->port = $port;
        $this->user = $user;
        $this->password = $password;
    }
    
    private function request(string $endpoint, string $method = 'GET', array $data = null): array
    {
        $url = "http://{$this->host}:{$this->port}/api/{$endpoint}";
        
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->user}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_TIMEOUT => 30,
        ]);
        
        if ($data !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        }
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        return [
            'status' => $httpCode,
            'data' => json_decode($response, true)
        ];
    }
    
    public function getOverview(): array
    {
        return $this->request('overview');
    }
    
    public function getNodes(): array
    {
        return $this->request('nodes');
    }
    
    public function getQueues(): array
    {
        return $this->request('queues');
    }
    
    public function getConnections(): array
    {
        return $this->request('connections');
    }
    
    public function purgeQueue(string $vhost, string $queue): array
    {
        return $this->request("queues/{$vhost}/{$queue}/contents", 'DELETE');
    }
    
    public function deleteQueue(string $vhost, string $queue): array
    {
        return $this->request("queues/{$vhost}/{$queue}", 'DELETE');
    }
    
    public function exportDefinitions(): array
    {
        return $this->request('definitions');
    }
    
    public function importDefinitions(array $definitions): array
    {
        return $this->request('definitions', 'POST', $definitions);
    }
    
    public function generateReport(): string
    {
        $overview = $this->getOverview();
        $nodes = $this->getNodes();
        $queues = $this->getQueues();
        $connections = $this->getConnections();
        
        $report = "=== RabbitMQ 运维报告 ===\n";
        $report .= "生成时间: " . date('Y-m-d H:i:s') . "\n\n";
        
        if ($overview['status'] === 200) {
            $report .= "集群信息:\n";
            $report .= "  名称: {$overview['data']['cluster_name']}\n";
            $report .= "  版本: {$overview['data']['rabbitmq_version']}\n";
            $report .= "  Erlang: {$overview['data']['erlang_version']}\n\n";
        }
        
        if ($nodes['status'] === 200) {
            $report .= "节点状态:\n";
            foreach ($nodes['data'] as $node) {
                $status = $node['running'] ? '运行中' : '已停止';
                $memUsage = round($node['mem_used'] / $node['mem_limit'] * 100, 2);
                $report .= "  {$node['name']}: {$status}, 内存 {$memUsage}%\n";
            }
            $report .= "\n";
        }
        
        if ($queues['status'] === 200) {
            $totalMessages = array_sum(array_column($queues['data'], 'messages'));
            $report .= "队列统计:\n";
            $report .= "  队列数: " . count($queues['data']) . "\n";
            $report .= "  总消息数: {$totalMessages}\n\n";
        }
        
        if ($connections['status'] === 200) {
            $report .= "连接统计:\n";
            $report .= "  连接数: " . count($connections['data']) . "\n";
        }
        
        return $report;
    }
}

$ops = new RabbitMQOperations('localhost', 15672, 'admin', 'Admin@123456');
echo $ops->generateReport();

五、常见问题与解决方案

5.1 节点无法启动

排查步骤:

bash
# 查看日志
tail -f /var/log/rabbitmq/rabbit@$(hostname).log

# 检查端口占用
netstat -tlnp | grep 5672

# 检查 Erlang Cookie
cat /var/lib/rabbitmq/.erlang.cookie

# 检查权限
ls -la /var/lib/rabbitmq/
ls -la /var/log/rabbitmq/

5.2 内存告警

处理步骤:

bash
# 查看内存使用
rabbitmqctl status | grep -A 10 "Memory"

# 临时提高内存限制
rabbitmqctl eval 'application:set_env(rabbit, vm_memory_high_watermark, 0.8).'

# 清理队列
rabbitmqctl purge_queue queue_name

5.3 磁盘告警

处理步骤:

bash
# 查看磁盘使用
df -h

# 清理日志
find /var/log/rabbitmq -name "*.log" -mtime +7 -delete

# 清理旧备份
find /backup/rabbitmq -type d -mtime +30 -exec rm -rf {} \;

六、最佳实践建议

6.1 日常运维建议

  1. 定期检查: 每日检查集群状态和资源使用
  2. 日志监控: 实时监控错误和警告日志
  3. 定期备份: 每日备份定义和配置
  4. 容量规划: 定期评估资源需求

6.2 监控建议

  1. 全面覆盖: 监控所有关键指标
  2. 分级告警: 设置合理的告警阈值
  3. 可视化: 使用 Grafana 展示监控数据
  4. 自动化: 配置自动告警和自动恢复

6.3 安全建议

  1. 定期更新: 保持版本更新
  2. 权限管理: 定期审计用户权限
  3. 访问控制: 限制管理界面访问
  4. 审计日志: 启用操作审计

七、相关链接