Skip to content

RabbitMQ 监控指标体系

概述

监控是保障 RabbitMQ 稳定运行的关键环节。建立完善的监控指标体系,能够帮助运维人员及时发现和解决问题,确保消息队列服务的高可用性。本文将详细介绍 RabbitMQ 的核心监控指标、采集方法和最佳实践。

核心知识点

监控指标分类

RabbitMQ 监控指标可分为以下几个主要类别:

类别说明关键指标
连接指标客户端连接状态连接数、通道数、网络流量
队列指标队列运行状态消息数量、消费速率、队列长度
消息指标消息流转情况发布速率、确认速率、投递速率
资源指标系统资源使用内存、磁盘、CPU、文件描述符
节点指标节点健康状态运行状态、集群状态、分区状态

核心监控指标详解

1. 连接与通道指标

connections_count          # 当前连接数
channels_count             # 当前通道数
consumers_count            # 当前消费者数量
connection_opened_total    # 累计打开连接数
connection_closed_total    # 累计关闭连接数
channel_opened_total       # 累计打开通道数
channel_closed_total       # 累计关闭通道数

2. 队列指标

queue_messages_ready       # 等待消费的消息数
queue_messages_unacked     # 已投递未确认的消息数
queue_messages_total       # 队列消息总数
queue_message_bytes        # 消息占用字节数
queue_consumer_count       # 队列消费者数量
queue_message_stats        # 消息统计信息

3. 消息流转指标

message_publish_rate       # 消息发布速率 (条/秒)
message_confirm_rate       # 消息确认速率 (条/秒)
message_consume_rate       # 消息消费速率 (条/秒)
message_redeliver_rate     # 消息重投递速率 (条/秒)
message_ack_rate           # 消息 ACK 速率 (条/秒)
message_nack_rate          # 消息 NACK 速率 (条/秒)

4. 资源使用指标

mem_used                   # 内存使用量 (字节)
mem_limit                  # 内存限制 (字节)
mem_alarm                  # 内存告警状态
disk_free                  # 磁盘可用空间 (字节)
disk_free_limit            # 磁盘空间限制 (字节)
disk_alarm                 # 磁盘告警状态
fd_used                    # 已使用文件描述符数
fd_total                   # 文件描述符总数
sockets_used               # 已使用 socket 数
sockets_total              # socket 总数
proc_used                  # 已使用 Erlang 进程数
proc_total                 # Erlang 进程总数

5. 节点健康指标

node_running               # 节点运行状态
node_type                  # 节点类型 (disc/ram)
node_uptime                # 节点运行时间
cluster_partitions         # 集群分区数
cluster_nodes              # 集群节点列表

配置示例

通过 HTTP API 获取监控指标

php
<?php

class RabbitMQMetrics
{
    private $host;
    private $port;
    private $user;
    private $password;
    
    public function __construct($host = 'localhost', $port = 15672, $user = 'guest', $password = 'guest')
    {
        $this->host = $host;
        $this->port = $port;
        $this->user = $user;
        $this->password = $password;
    }
    
    private function request($endpoint)
    {
        $url = "http://{$this->host}:{$this->port}/api/{$endpoint}";
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->user}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
        ]);
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        if ($httpCode !== 200) {
            throw new Exception("API request failed: HTTP {$httpCode}");
        }
        
        return json_decode($response, true);
    }
    
    public function getOverview()
    {
        return $this->request('overview');
    }
    
    public function getNodes()
    {
        return $this->request('nodes');
    }
    
    public function getQueues()
    {
        return $this->request('queues');
    }
    
    public function getConnections()
    {
        return $this->request('connections');
    }
    
    public function getChannels()
    {
        return $this->request('channels');
    }
    
    public function getMetrics()
    {
        $overview = $this->getOverview();
        $nodes = $this->getNodes();
        $queues = $this->getQueues();
        
        return [
            'connections' => [
                'total' => $overview['object_totals']['connections'] ?? 0,
                'channels' => $overview['object_totals']['channels'] ?? 0,
                'consumers' => $overview['object_totals']['consumers'] ?? 0,
            ],
            'queues' => [
                'total' => $overview['object_totals']['queues'] ?? 0,
                'messages_ready' => $overview['queue_totals']['messages_ready'] ?? 0,
                'messages_unacked' => $overview['queue_totals']['messages_unacked'] ?? 0,
                'messages_total' => $overview['queue_totals']['messages'] ?? 0,
            ],
            'message_rates' => [
                'publish' => $overview['message_stats']['publish_details']['rate'] ?? 0,
                'confirm' => $overview['message_stats']['confirm_details']['rate'] ?? 0,
                'consume' => $overview['message_stats']['consume_details']['rate'] ?? 0,
                'ack' => $overview['message_stats']['ack_details']['rate'] ?? 0,
            ],
            'memory' => [
                'used' => $overview['node']['mem_used'] ?? 0,
                'limit' => $overview['node']['mem_limit'] ?? 0,
                'alarm' => $overview['node']['mem_alarm'] ?? false,
            ],
            'disk' => [
                'free' => $overview['node']['disk_free'] ?? 0,
                'limit' => $overview['node']['disk_free_limit'] ?? 0,
                'alarm' => $overview['node']['disk_free_alarm'] ?? false,
            ],
        ];
    }
}

$metrics = new RabbitMQMetrics('localhost', 15672, 'admin', 'admin123');
print_r($metrics->getMetrics());

使用 rabbitmqctl 获取指标

bash
#!/bin/bash

echo "=== RabbitMQ 监控指标采集 ==="
echo ""

echo "1. 队列状态:"
rabbitmqctl list_queues name messages messages_ready messages_unacked consumers
echo ""

echo "2. 连接状态:"
rabbitmqctl list_connections peer_host peer_port state channels
echo ""

echo "3. 通道状态:"
rabbitmqctl list_channels connection state messages_unacked messages_uncommitted
echo ""

echo "4. 节点状态:"
rabbitmqctl status | grep -A 5 "Memory"
echo ""

echo "5. 集群状态:"
rabbitmqctl cluster_status

Prometheus 格式指标导出

php
<?php

class PrometheusExporter
{
    private $metrics;
    
    public function __construct(RabbitMQMetrics $metrics)
    {
        $this->metrics = $metrics;
    }
    
    public function export()
    {
        $data = $this->metrics->getMetrics();
        $output = [];
        
        $output[] = "# HELP rabbitmq_connections_total Total number of connections";
        $output[] = "# TYPE rabbitmq_connections_total gauge";
        $output[] = "rabbitmq_connections_total {$data['connections']['total']}";
        
        $output[] = "# HELP rabbitmq_channels_total Total number of channels";
        $output[] = "# TYPE rabbitmq_channels_total gauge";
        $output[] = "rabbitmq_channels_total {$data['connections']['channels']}";
        
        $output[] = "# HELP rabbitmq_queues_total Total number of queues";
        $output[] = "# TYPE rabbitmq_queues_total gauge";
        $output[] = "rabbitmq_queues_total {$data['queues']['total']}";
        
        $output[] = "# HELP rabbitmq_messages_ready Messages ready for consumption";
        $output[] = "# TYPE rabbitmq_messages_ready gauge";
        $output[] = "rabbitmq_messages_ready {$data['queues']['messages_ready']}";
        
        $output[] = "# HELP rabbitmq_messages_unacked Messages unacknowledged";
        $output[] = "# TYPE rabbitmq_messages_unacked gauge";
        $output[] = "rabbitmq_messages_unacked {$data['queues']['messages_unacked']}";
        
        $output[] = "# HELP rabbitmq_messages_total Total messages in queues";
        $output[] = "# TYPE rabbitmq_messages_total gauge";
        $output[] = "rabbitmq_messages_total {$data['queues']['messages_total']}";
        
        $output[] = "# HELP rabbitmq_memory_used_bytes Memory used in bytes";
        $output[] = "# TYPE rabbitmq_memory_used_bytes gauge";
        $output[] = "rabbitmq_memory_used_bytes {$data['memory']['used']}";
        
        $output[] = "# HELP rabbitmq_memory_limit_bytes Memory limit in bytes";
        $output[] = "# TYPE rabbitmq_memory_limit_bytes gauge";
        $output[] = "rabbitmq_memory_limit_bytes {$data['memory']['limit']}";
        
        $output[] = "# HELP rabbitmq_disk_free_bytes Free disk space in bytes";
        $output[] = "# TYPE rabbitmq_disk_free_bytes gauge";
        $output[] = "rabbitmq_disk_free_bytes {$data['disk']['free']}";
        
        $output[] = "# HELP rabbitmq_publish_rate Message publish rate per second";
        $output[] = "# TYPE rabbitmq_publish_rate gauge";
        $output[] = "rabbitmq_publish_rate {$data['message_rates']['publish']}";
        
        return implode("\n", $output);
    }
}

header('Content-Type: text/plain');
$metrics = new RabbitMQMetrics('localhost', 15672, 'admin', 'admin123');
$exporter = new PrometheusExporter($metrics);
echo $exporter->export();

实际应用场景

场景一:实时监控仪表板

php
<?php

class MonitoringDashboard
{
    private $metrics;
    private $thresholds = [
        'memory_usage_percent' => 80,
        'disk_usage_percent' => 90,
        'queue_messages_max' => 100000,
        'connection_max' => 1000,
    ];
    
    public function __construct(RabbitMQMetrics $metrics)
    {
        $this->metrics = $metrics;
    }
    
    public function getHealthStatus()
    {
        $data = $this->metrics->getMetrics();
        $status = 'healthy';
        $alerts = [];
        
        $memoryUsagePercent = ($data['memory']['used'] / $data['memory']['limit']) * 100;
        if ($memoryUsagePercent > $this->thresholds['memory_usage_percent']) {
            $status = 'warning';
            $alerts[] = [
                'level' => 'warning',
                'message' => "内存使用率过高: " . round($memoryUsagePercent, 2) . "%",
            ];
        }
        
        if ($data['memory']['alarm']) {
            $status = 'critical';
            $alerts[] = [
                'level' => 'critical',
                'message' => '内存告警已触发',
            ];
        }
        
        if ($data['disk']['alarm']) {
            $status = 'critical';
            $alerts[] = [
                'level' => 'critical',
                'message' => '磁盘空间告警已触发',
            ];
        }
        
        if ($data['queues']['messages_total'] > $this->thresholds['queue_messages_max']) {
            $status = 'warning';
            $alerts[] = [
                'level' => 'warning',
                'message' => "消息堆积过多: {$data['queues']['messages_total']} 条",
            ];
        }
        
        if ($data['connections']['total'] > $this->thresholds['connection_max']) {
            $status = 'warning';
            $alerts[] = [
                'level' => 'warning',
                'message' => "连接数过多: {$data['connections']['total']}",
            ];
        }
        
        return [
            'status' => $status,
            'alerts' => $alerts,
            'metrics' => $data,
            'timestamp' => date('Y-m-d H:i:s'),
        ];
    }
    
    public function renderDashboard()
    {
        $health = $this->getHealthStatus();
        $metrics = $health['metrics'];
        
        $statusColor = match($health['status']) {
            'healthy' => '#28a745',
            'warning' => '#ffc107',
            'critical' => '#dc3545',
            default => '#6c757d',
        };
        
        echo "<div style='font-family: Arial, sans-serif;'>";
        echo "<h1>RabbitMQ 监控仪表板</h1>";
        echo "<p>状态: <span style='color: {$statusColor}; font-weight: bold;'>{$health['status']}</span></p>";
        echo "<p>更新时间: {$health['timestamp']}</p>";
        
        if (!empty($health['alerts'])) {
            echo "<h2>告警信息</h2>";
            foreach ($health['alerts'] as $alert) {
                $color = $alert['level'] === 'critical' ? 'red' : 'orange';
                echo "<p style='color: {$color};'>⚠ {$alert['message']}</p>";
            }
        }
        
        echo "<h2>连接统计</h2>";
        echo "<ul>";
        echo "<li>连接数: {$metrics['connections']['total']}</li>";
        echo "<li>通道数: {$metrics['connections']['channels']}</li>";
        echo "<li>消费者数: {$metrics['connections']['consumers']}</li>";
        echo "</ul>";
        
        echo "<h2>队列统计</h2>";
        echo "<ul>";
        echo "<li>队列数: {$metrics['queues']['total']}</li>";
        echo "<li>待消费消息: {$metrics['queues']['messages_ready']}</li>";
        echo "<li>未确认消息: {$metrics['queues']['messages_unacked']}</li>";
        echo "<li>消息总数: {$metrics['queues']['messages_total']}</li>";
        echo "</ul>";
        
        echo "<h2>资源使用</h2>";
        $memUsedMB = round($metrics['memory']['used'] / 1024 / 1024, 2);
        $memLimitMB = round($metrics['memory']['limit'] / 1024 / 1024, 2);
        $diskFreeGB = round($metrics['disk']['free'] / 1024 / 1024 / 1024, 2);
        echo "<ul>";
        echo "<li>内存使用: {$memUsedMB} MB / {$memLimitMB} MB</li>";
        echo "<li>磁盘可用: {$diskFreeGB} GB</li>";
        echo "</ul>";
        
        echo "</div>";
    }
}

场景二:指标历史记录

php
<?php

class MetricsHistory
{
    private $db;
    
    public function __construct(PDO $db)
    {
        $this->db = $db;
    }
    
    public function record(array $metrics)
    {
        $sql = "INSERT INTO rabbitmq_metrics (
            recorded_at, connections, channels, consumers,
            queues, messages_ready, messages_unacked, messages_total,
            memory_used, memory_limit, disk_free,
            publish_rate, consume_rate, ack_rate
        ) VALUES (
            NOW(), :connections, :channels, :consumers,
            :queues, :messages_ready, :messages_unacked, :messages_total,
            :memory_used, :memory_limit, :disk_free,
            :publish_rate, :consume_rate, :ack_rate
        )";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute([
            'connections' => $metrics['connections']['total'],
            'channels' => $metrics['connections']['channels'],
            'consumers' => $metrics['connections']['consumers'],
            'queues' => $metrics['queues']['total'],
            'messages_ready' => $metrics['queues']['messages_ready'],
            'messages_unacked' => $metrics['queues']['messages_unacked'],
            'messages_total' => $metrics['queues']['messages_total'],
            'memory_used' => $metrics['memory']['used'],
            'memory_limit' => $metrics['memory']['limit'],
            'disk_free' => $metrics['disk']['free'],
            'publish_rate' => $metrics['message_rates']['publish'],
            'consume_rate' => $metrics['message_rates']['consume'],
            'ack_rate' => $metrics['message_rates']['ack'],
        ]);
    }
    
    public function getHistory($hours = 24)
    {
        $sql = "SELECT * FROM rabbitmq_metrics 
                WHERE recorded_at >= DATE_SUB(NOW(), INTERVAL :hours HOUR)
                ORDER BY recorded_at ASC";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute(['hours' => $hours]);
        
        return $stmt->fetchAll(PDO::FETCH_ASSOC);
    }
    
    public function getAggregatedStats($days = 7)
    {
        $sql = "SELECT 
                    DATE(recorded_at) as date,
                    AVG(messages_total) as avg_messages,
                    MAX(messages_total) as max_messages,
                    AVG(connections) as avg_connections,
                    MAX(connections) as max_connections,
                    AVG(memory_used) as avg_memory,
                    MAX(memory_used) as max_memory
                FROM rabbitmq_metrics
                WHERE recorded_at >= DATE_SUB(NOW(), INTERVAL :days DAY)
                GROUP BY DATE(recorded_at)
                ORDER BY date ASC";
        
        $stmt = $this->db->prepare($sql);
        $stmt->execute(['days' => $days]);
        
        return $stmt->fetchAll(PDO::FETCH_ASSOC);
    }
}

常见问题与解决方案

问题一:指标采集超时

现象:通过 HTTP API 获取指标时经常超时。

原因:队列数量过多或消息量过大,API 响应变慢。

解决方案

php
<?php

class CachedMetrics
{
    private $metrics;
    private $cache;
    private $cacheKey = 'rabbitmq_metrics';
    private $cacheTTL = 60;
    
    public function __construct(RabbitMQMetrics $metrics, $cache)
    {
        $this->metrics = $metrics;
        $this->cache = $cache;
    }
    
    public function getMetrics()
    {
        $cached = $this->cache->get($this->cacheKey);
        if ($cached !== null) {
            return $cached;
        }
        
        $data = $this->metrics->getMetrics();
        $this->cache->set($this->cacheKey, $data, $this->cacheTTL);
        
        return $data;
    }
}

问题二:指标数据不准确

现象:监控显示的数据与实际不符。

原因:采集时间点不同,或存在缓存。

解决方案

  1. 使用原子性 API 调用获取数据
  2. 增加采集频率
  3. 对比多个数据源验证

问题三:监控指标过多导致性能问题

现象:监控系统本身影响了 RabbitMQ 性能。

原因:频繁调用管理 API 增加了服务器负担。

解决方案

bash
rabbitmqctl eval 'application:set_env(rabbit, management, [{rates_mode, none}]).'

最佳实践

1. 监控指标选择原则

  • 关注关键指标,避免监控过多无用指标
  • 根据业务特点调整监控重点
  • 设置合理的阈值和告警级别

2. 采集频率建议

指标类型建议采集频率
核心指标(连接、队列)15-30 秒
资源指标(内存、磁盘)30-60 秒
统计指标(消息速率)60 秒
历史数据聚合5-10 分钟

3. 告警阈值设置

php
<?php

return [
    'alerts' => [
        'memory_usage' => [
            'warning' => 70,
            'critical' => 85,
        ],
        'disk_usage' => [
            'warning' => 80,
            'critical' => 90,
        ],
        'queue_messages' => [
            'warning' => 50000,
            'critical' => 100000,
        ],
        'connections' => [
            'warning' => 800,
            'critical' => 950,
        ],
        'consumer_lag' => [
            'warning' => 1000,
            'critical' => 5000,
        ],
    ],
];

4. 监控数据保留策略

  • 原始数据保留 7 天
  • 小时聚合数据保留 30 天
  • 天聚合数据保留 1 年

相关链接