RabbitMQ Monitoring with Prometheus + Grafana
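Overview
Prometheus and Grafana together form one of the most widely used open-source monitoring stacks. Prometheus scrapes and stores the metrics, and Grafana visualizes them. This article walks through building a RabbitMQ monitoring system with the two.
Core concepts
Architecture
RabbitMQ --> rabbitmq_prometheus plugin --> Prometheus --> Grafana
- rabbitmq_prometheus plugin: exposes the metrics endpoint
- Prometheus: scrapes and stores the data
- Grafana: visualizes the data
Prometheus basic concepts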
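| Concept | Description |
|---|---|
| Metric | A monitored measurement, consisting of a name and a value |
| Label | A key-value pair that distinguishes different dimensions of the same metric |
| Time Series | The sequence of timestamped values for one metric name and label set |
| Target | An endpoint that Prometheus scrapes |
| Job | A group of targets of the same type |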
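To make the table concrete: Prometheus treats every unique combination of metric name and label set as a separate time series. The sketch below only illustrates that idea. `seriesKey` is a hypothetical helper, not part of any Prometheus client library, and the queue names are made up.

```php
<?php
/**
 * Build a canonical identity string for a time series.
 * Hypothetical helper for illustration; not a Prometheus API.
 */
function seriesKey(string $metric, array $labels): string
{
    ksort($labels); // label order does not affect series identity
    $pairs = [];
    foreach ($labels as $name => $value) {
        $pairs[] = sprintf('%s="%s"', $name, $value);
    }
    return $metric . '{' . implode(',', $pairs) . '}';
}

// Same metric name, different "queue" label: two distinct time series.
$orders   = seriesKey('rabbitmq_queue_messages', ['vhost' => '/', 'queue' => 'orders']);
$payments = seriesKey('rabbitmq_queue_messages', ['queue' => 'payments', 'vhost' => '/']);

echo $orders, PHP_EOL;
echo $payments, PHP_EOL;
```

Each of the two keys would be stored and queried as its own series, which is why adding a high-cardinality label multiplies storage.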
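Grafana basic concepts
| Concept | Description |
|---|---|
| Dashboard | A page made up of multiple panels |
| Panel | A single chart or value display |
| Data Source | The backend Grafana queries, here Prometheus |
| Alert | An alerting rule |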
Configuration examples
Enabling the RabbitMQ Prometheus plugin
bash
rabbitmq-plugins enable rabbitmq_prometheus
Default metrics endpoint: http://localhost:15692/metrics
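The endpoint returns plain text in the Prometheus exposition format: one sample per line, with optional labels in braces. As a rough illustration of that format, the sketch below parses a small sample. The function `parseMetrics` and the sample text are assumptions made for this example (the text was not captured from a real node), and the parser deliberately ignores timestamps, escaped quotes, and special values such as NaN.

```php
<?php
/**
 * Parse Prometheus text-format output into name/labels/value samples.
 * Simplified sketch: ignores timestamps, escaped quotes in label values, NaN/Inf.
 */
function parseMetrics(string $text): array
{
    $samples = [];
    foreach (preg_split('/\R/', $text) as $line) {
        $line = trim($line);
        if ($line === '' || $line[0] === '#') {
            continue; // skip blank lines and "# HELP" / "# TYPE" comments
        }
        if (!preg_match('/^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(.*)\})?\s+(\S+)/', $line, $m)) {
            continue; // not a sample line this sketch understands
        }
        $labels = [];
        if ($m[2] !== '') {
            preg_match_all('/(\w+)="([^"]*)"/', $m[2], $pairs, PREG_SET_ORDER);
            foreach ($pairs as $pair) {
                $labels[$pair[1]] = $pair[2];
            }
        }
        $samples[] = ['name' => $m[1], 'labels' => $labels, 'value' => (float) $m[3]];
    }
    return $samples;
}

// Illustrative sample, not real plugin output.
$sample = <<<TXT
# TYPE rabbitmq_connections gauge
rabbitmq_connections 12
rabbitmq_queue_messages{queue="orders",vhost="/"} 42
TXT;

foreach (parseMetrics($sample) as $s) {
    printf("%s %s = %g\n", $s['name'], json_encode($s['labels']), $s['value']);
}
```

Against a live node, the input could come from something like `file_get_contents('http://localhost:15692/metrics')`, assuming the plugin is enabled and the port is reachable.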
Prometheus configuration
yaml
global:
  scrape_interval: 15s
  evaluation_interval: 15s
alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093
rule_files:
  - /etc/prometheus/rules/*.yml
scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets:
          - 'rabbitmq-node1:15692'
          - 'rabbitmq-node2:15692'
          - 'rabbitmq-node3:15692'
    relabel_configs:
      # Strip the port so the instance label holds only the host name
      - source_labels: [__address__]
        regex: '([^:]+):\d+'
        target_label: instance
        replacement: '$1'
Prometheus alert rules
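Every rule in this section pairs an `expr` with a `for:` duration. The alert is pending while the expression is true and starts firing only once it has stayed true for the whole duration; a single false evaluation resets it. The sketch below is a simplified model of that behavior, not Prometheus code; `alertStates` and its parameters are hypothetical names chosen for the illustration.

```php
<?php
/**
 * Simplified model of how a rule with `for:` moves from pending to firing.
 * $results: one boolean per evaluation cycle (true = expr returned a sample).
 * Returns the state after each cycle: 'inactive', 'pending' or 'firing'.
 */
function alertStates(array $results, int $intervalSec, int $forSec): array
{
    $states = [];
    $activeSince = null; // evaluation time at which expr first became true
    foreach (array_values($results) as $i => $isTrue) {
        $now = $i * $intervalSec;
        if (!$isTrue) {
            $activeSince = null; // any false evaluation resets the timer
            $states[] = 'inactive';
            continue;
        }
        $activeSince ??= $now;
        $states[] = ($now - $activeSince >= $forSec) ? 'firing' : 'pending';
    }
    return $states;
}

// expr is true for four cycles, false once, then true again.
// Evaluated every 15s with `for: 30s`.
print_r(alertStates([true, true, true, true, false, true], 15, 30));
```

Under this model a short spike that clears before the `for:` window elapses never reaches the firing state, which is the reason the rules use windows of one to five minutes.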
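yaml
# Note: several metric names below (rabbitmq_memory_used_bytes, rabbitmq_disk_free_bytes,
# rabbitmq_fd_used, ...) match the PHP exporter shown later in this article. The built-in
# rabbitmq_prometheus plugin names some of them differently (for example
# rabbitmq_process_resident_memory_bytes and rabbitmq_disk_space_available_bytes),
# so check the /metrics output of your node and adjust the expressions.
# Per-queue labels such as `queue` are only present when per-object metrics are exposed.
groups:
  - name: rabbitmq_alerts
    rules:
      - alert: RabbitMQDown
        # `up` is generated by Prometheus for every scrape target; 0 means the scrape failed
        expr: up{job="rabbitmq"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ node down"
          description: "RabbitMQ node {{ $labels.instance }} has been down for more than 1 minute"
      - alert: RabbitMQMemoryHigh
        expr: (rabbitmq_memory_used_bytes / rabbitmq_memory_limit_bytes) * 100 > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "RabbitMQ memory usage high"
          description: "Node {{ $labels.instance }} memory usage is {{ $value | printf \"%.2f\" }}%"
      - alert: RabbitMQDiskSpaceLow
        # 1073741824 bytes = 1 GiB
        expr: rabbitmq_disk_free_bytes < 1073741824
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ disk space low"
          description: "Node {{ $labels.instance }} has {{ $value | humanize1024 }}B of free disk space left"
      - alert: RabbitMQQueueMessagesHigh
        expr: rabbitmq_queue_messages > 100000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Queue backlog too large"
          description: "Queue {{ $labels.queue }} holds {{ $value }} messages"
      - alert: RabbitMQNoConsumer
        # The message count is on the left of `and` so that $value is the backlog, not 0
        expr: rabbitmq_queue_messages > 0 and rabbitmq_queue_consumers == 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Queue has no consumers"
          description: "Queue {{ $labels.queue }} has {{ $value }} messages but no consumers"
      - alert: RabbitMQConnectionHigh
        expr: rabbitmq_connections > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Too many connections"
          description: "Node {{ $labels.instance }} has {{ $value }} connections"
      - alert: RabbitMQFileDescriptorsHigh
        expr: (rabbitmq_fd_used / rabbitmq_fd_total) * 100 > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "File descriptor usage high"
          description: "Node {{ $labels.instance }} file descriptor usage is {{ $value | printf \"%.2f\" }}%"
      - alert: RabbitMQClusterPartition
        expr: rabbitmq_partitions > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Cluster network partition"
          description: "Node {{ $labels.instance }} detected a network partition"
Grafana Dashboard JSON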
json
{
  "dashboard": {
    "title": "RabbitMQ Monitoring",
    "uid": "rabbitmq-overview",
    "panels": [
      {
        "title": "Overview",
        "type": "row"
      },
      {
        "title": "Messages Ready",
        "type": "gauge",
        "gridPos": {"x": 0, "y": 1, "w": 6, "h": 4},
        "targets": [
          {
            "expr": "sum(rabbitmq_queue_messages_ready)",
            "legendFormat": "Messages Ready"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "thresholds": {
              "mode": "absolute",
              "steps": [
                {"color": "green", "value": null},
                {"color": "yellow", "value": 10000},
                {"color": "red", "value": 50000}
              ]
            }
          }
        }
      },
      {
        "title": "Messages Unacked",
        "type": "gauge",
        "gridPos": {"x": 6, "y": 1, "w": 6, "h": 4},
        "targets": [
          {
            "expr": "sum(rabbitmq_queue_messages_unacked)",
            "legendFormat": "Messages Unacked"
          }
        ]
      },
      {
        "title": "Connections",
        "type": "stat",
        "gridPos": {"x": 12, "y": 1, "w": 4, "h": 4},
        "targets": [
          {
            "expr": "sum(rabbitmq_connections)",
            "legendFormat": "Connections"
          }
        ]
      },
      {
        "title": "Queues",
        "type": "stat",
        "gridPos": {"x": 16, "y": 1, "w": 4, "h": 4},
        "targets": [
          {
            "expr": "count(rabbitmq_queue_messages)",
            "legendFormat": "Queues"
          }
        ]
      },
      {
        "title": "Message Rates",
        "type": "timeseries",
        "gridPos": {"x": 0, "y": 5, "w": 12, "h": 6},
        "targets": [
          {
            "expr": "sum(rate(rabbitmq_channel_messages_published_total[1m]))",
            "legendFormat": "Publish Rate"
          },
          {
            "expr": "sum(rate(rabbitmq_channel_messages_delivered_total[1m]))",
            "legendFormat": "Deliver Rate"
          },
          {
            "expr": "sum(rate(rabbitmq_channel_messages_acked_total[1m]))",
            "legendFormat": "Ack Rate"
          }
        ]
      },
      {
        "title": "Memory Usage",
        "type": "timeseries",
        "gridPos": {"x": 12, "y": 5, "w": 12, "h": 6},
        "targets": [
          {
            "expr": "rabbitmq_memory_used_bytes / 1024 / 1024",
            "legendFormat": "{{ instance }} - Memory (MB)"
          },
          {
            "expr": "rabbitmq_memory_limit_bytes / 1024 / 1024",
            "legendFormat": "{{ instance }} - Limit (MB)"
          }
        ]
      },
      {
        "title": "Queue Messages by Name",
        "type": "timeseries",
        "gridPos": {"x": 0, "y": 11, "w": 24, "h": 6},
        "targets": [
          {
            "expr": "rabbitmq_queue_messages",
            "legendFormat": "{{ queue }}"
          }
        ]
      }
    ]
  }
}
PHP custom metrics exporter
php
<?php
// Minimal exporter that turns RabbitMQ management API data into Prometheus text format.
class RabbitMQPrometheusExporter
{
    private $host;
    private $port;
    private $user;
    private $password;

    public function __construct($host = 'localhost', $port = 15672, $user = 'guest', $password = 'guest')
    {
        $this->host = $host;
        $this->port = $port;
        $this->user = $user;
        $this->password = $password;
    }

    // Call the management HTTP API. Returns the decoded JSON array, or null on any failure.
    private function request($endpoint)
    {
        $url = "http://{$this->host}:{$this->port}/api/{$endpoint}";
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->user}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
        ]);
        $response = curl_exec($ch);
        $status = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        if ($response === false || $status !== 200) {
            return null; // connection error, timeout, or non-200 reply such as 401
        }
        $data = json_decode($response, true);
        return is_array($data) ? $data : null;
    }

    public function export()
    {
        $output = [];
        $overview = $this->request('overview');
        $nodes = $this->request('nodes') ?? [];
        $queues = $this->request('queues') ?? [];

        // Report 0 when the management API cannot be reached instead of a hard-coded 1.
        $output[] = $this->formatMetric('gauge', 'rabbitmq_up', $overview === null ? 0 : 1, ['instance' => $this->host]);

        // Cluster-wide object counts.
        if (isset($overview['object_totals'])) {
            $totals = $overview['object_totals'];
            $output[] = $this->formatMetric('gauge', 'rabbitmq_connections', $totals['connections'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_channels', $totals['channels'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queues', $totals['queues'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_exchanges', $totals['exchanges'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_consumers', $totals['consumers'] ?? 0);
        }

        // Cluster-wide message totals. These share names with the per-queue series below,
        // so a sum() over the metric counts the backlog twice; filter on the queue label if needed.
        if (isset($overview['queue_totals'])) {
            $queueTotals = $overview['queue_totals'];
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_messages_ready', $queueTotals['messages_ready'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_messages_unacked', $queueTotals['messages_unacked'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_messages', $queueTotals['messages'] ?? 0);
        }

        // Message rates as computed by the management plugin.
        if (isset($overview['message_stats'])) {
            $stats = $overview['message_stats'];
            $output[] = $this->formatMetric('gauge', 'rabbitmq_message_publish_rate', $stats['publish_details']['rate'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_message_confirm_rate', $stats['confirm_details']['rate'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_message_consume_rate', $stats['consume_details']['rate'] ?? 0);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_message_ack_rate', $stats['ack_details']['rate'] ?? 0);
        }

        // Per-node resource usage.
        foreach ($nodes as $node) {
            $labels = ['node' => $node['name']];
            $output[] = $this->formatMetric('gauge', 'rabbitmq_memory_used_bytes', $node['mem_used'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_memory_limit_bytes', $node['mem_limit'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_disk_free_bytes', $node['disk_free'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_fd_used', $node['fd_used'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_fd_total', $node['fd_total'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_sockets_used', $node['sockets_used'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_sockets_total', $node['sockets_total'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_proc_used', $node['proc_used'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_proc_total', $node['proc_total'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_mem_alarm', ($node['mem_alarm'] ?? false) ? 1 : 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_disk_alarm', ($node['disk_free_alarm'] ?? false) ? 1 : 0, $labels);
        }

        // Per-queue statistics.
        foreach ($queues as $queue) {
            $labels = ['queue' => $queue['name'], 'vhost' => $queue['vhost']];
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_messages', $queue['messages'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_messages_ready', $queue['messages_ready'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_messages_unacked', $queue['messages_unacked'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_consumers', $queue['consumers'] ?? 0, $labels);
            $output[] = $this->formatMetric('gauge', 'rabbitmq_queue_memory_bytes', $queue['memory'] ?? 0, $labels);
        }

        // The text format expects the output to end with a newline.
        return implode("\n", $output) . "\n";
    }

    // Render one sample line. $type is accepted for readability but not emitted.
    private function formatMetric($type, $name, $value, $labels = [])
    {
        $labelStr = '';
        if (!empty($labels)) {
            $labelPairs = [];
            foreach ($labels as $k => $v) {
                // Escape backslash, double quote and newline in label values.
                $v = strtr((string) $v, ["\\" => "\\\\", "\"" => "\\\"", "\n" => "\\n"]);
                $labelPairs[] = "{$k}=\"{$v}\"";
            }
            $labelStr = '{' . implode(',', $labelPairs) . '}';
        }
        return "{$name}{$labelStr} {$value}";
    }
}

if (php_sapi_name() === 'cli-server' || isset($_GET['metrics'])) {
    header('Content-Type: text/plain; version=0.0.4; charset=utf-8');
    // Example credentials only; load real ones from configuration.
    $exporter = new RabbitMQPrometheusExporter('localhost', 15672, 'admin', 'admin123');
    echo $exporter->export();
}
Practical scenarios
Scenario 1: Complete monitoring deployment
yaml
version: '3.8'
services:
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - ./rules:/etc/prometheus/rules
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.enable-lifecycle'
  alertmanager:
    image: prom/alertmanager:latest
    container_name: alertmanager
    ports:
      - "9093:9093"
    volumes:
      - ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
      - alertmanager_data:/alertmanager
    command:
      - '--config.file=/etc/alertmanager/alertmanager.yml'
      - '--storage.path=/alertmanager'
  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
      - ./provisioning:/etc/grafana/provisioning
    environment:
      - GF_SECURITY_ADMIN_USER=admin
      # Example password; change it before exposing Grafana
      - GF_SECURITY_ADMIN_PASSWORD=admin123
      - GF_USERS_ALLOW_SIGN_UP=false
volumes:
  prometheus_data:
  alertmanager_data:
  grafana_data:
Scenario 2: Alertmanager configuration
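The `route` section of this scenario's configuration sends each alert to the first child route whose `match` labels all agree with the alert, and falls back to the top-level receiver when none does. The sketch below models only that flat case. `pickReceiver` is a hypothetical function written for this illustration; it ignores `continue`, regex matchers, and nested routes, which real Alertmanager also supports.

```php
<?php
/**
 * Pick a receiver for an alert from a flat route tree.
 * First child whose `match` labels all equal the alert's labels wins;
 * otherwise the top-level receiver is used.
 */
function pickReceiver(array $route, array $alertLabels): string
{
    foreach ($route['routes'] ?? [] as $child) {
        $matches = true;
        foreach ($child['match'] as $label => $expected) {
            if (($alertLabels[$label] ?? null) !== $expected) {
                $matches = false;
                break;
            }
        }
        if ($matches) {
            return $child['receiver'];
        }
    }
    return $route['receiver'];
}

// Mirrors the `route` section of the Alertmanager configuration in this scenario.
$route = [
    'receiver' => 'default',
    'routes' => [
        ['match' => ['severity' => 'critical'], 'receiver' => 'critical'],
        ['match' => ['severity' => 'warning'],  'receiver' => 'warning'],
    ],
];

echo pickReceiver($route, ['alertname' => 'RabbitMQDown', 'severity' => 'critical']), PHP_EOL;
echo pickReceiver($route, ['alertname' => 'Custom', 'severity' => 'info']), PHP_EOL;
```

An alert without a recognized `severity` label ends up at the `default` receiver, so that receiver should always point somewhere a person will look.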
yaml
global:
  resolve_timeout: 5m
  smtp_smarthost: 'smtp.example.com:587'
  smtp_from: 'alertmanager@example.com'
  smtp_auth_username: 'alertmanager@example.com'
  smtp_auth_password: 'password'
route:
  group_by: ['alertname', 'severity']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  receiver: 'default'
  routes:
    - match:
        severity: critical
      receiver: 'critical'
    - match:
        severity: warning
      receiver: 'warning'
receivers:
  - name: 'default'
    email_configs:
      - to: 'ops@example.com'
        send_resolved: true
  - name: 'critical'
    email_configs:
      - to: 'ops-critical@example.com'
        send_resolved: true
    webhook_configs:
      - url: 'http://webhook-server:8080/alert'
        send_resolved: true
    slack_configs:
      - api_url: 'https://hooks.slack.com/services/xxx'
        channel: '#alerts-critical'
        send_resolved: true
  - name: 'warning'
    email_configs:
      - to: 'ops-warning@example.com'
        send_resolved: true
inhibit_rules:
  # Suppress a warning while a critical alert with the same alertname and instance is firing
  - source_match:
      severity: 'critical'
    target_match:
      severity: 'warning'
    equal: ['alertname', 'instance']
Scenario 3: PHP alert-handling webhook
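Before the handler itself, it may help to see the shape of the request body that a `webhook_configs` receiver gets from Alertmanager: a JSON object with a group-level `status` and an `alerts` array, where each alert carries its own `status`, `labels`, and `annotations`. The payload below is made up and trimmed to a few fields, and `summarizeAlerts` is a hypothetical helper for this sketch only.

```php
<?php
/**
 * Summarize an Alertmanager webhook body as "STATUS alertname (severity)" lines.
 * Returns an empty list when the body does not have the expected JSON shape.
 */
function summarizeAlerts(string $body): array
{
    $data = json_decode($body, true);
    if (!is_array($data) || !isset($data['alerts']) || !is_array($data['alerts'])) {
        return [];
    }
    $lines = [];
    foreach ($data['alerts'] as $alert) {
        // Each alert has its own status; a "firing" group can still contain resolved alerts.
        $lines[] = sprintf(
            '%s %s (%s)',
            strtoupper($alert['status'] ?? 'unknown'),
            $alert['labels']['alertname'] ?? 'unnamed',
            $alert['labels']['severity'] ?? 'none'
        );
    }
    return $lines;
}

// Made-up payload, trimmed to the fields used above.
$body = <<<'JSON'
{
  "version": "4",
  "status": "firing",
  "receiver": "critical",
  "alerts": [
    {
      "status": "firing",
      "labels": {"alertname": "RabbitMQDown", "severity": "critical", "instance": "rabbitmq-node1"},
      "annotations": {"summary": "RabbitMQ node down"},
      "startsAt": "2024-01-01T00:00:00Z"
    },
    {
      "status": "resolved",
      "labels": {"alertname": "RabbitMQMemoryHigh", "severity": "warning", "instance": "rabbitmq-node2"},
      "annotations": {"summary": "RabbitMQ memory usage high"},
      "startsAt": "2024-01-01T00:00:00Z",
      "endsAt": "2024-01-01T00:10:00Z"
    }
  ]
}
JSON;

print_r(summarizeAlerts($body));
```

Checking the per-alert `status` rather than only the group-level one avoids treating a resolved alert as firing when both arrive in the same notification.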
php
<?php
// Receives Alertmanager webhook notifications and reacts according to the alert name.
class AlertWebhookHandler
{
    private $logFile = '/var/log/rabbitmq/alerts.log';

    public function handle()
    {
        $payload = file_get_contents('php://input');
        $data = json_decode($payload, true);

        // Reject bodies that are not an Alertmanager notification.
        if (!is_array($data) || !isset($data['status'], $data['alerts'])) {
            http_response_code(400);
            return ['status' => 'error', 'message' => 'invalid payload'];
        }

        $this->log($data);
        if ($data['status'] === 'firing') {
            $this->processAlert($data);
        } else {
            $this->processResolved($data);
        }
        return ['status' => 'ok'];
    }

    // Dispatch each firing alert to its handler.
    private function processAlert($data)
    {
        foreach ($data['alerts'] as $alert) {
            $alertName = $alert['labels']['alertname'] ?? '';
            switch ($alertName) {
                case 'RabbitMQDown':
                    $this->handleNodeDown($alert);
                    break;
                case 'RabbitMQMemoryHigh':
                    $this->handleMemoryHigh($alert);
                    break;
                case 'RabbitMQQueueMessagesHigh':
                    $this->handleQueueBacklog($alert);
                    break;
                case 'RabbitMQNoConsumer':
                    $this->handleNoConsumer($alert);
                    break;
            }
        }
    }

    private function handleNodeDown($alert)
    {
        $instance = $alert['labels']['instance'] ?? 'unknown';
        $this->sendNotification("CRITICAL: RabbitMQ node {$instance} is down", $alert);
        $this->executeAutoRecovery($instance);
    }

    private function handleMemoryHigh($alert)
    {
        $instance = $alert['labels']['instance'] ?? 'unknown';
        $usage = $alert['annotations']['description'] ?? '';
        $this->sendNotification("WARNING: {$instance} memory usage is high - {$usage}", $alert);
    }

    private function handleQueueBacklog($alert)
    {
        $queue = $alert['labels']['queue'] ?? 'unknown';
        $messages = $alert['annotations']['description'] ?? '';
        $this->sendNotification("WARNING: queue {$queue} has a message backlog - {$messages}", $alert);
        $this->scaleConsumers($queue);
    }

    private function handleNoConsumer($alert)
    {
        $queue = $alert['labels']['queue'] ?? 'unknown';
        $this->sendNotification("WARNING: queue {$queue} has no consumers", $alert);
    }

    private function executeAutoRecovery($instance)
    {
        // Label values come from the network; escape them before passing to the shell.
        $script = '/opt/rabbitmq/scripts/recover_node.sh ' . escapeshellarg($instance);
        exec($script, $output, $returnCode);
        $this->log([
            'action' => 'auto_recovery',
            'instance' => $instance,
            'script' => $script,
            'return_code' => $returnCode,
            'output' => $output,
        ]);
    }

    private function scaleConsumers($queue)
    {
        $script = '/opt/rabbitmq/scripts/scale_consumers.sh ' . escapeshellarg($queue);
        exec($script, $output, $returnCode);
        $this->log([
            'action' => 'scale_consumers',
            'queue' => $queue,
            'return_code' => $returnCode,
        ]);
    }

    // Forward the message to Slack when SLACK_WEBHOOK_URL is set.
    private function sendNotification($message, $alert)
    {
        $webhookUrl = getenv('SLACK_WEBHOOK_URL');
        if ($webhookUrl) {
            $severity = $alert['labels']['severity'] ?? 'warning';
            $payload = [
                'text' => $message,
                'attachments' => [
                    [
                        'color' => $severity === 'critical' ? 'danger' : 'warning',
                        'fields' => [
                            ['title' => 'Alert', 'value' => $alert['labels']['alertname'] ?? '', 'short' => true],
                            ['title' => 'Severity', 'value' => $severity, 'short' => true],
                            ['title' => 'Description', 'value' => $alert['annotations']['description'] ?? ''],
                        ],
                    ],
                ],
            ];
            $ch = curl_init($webhookUrl);
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($payload));
            // Send the body as JSON rather than the default form encoding.
            curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
            curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
            curl_setopt($ch, CURLOPT_TIMEOUT, 10);
            curl_exec($ch);
            curl_close($ch);
        }
    }

    // Append one JSON line per event to the log file.
    private function log($data)
    {
        $entry = [
            'timestamp' => date('Y-m-d H:i:s'),
            'data' => $data,
        ];
        file_put_contents(
            $this->logFile,
            json_encode($entry) . "\n",
            FILE_APPEND
        );
    }

    private function processResolved($data)
    {
        foreach ($data['alerts'] as $alert) {
            $alertName = $alert['labels']['alertname'] ?? 'unknown';
            $this->sendNotification("RESOLVED: {$alertName} has recovered", $alert);
        }
    }
}

header('Content-Type: application/json');
$handler = new AlertWebhookHandler();
echo json_encode($handler->handle());
Common problems and solutions
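Issue 1: Prometheus cannot scrape metrics
Symptom: the target is shown as down on the Prometheus targets page.
Solution:
bash
# Make sure the plugin is enabled
rabbitmq-plugins enable rabbitmq_prometheus
# Confirm the endpoint responds on the RabbitMQ host
curl http://localhost:15692/metrics
If the endpoint responds locally, check network connectivity and firewall settings between Prometheus and the node.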
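Prometheus also reports why a scrape failed through its HTTP API at `/api/v1/targets`, where each active target has a `health` and a `lastError` field. The sketch below extracts the unhealthy targets from such a response. `downTargets` is a hypothetical helper, and the response body is made up and trimmed to the fields the function reads.

```php
<?php
/**
 * List unhealthy targets from a Prometheus /api/v1/targets response body.
 * Returns [scrapeUrl => lastError] for every active target whose health is not "up".
 */
function downTargets(string $responseBody): array
{
    $data = json_decode($responseBody, true);
    $down = [];
    foreach ($data['data']['activeTargets'] ?? [] as $target) {
        if (($target['health'] ?? '') !== 'up') {
            $down[$target['scrapeUrl'] ?? 'unknown'] = $target['lastError'] ?? '';
        }
    }
    return $down;
}

// Made-up response, trimmed to the fields used above.
$body = <<<'JSON'
{"status":"success","data":{"activeTargets":[
  {"scrapeUrl":"http://rabbitmq-node1:15692/metrics","health":"up","lastError":""},
  {"scrapeUrl":"http://rabbitmq-node2:15692/metrics","health":"down","lastError":"connection refused"}
]}}
JSON;

print_r(downTargets($body));
```

In a running setup the body could be fetched with something like `file_get_contents('http://localhost:9090/api/v1/targets')`, assuming Prometheus listens on port 9090 as in Scenario 1.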
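Issue 2: Grafana cannot connect to Prometheus
Symptom: the Grafana data source test fails.
Solution:
- Make sure Prometheus is running
- Check that the data source URL is correct (between containers, use the service name, for example http://prometheus:9090, rather than localhost)
- Check network policies and firewalls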
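Issue 3: Too much metric data
Symptom: Prometheus storage grows too quickly.
Solution:
yaml
global:
  # Scraping every 30s instead of 15s halves the number of stored samples
  scrape_interval: 30s
scrape_configs:
  - job_name: 'rabbitmq'
    metrics_path: /metrics
    params:
      format: ['prometheus']
    static_configs:
      - targets: ['rabbitmq:15692']
Best practices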
1. Tuning metric collection
yaml
scrape_configs:
  - job_name: 'rabbitmq'
    scrape_interval: 15s
    # Must not exceed scrape_interval
    scrape_timeout: 10s
    metrics_path: /metrics
    static_configs:
      - targets: ['rabbitmq:15692']
2. Data retention policy
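Retention settings are easier to choose with a rough size estimate first. The Prometheus storage documentation gives the rule of thumb needed_disk_space = retention_time_seconds × ingested_samples_per_second × bytes_per_sample, with roughly 1 to 2 bytes per sample. The sketch below applies that formula; `estimateDiskBytes` is a hypothetical helper, and the series count is an assumed workload, not a measured one.

```php
<?php
/**
 * Rough TSDB size estimate:
 * retention seconds * ingested samples per second * bytes per sample.
 */
function estimateDiskBytes(int $retentionDays, int $activeSeries, int $scrapeIntervalSec, float $bytesPerSample = 2.0): float
{
    // Every active series contributes one sample per scrape.
    $samplesPerSecond = $activeSeries / $scrapeIntervalSec;
    return $retentionDays * 86400 * $samplesPerSecond * $bytesPerSample;
}

// Assumed workload: 30,000 active series scraped every 15s, kept for 15 days.
$bytes = estimateDiskBytes(15, 30000, 15);
printf("%.2f GiB\n", $bytes / 1024 ** 3);
```

Doubling the scrape interval halves the estimate, which is often a cheaper lever than shortening retention.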
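yaml
# Retention is set with Prometheus command-line flags, not in prometheus.yml.
# For the Docker Compose service from Scenario 1:
command:
  - '--config.file=/etc/prometheus/prometheus.yml'
  - '--storage.tsdb.path=/prometheus'
  - '--storage.tsdb.retention.time=15d'
  - '--storage.tsdb.retention.size=50GB'
3. Alert severity levels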
| Level | Description | Response time |
|---|---|---|
| Critical | Severe failure that affects the business | 5 minutes |
| Warning | Needs attention | 30 minutes |
| Info | Informational only | No response required |
