Skip to content

RabbitMQ 告警指标

概述

告警指标是监控系统的核心,合理选择和配置告警指标可以及时发现和预警系统问题。本文将详细介绍 RabbitMQ 的关键告警指标、指标阈值设置和告警级别划分。

核心知识点

告警指标分类

分类指标说明
资源类内存、磁盘、CPU、文件描述符系统资源使用情况
连接类连接数、通道数、消费者数客户端连接状态
队列类消息数、消费速率、队列长度队列运行状态
消息类发布速率、确认速率、重投递率消息流转情况
集群类节点状态、分区状态、同步状态集群健康状态

告警级别定义

级别颜色说明响应时间
Critical红色严重故障,影响业务5 分钟内
Warning橙色警告,需要关注30 分钟内
Info蓝色信息,仅供参考无需响应

指标阈值原则

  1. 基于历史数据:参考历史运行数据设置
  2. 留有余量:在临界值之前告警
  3. 分级设置:不同级别设置不同阈值
  4. 动态调整:根据业务变化调整

配置示例

关键告警指标详解

php
<?php

class RabbitMQAlertMetrics
{
    private $metrics;
    
    public function __construct()
    {
        $this->metrics = $this->defineMetrics();
    }
    
    private function defineMetrics()
    {
        return [
            'memory_usage' => [
                'name' => '内存使用率',
                'description' => 'RabbitMQ 节点内存使用占总内存的百分比',
                'unit' => '%',
                'thresholds' => [
                    'warning' => 70,
                    'critical' => 85,
                ],
                'calculation' => '(mem_used / mem_limit) * 100',
                'impact' => '内存不足会触发流控,影响消息吞吐',
                'recommendation' => '增加内存或优化消息处理',
            ],
            
            'disk_free' => [
                'name' => '磁盘可用空间',
                'description' => 'RabbitMQ 数据目录所在磁盘的可用空间',
                'unit' => 'GB',
                'thresholds' => [
                    'warning' => 10,
                    'critical' => 5,
                ],
                'calculation' => 'disk_free / 1024 / 1024 / 1024',
                'impact' => '磁盘空间不足会阻止消息写入',
                'recommendation' => '清理日志或扩展磁盘容量',
            ],
            
            'disk_alarm' => [
                'name' => '磁盘告警状态',
                'description' => '磁盘空间是否触发告警阈值',
                'unit' => 'boolean',
                'thresholds' => [
                    'critical' => true,
                ],
                'calculation' => 'disk_free_alarm == true',
                'impact' => '触发磁盘告警会阻塞所有生产者',
                'recommendation' => '立即释放磁盘空间',
            ],
            
            'memory_alarm' => [
                'name' => '内存告警状态',
                'description' => '内存是否触发告警阈值',
                'unit' => 'boolean',
                'thresholds' => [
                    'critical' => true,
                ],
                'calculation' => 'mem_alarm == true',
                'impact' => '触发内存告警会阻塞所有生产者',
                'recommendation' => '立即释放内存或增加内存限制',
            ],
            
            'queue_messages' => [
                'name' => '队列消息数',
                'description' => '队列中等待消费的消息总数',
                'unit' => '条',
                'thresholds' => [
                    'warning' => 50000,
                    'critical' => 100000,
                ],
                'calculation' => 'queue_messages',
                'impact' => '消息堆积会消耗内存和磁盘',
                'recommendation' => '增加消费者或优化消费逻辑',
            ],
            
            'queue_messages_ready' => [
                'name' => '待消费消息数',
                'description' => '队列中等待被消费的消息数',
                'unit' => '条',
                'thresholds' => [
                    'warning' => 30000,
                    'critical' => 80000,
                ],
                'calculation' => 'messages_ready',
                'impact' => '消息堆积影响业务处理时效',
                'recommendation' => '检查消费者状态和消费速率',
            ],
            
            'queue_messages_unacked' => [
                'name' => '未确认消息数',
                'description' => '已投递但未被确认的消息数',
                'unit' => '条',
                'thresholds' => [
                    'warning' => 10000,
                    'critical' => 50000,
                ],
                'calculation' => 'messages_unacked',
                'impact' => '未确认消息过多可能导致消息重复',
                'recommendation' => '检查消费者确认逻辑',
            ],
            
            'connections_total' => [
                'name' => '总连接数',
                'description' => '当前活跃的 TCP 连接总数',
                'unit' => '个',
                'thresholds' => [
                    'warning' => 800,
                    'critical' => 950,
                ],
                'calculation' => 'object_totals.connections',
                'impact' => '连接数过多消耗系统资源',
                'recommendation' => '优化连接池配置',
            ],
            
            'channels_total' => [
                'name' => '总通道数',
                'description' => '当前活跃的 AMQP 通道总数',
                'unit' => '个',
                'thresholds' => [
                    'warning' => 5000,
                    'critical' => 8000,
                ],
                'calculation' => 'object_totals.channels',
                'impact' => '通道数过多影响性能',
                'recommendation' => '优化通道使用策略',
            ],
            
            'consumers_total' => [
                'name' => '总消费者数',
                'description' => '当前活跃的消费者总数',
                'unit' => '个',
                'thresholds' => [
                    'warning' => 1000,
                    'critical' => 2000,
                ],
                'calculation' => 'object_totals.consumers',
                'impact' => '消费者过多增加调度开销',
                'recommendation' => '合并消费者或优化消费逻辑',
            ],
            
            'fd_usage' => [
                'name' => '文件描述符使用率',
                'description' => '已使用文件描述符占总数的百分比',
                'unit' => '%',
                'thresholds' => [
                    'warning' => 80,
                    'critical' => 90,
                ],
                'calculation' => '(fd_used / fd_total) * 100',
                'impact' => '文件描述符耗尽会导致无法接受新连接',
                'recommendation' => '增加系统文件描述符限制',
            ],
            
            'sockets_usage' => [
                'name' => 'Socket 使用率',
                'description' => '已使用 Socket 占总数的百分比',
                'unit' => '%',
                'thresholds' => [
                    'warning' => 80,
                    'critical' => 90,
                ],
                'calculation' => '(sockets_used / sockets_total) * 100',
                'impact' => 'Socket 耗尽会导致连接失败',
                'recommendation' => '增加 Socket 限制或优化连接复用',
            ],
            
            'proc_usage' => [
                'name' => 'Erlang 进程使用率',
                'description' => '已使用 Erlang 进程占总数的百分比',
                'unit' => '%',
                'thresholds' => [
                    'warning' => 80,
                    'critical' => 90,
                ],
                'calculation' => '(proc_used / proc_total) * 100',
                'impact' => '进程数耗尽会导致服务不可用',
                'recommendation' => '增加 Erlang 进程限制',
            ],
            
            'node_status' => [
                'name' => '节点状态',
                'description' => 'RabbitMQ 节点是否正常运行',
                'unit' => 'boolean',
                'thresholds' => [
                    'critical' => false,
                ],
                'calculation' => 'running == true',
                'impact' => '节点宕机会影响服务可用性',
                'recommendation' => '立即检查节点状态并恢复',
            ],
            
            'cluster_partition' => [
                'name' => '集群分区',
                'description' => '集群是否存在网络分区',
                'unit' => 'boolean',
                'thresholds' => [
                    'critical' => true,
                ],
                'calculation' => 'partitions.length > 0',
                'impact' => '网络分区会导致数据不一致',
                'recommendation' => '检查网络并处理分区',
            ],
            
            'queue_consumer_count' => [
                'name' => '队列消费者数',
                'description' => '队列当前的消费者数量',
                'unit' => '个',
                'thresholds' => [
                    'warning' => 0,
                ],
                'condition' => 'queue_messages > 1000',
                'calculation' => 'consumers',
                'impact' => '无消费者会导致消息堆积',
                'recommendation' => '启动消费者或检查消费者状态',
            ],
            
            'message_publish_rate' => [
                'name' => '消息发布速率',
                'description' => '每秒发布的消息数量',
                'unit' => '条/秒',
                'thresholds' => [
                    'warning' => 50000,
                    'critical' => 100000,
                ],
                'calculation' => 'message_stats.publish_details.rate',
                'impact' => '发布速率过高可能导致处理延迟',
                'recommendation' => '优化生产者或扩展集群',
            ],
            
            'message_redeliver_rate' => [
                'name' => '消息重投递率',
                'description' => '消息重投递占总投递的百分比',
                'unit' => '%',
                'thresholds' => [
                    'warning' => 10,
                    'critical' => 30,
                ],
                'calculation' => '(redeliver / deliver) * 100',
                'impact' => '重投递过多影响性能和业务',
                'recommendation' => '检查消费者处理逻辑',
            ],
        ];
    }
    
    public function getMetrics()
    {
        return $this->metrics;
    }
    
    public function getMetric($name)
    {
        return $this->metrics[$name] ?? null;
    }
    
    public function getThresholds($name)
    {
        $metric = $this->getMetric($name);
        return $metric['thresholds'] ?? [];
    }
    
    public function checkThreshold($name, $value)
    {
        $metric = $this->getMetric($name);
        
        if (!$metric) {
            return ['level' => 'unknown', 'message' => 'Unknown metric'];
        }
        
        $thresholds = $metric['thresholds'];
        
        if (isset($thresholds['critical'])) {
            if ($this->exceedsThreshold($value, $thresholds['critical'], $name)) {
                return [
                    'level' => 'critical',
                    'value' => $value,
                    'threshold' => $thresholds['critical'],
                    'message' => "{$metric['name']} 达到严重阈值",
                    'impact' => $metric['impact'],
                    'recommendation' => $metric['recommendation'],
                ];
            }
        }
        
        if (isset($thresholds['warning'])) {
            if ($this->exceedsThreshold($value, $thresholds['warning'], $name)) {
                return [
                    'level' => 'warning',
                    'value' => $value,
                    'threshold' => $thresholds['warning'],
                    'message' => "{$metric['name']} 达到警告阈值",
                    'impact' => $metric['impact'],
                    'recommendation' => $metric['recommendation'],
                ];
            }
        }
        
        return [
            'level' => 'ok',
            'value' => $value,
            'message' => "{$metric['name']} 正常",
        ];
    }
    
    private function exceedsThreshold($value, $threshold, $metricName)
    {
        $lowerIsBetter = in_array($metricName, ['disk_free', 'queue_consumer_count', 'node_status']);
        
        if ($lowerIsBetter) {
            if ($metricName === 'node_status') {
                return !$value;
            }
            if ($metricName === 'queue_consumer_count') {
                return $value === 0;
            }
            return $value < $threshold;
        }
        
        return $value > $threshold;
    }
    
    public function checkAllMetrics(array $currentValues)
    {
        $results = [];
        
        foreach ($currentValues as $name => $value) {
            if (isset($this->metrics[$name])) {
                $results[$name] = $this->checkThreshold($name, $value);
            }
        }
        
        return $results;
    }
    
    public function getAlerts(array $currentValues)
    {
        $results = $this->checkAllMetrics($currentValues);
        $alerts = [];
        
        foreach ($results as $name => $result) {
            if (in_array($result['level'], ['warning', 'critical'])) {
                $alerts[] = array_merge(['metric' => $name], $result);
            }
        }
        
        usort($alerts, function($a, $b) {
            $order = ['critical' => 0, 'warning' => 1];
            return $order[$a['level']] <=> $order[$b['level']];
        });
        
        return $alerts;
    }
    
    public function generateMetricsReport()
    {
        $report = "RabbitMQ 告警指标配置报告\n";
        $report .= str_repeat("=", 60) . "\n\n";
        
        foreach ($this->metrics as $name => $metric) {
            $report .= "【{$metric['name']}】\n";
            $report .= "- 指标名称: {$name}\n";
            $report .= "- 说明: {$metric['description']}\n";
            $report .= "- 单位: {$metric['unit']}\n";
            
            $thresholds = [];
            foreach ($metric['thresholds'] as $level => $value) {
                $thresholds[] = "{$level}: {$value}";
            }
            $report .= "- 阈值: " . implode(', ', $thresholds) . "\n";
            
            $report .= "- 影响: {$metric['impact']}\n";
            $report .= "- 建议: {$metric['recommendation']}\n\n";
        }
        
        return $report;
    }
    
    public function exportToPrometheus()
    {
        $output = [];
        
        foreach ($this->metrics as $name => $metric) {
            $output[] = "# HELP rabbitmq_alert_threshold_{$name} {$metric['description']}";
            $output[] = "# TYPE rabbitmq_alert_threshold_{$name} gauge";
            
            foreach ($metric['thresholds'] as $level => $value) {
                $output[] = "rabbitmq_alert_threshold_{$name}{level=\"{$level}\"} {$value}";
            }
        }
        
        return implode("\n", $output);
    }
}

指标阈值配置文件

php
<?php

return [
    'memory_usage' => [
        'warning' => 70,
        'critical' => 85,
    ],
    'disk_free' => [
        'warning' => 10,
        'critical' => 5,
    ],
    'queue_messages' => [
        'warning' => 50000,
        'critical' => 100000,
    ],
    'connections_total' => [
        'warning' => 800,
        'critical' => 950,
    ],
    'channels_total' => [
        'warning' => 5000,
        'critical' => 8000,
    ],
    'fd_usage' => [
        'warning' => 80,
        'critical' => 90,
    ],
    'sockets_usage' => [
        'warning' => 80,
        'critical' => 90,
    ],
    'proc_usage' => [
        'warning' => 80,
        'critical' => 90,
    ],
    'message_redeliver_rate' => [
        'warning' => 10,
        'critical' => 30,
    ],
];

实际应用场景

场景一:动态阈值调整

php
<?php

class DynamicThresholdManager
{
    private $metrics;
    private $historyFile;
    
    public function __construct(RabbitMQAlertMetrics $metrics, $historyFile = '/var/lib/rabbitmq/threshold_history.json')
    {
        $this->metrics = $metrics;
        $this->historyFile = $historyFile;
    }
    
    public function adjustThresholdsBasedOnHistory($days = 30)
    {
        $history = $this->loadHistory();
        $adjustments = [];
        
        foreach ($history as $metricName => $values) {
            if (count($values) < 100) {
                continue;
            }
            
            $avg = array_sum($values) / count($values);
            $max = max($values);
            $p95 = $this->percentile($values, 95);
            
            $currentThresholds = $this->metrics->getThresholds($metricName);
            
            $newWarningThreshold = $p95 * 1.1;
            $newCriticalThreshold = $max * 1.2;
            
            if ($newWarningThreshold > ($currentThresholds['warning'] ?? 0) * 1.2) {
                $adjustments[$metricName] = [
                    'old_warning' => $currentThresholds['warning'] ?? null,
                    'new_warning' => round($newWarningThreshold, 2),
                    'old_critical' => $currentThresholds['critical'] ?? null,
                    'new_critical' => round($newCriticalThreshold, 2),
                    'reason' => 'Based on historical data analysis',
                ];
            }
        }
        
        return $adjustments;
    }
    
    private function percentile($values, $percentile)
    {
        sort($values);
        $index = ceil(($percentile / 100) * count($values)) - 1;
        return $values[$index] ?? 0;
    }
    
    private function loadHistory()
    {
        if (file_exists($this->historyFile)) {
            return json_decode(file_get_contents($this->historyFile), true) ?: [];
        }
        return [];
    }
}

场景二:指标聚合计算

php
<?php

class MetricsAggregator
{
    private $apiClient;
    
    public function __construct($apiClient)
    {
        $this->apiClient = $apiClient;
    }
    
    public function aggregateMetrics()
    {
        $overview = $this->apiClient->getOverview();
        $nodes = $this->apiClient->getNodes();
        $queues = $this->apiClient->getQueues();
        
        $metrics = [];
        
        $metrics['memory_usage'] = $this->calculateMemoryUsage($nodes);
        $metrics['disk_free'] = $this->calculateDiskFree($nodes);
        $metrics['disk_alarm'] = $this->checkDiskAlarm($nodes);
        $metrics['memory_alarm'] = $this->checkMemoryAlarm($nodes);
        $metrics['queue_messages'] = $this->calculateTotalMessages($queues);
        $metrics['queue_messages_ready'] = $this->calculateReadyMessages($queues);
        $metrics['queue_messages_unacked'] = $this->calculateUnackedMessages($queues);
        $metrics['connections_total'] = $overview['object_totals']['connections'] ?? 0;
        $metrics['channels_total'] = $overview['object_totals']['channels'] ?? 0;
        $metrics['consumers_total'] = $overview['object_totals']['consumers'] ?? 0;
        $metrics['fd_usage'] = $this->calculateFdUsage($nodes);
        $metrics['sockets_usage'] = $this->calculateSocketsUsage($nodes);
        $metrics['proc_usage'] = $this->calculateProcUsage($nodes);
        $metrics['node_status'] = $this->checkNodeStatus($nodes);
        $metrics['cluster_partition'] = $this->checkPartitions($nodes);
        
        return $metrics;
    }
    
    private function calculateMemoryUsage($nodes)
    {
        if (empty($nodes)) {
            return 0;
        }
        
        $totalUsage = 0;
        foreach ($nodes as $node) {
            $usage = ($node['mem_used'] / $node['mem_limit']) * 100;
            $totalUsage += $usage;
        }
        
        return $totalUsage / count($nodes);
    }
    
    private function calculateDiskFree($nodes)
    {
        if (empty($nodes)) {
            return 0;
        }
        
        $minFree = PHP_FLOAT_MAX;
        foreach ($nodes as $node) {
            $freeGB = $node['disk_free'] / 1024 / 1024 / 1024;
            $minFree = min($minFree, $freeGB);
        }
        
        return $minFree;
    }
    
    private function checkDiskAlarm($nodes)
    {
        foreach ($nodes as $node) {
            if ($node['disk_free_alarm'] ?? false) {
                return true;
            }
        }
        return false;
    }
    
    private function checkMemoryAlarm($nodes)
    {
        foreach ($nodes as $node) {
            if ($node['mem_alarm'] ?? false) {
                return true;
            }
        }
        return false;
    }
    
    private function calculateTotalMessages($queues)
    {
        $total = 0;
        foreach ($queues as $queue) {
            $total += $queue['messages'] ?? 0;
        }
        return $total;
    }
    
    private function calculateReadyMessages($queues)
    {
        $total = 0;
        foreach ($queues as $queue) {
            $total += $queue['messages_ready'] ?? 0;
        }
        return $total;
    }
    
    private function calculateUnackedMessages($queues)
    {
        $total = 0;
        foreach ($queues as $queue) {
            $total += $queue['messages_unacked'] ?? 0;
        }
        return $total;
    }
    
    private function calculateFdUsage($nodes)
    {
        if (empty($nodes)) {
            return 0;
        }
        
        $totalUsage = 0;
        foreach ($nodes as $node) {
            $usage = ($node['fd_used'] / $node['fd_total']) * 100;
            $totalUsage += $usage;
        }
        
        return $totalUsage / count($nodes);
    }
    
    private function calculateSocketsUsage($nodes)
    {
        if (empty($nodes)) {
            return 0;
        }
        
        $totalUsage = 0;
        foreach ($nodes as $node) {
            $usage = ($node['sockets_used'] / $node['sockets_total']) * 100;
            $totalUsage += $usage;
        }
        
        return $totalUsage / count($nodes);
    }
    
    private function calculateProcUsage($nodes)
    {
        if (empty($nodes)) {
            return 0;
        }
        
        $totalUsage = 0;
        foreach ($nodes as $node) {
            $usage = ($node['proc_used'] / $node['proc_total']) * 100;
            $totalUsage += $usage;
        }
        
        return $totalUsage / count($nodes);
    }
    
    private function checkNodeStatus($nodes)
    {
        foreach ($nodes as $node) {
            if (!($node['running'] ?? false)) {
                return false;
            }
        }
        return true;
    }
    
    private function checkPartitions($nodes)
    {
        foreach ($nodes as $node) {
            if (!empty($node['partitions'])) {
                return true;
            }
        }
        return false;
    }
}

常见问题与解决方案

问题一:告警阈值设置不合理

现象:告警频繁或漏报。

解决方案

php
$thresholds = [
    'warning' => $p95 * 1.1,
    'critical' => $max * 1.2,
];

问题二:指标采集不完整

现象:部分指标无法获取。

解决方案

php
$metrics['memory_usage'] = $nodes[0]['mem_used'] ?? 0;
$metrics['disk_free'] = $nodes[0]['disk_free'] ?? PHP_INT_MAX;

问题三:告警风暴

现象:短时间内大量告警。

解决方案

php
class AlertThrottler
{
    private $cooldownPeriod = 300;
    private $alertCache = [];
    
    public function shouldAlert($metricName, $level)
    {
        $key = "{$metricName}:{$level}";
        $now = time();
        
        if (isset($this->alertCache[$key])) {
            if ($now - $this->alertCache[$key] < $this->cooldownPeriod) {
                return false;
            }
        }
        
        $this->alertCache[$key] = $now;
        return true;
    }
}

最佳实践

1. 阈值设置建议

指标类型警告阈值严重阈值说明
资源使用率70-80%85-95%留有处理时间
资源剩余20-30%5-10%保证安全余量
消息堆积业务相关业务相关根据处理能力设置

2. 告警优先级

  1. Critical: 服务不可用、数据丢失风险
  2. Warning: 性能下降、资源紧张
  3. Info: 状态变化、趋势预警

3. 指标监控频率

指标类型采集频率检查频率
资源类15秒30秒
连接类30秒1分钟
队列类30秒1分钟
消息类1分钟5分钟

相关链接