
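Monitoring and Alerting Practices

Overview

A well-built monitoring and alerting setup is key to keeping a RabbitMQ deployment running reliably. This document covers best practices for RabbitMQ monitoring metrics, alert configuration, and visualization.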

Monitoring Architecture

┌─────────────────────────────────────────────────────────────────┐
│              Monitoring and Alerting Architecture               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                   Data Collection Layer                   │  │
│  │                                                           │  │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐     │  │
│  │  │   RabbitMQ   │  │   RabbitMQ   │  │   RabbitMQ   │     │  │
│  │  │    Node 1    │  │    Node 2    │  │    Node 3    │     │  │
│  │  │              │  │              │  │              │     │  │
│  │  │ ┌──────────┐ │  │ ┌──────────┐ │  │ ┌──────────┐ │     │  │
│  │  │ │Prometheus│ │  │ │Prometheus│ │  │ │Prometheus│ │     │  │
│  │  │ │ Exporter │ │  │ │ Exporter │ │  │ │ Exporter │ │     │  │
│  │  │ └────┬─────┘ │  │ └────┬─────┘ │  │ └────┬─────┘ │     │  │
│  │  └──────┼───────┘  └──────┼───────┘  └──────┼───────┘     │  │
│  │         │                 │                 │             │  │
│  └─────────┼─────────────────┼─────────────────┼─────────────┘  │
│            │                 │                 │                │
│            ▼                 ▼                 ▼                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                    Data Storage Layer                     │  │
│  │                                                           │  │
│  │  ┌─────────────────────────────────────────────────────┐  │  │
│  │  │                  Prometheus Server                  │  │  │
│  │  │                                                     │  │  │
│  │  │ • Time-series storage   • Aggregation   • Query API │  │  │
│  │  │                                                     │  │  │
│  │  └─────────────────────────────────────────────────────┘  │  │
│  │                                                           │  │
│  └───────────────────────────────────────────────────────────┘  │
│            │                                                    │
│            ▼                                                    │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │             Visualization and Alerting Layer              │  │
│  │                                                           │  │
│  │  ┌───────────────────┐  ┌───────────────────┐             │  │
│  │  │      Grafana      │  │   Alertmanager    │             │  │
│  │  │                   │  │                   │             │  │
│  │  │ • Dashboards      │  │ • Alert routing   │             │  │
│  │  │ • Chart settings  │  │ • Alert grouping  │             │  │
│  │  │ • Alert rules     │  │ • Notifications   │             │  │
│  │  └───────────────────┘  └───────────────────┘             │  │
│  │                                                           │  │
│  └───────────────────────────────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Core Monitoring Metrics

Note: the metric names in this section are the ones this document uses. The exact names depend on the exporter and the RabbitMQ version (for example, the built-in rabbitmq_prometheus plugin reports node memory as rabbitmq_process_resident_memory_bytes), so check them against your own /metrics output before writing alert rules.

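1. Node Metrics

Metric                      Description            Alert threshold
rabbitmq_up                 Node status            0 means the node is down
rabbitmq_node_memory_used   Memory used            > 80% of the high watermark
rabbitmq_node_disk_free     Free disk space        < 1 GB
rabbitmq_node_fd_used       File descriptors used  > 80%
rabbitmq_node_sockets_used  Sockets used           > 80%
rabbitmq_node_proc_used     Erlang processes used  > 80%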
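
The percentage thresholds in this table are relative to each node's own limits, so the raw counters have to be divided by the matching limit before they can be compared with 80%. A minimal sketch of that step, assuming the management API field names that the collector later in this document reads (`mem_used`/`mem_limit`, `fd_used`/`fd_total`, and so on). The helper names `nodeUtilization` and `overThreshold` are hypothetical and exist only for this illustration.

```php
<?php

declare(strict_types=1);

/**
 * Converts raw node counters into utilization percentages.
 * Field names are assumed to match the /api/nodes payload used in this document.
 */
function nodeUtilization(array $node): array
{
    $pairs = [
        'memory'  => ['mem_used', 'mem_limit'],
        'fd'      => ['fd_used', 'fd_total'],
        'sockets' => ['sockets_used', 'sockets_total'],
        'proc'    => ['proc_used', 'proc_total'],
    ];

    $result = [];
    foreach ($pairs as $label => [$usedKey, $totalKey]) {
        $used = (float) ($node[$usedKey] ?? 0);
        $total = (float) ($node[$totalKey] ?? 0);
        // A missing or zero limit is reported as null rather than a misleading 0%.
        $result[$label] = $total > 0 ? round($used / $total * 100, 2) : null;
    }

    return $result;
}

/** Returns the resources whose utilization exceeds the given percentage. */
function overThreshold(array $utilization, float $percent = 80.0): array
{
    return array_keys(array_filter(
        $utilization,
        static fn (?float $value): bool => $value !== null && $value > $percent
    ));
}

// Made-up sample values for illustration only.
$sample = [
    'mem_used' => 850, 'mem_limit' => 1000,
    'fd_used' => 100, 'fd_total' => 1000,
    'sockets_used' => 90, 'sockets_total' => 100,
    'proc_used' => 10, 'proc_total' => 0,
];

$utilization = nodeUtilization($sample);
print_r(overThreshold($utilization)); // memory and sockets are above 80%
```

Reporting an unknown limit as `null` keeps a node with missing data from looking healthy at 0%.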

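2. Queue Metrics

Metric                           Description                  Alert threshold
rabbitmq_queue_messages          Messages in the queue        Set per workload
rabbitmq_queue_messages_ready    Messages ready for delivery  Alert on sustained growth
rabbitmq_queue_messages_unacked  Unacknowledged messages      Alert when too high
rabbitmq_queue_consumers         Consumer count               Alert at 0
rabbitmq_queue_message_bytes     Message bytes                Set per capacity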
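
"Sustained growth" needs a definition before it can be alerted on. One simple reading, sketched here as an assumption rather than a recommendation, is that the last few samples of the ready-message count are strictly increasing. The function name `isGrowingSteadily` and the default window of five samples are hypothetical.

```php
<?php

declare(strict_types=1);

/**
 * Returns true when the last $window samples are strictly increasing.
 * $samples is an ordered list of messages_ready values, oldest first.
 */
function isGrowingSteadily(array $samples, int $window = 5): bool
{
    if ($window < 2 || count($samples) < $window) {
        return false; // Not enough data to call it a trend.
    }

    $recent = array_slice($samples, -$window);
    for ($i = 1; $i < $window; $i++) {
        if ($recent[$i] <= $recent[$i - 1]) {
            return false;
        }
    }

    return true;
}

var_dump(isGrowingSteadily([10, 12, 15, 21, 30, 44])); // backlog keeps growing
var_dump(isGrowingSteadily([10, 40, 35, 50, 20, 25])); // bursty, but it drains
```

A strict-increase check is easy to reason about but sensitive to a single flat sample; comparing the first and last values of the window is a looser alternative.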

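3. Connection Metrics

Metric                            Description         Alert threshold
rabbitmq_connections              Connection count    Alert when near the limit
rabbitmq_channels                 Channel count       Alert when near the limit
rabbitmq_connection_closed_total  Closed connections  Alert on abnormal growth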

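4. Throughput Metrics

Metric                                     Description            Alert threshold
rabbitmq_channel_messages_published_total  Messages published     Alert on a drop
rabbitmq_channel_messages_delivered_total  Messages delivered     Alert on a drop
rabbitmq_channel_messages_acked_total      Messages acknowledged  Alert on a drop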
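
The throughput metrics are cumulative counters, so "a drop" refers to the rate of change between two readings, not to the counter value itself. The sketch below shows one way to derive a per-second rate and compare it with a baseline. The function names, the 50% ratio, and the sample numbers are all assumptions made for illustration.

```php
<?php

declare(strict_types=1);

/**
 * Per-second rate between two readings of a monotonically increasing counter.
 * Returns null when the counter went backwards (for example after a node restart)
 * or when no time has elapsed.
 */
function counterRate(int $previous, int $current, float $elapsedSeconds): ?float
{
    if ($elapsedSeconds <= 0 || $current < $previous) {
        return null;
    }

    return ($current - $previous) / $elapsedSeconds;
}

/** True when the current rate is below $ratio of the baseline rate. */
function hasDropped(?float $rate, float $baseline, float $ratio = 0.5): bool
{
    return $rate !== null && $baseline > 0 && $rate < $baseline * $ratio;
}

$rate = counterRate(120000, 123000, 60.0); // 3000 messages in 60 seconds
var_dump($rate);
var_dump(hasDropped($rate, 200.0)); // baseline of 200 msg/s, so this is a drop
```

Treating a backwards counter as "unknown" avoids a false alert right after a restart resets the totals.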

PHP Code Examples

Recommended approach: custom metrics collection

php
<?php

namespace App\Messaging\Monitoring;

class MetricsCollector
{
    private array $nodes;
    private string $user;
    private string $password;
    
    public function __construct(array $nodes, string $user, string $password)
    {
        $this->nodes = $nodes;
        $this->user = $user;
        $this->password = $password;
    }
    
    public function collectAllMetrics(): array
    {
        $metrics = [
            'timestamp' => time(),
            'nodes' => [],
            'queues' => [],
            'overview' => [],
        ];
        
        foreach ($this->nodes as $node) {
            $metrics['nodes'][$node['host']] = $this->collectNodeMetrics($node);
        }
        
        $metrics['overview'] = $this->collectOverviewMetrics();
        $metrics['queues'] = $this->collectQueueMetrics();
        
        return $metrics;
    }
    
    private function collectNodeMetrics(array $node): array
    {
        // Node names such as "rabbit@host" are URL-encoded before being placed in the path.
        $url = sprintf('http://%s:15672/api/nodes/%s', $node['host'], rawurlencode($node['name']));
        
        $data = $this->httpGet($url);
        
        return [
            'name' => $data['name'] ?? '',
            'running' => $data['running'] ?? false,
            'memory_used' => $data['mem_used'] ?? 0,
            'memory_limit' => $data['mem_limit'] ?? 0,
            'memory_usage_percent' => $this->calculatePercentage(
                $data['mem_used'] ?? 0,
                $data['mem_limit'] ?? 1
            ),
            'disk_free' => $data['disk_free'] ?? 0,
            'disk_free_limit' => $data['disk_free_limit'] ?? 0,
            'fd_used' => $data['fd_used'] ?? 0,
            'fd_total' => $data['fd_total'] ?? 0,
            'sockets_used' => $data['sockets_used'] ?? 0,
            'sockets_total' => $data['sockets_total'] ?? 0,
            'proc_used' => $data['proc_used'] ?? 0,
            'proc_total' => $data['proc_total'] ?? 0,
            'uptime' => $data['uptime'] ?? 0,
        ];
    }
    
    private function collectOverviewMetrics(): array
    {
        $url = sprintf('http://%s:15672/api/overview', $this->nodes[0]['host']);
        
        $data = $this->httpGet($url);
        
        return [
            'queue_totals' => $data['queue_totals'] ?? [],
            'message_stats' => $data['message_stats'] ?? [],
            'object_totals' => [
                'connections' => $data['object_totals']['connections'] ?? 0,
                'channels' => $data['object_totals']['channels'] ?? 0,
                'queues' => $data['object_totals']['queues'] ?? 0,
                'consumers' => $data['object_totals']['consumers'] ?? 0,
            ],
            'listeners' => $data['listeners'] ?? [],
            'cluster_name' => $data['cluster_name'] ?? '',
        ];
    }
    
    private function collectQueueMetrics(): array
    {
        $url = sprintf('http://%s:15672/api/queues', $this->nodes[0]['host']);
        
        $queues = $this->httpGet($url);
        
        $metrics = [];
        foreach ($queues as $queue) {
            $metrics[$queue['name']] = [
                'name' => $queue['name'],
                'vhost' => $queue['vhost'],
                'messages' => $queue['messages'] ?? 0,
                'messages_ready' => $queue['messages_ready'] ?? 0,
                'messages_unacked' => $queue['messages_unacked'] ?? 0,
                'consumers' => $queue['consumers'] ?? 0,
                'message_bytes' => $queue['message_bytes'] ?? 0,
                'state' => $queue['state'] ?? 'unknown',
                'type' => $queue['type'] ?? 'classic',
            ];
        }
        
        return $metrics;
    }
    
    private function calculatePercentage(int $used, int $total): float
    {
        if ($total === 0) {
            return 0;
        }
        return round(($used / $total) * 100, 2);
    }
    
    private function httpGet(string $url): array
    {
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => $this->user . ':' . $this->password,
            CURLOPT_TIMEOUT => 10,
        ]);
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
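        if ($response === false || $httpCode !== 200) {
            return [];
        }
        
        $decoded = json_decode($response, true);
        
        // Anything other than a JSON object or array is treated as "no data".
        return is_array($decoded) ? $decoded : [];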
    }
}

Alert Rule Configuration

php
<?php

namespace App\Messaging\Monitoring;

class AlertRuleEngine
{
    private MetricsCollector $collector;
    private array $rules;
    private array $alertChannels;
    
    public function __construct(
        MetricsCollector $collector,
        array $rules,
        array $alertChannels
    ) {
        $this->collector = $collector;
        $this->rules = $rules;
        $this->alertChannels = $alertChannels;
    }
    
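    public function evaluate(): array
    {
        $metrics = $this->collector->collectAllMetrics();
        $alerts = [];
        
        foreach ($this->rules as $rule) {
            // A wildcard rule such as "nodes.*.running" yields one value per node or queue.
            foreach ($this->extractValues($rule['metric'], $metrics) as $target => $value) {
                $result = $this->evaluateRule($rule, (string) $target, $value);
                
                if ($result['triggered']) {
                    $alerts[] = $result;
                    $this->sendAlert($result);
                }
            }
        }
        
        return $alerts;
    }
    
    private function evaluateRule(array $rule, string $target, $value): array
    {
        $triggered = false;
        
        // An operator that is not listed here leaves the rule untriggered.
        switch ($rule['operator']) {
            case '>':
                $triggered = $value > $rule['threshold'];
                break;
            case '<':
                $triggered = $value < $rule['threshold'];
                break;
            case '==':
                $triggered = $value == $rule['threshold'];
                break;
            case '!=':
                $triggered = $value != $rule['threshold'];
                break;
        }
        
        return [
            'rule_name' => $rule['name'],
            'metric' => $rule['metric'],
            'target' => $target,
            'value' => $value,
            'threshold' => $rule['threshold'],
            'operator' => $rule['operator'],
            'severity' => $rule['severity'],
            'triggered' => $triggered,
            'message' => $this->buildAlertMessage($rule, $target, $value),
            'timestamp' => time(),
        ];
    }
    
    /**
     * Resolves a dot-separated metric path against the collected metrics.
     * "*" matches every key at that level. Returns resolved path => value.
     * Missing or null paths are omitted; otherwise a null value would satisfy
     * loose comparisons such as null == 0 and trigger rules by accident.
     */
    private function extractValues(string $metric, array $metrics): array
    {
        $matches = ['' => $metrics];
        
        foreach (explode('.', $metric) as $key) {
            $next = [];
            foreach ($matches as $path => $value) {
                if (!is_array($value)) {
                    continue;
                }
                $candidates = $key === '*' ? array_keys($value) : [$key];
                foreach ($candidates as $candidate) {
                    if (isset($value[$candidate])) {
                        $next[ltrim($path . '.' . $candidate, '.')] = $value[$candidate];
                    }
                }
            }
            $matches = $next;
        }
        
        return $matches;
    }
    
    private function buildAlertMessage(array $rule, string $target, $value): string
    {
        // json_encode keeps booleans readable ("false" instead of an empty string).
        return sprintf(
            'Alert: %s [%s] - Current value: %s, Threshold: %s %s',
            $rule['description'] ?? $rule['name'],
            $target,
            json_encode($value),
            $rule['operator'],
            json_encode($rule['threshold'])
        );
    }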
    
    private function sendAlert(array $alert): void
    {
        foreach ($this->alertChannels as $channel) {
            $channel->send($alert);
        }
    }
}

class AlertChannel
{
    private string $type;
    private array $config;
    
    public function __construct(string $type, array $config)
    {
        $this->type = $type;
        $this->config = $config;
    }
    
    public function send(array $alert): void
    {
        switch ($this->type) {
            case 'email':
                $this->sendEmail($alert);
                break;
            case 'slack':
                $this->sendSlack($alert);
                break;
            case 'webhook':
                $this->sendWebhook($alert);
                break;
        }
    }
    
    private function sendEmail(array $alert): void
    {
        $to = $this->config['to'];
        $subject = sprintf('[%s] RabbitMQ Alert: %s', $alert['severity'], $alert['rule_name']);
        $body = $alert['message'];
        
        mail($to, $subject, $body);
    }
    
    private function sendSlack(array $alert): void
    {
        $webhook = $this->config['webhook_url'];
        
        $color = match ($alert['severity']) {
            'critical' => 'danger',
            'warning' => 'warning',
            default => 'good',
        };
        
        $payload = [
            'attachments' => [
                [
                    'color' => $color,
                    'title' => 'RabbitMQ Alert',
                    'fields' => [
                        ['title' => 'Rule', 'value' => $alert['rule_name'], 'short' => true],
                        ['title' => 'Severity', 'value' => $alert['severity'], 'short' => true],
                        ['title' => 'Current Value', 'value' => $alert['value'], 'short' => true],
                        ['title' => 'Threshold', 'value' => $alert['threshold'], 'short' => true],
                        ['title' => 'Message', 'value' => $alert['message'], 'short' => false],
                    ],
                    'ts' => $alert['timestamp'], // Slack attachments read the epoch time from "ts"
                ],
            ],
        ];
        
        $ch = curl_init($webhook);
        curl_setopt_array($ch, [
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => json_encode($payload),
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_RETURNTRANSFER => true,
        ]);
        curl_exec($ch);
        curl_close($ch);
    }
    
    private function sendWebhook(array $alert): void
    {
        $ch = curl_init($this->config['url']);
        curl_setopt_array($ch, [
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => json_encode($alert),
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_RETURNTRANSFER => true,
        ]);
        curl_exec($ch);
        curl_close($ch);
    }
}

Default Alert Rules

php
<?php

return [
    'rules' => [
        [
            'name' => 'node_down',
            'metric' => 'nodes.*.running',
            'operator' => '==',
            'threshold' => false,
            'severity' => 'critical',
            'description' => 'RabbitMQ node is down',
        ],
        [
            'name' => 'memory_high',
            'metric' => 'nodes.*.memory_usage_percent',
            'operator' => '>',
            'threshold' => 80,
            'severity' => 'warning',
            'description' => 'Memory usage is high',
        ],
        [
            'name' => 'memory_critical',
            'metric' => 'nodes.*.memory_usage_percent',
            'operator' => '>',
            'threshold' => 90,
            'severity' => 'critical',
            'description' => 'Memory usage is critical',
        ],
        [
            'name' => 'disk_low',
            'metric' => 'nodes.*.disk_free',
            'operator' => '<',
            'threshold' => 1073741824,
            'severity' => 'critical',
            'description' => 'Disk space is low (< 1GB)',
        ],
        [
            'name' => 'queue_no_consumer',
            'metric' => 'queues.*.consumers',
            'operator' => '==',
            'threshold' => 0,
            'severity' => 'warning',
            'description' => 'Queue has no consumers',
        ],
        [
            'name' => 'queue_messages_high',
            'metric' => 'queues.*.messages',
            'operator' => '>',
            'threshold' => 10000,
            'severity' => 'warning',
            'description' => 'Queue has too many messages',
        ],
        [
            'name' => 'fd_high',
            'metric' => 'nodes.*.fd_used',
            'operator' => '>',
            'threshold' => 8000,
            'severity' => 'warning',
            'description' => 'File descriptor usage is high',
        ],
    ],
];
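
The engine's `switch` statement leaves a rule untriggered when it meets an operator it does not recognize, so a typo in this file can silently disable an alert. A small sanity check at load time can catch that. The sketch below is one possible shape for such a check; `validateRules` is a hypothetical helper, and the required keys and operator list are inferred from the rules and engine shown in this document.

```php
<?php

declare(strict_types=1);

/**
 * Returns a list of problems found in a rule set.
 * An empty list means every rule has the expected keys and a supported operator.
 */
function validateRules(array $rules): array
{
    $required = ['name', 'metric', 'operator', 'threshold', 'severity'];
    $operators = ['>', '<', '==', '!='];
    $errors = [];

    foreach ($rules as $index => $rule) {
        $label = $rule['name'] ?? "rule #{$index}";

        foreach ($required as $key) {
            if (!array_key_exists($key, $rule)) {
                $errors[] = "{$label}: missing '{$key}'";
            }
        }

        if (isset($rule['operator']) && !in_array($rule['operator'], $operators, true)) {
            $errors[] = "{$label}: unsupported operator '{$rule['operator']}'";
        }
    }

    return $errors;
}

// Second rule is deliberately broken: ">=" is not handled and "severity" is missing.
$rules = [
    ['name' => 'disk_low', 'metric' => 'nodes.*.disk_free', 'operator' => '<',
     'threshold' => 1073741824, 'severity' => 'critical'],
    ['name' => 'bad_rule', 'metric' => 'queues.*.messages', 'operator' => '>=',
     'threshold' => 10000],
];

$errors = validateRules($rules);
print_r($errors);
```

Running a check like this when the configuration is loaded surfaces mistakes before the first evaluation cycle instead of during an incident.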

Prometheus Configuration

yaml
# prometheus.yml

global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093

rule_files:
  - /etc/prometheus/rules/*.yml

scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets:
        - rabbitmq-node1:15692
        - rabbitmq-node2:15692
        - rabbitmq-node3:15692
    metrics_path: /metrics

Prometheus Alert Rules

yaml
# rabbitmq_alerts.yml

groups:
  - name: rabbitmq
    rules:
      - alert: RabbitMQNodeDown
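        # "up" is generated by Prometheus for every scrape target, whichever exporter is used;
        # the job label matches job_name in prometheus.yml.
        expr: up{job="rabbitmq"} == 0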
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ node is down"
          description: "RabbitMQ node {{ $labels.instance }} has been down for more than 1 minute."

      - alert: RabbitMQMemoryHigh
        expr: (rabbitmq_node_memory_used / rabbitmq_node_memory_limit) * 100 > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "RabbitMQ memory usage is high"
          description: "Memory usage on {{ $labels.instance }} is {{ $value }}%."

      - alert: RabbitMQMemoryCritical
        expr: (rabbitmq_node_memory_used / rabbitmq_node_memory_limit) * 100 > 90
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ memory usage is critical"
          description: "Memory usage on {{ $labels.instance }} is {{ $value }}%."

      - alert: RabbitMQDiskSpaceLow
        expr: rabbitmq_node_disk_free < 1073741824
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ disk space is low"
          description: "Disk space on {{ $labels.instance }} is {{ $value }} bytes."

      - alert: RabbitMQQueueNoConsumer
        # The left-hand side of "and" supplies $value, so the message count goes first.
        expr: rabbitmq_queue_messages > 0 and rabbitmq_queue_consumers == 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Queue has no consumers"
          description: "Queue {{ $labels.queue }} in vhost {{ $labels.vhost }} has {{ $value }} messages but no consumers."

      - alert: RabbitMQQueueMessagesHigh
        expr: rabbitmq_queue_messages > 10000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Queue has too many messages"
          description: "Queue {{ $labels.queue }} has {{ $value }} messages."

      - alert: RabbitMQConnectionsHigh
        expr: rabbitmq_connections > 8000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High number of connections"
          description: "RabbitMQ has {{ $value }} connections."

      - alert: RabbitMQUnackedMessagesHigh
        expr: sum(rabbitmq_queue_messages_unacked) > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High number of unacked messages"
          description: "Total unacked messages is {{ $value }}."
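
Each rule above has a `for` clause: the expression must stay true for that long before the alert fires, which filters out short spikes. The PHP engine shown earlier fires on the first matching evaluation. The sketch below is a simplified model of the pending-then-firing idea that could sit in front of it; it is not how Prometheus implements `for`, and `PendingTracker` is a hypothetical class.

```php
<?php

declare(strict_types=1);

/** Tracks how long each alert condition has been continuously true. */
final class PendingTracker
{
    /** @var array<string, int> alert key => timestamp when the condition first became true */
    private array $since = [];

    /** Returns true once the condition has held for at least $forSeconds. */
    public function observe(string $key, bool $conditionTrue, int $forSeconds, int $now): bool
    {
        if (!$conditionTrue) {
            unset($this->since[$key]); // Condition cleared: reset the timer.
            return false;
        }

        $this->since[$key] ??= $now;

        return ($now - $this->since[$key]) >= $forSeconds;
    }
}

$tracker = new PendingTracker();
$t0 = 1700000000; // arbitrary start time for the demonstration

var_dump($tracker->observe('memory_high', true, 300, $t0));        // pending
var_dump($tracker->observe('memory_high', true, 300, $t0 + 150));  // still pending
var_dump($tracker->observe('memory_high', true, 300, $t0 + 300));  // firing
var_dump($tracker->observe('memory_high', false, 300, $t0 + 315)); // resolved
```

The state lives in memory, so a script launched fresh on every run (for example from cron) would need to persist `$since` somewhere shared.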

Grafana Dashboard JSON

json
{
  "dashboard": {
    "title": "RabbitMQ Monitoring",
    "panels": [
      {
        "title": "Node Status",
        "type": "stat",
        "targets": [
          {
            "expr": "rabbitmq_up",
            "legendFormat": "{{ instance }}"
          }
        ]
      },
      {
        "title": "Memory Usage",
        "type": "gauge",
        "targets": [
          {
            "expr": "(rabbitmq_node_memory_used / rabbitmq_node_memory_limit) * 100",
            "legendFormat": "{{ instance }}"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "max": 100,
            "min": 0,
            "unit": "percent",
            "thresholds": {
              "mode": "absolute",
              "steps": [
                {"color": "green", "value": null},
                {"color": "yellow", "value": 70},
                {"color": "red", "value": 85}
              ]
            }
          }
        }
      },
      {
        "title": "Queue Messages",
        "type": "graph",
        "targets": [
          {
            "expr": "sum(rabbitmq_queue_messages) by (queue)",
            "legendFormat": "{{ queue }}"
          }
        ]
      },
      {
        "title": "Message Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(rabbitmq_channel_messages_published_total[1m])",
            "legendFormat": "Published"
          },
          {
            "expr": "rate(rabbitmq_channel_messages_delivered_total[1m])",
            "legendFormat": "Delivered"
          },
          {
            "expr": "rate(rabbitmq_channel_messages_acked_total[1m])",
            "legendFormat": "Acked"
          }
        ]
      }
    ]
  }
}

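Best Practice Checklist

Monitoring setup

  • [ ] Deploy Prometheus and Grafana
  • [ ] Configure node metric collection
  • [ ] Configure queue metric collection
  • [ ] Configure connection metric collection
  • [ ] Configure throughput metric collection

Alerting setup

  • [ ] Configure node status alerts
  • [ ] Configure resource usage alerts
  • [ ] Configure queue status alerts
  • [ ] Configure message backlog alerts
  • [ ] Configure multi-channel notifications

Visualization setup

  • [ ] Create a node overview dashboard
  • [ ] Create a queue details dashboard
  • [ ] Create a message traffic dashboard
  • [ ] Create an alert history dashboard

Operations

  • [ ] Review alert rules regularly
  • [ ] Tune dashboards regularly
  • [ ] Purge historical data regularly
  • [ ] Rehearse alert response regularly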

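Production Considerations

  1. Monitoring data retention

    • Set a reasonable retention period
    • Configure a downsampling strategy
    • Purge expired data regularly
  2. Alerting strategy

    • Avoid alert storms
    • Configure alert grouping
    • Set alert silence periods
  3. Highly available monitoring

    • Make the monitoring system itself highly available
    • Configure alerts for the monitoring system
    • Prepare a fallback monitoring option
  4. Performance impact

    • Control the collection frequency
    • Optimize query performance
    • Allocate resources appropriately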
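
For the alerting strategy in item 2, Alertmanager handles grouping and silencing on the Prometheus side. The custom PHP engine in this document sends a notification on every evaluation that matches, so it would need its own guard against alert storms. A minimal sketch of a per-alert cooldown follows; `AlertThrottle` and the 600-second window are assumptions for illustration.

```php
<?php

declare(strict_types=1);

/** Suppresses repeat notifications for the same alert within a cooldown window. */
final class AlertThrottle
{
    /** @var array<string, int> alert key => time of the last notification */
    private array $lastSent = [];

    public function __construct(private int $cooldownSeconds)
    {
    }

    /** Returns true when a notification should go out, and records that it did. */
    public function shouldSend(string $ruleName, string $target, int $now): bool
    {
        $key = $ruleName . '|' . $target;

        if (isset($this->lastSent[$key]) && $now - $this->lastSent[$key] < $this->cooldownSeconds) {
            return false;
        }

        $this->lastSent[$key] = $now;

        return true;
    }
}

$throttle = new AlertThrottle(600);
$now = 1700000000; // arbitrary start time for the demonstration

var_dump($throttle->shouldSend('queue_messages_high', 'orders', $now));       // first alert goes out
var_dump($throttle->shouldSend('queue_messages_high', 'orders', $now + 60));  // suppressed
var_dump($throttle->shouldSend('queue_messages_high', 'payments', $now + 60)); // different queue
var_dump($throttle->shouldSend('queue_messages_high', 'orders', $now + 600)); // cooldown elapsed
```

Keying on rule name plus target lets one noisy queue be muted without hiding the same rule firing for another queue.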
