
RabbitMQ Monitoring with Prometheus and Grafana

Overview

Prometheus and Grafana form a widely used open-source monitoring stack: Prometheus collects and stores metrics, and Grafana visualizes them. This article shows how to build a RabbitMQ monitoring system with the two.

Core concepts

Architecture

RabbitMQ --> rabbitmq_prometheus plugin --> Prometheus --> Grafana
                       |                        |             |
            exposes metrics endpoint    scrapes and stores  visualizes

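Prometheus basic concepts

Concept       Description
Metric        A monitored measurement with a name and a value
Label         A key-value pair that distinguishes dimensions of a metric
Time Series   A stream of timestamped values for one metric name and label set
Target        An endpoint that Prometheus scrapes
Job           A group of targets of the same type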
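To make these concepts concrete, here is a minimal PHP sketch that splits one line of the Prometheus text exposition format into its metric name, labels, and value. The function name `parseSample` and the sample line are illustrative assumptions, and the regular expressions cover only simple cases (no escaped quotes inside label values, no trailing timestamp).

```php
<?php

/**
 * Parse a single sample line such as:
 *   rabbitmq_queue_messages{queue="orders",vhost="/"} 42
 * Returns null for comments, blank lines, or lines it cannot parse.
 * Simplification (assumption): label values contain no escaped quotes.
 */
function parseSample(string $line): ?array
{
    $line = trim($line);
    if ($line === '' || $line[0] === '#') {
        return null;
    }

    // Metric name, optional {labels}, then the value.
    if (!preg_match('/^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(.*)\})?\s+(\S+)$/', $line, $m)) {
        return null;
    }

    $labels = [];
    if ($m[2] !== '') {
        preg_match_all('/([a-zA-Z_][a-zA-Z0-9_]*)="([^"]*)"/', $m[2], $pairs, PREG_SET_ORDER);
        foreach ($pairs as $pair) {
            $labels[$pair[1]] = $pair[2];
        }
    }

    return ['name' => $m[1], 'labels' => $labels, 'value' => (float) $m[3]];
}

print_r(parseSample('rabbitmq_queue_messages{queue="orders",vhost="/"} 42'));
```

The metric name together with the label set identifies one time series; the value is the sample recorded for that series at scrape time.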

Grafana basic concepts

Concept       Description
Dashboard     A collection of panels
Panel         A single chart or data view
Data Source   A backend that Grafana queries, here Prometheus
Alert         An alert rule

Configuration examples

Enable the RabbitMQ Prometheus plugin

bash
rabbitmq-plugins enable rabbitmq_prometheus

Default metrics endpoint: http://localhost:15692/metrics

Prometheus configuration

yaml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093

rule_files:
  - /etc/prometheus/rules/*.yml

scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets:
          - 'rabbitmq-node1:15692'
          - 'rabbitmq-node2:15692'
          - 'rabbitmq-node3:15692'
    relabel_configs:
      - source_labels: [__address__]
        regex: '([^:]+):\d+'
        target_label: instance
        replacement: '$1'

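Prometheus alert rules

Most metric names in these rules follow the custom PHP exporter shown later in this article. The built-in rabbitmq_prometheus plugin names some of them differently (for example, it reports memory as rabbitmq_process_resident_memory_bytes and rabbitmq_resident_memory_limit_bytes), so check each name against your own /metrics output before loading the rules.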

yaml
groups:
  - name: rabbitmq_alerts
    rules:
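      - alert: RabbitMQDown
        # `up` is set by Prometheus itself for every scrape target, so it still exists when the node is unreachable.
        expr: up{job="rabbitmq"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ node down"
          description: "RabbitMQ node {{ $labels.instance }} has been down for more than 1 minute"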

      - alert: RabbitMQMemoryHigh
        expr: (rabbitmq_memory_used_bytes / rabbitmq_memory_limit_bytes) * 100 > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "RabbitMQ memory usage is high"
          description: "Node {{ $labels.instance }} memory usage is {{ $value | printf \"%.2f\" }}%"

      - alert: RabbitMQDiskSpaceLow
        expr: rabbitmq_disk_free_bytes < 1073741824
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ disk space is low"
          description: "Node {{ $labels.instance }} has {{ $value | humanize1024 }}B of free disk space"

      - alert: RabbitMQQueueMessagesHigh
        expr: rabbitmq_queue_messages > 100000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Queue message backlog is too large"
          description: "Queue {{ $labels.queue }} holds {{ $value }} messages"

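      - alert: RabbitMQNoConsumer
        # Message count on the left so that $value is the number of messages, not the consumer count (0).
        expr: rabbitmq_queue_messages > 0 and rabbitmq_queue_consumers == 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Queue has no consumers"
          description: "Queue {{ $labels.queue }} has {{ $value }} messages but no consumers"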

      - alert: RabbitMQConnectionHigh
        expr: rabbitmq_connections > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Too many connections"
          description: "Node {{ $labels.instance }} has {{ $value }} connections"

      - alert: RabbitMQFileDescriptorsHigh
        expr: (rabbitmq_fd_used / rabbitmq_fd_total) * 100 > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "File descriptor usage is high"
          description: "Node {{ $labels.instance }} file descriptor usage is {{ $value | printf \"%.2f\" }}%"

      - alert: RabbitMQClusterPartition
        expr: rabbitmq_partitions > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Cluster network partition"
          description: "Node {{ $labels.instance }} detected a network partition"
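Before loading a rule, it can help to run its expression as an instant query and see which series it returns. The following is a minimal sketch against the Prometheus HTTP API endpoint `/api/v1/query`; the base URL passed to `queryPrometheus` and both function names are assumptions for illustration, and `file_get_contents` over HTTP requires `allow_url_fopen` to be enabled.

```php
<?php

/**
 * Turn the JSON body of an instant query (/api/v1/query) into a list of
 * ['labels' => [...], 'value' => float]. Returns [] if the query failed.
 */
function extractVector(string $json): array
{
    $body = json_decode($json, true);
    if (!is_array($body) || ($body['status'] ?? '') !== 'success') {
        return [];
    }

    $out = [];
    foreach ($body['data']['result'] ?? [] as $series) {
        // Each vector element carries "metric" (the labels) and
        // "value" => [timestamp, "value as string"].
        $out[] = [
            'labels' => $series['metric'] ?? [],
            'value'  => (float) ($series['value'][1] ?? 0),
        ];
    }
    return $out;
}

/** Run an instant query; $baseUrl is a hypothetical Prometheus address. */
function queryPrometheus(string $baseUrl, string $expr): array
{
    $url = rtrim($baseUrl, '/') . '/api/v1/query?' . http_build_query(['query' => $expr]);
    $json = @file_get_contents($url);
    return $json === false ? [] : extractVector($json);
}
```

For example, `queryPrometheus('http://prometheus:9090', 'rabbitmq_queue_messages > 100000')` would list the queues that currently satisfy the backlog rule, assuming Prometheus is reachable at that address.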

Grafana Dashboard JSON

json
{
  "dashboard": {
    "title": "RabbitMQ Monitoring",
    "uid": "rabbitmq-overview",
    "panels": [
      {
        "title": "Overview",
        "type": "row"
      },
      {
        "title": "Messages Ready",
        "type": "gauge",
        "gridPos": {"x": 0, "y": 1, "w": 6, "h": 4},
        "targets": [
          {
            "expr": "sum(rabbitmq_queue_messages_ready)",
            "legendFormat": "Messages Ready"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "thresholds": {
              "mode": "absolute",
              "steps": [
                {"color": "green", "value": null},
                {"color": "yellow", "value": 10000},
                {"color": "red", "value": 50000}
              ]
            }
          }
        }
      },
      {
        "title": "Messages Unacked",
        "type": "gauge",
        "gridPos": {"x": 6, "y": 1, "w": 6, "h": 4},
        "targets": [
          {
            "expr": "sum(rabbitmq_queue_messages_unacked)",
            "legendFormat": "Messages Unacked"
          }
        ]
      },
      {
        "title": "Connections",
        "type": "stat",
        "gridPos": {"x": 12, "y": 1, "w": 4, "h": 4},
        "targets": [
          {
            "expr": "sum(rabbitmq_connections)",
            "legendFormat": "Connections"
          }
        ]
      },
      {
        "title": "Queues",
        "type": "stat",
        "gridPos": {"x": 16, "y": 1, "w": 4, "h": 4},
        "targets": [
          {
            "expr": "count(rabbitmq_queue_messages)",
            "legendFormat": "Queues"
          }
        ]
      },
      {
        "title": "Message Rates",
        "type": "timeseries",
        "gridPos": {"x": 0, "y": 5, "w": 12, "h": 6},
        "targets": [
          {
            "expr": "sum(rate(rabbitmq_channel_messages_published_total[1m]))",
            "legendFormat": "Publish Rate"
          },
          {
            "expr": "sum(rate(rabbitmq_channel_messages_delivered_total[1m]))",
            "legendFormat": "Deliver Rate"
          },
          {
            "expr": "sum(rate(rabbitmq_channel_messages_acked_total[1m]))",
            "legendFormat": "Ack Rate"
          }
        ]
      },
      {
        "title": "Memory Usage",
        "type": "timeseries",
        "gridPos": {"x": 12, "y": 5, "w": 12, "h": 6},
        "targets": [
          {
            "expr": "rabbitmq_memory_used_bytes / 1024 / 1024",
            "legendFormat": "{{ instance }} - Memory (MB)"
          },
          {
            "expr": "rabbitmq_memory_limit_bytes / 1024 / 1024",
            "legendFormat": "{{ instance }} - Limit (MB)"
          }
        ]
      },
      {
        "title": "Queue Messages by Name",
        "type": "timeseries",
        "gridPos": {"x": 0, "y": 11, "w": 24, "h": 6},
        "targets": [
          {
            "expr": "rabbitmq_queue_messages",
            "legendFormat": "{{ queue }}"
          }
        ]
      }
    ]
  }
}

Custom metrics exporter in PHP

php
<?php

class RabbitMQPrometheusExporter
{
    private $host;
    private $port;
    private $user;
    private $password;
    
    public function __construct($host = 'localhost', $port = 15672, $user = 'guest', $password = 'guest')
    {
        $this->host = $host;
        $this->port = $port;
        $this->user = $user;
        $this->password = $password;
    }
    
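    private function request($endpoint)
    {
        $url = "http://{$this->host}:{$this->port}/api/{$endpoint}";
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->user}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
        ]);

        $response = curl_exec($ch);
        $status = curl_getinfo($ch, CURLINFO_RESPONSE_CODE);
        curl_close($ch);

        // On a transport error, a non-200 status, or invalid JSON, return an
        // empty array so the loops in export() simply emit nothing.
        if ($response === false || $status !== 200) {
            return [];
        }

        $decoded = json_decode($response, true);
        return is_array($decoded) ? $decoded : [];
    }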
    
    public function export()
    {
        $output = [];
        
        $overview = $this->request('overview');
        $nodes = $this->request('nodes');
        $queues = $this->request('queues');
        
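        // Report 0 when the management API did not return a valid overview.
        $up = isset($overview['rabbitmq_version']) ? 1 : 0;
        $output[] = $this->formatMetric('gauge', 'rabbitmq_up', $up, ['instance' => $this->host]);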
        
        if (isset($overview['object_totals'])) {
            $totals = $overview['object_totals'];
            $output[] = $this->formatMetric('gauge', 'rabbitmq_connections', $totals['connections'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_channels', $totals['channels'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queues', $totals['queues'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_exchanges', $totals['exchanges'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_consumers', $totals['consumers'] ?? 0);
        }
        
        if (isset($overview['queue_totals'])) {
            $queueTotals = $overview['queue_totals'];
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_messages_ready', $queueTotals['messages_ready'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_messages_unacked', $queueTotals['messages_unacked'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_messages', $queueTotals['messages'] ?? 0);
        }
        
        if (isset($overview['message_stats'])) {
            $stats = $overview['message_stats'];
            $output[] = $this->formatMetric('gauge', 'rabbitmq_message_publish_rate', $stats['publish_details']['rate'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_message_confirm_rate', $stats['confirm_details']['rate'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_message_consume_rate', $stats['consume_details']['rate'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_message_ack_rate', $stats['ack_details']['rate'] ?? 0);
        }
        
        foreach ($nodes as $node) {
            $labels = ['node' => $node['name']];
            $output[] = $this->formatMetric('gauge', 'rabbitmq_memory_used_bytes', $node['mem_used'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_memory_limit_bytes', $node['mem_limit'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_disk_free_bytes', $node['disk_free'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_fd_used', $node['fd_used'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_fd_total', $node['fd_total'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_sockets_used', $node['sockets_used'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_sockets_total', $node['sockets_total'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_proc_used', $node['proc_used'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_proc_total', $node['proc_total'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_mem_alarm', ($node['mem_alarm'] ?? false) ? 1 : 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_disk_alarm', ($node['disk_free_alarm'] ?? false) ? 1 : 0, $labels);
        }
        
        foreach ($queues as $queue) {
            $labels = ['queue' => $queue['name'], 'vhost' => $queue['vhost']];
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_messages', $queue['messages'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_messages_ready', $queue['messages_ready'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_messages_unacked', $queue['messages_unacked'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_consumers', $queue['consumers'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_memory_bytes', $queue['memory'] ?? 0, $labels);
        }
        
        // The text exposition format requires the last line to end with a newline too.
        return implode("\n", $output) . "\n";
    }
    
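    // Note: $type documents intent at the call site but is not emitted as a "# TYPE" line.
    private function formatMetric($type, $name, $value, $labels = [])
    {
        $labelStr = '';
        if (!empty($labels)) {
            $labelPairs = [];
            foreach ($labels as $k => $v) {
                // Escape backslash, double quote, and newline in label values.
                $v = str_replace(['\\', '"', "\n"], ['\\\\', '\\"', '\\n'], (string) $v);
                $labelPairs[] = "{$k}=\"{$v}\"";
            }
            $labelStr = '{' . implode(',', $labelPairs) . '}';
        }
        
        return "{$name}{$labelStr} {$value}";
    }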
}

if (php_sapi_name() === 'cli-server' || isset($_GET['metrics'])) {
    header('Content-Type: text/plain; version=0.0.4; charset=utf-8');
    $exporter = new RabbitMQPrometheusExporter('localhost', 15672, 'admin', 'admin123');
    echo $exporter->export();
}
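The exporter above passes a metric type to `formatMetric`, but the output never declares it, and samples of the same metric are not kept together. The following is a minimal sketch of one way to group samples by metric name and emit a single `# TYPE` line per metric. The class `MetricBuffer` is hypothetical and is not part of the exporter above.

```php
<?php

/** Collects samples and renders them grouped by metric name. */
class MetricBuffer
{
    /** @var array<string, array{type: string, lines: string[]}> */
    private $metrics = [];

    public function add(string $type, string $name, $value, array $labels = []): void
    {
        $pairs = [];
        foreach ($labels as $k => $v) {
            // Escape backslash, double quote, and newline in label values.
            $v = str_replace(['\\', '"', "\n"], ['\\\\', '\\"', '\\n'], (string) $v);
            $pairs[] = "{$k}=\"{$v}\"";
        }
        $labelStr = $pairs ? '{' . implode(',', $pairs) . '}' : '';

        // The first call for a metric name fixes its type.
        if (!isset($this->metrics[$name])) {
            $this->metrics[$name] = ['type' => $type, 'lines' => []];
        }
        $this->metrics[$name]['lines'][] = "{$name}{$labelStr} {$value}";
    }

    public function render(): string
    {
        $out = [];
        foreach ($this->metrics as $name => $metric) {
            $out[] = "# TYPE {$name} {$metric['type']}";
            foreach ($metric['lines'] as $line) {
                $out[] = $line;
            }
        }
        // Every line, including the last, ends with "\n".
        return $out ? implode("\n", $out) . "\n" : '';
    }
}

$buffer = new MetricBuffer();
$buffer->add('gauge', 'rabbitmq_queue_messages', 5, ['queue' => 'orders', 'vhost' => '/']);
$buffer->add('gauge', 'rabbitmq_connections', 2);
echo $buffer->render();
```

Buffering before rendering means the order of `add` calls no longer matters, at the cost of holding all samples in memory until the response is written.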

Practical scenarios

Scenario 1: Full monitoring deployment

yaml
version: '3.8'

services:
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - ./rules:/etc/prometheus/rules
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.enable-lifecycle'

  alertmanager:
    image: prom/alertmanager:latest
    container_name: alertmanager
    ports:
      - "9093:9093"
    volumes:
      - ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
      - alertmanager_data:/alertmanager
    command:
      - '--config.file=/etc/alertmanager/alertmanager.yml'
      - '--storage.path=/alertmanager'

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
      - ./provisioning:/etc/grafana/provisioning
    environment:
      - GF_SECURITY_ADMIN_USER=admin
      - GF_SECURITY_ADMIN_PASSWORD=admin123
      - GF_USERS_ALLOW_SIGN_UP=false

volumes:
  prometheus_data:
  alertmanager_data:
  grafana_data:

Scenario 2: Alertmanager configuration

yaml
global:
  resolve_timeout: 5m
  smtp_smarthost: 'smtp.example.com:587'
  smtp_from: 'alertmanager@example.com'
  smtp_auth_username: 'alertmanager@example.com'
  smtp_auth_password: 'password'

route:
  group_by: ['alertname', 'severity']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  receiver: 'default'
  routes:
    - match:
        severity: critical
      receiver: 'critical'
    - match:
        severity: warning
      receiver: 'warning'

receivers:
  - name: 'default'
    email_configs:
      - to: 'ops@example.com'
        send_resolved: true

  - name: 'critical'
    email_configs:
      - to: 'ops-critical@example.com'
        send_resolved: true
    webhook_configs:
      - url: 'http://webhook-server:8080/alert'
        send_resolved: true
    slack_configs:
      - api_url: 'https://hooks.slack.com/services/xxx'
        channel: '#alerts-critical'
        send_resolved: true

  - name: 'warning'
    email_configs:
      - to: 'ops-warning@example.com'
        send_resolved: true

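inhibit_rules:
  # A critical alert on an instance suppresses warning alerts on the same instance.
  # Also requiring an equal 'alertname' would never match here, because each
  # alert rule in this article has a single fixed severity.
  - source_match:
      severity: 'critical'
    target_match:
      severity: 'warning'
    equal: ['instance']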

Scenario 3: Alert-handling webhook in PHP

php
<?php

class AlertWebhookHandler
{
    private $logFile = '/var/log/rabbitmq/alerts.log';
    
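    public function handle()
    {
        $payload = file_get_contents('php://input');
        $data = json_decode($payload, true);
        
        // Reject bodies that are not a valid Alertmanager webhook payload.
        if (!is_array($data) || !isset($data['status'], $data['alerts']) || !is_array($data['alerts'])) {
            http_response_code(400);
            return ['status' => 'error', 'message' => 'invalid payload'];
        }
        
        $this->log($data);
        
        if ($data['status'] === 'firing') {
            $this->processAlert($data);
        } else {
            $this->processResolved($data);
        }
        
        return ['status' => 'ok'];
    }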
    
    private function processAlert($data)
    {
        foreach ($data['alerts'] as $alert) {
            $alertName = $alert['labels']['alertname'];
            $severity = $alert['labels']['severity'] ?? 'warning';
            
            switch ($alertName) {
                case 'RabbitMQDown':
                    $this->handleNodeDown($alert);
                    break;
                case 'RabbitMQMemoryHigh':
                    $this->handleMemoryHigh($alert);
                    break;
                case 'RabbitMQQueueMessagesHigh':
                    $this->handleQueueBacklog($alert);
                    break;
                case 'RabbitMQNoConsumer':
                    $this->handleNoConsumer($alert);
                    break;
            }
        }
    }
    
    private function handleNodeDown($alert)
    {
        $instance = $alert['labels']['instance'];
        
        $this->sendNotification("CRITICAL: RabbitMQ node {$instance} is down", $alert);
        
        $this->executeAutoRecovery($instance);
    }
    
    private function handleMemoryHigh($alert)
    {
        $instance = $alert['labels']['instance'];
        $usage = $alert['annotations']['description'];
        
        $this->sendNotification("WARNING: high memory usage on {$instance} - {$usage}", $alert);
    }
    
    private function handleQueueBacklog($alert)
    {
        $queue = $alert['labels']['queue'];
        $messages = $alert['annotations']['description'];
        
        $this->sendNotification("WARNING: message backlog in queue {$queue} - {$messages}", $alert);
        
        $this->scaleConsumers($queue);
    }
    
    private function handleNoConsumer($alert)
    {
        $queue = $alert['labels']['queue'];
        
        $this->sendNotification("WARNING: queue {$queue} has no consumers", $alert);
    }
    
    private function executeAutoRecovery($instance)
    {
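        // The label value comes from an HTTP request body, so escape it before it reaches the shell.
        $script = '/opt/rabbitmq/scripts/recover_node.sh ' . escapeshellarg($instance);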
        exec($script, $output, $returnCode);
        
        $this->log([
            'action' => 'auto_recovery',
            'instance' => $instance,
            'script' => $script,
            'return_code' => $returnCode,
            'output' => $output,
        ]);
    }
    
    private function scaleConsumers($queue)
    {
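        // The label value comes from an HTTP request body, so escape it before it reaches the shell.
        $script = '/opt/rabbitmq/scripts/scale_consumers.sh ' . escapeshellarg($queue);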
        exec($script, $output, $returnCode);
        
        $this->log([
            'action' => 'scale_consumers',
            'queue' => $queue,
            'return_code' => $returnCode,
        ]);
    }
    
    private function sendNotification($message, $alert)
    {
        $webhookUrl = getenv('SLACK_WEBHOOK_URL');
        if ($webhookUrl) {
            $payload = [
                'text' => $message,
                'attachments' => [
                    [
                        'color' => ($alert['labels']['severity'] ?? 'warning') === 'critical' ? 'danger' : 'warning',
                        'fields' => [
                            ['title' => 'Alert', 'value' => $alert['labels']['alertname'] ?? 'unknown', 'short' => true],
                            ['title' => 'Severity', 'value' => $alert['labels']['severity'] ?? 'warning', 'short' => true],
                            ['title' => 'Description', 'value' => $alert['annotations']['description'] ?? ''],
                        ],
                    ],
                ],
            ];
            
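            $ch = curl_init($webhookUrl);
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($payload));
            curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
            curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
            // Keep a slow chat endpoint from blocking the webhook response.
            curl_setopt($ch, CURLOPT_TIMEOUT, 5);
            curl_exec($ch);
            curl_close($ch);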
        }
    }
    
    private function log($data)
    {
        $entry = [
            'timestamp' => date('Y-m-d H:i:s'),
            'data' => $data,
        ];
        
        file_put_contents(
            $this->logFile,
            json_encode($entry) . "\n",
            FILE_APPEND
        );
    }
    
    private function processResolved($data)
    {
        foreach ($data['alerts'] as $alert) {
            $alertName = $alert['labels']['alertname'];
            $this->sendNotification("RESOLVED: {$alertName} has recovered", $alert);
        }
    }
}

header('Content-Type: application/json');
$handler = new AlertWebhookHandler();
echo json_encode($handler->handle());
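To exercise alert handling without a running Alertmanager, a trimmed sample payload helps. The sketch below decodes such a payload and prints one summary line per alert. The fields shown (`status`, `alerts`, `labels`, `annotations`) are the ones the handler above reads; the values and the helper `summarizeAlerts` are hypothetical.

```php
<?php

/**
 * Build one-line summaries from a decoded Alertmanager webhook payload.
 * Missing fields fall back to placeholders instead of raising warnings.
 */
function summarizeAlerts(array $payload): array
{
    $lines = [];
    foreach ($payload['alerts'] ?? [] as $alert) {
        $labels = $alert['labels'] ?? [];
        $lines[] = sprintf(
            '[%s] %s (%s) on %s',
            strtoupper($alert['status'] ?? 'unknown'),
            $labels['alertname'] ?? 'unnamed',
            $labels['severity'] ?? 'warning',
            $labels['instance'] ?? ($labels['queue'] ?? 'n/a')
        );
    }
    return $lines;
}

// Hypothetical payload, trimmed to the fields the handler reads.
$sample = json_decode('{
  "version": "4",
  "status": "firing",
  "alerts": [
    {
      "status": "firing",
      "labels": {"alertname": "RabbitMQDown", "severity": "critical", "instance": "rabbitmq-node1"},
      "annotations": {"description": "node down"}
    },
    {
      "status": "resolved",
      "labels": {"alertname": "RabbitMQNoConsumer", "queue": "orders"},
      "annotations": {}
    }
  ]
}', true);

print_r(summarizeAlerts($sample));
```

Note that the group-level `status` is `firing` as long as any alert in the group is firing, while each alert carries its own `status`, so a handler that needs per-alert behavior should look at the inner field.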

Common problems and solutions

Problem 1: Prometheus cannot scrape metrics

Symptom: the RabbitMQ targets on the Prometheus Targets page show as DOWN.

Solution

bash
rabbitmq-plugins enable rabbitmq_prometheus

curl http://localhost:15692/metrics

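If the curl request fails when run from the Prometheus host, check network connectivity and firewall rules for port 15692.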

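Problem 2: Grafana cannot connect to Prometheus

Symptom: the data source test in Grafana fails.

Solution

  1. Make sure Prometheus is running.
  2. Check that the data source URL is correct. When both services run in Docker Compose, use the service name (for example http://prometheus:9090) rather than localhost.
  3. Check network policies and firewall rules.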
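The first two checks can be scripted. The sketch below issues a GET request and reports whether the endpoint answered with HTTP 200. Prometheus serves a health endpoint at `/-/healthy` and Grafana at `/api/health`; the helper name `isHealthy` and the host names in the usage note are assumptions.

```php
<?php

/**
 * Return true if a GET on $url answers with HTTP 200 within the timeout.
 */
function isHealthy(string $url, int $timeoutSec = 3): bool
{
    $ch = curl_init($url);
    curl_setopt_array($ch, [
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_CONNECTTIMEOUT => $timeoutSec,
        CURLOPT_TIMEOUT => $timeoutSec,
    ]);

    // curl_exec returns false on DNS, connection, or timeout errors.
    $body = curl_exec($ch);
    $status = curl_getinfo($ch, CURLINFO_RESPONSE_CODE);

    return $body !== false && $status === 200;
}
```

Run it from the machine or container where Grafana lives, for example `isHealthy('http://prometheus:9090/-/healthy')`, so that the result reflects the network path Grafana actually uses.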

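Problem 3: Too much metric data

Symptom: Prometheus storage grows too fast.

Solution

Lengthen the scrape interval so that fewer samples are written, for example from 15s to 30s: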

yaml
global:
  scrape_interval: 30s
  
scrape_configs:
  - job_name: 'rabbitmq'
    metrics_path: /metrics
    params:
      format: ['prometheus']
    static_configs:
      - targets: ['rabbitmq:15692']

Best practices

1. Scrape tuning

yaml
scrape_configs:
  - job_name: 'rabbitmq'
    scrape_interval: 15s
    scrape_timeout: 10s
    metrics_path: /metrics
    static_configs:
      - targets: ['rabbitmq:15692']

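2. Data retention policy

yaml
# Retention is set with Prometheus command-line flags rather than under `global` in prometheus.yml.
# Shown here as the `command` list of the prometheus service in docker-compose.
command:
  - '--config.file=/etc/prometheus/prometheus.yml'
  - '--storage.tsdb.retention.time=15d'
  - '--storage.tsdb.retention.size=50GB'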
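How much disk a retention period needs depends on the ingestion rate. The sketch below applies the rule of thumb from the Prometheus storage documentation (retention seconds × samples per second × bytes per sample, with roughly 1 to 2 bytes per sample on average). The series count is a made-up figure for illustration; substitute your own.

```php
<?php

/**
 * Rough TSDB disk estimate:
 *   retention seconds * samples per second * bytes per sample
 * $bytesPerSample defaults to 2, the upper end of the 1-2 byte average.
 */
function estimateDiskBytes(int $series, int $scrapeIntervalSec, int $retentionDays, float $bytesPerSample = 2.0): float
{
    // Every active series yields one sample per scrape.
    $samplesPerSecond = $series / $scrapeIntervalSec;
    return $retentionDays * 86400 * $samplesPerSecond * $bytesPerSample;
}

// Hypothetical load: 30,000 active series kept for 15 days.
$at15s = estimateDiskBytes(30000, 15, 15);
$at30s = estimateDiskBytes(30000, 30, 15);
printf("15s interval: %.2f GiB\n", $at15s / 1024 ** 3);
printf("30s interval: %.2f GiB\n", $at30s / 1024 ** 3);
```

Doubling the scrape interval halves the estimate, which is why lengthening the interval is an effective first step when storage grows too fast.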

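3. Alert severity levels

Level      Description                               Response time
Critical   Severe failure that affects the business  5 minutes
Warning    Needs attention                           30 minutes
Info       For reference only                        No response required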
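These response targets can be turned into a deadline when an alert arrives, for example inside a webhook handler. This is a minimal sketch; the constant `RESPONSE_MINUTES`, the function `responseDeadline`, and the choice to treat unknown severities as warnings are all assumptions.

```php
<?php

// Response targets in minutes, taken from the table above; null means no response is required.
const RESPONSE_MINUTES = [
    'critical' => 5,
    'warning'  => 30,
    'info'     => null,
];

/**
 * Return the time by which an alert should be acknowledged, or null if
 * the severity needs no response. Unknown severities are treated as
 * 'warning' (an assumption for this sketch).
 */
function responseDeadline(string $severity, DateTimeImmutable $firedAt): ?DateTimeImmutable
{
    $key = strtolower($severity);
    $minutes = array_key_exists($key, RESPONSE_MINUTES)
        ? RESPONSE_MINUTES[$key]
        : RESPONSE_MINUTES['warning'];

    return $minutes === null ? null : $firedAt->modify("+{$minutes} minutes");
}

$firedAt = new DateTimeImmutable('2024-01-01 12:00:00', new DateTimeZone('UTC'));
echo responseDeadline('critical', $firedAt)->format('H:i'), "\n";
```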
