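RabbitMQ Disk Alarm Mechanism

Overview

Disk alarms protect a RabbitMQ node from running out of disk space. When free disk space drops below the configured threshold (disk_free_limit), the node raises an alarm and blocks publishing connections until enough space has been freed, which keeps the broker stable instead of letting it fail on a full disk.

Key Concepts

How Disk Alarms Work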

┌─────────────────────────────────────────────────────────────┐
│                    Disk alarm mechanism                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Free disk space                                            │
│    100% ─┬──────────────────────────────────────────────    │
│          │                                                  │
│          │     Normal operation                             │
│          │                                                  │
│  limit  ─┤     ┌─────────────────────────────────────┐      │
│          │────▶│  Disk alarm raised                  │      │
│          │     │  disk_free < disk_free_limit        │      │
│          │     │  All publishers blocked             │      │
│          │     └─────────────────────────────────────┘      │
│          │                                                  │
│    1GB  ─┤     Commonly recommended minimum                 │
│          │                                                  │
│   50MB  ─┤     Default threshold                            │
│          │                                                  │
│     0%  ─┴──────────────────────────────────────────────    │
│                                                             │
│  Trigger condition: disk_free < disk_free_limit             │
│                                                             │
└─────────────────────────────────────────────────────────────┘

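Threshold Configuration

| Mode | Default | Description |
|------|---------|-------------|
| Absolute (disk_free_limit.absolute) | 50MB | Fixed threshold, given in bytes or with a unit suffix |
| Relative (disk_free_limit.relative) | Not set | Multiple of the machine's total RAM (for example, 2.0 means twice the RAM), not a percentage of the disk |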
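
To make the two threshold modes concrete, here is a minimal sketch of how an effective limit in bytes could be derived and compared against free space. The function names and the array shape are hypothetical and exist only for illustration; the sketch assumes that an absolute value takes precedence when both are given and that a relative value is multiplied by total RAM.

```php
<?php

declare(strict_types=1);

/**
 * Resolve the effective disk_free_limit in bytes.
 * Hypothetical helper for illustration; not part of RabbitMQ or any client library.
 *
 * @param array{absolute?: int, relative?: float} $limit
 */
function effectiveDiskFreeLimit(array $limit, int $totalRamBytes): int
{
    // Assumption: an absolute value wins when both modes are configured.
    if (isset($limit['absolute'])) {
        return $limit['absolute'];
    }

    // A relative value is a multiple of total RAM, not of disk size.
    if (isset($limit['relative'])) {
        return (int) round($limit['relative'] * $totalRamBytes);
    }

    // Nothing configured: fall back to the 50MB default.
    return 50 * 1000 * 1000;
}

/** The trigger condition: disk_free < disk_free_limit. */
function diskAlarmShouldFire(int $diskFreeBytes, int $limitBytes): bool
{
    return $diskFreeBytes < $limitBytes;
}

// Example: 8 GB of RAM with relative = 2.0 gives a 16 GB limit.
$limit = effectiveDiskFreeLimit(['relative' => 2.0], 8000000000);
$alarm = diskAlarmShouldFire(12000000000, $limit);
```

With 12 GB free and a 16 GB limit, `$alarm` is true, which shows why a relative value can be surprisingly strict on machines with a lot of RAM and a small disk.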

Effect of the Alarm State

┌─────────────────────────────────────────────────────────────┐
│                  Effect of the alarm state                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Normal state:                                              │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Producer ──▶ Publish ──▶ Queue ──▶ Consumer        │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  Alarm state:                                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  Producer ──▶ Publish ──✕ Blocked                   │    │
│  │                         │                           │    │
│  │                         ▼                           │    │
│  │        Waiting for disk space to be freed...        │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  Impact:                                                    │
│  - All publishing connections are blocked                   │
│  - Consumers can still consume                              │
│  - The node waits for disk space to be freed                │
│                                                             │
└─────────────────────────────────────────────────────────────┘

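Disk Check Mechanism

Check flow (checks run roughly every 10 seconds and become more frequent as free space approaches the limit):

┌──────────────────┐     ┌──────────────────┐     ┌──────────────────┐
│  Periodic check  │────▶│  Measure free    │────▶│  Compare with    │
│  (~10 s apart)   │     │  disk space      │     │  disk_free_limit │
└──────────────────┘     └──────────────────┘     └────────┬─────────┘
                                                           │
                          ┌────────────────────────────────┴────┐
                          │                                     │
                          ▼                                     ▼
                 ┌──────────────────┐                  ┌──────────────────┐
                 │  Raise alarm     │                  │  Clear alarm     │
                 │  Block publishers│                  │  Resume normal   │
                 └──────────────────┘                  └──────────────────┘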
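
The check flow above can be read as a small state machine: each check compares free space with the limit and acts only when the alarm state changes. The following is a rough sketch of a single iteration, not RabbitMQ's actual implementation; the function name and event names are made up for this example.

```php
<?php

declare(strict_types=1);

/**
 * One iteration of the check loop: compare free space with the limit
 * and report whether the alarm state changed.
 * Hypothetical helper; names are illustrative only.
 *
 * @return array{alarmed: bool, event: ?string}
 */
function evaluateDiskCheck(bool $wasAlarmed, int $diskFree, int $diskFreeLimit): array
{
    $alarmed = $diskFree < $diskFreeLimit;

    $event = null;
    if ($alarmed && !$wasAlarmed) {
        $event = 'alarm_raised';   // publishers get blocked
    } elseif (!$alarmed && $wasAlarmed) {
        $event = 'alarm_cleared';  // publishers resume
    }

    return ['alarmed' => $alarmed, 'event' => $event];
}

// Replay a few free-space samples (in bytes) against a 1 GB limit.
$limit = 1000000000;
$alarmed = false;
$events = [];

foreach ([5000000000, 900000000, 800000000, 1500000000] as $free) {
    $result = evaluateDiskCheck($alarmed, $free, $limit);
    $alarmed = $result['alarmed'];

    if ($result['event'] !== null) {
        $events[] = $result['event'];
    }
}
// $events is ['alarm_raised', 'alarm_cleared']
```

The third sample stays below the limit but produces no event, because the alarm is already in effect; only transitions matter.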

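Configuration Examples

Basic Configuration

ini
# /etc/rabbitmq/rabbitmq.conf

# Disk alarm threshold (absolute value)
# Recommended: 1GB or more
disk_free_limit.absolute = 1GB

# Disk alarm threshold (relative to total RAM; 2.0 means twice the RAM)
# disk_free_limit.relative = 2.0

# Disk check interval (milliseconds)
# By default the node checks about every 10 seconds, and more often near the limit.
# disk_monitor_interval does not appear among the documented rabbitmq.conf keys,
# and an unknown key can keep the node from starting, so it is left commented out.
# disk_monitor_interval = 10000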

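Configuration by Scenario

ini
# Use only the block that matches your environment.

# ============ Scenario 1: small disk (< 100GB) ============
disk_free_limit.absolute = 5GB

# ============ Scenario 2: medium disk (100GB - 1TB) ============
disk_free_limit.absolute = 10GB

# ============ Scenario 3: large disk (> 1TB) ============
# 2.0 x total RAM; relative values are based on RAM, not on disk size
disk_free_limit.relative = 2.0

# ============ Scenario 4: container environment ============
disk_free_limit.absolute = 2GB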

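Advanced Configuration

erlang
%% /etc/rabbitmq/advanced.config

[
  {rabbit, [
    %% Threshold in bytes (1073741824 = 1 GiB)
    {disk_free_limit, {absolute, 1073741824}}
    %% disk_monitor_interval is not a documented setting, so it is commented out:
    %% , {disk_monitor_interval, 10000}
  ]}
].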

Runtime Configuration

bash
# Show the current disk status
rabbitmqctl status | grep -i disk

# List the resource alarms currently in effect
rabbitmq-diagnostics alarms

# Change the threshold at runtime (reverts after a node restart)
rabbitmqctl set_disk_free_limit 2GB

PHP Code Examples

Disk Alarm Monitor Class

php
<?php

namespace App\RabbitMQ\Disk;

class DiskAlarmMonitor
{
    private string $apiHost;
    private int $apiPort;
    private string $apiUser;
    private string $apiPass;
    private array $alertCallbacks = [];
    
    public function __construct(
        string $apiHost = 'localhost',
        int $apiPort = 15672,
        string $apiUser = 'guest',
        string $apiPass = 'guest'
    ) {
        $this->apiHost = $apiHost;
        $this->apiPort = $apiPort;
        $this->apiUser = $apiUser;
        $this->apiPass = $apiPass;
    }
    
    public function checkAlarmStatus(): array
    {
        $nodes = $this->apiRequest('/api/nodes');
        
        if (empty($nodes)) {
            return ['error' => 'Unable to fetch node information'];
        }
        
        $node = $nodes[0];
        
        $diskFree = $node['disk_free'] ?? 0;
        $diskLimit = $node['disk_free_limit'] ?? 0;
        
        return [
            'node_name' => $node['name'],
            'disk_alarm' => $node['disk_free_alarm'] ?? false,
            'disk_free' => $diskFree,
            'disk_free_human' => $this->formatBytes($diskFree),
            'disk_limit' => $diskLimit,
            'disk_limit_human' => $this->formatBytes($diskLimit),
            'status' => $this->determineStatus($diskFree, $diskLimit, $node['disk_free_alarm'] ?? false),
            'timestamp' => date('Y-m-d H:i:s'),
        ];
    }

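    /**
     * Returns the alarm health check result, that is, the alarms currently
     * in effect rather than a history. The endpoint answers 503 while an alarm is active.
     */
    public function getAlarmHistory(): array
    {
        return $this->apiRequest('/api/health/checks/alarms');
    }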

    public function registerAlertCallback(string $type, callable $callback): void
    {
        $this->alertCallbacks[$type] = $callback;
    }

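    public function startMonitoring(int $intervalSeconds = 10): void
    {
        $previousAlarm = false;

        while (true) {
            $status = $this->checkAlarmStatus();

            // checkAlarmStatus() returns only an 'error' key when the API is unreachable.
            if (isset($status['error'])) {
                sleep($intervalSeconds);
                continue;
            }

            // Notify only on state transitions, not on every poll.
            if ($status['disk_alarm'] && !$previousAlarm) {
                $this->triggerAlert('disk_alarm', $status);
            }

            if (!$status['disk_alarm'] && $previousAlarm) {
                $this->triggerAlert('disk_clear', $status);
            }

            $previousAlarm = $status['disk_alarm'];

            sleep($intervalSeconds);
        }
    }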

    public function getBlockedConnections(): array
    {
        $connections = $this->apiRequest('/api/connections');
        
        $blocked = [];
        
        foreach ($connections ?? [] as $conn) {
            if (isset($conn['state']) && $conn['state'] === 'blocked') {
                $blocked[] = [
                    'name' => $conn['name'],
                    'client_properties' => $conn['client_properties'] ?? [],
                    'peer_host' => $conn['peer_host'] ?? 'unknown',
                ];
            }
        }
        
        return [
            'blocked_count' => count($blocked),
            'connections' => $blocked,
        ];
    }

    public function getDiskThresholdSettings(): array
    {
        $status = $this->checkAlarmStatus();
        
        return [
            'current_free' => $status['disk_free'],
            'current_free_human' => $status['disk_free_human'],
            'configured_limit' => $status['disk_limit'],
            'configured_limit_human' => $status['disk_limit_human'],
            'margin' => $status['disk_free'] - $status['disk_limit'],
            'margin_human' => $this->formatBytes($status['disk_free'] - $status['disk_limit']),
        ];
    }

    private function triggerAlert(string $type, array $data): void
    {
        if (isset($this->alertCallbacks[$type])) {
            call_user_func($this->alertCallbacks[$type], $data);
        }
        
        $this->logAlert($type, $data);
    }

    private function logAlert(string $type, array $data): void
    {
        $logEntry = sprintf(
            "[%s] %s: Disk Free=%s, Limit=%s\n",
            date('Y-m-d H:i:s'),
            strtoupper($type),
            $data['disk_free_human'],
            $data['disk_limit_human']
        );
        
        file_put_contents('/var/log/rabbitmq_disk_alarm.log', $logEntry, FILE_APPEND);
    }

    private function determineStatus(int $diskFree, int $diskLimit, bool $alarm): string
    {
        if ($alarm) {
            return 'alarm';
        }
        
        $margin = $diskFree - $diskLimit;
        
        if ($margin < $diskLimit * 0.5) {
            return 'warning';
        }
        
        return 'normal';
    }

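    private function apiRequest(string $endpoint): array
    {
        $url = "http://{$this->apiHost}:{$this->apiPort}{$endpoint}";

        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_USERPWD, "{$this->apiUser}:{$this->apiPass}");
        curl_setopt($ch, CURLOPT_TIMEOUT, 10);

        $response = curl_exec($ch);
        curl_close($ch);

        // curl_exec() returns false on failure; treat that as an empty result.
        if ($response === false) {
            return [];
        }

        // Guard against non-array JSON so the declared return type always holds.
        $decoded = json_decode($response, true);

        return is_array($decoded) ? $decoded : [];
    }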

    private function formatBytes(int $bytes): string
    {
        $units = ['B', 'KB', 'MB', 'GB', 'TB'];
        $i = 0;
        
        while ($bytes >= 1024 && $i < count($units) - 1) {
            $bytes /= 1024;
            $i++;
        }
        
        return round($bytes, 2) . ' ' . $units[$i];
    }
}

Disk Alarm Handler

php
<?php

namespace App\RabbitMQ\Disk;

class DiskAlarmHandler
{
    private DiskAlarmMonitor $monitor;
    private array $config;
    
    public function __construct(DiskAlarmMonitor $monitor, array $config = [])
    {
        $this->monitor = $monitor;
        $this->config = array_merge([
            'notification_email' => null,
            'slack_webhook' => null,
            'auto_recovery' => true,
        ], $config);
        
        $this->registerCallbacks();
    }
    
    public function handleAlarm(): array
    {
        $status = $this->monitor->checkAlarmStatus();
        
        if ($status['disk_alarm']) {
            return $this->handleAlarmState($status);
        }
        
        return $this->handleNormalState($status);
    }

    public function getRecoveryActions(): array
    {
        // Keyed by action name so executeRecoveryAction() can look actions up directly.
        return [
            'purge_queues' => [
                'description' => 'Purge non-critical queues',
                'command' => 'rabbitmqctl purge_queue <queue_name>',
                'risk' => 'high',
            ],
            'delete_idle_queues' => [
                'description' => 'Delete idle queues (queues with no consumers)',
                // -q and --no-table-headers drop banner lines; awk matches a consumer count of exactly 0.
                'command' => 'rabbitmqctl -q list_queues name consumers --no-table-headers | awk -F"\t" \'$2 == 0 {print $1}\' | xargs -r -I {} rabbitmqctl delete_queue {}',
                'risk' => 'medium',
            ],
            'clear_old_logs' => [
                'description' => 'Remove old log files',
                'command' => 'find /var/log/rabbitmq -name "*.log" -mtime +7 -delete',
                'risk' => 'low',
            ],
            'compact_mnesia' => [
                'description' => 'Compact the Mnesia database',
                // Mnesia exports no compact_tables/0, so no command is offered for this action.
                'command' => null,
                'risk' => 'low',
            ],
        ];
    }

    public function executeRecoveryAction(string $action): array
    {
        $actions = $this->getRecoveryActions();

        if (!isset($actions[$action])) {
            return ['error' => 'Unknown action'];
        }

        $actionInfo = $actions[$action];

        if ($actionInfo['command'] === null) {
            return ['error' => 'No command available for this action'];
        }

        // High-risk actions are never run automatically.
        if ($actionInfo['risk'] === 'high') {
            return [
                'error' => 'High risk action requires manual execution',
                'command' => $actionInfo['command'],
            ];
        }

        $output = shell_exec($actionInfo['command'] . ' 2>&1');

        return [
            'action' => $action,
            'output' => $output,
            'timestamp' => date('Y-m-d H:i:s'),
        ];
    }

    private function registerCallbacks(): void
    {
        $this->monitor->registerAlertCallback('disk_alarm', function ($data) {
            $this->onDiskAlarm($data);
        });
        
        $this->monitor->registerAlertCallback('disk_clear', function ($data) {
            $this->onDiskClear($data);
        });
    }

    private function handleAlarmState(array $status): array
    {
        $blocked = $this->monitor->getBlockedConnections();
        
        $response = [
            'status' => 'alarm',
            'disk_free' => $status['disk_free_human'],
            'disk_limit' => $status['disk_limit_human'],
            'blocked_connections' => $blocked['blocked_count'],
            'recommended_actions' => $this->getRecommendedActions($status),
        ];
        
        if ($this->config['auto_recovery']) {
            $response['auto_recovery'] = $this->attemptAutoRecovery();
        }
        
        return $response;
    }

    private function handleNormalState(array $status): array
    {
        return [
            'status' => 'normal',
            'disk_free' => $status['disk_free_human'],
            'disk_limit' => $status['disk_limit_human'],
        ];
    }

    private function onDiskAlarm(array $data): void
    {
        $this->sendNotification('DISK ALARM', $data);
    }

    private function onDiskClear(array $data): void
    {
        $this->sendNotification('DISK ALARM CLEARED', $data);
    }

    private function sendNotification(string $subject, array $data): void
    {
        if ($this->config['notification_email']) {
            mail(
                $this->config['notification_email'],
                "[RabbitMQ] {$subject}",
                json_encode($data, JSON_PRETTY_PRINT)
            );
        }
        
        if ($this->config['slack_webhook']) {
            $this->sendSlackNotification($subject, $data);
        }
    }

    private function sendSlackNotification(string $subject, array $data): void
    {
        $payload = [
            'text' => $subject,
            'attachments' => [
                [
                    'color' => strpos($subject, 'ALARM') !== false && strpos($subject, 'CLEARED') === false ? 'danger' : 'good',
                    'fields' => [
                        [
                            'title' => 'Disk Free',
                            'value' => $data['disk_free_human'],
                            'short' => true,
                        ],
                        [
                            'title' => 'Disk Limit',
                            'value' => $data['disk_limit_human'],
                            'short' => true,
                        ],
                    ],
                ],
            ],
        ];
        
        $ch = curl_init($this->config['slack_webhook']);
        curl_setopt($ch, CURLOPT_POST, true);
        curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($payload));
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_exec($ch);
        curl_close($ch);
    }

    private function getRecommendedActions(array $status): array
    {
        $actions = [];
        
        $margin = $status['disk_free'] - $status['disk_limit'];
        
        if ($margin < 0) {
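            $actions[] = 'Free up disk space immediately';
            $actions[] = 'Delete idle queues';
            $actions[] = 'Clean up log files';
        } elseif ($margin < $status['disk_limit'] * 0.5) {
            $actions[] = 'Review disk usage';
            $actions[] = 'Prepare a cleanup plan';
        }
        
        $actions[] = 'Check that the disk alarm threshold is set sensibly';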
        
        return $actions;
    }

    private function attemptAutoRecovery(): array
    {
        $actions = [];
        
        $result = shell_exec('find /var/log/rabbitmq -name "*.log" -mtime +7 -delete 2>&1');
        $actions[] = [
            'action' => 'clear_old_logs',
            'result' => $result,
        ];
        
        return $actions;
    }
}

Disk Space Predictor

php
<?php

namespace App\RabbitMQ\Disk;

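// DiskUsageAnalyzer is not defined in this section; it is expected to provide
// analyzeDiskGrowth() and getDiskStatus() as they are used below.
class DiskSpacePredictor
{
    private DiskUsageAnalyzer $analyzer;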
    
    public function __construct(DiskUsageAnalyzer $analyzer)
    {
        $this->analyzer = $analyzer;
    }
    
    public function predictTimeToAlarm(): array
    {
        $growth = $this->analyzer->analyzeDiskGrowth(5, 60);
        $status = $this->analyzer->getDiskStatus();
        
        $growthRate = $growth['analysis']['growth_rate_per_second'] ?? 0;
        
        if ($growthRate <= 0) {
            return [
                'predicted' => false,
                'reason' => 'Disk usage is not growing',
            ];
        }
        
        $diskFree = $status['disk_free'];
        $diskLimit = $status['disk_limit'];
        $margin = $diskFree - $diskLimit;
        
        $secondsToAlarm = $margin / $growthRate;
        
        return [
            'predicted' => true,
            'current_free' => $this->formatBytes($diskFree),
            'limit' => $this->formatBytes($diskLimit),
            'margin' => $this->formatBytes($margin),
            'growth_rate' => $this->formatBytes($growthRate) . '/s',
            'seconds_to_alarm' => round($secondsToAlarm),
            'hours_to_alarm' => round($secondsToAlarm / 3600, 2),
            'days_to_alarm' => round($secondsToAlarm / 86400, 2),
            'estimated_alarm_time' => date('Y-m-d H:i:s', time() + (int) $secondsToAlarm),
        ];
    }

    public function recommendThreshold(): array
    {
        $growth = $this->analyzer->analyzeDiskGrowth(5, 60);
        $status = $this->analyzer->getDiskStatus();
        
        $growthRate = $growth['analysis']['growth_rate_per_second'] ?? 0;
        $dailyGrowth = $growthRate * 86400;
        
        $recommendedThreshold = max(
            $dailyGrowth * 7,
            1024 * 1024 * 1024
        );
        
        return [
            'daily_growth' => $this->formatBytes($dailyGrowth),
            'weekly_growth' => $this->formatBytes($dailyGrowth * 7),
            'current_threshold' => $this->formatBytes($status['disk_limit']),
            'recommended_threshold' => $this->formatBytes($recommendedThreshold),
            'reason' => 'At least 7 days of growth buffer',
        ];
    }

    // Accepts float because growth rates and projections are fractional values.
    private function formatBytes(float $bytes): string
    {
        $units = ['B', 'KB', 'MB', 'GB', 'TB'];
        $i = 0;
        while ($bytes >= 1024 && $i < count($units) - 1) {
            $bytes /= 1024;
            $i++;
        }
        return round($bytes, 2) . ' ' . $units[$i];
    }
}
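
The predictor depends on a growth_rate_per_second figure supplied by an analyzer that is not shown here. As a rough sketch of how such a rate might be derived and turned into a time-to-alarm estimate, the example below uses two free-space samples. The function names and the sample format (time in seconds, disk_free in bytes) are assumptions made for illustration.

```php
<?php

declare(strict_types=1);

/**
 * Estimate how fast free disk space shrinks, in bytes per second,
 * from the first and last of a series of samples.
 * Hypothetical helper; the sample format is an assumption.
 *
 * @param array<int, array{time: int, disk_free: int}> $samples oldest first
 */
function growthRatePerSecond(array $samples): float
{
    if (count($samples) < 2) {
        return 0.0;
    }

    $first = $samples[0];
    $last = $samples[count($samples) - 1];
    $elapsed = $last['time'] - $first['time'];

    if ($elapsed <= 0) {
        return 0.0;
    }

    // Positive when free space is shrinking, i.e. disk usage is growing.
    return ($first['disk_free'] - $last['disk_free']) / $elapsed;
}

/** Seconds until disk_free reaches the limit, or null if usage is not growing. */
function secondsToAlarm(int $diskFree, int $diskLimit, float $ratePerSecond): ?float
{
    if ($ratePerSecond <= 0.0) {
        return null;
    }

    // Clamp at zero: the alarm is already due when free space is below the limit.
    return max(0.0, ($diskFree - $diskLimit) / $ratePerSecond);
}

// Two samples taken 300 seconds apart: 3 MB of free space was consumed.
$samples = [
    ['time' => 0,   'disk_free' => 10000000000],
    ['time' => 300, 'disk_free' => 9997000000],
];

$rate = growthRatePerSecond($samples);                 // 10000.0 bytes per second
$eta = secondsToAlarm(9997000000, 1000000000, $rate);  // 899700.0 seconds
```

At 10 kB/s, the remaining 8.997 GB of margin above a 1 GB limit lasts about 10.4 days. Using only the first and last sample is a deliberate simplification; a real analyzer would likely smooth over more samples.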

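Practical Scenarios

Scenario 1: Automated Disk Management

php
<?php

use App\RabbitMQ\Disk\DiskAlarmHandler;
use App\RabbitMQ\Disk\DiskSpacePredictor;

class AutomatedDiskManager
{
    private DiskAlarmHandler $handler;
    private DiskSpacePredictor $predictor;

    // Typed properties must be initialized before use, so inject both collaborators.
    public function __construct(DiskAlarmHandler $handler, DiskSpacePredictor $predictor)
    {
        $this->handler = $handler;
        $this->predictor = $predictor;
    }

    public function manage(): void
    {
        $prediction = $this->predictor->predictTimeToAlarm();

        // Clean up old logs when the alarm is predicted within the next 24 hours.
        if ($prediction['predicted'] && $prediction['hours_to_alarm'] < 24) {
            $this->handler->executeRecoveryAction('clear_old_logs');
        }
    }
}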

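Scenario 2: Alert Notification System

php
<?php

use App\RabbitMQ\Disk\DiskAlarmMonitor;

class DiskAlertNotificationSystem
{
    private DiskAlarmMonitor $monitor;

    // Typed properties must be initialized before use, so inject the monitor.
    public function __construct(DiskAlarmMonitor $monitor)
    {
        $this->monitor = $monitor;
    }

    public function start(): void
    {
        $this->monitor->registerAlertCallback('disk_alarm', function ($data) {
            $this->sendAlert('critical', 'Disk alarm triggered', $data);
        });

        // Blocks forever, polling every 10 seconds.
        $this->monitor->startMonitoring(10);
    }

    private function sendAlert(string $level, string $message, array $data): void
    {
        // Send the alert
    }
}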

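Common Problems and Solutions

Problem 1: Alarms Fire Too Often

Solution

ini
# Adjust the threshold
disk_free_limit.absolute = 5GB

Problem 2: The Alarm Does Not Clear

Solution

bash
# Clear the alarm manually (free up disk space first; this does not fix the shortage itself)
rabbitmqctl eval "rabbit_alarm:clear_alarm({resource_limit, disk, node()})."

# Check disk usage
df -h /var/lib/rabbitmq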

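Best Practices

Threshold Settings

| Disk size | Recommended threshold |
|-----------|-----------------------|
| < 100GB | 5GB |
| 100GB - 1TB | 10GB |
| > 1TB | 1-2% of the disk |

Monitoring Strategy

| Check | Frequency | Alert threshold |
|-------|-----------|-----------------|
| Free disk space | Every minute | < 2x limit |
| Growth rate | Hourly | > 1GB/day |
| Blocked connections | Real time | > 0 |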
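
As a minimal sketch, the three alert rules in the monitoring table could be evaluated together like this. The function name, parameter names, and rule labels are hypothetical, and 1GB is taken as 10^9 bytes.

```php
<?php

declare(strict_types=1);

/**
 * Apply the three alert rules from the monitoring table.
 * Hypothetical helper; the metric names are assumptions.
 *
 * @return string[] names of the rules that fired
 */
function evaluateMonitoringRules(
    int $diskFree,
    int $diskLimit,
    float $growthBytesPerDay,
    int $blockedConnections
): array {
    $fired = [];

    // Free disk space below twice the configured limit.
    if ($diskFree < 2 * $diskLimit) {
        $fired[] = 'disk_space';
    }

    // Disk usage growing by more than 1GB per day.
    if ($growthBytesPerDay > 1000000000) {
        $fired[] = 'growth_rate';
    }

    // Any blocked connection is worth an alert.
    if ($blockedConnections > 0) {
        $fired[] = 'blocked_connections';
    }

    return $fired;
}

// 1.5 GB free against a 1 GB limit, growing 2.5 GB per day, nothing blocked yet.
$fired = evaluateMonitoringRules(1500000000, 1000000000, 2.5e9, 0);
// $fired is ['disk_space', 'growth_rate']
```

Returning the list of fired rules, rather than a single boolean, lets the caller route each condition to a different notification channel or severity.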

Related Links