Appearance
RabbitMQ 告警指标
概述
告警指标是监控系统的核心,合理选择和配置告警指标可以及时发现和预警系统问题。本文将详细介绍 RabbitMQ 的关键告警指标、指标阈值设置和告警级别划分。
核心知识点
告警指标分类
| 分类 | 指标 | 说明 |
|---|---|---|
| 资源类 | 内存、磁盘、CPU、文件描述符 | 系统资源使用情况 |
| 连接类 | 连接数、通道数、消费者数 | 客户端连接状态 |
| 队列类 | 消息数、消费速率、队列长度 | 队列运行状态 |
| 消息类 | 发布速率、确认速率、重投递率 | 消息流转情况 |
| 集群类 | 节点状态、分区状态、同步状态 | 集群健康状态 |
告警级别定义
| 级别 | 颜色 | 说明 | 响应时间 |
|---|---|---|---|
| Critical | 红色 | 严重故障,影响业务 | 5 分钟内 |
| Warning | 橙色 | 警告,需要关注 | 30 分钟内 |
| Info | 蓝色 | 信息,仅供参考 | 无需响应 |
指标阈值原则
- 基于历史数据:参考历史运行数据设置
- 留有余量:在临界值之前告警
- 分级设置:不同级别设置不同阈值
- 动态调整:根据业务变化调整
配置示例
关键告警指标详解
php
<?php
class RabbitMQAlertMetrics
{
private $metrics;
/**
 * Populates the metric catalogue once, from defineMetrics(), when the object is created.
 */
public function __construct()
{
$this->metrics = $this->defineMetrics();
}
/**
 * Returns the static catalogue of alert metrics, keyed by metric identifier.
 *
 * Every entry carries 'name', 'description', 'unit', 'thresholds' (alert level => trigger value),
 * 'calculation' (a descriptive formula string), 'impact' and 'recommendation'.
 * The 'queue_consumer_count' entry additionally carries a 'condition' string.
 *
 * @return array Metric identifier => definition array.
 */
private function defineMetrics()
{
return [
'memory_usage' => [
'name' => '内存使用率',
'description' => 'RabbitMQ 节点内存使用占总内存的百分比',
'unit' => '%',
'thresholds' => [
'warning' => 70,
'critical' => 85,
],
'calculation' => '(mem_used / mem_limit) * 100',
'impact' => '内存不足会触发流控,影响消息吞吐',
'recommendation' => '增加内存或优化消息处理',
],
'disk_free' => [
'name' => '磁盘可用空间',
'description' => 'RabbitMQ 数据目录所在磁盘的可用空间',
'unit' => 'GB',
// Critical is lower than warning: exceedsThreshold() alerts on disk_free when the reading drops below the threshold.
'thresholds' => [
'warning' => 10,
'critical' => 5,
],
'calculation' => 'disk_free / 1024 / 1024 / 1024',
'impact' => '磁盘空间不足会阻止消息写入',
'recommendation' => '清理日志或扩展磁盘容量',
],
'disk_alarm' => [
'name' => '磁盘告警状态',
'description' => '磁盘空间是否触发告警阈值',
'unit' => 'boolean',
'thresholds' => [
'critical' => true,
],
'calculation' => 'disk_free_alarm == true',
'impact' => '触发磁盘告警会阻塞所有生产者',
'recommendation' => '立即释放磁盘空间',
],
'memory_alarm' => [
'name' => '内存告警状态',
'description' => '内存是否触发告警阈值',
'unit' => 'boolean',
'thresholds' => [
'critical' => true,
],
'calculation' => 'mem_alarm == true',
'impact' => '触发内存告警会阻塞所有生产者',
'recommendation' => '立即释放内存或增加内存限制',
],
'queue_messages' => [
'name' => '队列消息数',
'description' => '队列中等待消费的消息总数',
'unit' => '条',
'thresholds' => [
'warning' => 50000,
'critical' => 100000,
],
'calculation' => 'queue_messages',
'impact' => '消息堆积会消耗内存和磁盘',
'recommendation' => '增加消费者或优化消费逻辑',
],
'queue_messages_ready' => [
'name' => '待消费消息数',
'description' => '队列中等待被消费的消息数',
'unit' => '条',
'thresholds' => [
'warning' => 30000,
'critical' => 80000,
],
'calculation' => 'messages_ready',
'impact' => '消息堆积影响业务处理时效',
'recommendation' => '检查消费者状态和消费速率',
],
'queue_messages_unacked' => [
'name' => '未确认消息数',
'description' => '已投递但未被确认的消息数',
'unit' => '条',
'thresholds' => [
'warning' => 10000,
'critical' => 50000,
],
'calculation' => 'messages_unacked',
'impact' => '未确认消息过多可能导致消息重复',
'recommendation' => '检查消费者确认逻辑',
],
'connections_total' => [
'name' => '总连接数',
'description' => '当前活跃的 TCP 连接总数',
'unit' => '个',
'thresholds' => [
'warning' => 800,
'critical' => 950,
],
'calculation' => 'object_totals.connections',
'impact' => '连接数过多消耗系统资源',
'recommendation' => '优化连接池配置',
],
'channels_total' => [
'name' => '总通道数',
'description' => '当前活跃的 AMQP 通道总数',
'unit' => '个',
'thresholds' => [
'warning' => 5000,
'critical' => 8000,
],
'calculation' => 'object_totals.channels',
'impact' => '通道数过多影响性能',
'recommendation' => '优化通道使用策略',
],
'consumers_total' => [
'name' => '总消费者数',
'description' => '当前活跃的消费者总数',
'unit' => '个',
'thresholds' => [
'warning' => 1000,
'critical' => 2000,
],
'calculation' => 'object_totals.consumers',
'impact' => '消费者过多增加调度开销',
'recommendation' => '合并消费者或优化消费逻辑',
],
'fd_usage' => [
'name' => '文件描述符使用率',
'description' => '已使用文件描述符占总数的百分比',
'unit' => '%',
'thresholds' => [
'warning' => 80,
'critical' => 90,
],
'calculation' => '(fd_used / fd_total) * 100',
'impact' => '文件描述符耗尽会导致无法接受新连接',
'recommendation' => '增加系统文件描述符限制',
],
'sockets_usage' => [
'name' => 'Socket 使用率',
'description' => '已使用 Socket 占总数的百分比',
'unit' => '%',
'thresholds' => [
'warning' => 80,
'critical' => 90,
],
'calculation' => '(sockets_used / sockets_total) * 100',
'impact' => 'Socket 耗尽会导致连接失败',
'recommendation' => '增加 Socket 限制或优化连接复用',
],
'proc_usage' => [
'name' => 'Erlang 进程使用率',
'description' => '已使用 Erlang 进程占总数的百分比',
'unit' => '%',
'thresholds' => [
'warning' => 80,
'critical' => 90,
],
'calculation' => '(proc_used / proc_total) * 100',
'impact' => '进程数耗尽会导致服务不可用',
'recommendation' => '增加 Erlang 进程限制',
],
'node_status' => [
'name' => '节点状态',
'description' => 'RabbitMQ 节点是否正常运行',
'unit' => 'boolean',
// The alarming state is "false": exceedsThreshold() alerts on node_status whenever the reading is falsy.
'thresholds' => [
'critical' => false,
],
'calculation' => 'running == true',
'impact' => '节点宕机会影响服务可用性',
'recommendation' => '立即检查节点状态并恢复',
],
'cluster_partition' => [
'name' => '集群分区',
'description' => '集群是否存在网络分区',
'unit' => 'boolean',
'thresholds' => [
'critical' => true,
],
'calculation' => 'partitions.length > 0',
'impact' => '网络分区会导致数据不一致',
'recommendation' => '检查网络并处理分区',
],
'queue_consumer_count' => [
'name' => '队列消费者数',
'description' => '队列当前的消费者数量',
'unit' => '个',
'thresholds' => [
'warning' => 0,
],
// NOTE(review): 'condition' is descriptive only; no code in this class evaluates it.
'condition' => 'queue_messages > 1000',
'calculation' => 'consumers',
'impact' => '无消费者会导致消息堆积',
'recommendation' => '启动消费者或检查消费者状态',
],
'message_publish_rate' => [
'name' => '消息发布速率',
'description' => '每秒发布的消息数量',
'unit' => '条/秒',
'thresholds' => [
'warning' => 50000,
'critical' => 100000,
],
'calculation' => 'message_stats.publish_details.rate',
'impact' => '发布速率过高可能导致处理延迟',
'recommendation' => '优化生产者或扩展集群',
],
'message_redeliver_rate' => [
'name' => '消息重投递率',
'description' => '消息重投递占总投递的百分比',
'unit' => '%',
'thresholds' => [
'warning' => 10,
'critical' => 30,
],
'calculation' => '(redeliver / deliver) * 100',
'impact' => '重投递过多影响性能和业务',
'recommendation' => '检查消费者处理逻辑',
],
];
}
/**
 * Returns the complete metric catalogue built in the constructor.
 *
 * @return array Metric identifier => definition array.
 */
public function getMetrics()
{
return $this->metrics;
}
/**
 * Returns the definition of a single metric.
 *
 * @param string $name Metric identifier, e.g. 'memory_usage'.
 * @return array|null The definition array, or null when the identifier is not in the catalogue.
 */
public function getMetric($name)
{
return $this->metrics[$name] ?? null;
}
/**
 * Looks up the threshold map configured for one metric.
 *
 * @param string $name Metric identifier, e.g. 'disk_free'.
 * @return array Alert level => trigger value; an empty array when the metric is unknown
 *               or has no thresholds.
 */
public function getThresholds($name)
{
    $definition = $this->getMetric($name);

    if ($definition === null || !isset($definition['thresholds'])) {
        return [];
    }

    return $definition['thresholds'];
}
/**
 * Classifies one reading of one metric as critical, warning or ok.
 *
 * The critical threshold is tested before the warning threshold, so the most severe
 * matching level wins. A level is only tested when the metric defines it.
 *
 * @param string $name  Metric identifier.
 * @param mixed  $value Current reading of the metric.
 * @return array Always has 'level' and 'message'. Known metrics also get 'value'; alerting
 *               results additionally get 'threshold', 'impact' and 'recommendation'.
 *               Unknown metrics yield level 'unknown'.
 */
public function checkThreshold($name, $value)
{
    $definition = $this->getMetric($name);
    if (!$definition) {
        return ['level' => 'unknown', 'message' => 'Unknown metric'];
    }

    $limits = $definition['thresholds'];

    // Ordered from most to least severe; the first level whose threshold is breached is reported.
    $messageSuffixes = [
        'critical' => '达到严重阈值',
        'warning' => '达到警告阈值',
    ];

    foreach ($messageSuffixes as $level => $suffix) {
        if (!isset($limits[$level])) {
            continue;
        }
        if (!$this->exceedsThreshold($value, $limits[$level], $name)) {
            continue;
        }

        return [
            'level' => $level,
            'value' => $value,
            'threshold' => $limits[$level],
            'message' => "{$definition['name']} {$suffix}",
            'impact' => $definition['impact'],
            'recommendation' => $definition['recommendation'],
        ];
    }

    return [
        'level' => 'ok',
        'value' => $value,
        'message' => "{$definition['name']} 正常",
    ];
}
/**
 * Decides whether a reading breaches a threshold, honouring the direction of each metric.
 *
 * - 'node_status' alerts whenever the reading is falsy (node not running).
 * - Boolean thresholds name the alarming state itself, so the reading alerts when its
 *   boolean value equals the threshold. Comparing with ">" can never fire for a threshold
 *   of true, which previously silenced disk_alarm, memory_alarm and cluster_partition.
 * - 'queue_consumer_count' alerts when the numeric reading is at or below the threshold
 *   (0 consumers by default), regardless of whether it arrives as int, float or numeric string.
 * - 'disk_free' alerts when the reading falls below the threshold.
 * - Every other metric alerts when the reading rises above the threshold.
 *
 * @param mixed  $value      Current reading.
 * @param mixed  $threshold  Configured trigger value for the level being tested.
 * @param string $metricName Metric identifier, used to pick the comparison direction.
 * @return bool True when the reading should raise an alert for this threshold.
 */
private function exceedsThreshold($value, $threshold, $metricName)
{
    if ($metricName === 'node_status') {
        return !$value;
    }

    if (is_bool($threshold)) {
        return (bool) $value === $threshold;
    }

    if ($metricName === 'queue_consumer_count') {
        return is_numeric($value) && $value <= $threshold;
    }

    if ($metricName === 'disk_free') {
        return $value < $threshold;
    }

    return $value > $threshold;
}
/**
 * Evaluates a batch of readings against the catalogue.
 *
 * Readings whose key is not a known metric are dropped; the remaining results keep the
 * order in which the readings were supplied.
 *
 * @param array $currentValues Metric identifier => current reading.
 * @return array Metric identifier => result of checkThreshold().
 */
public function checkAllMetrics(array $currentValues)
{
    // Keep only the readings that correspond to a catalogued metric.
    $recognised = array_intersect_key($currentValues, $this->metrics);

    $evaluated = [];
    foreach ($recognised as $metricName => $reading) {
        $evaluated[$metricName] = $this->checkThreshold($metricName, $reading);
    }

    return $evaluated;
}
/**
 * Returns only the readings that are in a warning or critical state, critical first.
 *
 * @param array $currentValues Metric identifier => current reading.
 * @return array List of checkThreshold() results, each prefixed with a 'metric' key holding
 *               the metric identifier, sorted so critical entries precede warning entries.
 */
public function getAlerts(array $currentValues)
{
    $severityRank = ['critical' => 0, 'warning' => 1];

    $alerts = [];
    foreach ($this->checkAllMetrics($currentValues) as $metricName => $outcome) {
        if ($outcome['level'] !== 'warning' && $outcome['level'] !== 'critical') {
            continue;
        }
        // Union keeps 'metric' as the first key, followed by the result fields.
        $alerts[] = ['metric' => $metricName] + $outcome;
    }

    usort($alerts, function ($left, $right) use ($severityRank) {
        return $severityRank[$left['level']] <=> $severityRank[$right['level']];
    });

    return $alerts;
}
/**
 * Renders the whole metric catalogue as a plain-text report.
 *
 * Each metric gets a block with its identifier, description, unit, thresholds, impact and
 * recommendation. Boolean thresholds are printed as the words "true"/"false"; plain string
 * interpolation would print "1" for true and nothing at all for false.
 *
 * @return string The formatted report.
 */
public function generateMetricsReport()
{
    $report = "RabbitMQ 告警指标配置报告\n";
    $report .= str_repeat("=", 60) . "\n\n";

    foreach ($this->metrics as $name => $metric) {
        $thresholds = [];
        foreach ($metric['thresholds'] as $level => $value) {
            $rendered = is_bool($value) ? ($value ? 'true' : 'false') : $value;
            $thresholds[] = "{$level}: {$rendered}";
        }

        $report .= "【{$metric['name']}】\n";
        $report .= "- 指标名称: {$name}\n";
        $report .= "- 说明: {$metric['description']}\n";
        $report .= "- 单位: {$metric['unit']}\n";
        $report .= "- 阈值: " . implode(', ', $thresholds) . "\n";
        $report .= "- 影响: {$metric['impact']}\n";
        $report .= "- 建议: {$metric['recommendation']}\n\n";
    }

    return $report;
}
/**
 * Exports every configured threshold as a gauge in Prometheus text exposition format.
 *
 * One "rabbitmq_alert_threshold_<metric>" family is emitted per metric, with one sample per
 * alert level (label "level"). Boolean thresholds are exported as 1 / 0, because a sample
 * line needs a numeric value and interpolating false would leave the value empty.
 *
 * @return string Newline-separated exposition text (no trailing newline).
 */
public function exportToPrometheus()
{
    $output = [];

    foreach ($this->metrics as $name => $metric) {
        $family = "rabbitmq_alert_threshold_{$name}";

        $output[] = "# HELP {$family} {$metric['description']}";
        $output[] = "# TYPE {$family} gauge";

        foreach ($metric['thresholds'] as $level => $value) {
            $sample = is_bool($value) ? (int) $value : $value;
            $output[] = "{$family}{level=\"{$level}\"} {$sample}";
        }
    }

    return implode("\n", $output);
}
}
指标阈值配置文件
php
<?php
// Threshold configuration keyed by metric identifier; each entry maps an alert level
// ('warning' / 'critical') to its trigger value. The values match the defaults declared in
// RabbitMQAlertMetrics::defineMetrics().
// NOTE(review): nothing visible in this document loads this file — confirm how it is consumed.
return [
// Percent of the memory limit in use.
'memory_usage' => [
'warning' => 70,
'critical' => 85,
],
// Free disk space in GB; lower readings are worse, so critical is below warning.
'disk_free' => [
'warning' => 10,
'critical' => 5,
],
'queue_messages' => [
'warning' => 50000,
'critical' => 100000,
],
'connections_total' => [
'warning' => 800,
'critical' => 950,
],
'channels_total' => [
'warning' => 5000,
'critical' => 8000,
],
// The three usage metrics below are percentages of the respective limit.
'fd_usage' => [
'warning' => 80,
'critical' => 90,
],
'sockets_usage' => [
'warning' => 80,
'critical' => 90,
],
'proc_usage' => [
'warning' => 80,
'critical' => 90,
],
// Percent of deliveries that were redeliveries.
'message_redeliver_rate' => [
'warning' => 10,
'critical' => 30,
],
];实际应用场景
场景一:动态阈值调整
php
<?php
class DynamicThresholdManager
{
private $metrics;
private $historyFile;
/**
 * @param RabbitMQAlertMetrics $metrics     Source of the currently configured thresholds.
 * @param string               $historyFile Path of the JSON file read by loadHistory().
 */
public function __construct(RabbitMQAlertMetrics $metrics, $historyFile = '/var/lib/rabbitmq/threshold_history.json')
{
$this->metrics = $metrics;
$this->historyFile = $historyFile;
}
/**
 * Proposes new warning/critical thresholds from recorded metric samples.
 *
 * For every metric in the history file that has at least 100 samples, the candidate warning
 * threshold is the 95th percentile * 1.1 and the candidate critical threshold is the maximum
 * * 1.2. A proposal is emitted only when the candidate warning exceeds the current warning
 * threshold by more than 20% (a missing current threshold counts as 0). Nothing is applied;
 * the proposals are only returned.
 *
 * NOTE(review): the formula raises thresholds above observed values, which presumably only
 * suits metrics where higher readings are worse — confirm before using it for 'disk_free'.
 *
 * @param int $days Kept for interface compatibility; the history is not filtered by age here.
 * @return array Metric identifier => ['old_warning', 'new_warning', 'old_critical',
 *               'new_critical', 'reason'].
 */
public function adjustThresholdsBasedOnHistory($days = 30)
{
    $adjustments = [];

    foreach ($this->loadHistory() as $metricName => $values) {
        // A malformed history entry (scalar instead of a sample list) would make count()
        // throw a TypeError on PHP 8, so it is skipped like an under-sampled metric.
        if (!is_array($values) || count($values) < 100) {
            continue;
        }

        $newWarningThreshold = $this->percentile($values, 95) * 1.1;
        $newCriticalThreshold = max($values) * 1.2;

        $currentThresholds = $this->metrics->getThresholds($metricName);
        $currentWarning = $currentThresholds['warning'] ?? 0;

        if ($newWarningThreshold <= $currentWarning * 1.2) {
            continue;
        }

        $adjustments[$metricName] = [
            'old_warning' => $currentThresholds['warning'] ?? null,
            'new_warning' => round($newWarningThreshold, 2),
            'old_critical' => $currentThresholds['critical'] ?? null,
            'new_critical' => round($newCriticalThreshold, 2),
            'reason' => 'Based on historical data analysis',
        ];
    }

    return $adjustments;
}
/**
 * Returns the nearest-rank percentile of a list of samples.
 *
 * The rank is clamped to the valid index range, so percentile 0 yields the smallest sample
 * and values above 100 yield the largest, instead of falling through to the 0 fallback.
 *
 * @param array     $values     Samples; sorted on a local copy, the caller's array is untouched.
 * @param int|float $percentile Percentile to look up, normally between 0 and 100.
 * @return mixed The selected sample, or 0 when $values is empty.
 */
private function percentile($values, $percentile)
{
    $sampleCount = count($values);
    if ($sampleCount === 0) {
        return 0;
    }

    // sort() also re-indexes, so positions 0..n-1 are guaranteed to exist afterwards.
    sort($values);

    $rank = (int) ceil(($percentile / 100) * $sampleCount) - 1;
    $rank = max(0, min($rank, $sampleCount - 1));

    return $values[$rank];
}
/**
 * Reads the metric history from the JSON file configured in the constructor.
 *
 * @return array Decoded history (metric identifier => samples). An empty array is returned
 *               when the file is missing, unreadable, not valid JSON, or when the JSON
 *               document is not an array/object — the caller iterates the result directly.
 */
private function loadHistory()
{
    if (!is_file($this->historyFile) || !is_readable($this->historyFile)) {
        return [];
    }

    $raw = file_get_contents($this->historyFile);
    if ($raw === false) {
        return [];
    }

    $decoded = json_decode($raw, true);

    return is_array($decoded) ? $decoded : [];
}
}
场景二:指标聚合计算
php
<?php
class MetricsAggregator
{
private $apiClient;
/**
 * @param object $apiClient Client used by aggregateMetrics(); it is called through
 *                          getOverview(), getNodes() and getQueues().
 */
public function __construct($apiClient)
{
$this->apiClient = $apiClient;
}
/**
 * Collects one reading per alert metric from the API client.
 *
 * Fetches the overview, node list and queue list once each, then derives every metric
 * from those three responses. Object totals default to 0 when absent from the overview.
 *
 * @return array Metric identifier => current reading.
 */
public function aggregateMetrics()
{
    $client = $this->apiClient;

    $summary = $client->getOverview();
    $nodeList = $client->getNodes();
    $queueList = $client->getQueues();

    return [
        'memory_usage' => $this->calculateMemoryUsage($nodeList),
        'disk_free' => $this->calculateDiskFree($nodeList),
        'disk_alarm' => $this->checkDiskAlarm($nodeList),
        'memory_alarm' => $this->checkMemoryAlarm($nodeList),
        'queue_messages' => $this->calculateTotalMessages($queueList),
        'queue_messages_ready' => $this->calculateReadyMessages($queueList),
        'queue_messages_unacked' => $this->calculateUnackedMessages($queueList),
        'connections_total' => $summary['object_totals']['connections'] ?? 0,
        'channels_total' => $summary['object_totals']['channels'] ?? 0,
        'consumers_total' => $summary['object_totals']['consumers'] ?? 0,
        'fd_usage' => $this->calculateFdUsage($nodeList),
        'sockets_usage' => $this->calculateSocketsUsage($nodeList),
        'proc_usage' => $this->calculateProcUsage($nodeList),
        'node_status' => $this->checkNodeStatus($nodeList),
        'cluster_partition' => $this->checkPartitions($nodeList),
    ];
}
/**
 * Averages memory usage (mem_used / mem_limit, in percent) across nodes.
 *
 * Nodes that do not report a numeric 'mem_used' or a positive numeric 'mem_limit' are left
 * out of the average; dividing by a missing or zero limit would raise DivisionByZeroError
 * on PHP 8.
 *
 * @param array $nodes Node records; each is expected to carry 'mem_used' and 'mem_limit'.
 * @return int|float Mean usage in percent, or 0 when no node provides usable figures.
 */
private function calculateMemoryUsage($nodes)
{
    if (empty($nodes)) {
        return 0;
    }

    $usageSum = 0;
    $countedNodes = 0;

    foreach ($nodes as $node) {
        $used = $node['mem_used'] ?? null;
        $limit = $node['mem_limit'] ?? null;

        if (!is_numeric($used) || !is_numeric($limit) || $limit <= 0) {
            continue;
        }

        $usageSum += ($used / $limit) * 100;
        $countedNodes++;
    }

    return $countedNodes > 0 ? $usageSum / $countedNodes : 0;
}
/**
 * Returns the smallest free-disk figure across nodes, converted from bytes to GB.
 *
 * Nodes without a numeric 'disk_free' are ignored: treating a missing value as 0 bytes
 * would report 0 GB free and raise a false low-disk alert, and a non-numeric value cannot
 * be divided at all.
 *
 * @param array $nodes Node records; each is expected to carry 'disk_free' in bytes.
 * @return int|float Minimum free space in GB, or 0 when no node provides a usable figure.
 */
private function calculateDiskFree($nodes)
{
    if (empty($nodes)) {
        return 0;
    }

    $minFree = null;

    foreach ($nodes as $node) {
        $freeBytes = $node['disk_free'] ?? null;
        if (!is_numeric($freeBytes)) {
            continue;
        }

        $freeGB = $freeBytes / 1024 / 1024 / 1024;
        $minFree = $minFree === null ? $freeGB : min($minFree, $freeGB);
    }

    return $minFree ?? 0;
}
/**
 * Reports whether any node has its disk alarm raised.
 *
 * @param array $nodes Node records; 'disk_free_alarm' is read from each and treated as
 *                     not raised when absent.
 * @return bool True as soon as one node has a truthy 'disk_free_alarm'.
 */
private function checkDiskAlarm($nodes)
{
    $alarmRaised = false;

    foreach ($nodes as $entry) {
        if (!empty($entry['disk_free_alarm'])) {
            $alarmRaised = true;
            break;
        }
    }

    return $alarmRaised;
}
/**
 * Reports whether any node has its memory alarm raised.
 *
 * @param array $nodes Node records; 'mem_alarm' is read from each and treated as not
 *                     raised when absent.
 * @return bool True as soon as one node has a truthy 'mem_alarm'.
 */
private function checkMemoryAlarm($nodes)
{
    $alarmRaised = false;

    foreach ($nodes as $entry) {
        if (!empty($entry['mem_alarm'])) {
            $alarmRaised = true;
            break;
        }
    }

    return $alarmRaised;
}
/**
 * Sums the 'messages' counter over all queues.
 *
 * @param array $queues Queue records; queues without a 'messages' value contribute nothing.
 * @return int|float Total number of messages.
 */
private function calculateTotalMessages($queues)
{
    $sum = 0;

    foreach ($queues as $entry) {
        if (isset($entry['messages'])) {
            $sum = $sum + $entry['messages'];
        }
    }

    return $sum;
}
/**
 * Sums the 'messages_ready' counter over all queues.
 *
 * @param array $queues Queue records; queues without a 'messages_ready' value contribute nothing.
 * @return int|float Total number of ready messages.
 */
private function calculateReadyMessages($queues)
{
    $sum = 0;

    foreach ($queues as $entry) {
        if (isset($entry['messages_ready'])) {
            $sum = $sum + $entry['messages_ready'];
        }
    }

    return $sum;
}
/**
 * Sums the 'messages_unacked' counter over all queues.
 *
 * @param array $queues Queue records; queues without a 'messages_unacked' value contribute nothing.
 * @return int|float Total number of unacknowledged messages.
 */
private function calculateUnackedMessages($queues)
{
    $sum = 0;

    foreach ($queues as $entry) {
        if (isset($entry['messages_unacked'])) {
            $sum = $sum + $entry['messages_unacked'];
        }
    }

    return $sum;
}
/**
 * Averages file-descriptor usage (fd_used / fd_total, in percent) across nodes.
 *
 * Nodes that do not report a numeric 'fd_used' or a positive numeric 'fd_total' are left
 * out of the average; dividing by a missing or zero total would raise DivisionByZeroError
 * on PHP 8.
 *
 * @param array $nodes Node records; each is expected to carry 'fd_used' and 'fd_total'.
 * @return int|float Mean usage in percent, or 0 when no node provides usable figures.
 */
private function calculateFdUsage($nodes)
{
    if (empty($nodes)) {
        return 0;
    }

    $usageSum = 0;
    $countedNodes = 0;

    foreach ($nodes as $node) {
        $used = $node['fd_used'] ?? null;
        $total = $node['fd_total'] ?? null;

        if (!is_numeric($used) || !is_numeric($total) || $total <= 0) {
            continue;
        }

        $usageSum += ($used / $total) * 100;
        $countedNodes++;
    }

    return $countedNodes > 0 ? $usageSum / $countedNodes : 0;
}
/**
 * Averages socket usage (sockets_used / sockets_total, in percent) across nodes.
 *
 * Nodes that do not report a numeric 'sockets_used' or a positive numeric 'sockets_total'
 * are left out of the average; dividing by a missing or zero total would raise
 * DivisionByZeroError on PHP 8.
 *
 * @param array $nodes Node records; each is expected to carry 'sockets_used' and 'sockets_total'.
 * @return int|float Mean usage in percent, or 0 when no node provides usable figures.
 */
private function calculateSocketsUsage($nodes)
{
    if (empty($nodes)) {
        return 0;
    }

    $usageSum = 0;
    $countedNodes = 0;

    foreach ($nodes as $node) {
        $used = $node['sockets_used'] ?? null;
        $total = $node['sockets_total'] ?? null;

        if (!is_numeric($used) || !is_numeric($total) || $total <= 0) {
            continue;
        }

        $usageSum += ($used / $total) * 100;
        $countedNodes++;
    }

    return $countedNodes > 0 ? $usageSum / $countedNodes : 0;
}
/**
 * Averages Erlang process usage (proc_used / proc_total, in percent) across nodes.
 *
 * Nodes that do not report a numeric 'proc_used' or a positive numeric 'proc_total' are
 * left out of the average; dividing by a missing or zero total would raise
 * DivisionByZeroError on PHP 8.
 *
 * @param array $nodes Node records; each is expected to carry 'proc_used' and 'proc_total'.
 * @return int|float Mean usage in percent, or 0 when no node provides usable figures.
 */
private function calculateProcUsage($nodes)
{
    if (empty($nodes)) {
        return 0;
    }

    $usageSum = 0;
    $countedNodes = 0;

    foreach ($nodes as $node) {
        $used = $node['proc_used'] ?? null;
        $total = $node['proc_total'] ?? null;

        if (!is_numeric($used) || !is_numeric($total) || $total <= 0) {
            continue;
        }

        $usageSum += ($used / $total) * 100;
        $countedNodes++;
    }

    return $countedNodes > 0 ? $usageSum / $countedNodes : 0;
}
/**
 * Reports whether every node is running.
 *
 * @param array $nodes Node records; a node without a truthy 'running' value counts as down.
 * @return bool False as soon as one node is down; true otherwise (including for an empty list).
 */
private function checkNodeStatus($nodes)
{
    foreach ($nodes as $entry) {
        if (empty($entry['running'])) {
            return false;
        }
    }

    return true;
}
/**
 * Reports whether any node lists a network partition.
 *
 * @param array $nodes Node records; 'partitions' is read from each and treated as empty
 *                     when absent.
 * @return bool True as soon as one node has a non-empty 'partitions' value.
 */
private function checkPartitions($nodes)
{
    foreach ($nodes as $entry) {
        $partitionList = $entry['partitions'] ?? [];
        if ($partitionList) {
            return true;
        }
    }

    return false;
}
}
常见问题与解决方案
问题一:告警阈值设置不合理
现象:告警频繁或漏报。
解决方案:
php
// Derive both alert levels from observed data: warning sits 10% above $p95 and
// critical sits 20% above $max.
// NOTE(review): $p95 and $max are not defined in this snippet — presumably the 95th
// percentile and the maximum of the historical samples; confirm against the caller.
$thresholds = [
'warning' => $p95 * 1.1,
'critical' => $max * 1.2,
];问题二:指标采集不完整
现象:部分指标无法获取。
解决方案:
php
// Fall back to safe defaults when the node list or individual fields are missing.
$firstNode = $nodes[0] ?? [];
$memLimit = $firstNode['mem_limit'] ?? 0;
// memory_usage is a percentage of the memory limit, not the raw mem_used byte count;
// a missing or zero limit yields 0 instead of a division by zero.
$metrics['memory_usage'] = $memLimit > 0 ? (($firstNode['mem_used'] ?? 0) / $memLimit) * 100 : 0;
// disk_free is expressed in GB, so the byte count is converted; PHP_INT_MAX keeps a
// missing reading from looking like an exhausted disk.
$metrics['disk_free'] = isset($firstNode['disk_free']) ? $firstNode['disk_free'] / 1024 / 1024 / 1024 : PHP_INT_MAX;
问题三:告警风暴
现象:短时间内大量告警。
解决方案:
php
class AlertThrottler
{
private $cooldownPeriod = 300;
private $alertCache = [];
/**
 * Decides whether an alert may be sent, suppressing repeats within the cooldown period.
 *
 * Alerts are tracked per metric/level pair. When an alert is allowed, the current time is
 * recorded as the last send time for that pair.
 *
 * @param string $metricName Metric identifier.
 * @param string $level      Alert level, e.g. 'warning' or 'critical'.
 * @return bool False when the same metric/level pair was allowed less than
 *              $cooldownPeriod seconds ago; true otherwise.
 */
public function shouldAlert($metricName, $level)
{
    $cacheKey = "{$metricName}:{$level}";
    $timestamp = time();

    $lastSent = $this->alertCache[$cacheKey] ?? null;
    $stillCoolingDown = $lastSent !== null && ($timestamp - $lastSent) < $this->cooldownPeriod;
    if ($stillCoolingDown) {
        return false;
    }

    $this->alertCache[$cacheKey] = $timestamp;

    return true;
}
}
最佳实践
1. 阈值设置建议
| 指标类型 | 警告阈值 | 严重阈值 | 说明 |
|---|---|---|---|
| 资源使用率 | 70-80% | 85-95% | 留有处理时间 |
| 资源剩余 | 20-30% | 5-10% | 保证安全余量 |
| 消息堆积 | 业务相关 | 业务相关 | 根据处理能力设置 |
2. 告警优先级
- Critical: 服务不可用、数据丢失风险
- Warning: 性能下降、资源紧张
- Info: 状态变化、趋势预警
3. 指标监控频率
| 指标类型 | 采集频率 | 检查频率 |
|---|---|---|
| 资源类 | 15秒 | 30秒 |
| 连接类 | 30秒 | 1分钟 |
| 队列类 | 30秒 | 1分钟 |
| 消息类 | 1分钟 | 5分钟 |
