Appearance
RabbitMQ 磁盘告警机制
概述
磁盘告警是 RabbitMQ 保护系统免受磁盘空间耗尽影响的重要机制。当可用磁盘空间低于阈值时,系统会触发告警并阻止生产者继续发送消息,确保系统稳定运行。
核心知识点
磁盘告警工作原理
┌─────────────────────────────────────────────────────────────┐
│ 磁盘告警机制流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 磁盘可用空间 │
│ 100% ─┬───────────────────────────────────────────── │
│ │ │
│ │ 正常运行 │
│ │ │
│ 10% ─┤ ┌─────────────────────────────────────┐ │
│ │────▶│ 磁盘告警触发 │ │
│ │ │ disk_free_limit 达到阈值 │ │
│ │ │ 阻止所有发布者 │ │
│ 5% ─┤ └─────────────────────────────────────┘ │
│ │ │
│ 1GB ─┤ 推荐阈值(绝对值) │
│ │ │
│ 50MB ─┤ 默认阈值(最小值) │
│ │ │
│ 0% ─┴───────────────────────────────────────────── │
│ │
│ 触发条件:disk_free < disk_free_limit │
│ │
└─────────────────────────────────────────────────────────────┘
告警阈值配置
| 配置方式 | 默认值 | 说明 |
|---|---|---|
| 绝对值 | 50MB | 固定阈值 |
| 相对值 | 无默认值(需显式配置) | 相对于物理内存总量的倍数(如 2.0 表示内存的 2 倍) |
告警状态影响
┌─────────────────────────────────────────────────────────────┐
│ 告警状态影响 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 正常状态: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 生产者 ──▶ 发布消息 ──▶ 队列 ──▶ 消费者 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 告警状态: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 生产者 ──▶ 发布消息 ──✕ 阻塞 │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ 等待磁盘空间释放... │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 影响范围: │
│ - 所有发布连接被阻塞 │
│ - 消费者仍可正常消费 │
│ - 系统等待磁盘空间释放 │
│ │
└─────────────────────────────────────────────────────────────┘
磁盘检测机制
检测流程:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 定时检测 │────▶│ 计算可用空间 │────▶│ 比较阈值 │
│ (10秒间隔) │ │ │ │ │
└──────────────┘ └──────────────┘ └──────────────┘
│
┌─────────────────────────────┴─────┐
│ │
▼ ▼
┌──────────────┐ ┌──────────────┐
│ 触发告警 │ │ 清除告警 │
│ 阻塞发布者 │ │ 恢复正常 │
└──────────────┘ └──────────────┘
配置示例
基础配置
ini
# /etc/rabbitmq/rabbitmq.conf
# 磁盘告警阈值(绝对值)
# 推荐:1GB 或更大
disk_free_limit.absolute = 1GB
# 磁盘告警阈值(相对值:相对于物理内存总量的倍数)
# disk_free_limit.relative = 2.0
# 磁盘检测间隔(毫秒)
# 默认:10000(10秒)
disk_monitor_interval = 10000
不同场景配置
ini
# ============ 场景一:小磁盘(< 100GB) ============
disk_free_limit.absolute = 5GB
# ============ 场景二:中等磁盘(100GB - 1TB) ============
disk_free_limit.absolute = 10GB
# ============ 场景三:大磁盘(> 1TB) ============
disk_free_limit.relative = 2.0
# ============ 场景四:容器环境 ============
disk_free_limit.absolute = 2GB
高级配置
erlang
# /etc/rabbitmq/advanced.config
[
{rabbit, [
{disk_free_limit, {absolute, 1073741824}},
{disk_monitor_interval, 10000}
]}
].
运行时配置
bash
# 查看当前磁盘状态
rabbitmqctl status | grep disk
# 查看磁盘告警
rabbitmq-diagnostics alarms
# 运行时修改阈值(重启后失效)
rabbitmqctl set_disk_free_limit 2GB
PHP 代码示例
磁盘告警监控类
php
<?php
namespace App\RabbitMQ\Disk;
class DiskAlarmMonitor
{
private string $apiHost;
private int $apiPort;
private string $apiUser;
private string $apiPass;
private array $alertCallbacks = [];
/**
 * Stores the host, port and credentials used to build management API requests.
 *
 * @param string $apiHost Host name used in the request URL.
 * @param int    $apiPort Port used in the request URL.
 * @param string $apiUser User name sent with each request.
 * @param string $apiPass Password sent with each request.
 */
public function __construct(
    string $apiHost = 'localhost',
    int $apiPort = 15672,
    string $apiUser = 'guest',
    string $apiPass = 'guest'
) {
    // Assign all four connection settings in one destructuring step.
    [$this->apiHost, $this->apiPort, $this->apiUser, $this->apiPass] =
        [$apiHost, $apiPort, $apiUser, $apiPass];
}
/**
 * Fetches /api/nodes and summarises the disk alarm state of the first node returned.
 *
 * Only the first entry of the response is inspected; any further entries are ignored.
 *
 * @return array Either ['error' => string] when no usable node entry was obtained,
 *               or a map with the keys node_name, disk_alarm, disk_free,
 *               disk_free_human, disk_limit, disk_limit_human, status and timestamp.
 */
public function checkAlarmStatus(): array
{
    $nodes = $this->apiRequest('/api/nodes');

    // A JSON object response (instead of a list of nodes) decodes to a non-empty
    // map without index 0, so an emptiness check alone would let it through and
    // the method would report a healthy node built from defaults.
    if (!isset($nodes[0]) || !is_array($nodes[0])) {
        return ['error' => 'Unable to fetch node information'];
    }

    $node = $nodes[0];
    $diskFree = $node['disk_free'] ?? 0;
    $diskLimit = $node['disk_free_limit'] ?? 0;
    $diskAlarm = $node['disk_free_alarm'] ?? false;

    return [
        'node_name' => $node['name'] ?? 'unknown',
        'disk_alarm' => $diskAlarm,
        'disk_free' => $diskFree,
        'disk_free_human' => $this->formatBytes($diskFree),
        'disk_limit' => $diskLimit,
        'disk_limit_human' => $this->formatBytes($diskLimit),
        'status' => $this->determineStatus($diskFree, $diskLimit, $diskAlarm),
        'timestamp' => date('Y-m-d H:i:s'),
    ];
}
/**
 * Returns the decoded response of the /api/alarms endpoint.
 *
 * NOTE(review): confirm that the target RabbitMQ version actually exposes
 * /api/alarms; if the request fails the result is whatever apiRequest() returns.
 *
 * @return array Decoded response body.
 */
public function getAlarmHistory(): array
{
    $alarms = $this->apiRequest('/api/alarms');

    return $alarms;
}
/**
 * Registers the callback invoked for the given alert type.
 *
 * Only one callback is kept per type; registering again replaces the previous one.
 *
 * @param string   $type     Alert type key (startMonitoring() emits 'disk_alarm' and 'disk_clear').
 * @param callable $callback Receives the status array when the alert fires.
 */
public function registerAlertCallback(string $type, callable $callback): void
{
    $this->alertCallbacks = array_replace($this->alertCallbacks, [$type => $callback]);
}
/**
 * Polls the alarm status forever and fires an alert on every state transition.
 *
 * 'disk_alarm' is triggered when the alarm switches on and 'disk_clear' when it
 * switches off. This method never returns.
 *
 * @param int $intervalSeconds Pause between two polls, in seconds.
 */
public function startMonitoring(int $intervalSeconds = 10): void
{
    $previousAlarm = false;

    while (true) {
        $status = $this->checkAlarmStatus();

        // A failed poll returns ['error' => ...] without a 'disk_alarm' key.
        // Treating that as "no alarm" would emit a spurious 'disk_clear' and lose
        // the real state, so the previous state is kept and the poll is retried.
        if (!array_key_exists('disk_alarm', $status)) {
            sleep($intervalSeconds);
            continue;
        }

        $currentAlarm = (bool) $status['disk_alarm'];
        if ($currentAlarm !== $previousAlarm) {
            $this->triggerAlert($currentAlarm ? 'disk_alarm' : 'disk_clear', $status);
        }

        $previousAlarm = $currentAlarm;
        sleep($intervalSeconds);
    }
}
/**
 * Lists the connections from /api/connections whose state is exactly 'blocked'.
 *
 * @return array Map with 'blocked_count' (int) and 'connections' (list of maps
 *               holding name, client_properties and peer_host).
 */
public function getBlockedConnections(): array
{
    $blocked = [];

    foreach ($this->apiRequest('/api/connections') as $connection) {
        // Skip every entry that has no state or a state other than 'blocked'.
        if (($connection['state'] ?? null) !== 'blocked') {
            continue;
        }

        $blocked[] = [
            'name' => $connection['name'],
            'client_properties' => $connection['client_properties'] ?? [],
            'peer_host' => $connection['peer_host'] ?? 'unknown',
        ];
    }

    return [
        'blocked_count' => count($blocked),
        'connections' => $blocked,
    ];
}
/**
 * Reports the current free space, the configured limit and the margin between them.
 *
 * @return array Either the ['error' => string] result of checkAlarmStatus() when the
 *               status could not be fetched, or a map with current_free,
 *               configured_limit, margin and their *_human counterparts.
 */
public function getDiskThresholdSettings(): array
{
    $status = $this->checkAlarmStatus();

    // The error result carries none of the disk_* keys read below; pass it
    // through instead of computing a margin from missing values.
    if (!isset($status['disk_free'], $status['disk_limit'])) {
        return $status;
    }

    $margin = $status['disk_free'] - $status['disk_limit'];

    return [
        'current_free' => $status['disk_free'],
        'current_free_human' => $status['disk_free_human'],
        'configured_limit' => $status['disk_limit'],
        'configured_limit_human' => $status['disk_limit_human'],
        'margin' => $margin,
        'margin_human' => $this->formatBytes($margin),
    ];
}
/**
 * Invokes the callback registered for the alert type, if any, then logs the alert.
 *
 * @param string $type Alert type key.
 * @param array  $data Status data handed to the callback and the log writer.
 */
private function triggerAlert(string $type, array $data): void
{
    $callback = $this->alertCallbacks[$type] ?? null;

    if ($callback !== null) {
        call_user_func($callback, $data);
    }

    // The log entry is written whether or not a callback was registered.
    $this->logAlert($type, $data);
}
/**
 * Appends one line describing the alert to /var/log/rabbitmq_disk_alarm.log.
 *
 * @param string $type Alert type; written in upper case.
 * @param array  $data Must contain 'disk_free_human' and 'disk_limit_human'.
 */
private function logAlert(string $type, array $data): void
{
    $fields = [
        date('Y-m-d H:i:s'),
        strtoupper($type),
        $data['disk_free_human'],
        $data['disk_limit_human'],
    ];
    $line = vsprintf("[%s] %s: Disk Free=%s, Limit=%s\n", $fields);

    file_put_contents('/var/log/rabbitmq_disk_alarm.log', $line, FILE_APPEND);
}
/**
 * Classifies the disk state as 'alarm', 'warning' or 'normal'.
 *
 * 'warning' means the headroom above the limit is smaller than half the limit.
 *
 * @param int  $diskFree  Free disk space.
 * @param int  $diskLimit Configured free-space limit.
 * @param bool $alarm     Whether the alarm flag is set.
 *
 * @return string One of 'alarm', 'warning', 'normal'.
 */
private function determineStatus(int $diskFree, int $diskLimit, bool $alarm): string
{
    if ($alarm) {
        return 'alarm';
    }

    $headroom = $diskFree - $diskLimit;
    $warningBand = $diskLimit * 0.5;

    return $headroom < $warningBand ? 'warning' : 'normal';
}
/**
 * Performs an authenticated GET against the management API and decodes the JSON body.
 *
 * Failures are reported as an empty array: cURL initialisation or transfer
 * failure, an HTTP status of 400 or above, an empty body, or a body that does
 * not decode to an array.
 *
 * @param string $endpoint Path appended to http://host:port, e.g. '/api/nodes'.
 *
 * @return array Decoded response, or [] on any failure.
 */
private function apiRequest(string $endpoint): array
{
    $url = "http://{$this->apiHost}:{$this->apiPort}{$endpoint}";

    $ch = curl_init();
    if ($ch === false) {
        return [];
    }

    curl_setopt_array($ch, [
        CURLOPT_URL => $url,
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_USERPWD => "{$this->apiUser}:{$this->apiPass}",
        CURLOPT_TIMEOUT => 10,
    ]);

    $response = curl_exec($ch);
    $httpCode = (int) curl_getinfo($ch, CURLINFO_HTTP_CODE);
    curl_close($ch);

    // curl_exec() returns false on transport failure; error statuses carry an
    // error document rather than the requested resource.
    if (!is_string($response) || $response === '' || $httpCode >= 400) {
        return [];
    }

    // A scalar JSON document would violate the declared array return type.
    $decoded = json_decode($response, true);

    return is_array($decoded) ? $decoded : [];
}
/**
 * Formats a byte count with a binary unit (B, KB, MB, GB, TB), rounded to two decimals.
 *
 * Negative values (e.g. a negative margin) are scaled by magnitude and keep
 * their sign. The parameter is a float so that integer and fractional inputs
 * are both accepted without truncation.
 *
 * @param float $bytes Byte count; may be negative.
 *
 * @return string For example '1.5 KB' or '-1 GB'.
 */
private function formatBytes(float $bytes): string
{
    $units = ['B', 'KB', 'MB', 'GB', 'TB'];
    $lastUnit = count($units) - 1;

    $magnitude = abs($bytes);
    $i = 0;
    while ($magnitude >= 1024 && $i < $lastUnit) {
        $magnitude /= 1024;
        $i++;
    }

    $rounded = round($magnitude, 2);
    // Avoid printing "-0" when a tiny negative value rounds to zero.
    $sign = ($bytes < 0 && $rounded > 0) ? '-' : '';

    return $sign . $rounded . ' ' . $units[$i];
}
}
磁盘告警处理器
php
<?php
namespace App\RabbitMQ\Disk;
class DiskAlarmHandler
{
private DiskAlarmMonitor $monitor;
private array $config;
/**
 * Stores the monitor, merges the given options over the defaults and registers
 * this handler's alert callbacks on the monitor.
 *
 * @param DiskAlarmMonitor $monitor Monitor to query and to register callbacks on.
 * @param array            $config  Optional overrides for notification_email,
 *                                  slack_webhook and auto_recovery.
 */
public function __construct(DiskAlarmMonitor $monitor, array $config = [])
{
    $defaults = [
        'notification_email' => null,
        'slack_webhook' => null,
        'auto_recovery' => true,
    ];

    $this->config = array_merge($defaults, $config);
    $this->monitor = $monitor;

    // Must run after $this->monitor is set, since it registers on the monitor.
    $this->registerCallbacks();
}
/**
 * Checks the current alarm status and builds the matching response.
 *
 * @return array The alarm-state response, the normal-state response, or
 *               ['status' => 'unknown', 'error' => string] when the status
 *               could not be determined.
 */
public function handleAlarm(): array
{
    $status = $this->monitor->checkAlarmStatus();

    // A failed check returns an error map without 'disk_alarm'. Falling through
    // to the normal-state branch would report a healthy disk for a broker that
    // could not even be queried.
    if (!array_key_exists('disk_alarm', $status)) {
        return [
            'status' => 'unknown',
            'error' => $status['error'] ?? 'Unable to determine disk alarm status',
        ];
    }

    if ($status['disk_alarm']) {
        return $this->handleAlarmState($status);
    }

    return $this->handleNormalState($status);
}
/**
 * Returns the catalogue of recovery actions as a list.
 *
 * Each entry is a map with 'action' (identifier), 'description', 'command'
 * (shell command line) and 'risk' ('low', 'medium' or 'high').
 *
 * @return array List of action maps, in a fixed order.
 */
public function getRecoveryActions(): array
{
    return [
        [
            'action' => 'purge_queues',
            'description' => '清空非关键队列',
            'command' => 'rabbitmqctl purge_queue <queue_name>',
            'risk' => 'high',
        ],
        [
            'action' => 'delete_idle_queues',
            'description' => '删除空闲队列',
            // Select rows whose consumer column equals 0. The previous
            // `grep "0$"` also matched 10, 20, ... consumers and would have
            // deleted queues that are actively consumed.
            'command' => 'rabbitmqctl list_queues -q name consumers | awk -F \'\\t\' \'$2 == 0 {print $1}\' | xargs -I {} rabbitmqctl delete_queue {}',
            'risk' => 'medium',
        ],
        [
            'action' => 'clear_old_logs',
            'description' => '清理旧日志文件',
            'command' => 'find /var/log/rabbitmq -name "*.log" -mtime +7 -delete',
            'risk' => 'low',
        ],
        [
            'action' => 'compact_mnesia',
            'description' => '压缩 Mnesia 数据库',
            // NOTE(review): verify that mnesia:compact_tables/0 exists on the
            // target Erlang/OTP release before relying on this command.
            'command' => 'rabbitmqctl eval "mnesia:compact_tables()."',
            'risk' => 'low',
        ],
    ];
}
/**
 * Runs the shell command of a named recovery action, unless it is high risk.
 *
 * @param string $action Action identifier, i.e. the 'action' value of an entry
 *                       returned by getRecoveryActions().
 *
 * @return array ['error' => 'Unknown action'] when no entry matches;
 *               ['error' => ..., 'command' => ...] for high-risk actions, which
 *               are never executed; otherwise action, output and timestamp.
 */
public function executeRecoveryAction(string $action): array
{
    // getRecoveryActions() returns a numerically indexed list, so the entry has
    // to be found by its 'action' field rather than by array key.
    $actionInfo = null;
    foreach ($this->getRecoveryActions() as $candidate) {
        if (($candidate['action'] ?? null) === $action) {
            $actionInfo = $candidate;
            break;
        }
    }

    if ($actionInfo === null) {
        return ['error' => 'Unknown action'];
    }

    if ($actionInfo['risk'] === 'high') {
        return [
            'error' => 'High risk action requires manual execution',
            'command' => $actionInfo['command'],
        ];
    }

    // stderr is merged into stdout so that failures are visible in 'output'.
    $output = shell_exec($actionInfo['command'] . ' 2>&1');

    return [
        'action' => $action,
        'output' => $output,
        'timestamp' => date('Y-m-d H:i:s'),
    ];
}
/**
 * Registers this handler's private listeners on the monitor for the
 * 'disk_alarm' and 'disk_clear' alert types.
 */
private function registerCallbacks(): void
{
    $listeners = [
        'disk_alarm' => 'onDiskAlarm',
        'disk_clear' => 'onDiskClear',
    ];

    foreach ($listeners as $event => $method) {
        // A closure is used so the private method is called from this class's scope.
        $this->monitor->registerAlertCallback($event, function ($data) use ($method) {
            $this->{$method}($data);
        });
    }
}
/**
 * Builds the response for an active disk alarm.
 *
 * @param array $status Status map from the monitor; reads disk_free_human and
 *                      disk_limit_human and is passed on to getRecommendedActions().
 *
 * @return array Response with status, disk figures, blocked connection count,
 *               recommended actions and, when enabled, the auto-recovery result.
 */
private function handleAlarmState(array $status): array
{
    $blockedInfo = $this->monitor->getBlockedConnections();

    $report = [];
    $report['status'] = 'alarm';
    $report['disk_free'] = $status['disk_free_human'];
    $report['disk_limit'] = $status['disk_limit_human'];
    $report['blocked_connections'] = $blockedInfo['blocked_count'];
    $report['recommended_actions'] = $this->getRecommendedActions($status);

    if (!$this->config['auto_recovery']) {
        return $report;
    }

    $report['auto_recovery'] = $this->attemptAutoRecovery();

    return $report;
}
/**
 * Builds the response used when no disk alarm is active.
 *
 * @param array $status Status map; reads disk_free_human and disk_limit_human.
 *
 * @return array Map with status 'normal', disk_free and disk_limit.
 */
private function handleNormalState(array $status): array
{
    $report = ['status' => 'normal'];
    $report['disk_free'] = $status['disk_free_human'];
    $report['disk_limit'] = $status['disk_limit_human'];

    return $report;
}
/**
 * Listener for the 'disk_alarm' alert: sends a notification with the status data.
 *
 * @param array $data Status data supplied by the monitor.
 */
private function onDiskAlarm(array $data): void
{
    $subject = 'DISK ALARM';
    $this->sendNotification($subject, $data);
}

/**
 * Listener for the 'disk_clear' alert: sends a notification with the status data.
 *
 * @param array $data Status data supplied by the monitor.
 */
private function onDiskClear(array $data): void
{
    $subject = 'DISK ALARM CLEARED';
    $this->sendNotification($subject, $data);
}
/**
 * Sends the notification through every configured channel (e-mail, Slack).
 *
 * A channel is skipped when its configuration value is falsy.
 *
 * @param string $subject Short subject line.
 * @param array  $data    Status data; JSON-encoded for the e-mail body.
 */
private function sendNotification(string $subject, array $data): void
{
    $recipient = $this->config['notification_email'];
    if ($recipient) {
        $body = json_encode($data, JSON_PRETTY_PRINT);
        mail($recipient, "[RabbitMQ] {$subject}", $body);
    }

    $webhook = $this->config['slack_webhook'];
    if ($webhook) {
        $this->sendSlackNotification($subject, $data);
    }
}
/**
 * Posts the alert to the configured Slack webhook as a JSON payload.
 *
 * The attachment colour is 'danger' for an active alarm (subject contains
 * 'ALARM' but not 'CLEARED') and 'good' otherwise. The HTTP response is ignored.
 *
 * @param string $subject Message text.
 * @param array  $data    Status data; disk_free_human and disk_limit_human are
 *                        shown as fields, with 'n/a' used when a key is absent.
 */
private function sendSlackNotification(string $subject, array $data): void
{
    $isActiveAlarm = strpos($subject, 'ALARM') !== false
        && strpos($subject, 'CLEARED') === false;

    $payload = [
        'text' => $subject,
        'attachments' => [
            [
                'color' => $isActiveAlarm ? 'danger' : 'good',
                'fields' => [
                    [
                        'title' => 'Disk Free',
                        'value' => $data['disk_free_human'] ?? 'n/a',
                        'short' => true,
                    ],
                    [
                        'title' => 'Disk Limit',
                        'value' => $data['disk_limit_human'] ?? 'n/a',
                        'short' => true,
                    ],
                ],
            ],
        ],
    ];

    $ch = curl_init($this->config['slack_webhook']);
    if ($ch === false) {
        return;
    }

    curl_setopt($ch, CURLOPT_POST, true);
    // The body is raw JSON, so it must be declared as such; cURL otherwise
    // labels string bodies as form-encoded.
    curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
    curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($payload));
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    // Bound the request so an unresponsive webhook cannot stall the caller.
    curl_setopt($ch, CURLOPT_TIMEOUT, 10);
    curl_exec($ch);
    curl_close($ch);
}
/**
 * Suggests follow-up actions based on the margin between free space and limit.
 *
 * A negative margin yields the urgent clean-up steps, a margin below half the
 * limit yields the preparatory steps; the threshold review hint is always added.
 *
 * @param array $status Status map; reads disk_free and disk_limit.
 *
 * @return array List of recommendation strings.
 */
private function getRecommendedActions(array $status): array
{
    $limit = $status['disk_limit'];
    $headroom = $status['disk_free'] - $limit;

    if ($headroom < 0) {
        $suggestions = ['立即清理磁盘空间', '删除空闲队列', '清理日志文件'];
    } elseif ($headroom < $limit * 0.5) {
        $suggestions = ['检查磁盘使用情况', '准备清理策略'];
    } else {
        $suggestions = [];
    }

    $suggestions[] = '检查磁盘告警阈值配置是否合理';

    return $suggestions;
}
/**
 * Runs the automatic recovery step: deleting *.log files under
 * /var/log/rabbitmq that were modified more than seven days ago.
 *
 * @return array List with one entry holding the action name and the command output.
 */
private function attemptAutoRecovery(): array
{
    $command = 'find /var/log/rabbitmq -name "*.log" -mtime +7 -delete 2>&1';

    return [
        [
            'action' => 'clear_old_logs',
            'result' => shell_exec($command),
        ],
    ];
}
}
磁盘空间预测器
php
<?php
namespace App\RabbitMQ\Disk;
class DiskSpacePredictor
{
private DiskUsageAnalyzer $analyzer;
/**
 * Stores the analyzer that supplies disk growth figures and the current disk status.
 *
 * @param DiskUsageAnalyzer $analyzer Source of analyzeDiskGrowth() and getDiskStatus() data.
 */
public function __construct(DiskUsageAnalyzer $analyzer)
{
$this->analyzer = $analyzer;
}
/**
 * Estimates when free disk space will reach the configured limit.
 *
 * Uses the growth rate reported by the analyzer (called with arguments 5 and 60)
 * and the margin between current free space and the limit.
 *
 * @return array ['predicted' => false, 'reason' => string] when usage is not
 *               growing or the limit has already been reached; otherwise a map
 *               with predicted => true, formatted sizes, the time to alarm in
 *               seconds, hours and days, and the estimated alarm timestamp.
 */
public function predictTimeToAlarm(): array
{
    $growth = $this->analyzer->analyzeDiskGrowth(5, 60);
    $status = $this->analyzer->getDiskStatus();

    $growthRate = $growth['analysis']['growth_rate_per_second'] ?? 0;
    if ($growthRate <= 0) {
        return [
            'predicted' => false,
            'reason' => 'Disk usage is not growing',
        ];
    }

    $diskFree = $status['disk_free'];
    $diskLimit = $status['disk_limit'];
    $margin = $diskFree - $diskLimit;

    // With no margin left the division would produce a zero or negative
    // duration and an "estimated" alarm time in the past.
    if ($margin <= 0) {
        return [
            'predicted' => false,
            'reason' => 'Disk free space is already at or below the limit',
        ];
    }

    $secondsToAlarm = $margin / $growthRate;

    return [
        'predicted' => true,
        'current_free' => $this->formatBytes($diskFree),
        'limit' => $this->formatBytes($diskLimit),
        'margin' => $this->formatBytes($margin),
        // The rate may be fractional; round explicitly before formatting
        // instead of relying on implicit float-to-int conversion.
        'growth_rate' => $this->formatBytes((int) round($growthRate)) . '/s',
        'seconds_to_alarm' => round($secondsToAlarm),
        'hours_to_alarm' => round($secondsToAlarm / 3600, 2),
        'days_to_alarm' => round($secondsToAlarm / 86400, 2),
        'estimated_alarm_time' => date('Y-m-d H:i:s', time() + (int) $secondsToAlarm),
    ];
}
/**
 * Recommends a free-space threshold that covers seven days of observed growth,
 * with a floor of 1 GiB (1024^3 bytes).
 *
 * A negative growth rate (usage shrinking) is treated as zero growth.
 *
 * @return array Map with daily_growth, weekly_growth, current_threshold,
 *               recommended_threshold (all formatted) and reason.
 */
public function recommendThreshold(): array
{
    $growth = $this->analyzer->analyzeDiskGrowth(5, 60);
    $status = $this->analyzer->getDiskStatus();

    // Clamp so that shrinking usage does not yield negative growth figures.
    $growthRate = max(0, $growth['analysis']['growth_rate_per_second'] ?? 0);

    // Work in whole bytes; the rate may be fractional.
    $dailyGrowth = (int) round($growthRate * 86400);
    $weeklyGrowth = $dailyGrowth * 7;
    $recommendedThreshold = max($weeklyGrowth, 1024 * 1024 * 1024);

    return [
        'daily_growth' => $this->formatBytes($dailyGrowth),
        'weekly_growth' => $this->formatBytes($weeklyGrowth),
        'current_threshold' => $this->formatBytes($status['disk_limit']),
        'recommended_threshold' => $this->formatBytes($recommendedThreshold),
        'reason' => 'At least 7 days of growth buffer',
    ];
}
/**
 * Formats a byte count with a binary unit (B, KB, MB, GB, TB), rounded to two decimals.
 *
 * The parameter is a float because callers in this class pass computed rates
 * and growth figures that may be fractional; integers are accepted as well.
 * Negative values are scaled by magnitude and keep their sign.
 *
 * @param float $bytes Byte count; may be fractional or negative.
 *
 * @return string For example '1.5 KB' or '-1 GB'.
 */
private function formatBytes(float $bytes): string
{
    $units = ['B', 'KB', 'MB', 'GB', 'TB'];
    $lastUnit = count($units) - 1;

    $magnitude = abs($bytes);
    $i = 0;
    while ($magnitude >= 1024 && $i < $lastUnit) {
        $magnitude /= 1024;
        $i++;
    }

    $rounded = round($magnitude, 2);
    // Avoid printing "-0" when a tiny negative value rounds to zero.
    $sign = ($bytes < 0 && $rounded > 0) ? '-' : '';

    return $sign . $rounded . ' ' . $units[$i];
}
}
实际应用场景
场景一:自动化磁盘管理
php
<?php
class AutomatedDiskManager
{
private DiskAlarmHandler $handler;
private DiskSpacePredictor $predictor;

/**
 * Injects the collaborators. Without this the typed properties are never
 * initialised and manage() fails on first access.
 *
 * NOTE(review): this example class is declared without a namespace while the
 * handler and predictor are declared in App\RabbitMQ\Disk; confirm that the
 * real file imports them.
 *
 * @param DiskAlarmHandler   $handler   Executes recovery actions.
 * @param DiskSpacePredictor $predictor Supplies the time-to-alarm prediction.
 */
public function __construct(DiskAlarmHandler $handler, DiskSpacePredictor $predictor)
{
    $this->handler = $handler;
    $this->predictor = $predictor;
}

/**
 * Runs the 'clear_old_logs' recovery action when the predicted time to the
 * disk alarm is below 24 hours.
 */
public function manage(): void
{
    $prediction = $this->predictor->predictTimeToAlarm();

    // 'hours_to_alarm' is only present when a prediction could be made.
    $alarmImminent = !empty($prediction['predicted'])
        && isset($prediction['hours_to_alarm'])
        && $prediction['hours_to_alarm'] < 24;

    if ($alarmImminent) {
        $this->handler->executeRecoveryAction('clear_old_logs');
    }
}
}
场景二:告警通知系统
php
<?php
class DiskAlertNotificationSystem
{
private DiskAlarmMonitor $monitor;

/**
 * Injects the monitor. Without this the typed property is never initialised
 * and start() fails on first access.
 *
 * @param DiskAlarmMonitor $monitor Monitor to register on and to run.
 */
public function __construct(DiskAlarmMonitor $monitor)
{
    $this->monitor = $monitor;
}

/**
 * Registers the 'disk_alarm' callback and starts the monitoring loop with a
 * 10 second interval. The loop does not return, so neither does this method.
 *
 * The monitor keeps one callback per alert type, so this registration replaces
 * any 'disk_alarm' callback previously registered on the same monitor.
 */
public function start(): void
{
    $this->monitor->registerAlertCallback('disk_alarm', function ($data) {
        $this->sendAlert('critical', 'Disk alarm triggered', $data);
    });

    $this->monitor->startMonitoring(10);
}
/**
 * Placeholder for delivering an alert; the body is intentionally left empty in this example.
 *
 * @param string $level   Severity label (start() passes 'critical').
 * @param string $message Human-readable alert message.
 * @param array  $data    Status data received from the monitor callback.
 */
private function sendAlert(string $level, string $message, array $data): void
{
// Send the alert
}
}
常见问题与解决方案
问题一:频繁触发告警
解决方案:
ini
# 调整阈值
disk_free_limit.absolute = 5GB
问题二:告警后无法恢复
解决方案:
bash
# 手动清除告警
rabbitmqctl eval "rabbit_alarm:clear_alarm({resource_limit, disk, node()})."
# 检查磁盘状态
df -h /var/lib/rabbitmq
最佳实践建议
阈值设置
| 磁盘大小 | 推荐阈值 |
|---|---|
| < 100GB | 5GB |
| 100GB - 1TB | 10GB |
| > 1TB | 磁盘的 1-2% |
监控策略
| 检查项 | 频率 | 告警阈值 |
|---|---|---|
| 磁盘空间 | 每分钟 | < 2x limit |
| 增长速率 | 每小时 | > 1GB/天 |
| 阻塞连接 | 实时 | > 0 |
