Appearance
RabbitMQ 监控指标体系
概述
监控是保障 RabbitMQ 稳定运行的关键环节。建立完善的监控指标体系,能够帮助运维人员及时发现和解决问题,确保消息队列服务的高可用性。本文将详细介绍 RabbitMQ 的核心监控指标、采集方法和最佳实践。
核心知识点
监控指标分类
RabbitMQ 监控指标可分为以下几个主要类别:
| 类别 | 说明 | 关键指标 |
|---|---|---|
| 连接指标 | 客户端连接状态 | 连接数、通道数、网络流量 |
| 队列指标 | 队列运行状态 | 消息数量、消费速率、队列长度 |
| 消息指标 | 消息流转情况 | 发布速率、确认速率、投递速率 |
| 资源指标 | 系统资源使用 | 内存、磁盘、CPU、文件描述符 |
| 节点指标 | 节点健康状态 | 运行状态、集群状态、分区状态 |
核心监控指标详解
1. 连接与通道指标
connections_count # 当前连接数
channels_count # 当前通道数
consumers_count # 当前消费者数量
connection_opened_total # 累计打开连接数
connection_closed_total # 累计关闭连接数
channel_opened_total # 累计打开通道数
channel_closed_total # 累计关闭通道数
2. 队列指标
queue_messages_ready # 等待消费的消息数
queue_messages_unacked # 已投递未确认的消息数
queue_messages_total # 队列消息总数
queue_message_bytes # 消息占用字节数
queue_consumer_count # 队列消费者数量
queue_message_stats # 消息统计信息
3. 消息流转指标
message_publish_rate # 消息发布速率 (条/秒)
message_confirm_rate # 消息确认速率 (条/秒)
message_consume_rate # 消息消费速率 (条/秒)
message_redeliver_rate # 消息重投递速率 (条/秒)
message_ack_rate # 消息 ACK 速率 (条/秒)
message_nack_rate # 消息 NACK 速率 (条/秒)
4. 资源使用指标
mem_used # 内存使用量 (字节)
mem_limit # 内存限制 (字节)
mem_alarm # 内存告警状态
disk_free # 磁盘可用空间 (字节)
disk_free_limit # 磁盘空间限制 (字节)
disk_alarm # 磁盘告警状态
fd_used # 已使用文件描述符数
fd_total # 文件描述符总数
sockets_used # 已使用 socket 数
sockets_total # socket 总数
proc_used # 已使用 Erlang 进程数
proc_total # Erlang 进程总数
5. 节点健康指标
node_running # 节点运行状态
node_type # 节点类型 (disc/ram)
node_uptime # 节点运行时间
cluster_partitions # 集群分区数
cluster_nodes # 集群节点列表
配置示例
通过 HTTP API 获取监控指标
php
<?php
/**
 * Minimal client for the RabbitMQ management HTTP API.
 *
 * Each public getter performs one authenticated GET against
 * http://{host}:{port}/api/{endpoint} and returns the decoded JSON as an
 * associative array. getMetrics() condenses the overview and node data into
 * a flat summary used by the exporter and dashboard examples.
 */
class RabbitMQMetrics
{
    private $host;
    private $port;
    private $user;
    private $password;

    /**
     * @param string $host     Management API host name.
     * @param int    $port     Management API port.
     * @param string $user     User name for HTTP basic authentication.
     * @param string $password Password for HTTP basic authentication.
     */
    public function __construct($host = 'localhost', $port = 15672, $user = 'guest', $password = 'guest')
    {
        $this->host = $host;
        $this->port = $port;
        $this->user = $user;
        $this->password = $password;
    }

    /**
     * Performs a GET request against one management API endpoint.
     *
     * @param string $endpoint Path below /api/, e.g. "overview".
     * @return array Decoded JSON body.
     * @throws Exception On transport failure, a non-200 status, or a body
     *                   that does not decode to a JSON array/object.
     */
    private function request($endpoint)
    {
        $url = "http://{$this->host}:{$this->port}/api/{$endpoint}";
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->user}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
        ]);
        $response = curl_exec($ch);
        // Capture diagnostics before the handle is released.
        $curlError = curl_error($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        // curl_exec() returns false on transport errors (timeout, refused
        // connection, DNS failure); report the cause instead of "HTTP 0".
        if ($response === false) {
            throw new Exception("API request failed: {$curlError}");
        }
        if ($httpCode !== 200) {
            throw new Exception("API request failed: HTTP {$httpCode}");
        }

        $decoded = json_decode($response, true);
        // Callers index into the result, so a null/scalar decode must not
        // leak out silently.
        if (!is_array($decoded)) {
            throw new Exception("API request failed: invalid JSON returned by {$endpoint}");
        }
        return $decoded;
    }

    /** @return array Decoded response of /api/overview. */
    public function getOverview()
    {
        return $this->request('overview');
    }

    /** @return array Decoded response of /api/nodes. */
    public function getNodes()
    {
        return $this->request('nodes');
    }

    /** @return array Decoded response of /api/queues. */
    public function getQueues()
    {
        return $this->request('queues');
    }

    /** @return array Decoded response of /api/connections. */
    public function getConnections()
    {
        return $this->request('connections');
    }

    /** @return array Decoded response of /api/channels. */
    public function getChannels()
    {
        return $this->request('channels');
    }

    /**
     * Builds a condensed metrics summary.
     *
     * Object totals, queue totals and message rates come from /api/overview.
     * Memory and disk figures come from /api/nodes, because the overview's
     * "node" field only holds the name of the node that answered the request.
     * Missing values default to 0 (or false for alarm flags).
     *
     * @return array{
     *     connections: array{total: mixed, channels: mixed, consumers: mixed},
     *     queues: array{total: mixed, messages_ready: mixed, messages_unacked: mixed, messages_total: mixed},
     *     message_rates: array{publish: mixed, confirm: mixed, consume: mixed, ack: mixed},
     *     memory: array{used: mixed, limit: mixed, alarm: mixed},
     *     disk: array{free: mixed, limit: mixed, alarm: mixed}
     * }
     * @throws Exception When either API request fails.
     */
    public function getMetrics()
    {
        $overview = $this->getOverview();
        $node = $this->selectNode($this->getNodes(), $overview['node'] ?? null);

        $totals = $overview['object_totals'] ?? [];
        $queueTotals = $overview['queue_totals'] ?? [];
        $stats = $overview['message_stats'] ?? [];

        return [
            'connections' => [
                'total' => $totals['connections'] ?? 0,
                'channels' => $totals['channels'] ?? 0,
                'consumers' => $totals['consumers'] ?? 0,
            ],
            'queues' => [
                'total' => $totals['queues'] ?? 0,
                'messages_ready' => $queueTotals['messages_ready'] ?? 0,
                'messages_unacked' => $queueTotals['messages_unacked'] ?? 0,
                'messages_total' => $queueTotals['messages'] ?? 0,
            ],
            'message_rates' => [
                'publish' => $stats['publish_details']['rate'] ?? 0,
                'confirm' => $stats['confirm_details']['rate'] ?? 0,
                // The overview has no "consume" counter; deliver_get covers
                // messages handed to consumers (push deliveries plus basic.get).
                'consume' => $stats['deliver_get_details']['rate'] ?? 0,
                'ack' => $stats['ack_details']['rate'] ?? 0,
            ],
            'memory' => [
                'used' => $node['mem_used'] ?? 0,
                'limit' => $node['mem_limit'] ?? 0,
                'alarm' => $node['mem_alarm'] ?? false,
            ],
            'disk' => [
                'free' => $node['disk_free'] ?? 0,
                'limit' => $node['disk_free_limit'] ?? 0,
                'alarm' => $node['disk_free_alarm'] ?? false,
            ],
        ];
    }

    /**
     * Picks the entry of the /api/nodes list whose "name" equals $nodeName,
     * falling back to the first entry, or an empty array when the list is empty.
     *
     * @param array       $nodes    Decoded /api/nodes response.
     * @param string|null $nodeName Node name reported by /api/overview.
     * @return array Node record (possibly empty).
     */
    private function selectNode(array $nodes, $nodeName)
    {
        foreach ($nodes as $candidate) {
            if (is_array($candidate) && ($candidate['name'] ?? null) === $nodeName) {
                return $candidate;
            }
        }
        $first = reset($nodes);
        return is_array($first) ? $first : [];
    }
}
// Example usage: fetch one metrics snapshot from the local management API
// and dump it in human-readable form.
$metrics = new RabbitMQMetrics('localhost', 15672, 'admin', 'admin123');
$snapshot = $metrics->getMetrics();
print_r($snapshot);
使用 rabbitmqctl 获取指标
bash
#!/bin/bash
# Prints a RabbitMQ status report by running a series of rabbitmqctl
# commands, each preceded by a blank line and a numbered section title.

# Print a blank separator line followed by the given section title.
section() {
    echo ""
    echo "$1"
}

echo "=== RabbitMQ 监控指标采集 ==="

section "1. 队列状态:"
rabbitmqctl list_queues name messages messages_ready messages_unacked consumers

section "2. 连接状态:"
rabbitmqctl list_connections peer_host peer_port state channels

section "3. 通道状态:"
rabbitmqctl list_channels connection state messages_unacked messages_uncommitted

section "4. 节点状态:"
# Show the line matching "Memory" plus the five lines after it.
rabbitmqctl status | grep -A 5 "Memory"

section "5. 集群状态:"
rabbitmqctl cluster_status
Prometheus 格式指标导出
php
<?php
/**
 * Renders the summary produced by RabbitMQMetrics::getMetrics() in the
 * Prometheus text exposition format.
 */
class PrometheusExporter
{
    /**
     * Exported gauges in output order:
     * metric name => [summary group, summary key, HELP text].
     */
    private const GAUGES = [
        'rabbitmq_connections_total' => ['connections', 'total', 'Total number of connections'],
        'rabbitmq_channels_total' => ['connections', 'channels', 'Total number of channels'],
        'rabbitmq_queues_total' => ['queues', 'total', 'Total number of queues'],
        'rabbitmq_messages_ready' => ['queues', 'messages_ready', 'Messages ready for consumption'],
        'rabbitmq_messages_unacked' => ['queues', 'messages_unacked', 'Messages unacknowledged'],
        'rabbitmq_messages_total' => ['queues', 'messages_total', 'Total messages in queues'],
        'rabbitmq_memory_used_bytes' => ['memory', 'used', 'Memory used in bytes'],
        'rabbitmq_memory_limit_bytes' => ['memory', 'limit', 'Memory limit in bytes'],
        'rabbitmq_disk_free_bytes' => ['disk', 'free', 'Free disk space in bytes'],
        'rabbitmq_publish_rate' => ['message_rates', 'publish', 'Message publish rate per second'],
    ];

    private $metrics;

    /**
     * @param RabbitMQMetrics $metrics Source of the metrics summary.
     */
    public function __construct(RabbitMQMetrics $metrics)
    {
        $this->metrics = $metrics;
    }

    /**
     * Fetches the current summary and formats every gauge as a
     * HELP / TYPE / sample triple.
     *
     * @return string Exposition text, terminated by a line feed.
     */
    public function export()
    {
        $data = $this->metrics->getMetrics();
        $lines = [];
        foreach (self::GAUGES as $name => [$group, $key, $help]) {
            // A value absent from the summary is exported as 0 rather than
            // producing a sample line with no value.
            $value = $data[$group][$key] ?? 0;
            $lines[] = "# HELP {$name} {$help}";
            $lines[] = "# TYPE {$name} gauge";
            $lines[] = "{$name} {$value}";
        }
        // The text exposition format requires the final line to end with "\n".
        return implode("\n", $lines) . "\n";
    }
}
// Scrape endpoint: emit the current metrics as Prometheus exposition text.
// The version parameter identifies the text exposition format to the scraper.
header('Content-Type: text/plain; version=0.0.4; charset=utf-8');
$metrics = new RabbitMQMetrics('localhost', 15672, 'admin', 'admin123');
$exporter = new PrometheusExporter($metrics);
echo $exporter->export();
实际应用场景
场景一:实时监控仪表板
php
<?php
/**
 * Evaluates a metrics summary against fixed thresholds and renders the result
 * as a simple HTML dashboard.
 */
class MonitoringDashboard
{
    private $metrics;

    // Limits used by getHealthStatus(). 'disk_usage_percent' is declared but
    // not evaluated by any check in this class.
    private $thresholds = [
        'memory_usage_percent' => 80,
        'disk_usage_percent' => 90,
        'queue_messages_max' => 100000,
        'connection_max' => 1000,
    ];

    /**
     * @param RabbitMQMetrics $metrics Source of the metrics summary.
     */
    public function __construct(RabbitMQMetrics $metrics)
    {
        $this->metrics = $metrics;
    }

    /**
     * Fetches the current metrics and derives alerts and an overall status.
     *
     * The overall status is the highest level among the raised alerts
     * ('critical' over 'warning' over 'healthy'), so a later warning never
     * hides an earlier critical alert.
     *
     * @return array{status: string, alerts: array, metrics: array, timestamp: string}
     */
    public function getHealthStatus()
    {
        $data = $this->metrics->getMetrics();
        $alerts = [];

        // An unknown (zero) memory limit yields 0% instead of a
        // DivisionByZeroError.
        $memoryLimit = $data['memory']['limit'];
        $memoryUsagePercent = $memoryLimit > 0
            ? ($data['memory']['used'] / $memoryLimit) * 100
            : 0.0;

        if ($memoryUsagePercent > $this->thresholds['memory_usage_percent']) {
            $alerts[] = [
                'level' => 'warning',
                'message' => "内存使用率过高: " . round($memoryUsagePercent, 2) . "%",
            ];
        }
        if ($data['memory']['alarm']) {
            $alerts[] = [
                'level' => 'critical',
                'message' => '内存告警已触发',
            ];
        }
        if ($data['disk']['alarm']) {
            $alerts[] = [
                'level' => 'critical',
                'message' => '磁盘空间告警已触发',
            ];
        }
        if ($data['queues']['messages_total'] > $this->thresholds['queue_messages_max']) {
            $alerts[] = [
                'level' => 'warning',
                'message' => "消息堆积过多: {$data['queues']['messages_total']} 条",
            ];
        }
        if ($data['connections']['total'] > $this->thresholds['connection_max']) {
            $alerts[] = [
                'level' => 'warning',
                'message' => "连接数过多: {$data['connections']['total']}",
            ];
        }

        return [
            'status' => $this->overallStatus($alerts),
            'alerts' => $alerts,
            'metrics' => $data,
            'timestamp' => date('Y-m-d H:i:s'),
        ];
    }

    /**
     * Echoes the dashboard HTML: status line, alerts (if any), connection,
     * queue and resource figures.
     *
     * @return void
     */
    public function renderDashboard()
    {
        $health = $this->getHealthStatus();
        $metrics = $health['metrics'];
        $statusColor = match ($health['status']) {
            'healthy' => '#28a745',
            'warning' => '#ffc107',
            'critical' => '#dc3545',
            default => '#6c757d',
        };
        $statusText = $this->escape($health['status']);
        $timestamp = $this->escape($health['timestamp']);

        echo "<div style='font-family: Arial, sans-serif;'>";
        echo "<h1>RabbitMQ 监控仪表板</h1>";
        echo "<p>状态: <span style='color: {$statusColor}; font-weight: bold;'>{$statusText}</span></p>";
        echo "<p>更新时间: {$timestamp}</p>";

        if (!empty($health['alerts'])) {
            echo "<h2>告警信息</h2>";
            foreach ($health['alerts'] as $alert) {
                $color = $alert['level'] === 'critical' ? 'red' : 'orange';
                $message = $this->escape($alert['message']);
                echo "<p style='color: {$color};'>⚠ {$message}</p>";
            }
        }

        $connections = array_map([$this, 'escape'], $metrics['connections']);
        echo "<h2>连接统计</h2>";
        echo "<ul>";
        echo "<li>连接数: {$connections['total']}</li>";
        echo "<li>通道数: {$connections['channels']}</li>";
        echo "<li>消费者数: {$connections['consumers']}</li>";
        echo "</ul>";

        $queues = array_map([$this, 'escape'], $metrics['queues']);
        echo "<h2>队列统计</h2>";
        echo "<ul>";
        echo "<li>队列数: {$queues['total']}</li>";
        echo "<li>待消费消息: {$queues['messages_ready']}</li>";
        echo "<li>未确认消息: {$queues['messages_unacked']}</li>";
        echo "<li>消息总数: {$queues['messages_total']}</li>";
        echo "</ul>";

        echo "<h2>资源使用</h2>";
        // Bytes -> MB / GB for display.
        $memUsedMB = round($metrics['memory']['used'] / 1024 / 1024, 2);
        $memLimitMB = round($metrics['memory']['limit'] / 1024 / 1024, 2);
        $diskFreeGB = round($metrics['disk']['free'] / 1024 / 1024 / 1024, 2);
        echo "<ul>";
        echo "<li>内存使用: {$memUsedMB} MB / {$memLimitMB} MB</li>";
        echo "<li>磁盘可用: {$diskFreeGB} GB</li>";
        echo "</ul>";
        echo "</div>";
    }

    /**
     * Reduces a list of alerts to the overall status.
     *
     * @param array $alerts Alerts, each with a 'level' of 'warning' or 'critical'.
     * @return string 'critical' if any alert is critical, 'warning' if any
     *                alert exists, otherwise 'healthy'.
     */
    private function overallStatus(array $alerts)
    {
        $status = 'healthy';
        foreach ($alerts as $alert) {
            if ($alert['level'] === 'critical') {
                return 'critical';
            }
            $status = 'warning';
        }
        return $status;
    }

    /**
     * Escapes a value for safe inclusion in HTML text.
     *
     * @param mixed $value Scalar value to render.
     * @return string HTML-escaped string form of $value.
     */
    private function escape($value)
    {
        return htmlspecialchars((string) $value, ENT_QUOTES | ENT_SUBSTITUTE, 'UTF-8');
    }
}
场景二:指标历史记录
php
<?php
/**
 * Stores metrics summaries in the rabbitmq_metrics table and reads them back,
 * either row by row or aggregated per day.
 */
class MetricsHistory
{
    private $db;

    /**
     * @param PDO $db Connection used for all statements.
     */
    public function __construct(PDO $db)
    {
        $this->db = $db;
    }

    /**
     * Inserts one row holding the given summary, timestamped with NOW().
     *
     * @param array $metrics Summary with 'connections', 'queues', 'memory',
     *                       'disk' and 'message_rates' groups.
     * @return void
     */
    public function record(array $metrics)
    {
        // The SQL text is kept exactly as written; continuation lines are
        // intentionally unindented.
        $query = "INSERT INTO rabbitmq_metrics (
recorded_at, connections, channels, consumers,
queues, messages_ready, messages_unacked, messages_total,
memory_used, memory_limit, disk_free,
publish_rate, consume_rate, ack_rate
) VALUES (
NOW(), :connections, :channels, :consumers,
:queues, :messages_ready, :messages_unacked, :messages_total,
:memory_used, :memory_limit, :disk_free,
:publish_rate, :consume_rate, :ack_rate
)";
        $statement = $this->db->prepare($query);

        $connections = $metrics['connections'];
        $queues = $metrics['queues'];
        $rates = $metrics['message_rates'];

        $statement->execute([
            'connections' => $connections['total'],
            'channels' => $connections['channels'],
            'consumers' => $connections['consumers'],
            'queues' => $queues['total'],
            'messages_ready' => $queues['messages_ready'],
            'messages_unacked' => $queues['messages_unacked'],
            'messages_total' => $queues['messages_total'],
            'memory_used' => $metrics['memory']['used'],
            'memory_limit' => $metrics['memory']['limit'],
            'disk_free' => $metrics['disk']['free'],
            'publish_rate' => $rates['publish'],
            'consume_rate' => $rates['consume'],
            'ack_rate' => $rates['ack'],
        ]);
    }

    /**
     * Returns all rows recorded within the last $hours hours, oldest first.
     *
     * @param int $hours Size of the look-back window in hours.
     * @return array Rows as associative arrays.
     */
    public function getHistory($hours = 24)
    {
        return $this->fetchRows(
            "SELECT * FROM rabbitmq_metrics
WHERE recorded_at >= DATE_SUB(NOW(), INTERVAL :hours HOUR)
ORDER BY recorded_at ASC",
            ['hours' => $hours]
        );
    }

    /**
     * Returns per-day average and maximum of messages, connections and
     * memory for the last $days days, oldest day first.
     *
     * @param int $days Size of the look-back window in days.
     * @return array Rows as associative arrays, one per calendar date.
     */
    public function getAggregatedStats($days = 7)
    {
        return $this->fetchRows(
            "SELECT
DATE(recorded_at) as date,
AVG(messages_total) as avg_messages,
MAX(messages_total) as max_messages,
AVG(connections) as avg_connections,
MAX(connections) as max_connections,
AVG(memory_used) as avg_memory,
MAX(memory_used) as max_memory
FROM rabbitmq_metrics
WHERE recorded_at >= DATE_SUB(NOW(), INTERVAL :days DAY)
GROUP BY DATE(recorded_at)
ORDER BY date ASC",
            ['days' => $days]
        );
    }

    /**
     * Prepares and executes a query with named parameters and returns every
     * result row as an associative array.
     *
     * @param string $query  SQL text with named placeholders.
     * @param array  $params Placeholder name => value.
     * @return array Result rows.
     */
    private function fetchRows($query, array $params)
    {
        $statement = $this->db->prepare($query);
        $statement->execute($params);
        return $statement->fetchAll(PDO::FETCH_ASSOC);
    }
}
常见问题与解决方案
问题一:指标采集超时
现象:通过 HTTP API 获取指标时经常超时。
原因:队列数量过多或消息量过大,API 响应变慢。
解决方案:
php
<?php
/**
 * Caching wrapper around RabbitMQMetrics::getMetrics(): serves the stored
 * summary while one exists and refreshes it from the API otherwise.
 */
class CachedMetrics
{
    private $metrics;
    private $cache;
    private $cacheKey = 'rabbitmq_metrics';
    private $cacheTTL = 60;

    /**
     * @param RabbitMQMetrics $metrics Source of fresh summaries.
     * @param mixed           $cache   Cache object offering get($key) and
     *                                 set($key, $value, $ttl); a miss must be
     *                                 reported as null. NOTE(review): this
     *                                 looks like a PSR-16 style cache; confirm
     *                                 against the implementation actually used.
     */
    public function __construct(RabbitMQMetrics $metrics, $cache)
    {
        $this->metrics = $metrics;
        $this->cache = $cache;
    }

    /**
     * Returns the cached summary, fetching and storing a fresh one when the
     * cache yields null.
     *
     * @return mixed Cached value or freshly fetched summary.
     */
    public function getMetrics()
    {
        $summary = $this->cache->get($this->cacheKey);
        if ($summary === null) {
            // Cache miss: query the API and keep the result for cacheTTL.
            $summary = $this->metrics->getMetrics();
            $this->cache->set($this->cacheKey, $summary, $this->cacheTTL);
        }
        return $summary;
    }
}
问题二:指标数据不准确
现象:监控显示的数据与实际不符。
原因:采集时间点不同,或存在缓存。
解决方案:
- 使用原子性 API 调用获取数据
- 增加采集频率
- 对比多个数据源验证
问题三:监控指标过多导致性能问题
现象:监控系统本身影响了 RabbitMQ 性能。
原因:频繁调用管理 API 增加了服务器负担。
解决方案:
bash
# Turn off message-rate collection in the management plugin.
# rates_mode is a setting of the rabbitmq_management application, so it must be
# set there; a "management" key under the rabbit application is not read.
# For a persistent setting use "management.rates_mode = none" in rabbitmq.conf.
# NOTE(review): a runtime change may only take effect after the management
# plugin is restarted - verify on the target RabbitMQ version.
rabbitmqctl eval 'application:set_env(rabbitmq_management, rates_mode, none).'
最佳实践
1. 监控指标选择原则
- 关注关键指标,避免监控过多无用指标
- 根据业务特点调整监控重点
- 设置合理的阈值和告警级别
2. 采集频率建议
| 指标类型 | 建议采集频率 |
|---|---|
| 核心指标(连接、队列) | 15-30 秒 |
| 资源指标(内存、磁盘) | 30-60 秒 |
| 统计指标(消息速率) | 60 秒 |
| 历史数据聚合 | 5-10 分钟 |
3. 告警阈值设置
php
<?php
// Alert threshold configuration: every entry under 'alerts' maps a metric
// name to the values at which a warning and a critical alert are raised.
return [
    'alerts' => [
        'memory_usage' => ['warning' => 70, 'critical' => 85],
        'disk_usage' => ['warning' => 80, 'critical' => 90],
        'queue_messages' => ['warning' => 50000, 'critical' => 100000],
        'connections' => ['warning' => 800, 'critical' => 950],
        'consumer_lag' => ['warning' => 1000, 'critical' => 5000],
    ],
];
4. 监控数据保留策略
- 原始数据保留 7 天
- 小时聚合数据保留 30 天
- 天聚合数据保留 1 年
