Appearance
RabbitMQ 内存告警机制
概述
RabbitMQ 的内存告警机制是保护系统稳定性的重要防线。当内存使用达到阈值时,系统会触发告警并阻止生产者继续发送消息,防止内存溢出导致系统崩溃。
核心知识点
内存告警工作原理
┌─────────────────────────────────────────────────────────────┐
│ 内存告警机制流程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 内存使用率 │
│ 100% ─┬───────────────────────────────────────────── │
│ │ ┌─────────────────────┐ │
│ 95% ─┤ │ 阻塞所有发布者 │ │
│ │ │ (memory_alarm) │ │
│ 90% ─┤ └─────────────────────┘ │
│ │ │
│ 75% ─┤ ┌─────────────────────┐ │
│ │ │ 开始分页到磁盘 │ │
│ 60% ─┤────▶│ (paging) │ │
│ │ └─────────────────────┘ │
│ 40% ─┤ │
│ │ 正常运行 │
│ 0% ─┴───────────────────────────────────────────── │
│ │
│ 触发顺序:正常 → 分页 → 阻塞 → 恢复 │
│ │
└─────────────────────────────────────────────────────────────┘
告警阈值配置
| 参数 | 默认值 | 说明 |
|---|---|---|
| vm_memory_high_watermark | 0.4 | 内存水位线(相对值) |
| vm_memory_high_watermark_paging_ratio | 0.75 | 分页阈值比例 |
| vm_memory_high_watermark.absolute | - | 内存水位线(绝对值) |
告警状态影响
┌─────────────────────────────────────────────────────────────┐
│ 告警状态影响 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 正常状态: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 生产者 ──▶ 发布消息 ──▶ 队列 ──▶ 消费者 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 告警状态: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 生产者 ──▶ 发布消息 ──✕ 阻塞 │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ 等待内存释放... │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 影响范围: │
│ - 所有发布连接被阻塞 │
│ - 消费者仍可正常消费 │
│ - 系统等待内存释放 │
│ │
└─────────────────────────────────────────────────────────────┘
分页机制
当内存使用达到 watermark * paging_ratio 时,开始将消息分页到磁盘:
内存使用 = watermark * paging_ratio
例如:watermark = 0.4, paging_ratio = 0.75
分页触发点 = 0.4 * 0.75 = 0.3 (30%)
分页过程:
1. 将队列中的部分消息写入磁盘
2. 释放内存空间
3. 消费时从磁盘读取
配置示例
基础配置
ini
# /etc/rabbitmq/rabbitmq.conf
# 相对值配置(推荐)
vm_memory_high_watermark.relative = 0.6
# 分页比例
vm_memory_high_watermark_paging_ratio = 0.75
# 绝对值配置(可选)
# vm_memory_high_watermark.absolute = 4GB
高级配置
ini
# /etc/rabbitmq/rabbitmq.conf
# 内存计算策略
# rss: 使用操作系统报告的进程常驻内存(RSS)
# allocated: 使用运行时内存分配器统计的已分配内存
# legacy: 使用 Erlang VM 报告的内存(可能低估实际占用)
vm_memory_calculation_strategy = allocated
# 统计信息采集间隔(毫秒),注意:这不是内存告警的检测间隔
# collect_statistics_interval = 5000
策略配置
bash
# 为特定队列设置内存告警策略
rabbitmqctl set_policy memory-policy "^high-memory\." \
'{"max-length":10000,"overflow":"reject-publish"}' \
--apply-to queues
# 设置队列模式为懒队列
rabbitmqctl set_policy lazy "^lazy\." \
'{"queue-mode":"lazy"}' \
--apply-to queues
PHP 代码示例
内存告警监控类
php
<?php
namespace App\RabbitMQ\Memory;
class MemoryAlarmMonitor
{
private string $apiHost;
private int $apiPort;
private string $apiUser;
private string $apiPass;
private array $alertCallbacks = [];
/**
 * Stores the connection settings used for every management HTTP API request.
 *
 * @param string $apiHost Host name placed in the request URL.
 * @param int    $apiPort Port placed in the request URL.
 * @param string $apiUser User name sent as the first half of the credentials.
 * @param string $apiPass Password sent as the second half of the credentials.
 */
public function __construct(
    string $apiHost = 'localhost',
    int $apiPort = 15672,
    string $apiUser = 'guest',
    string $apiPass = 'guest'
) {
    [$this->apiHost, $this->apiPort] = [$apiHost, $apiPort];
    [$this->apiUser, $this->apiPass] = [$apiUser, $apiPass];
}
/**
 * Builds an alarm snapshot from the first node returned by `/api/nodes`.
 *
 * @return array `['error' => string]` when no node data could be fetched; otherwise
 *               the keys node_name, memory_alarm, disk_alarm, memory_used,
 *               memory_limit, memory_usage_percent, status and timestamp.
 */
public function checkAlarmStatus(): array
{
    $nodeList = $this->apiRequest('/api/nodes');
    if (empty($nodeList)) {
        return ['error' => 'Unable to fetch node information'];
    }

    // Only the first listed node is inspected.
    $first = $nodeList[0];

    $snapshot = [];
    $snapshot['node_name'] = $first['name'];
    $snapshot['memory_alarm'] = $first['mem_alarm'] ?? false;
    $snapshot['disk_alarm'] = $first['disk_free_alarm'] ?? false;
    $snapshot['memory_used'] = $first['mem_used'] ?? 0;
    $snapshot['memory_limit'] = $first['mem_limit'] ?? 0;
    // The percentage uses a limit fallback of 1 (not 0) so a missing limit
    // does not short-circuit to 0%.
    $snapshot['memory_usage_percent'] = $this->calculatePercent(
        $first['mem_used'] ?? 0,
        $first['mem_limit'] ?? 1
    );
    $snapshot['status'] = $this->determineStatus($first);
    $snapshot['timestamp'] = date('Y-m-d H:i:s');

    return $snapshot;
}
/**
 * Returns at most `$limit` entries from the `/api/alarms` response.
 *
 * NOTE(review): confirm that `/api/alarms` is served by the management plugin
 * version in use; when the request fails this method returns an empty array.
 *
 * @param int $limit Maximum number of leading entries to keep.
 *
 * @return array The first `$limit` decoded entries.
 */
public function getAlarmHistory(int $limit = 100): array
{
    $entries = $this->apiRequest('/api/alarms');

    return array_slice($entries, 0, $limit);
}
/**
 * Registers the callback invoked when an alert of the given type is triggered.
 *
 * A later registration for the same type replaces the earlier one.
 *
 * @param string   $type     Alert type; the monitoring loop triggers 'memory_alarm' and 'memory_clear'.
 * @param callable $callback Receives the status array as its only argument.
 */
public function registerAlertCallback(string $type, callable $callback): void
{
    $this->alertCallbacks = array_replace($this->alertCallbacks, [$type => $callback]);
}
/**
 * Polls the alarm status forever and triggers an alert on every state change.
 *
 * 'memory_alarm' is triggered when the alarm turns on and 'memory_clear' when it
 * turns off. This method never returns.
 *
 * @param int $intervalSeconds Seconds to sleep between polls.
 */
public function startMonitoring(int $intervalSeconds = 10): void
{
    $previousAlarm = false;

    while (true) {
        $status = $this->checkAlarmStatus();

        // A failed poll carries only an 'error' key. Treating it as "no alarm"
        // would raise a bogus 'memory_clear', so the last known state is kept.
        if (!isset($status['error'])) {
            $currentAlarm = (bool) ($status['memory_alarm'] ?? false);

            if ($currentAlarm && !$previousAlarm) {
                $this->triggerAlert('memory_alarm', $status);
            } elseif (!$currentAlarm && $previousAlarm) {
                $this->triggerAlert('memory_clear', $status);
            }

            $previousAlarm = $currentAlarm;
        }

        sleep($intervalSeconds);
    }
}
/**
 * Lists the connections whose reported state is exactly 'blocked'.
 *
 * @return array `blocked_count` (int) and `connections`, a list of entries with
 *               name, client_properties and blocked_since.
 */
public function getBlockedConnections(): array
{
    $matches = [];

    foreach ($this->apiRequest('/api/connections') as $connection) {
        if (($connection['state'] ?? null) !== 'blocked') {
            continue;
        }

        $entry = ['name' => $connection['name']];
        $entry['client_properties'] = $connection['client_properties'] ?? [];
        $entry['blocked_since'] = $connection['blocked_since'] ?? null;
        $matches[] = $entry;
    }

    return [
        'blocked_count' => count($matches),
        'connections' => $matches,
    ];
}
/**
 * Reports the memory limit of the first node and the derived paging threshold.
 *
 * The node is read from `/api/nodes`, the same endpoint checkAlarmStatus() uses;
 * the management API has no `/api/node` resource, so the previous request always
 * failed and every value came back as zero.
 *
 * NOTE(review): the paging ratio is the hard-coded 0.75 below, not a value read
 * from the broker.
 *
 * @return array memory_watermark (the node's mem_limit, 0 when unavailable),
 *               paging_ratio and effective_watermark (limit * ratio).
 */
public function getWatermarkSettings(): array
{
    $nodes = $this->apiRequest('/api/nodes');
    $node = $nodes[0] ?? [];

    $memoryLimit = $node['mem_limit'] ?? 0;
    $pagingRatio = 0.75;

    return [
        'memory_watermark' => $memoryLimit,
        'paging_ratio' => $pagingRatio,
        'effective_watermark' => $memoryLimit * $pagingRatio,
    ];
}
/**
 * Describes the command that raises a memory alarm by hand.
 *
 * Nothing is executed here; the command is only returned as text.
 *
 * @return array action, command and warning.
 */
public function simulateAlarm(): array
{
    $command = 'rabbitmqctl eval "rabbit_alarm:set_alarm({{resource_limit, memory, node()}, []})."';

    $description = ['action' => 'set_alarm'];
    $description['command'] = $command;
    $description['warning'] = 'This is for testing purposes only';

    return $description;
}
/**
 * Describes the command that clears a memory alarm by hand.
 *
 * Nothing is executed here; the command is only returned as text.
 *
 * @return array action and command.
 */
public function clearAlarm(): array
{
    $command = 'rabbitmqctl eval "rabbit_alarm:clear_alarm({resource_limit, memory, node()})."';

    $description = ['action' => 'clear_alarm'];
    $description['command'] = $command;

    return $description;
}
/**
 * Runs the callback registered for the alert type (if any) and then logs the alert.
 *
 * @param string $type Alert type used to look up the callback.
 * @param array  $data Status data handed to the callback and written to the log.
 */
private function triggerAlert(string $type, array $data): void
{
    $callback = $this->alertCallbacks[$type] ?? null;

    if ($callback !== null) {
        call_user_func($callback, $data);
    }

    // The alert is logged whether or not a callback was registered.
    $this->logAlert($type, $data);
}
/**
 * Appends one line per alert to /var/log/rabbitmq_alarm.log.
 *
 * Line format: `[Y-m-d H:i:s] TYPE: {json}` followed by a newline.
 *
 * @param string $type Alert type; written in upper case.
 * @param array  $data Alert data; written as JSON.
 */
private function logAlert(string $type, array $data): void
{
    $stamp = date('Y-m-d H:i:s');
    $label = strtoupper($type);
    $line = '[' . $stamp . '] ' . $label . ': ' . json_encode($data) . "\n";

    file_put_contents('/var/log/rabbitmq_alarm.log', $line, FILE_APPEND);
}
/**
 * Classifies a node as 'alarm', 'warning' or 'normal'.
 *
 * An active memory alarm always wins; otherwise usage above 80% of the memory
 * limit is a warning.
 *
 * @param array $node Node entry; reads mem_alarm, mem_used and mem_limit.
 *
 * @return string 'alarm', 'warning' or 'normal'.
 */
private function determineStatus(array $node): string
{
    $alarmRaised = $node['mem_alarm'] ?? false;
    if ($alarmRaised) {
        return 'alarm';
    }

    $percentUsed = $this->calculatePercent($node['mem_used'] ?? 0, $node['mem_limit'] ?? 1);

    return $percentUsed > 80 ? 'warning' : 'normal';
}
/**
 * Expresses `$used` as a percentage of `$total`, rounded to two decimals.
 *
 * @param int $used  Amount in use.
 * @param int $total Reference amount; a total of exactly 0 yields 0 instead of dividing.
 *
 * @return float Percentage value.
 */
private function calculatePercent(int $used, int $total): float
{
    return $total === 0
        ? 0.0
        : round($used / $total * 100, 2);
}
/**
 * Performs a GET request against the management HTTP API and decodes the JSON body.
 *
 * Failures of any kind (transport error, non-200 status, body that is not a JSON
 * array or object) are reported as an empty array.
 *
 * @param string $endpoint Path appended to `http://host:port`, e.g. '/api/nodes'.
 *
 * @return array Decoded response, or [] on failure.
 */
private function apiRequest(string $endpoint): array
{
    $ch = curl_init();
    if ($ch === false) {
        return [];
    }

    curl_setopt_array($ch, [
        CURLOPT_URL => "http://{$this->apiHost}:{$this->apiPort}{$endpoint}",
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_USERPWD => "{$this->apiUser}:{$this->apiPass}",
        CURLOPT_TIMEOUT => 10,
    ]);

    $response = curl_exec($ch);
    $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
    curl_close($ch);

    // curl_exec() returns false on transport failure.
    if (!is_string($response) || $httpCode !== 200) {
        return [];
    }

    // A scalar JSON body (true, a number, a string) is valid JSON but would
    // violate the array return type, so only arrays are passed through.
    $decoded = json_decode($response, true);

    return is_array($decoded) ? $decoded : [];
}
}
告警处理器
php
<?php
namespace App\RabbitMQ\Memory;
class MemoryAlarmHandler
{
private MemoryAlarmMonitor $monitor;
private array $config;
/**
 * Stores the monitor, fills in configuration defaults and registers the
 * notification callbacks on the monitor.
 *
 * @param MemoryAlarmMonitor $monitor Monitor to query and to register callbacks on.
 * @param array              $config  Overrides for notification_email, slack_webhook
 *                                    and auto_recovery.
 */
public function __construct(MemoryAlarmMonitor $monitor, array $config = [])
{
    $defaults = [
        'notification_email' => null,
        'slack_webhook' => null,
        'auto_recovery' => true,
    ];

    $this->config = array_merge($defaults, $config);
    $this->monitor = $monitor;

    // Callbacks are registered last, once both properties are set.
    $this->registerCallbacks();
}
/**
 * Checks the current alarm status and dispatches to the matching handler.
 *
 * @return array The alarm-state or normal-state response. When the monitor could
 *               not fetch node data the result is
 *               `['status' => 'error', 'error' => string, 'memory_usage' => null]`.
 */
public function handleAlarm(): array
{
    $status = $this->monitor->checkAlarmStatus();

    // A failed status check carries only an 'error' key. Reporting it as 'normal'
    // would hide an unreachable broker, so it gets its own status.
    if (isset($status['error'])) {
        return [
            'status' => 'error',
            'error' => $status['error'],
            'memory_usage' => null,
        ];
    }

    if ($status['memory_alarm'] ?? false) {
        return $this->handleAlarmState($status);
    }

    return $this->handleNormalState($status);
}
/**
 * Lists the recovery actions available when the memory alarm fires.
 *
 * Commands containing `<queue_name>` are templates: a real queue name must be
 * substituted before they can be run.
 *
 * @return array[] List of entries with the keys action, description, command and risk.
 */
public function getRecoveryActions(): array
{
    return [
        [
            'action' => 'purge_queues',
            'description' => '清空非关键队列',
            'command' => 'rabbitmqctl purge_queue <queue_name>',
            'risk' => 'high',
        ],
        [
            'action' => 'delete_queues',
            'description' => '删除空闲队列',
            'command' => 'rabbitmqctl delete_queue <queue_name>',
            'risk' => 'medium',
        ],
        [
            'action' => 'set_lazy_mode',
            'description' => '将队列设置为懒模式',
            'command' => 'rabbitmqctl set_policy lazy ".*" \'{"queue-mode":"lazy"}\' --apply-to queues',
            'risk' => 'low',
        ],
        [
            'action' => 'force_gc',
            'description' => '强制垃圾回收',
            // `erlang:garbage_collect()` run through `rabbitmqctl eval` collects only
            // the process evaluating the expression; `force_gc` asks all processes
            // on the node to collect.
            'command' => 'rabbitmqctl force_gc',
            'risk' => 'low',
        ],
    ];
}
/**
 * Executes a recovery action by its name.
 *
 * High-risk actions and commands that still contain a `<placeholder>` are never
 * executed; their command is returned for manual execution instead.
 *
 * @param string $action Value of the 'action' key of a getRecoveryActions() entry,
 *                       e.g. 'force_gc'.
 *
 * @return array `['error' => ...]` (plus 'command' when manual execution is required)
 *               or action, output and timestamp of the executed command.
 */
public function executeRecoveryAction(string $action): array
{
    // getRecoveryActions() returns a numerically indexed list, so it has to be
    // re-keyed by action name before a name lookup can succeed.
    $actionsByName = array_column($this->getRecoveryActions(), null, 'action');

    if (!isset($actionsByName[$action])) {
        return ['error' => 'Unknown action'];
    }

    $actionInfo = $actionsByName[$action];
    $command = $actionInfo['command'];

    if ($actionInfo['risk'] === 'high') {
        return [
            'error' => 'High risk action requires manual execution',
            'command' => $command,
        ];
    }

    // An unfilled template such as `<queue_name>` would be read by the shell as
    // input/output redirection, so it must not reach shell_exec().
    if (preg_match('/<[^<>]+>/', $command) === 1) {
        return [
            'error' => 'Command contains a placeholder and requires manual execution',
            'command' => $command,
        ];
    }

    $output = shell_exec($command . ' 2>&1');

    return [
        'action' => $action,
        'output' => $output,
        'timestamp' => date('Y-m-d H:i:s'),
    ];
}
/**
 * Hooks this handler's notification methods into the monitor's two alert types.
 */
private function registerCallbacks(): void
{
    $this->monitor->registerAlertCallback(
        'memory_alarm',
        fn ($data) => $this->onMemoryAlarm($data)
    );

    $this->monitor->registerAlertCallback(
        'memory_clear',
        fn ($data) => $this->onMemoryClear($data)
    );
}
/**
 * Builds the response for an active memory alarm.
 *
 * @param array $status Status array; reads memory_usage_percent.
 *
 * @return array status, memory_usage, blocked_connections, recommended_actions and,
 *               when the auto_recovery option is enabled, auto_recovery.
 */
private function handleAlarmState(array $status): array
{
    $blockedInfo = $this->monitor->getBlockedConnections();

    $result = [
        'status' => 'alarm',
        'memory_usage' => $status['memory_usage_percent'],
        'blocked_connections' => $blockedInfo['blocked_count'],
        'recommended_actions' => $this->getRecommendedActions($status),
    ];

    if (!$this->config['auto_recovery']) {
        return $result;
    }

    $result['auto_recovery'] = $this->attemptAutoRecovery($status);

    return $result;
}
/**
 * Builds the response used when no memory alarm is active.
 *
 * @param array $status Status array; reads memory_usage_percent.
 *
 * @return array status ('normal') and memory_usage.
 */
private function handleNormalState(array $status): array
{
    $usage = $status['memory_usage_percent'];

    return ['status' => 'normal', 'memory_usage' => $usage];
}
/**
 * Callback for the 'memory_alarm' alert: sends the alarm notification.
 *
 * @param array $data Status data forwarded to the notification.
 */
private function onMemoryAlarm(array $data): void
{
    $subject = 'MEMORY ALARM';
    $this->sendNotification($subject, $data);
}

/**
 * Callback for the 'memory_clear' alert: sends the all-clear notification.
 *
 * @param array $data Status data forwarded to the notification.
 */
private function onMemoryClear(array $data): void
{
    $subject = 'MEMORY ALARM CLEARED';
    $this->sendNotification($subject, $data);
}
/**
 * Sends the notification through every configured channel.
 *
 * E-mail is used when notification_email is set, Slack when slack_webhook is set;
 * with neither configured nothing is sent.
 *
 * @param string $subject Short subject line.
 * @param array  $data    Status data; the e-mail body is its pretty-printed JSON.
 */
private function sendNotification(string $subject, array $data): void
{
    $recipient = $this->config['notification_email'];
    if ($recipient) {
        $body = json_encode($data, JSON_PRETTY_PRINT);
        mail($recipient, '[RabbitMQ] ' . $subject, $body);
    }

    if (!$this->config['slack_webhook']) {
        return;
    }

    $this->sendSlackNotification($subject, $data);
}
/**
 * Posts the notification to the configured Slack webhook as a JSON payload.
 *
 * The HTTP response is not inspected.
 *
 * @param string $subject Message text; also decides the attachment colour.
 * @param array  $data    Status data; reads memory_usage_percent and timestamp.
 */
private function sendSlackNotification(string $subject, array $data): void
{
    // "MEMORY ALARM CLEARED" also contains "ALARM", so the all-clear subject must
    // be excluded explicitly or every message would be coloured as danger.
    $isActiveAlarm = strpos($subject, 'ALARM') !== false
        && strpos($subject, 'CLEARED') === false;

    $usage = isset($data['memory_usage_percent'])
        ? $data['memory_usage_percent'] . '%'
        : 'n/a';

    $payload = [
        'text' => $subject,
        'attachments' => [
            [
                'color' => $isActiveAlarm ? 'danger' : 'good',
                'fields' => [
                    [
                        'title' => 'Memory Usage',
                        'value' => $usage,
                        'short' => true,
                    ],
                    [
                        'title' => 'Timestamp',
                        'value' => $data['timestamp'] ?? date('Y-m-d H:i:s'),
                        'short' => true,
                    ],
                ],
            ],
        ],
    ];

    $ch = curl_init($this->config['slack_webhook']);
    if ($ch === false) {
        return;
    }

    curl_setopt_array($ch, [
        CURLOPT_POST => true,
        CURLOPT_POSTFIELDS => json_encode($payload),
        // Without this header cURL labels the JSON body as form data.
        CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
        CURLOPT_RETURNTRANSFER => true,
        // Bounded so a hanging webhook cannot stall the caller indefinitely.
        CURLOPT_TIMEOUT => 10,
    ]);
    curl_exec($ch);
    curl_close($ch);
}
/**
 * Picks recommendation texts based on the memory usage percentage.
 *
 * Above 90% the urgent items come first, above 80% the backlog items are added,
 * and the configuration check is always the last entry.
 *
 * @param array $status Status array; reads memory_usage_percent.
 *
 * @return string[] Recommendations in order of urgency.
 */
private function getRecommendedActions(array $status): array
{
    $percent = $status['memory_usage_percent'];

    $urgent = $percent > 90
        ? ['立即清理非关键队列', '增加消费者处理速度']
        : [];
    $elevated = $percent > 80
        ? ['将队列转换为懒队列模式', '检查是否有消息积压']
        : [];

    return array_merge($urgent, $elevated, ['检查内存配置是否合理']);
}
/**
 * Runs the automatic recovery steps that apply to the given status.
 *
 * The only step is a node-wide garbage collection, taken when memory usage is
 * above 85%.
 *
 * @param array $status Status array; reads memory_usage_percent.
 *
 * @return array[] One entry (action, result) per step that was executed.
 */
private function attemptAutoRecovery(array $status): array
{
    $actions = [];

    if ($status['memory_usage_percent'] > 85) {
        // `rabbitmqctl eval "erlang:garbage_collect()."` collects only the process
        // evaluating the expression, which frees nothing useful; `force_gc` asks
        // all processes on the node to collect.
        $result = shell_exec('rabbitmqctl force_gc 2>&1');
        $actions[] = [
            'action' => 'force_gc',
            'result' => $result,
        ];
    }

    return $actions;
}
}
阻塞检测工具
php
<?php
namespace App\RabbitMQ\Memory;
class BlockDetector
{
private string $apiHost;
private int $apiPort;
private string $apiUser;
private string $apiPass;
/**
 * Stores the connection settings used for every management HTTP API request.
 *
 * @param string $apiHost Host name placed in the request URL.
 * @param int    $apiPort Port placed in the request URL.
 * @param string $apiUser User name sent as the first half of the credentials.
 * @param string $apiPass Password sent as the second half of the credentials.
 */
public function __construct(
    string $apiHost = 'localhost',
    int $apiPort = 15672,
    string $apiUser = 'guest',
    string $apiPass = 'guest'
) {
    [$this->apiHost, $this->apiPort] = [$apiHost, $apiPort];
    [$this->apiUser, $this->apiPass] = [$apiUser, $apiPass];
}
/**
 * Scans all connections and reports those whose state is exactly 'blocked'.
 *
 * `blocked_at` is the time this scan ran, not the time the broker blocked the
 * connection.
 *
 * @return array total_connections, blocked_count and blocked_publishers (entries
 *               with connection_name, client, peer_host, peer_port, channels, blocked_at).
 */
public function detectBlockedPublishers(): array
{
    $connections = $this->apiRequest('/api/connections');
    $publishers = [];

    foreach ($connections as $connection) {
        $state = $connection['state'] ?? '';
        if ($state !== 'blocked') {
            continue;
        }

        $publishers[] = [
            'connection_name' => $connection['name'],
            'client' => $connection['client_properties']['connection_name'] ?? 'unknown',
            'peer_host' => $connection['peer_host'] ?? 'unknown',
            'peer_port' => $connection['peer_port'] ?? 0,
            'channels' => $connection['channels'] ?? 0,
            'blocked_at' => date('Y-m-d H:i:s'),
        ];
    }

    return [
        'total_connections' => count($connections),
        'blocked_count' => count($publishers),
        'blocked_publishers' => $publishers,
    ];
}
/**
 * Reports which resource alarms (memory, disk) are active on the first node.
 *
 * @return array `['error' => string]` when node data is unavailable; otherwise
 *               is_blocked (bool) and reasons (entries with type, message, current, limit).
 */
public function getBlockingReason(): array
{
    $nodeList = $this->apiRequest('/api/nodes');
    if (empty($nodeList)) {
        return ['error' => 'Unable to determine blocking reason'];
    }

    // Only the first listed node is inspected.
    $first = $nodeList[0];
    $found = [];

    $memoryAlarm = $first['mem_alarm'] ?? false;
    if ($memoryAlarm) {
        $found[] = [
            'type' => 'memory',
            'message' => 'Memory alarm triggered',
            'current' => $this->formatBytes($first['mem_used'] ?? 0),
            'limit' => $this->formatBytes($first['mem_limit'] ?? 0),
        ];
    }

    $diskAlarm = $first['disk_free_alarm'] ?? false;
    if ($diskAlarm) {
        $found[] = [
            'type' => 'disk',
            'message' => 'Disk alarm triggered',
            'current' => $this->formatBytes($first['disk_free'] ?? 0),
            'limit' => $this->formatBytes($first['disk_free_limit'] ?? 0),
        ];
    }

    return [
        'is_blocked' => !empty($found),
        'reasons' => $found,
    ];
}
/**
 * Gives a rough estimate of how long the current backlog takes to drain.
 *
 * The estimate is the total message count across all queues divided by an
 * assumed consumption rate; it is not derived from measured throughput.
 *
 * @param float $messagesPerSecond Assumed drain rate; defaults to the previously
 *                                 hard-coded 1000 messages per second.
 *
 * @return array estimated_time (seconds, rounded to two decimals) and message, plus
 *               total_messages when publishers are blocked.
 *
 * @throws \InvalidArgumentException When $messagesPerSecond is not positive.
 */
public function estimateUnblockTime(float $messagesPerSecond = 1000.0): array
{
    if ($messagesPerSecond <= 0) {
        throw new \InvalidArgumentException('messagesPerSecond must be greater than zero');
    }

    $status = $this->detectBlockedPublishers();
    if ($status['blocked_count'] === 0) {
        return ['estimated_time' => 0, 'message' => 'No blocked publishers'];
    }

    $queues = $this->apiRequest('/api/queues?columns=name,messages,messages_ready');
    $totalMessages = 0;
    foreach ($queues as $queue) {
        $totalMessages += $queue['messages'] ?? 0;
    }

    // Rounded once, so the message and estimated_time always show the same number.
    $estimatedSeconds = round($totalMessages / $messagesPerSecond, 2);

    return [
        'estimated_time' => $estimatedSeconds,
        'message' => "Estimated {$estimatedSeconds} seconds to clear backlog",
        'total_messages' => $totalMessages,
    ];
}
/**
 * Performs a GET request against the management HTTP API and decodes the JSON body.
 *
 * Failures of any kind (transport error, non-200 status, body that is not a JSON
 * array or object) are reported as an empty array.
 *
 * @param string $endpoint Path appended to `http://host:port`, e.g. '/api/nodes'.
 *
 * @return array Decoded response, or [] on failure.
 */
private function apiRequest(string $endpoint): array
{
    $ch = curl_init();
    if ($ch === false) {
        return [];
    }

    curl_setopt_array($ch, [
        CURLOPT_URL => "http://{$this->apiHost}:{$this->apiPort}{$endpoint}",
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_USERPWD => "{$this->apiUser}:{$this->apiPass}",
        CURLOPT_TIMEOUT => 10,
    ]);

    $response = curl_exec($ch);
    $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
    curl_close($ch);

    // Error responses (e.g. 401) carry a JSON body too; without the status check
    // that body would be treated as a list of connections, nodes or queues.
    if (!is_string($response) || $httpCode !== 200) {
        return [];
    }

    // A scalar JSON body is valid JSON but would violate the array return type.
    $decoded = json_decode($response, true);

    return is_array($decoded) ? $decoded : [];
}
/**
 * Formats a byte count with the largest fitting unit out of B, KB, MB and GB.
 *
 * The value is divided by 1024 per unit step and rounded to two decimals; values
 * of 1024 GB and more stay expressed in GB.
 *
 * @param int $bytes Byte count.
 *
 * @return string Value and unit separated by a space, e.g. "1.5 KB".
 */
private function formatBytes(int $bytes): string
{
    $units = ['B', 'KB', 'MB', 'GB'];
    $lastUnit = count($units) - 1;

    $value = $bytes;
    for ($unit = 0; $value >= 1024 && $unit < $lastUnit; $unit++) {
        $value /= 1024;
    }

    return round($value, 2) . ' ' . $units[$unit];
}
}
实际应用场景
场景一:告警通知系统
php
<?php
class AlarmNotificationSystem
{
/** @var MemoryAlarmMonitor Monitor whose alarm transitions are forwarded as alerts. */
private MemoryAlarmMonitor $monitor;

/**
 * The typed $monitor property had no initialiser, so start() failed with
 * "must not be accessed before initialization"; it is now always assigned here.
 *
 * @param MemoryAlarmMonitor|null $monitor Monitor to observe; one with default
 *                                         connection settings is created when omitted.
 */
public function __construct(?MemoryAlarmMonitor $monitor = null)
{
    $this->monitor = $monitor ?? new MemoryAlarmMonitor();
}

/**
 * Registers the alert callbacks and enters the monitor's polling loop
 * with a 10 second interval.
 */
public function start(): void
{
    $this->monitor->registerAlertCallback('memory_alarm', function ($data) {
        $this->sendAlert('critical', 'Memory alarm triggered', $data);
    });
    $this->monitor->registerAlertCallback('memory_clear', function ($data) {
        $this->sendAlert('info', 'Memory alarm cleared', $data);
    });

    $this->monitor->startMonitoring(10);
}
/**
 * Placeholder for delivering an alert; the body contains no implementation in this example.
 *
 * @param string $level   Severity label; start() passes 'critical' and 'info'.
 * @param string $message Human-readable description of the event.
 * @param array  $data    Status data supplied by the monitor callback.
 */
private function sendAlert(string $level, string $message, array $data): void
{
// Send the alert
}
}
场景二:自动恢复系统
php
<?php
class AutoRecoverySystem
{
/** @var MemoryAlarmHandler Handler queried on every polling cycle. */
private MemoryAlarmHandler $handler;

/**
 * The typed $handler property had no initialiser, so monitor() failed with
 * "must not be accessed before initialization"; it is now always assigned here.
 *
 * @param MemoryAlarmHandler|null $handler Handler to use; one wrapping a monitor with
 *                                         default connection settings is created when omitted.
 */
public function __construct(?MemoryAlarmHandler $handler = null)
{
    $this->handler = $handler ?? new MemoryAlarmHandler(new MemoryAlarmMonitor());
}

/**
 * Polls the handler every 30 seconds and attempts recovery while it reports an alarm.
 *
 * This method never returns.
 */
public function monitor(): void
{
    while (true) {
        $result = $this->handler->handleAlarm();

        if ($result['status'] === 'alarm') {
            $this->attemptRecovery($result);
        }

        sleep(30);
    }
}
/**
 * Requests the 'force_gc' recovery action when memory usage is above 90%.
 *
 * The result of the action is not inspected.
 *
 * @param array $status Handler response; reads memory_usage.
 */
private function attemptRecovery(array $status): void
{
    $aboveThreshold = $status['memory_usage'] > 90;
    if (!$aboveThreshold) {
        return;
    }

    $this->handler->executeRecoveryAction('force_gc');
}
}
常见问题与解决方案
问题一:频繁触发告警
解决方案:
ini
# 调整水位线
vm_memory_high_watermark.relative = 0.5
# 或使用绝对值
vm_memory_high_watermark.absolute = 8GB
问题二:告警后无法恢复
解决方案:
bash
# 手动清除告警
rabbitmqctl eval "rabbit_alarm:clear_alarm({resource_limit, memory, node()})."
# 检查内存状态
rabbitmqctl status | grep memory
最佳实践建议
水位线设置
| 系统内存 | 推荐水位线 | 分页比例 |
|---|---|---|
| 4GB | 0.4 | 0.75 |
| 8GB | 0.5 | 0.75 |
| 16GB+ | 0.6 | 0.75 |
告警响应
| 告警级别 | 响应时间 | 处理方式 |
|---|---|---|
| Warning | 5分钟内 | 监控、准备 |
| Alarm | 1分钟内 | 限流、清理 |
| Critical | 立即 | 强制恢复 |
