
RabbitMQ Memory Alarm Mechanism

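Overview

RabbitMQ's memory alarm is a key safeguard for broker stability. When memory use reaches the configured threshold, the node raises an alarm and blocks producers from publishing further messages, so that the node does not run out of memory and crash.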

Key concepts

How the memory alarm works

┌─────────────────────────────────────────────────────────────┐
│                      Memory alarm flow                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Memory used by the node                                    │
│                                                             │
│  >= high watermark            ┌─────────────────────────┐   │
│  ────────────────────────────▶│ Block all publishers    │   │
│                               │ (memory alarm raised)   │   │
│                               └─────────────────────────┘   │
│  >= watermark * paging_ratio  ┌─────────────────────────┐   │
│  ────────────────────────────▶│ Page messages to disk   │   │
│                               └─────────────────────────┘   │
│  below both thresholds          Normal operation            │
│                                                             │
│  Sequence: normal → paging → blocked → recovered            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

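Alarm threshold settings

Parameter                                Default    Description
vm_memory_high_watermark.relative        0.4        High watermark as a fraction of available RAM
vm_memory_high_watermark_paging_ratio    0.5        Fraction of the watermark at which paging starts
vm_memory_high_watermark.absolute        (unset)    High watermark as an absolute amount of memory

Defaults differ between RabbitMQ releases, so confirm them against the documentation for the version you run.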

Effects of the alarm state

┌─────────────────────────────────────────────────────────────┐
│                 Effects of the alarm state                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Normal state:                                              │
│  ┌──────────────────────────────────────────────────┐       │
│  │  Producer ──▶ publish ──▶ queue ──▶ consumer     │       │
│  └──────────────────────────────────────────────────┘       │
│                                                             │
│  Alarm state:                                               │
│  ┌──────────────────────────────────────────────────┐       │
│  │  Producer ──▶ publish ──✕ blocked                │       │
│  │                         │                        │       │
│  │                         ▼                        │       │
│  │         waiting for memory to be freed...        │       │
│  └──────────────────────────────────────────────────┘       │
│                                                             │
│  Impact:                                                    │
│  - All publishing connections are blocked                   │
│  - Consumers can still consume normally                     │
│  - The node waits for memory to be freed                    │
│                                                             │
└─────────────────────────────────────────────────────────────┘
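
On the publisher side, the blocked state usually needs explicit handling, because a publish call against a blocked connection can stall. The following is a minimal sketch of a gate that buffers messages while the broker is blocking and flushes them once it unblocks. `PublishGate`, its method names, and the `maxPending` limit are hypothetical; the sketch assumes the client library surfaces the broker's `connection.blocked` and `connection.unblocked` notifications so they can be forwarded to `onBlocked()` and `onUnblocked()`.

```php
<?php
// Hypothetical publisher-side gate; the class and its method names are
// illustrative and do not belong to RabbitMQ or to any client library.
final class PublishGate
{
    /** @var callable the real publish call, injected by the caller */
    private $publish;
    private int $maxPending;
    private bool $blocked = false;
    /** @var string[] message bodies held back while the broker is blocking */
    private array $pending = [];

    public function __construct(callable $publish, int $maxPending = 1000)
    {
        $this->publish = $publish;
        $this->maxPending = $maxPending;
    }

    // Forward the client's connection.blocked notification here.
    public function onBlocked(): void
    {
        $this->blocked = true;
    }

    // Forward connection.unblocked here; held messages are flushed in arrival order.
    public function onUnblocked(): void
    {
        $this->blocked = false;
        while ($this->pending !== []) {
            ($this->publish)(array_shift($this->pending));
        }
    }

    // Returns false when the local buffer is full and the caller must shed load.
    public function send(string $body): bool
    {
        if (!$this->blocked) {
            ($this->publish)($body);
            return true;
        }
        if (count($this->pending) >= $this->maxPending) {
            return false;
        }
        $this->pending[] = $body;
        return true;
    }

    public function pendingCount(): int
    {
        return count($this->pending);
    }
}

// Usage with a stand-in publish function that records what was sent.
$sent = [];
$gate = new PublishGate(function (string $body) use (&$sent): void {
    $sent[] = $body;
}, 2);

$gate->send('a');              // published immediately
$gate->onBlocked();
$gate->send('b');              // buffered
$gate->send('c');              // buffered
$overflow = $gate->send('d');  // buffer is full, so this one is rejected
$gate->onUnblocked();          // flushes 'b' and 'c'
```

Bounding the buffer is a deliberate choice: an unbounded local queue only moves the memory problem from the broker into the publisher.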

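Paging mechanism

When memory use reaches watermark * paging_ratio, the node starts paging messages out to disk:

paging threshold = watermark * paging_ratio
Example: watermark = 0.4, paging_ratio = 0.75
paging threshold = 0.4 * 0.75 = 0.3 (30% of available RAM)

Paging process:
1. Write some of the queued messages to disk
2. Free the memory they occupied
3. Read them back from disk when they are consumed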
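
The threshold arithmetic above can be turned into byte values for a concrete machine. This is a minimal sketch; `pagingThresholdBytes()` is a hypothetical helper, and it assumes the relative watermark is applied to the total RAM figure you pass in.

```php
<?php
// Hypothetical helper: converts a relative watermark and a paging ratio
// into byte thresholds for a machine with the given amount of RAM.
function pagingThresholdBytes(int $totalRamBytes, float $watermark, float $pagingRatio): array
{
    if ($totalRamBytes <= 0 || $watermark <= 0 || $pagingRatio <= 0 || $pagingRatio > 1) {
        throw new InvalidArgumentException('RAM and watermark must be positive; paging ratio must be in (0, 1]');
    }

    // Memory level at which the alarm is raised and publishers are blocked.
    $alarmAt = (int) floor($totalRamBytes * $watermark);
    // Memory level at which paging to disk starts.
    $pagingAt = (int) floor($alarmAt * $pagingRatio);

    return [
        'alarm_at_bytes' => $alarmAt,
        'paging_at_bytes' => $pagingAt,
        'paging_fraction_of_ram' => round($watermark * $pagingRatio, 4),
    ];
}

// The example from the text, applied to a 16 GiB machine.
$thresholds = pagingThresholdBytes(16 * 1024 ** 3, 0.4, 0.75);
printf(
    "alarm at %d bytes, paging at %d bytes (%.0f%% of RAM)\n",
    $thresholds['alarm_at_bytes'],
    $thresholds['paging_at_bytes'],
    $thresholds['paging_fraction_of_ram'] * 100
);
```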

Configuration examples

Basic configuration

ini
# /etc/rabbitmq/rabbitmq.conf

# Relative watermark (recommended)
vm_memory_high_watermark.relative = 0.6

# Paging ratio
vm_memory_high_watermark_paging_ratio = 0.75

# Absolute watermark (optional)
# vm_memory_high_watermark.absolute = 4GB

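Advanced configuration

ini
# /etc/rabbitmq/rabbitmq.conf

# Memory calculation strategy
# rss: ask the operating system for the node process's resident set size (the default)
# allocated: use the figures reported by the runtime's memory allocators
# legacy: use the memory total reported by the Erlang VM
vm_memory_calculation_strategy = rss

# Statistics collection interval in milliseconds
# (this paces statistics emission; it does not control how often the alarm is checked)
# collect_statistics_interval = 5000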

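Policy configuration

bash
# Cap the length of matching queues so a backlog cannot grow without bound
rabbitmqctl set_policy memory-policy "^high-memory\." \
  '{"max-length":10000,"overflow":"reject-publish"}' \
  --apply-to queues

# Run matching queues in lazy mode
# (RabbitMQ 3.12 and later ignore queue-mode; classic queues there already keep messages on disk)
rabbitmqctl set_policy lazy "^lazy\." \
  '{"queue-mode":"lazy"}' \
  --apply-to queues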
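
Policy patterns are regular expressions matched against queue names, so it can help to preview which queues a pattern would select before applying it. The sketch below does that with PHP's `preg_match`. `queuesMatchingPolicy()` and the queue names are hypothetical, and the sketch assumes that for simple patterns like these PHP's PCRE behaves the same way as the broker's matcher.

```php
<?php
// Hypothetical helper: returns the queue names a policy pattern would select.
function queuesMatchingPolicy(string $pattern, array $queueNames): array
{
    // Wrap the pattern in "~" delimiters, escaping any "~" it contains.
    $regex = '~' . str_replace('~', '\~', $pattern) . '~';

    return array_values(array_filter(
        $queueNames,
        static function (string $name) use ($regex): bool {
            return preg_match($regex, $name) === 1;
        }
    ));
}

// Made-up queue names for illustration.
$names = ['high-memory.orders', 'lazy.audit', 'orders.high-memory.', 'high-memoryXlogs'];

$capped = queuesMatchingPolicy('^high-memory\.', $names);
$lazy = queuesMatchingPolicy('^lazy\.', $names);

print_r($capped);
print_r($lazy);
```

Only names that start with the literal prefix match: the anchor `^` rules out `orders.high-memory.`, and the escaped dot rules out `high-memoryXlogs`.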

PHP code examples

Memory alarm monitor class

php
<?php

namespace App\RabbitMQ\Memory;

class MemoryAlarmMonitor
{
    private string $apiHost;
    private int $apiPort;
    private string $apiUser;
    private string $apiPass;
    private array $alertCallbacks = [];
    
    public function __construct(
        string $apiHost = 'localhost',
        int $apiPort = 15672,
        string $apiUser = 'guest',
        string $apiPass = 'guest'
    ) {
        $this->apiHost = $apiHost;
        $this->apiPort = $apiPort;
        $this->apiUser = $apiUser;
        $this->apiPass = $apiPass;
    }
    
    public function checkAlarmStatus(): array
    {
        $nodes = $this->apiRequest('/api/nodes');
        
        if (empty($nodes)) {
            return ['error' => 'Unable to fetch node information'];
        }
        
        $node = $nodes[0];
        
        return [
            'node_name' => $node['name'],
            'memory_alarm' => $node['mem_alarm'] ?? false,
            'disk_alarm' => $node['disk_free_alarm'] ?? false,
            'memory_used' => $node['mem_used'] ?? 0,
            'memory_limit' => $node['mem_limit'] ?? 0,
            'memory_usage_percent' => $this->calculatePercent(
                $node['mem_used'] ?? 0,
                $node['mem_limit'] ?? 1
            ),
            'status' => $this->determineStatus($node),
            'timestamp' => date('Y-m-d H:i:s'),
        ];
    }

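    public function getAlarmHistory(int $limit = 100): array
    {
        // The endpoint is kept as in the original; confirm that it exists on
        // your broker version. apiRequest() returns [] for any non-200
        // response, so a missing endpoint yields an empty list, not an error.
        $alarms = $this->apiRequest('/api/alarms');
        
        return array_slice($alarms, 0, $limit);
    }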

    public function registerAlertCallback(string $type, callable $callback): void
    {
        $this->alertCallbacks[$type] = $callback;
    }

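    public function startMonitoring(int $intervalSeconds = 10): void
    {
        $previousAlarm = false;
        
        while (true) {
            $status = $this->checkAlarmStatus();
            
            // checkAlarmStatus() returns only an 'error' key when the API is
            // unreachable; skip this round instead of reading missing keys.
            if (isset($status['error'])) {
                sleep($intervalSeconds);
                continue;
            }
            
            $currentAlarm = (bool) $status['memory_alarm'];
            
            // Alarm was just raised.
            if ($currentAlarm && !$previousAlarm) {
                $this->triggerAlert('memory_alarm', $status);
            }
            
            // Alarm was just cleared.
            if (!$currentAlarm && $previousAlarm) {
                $this->triggerAlert('memory_clear', $status);
            }
            
            $previousAlarm = $currentAlarm;
            
            sleep($intervalSeconds);
        }
    }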

    public function getBlockedConnections(): array
    {
        $connections = $this->apiRequest('/api/connections');
        
        $blocked = [];
        
        foreach ($connections ?? [] as $conn) {
            if (isset($conn['state']) && $conn['state'] === 'blocked') {
                $blocked[] = [
                    'name' => $conn['name'],
                    'client_properties' => $conn['client_properties'] ?? [],
                    'blocked_since' => $conn['blocked_since'] ?? null,
                ];
            }
        }
        
        return [
            'blocked_count' => count($blocked),
            'connections' => $blocked,
        ];
    }

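    public function getWatermarkSettings(): array
    {
        // The management API lists nodes under /api/nodes; read the limit
        // from the first node, as checkAlarmStatus() does.
        $nodes = $this->apiRequest('/api/nodes');
        $memLimit = $nodes[0]['mem_limit'] ?? 0;
        
        // Hardcoded here; keep it in sync with
        // vm_memory_high_watermark_paging_ratio in rabbitmq.conf.
        $pagingRatio = 0.75;
        
        return [
            'memory_watermark' => $memLimit,
            'paging_ratio' => $pagingRatio,
            // Memory level (in bytes) at which paging to disk begins.
            'effective_watermark' => $memLimit * $pagingRatio,
        ];
    }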

    public function simulateAlarm(): array
    {
        return [
            'action' => 'set_alarm',
            'command' => 'rabbitmqctl eval "rabbit_alarm:set_alarm({{resource_limit, memory, node()}, []})."',
            'warning' => 'This is for testing purposes only',
        ];
    }

    public function clearAlarm(): array
    {
        return [
            'action' => 'clear_alarm',
            'command' => 'rabbitmqctl eval "rabbit_alarm:clear_alarm({resource_limit, memory, node()})."',
        ];
    }

    private function triggerAlert(string $type, array $data): void
    {
        if (isset($this->alertCallbacks[$type])) {
            call_user_func($this->alertCallbacks[$type], $data);
        }
        
        $this->logAlert($type, $data);
    }

    private function logAlert(string $type, array $data): void
    {
        $logEntry = sprintf(
            "[%s] %s: %s\n",
            date('Y-m-d H:i:s'),
            strtoupper($type),
            json_encode($data)
        );
        
        file_put_contents('/var/log/rabbitmq_alarm.log', $logEntry, FILE_APPEND);
    }

    private function determineStatus(array $node): string
    {
        if ($node['mem_alarm'] ?? false) {
            return 'alarm';
        }
        
        $usage = $this->calculatePercent(
            $node['mem_used'] ?? 0,
            $node['mem_limit'] ?? 1
        );
        
        if ($usage > 80) {
            return 'warning';
        }
        
        return 'normal';
    }

    private function calculatePercent(int $used, int $total): float
    {
        if ($total === 0) {
            return 0;
        }
        return round($used / $total * 100, 2);
    }

    private function apiRequest(string $endpoint): array
    {
        $url = "http://{$this->apiHost}:{$this->apiPort}{$endpoint}";
        
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_USERPWD, "{$this->apiUser}:{$this->apiPass}");
        curl_setopt($ch, CURLOPT_TIMEOUT, 10);
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        if ($httpCode !== 200) {
            return [];
        }
        
        return json_decode($response, true) ?: [];
    }
}
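
`checkAlarmStatus()` above inspects only the first entry returned by `/api/nodes`. In a cluster, an alarm on any one node blocks publishers on all of them, so a cluster-wide check is often more useful. The following is a minimal sketch that works on an already decoded `/api/nodes` payload. `summarizeNodes()` is a hypothetical helper, the field names are the ones the class already reads, and the sample data is made up.

```php
<?php
// Hypothetical helper: summarizes memory alarm state across every node
// in a decoded /api/nodes response.
function summarizeNodes(array $nodes): array
{
    $summary = [
        'any_alarm' => false,
        'alarmed_nodes' => [],
        'worst_usage_percent' => 0.0,
    ];

    foreach ($nodes as $node) {
        $limit = (int) ($node['mem_limit'] ?? 0);
        $used = (int) ($node['mem_used'] ?? 0);
        // Usage relative to the node's own limit; 0.0 when the limit is unknown.
        $usage = $limit > 0 ? round($used / $limit * 100, 2) : 0.0;

        if (!empty($node['mem_alarm'])) {
            $summary['any_alarm'] = true;
            $summary['alarmed_nodes'][] = $node['name'] ?? 'unknown';
        }

        $summary['worst_usage_percent'] = max($summary['worst_usage_percent'], $usage);
    }

    return $summary;
}

// Made-up sample payload with one healthy node and one node in alarm.
$sampleNodes = [
    ['name' => 'rabbit@node-a', 'mem_alarm' => false, 'mem_used' => 500, 'mem_limit' => 1000],
    ['name' => 'rabbit@node-b', 'mem_alarm' => true, 'mem_used' => 1100, 'mem_limit' => 1000],
];

$summary = summarizeNodes($sampleNodes);
print_r($summary);
```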

Alarm handler

php
<?php

namespace App\RabbitMQ\Memory;

class MemoryAlarmHandler
{
    private MemoryAlarmMonitor $monitor;
    private array $config;
    
    public function __construct(MemoryAlarmMonitor $monitor, array $config = [])
    {
        $this->monitor = $monitor;
        $this->config = array_merge([
            'notification_email' => null,
            'slack_webhook' => null,
            'auto_recovery' => true,
        ], $config);
        
        $this->registerCallbacks();
    }
    
    public function handleAlarm(): array
    {
        $status = $this->monitor->checkAlarmStatus();
        
        if ($status['memory_alarm']) {
            return $this->handleAlarmState($status);
        }
        
        return $this->handleNormalState($status);
    }

    public function getRecoveryActions(): array
    {
        return [
            [
                'action' => 'purge_queues',
                'description' => 'Purge non-critical queues',
                'command' => 'rabbitmqctl purge_queue <queue_name>',
                'risk' => 'high',
            ],
            [
                'action' => 'delete_queues',
                'description' => 'Delete idle queues',
                'command' => 'rabbitmqctl delete_queue <queue_name>',
                'risk' => 'medium',
            ],
            [
                'action' => 'set_lazy_mode',
                'description' => 'Switch queues to lazy mode',
                'command' => 'rabbitmqctl set_policy lazy ".*" \'{"queue-mode":"lazy"}\' --apply-to queues',
                'risk' => 'low',
            ],
            [
                'action' => 'force_gc',
                'description' => 'Force garbage collection',
                // force_gc asks every Erlang process on the node to collect garbage.
                'command' => 'rabbitmqctl force_gc',
                'risk' => 'low',
            ],
        ];
    }

    public function executeRecoveryAction(string $action): array
    {
        // getRecoveryActions() returns a list, so index it by action name first.
        $actions = array_column($this->getRecoveryActions(), null, 'action');
        
        if (!isset($actions[$action])) {
            return ['error' => 'Unknown action'];
        }
        
        $actionInfo = $actions[$action];
        
        // Only low-risk commands run unattended. The others contain a
        // <queue_name> placeholder that an operator has to fill in.
        if ($actionInfo['risk'] !== 'low') {
            return [
                'error' => 'This action requires manual execution',
                'command' => $actionInfo['command'],
            ];
        }
        
        $output = shell_exec($actionInfo['command'] . ' 2>&1');
        
        return [
            'action' => $action,
            'output' => $output,
            'timestamp' => date('Y-m-d H:i:s'),
        ];
    }

    private function registerCallbacks(): void
    {
        $this->monitor->registerAlertCallback('memory_alarm', function ($data) {
            $this->onMemoryAlarm($data);
        });
        
        $this->monitor->registerAlertCallback('memory_clear', function ($data) {
            $this->onMemoryClear($data);
        });
    }

    private function handleAlarmState(array $status): array
    {
        $blocked = $this->monitor->getBlockedConnections();
        
        $response = [
            'status' => 'alarm',
            'memory_usage' => $status['memory_usage_percent'],
            'blocked_connections' => $blocked['blocked_count'],
            'recommended_actions' => $this->getRecommendedActions($status),
        ];
        
        if ($this->config['auto_recovery']) {
            $response['auto_recovery'] = $this->attemptAutoRecovery($status);
        }
        
        return $response;
    }

    private function handleNormalState(array $status): array
    {
        return [
            'status' => 'normal',
            'memory_usage' => $status['memory_usage_percent'],
        ];
    }

    private function onMemoryAlarm(array $data): void
    {
        $this->sendNotification('MEMORY ALARM', $data);
    }

    private function onMemoryClear(array $data): void
    {
        $this->sendNotification('MEMORY ALARM CLEARED', $data);
    }

    private function sendNotification(string $subject, array $data): void
    {
        if ($this->config['notification_email']) {
            mail(
                $this->config['notification_email'],
                "[RabbitMQ] {$subject}",
                json_encode($data, JSON_PRETTY_PRINT)
            );
        }
        
        if ($this->config['slack_webhook']) {
            $this->sendSlackNotification($subject, $data);
        }
    }

    private function sendSlackNotification(string $subject, array $data): void
    {
        // "MEMORY ALARM CLEARED" also contains "ALARM", so test for "CLEARED".
        $cleared = strpos($subject, 'CLEARED') !== false;
        
        $payload = [
            'text' => $subject,
            'attachments' => [
                [
                    'color' => $cleared ? 'good' : 'danger',
                    'fields' => [
                        [
                            'title' => 'Memory Usage',
                            'value' => $data['memory_usage_percent'] . '%',
                            'short' => true,
                        ],
                        [
                            'title' => 'Timestamp',
                            'value' => $data['timestamp'],
                            'short' => true,
                        ],
                    ],
                ],
            ],
        ];
        
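        $ch = curl_init($this->config['slack_webhook']);
        curl_setopt($ch, CURLOPT_POST, true);
        // Send the payload as JSON and do not let a slow webhook stall the handler.
        curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
        curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($payload));
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_TIMEOUT, 10);
        curl_exec($ch);
        curl_close($ch);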
    }

    private function getRecommendedActions(array $status): array
    {
        $actions = [];
        
        if ($status['memory_usage_percent'] > 90) {
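            $actions[] = 'Purge non-critical queues immediately';
            $actions[] = 'Increase consumer throughput';
        }
        
        if ($status['memory_usage_percent'] > 80) {
            $actions[] = 'Switch queues to lazy mode';
            $actions[] = 'Check for a message backlog';
        }
        
        $actions[] = 'Review whether the memory configuration is reasonable';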
        
        return $actions;
    }

    private function attemptAutoRecovery(array $status): array
    {
        $actions = [];
        
        if ($status['memory_usage_percent'] > 85) {
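            // force_gc asks every Erlang process on the node to collect garbage;
            // erlang:garbage_collect() run through eval only collects the eval process.
            $result = shell_exec('rabbitmqctl force_gc 2>&1');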
            $actions[] = [
                'action' => 'force_gc',
                'result' => $result,
            ];
        }
        
        return $actions;
    }
}

Blocked publisher detection tool

php
<?php

namespace App\RabbitMQ\Memory;

class BlockDetector
{
    private string $apiHost;
    private int $apiPort;
    private string $apiUser;
    private string $apiPass;
    
    public function __construct(
        string $apiHost = 'localhost',
        int $apiPort = 15672,
        string $apiUser = 'guest',
        string $apiPass = 'guest'
    ) {
        $this->apiHost = $apiHost;
        $this->apiPort = $apiPort;
        $this->apiUser = $apiUser;
        $this->apiPass = $apiPass;
    }
    
    public function detectBlockedPublishers(): array
    {
        $connections = $this->apiRequest('/api/connections');
        
        $blocked = [];
        
        foreach ($connections ?? [] as $conn) {
            if (($conn['state'] ?? '') === 'blocked') {
                $blocked[] = [
                    'connection_name' => $conn['name'],
                    'client' => $conn['client_properties']['connection_name'] ?? 'unknown',
                    'peer_host' => $conn['peer_host'] ?? 'unknown',
                    'peer_port' => $conn['peer_port'] ?? 0,
                    'channels' => $conn['channels'] ?? 0,
                    'blocked_at' => date('Y-m-d H:i:s'),
                ];
            }
        }
        
        return [
            'total_connections' => count($connections ?? []),
            'blocked_count' => count($blocked),
            'blocked_publishers' => $blocked,
        ];
    }

    public function getBlockingReason(): array
    {
        $nodes = $this->apiRequest('/api/nodes');
        
        if (empty($nodes)) {
            return ['error' => 'Unable to determine blocking reason'];
        }
        
        $node = $nodes[0];
        $reasons = [];
        
        if ($node['mem_alarm'] ?? false) {
            $reasons[] = [
                'type' => 'memory',
                'message' => 'Memory alarm triggered',
                'current' => $this->formatBytes($node['mem_used'] ?? 0),
                'limit' => $this->formatBytes($node['mem_limit'] ?? 0),
            ];
        }
        
        if ($node['disk_free_alarm'] ?? false) {
            $reasons[] = [
                'type' => 'disk',
                'message' => 'Disk alarm triggered',
                'current' => $this->formatBytes($node['disk_free'] ?? 0),
                'limit' => $this->formatBytes($node['disk_free_limit'] ?? 0),
            ];
        }
        
        return [
            'is_blocked' => !empty($reasons),
            'reasons' => $reasons,
        ];
    }

    public function estimateUnblockTime(): array
    {
        $status = $this->detectBlockedPublishers();
        
        if ($status['blocked_count'] === 0) {
            return ['estimated_time' => 0, 'message' => 'No blocked publishers'];
        }
        
        $queues = $this->apiRequest('/api/queues?columns=name,messages,messages_ready');
        
        $totalMessages = 0;
        foreach ($queues ?? [] as $queue) {
            $totalMessages += $queue['messages'] ?? 0;
        }
        
        // Rough guess: assumes consumers drain about 1,000 messages per second.
        $estimatedSeconds = round($totalMessages / 1000, 2);
        
        return [
            'estimated_time' => $estimatedSeconds,
            'message' => "Estimated {$estimatedSeconds} seconds to clear backlog",
            'total_messages' => $totalMessages,
        ];
    }

    private function apiRequest(string $endpoint): array
    {
        $url = "http://{$this->apiHost}:{$this->apiPort}{$endpoint}";
        
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_USERPWD, "{$this->apiUser}:{$this->apiPass}");
        curl_setopt($ch, CURLOPT_TIMEOUT, 10);
        
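        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        // Treat transport failures and non-200 responses as "no data".
        if ($response === false || $httpCode !== 200) {
            return [];
        }
        
        return json_decode($response, true) ?: [];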
    }

    private function formatBytes(int $bytes): string
    {
        $units = ['B', 'KB', 'MB', 'GB'];
        $i = 0;
        while ($bytes >= 1024 && $i < count($units) - 1) {
            $bytes /= 1024;
            $i++;
        }
        return round($bytes, 2) . ' ' . $units[$i];
    }
}
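
`estimateUnblockTime()` above divides the backlog by a fixed 1,000 messages per second. A measured drain rate usually gives a more honest figure. The sketch below derives the rate from two backlog samples taken a known interval apart. `estimateDrainSeconds()` is a hypothetical helper, and the result is only a rough proxy, since the alarm clears when memory drops below the watermark rather than when the backlog reaches zero.

```php
<?php
// Hypothetical helper: estimates how long the remaining backlog takes to
// drain, based on two total-message counts sampled $sampleSeconds apart.
// Returns null when the backlog is flat or still growing.
function estimateDrainSeconds(int $backlogBefore, int $backlogAfter, float $sampleSeconds): ?float
{
    if ($sampleSeconds <= 0) {
        throw new InvalidArgumentException('Sample interval must be positive');
    }

    // Net messages removed per second over the sample window.
    $drainRate = ($backlogBefore - $backlogAfter) / $sampleSeconds;

    if ($drainRate <= 0) {
        return null;
    }

    return round($backlogAfter / $drainRate, 2);
}

// Backlog fell from 12,000 to 9,000 messages in 10 seconds.
$draining = estimateDrainSeconds(12000, 9000, 10.0);
// Backlog grew during the sample window, so no estimate is possible.
$growing = estimateDrainSeconds(9000, 9500, 10.0);

var_dump($draining, $growing);
```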

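Practical scenarios

Scenario 1: alarm notification system

php
<?php

use App\RabbitMQ\Memory\MemoryAlarmMonitor;

class AlarmNotificationSystem
{
    private MemoryAlarmMonitor $monitor;
    
    // The monitor must be injected; a typed property cannot be read before it is set.
    public function __construct(MemoryAlarmMonitor $monitor)
    {
        $this->monitor = $monitor;
    }
    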
    public function start(): void
    {
        $this->monitor->registerAlertCallback('memory_alarm', function ($data) {
            $this->sendAlert('critical', 'Memory alarm triggered', $data);
        });
        
        $this->monitor->registerAlertCallback('memory_clear', function ($data) {
            $this->sendAlert('info', 'Memory alarm cleared', $data);
        });
        
        $this->monitor->startMonitoring(10);
    }

    private function sendAlert(string $level, string $message, array $data): void
    {
        // Deliver the alert (email, chat, pager, and so on)
    }
}

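Scenario 2: automatic recovery system

php
<?php

use App\RabbitMQ\Memory\MemoryAlarmHandler;

class AutoRecoverySystem
{
    private MemoryAlarmHandler $handler;
    
    // The handler must be injected; a typed property cannot be read before it is set.
    public function __construct(MemoryAlarmHandler $handler)
    {
        $this->handler = $handler;
    }
    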
    public function monitor(): void
    {
        while (true) {
            $result = $this->handler->handleAlarm();
            
            if ($result['status'] === 'alarm') {
                $this->attemptRecovery($result);
            }
            
            sleep(30);
        }
    }

    private function attemptRecovery(array $status): void
    {
        if ($status['memory_usage'] > 90) {
            $this->handler->executeRecoveryAction('force_gc');
        }
    }
}

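Common problems and solutions

Problem 1: the alarm fires frequently

Solution

ini
# Adjust the watermark
vm_memory_high_watermark.relative = 0.5

# Or set an absolute limit instead (use one form, not both)
vm_memory_high_watermark.absolute = 8GB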
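
Before retuning the watermark, it helps to confirm that the alarm really is flapping rather than firing once and staying raised. This is a minimal sketch that counts how often the alarm was raised within a recent time window. `countAlarmRaises()` and the sample format (`at` as a Unix-style timestamp, `alarm` as a boolean) are hypothetical; the samples could come from periodic polls such as the monitor class's `checkAlarmStatus()`.

```php
<?php
// Hypothetical helper: counts false-to-true alarm transitions that occurred
// within the last $windowSeconds. Samples must be sorted by 'at' ascending.
function countAlarmRaises(array $samples, int $windowSeconds, int $now): int
{
    $raises = 0;
    $previous = false;

    foreach ($samples as $sample) {
        $justRaised = $sample['alarm'] && !$previous;
        if ($justRaised && $sample['at'] >= $now - $windowSeconds) {
            $raises++;
        }
        $previous = $sample['alarm'];
    }

    return $raises;
}

// Made-up poll results: the alarm is raised at t=10, t=40 and t=60.
$samples = [
    ['at' => 0, 'alarm' => false],
    ['at' => 10, 'alarm' => true],
    ['at' => 20, 'alarm' => true],
    ['at' => 30, 'alarm' => false],
    ['at' => 40, 'alarm' => true],
    ['at' => 50, 'alarm' => false],
    ['at' => 60, 'alarm' => true],
];

$recent = countAlarmRaises($samples, 30, 60);   // only t=40 and t=60 fall in the window
$overall = countAlarmRaises($samples, 60, 60);  // all three raises fall in the window

var_dump($recent, $overall);
```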

Problem 2: the alarm does not clear

Solution

bash
# Clear the alarm manually
rabbitmqctl eval "rabbit_alarm:clear_alarm({resource_limit, memory, node()})."

# Check memory status
rabbitmqctl status | grep -i memory

Recommended practices

Watermark settings

System RAM    Recommended watermark    Paging ratio
4 GB          0.4                      0.75
8 GB          0.5                      0.75
16 GB+        0.6                      0.75
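
The table above can be encoded as a small lookup for provisioning scripts. This is a minimal sketch; `suggestedWatermark()` is a hypothetical helper, and its handling of sizes the table does not list (rounding down to the nearest listed row, and using 0.4 below 4 GB) is an assumption.

```php
<?php
// Hypothetical helper: maps installed RAM to the watermark recommended
// in the table. Sizes between rows fall back to the smaller row (assumption).
function suggestedWatermark(float $ramGiB): float
{
    if ($ramGiB <= 0) {
        throw new InvalidArgumentException('RAM size must be positive');
    }
    if ($ramGiB >= 16) {
        return 0.6;
    }
    if ($ramGiB >= 8) {
        return 0.5;
    }
    return 0.4;
}

foreach ([4, 8, 12, 32] as $ram) {
    printf("%d GB -> vm_memory_high_watermark.relative = %.1f\n", $ram, suggestedWatermark($ram));
}
```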

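Alarm response

Alarm level    Response time       Action
Warning        Within 5 minutes    Monitor and prepare
Alarm          Within 1 minute     Throttle and clean up
Critical       Immediately         Force recovery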

Related links