Skip to content

RabbitMQ 日常运维任务

概述

日常运维是保障 RabbitMQ 稳定运行的基础工作。通过规范的运维流程和自动化工具,可以提高运维效率,减少人为错误。本文将详细介绍 RabbitMQ 的日常运维任务、检查清单和自动化脚本。

核心知识点

运维任务分类

分类频率任务
健康检查每小时节点状态、资源使用
日常巡检每天日志检查、队列状态
定期维护每周清理、优化
深度检查每月性能分析、容量评估

运维检查清单

□ 节点状态检查
□ 内存使用检查
□ 磁盘空间检查
□ 队列状态检查
□ 连接数检查
□ 消息堆积检查
□ 日志异常检查
□ 备份状态检查

运维工具

工具用途
rabbitmqctl命令行管理工具
rabbitmq-diagnostics诊断工具
rabbitmq-plugins插件管理
rabbitmq-queues队列管理
HTTP API远程管理接口

配置示例

PHP 日常运维类

php
<?php

class RabbitMQDailyOps
{
    private $apiClient;
    private $logger;
    private $config;
    
    public function __construct($apiClient, $logger, $config = [])
    {
        $this->apiClient = $apiClient;
        $this->logger = $logger;
        $this->config = array_merge([
            'max_messages_per_queue' => 10000,
            'max_connections' => 1000,
            'min_disk_free_gb' => 10,
            'max_memory_percent' => 80,
        ], $config);
    }
    
    public function runHealthCheck()
    {
        $results = [];
        
        $results['nodes'] = $this->checkNodes();
        $results['resources'] = $this->checkResources();
        $results['queues'] = $this->checkQueues();
        $results['connections'] = $this->checkConnections();
        $results['messages'] = $this->checkMessages();
        
        $results['summary'] = $this->generateSummary($results);
        
        $this->logger->info('Health check completed', $results);
        
        return $results;
    }
    
    private function checkNodes()
    {
        $nodes = $this->apiClient->getNodes();
        $results = [];
        
        foreach ($nodes as $node) {
            $issues = [];
            
            if (!$node['running']) {
                $issues[] = '节点未运行';
            }
            
            if ($node['mem_alarm'] ?? false) {
                $issues[] = '内存告警已触发';
            }
            
            if ($node['disk_free_alarm'] ?? false) {
                $issues[] = '磁盘告警已触发';
            }
            
            $results[$node['name']] = [
                'running' => $node['running'],
                'uptime' => $node['uptime'] ?? 0,
                'type' => $node['type'],
                'issues' => $issues,
                'status' => empty($issues) ? 'healthy' : 'unhealthy',
            ];
        }
        
        return $results;
    }
    
    private function checkResources()
    {
        $nodes = $this->apiClient->getNodes();
        $results = [];
        
        foreach ($nodes as $node) {
            $memUsed = $node['mem_used'] ?? 0;
            $memLimit = $node['mem_limit'] ?? 1;
            $memPercent = ($memUsed / $memLimit) * 100;
            
            $diskFree = ($node['disk_free'] ?? 0) / 1024 / 1024 / 1024;
            
            $fdUsed = $node['fd_used'] ?? 0;
            $fdTotal = $node['fd_total'] ?? 1;
            $fdPercent = ($fdUsed / $fdTotal) * 100;
            
            $results[$node['name']] = [
                'memory' => [
                    'used' => $memUsed,
                    'limit' => $memLimit,
                    'percent' => round($memPercent, 2),
                    'status' => $memPercent < $this->config['max_memory_percent'] ? 'ok' : 'warning',
                ],
                'disk' => [
                    'free_gb' => round($diskFree, 2),
                    'status' => $diskFree > $this->config['min_disk_free_gb'] ? 'ok' : 'warning',
                ],
                'file_descriptors' => [
                    'used' => $fdUsed,
                    'total' => $fdTotal,
                    'percent' => round($fdPercent, 2),
                    'status' => $fdPercent < 80 ? 'ok' : 'warning',
                ],
            ];
        }
        
        return $results;
    }
    
    private function checkQueues()
    {
        $queues = $this->apiClient->getQueues();
        $results = [];
        
        foreach ($queues as $queue) {
            $issues = [];
            
            if ($queue['messages'] > $this->config['max_messages_per_queue']) {
                $issues[] = "消息堆积过多: {$queue['messages']}";
            }
            
            if ($queue['consumers'] === 0 && $queue['messages'] > 0) {
                $issues[] = '队列有消息但无消费者';
            }
            
            if ($queue['state'] !== 'running') {
                $issues[] = "队列状态异常: {$queue['state']}";
            }
            
            if (!empty($issues)) {
                $results[$queue['name']] = [
                    'vhost' => $queue['vhost'],
                    'messages' => $queue['messages'],
                    'consumers' => $queue['consumers'],
                    'state' => $queue['state'],
                    'issues' => $issues,
                ];
            }
        }
        
        return $results;
    }
    
    private function checkConnections()
    {
        $connections = $this->apiClient->getConnections();
        
        $totalConnections = count($connections);
        $idleConnections = 0;
        $byHost = [];
        
        foreach ($connections as $conn) {
            if (($conn['channels'] ?? 0) === 0) {
                $idleConnections++;
            }
            
            $host = $conn['peer_host'] ?? 'unknown';
            $byHost[$host] = ($byHost[$host] ?? 0) + 1;
        }
        
        return [
            'total' => $totalConnections,
            'idle' => $idleConnections,
            'by_host' => $byHost,
            'status' => $totalConnections < $this->config['max_connections'] ? 'ok' : 'warning',
        ];
    }
    
    private function checkMessages()
    {
        $overview = $this->apiClient->getOverview();
        
        $queueTotals = $overview['queue_totals'] ?? [];
        $messageStats = $overview['message_stats'] ?? [];
        
        return [
            'total' => $queueTotals['messages'] ?? 0,
            'ready' => $queueTotals['messages_ready'] ?? 0,
            'unacked' => $queueTotals['messages_unacked'] ?? 0,
            'publish_rate' => $messageStats['publish_details']['rate'] ?? 0,
            'consume_rate' => $messageStats['consume_details']['rate'] ?? 0,
            'ack_rate' => $messageStats['ack_details']['rate'] ?? 0,
        ];
    }
    
    private function generateSummary($results)
    {
        $issues = [];
        
        foreach ($results['nodes'] as $name => $node) {
            if ($node['status'] === 'unhealthy') {
                $issues[] = "节点 {$name}: " . implode(', ', $node['issues']);
            }
        }
        
        foreach ($results['queues'] as $name => $queue) {
            $issues[] = "队列 {$name}: " . implode(', ', $queue['issues']);
        }
        
        return [
            'status' => empty($issues) ? 'healthy' : 'issues_found',
            'issues_count' => count($issues),
            'issues' => $issues,
        ];
    }
    
    public function cleanupIdleQueues($maxAge = 3600)
    {
        $queues = $this->apiClient->getQueues();
        $cleaned = [];
        
        foreach ($queues as $queue) {
            if ($queue['consumers'] === 0 && ($queue['messages'] ?? 0) === 0) {
                $idleSince = $queue['idle_since'] ?? null;
                
                if ($idleSince) {
                    $idleTime = strtotime($idleSince);
                    
                    if (time() - $idleTime > $maxAge) {
                        $this->apiClient->deleteQueue($queue['vhost'], $queue['name']);
                        $cleaned[] = $queue['name'];
                        $this->logger->info("Cleaned idle queue: {$queue['name']}");
                    }
                }
            }
        }
        
        return $cleaned;
    }
    
    public function closeIdleConnections($maxIdleTime = 3600)
    {
        $connections = $this->apiClient->getConnections();
        $closed = [];
        
        foreach ($connections as $conn) {
            if (($conn['channels'] ?? 0) === 0) {
                $connectedAt = $conn['connected_at'] ?? null;
                
                if ($connectedAt) {
                    $connectedTime = strtotime($connectedAt);
                    
                    if (time() - $connectedTime > $maxIdleTime) {
                        $this->apiClient->closeConnection($conn['name']);
                        $closed[] = $conn['name'];
                        $this->logger->info("Closed idle connection: {$conn['name']}");
                    }
                }
            }
        }
        
        return $closed;
    }
    
    public function generateDailyReport()
    {
        $overview = $this->apiClient->getOverview();
        $nodes = $this->apiClient->getNodes();
        $queues = $this->apiClient->getQueues();
        $connections = $this->apiClient->getConnections();
        
        $report = [
            'generated_at' => date('Y-m-d H:i:s'),
            'cluster' => [
                'name' => $overview['cluster_name'] ?? 'unknown',
                'version' => $overview['rabbitmq_version'] ?? 'unknown',
                'nodes' => count($nodes),
            ],
            'statistics' => [
                'queues' => count($queues),
                'connections' => count($connections),
                'messages_total' => $overview['queue_totals']['messages'] ?? 0,
                'messages_ready' => $overview['queue_totals']['messages_ready'] ?? 0,
                'messages_unacked' => $overview['queue_totals']['messages_unacked'] ?? 0,
            ],
            'rates' => [
                'publish' => $overview['message_stats']['publish_details']['rate'] ?? 0,
                'consume' => $overview['message_stats']['consume_details']['rate'] ?? 0,
                'ack' => $overview['message_stats']['ack_details']['rate'] ?? 0,
            ],
            'resources' => [],
        ];
        
        foreach ($nodes as $node) {
            $report['resources'][$node['name']] = [
                'memory_percent' => round(($node['mem_used'] / $node['mem_limit']) * 100, 2),
                'disk_free_gb' => round($node['disk_free'] / 1024 / 1024 / 1024, 2),
                'fd_percent' => round(($node['fd_used'] / $node['fd_total']) * 100, 2),
            ];
        }
        
        return $report;
    }
}

Shell 日常巡检脚本

bash
#!/bin/bash
/opt/rabbitmq/scripts/daily_check.sh

LOG_FILE="/var/log/rabbitmq/daily_check.log"
ALERT_FILE="/var/log/rabbitmq/alerts.log"

log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

alert() {
    local level="$1"
    local message="$2"
    echo "[${level}] $(date '+%Y-%m-%d %H:%M:%S') - ${message}" | tee -a "$ALERT_FILE"
}

log "========== RabbitMQ 日常巡检开始 =========="

log "1. 检查节点状态..."
rabbitmqctl status > /dev/null 2>&1
if [ $? -eq 0 ]; then
    log "节点状态: 正常"
else
    alert "CRITICAL" "节点状态异常"
fi

log "2. 检查集群状态..."
rabbitmqctl cluster_status | grep -q "running_nodes"
if [ $? -eq 0 ]; then
    log "集群状态: 正常"
else
    alert "WARNING" "集群状态异常"
fi

log "3. 检查内存使用..."
MEMORY_USAGE=$(rabbitmqctl status | grep -oP 'memory:\s*\K[0-9]+')
MEMORY_LIMIT=$(rabbitmqctl status | grep -oP 'memory_limit:\s*\K[0-9]+')
if [ -n "$MEMORY_USAGE" ] && [ -n "$MEMORY_LIMIT" ]; then
    PERCENT=$((MEMORY_USAGE * 100 / MEMORY_LIMIT))
    log "内存使用: ${PERCENT}%"
    if [ $PERCENT -gt 80 ]; then
        alert "WARNING" "内存使用率过高: ${PERCENT}%"
    fi
fi

log "4. 检查磁盘空间..."
DISK_FREE=$(df -h /var/lib/rabbitmq | awk 'NR==2 {print $4}')
log "磁盘可用: ${DISK_FREE}"

log "5. 检查队列状态..."
QUEUE_COUNT=$(rabbitmqctl list_queues name | wc -l)
log "队列数量: ${QUEUE_COUNT}"

log "6. 检查消息堆积..."
rabbitmqctl list_queues name messages | while read queue messages; do
    if [ "$messages" -gt 10000 ]; then
        alert "WARNING" "队列 ${queue} 消息堆积: ${messages}"
    fi
done

log "7. 检查连接数..."
CONNECTION_COUNT=$(rabbitmqctl list_connections | wc -l)
log "连接数: ${CONNECTION_COUNT}"

log "8. 检查消费者状态..."
rabbitmqctl list_queues name consumers | while read queue consumers; do
    if [ "$consumers" -eq 0 ]; then
        alert "INFO" "队列 ${queue} 无消费者"
    fi
done

log "9. 检查日志错误..."
ERROR_COUNT=$(grep -c "error" /var/log/rabbitmq/rabbit.log 2>/dev/null || echo 0)
log "错误日志数: ${ERROR_COUNT}"

log "========== RabbitMQ 日常巡检完成 =========="

exit 0

定时任务配置

cron
/etc/cron.d/rabbitmq-ops

*/5 * * * * rabbitmq /opt/rabbitmq/scripts/health_check.sh >> /var/log/rabbitmq/health.log 2>&1
0 * * * * rabbitmq /opt/rabbitmq/scripts/hourly_check.sh >> /var/log/rabbitmq/hourly.log 2>&1
0 8 * * * rabbitmq /opt/rabbitmq/scripts/daily_check.sh >> /var/log/rabbitmq/daily.log 2>&1
0 0 * * 0 rabbitmq /opt/rabbitmq/scripts/weekly_cleanup.sh >> /var/log/rabbitmq/weekly.log 2>&1
0 0 1 * * rabbitmq /opt/rabbitmq/scripts/monthly_report.sh >> /var/log/rabbitmq/monthly.log 2>&1

实际应用场景

场景一:自动化运维平台

php
<?php

class RabbitMQOpsPlatform
{
    private $ops;
    private $db;
    
    public function __construct(RabbitMQDailyOps $ops, PDO $db)
    {
        $this->ops = $ops;
        $this->db = $db;
    }
    
    public function runScheduledTasks($type)
    {
        $tasks = [
            'hourly' => [$this, 'runHourlyTasks'],
            'daily' => [$this, 'runDailyTasks'],
            'weekly' => [$this, 'runWeeklyTasks'],
        ];
        
        $task = $tasks[$type] ?? null;
        
        if ($task) {
            return call_user_func($task);
        }
        
        return false;
    }
    
    private function runHourlyTasks()
    {
        $results = [];
        
        $results['health_check'] = $this->ops->runHealthCheck();
        
        $this->recordMetrics($results['health_check']);
        
        return $results;
    }
    
    private function runDailyTasks()
    {
        $results = [];
        
        $results['health_check'] = $this->ops->runHealthCheck();
        $results['cleanup_queues'] = $this->ops->cleanupIdleQueues(86400);
        $results['cleanup_connections'] = $this->ops->closeIdleConnections(86400);
        $results['daily_report'] = $this->ops->generateDailyReport();
        
        $this->saveDailyReport($results['daily_report']);
        
        return $results;
    }
    
    private function runWeeklyTasks()
    {
        $results = [];
        
        $results['health_check'] = $this->ops->runHealthCheck();
        $results['cleanup_queues'] = $this->ops->cleanupIdleQueues(604800);
        
        return $results;
    }
    
    private function recordMetrics($healthCheck)
    {
        $sql = "INSERT INTO rabbitmq_metrics (
            recorded_at, status, issues_count,
            total_messages, total_connections, total_queues
        ) VALUES (NOW(), :status, :issues_count, :total_messages, :total_connections, :total_queues)";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute([
            'status' => $healthCheck['summary']['status'],
            'issues_count' => $healthCheck['summary']['issues_count'],
            'total_messages' => $healthCheck['messages']['total'],
            'total_connections' => $healthCheck['connections']['total'],
            'total_queues' => count($healthCheck['queues']),
        ]);
    }
    
    private function saveDailyReport($report)
    {
        $sql = "INSERT INTO rabbitmq_daily_reports (
            report_date, cluster_name, cluster_version,
            node_count, queue_count, connection_count,
            messages_total, messages_ready, messages_unacked,
            publish_rate, consume_rate, ack_rate, resources
        ) VALUES (
            CURDATE(), :cluster_name, :cluster_version,
            :node_count, :queue_count, :connection_count,
            :messages_total, :messages_ready, :messages_unacked,
            :publish_rate, :consume_rate, :ack_rate, :resources
        )";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute([
            'cluster_name' => $report['cluster']['name'],
            'cluster_version' => $report['cluster']['version'],
            'node_count' => $report['cluster']['nodes'],
            'queue_count' => $report['statistics']['queues'],
            'connection_count' => $report['statistics']['connections'],
            'messages_total' => $report['statistics']['messages_total'],
            'messages_ready' => $report['statistics']['messages_ready'],
            'messages_unacked' => $report['statistics']['messages_unacked'],
            'publish_rate' => $report['rates']['publish'],
            'consume_rate' => $report['rates']['consume'],
            'ack_rate' => $report['rates']['ack'],
            'resources' => json_encode($report['resources']),
        ]);
    }
}

场景二:运维仪表板

php
<?php

class OpsDashboard
{
    private $ops;
    private $db;
    
    public function __construct(RabbitMQDailyOps $ops, PDO $db)
    {
        $this->ops = $ops;
        $this->db = $db;
    }
    
    public function getDashboardData()
    {
        return [
            'current_status' => $this->ops->runHealthCheck(),
            'metrics_trend' => $this->getMetricsTrend(24),
            'recent_alerts' => $this->getRecentAlerts(10),
            'top_queues' => $this->getTopQueues(10),
            'daily_summary' => $this->getDailySummary(),
        ];
    }
    
    private function getMetricsTrend($hours)
    {
        $sql = "SELECT * FROM rabbitmq_metrics 
                WHERE recorded_at >= DATE_SUB(NOW(), INTERVAL :hours HOUR)
                ORDER BY recorded_at ASC";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute(['hours' => $hours]);
        
        return $stmt->fetchAll(PDO::FETCH_ASSOC);
    }
    
    private function getRecentAlerts($limit)
    {
        $sql = "SELECT * FROM rabbitmq_alerts 
                ORDER BY created_at DESC 
                LIMIT :limit";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute(['limit' => $limit]);
        
        return $stmt->fetchAll(PDO::FETCH_ASSOC);
    }
    
    private function getTopQueues($limit)
    {
        $queues = $this->ops->apiClient->getQueues();
        
        usort($queues, function($a, $b) {
            return ($b['messages'] ?? 0) <=> ($a['messages'] ?? 0);
        });
        
        return array_slice($queues, 0, $limit);
    }
    
    private function getDailySummary()
    {
        $sql = "SELECT * FROM rabbitmq_daily_reports 
                WHERE report_date = CURDATE()";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute();
        
        return $stmt->fetch(PDO::FETCH_ASSOC);
    }
}

常见问题与解决方案

问题一:队列清理误删

现象:清理空闲队列时误删了有用的队列。

解决方案

php
public function cleanupIdleQueues($maxAge = 3600, $whitelist = [])
{
    $queues = $this->apiClient->getQueues();
    $cleaned = [];
    
    foreach ($queues as $queue) {
        if (in_array($queue['name'], $whitelist)) {
            continue;
        }
        
        if ($queue['consumers'] === 0 && ($queue['messages'] ?? 0) === 0) {
            $idleSince = $queue['idle_since'] ?? null;
            
            if ($idleSince && time() - strtotime($idleSince) > $maxAge) {
                $this->apiClient->deleteQueue($queue['vhost'], $queue['name']);
                $cleaned[] = $queue['name'];
            }
        }
    }
    
    return $cleaned;
}

问题二:巡检脚本超时

现象:巡检脚本执行时间过长。

解决方案

php
class TimeoutHealthCheck
{
    private $timeout = 30;
    
    public function checkWithTimeout($callback)
    {
        $startTime = microtime(true);
        
        $result = $callback();
        
        $elapsed = microtime(true) - $startTime;
        
        if ($elapsed > $this->timeout) {
            $this->logger->warning("Health check took too long: {$elapsed}s");
        }
        
        return $result;
    }
}

问题三:日志文件过大

现象:运维日志文件占用过多空间。

解决方案

bash
/var/log/rabbitmq/ops.log {
    daily
    rotate 7
    compress
    missingok
    notifempty
    size 100M
}

最佳实践

1. 运维任务时间表

时间任务负责人
每小时健康检查自动化
每天 8:00日常巡检自动化
每周一清理维护运维人员
每月 1 日深度检查架构师

2. 运维检查清单

日常检查:
□ 节点运行状态
□ 内存使用率
□ 磁盘空间
□ 队列消息数
□ 连接数
□ 消费者状态

每周检查:
□ 日志分析
□ 性能趋势
□ 备份验证
□ 清理空闲资源

每月检查:
□ 容量评估
□ 安全审计
□ 版本更新
□ 文档更新

3. 自动化程度建议

任务类型自动化程度
监控检查100%
数据收集100%
告警通知100%
日常清理80%
问题处理20%

相关链接