# Monitoring and Alerting Best Practices

## Overview

A sound monitoring and alerting system is key to keeping a RabbitMQ deployment running reliably. This document covers best practices for RabbitMQ monitoring metrics, alert configuration, and visualization.
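Everything below assumes each node exposes Prometheus metrics. On RabbitMQ 3.8+ the exporter ships as a built-in plugin that listens on port 15692 once enabled; a typical setup step (run on every cluster node) looks like this:

```shell
# Enable the built-in Prometheus exporter
rabbitmq-plugins enable rabbitmq_prometheus

# Sanity-check the scrape endpoint
curl -s http://localhost:15692/metrics | head
```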
## Monitoring Architecture
```
Data Collection Layer
┌──────────────┐   ┌──────────────┐   ┌──────────────┐
│ RabbitMQ     │   │ RabbitMQ     │   │ RabbitMQ     │
│ Node 1       │   │ Node 2       │   │ Node 3       │
│ ┌──────────┐ │   │ ┌──────────┐ │   │ ┌──────────┐ │
│ │Prometheus│ │   │ │Prometheus│ │   │ │Prometheus│ │
│ │ Exporter │ │   │ │ Exporter │ │   │ │ Exporter │ │
│ └────┬─────┘ │   │ └────┬─────┘ │   │ └────┬─────┘ │
└──────┼───────┘   └──────┼───────┘   └──────┼───────┘
       │                  │                  │
       └──────────────────┼──────────────────┘
                          ▼
Data Storage Layer
┌─────────────────────────────────────────────────────┐
│ Prometheus Server                                   │
│ • Time-series storage  • Aggregation  • Query API   │
└─────────────────────────┬───────────────────────────┘
                          ▼
Visualization & Alerting Layer
┌──────────────────────┐   ┌──────────────────────┐
│ Grafana              │   │ AlertManager         │
│ • Dashboards         │   │ • Alert routing      │
│ • Chart config       │   │ • Alert grouping     │
│ • Alert rules        │   │ • Notifications      │
└──────────────────────┘   └──────────────────────┘
```

## Core Monitoring Metrics
### 1. Node Metrics

| Metric | Description | Alert Threshold |
|---|---|---|
| `rabbitmq_up` | Node status | 0 means the node is down |
| `rabbitmq_node_memory_used` | Memory in use | > 80% of the memory watermark |
| `rabbitmq_node_disk_free` | Free disk space | < 1 GB |
| `rabbitmq_node_fd_used` | File descriptors in use | > 80% |
| `rabbitmq_node_sockets_used` | Sockets in use | > 80% |
| `rabbitmq_node_proc_used` | Erlang processes | > 80% |
### 2. Queue Metrics

| Metric | Description | Alert Threshold |
|---|---|---|
| `rabbitmq_queue_messages` | Total messages in queue | Set per business requirements |
| `rabbitmq_queue_messages_ready` | Messages ready for delivery | Alert on sustained growth |
| `rabbitmq_queue_messages_unacked` | Unacknowledged messages | Alert when excessive |
| `rabbitmq_queue_consumers` | Consumer count | Alert when 0 |
| `rabbitmq_queue_message_bytes` | Message size in bytes | Set per capacity plan |
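The "sustained growth" condition for `rabbitmq_queue_messages_ready` cannot be a fixed threshold. One way to express it, sketched here as a Prometheus rule with an illustrative window and floor that you should tune per workload, is to require the backlog to be both non-trivial and still climbing:

```yaml
- alert: RabbitMQQueueBacklogGrowing
  # Ready messages exceed a floor (filters out idle queues) AND the
  # backlog is higher now than 15 minutes ago.
  expr: |
    rabbitmq_queue_messages_ready > 100
    and
    delta(rabbitmq_queue_messages_ready[15m]) > 0
  for: 15m
  labels:
    severity: warning
  annotations:
    summary: "Queue backlog is growing"
    description: "Ready messages in {{ $labels.queue }} have been growing for 15 minutes."
```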
### 3. Connection Metrics

| Metric | Description | Alert Threshold |
|---|---|---|
| `rabbitmq_connections` | Connection count | Alert when near the limit |
| `rabbitmq_channels` | Channel count | Alert when near the limit |
| `rabbitmq_connection_closed_total` | Connections closed (cumulative) | Alert on abnormal growth |
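"Abnormal growth" of `rabbitmq_connection_closed_total` is best caught as a rate rather than an absolute count, since high churn usually points to client crash loops or network flapping. A sketch, with an assumed baseline of 5 closes per second that must be tuned to your traffic:

```yaml
- alert: RabbitMQConnectionChurnHigh
  expr: rate(rabbitmq_connection_closed_total[5m]) > 5
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "High connection churn"
    description: "Connections on {{ $labels.instance }} are closing at {{ $value }}/s."
```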
### 4. Throughput Metrics

| Metric | Description | Alert Threshold |
|---|---|---|
| `rabbitmq_channel_messages_published_total` | Messages published | Alert on rate drop |
| `rabbitmq_channel_messages_delivered_total` | Messages delivered | Alert on rate drop |
| `rabbitmq_channel_messages_acked_total` | Messages acknowledged | Alert on rate drop |
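A "rate drop" alert needs a baseline to compare against. One simple heuristic, sketched below with an illustrative 50% factor and 1-hour offset, compares the current publish rate with the same window an hour earlier:

```yaml
- alert: RabbitMQPublishRateDropped
  # Current 5m publish rate is below half of its level one hour ago.
  expr: |
    sum(rate(rabbitmq_channel_messages_published_total[5m]))
      < 0.5 * sum(rate(rabbitmq_channel_messages_published_total[5m] offset 1h))
  for: 10m
  labels:
    severity: warning
  annotations:
    summary: "Publish rate dropped"
    description: "Publish rate is below half of its level one hour ago."
```

This heuristic will also fire on legitimate traffic dips (e.g. nightly lows); offset-based baselines work best for workloads with steady hour-over-hour volume.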
## PHP Code Examples

### Recommended: Custom Metrics Collection

```php
<?php

namespace App\Messaging\Monitoring;

class MetricsCollector
{
    private array $nodes;
    private string $user;
    private string $password;

    public function __construct(array $nodes, string $user, string $password)
    {
        $this->nodes = $nodes;
        $this->user = $user;
        $this->password = $password;
    }

    public function collectAllMetrics(): array
    {
        $metrics = [
            'timestamp' => time(),
            'nodes' => [],
            'queues' => [],
            'overview' => [],
        ];

        foreach ($this->nodes as $node) {
            $metrics['nodes'][$node['host']] = $this->collectNodeMetrics($node);
        }

        $metrics['overview'] = $this->collectOverviewMetrics();
        $metrics['queues'] = $this->collectQueueMetrics();

        return $metrics;
    }

    private function collectNodeMetrics(array $node): array
    {
        $url = sprintf('http://%s:15672/api/nodes/%s', $node['host'], $node['name']);
        $data = $this->httpGet($url);

        return [
            'name' => $data['name'] ?? '',
            'running' => $data['running'] ?? false,
            'memory_used' => $data['mem_used'] ?? 0,
            'memory_limit' => $data['mem_limit'] ?? 0,
            'memory_usage_percent' => $this->calculatePercentage(
                $data['mem_used'] ?? 0,
                $data['mem_limit'] ?? 1
            ),
            'disk_free' => $data['disk_free'] ?? 0,
            'disk_free_limit' => $data['disk_free_limit'] ?? 0,
            'fd_used' => $data['fd_used'] ?? 0,
            'fd_total' => $data['fd_total'] ?? 0,
            'sockets_used' => $data['sockets_used'] ?? 0,
            'sockets_total' => $data['sockets_total'] ?? 0,
            'proc_used' => $data['proc_used'] ?? 0,
            'proc_total' => $data['proc_total'] ?? 0,
            'uptime' => $data['uptime'] ?? 0,
        ];
    }

    private function collectOverviewMetrics(): array
    {
        $url = sprintf('http://%s:15672/api/overview', $this->nodes[0]['host']);
        $data = $this->httpGet($url);

        return [
            'queue_totals' => $data['queue_totals'] ?? [],
            'message_stats' => $data['message_stats'] ?? [],
            'object_totals' => [
                'connections' => $data['object_totals']['connections'] ?? 0,
                'channels' => $data['object_totals']['channels'] ?? 0,
                'queues' => $data['object_totals']['queues'] ?? 0,
                'consumers' => $data['object_totals']['consumers'] ?? 0,
            ],
            'listeners' => $data['listeners'] ?? [],
            'cluster_name' => $data['cluster_name'] ?? '',
        ];
    }

    private function collectQueueMetrics(): array
    {
        $url = sprintf('http://%s:15672/api/queues', $this->nodes[0]['host']);
        $queues = $this->httpGet($url);

        $metrics = [];
        foreach ($queues as $queue) {
            $metrics[$queue['name']] = [
                'name' => $queue['name'],
                'vhost' => $queue['vhost'],
                'messages' => $queue['messages'] ?? 0,
                'messages_ready' => $queue['messages_ready'] ?? 0,
                // The management API reports this field as 'messages_unacknowledged'
                'messages_unacked' => $queue['messages_unacknowledged'] ?? 0,
                'consumers' => $queue['consumers'] ?? 0,
                'message_bytes' => $queue['message_bytes'] ?? 0,
                'state' => $queue['state'] ?? 'unknown',
                'type' => $queue['type'] ?? 'classic',
            ];
        }

        return $metrics;
    }

    private function calculatePercentage(int $used, int $total): float
    {
        if ($total === 0) {
            return 0.0;
        }

        return round(($used / $total) * 100, 2);
    }

    private function httpGet(string $url): array
    {
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => $this->user . ':' . $this->password,
            CURLOPT_TIMEOUT => 10,
        ]);
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        if ($response === false || $httpCode !== 200) {
            return [];
        }

        return json_decode($response, true) ?? [];
    }
}
```

## Alert Rule Configuration
```php
<?php

namespace App\Messaging\Monitoring;

class AlertRuleEngine
{
    private MetricsCollector $collector;
    private array $rules;
    private array $alertChannels;

    public function __construct(
        MetricsCollector $collector,
        array $rules,
        array $alertChannels
    ) {
        $this->collector = $collector;
        $this->rules = $rules;
        $this->alertChannels = $alertChannels;
    }

    public function evaluate(): array
    {
        $metrics = $this->collector->collectAllMetrics();
        $alerts = [];

        foreach ($this->rules as $rule) {
            $result = $this->evaluateRule($rule, $metrics);
            if ($result['triggered']) {
                $alerts[] = $result;
                $this->sendAlert($result);
            }
        }

        return $alerts;
    }

    private function evaluateRule(array $rule, array $metrics): array
    {
        // A metric path may contain '*' (e.g. 'nodes.*.running'), which fans
        // out to one value per node or queue; the rule triggers if any of
        // those values crosses the threshold.
        $values = $this->extractValues($rule['metric'], $metrics);
        $triggered = false;
        $value = null;

        foreach ($values as $candidate) {
            $value = $candidate;
            if ($this->compare($candidate, $rule['operator'], $rule['threshold'])) {
                $triggered = true;
                break;
            }
        }

        return [
            'rule_name' => $rule['name'],
            'metric' => $rule['metric'],
            'value' => $value,
            'threshold' => $rule['threshold'],
            'operator' => $rule['operator'],
            'severity' => $rule['severity'],
            'triggered' => $triggered,
            'message' => $this->buildAlertMessage($rule, $value),
            'timestamp' => time(),
        ];
    }

    private function compare($value, string $operator, $threshold): bool
    {
        switch ($operator) {
            case '>':
                return $value > $threshold;
            case '<':
                return $value < $threshold;
            case '==':
                return $value == $threshold;
            case '!=':
                return $value != $threshold;
        }

        return false;
    }

    private function extractValues(string $metric, array $metrics): array
    {
        // Resolve a dotted path against the metrics array; '*' matches every
        // key at that level, so a wildcard path can yield several values.
        $values = [$metrics];

        foreach (explode('.', $metric) as $key) {
            $next = [];
            foreach ($values as $value) {
                if (!is_array($value)) {
                    continue;
                }
                if ($key === '*') {
                    foreach ($value as $child) {
                        $next[] = $child;
                    }
                } elseif (array_key_exists($key, $value)) {
                    $next[] = $value[$key];
                }
            }
            $values = $next;
        }

        return $values;
    }

    private function buildAlertMessage(array $rule, $value): string
    {
        return sprintf(
            'Alert: %s - Current value: %s, Threshold: %s %s',
            $rule['description'] ?? $rule['name'],
            var_export($value, true),
            $rule['operator'],
            var_export($rule['threshold'], true)
        );
    }

    private function sendAlert(array $alert): void
    {
        foreach ($this->alertChannels as $channel) {
            $channel->send($alert);
        }
    }
}

class AlertChannel
{
    private string $type;
    private array $config;

    public function __construct(string $type, array $config)
    {
        $this->type = $type;
        $this->config = $config;
    }

    public function send(array $alert): void
    {
        switch ($this->type) {
            case 'email':
                $this->sendEmail($alert);
                break;
            case 'slack':
                $this->sendSlack($alert);
                break;
            case 'webhook':
                $this->sendWebhook($alert);
                break;
        }
    }

    private function sendEmail(array $alert): void
    {
        $to = $this->config['to'];
        $subject = sprintf('[%s] RabbitMQ Alert: %s', $alert['severity'], $alert['rule_name']);

        mail($to, $subject, $alert['message']);
    }

    private function sendSlack(array $alert): void
    {
        $webhook = $this->config['webhook_url'];
        $color = match ($alert['severity']) {
            'critical' => 'danger',
            'warning' => 'warning',
            default => 'good',
        };

        $payload = [
            'attachments' => [
                [
                    'color' => $color,
                    'title' => 'RabbitMQ Alert',
                    'fields' => [
                        ['title' => 'Rule', 'value' => $alert['rule_name'], 'short' => true],
                        ['title' => 'Severity', 'value' => $alert['severity'], 'short' => true],
                        ['title' => 'Current Value', 'value' => $alert['value'], 'short' => true],
                        ['title' => 'Threshold', 'value' => $alert['threshold'], 'short' => true],
                        ['title' => 'Message', 'value' => $alert['message'], 'short' => false],
                    ],
                    // Slack attachments use 'ts' for the timestamp field
                    'ts' => $alert['timestamp'],
                ],
            ],
        ];

        $ch = curl_init($webhook);
        curl_setopt_array($ch, [
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => json_encode($payload),
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_RETURNTRANSFER => true,
        ]);
        curl_exec($ch);
        curl_close($ch);
    }

    private function sendWebhook(array $alert): void
    {
        $ch = curl_init($this->config['url']);
        curl_setopt_array($ch, [
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => json_encode($alert),
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_RETURNTRANSFER => true,
        ]);
        curl_exec($ch);
        curl_close($ch);
    }
}
```

## Default Alert Rules
```php
<?php

return [
    'rules' => [
        [
            'name' => 'node_down',
            'metric' => 'nodes.*.running',
            'operator' => '==',
            'threshold' => false,
            'severity' => 'critical',
            'description' => 'RabbitMQ node is down',
        ],
        [
            'name' => 'memory_high',
            'metric' => 'nodes.*.memory_usage_percent',
            'operator' => '>',
            'threshold' => 80,
            'severity' => 'warning',
            'description' => 'Memory usage is high',
        ],
        [
            'name' => 'memory_critical',
            'metric' => 'nodes.*.memory_usage_percent',
            'operator' => '>',
            'threshold' => 90,
            'severity' => 'critical',
            'description' => 'Memory usage is critical',
        ],
        [
            'name' => 'disk_low',
            'metric' => 'nodes.*.disk_free',
            'operator' => '<',
            'threshold' => 1073741824,
            'severity' => 'critical',
            'description' => 'Disk space is low (< 1GB)',
        ],
        [
            'name' => 'queue_no_consumer',
            'metric' => 'queues.*.consumers',
            'operator' => '==',
            'threshold' => 0,
            'severity' => 'warning',
            'description' => 'Queue has no consumers',
        ],
        [
            'name' => 'queue_messages_high',
            'metric' => 'queues.*.messages',
            'operator' => '>',
            'threshold' => 10000,
            'severity' => 'warning',
            'description' => 'Queue has too many messages',
        ],
        [
            'name' => 'fd_high',
            'metric' => 'nodes.*.fd_used',
            'operator' => '>',
            'threshold' => 8000,
            'severity' => 'warning',
            'description' => 'File descriptor usage is high',
        ],
    ],
];
```

## Prometheus Configuration
```yaml
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093

rule_files:
  - /etc/prometheus/rules/*.yml

scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets:
          - rabbitmq-node1:15692
          - rabbitmq-node2:15692
          - rabbitmq-node3:15692
    metrics_path: /metrics
```

## Prometheus Alert Rules
```yaml
# rabbitmq_alerts.yml
groups:
  - name: rabbitmq
    rules:
      - alert: RabbitMQNodeDown
        expr: rabbitmq_up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ node is down"
          description: "RabbitMQ node {{ $labels.instance }} has been down for more than 1 minute."

      - alert: RabbitMQMemoryHigh
        expr: (rabbitmq_node_memory_used / rabbitmq_node_memory_limit) * 100 > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "RabbitMQ memory usage is high"
          description: "Memory usage on {{ $labels.instance }} is {{ $value }}%."

      - alert: RabbitMQMemoryCritical
        expr: (rabbitmq_node_memory_used / rabbitmq_node_memory_limit) * 100 > 90
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ memory usage is critical"
          description: "Memory usage on {{ $labels.instance }} is {{ $value }}%."

      - alert: RabbitMQDiskSpaceLow
        expr: rabbitmq_node_disk_free < 1073741824
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ disk space is low"
          description: "Disk space on {{ $labels.instance }} is {{ $value }} bytes."

      - alert: RabbitMQQueueNoConsumer
        expr: rabbitmq_queue_consumers == 0 and rabbitmq_queue_messages > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Queue has no consumers"
          description: "Queue {{ $labels.queue }} in vhost {{ $labels.vhost }} has {{ $value }} messages but no consumers."

      - alert: RabbitMQQueueMessagesHigh
        expr: rabbitmq_queue_messages > 10000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Queue has too many messages"
          description: "Queue {{ $labels.queue }} has {{ $value }} messages."

      - alert: RabbitMQConnectionsHigh
        expr: rabbitmq_connections > 8000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High number of connections"
          description: "RabbitMQ has {{ $value }} connections."

      - alert: RabbitMQUnackedMessagesHigh
        expr: sum(rabbitmq_queue_messages_unacked) > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High number of unacked messages"
          description: "Total unacked messages is {{ $value }}."
```

## Grafana Dashboard JSON
```json
{
  "dashboard": {
    "title": "RabbitMQ Monitoring",
    "panels": [
      {
        "title": "Node Status",
        "type": "stat",
        "targets": [
          {
            "expr": "rabbitmq_up",
            "legendFormat": "{{ instance }}"
          }
        ]
      },
      {
        "title": "Memory Usage",
        "type": "gauge",
        "targets": [
          {
            "expr": "(rabbitmq_node_memory_used / rabbitmq_node_memory_limit) * 100",
            "legendFormat": "{{ instance }}"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "max": 100,
            "min": 0,
            "unit": "percent",
            "thresholds": {
              "mode": "absolute",
              "steps": [
                {"color": "green", "value": null},
                {"color": "yellow", "value": 70},
                {"color": "red", "value": 85}
              ]
            }
          }
        }
      },
      {
        "title": "Queue Messages",
        "type": "graph",
        "targets": [
          {
            "expr": "sum(rabbitmq_queue_messages) by (queue)",
            "legendFormat": "{{ queue }}"
          }
        ]
      },
      {
        "title": "Message Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(rabbitmq_channel_messages_published_total[1m])",
            "legendFormat": "Published"
          },
          {
            "expr": "rate(rabbitmq_channel_messages_delivered_total[1m])",
            "legendFormat": "Delivered"
          },
          {
            "expr": "rate(rabbitmq_channel_messages_acked_total[1m])",
            "legendFormat": "Acked"
          }
        ]
      }
    ]
  }
}
```

## Best Practices Checklist
### Monitoring Setup

- [ ] Deploy Prometheus + Grafana
- [ ] Configure node metrics collection
- [ ] Configure queue metrics collection
- [ ] Configure connection metrics collection
- [ ] Configure throughput metrics collection

### Alerting Setup

- [ ] Configure node-status alerts
- [ ] Configure resource-usage alerts
- [ ] Configure queue-state alerts
- [ ] Configure message-backlog alerts
- [ ] Configure multi-channel notifications

### Visualization Setup

- [ ] Build a node overview dashboard
- [ ] Build a queue details dashboard
- [ ] Build a message traffic dashboard
- [ ] Build an alert history dashboard

### Operations

- [ ] Review alert rules regularly
- [ ] Tune dashboards regularly
- [ ] Clean up historical data regularly
- [ ] Rehearse alert response regularly
## Production Considerations

### Monitoring Data Retention

- Set a sensible retention period for monitoring data
- Configure a downsampling strategy
- Purge expired data regularly
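For Prometheus, retention is configured with startup flags; the values below are illustrative. Note that Prometheus itself does not downsample, so downsampled long-term storage usually means pairing it with a system such as Thanos or VictoriaMetrics:

```shell
prometheus \
  --config.file=/etc/prometheus/prometheus.yml \
  --storage.tsdb.retention.time=15d \
  --storage.tsdb.retention.size=50GB
```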
### Alerting Strategy

- Avoid alert storms
- Configure alert grouping
- Set alert silence windows
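With the AlertManager shown in the architecture above, grouping and repeat throttling live in `alertmanager.yml`. A minimal fragment, with an assumed receiver name and illustrative timings:

```yaml
route:
  receiver: ops-team                 # assumed receiver name
  group_by: ['alertname', 'cluster'] # merge related alerts into one notice
  group_wait: 30s                    # wait to batch a group's first alerts
  group_interval: 5m                 # minimum gap between batches
  repeat_interval: 4h                # re-notify while an alert keeps firing
```

Temporary silences for planned maintenance are created at runtime through the AlertManager UI or `amtool` rather than in this file.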
### Monitoring High Availability

- Make the monitoring stack itself highly available
- Alert on the health of the monitoring system
- Keep a fallback monitoring option ready
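A common way to verify the alerting pipeline end to end is a "dead man's switch": a rule that always fires, whose absence at the receiving end is itself the signal that monitoring is broken. A sketch:

```yaml
- alert: Watchdog
  expr: vector(1)   # always true, so this alert fires continuously
  labels:
    severity: none
  annotations:
    summary: "Alerting-pipeline heartbeat; its disappearance is the real alert"
```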
### Performance Impact

- Keep the collection frequency under control
- Optimize query performance
- Allocate resources appropriately
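On clusters with many queues, the dominant collection cost is usually per-object metrics. The `rabbitmq_prometheus` plugin can serve aggregated metrics instead; check your version's default behavior before relying on it. An illustrative `rabbitmq.conf` fragment:

```ini
# Aggregated metrics are far cheaper to scrape on large clusters;
# set to true only if you need per-queue detail in Prometheus.
prometheus.return_per_object_metrics = false
```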
