Skip to content

RabbitMQ 自定义监控脚本

概述

除了使用现成的监控工具,自定义监控脚本可以更灵活地满足特定业务需求。本文将介绍如何使用 PHP 和 Shell 脚本编写 RabbitMQ 监控脚本,包括健康检查、指标采集、告警通知等功能。

核心知识点

监控脚本类型

| 类型 | 说明 | 适用场景 |
| --- | --- | --- |
| 健康检查脚本 | 检查服务是否正常 | 定时任务、负载均衡 |
| 指标采集脚本 | 收集监控数据 | 监控系统集成 |
| 告警脚本 | 发送告警通知 | 异常处理 |
| 运维脚本 | 执行运维操作 | 自动化运维 |

脚本设计原则

  1. 幂等性:多次执行结果一致
  2. 容错性:异常情况能够优雅处理
  3. 可配置:参数可配置,便于不同环境使用
  4. 日志记录:记录执行日志便于排查问题

配置示例

基础监控类

php
<?php

/**
 * Read-only client for the RabbitMQ management HTTP API.
 *
 * Public methods return plain arrays so callers never touch the raw API
 * payload. Transport failures surface as Exception from the metric getters;
 * healthCheck() converts them into an "unhealthy" result instead.
 */
class RabbitMQMonitor
{
    /** @var array Connection settings: host, port, user, password, timeout (seconds). */
    private $config;

    /** @var Logger Created in the constructor; no method of this class writes to it yet. */
    private $logger;

    /**
     * @param array $config Overrides for the default connection settings.
     */
    public function __construct(array $config = [])
    {
        $this->config = array_merge([
            'host' => 'localhost',
            'port' => 15672,
            'user' => 'guest',
            'password' => 'guest',
            'timeout' => 10,
        ], $config);

        $this->logger = new Logger('/var/log/rabbitmq/monitor.log');
    }

    /**
     * Performs one HTTP call against the management API.
     *
     * @param string     $endpoint Path below /api/, already URL-encoded.
     * @param string     $method   HTTP verb.
     * @param mixed|null $data     Optional body, sent JSON-encoded.
     *
     * @return array{code: int, data: mixed} Status code and decoded body (null when the body is not JSON).
     *
     * @throws Exception When cURL reports a transport error.
     */
    private function request($endpoint, $method = 'GET', $data = null): array
    {
        $url = "http://{$this->config['host']}:{$this->config['port']}/api/{$endpoint}";

        $options = [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->config['user']}:{$this->config['password']}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => $this->config['timeout'],
            CURLOPT_CUSTOMREQUEST => $method,
        ];

        if ($data !== null) {
            $options[CURLOPT_POSTFIELDS] = json_encode($data);
        }

        $ch = curl_init();
        curl_setopt_array($ch, $options);

        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        $error = curl_error($ch);
        curl_close($ch);

        // curl_exec() returns false on failure; never hand that to json_decode().
        if ($response === false || $error !== '') {
            throw new Exception("cURL Error: {$error}");
        }

        return [
            'code' => $httpCode,
            'data' => json_decode($response, true),
        ];
    }

    /**
     * Fetches an endpoint that is expected to return a JSON list.
     *
     * @return array The decoded list, or [] on a non-200 status or a non-array body.
     *
     * @throws Exception When cURL reports a transport error.
     */
    private function fetchList($endpoint): array
    {
        $response = $this->request($endpoint);

        if ($response['code'] !== 200 || !is_array($response['data'])) {
            return [];
        }

        return $response['data'];
    }

    /**
     * Reports overall broker health.
     *
     * The memory/disk alarm flags are read from /api/nodes: the "node" field of
     * /api/overview is only the node name, so it cannot carry alarm state.
     * A check value of true means "OK" (no alarm raised on any node).
     *
     * @return array status is 'healthy', 'degraded' or 'unhealthy'; 'unhealthy'
     *               results carry a 'message', the others carry 'checks',
     *               'version' and 'cluster_name'.
     */
    public function healthCheck(): array
    {
        try {
            $response = $this->request('overview');

            if ($response['code'] !== 200) {
                return [
                    'status' => 'unhealthy',
                    'message' => "API returned HTTP {$response['code']}",
                ];
            }

            $data = $response['data'];
            if (!is_array($data)) {
                // A 200 without a JSON object is not the management API answering.
                return [
                    'status' => 'unhealthy',
                    'message' => 'API returned an invalid JSON body',
                ];
            }

            $memoryAlarmRaised = false;
            $diskAlarmRaised = false;
            foreach ($this->fetchList('nodes') as $node) {
                $memoryAlarmRaised = $memoryAlarmRaised || !empty($node['mem_alarm']);
                $diskAlarmRaised = $diskAlarmRaised || !empty($node['disk_free_alarm']);
            }

            $checks = [
                'server_running' => true,
                'memory_alarm' => !$memoryAlarmRaised,
                'disk_alarm' => !$diskAlarmRaised,
            ];

            $allHealthy = !in_array(false, $checks, true);

            return [
                'status' => $allHealthy ? 'healthy' : 'degraded',
                'checks' => $checks,
                'version' => $data['rabbitmq_version'] ?? 'unknown',
                'cluster_name' => $data['cluster_name'] ?? 'unknown',
            ];
        } catch (Exception $e) {
            return [
                'status' => 'unhealthy',
                'message' => $e->getMessage(),
            ];
        }
    }

    /**
     * Returns metrics for one queue, or for every queue when no name is given.
     *
     * @param string|null $queueName Queue to look up; null or '' lists all queues.
     * @param string      $vhost     Virtual host of the named queue.
     *
     * @return array|null A single metrics array (null if the queue lookup fails)
     *                    when a name is given, otherwise a list of metrics arrays.
     *
     * @throws Exception When cURL reports a transport error.
     */
    public function getQueueMetrics($queueName = null, $vhost = '/'): ?array
    {
        // Compare explicitly: a queue literally named "0" is a valid name.
        if ($queueName !== null && $queueName !== '') {
            // rawurlencode() is the path-safe encoder (space => %20, "/" => %2F).
            $vhostEncoded = rawurlencode($vhost);
            $queueEncoded = rawurlencode((string) $queueName);
            $response = $this->request("queues/{$vhostEncoded}/{$queueEncoded}");

            if ($response['code'] !== 200 || !is_array($response['data'])) {
                return null;
            }

            return $this->parseQueueData($response['data']);
        }

        $metrics = [];
        foreach ($this->fetchList('queues') as $queue) {
            $metrics[] = $this->parseQueueData($queue);
        }

        return $metrics;
    }

    /**
     * Flattens one raw queue payload into the metric keys used by callers.
     * Missing fields fall back to 0 / 'unknown'.
     */
    private function parseQueueData($queue): array
    {
        $stats = $queue['message_stats'] ?? [];

        return [
            'name' => $queue['name'] ?? 'unknown',
            'vhost' => $queue['vhost'] ?? 'unknown',
            'messages' => $queue['messages'] ?? 0,
            'messages_ready' => $queue['messages_ready'] ?? 0,
            'messages_unacked' => $queue['messages_unacked'] ?? 0,
            'consumers' => $queue['consumers'] ?? 0,
            'memory' => $queue['memory'] ?? 0,
            'state' => $queue['state'] ?? 'unknown',
            'publish_rate' => $stats['publish_details']['rate'] ?? 0,
            'consume_rate' => $stats['consume_details']['rate'] ?? 0,
            'ack_rate' => $stats['ack_details']['rate'] ?? 0,
        ];
    }

    /**
     * Returns one metrics array per cluster node ([] when the API call fails
     * with a non-200 status).
     *
     * @throws Exception When cURL reports a transport error.
     */
    public function getNodeMetrics(): array
    {
        $metrics = [];
        foreach ($this->fetchList('nodes') as $node) {
            $metrics[] = [
                'name' => $node['name'] ?? 'unknown',
                'type' => $node['type'] ?? 'unknown',
                'running' => $node['running'] ?? false,
                'uptime' => $node['uptime'] ?? 0,
                'mem_used' => $node['mem_used'] ?? 0,
                'mem_limit' => $node['mem_limit'] ?? 0,
                'mem_alarm' => $node['mem_alarm'] ?? false,
                'disk_free' => $node['disk_free'] ?? 0,
                'disk_alarm' => $node['disk_free_alarm'] ?? false,
                'fd_used' => $node['fd_used'] ?? 0,
                'fd_total' => $node['fd_total'] ?? 0,
                'sockets_used' => $node['sockets_used'] ?? 0,
                'sockets_total' => $node['sockets_total'] ?? 0,
                'proc_used' => $node['proc_used'] ?? 0,
                'proc_total' => $node['proc_total'] ?? 0,
            ];
        }

        return $metrics;
    }

    /**
     * Aggregates open connections: total count, counts per client product and
     * per peer host, and how many connections have zero channels ("idle").
     *
     * @return array The aggregate, or [] when the API call does not return a list.
     *
     * @throws Exception When cURL reports a transport error.
     */
    public function getConnectionMetrics(): array
    {
        $response = $this->request('connections');

        // count()/foreach on a non-array body would raise a TypeError on PHP 8.
        if ($response['code'] !== 200 || !is_array($response['data'])) {
            return [];
        }

        $metrics = [
            'total' => count($response['data']),
            'by_client' => [],
            'by_host' => [],
            'idle' => 0,
        ];

        foreach ($response['data'] as $conn) {
            $client = $conn['client_properties']['product'] ?? 'unknown';
            $host = $conn['peer_host'] ?? 'unknown';

            $metrics['by_client'][$client] = ($metrics['by_client'][$client] ?? 0) + 1;
            $metrics['by_host'][$host] = ($metrics['by_host'][$host] ?? 0) + 1;

            if (($conn['channels'] ?? 0) === 0) {
                $metrics['idle']++;
            }
        }

        return $metrics;
    }
}

/**
 * Minimal file logger that appends one JSON object per line.
 */
class Logger
{
    /**
     * json_encode() flags for log lines:
     * - UNESCAPED_UNICODE keeps non-ASCII text (e.g. Chinese alert messages) greppable;
     * - INVALID_UTF8_SUBSTITUTE / PARTIAL_OUTPUT_ON_ERROR make sure a malformed
     *   value degrades to a placeholder instead of making json_encode() return
     *   false, which would drop the whole entry and write a blank line.
     */
    private const JSON_FLAGS = JSON_UNESCAPED_UNICODE
        | JSON_INVALID_UTF8_SUBSTITUTE
        | JSON_PARTIAL_OUTPUT_ON_ERROR;

    /** @var string Path of the log file. */
    private $file;

    /**
     * @param string $file Log file path; its directory is created (recursively,
     *                     mode 0755) when it does not exist yet.
     */
    public function __construct($file)
    {
        $this->file = $file;
        $dir = dirname($file);
        if (!is_dir($dir)) {
            mkdir($dir, 0755, true);
        }
    }

    /**
     * Appends one entry with timestamp, level, message and context.
     *
     * @param string $level   Severity label, written as-is.
     * @param string $message Human-readable message.
     * @param array  $context Extra structured data stored under "context".
     */
    public function log($level, $message, array $context = [])
    {
        $entry = [
            'timestamp' => date('Y-m-d H:i:s'),
            'level' => $level,
            'message' => $message,
            'context' => $context,
        ];

        $line = json_encode($entry, self::JSON_FLAGS);

        file_put_contents($this->file, $line . "\n", FILE_APPEND);
    }

    /** Logs at INFO level. */
    public function info($message, array $context = [])
    {
        $this->log('INFO', $message, $context);
    }

    /** Logs at WARNING level. */
    public function warning($message, array $context = [])
    {
        $this->log('WARNING', $message, $context);
    }

    /** Logs at ERROR level. */
    public function error($message, array $context = [])
    {
        $this->log('ERROR', $message, $context);
    }
}

告警检查脚本

php
<?php

/**
 * Turns the metrics exposed by RabbitMQMonitor into alert arrays, logs each
 * alert and forwards it to an optional notifier.
 *
 * Every alert carries 'level' ('warning' | 'critical'), 'type', 'message' and
 * 'timestamp'; node/queue alerts add 'node' or 'queue' + 'vhost', threshold
 * alerts add 'value'.
 */
class AlertChecker
{
    private $monitor;
    private $logger;
    private $notifier;
    private $thresholds;

    /**
     * @param RabbitMQMonitor $monitor  Metric source.
     * @param Logger          $logger   Receives one WARNING entry per alert.
     * @param object|null     $notifier Optional; must expose send(array $alert).
     */
    public function __construct(RabbitMQMonitor $monitor, Logger $logger, $notifier = null)
    {
        $this->monitor = $monitor;
        $this->logger = $logger;
        $this->notifier = $notifier;

        $this->thresholds = [
            'memory_usage_percent' => ['warning' => 70, 'critical' => 85],
            'disk_free_gb' => ['warning' => 10, 'critical' => 5],
            'queue_messages' => ['warning' => 50000, 'critical' => 100000],
            'connections' => ['warning' => 800, 'critical' => 950],
            'no_consumer_messages' => ['warning' => 1000, 'critical' => 5000],
        ];
    }

    /**
     * Runs every check, then logs and dispatches the collected alerts.
     *
     * Each metric check is isolated: an Exception thrown by the monitor (for
     * example a cURL failure) must not abort the run, otherwise the alerts
     * gathered so far -- including the "service down" one -- would never be sent.
     *
     * @return array List of alert arrays.
     */
    public function checkAll()
    {
        $alerts = $this->checkHealth();

        // When the health check already reported the API as down, failures of
        // the remaining checks are expected and would only duplicate that alert.
        $apiReportedDown = false;
        foreach ($alerts as $alert) {
            if ($alert['level'] === 'critical') {
                $apiReportedDown = true;
            }
        }

        $checks = [
            'nodes' => function () {
                return $this->checkNodes();
            },
            'queues' => function () {
                return $this->checkQueues();
            },
            'connections' => function () {
                return $this->checkConnections();
            },
        ];

        foreach ($checks as $name => $runCheck) {
            try {
                $alerts = array_merge($alerts, $runCheck());
            } catch (Exception $e) {
                $this->logger->error("Check '{$name}' failed: {$e->getMessage()}");

                if (!$apiReportedDown) {
                    $alerts[] = [
                        'level' => 'warning',
                        'type' => 'check_error',
                        'message' => "监控检查 {$name} 执行失败: " . $e->getMessage(),
                        'timestamp' => date('Y-m-d H:i:s'),
                    ];
                }
            }
        }

        foreach ($alerts as $alert) {
            $this->logger->warning("Alert: {$alert['message']}", $alert);

            if ($this->notifier) {
                $this->notifier->send($alert);
            }
        }

        return $alerts;
    }

    /**
     * Maps a "higher is worse" value onto an alert level.
     *
     * @return string|null 'critical', 'warning', or null when below both thresholds.
     */
    private function levelAbove($value, $metric)
    {
        $limits = $this->thresholds[$metric];

        if ($value > $limits['critical']) {
            return 'critical';
        }
        if ($value > $limits['warning']) {
            return 'warning';
        }

        return null;
    }

    /** Converts the monitor's health status into zero or one alert. */
    private function checkHealth()
    {
        $alerts = [];
        $health = $this->monitor->healthCheck();

        if ($health['status'] === 'unhealthy') {
            $alerts[] = [
                'level' => 'critical',
                'type' => 'health',
                'message' => "RabbitMQ 服务异常: " . ($health['message'] ?? 'unknown'),
                'timestamp' => date('Y-m-d H:i:s'),
            ];
        } elseif ($health['status'] === 'degraded') {
            $alerts[] = [
                'level' => 'warning',
                'type' => 'health',
                'message' => 'RabbitMQ 服务降级运行',
                'details' => $health['checks'],
                'timestamp' => date('Y-m-d H:i:s'),
            ];
        }

        return $alerts;
    }

    /** Checks each node: running state, memory usage/alarm, free disk/alarm. */
    private function checkNodes()
    {
        $alerts = [];
        $nodes = $this->monitor->getNodeMetrics();

        foreach ($nodes as $node) {
            if (!$node['running']) {
                $alerts[] = [
                    'level' => 'critical',
                    'type' => 'node',
                    'node' => $node['name'],
                    'message' => "节点 {$node['name']} 未运行",
                    'timestamp' => date('Y-m-d H:i:s'),
                ];
                continue;
            }

            // mem_limit defaults to 0 when the API omits it; dividing by it
            // would throw DivisionByZeroError, so the ratio check is skipped.
            $memLimit = $node['mem_limit'] ?? 0;
            if ($memLimit > 0) {
                $memoryUsagePercent = ($node['mem_used'] / $memLimit) * 100;
                $level = $this->levelAbove($memoryUsagePercent, 'memory_usage_percent');

                if ($level !== null) {
                    $label = $level === 'critical' ? '内存使用率过高' : '内存使用率较高';
                    $alerts[] = [
                        'level' => $level,
                        'type' => 'memory',
                        'node' => $node['name'],
                        'message' => "节点 {$node['name']} {$label}: " . round($memoryUsagePercent, 2) . "%",
                        'value' => $memoryUsagePercent,
                        'timestamp' => date('Y-m-d H:i:s'),
                    ];
                }
            }

            if ($node['mem_alarm']) {
                $alerts[] = [
                    'level' => 'critical',
                    'type' => 'memory_alarm',
                    'node' => $node['name'],
                    'message' => "节点 {$node['name']} 内存告警已触发",
                    'timestamp' => date('Y-m-d H:i:s'),
                ];
            }

            // Free disk is "lower is worse", so the comparison is inverted here.
            $diskFreeGB = $node['disk_free'] / 1024 / 1024 / 1024;
            $diskLimits = $this->thresholds['disk_free_gb'];
            $diskLevel = null;
            if ($diskFreeGB < $diskLimits['critical']) {
                $diskLevel = 'critical';
            } elseif ($diskFreeGB < $diskLimits['warning']) {
                $diskLevel = 'warning';
            }

            if ($diskLevel !== null) {
                $label = $diskLevel === 'critical' ? '磁盘空间不足' : '磁盘空间较低';
                $alerts[] = [
                    'level' => $diskLevel,
                    'type' => 'disk',
                    'node' => $node['name'],
                    'message' => "节点 {$node['name']} {$label}: " . round($diskFreeGB, 2) . " GB",
                    'value' => $diskFreeGB,
                    'timestamp' => date('Y-m-d H:i:s'),
                ];
            }

            if ($node['disk_alarm']) {
                $alerts[] = [
                    'level' => 'critical',
                    'type' => 'disk_alarm',
                    'node' => $node['name'],
                    'message' => "节点 {$node['name']} 磁盘告警已触发",
                    'timestamp' => date('Y-m-d H:i:s'),
                ];
            }
        }

        return $alerts;
    }

    /** Checks each queue: backlog size, backlog without consumers, state. */
    private function checkQueues()
    {
        $alerts = [];
        $queues = $this->monitor->getQueueMetrics();

        foreach ($queues as $queue) {
            $backlogLevel = $this->levelAbove($queue['messages'], 'queue_messages');
            if ($backlogLevel !== null) {
                $label = $backlogLevel === 'critical' ? '消息堆积严重' : '消息堆积';
                $alerts[] = [
                    'level' => $backlogLevel,
                    'type' => 'queue_backlog',
                    'queue' => $queue['name'],
                    'vhost' => $queue['vhost'],
                    'message' => "队列 {$queue['name']} {$label}: {$queue['messages']} 条",
                    'value' => $queue['messages'],
                    'timestamp' => date('Y-m-d H:i:s'),
                ];
            }

            if ($queue['consumers'] === 0) {
                // Escalates to critical above the configured critical threshold,
                // which the thresholds table defines for exactly this purpose.
                $orphanLevel = $this->levelAbove($queue['messages'], 'no_consumer_messages');
                if ($orphanLevel !== null) {
                    $alerts[] = [
                        'level' => $orphanLevel,
                        'type' => 'no_consumer',
                        'queue' => $queue['name'],
                        'vhost' => $queue['vhost'],
                        'message' => "队列 {$queue['name']} 无消费者但有 {$queue['messages']} 条消息",
                        'value' => $queue['messages'],
                        'timestamp' => date('Y-m-d H:i:s'),
                    ];
                }
            }

            if ($queue['state'] !== 'running') {
                $alerts[] = [
                    'level' => 'warning',
                    'type' => 'queue_state',
                    'queue' => $queue['name'],
                    'vhost' => $queue['vhost'],
                    'message' => "队列 {$queue['name']} 状态异常: {$queue['state']}",
                    'timestamp' => date('Y-m-d H:i:s'),
                ];
            }
        }

        return $alerts;
    }

    /** Checks the total connection count and the number of idle connections. */
    private function checkConnections()
    {
        $alerts = [];
        $connections = $this->monitor->getConnectionMetrics();

        // The monitor returns [] when the API call fails with a non-200 status.
        $total = $connections['total'] ?? 0;
        $idle = $connections['idle'] ?? 0;

        $level = $this->levelAbove($total, 'connections');
        if ($level !== null) {
            $label = $level === 'critical' ? '连接数过多' : '连接数较高';
            $alerts[] = [
                'level' => $level,
                'type' => 'connections',
                'message' => "{$label}: {$total}",
                'value' => $total,
                'timestamp' => date('Y-m-d H:i:s'),
            ];
        }

        if ($idle > 100) {
            $alerts[] = [
                'level' => 'warning',
                'type' => 'idle_connections',
                'message' => "空闲连接过多: {$idle}",
                'value' => $idle,
                'timestamp' => date('Y-m-d H:i:s'),
            ];
        }

        return $alerts;
    }
}

通知发送类

php
<?php

/**
 * Delivers alerts over the channels configured for the alert's level.
 *
 * Expected config layout: 'email' => [level => list of addresses],
 * 'slack' => [level => webhook URL], 'webhook' => [level => URL], plus an
 * optional 'from_email'.
 */
class Notifier
{
    /** Upper bounds (seconds) so an unresponsive webhook cannot stall the run. */
    private const CONNECT_TIMEOUT_SECONDS = 5;
    private const REQUEST_TIMEOUT_SECONDS = 10;

    private $config;
    private $logger;

    public function __construct(array $config, Logger $logger)
    {
        $this->config = $config;
        $this->logger = $logger;
    }

    /**
     * Sends the alert over every channel that has a target for its level.
     *
     * Empty targets are skipped: a config built from getenv() contains false
     * for unset variables, which isset() alone would treat as configured.
     *
     * @param array $alert Must contain 'level', 'type', 'message', 'timestamp'.
     */
    public function send(array $alert)
    {
        $level = $alert['level'];

        if (!empty($this->config['email'][$level])) {
            $this->sendEmail($alert, (array) $this->config['email'][$level]);
        }

        if (!empty($this->config['slack'][$level])) {
            $this->sendSlack($alert, $this->config['slack'][$level]);
        }

        if (!empty($this->config['webhook'][$level])) {
            $this->sendWebhook($alert, $this->config['webhook'][$level]);
        }
    }

    /** Mails the formatted alert to each recipient and logs the outcome. */
    private function sendEmail(array $alert, array $recipients)
    {
        $subject = "[{$alert['level']}] RabbitMQ Alert: {$alert['type']}";
        $body = $this->formatAlertEmail($alert);

        $headers = implode("\r\n", [
            'From: ' . ($this->config['from_email'] ?? 'alerts@example.com'),
            'Content-Type: text/plain; charset=UTF-8',
        ]);

        $failed = [];
        foreach ($recipients as $to) {
            // mail() returns false when the message was not accepted for delivery.
            if (!mail($to, $subject, $body, $headers)) {
                $failed[] = $to;
            }
        }

        $delivered = array_values(array_diff($recipients, $failed));
        if ($delivered !== []) {
            $this->logger->info("Email sent", ['recipients' => $delivered, 'alert' => $alert]);
        }
        if ($failed !== []) {
            $this->logger->error("Email delivery failed", ['recipients' => $failed, 'alert' => $alert]);
        }
    }

    /** Posts the alert as a Slack attachment to an incoming-webhook URL. */
    private function sendSlack(array $alert, $webhookUrl)
    {
        $color = match($alert['level']) {
            'critical' => 'danger',
            'warning' => 'warning',
            default => 'good',
        };

        $payload = [
            'attachments' => [
                [
                    'color' => $color,
                    'title' => "RabbitMQ Alert: {$alert['type']}",
                    'text' => $alert['message'],
                    'fields' => [
                        ['title' => 'Level', 'value' => $alert['level'], 'short' => true],
                        ['title' => 'Time', 'value' => $alert['timestamp'], 'short' => true],
                    ],
                ],
            ],
        ];

        if ($this->postJson($webhookUrl, $payload, 'Slack')) {
            $this->logger->info("Slack notification sent", ['alert' => $alert]);
        }
    }

    /** Posts the raw alert array as JSON to a generic webhook URL. */
    private function sendWebhook(array $alert, $webhookUrl)
    {
        if ($this->postJson($webhookUrl, $alert, 'Webhook')) {
            $this->logger->info("Webhook notification sent", ['alert' => $alert]);
        }
    }

    /**
     * POSTs a JSON body and reports whether the endpoint answered with a 2xx.
     * Failures are logged at ERROR level with the channel name.
     *
     * @return bool True on a 2xx response, false otherwise.
     */
    private function postJson($url, array $payload, $channel)
    {
        $ch = curl_init((string) $url);
        if ($ch === false) {
            $this->logger->error("{$channel} notification failed", ['error' => 'curl_init failed']);
            return false;
        }

        curl_setopt_array($ch, [
            CURLOPT_POSTFIELDS => json_encode($payload),
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_CONNECTTIMEOUT => self::CONNECT_TIMEOUT_SECONDS,
            CURLOPT_TIMEOUT => self::REQUEST_TIMEOUT_SECONDS,
        ]);

        $response = curl_exec($ch);
        $status = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        $error = curl_error($ch);
        curl_close($ch);

        if ($response === false || $status < 200 || $status >= 300) {
            $this->logger->error("{$channel} notification failed", [
                'http_code' => $status,
                'error' => $error,
            ]);
            return false;
        }

        return true;
    }

    /** Builds the plain-text e-mail body; node/queue/value lines are optional. */
    private function formatAlertEmail(array $alert)
    {
        $body = "RabbitMQ Alert Notification\n";
        $body .= str_repeat("=", 50) . "\n\n";
        $body .= "Level: {$alert['level']}\n";
        $body .= "Type: {$alert['type']}\n";
        $body .= "Message: {$alert['message']}\n";
        $body .= "Time: {$alert['timestamp']}\n";

        if (isset($alert['node'])) {
            $body .= "Node: {$alert['node']}\n";
        }
        if (isset($alert['queue'])) {
            $body .= "Queue: {$alert['queue']}\n";
        }
        if (isset($alert['value'])) {
            $body .= "Value: {$alert['value']}\n";
        }

        return $body;
    }
}

实际应用场景

场景一:定时监控脚本

php
#!/usr/bin/env php
<?php

require_once __DIR__ . '/RabbitMQMonitor.php';
require_once __DIR__ . '/AlertChecker.php';
require_once __DIR__ . '/Notifier.php';
require_once __DIR__ . '/Logger.php';

// Settings come from the environment, with fallbacks for local use.
// NOTE(review): the fallback credentials are example values -- confirm real
// deployments always set RABBITMQ_USER / RABBITMQ_PASSWORD.
$config = [
    'rabbitmq' => [
        'host' => getenv('RABBITMQ_HOST') ?: 'localhost',
        'port' => getenv('RABBITMQ_PORT') ?: 15672,
        'user' => getenv('RABBITMQ_USER') ?: 'admin',
        'password' => getenv('RABBITMQ_PASSWORD') ?: 'admin123',
    ],
    'notifications' => [
        'from_email' => 'alerts@example.com',
        'email' => [
            'critical' => ['ops-critical@example.com'],
            'warning' => ['ops@example.com'],
        ],
        // getenv() returns false for an unset variable; array_filter() drops
        // those entries so no level is configured with an unusable webhook URL.
        'slack' => array_filter([
            'critical' => getenv('SLACK_WEBHOOK_CRITICAL'),
            'warning' => getenv('SLACK_WEBHOOK_WARNING'),
        ]),
    ],
];

$logger = new Logger('/var/log/rabbitmq/monitor.log');
$monitor = new RabbitMQMonitor($config['rabbitmq']);
$notifier = new Notifier($config['notifications'], $logger);
$checker = new AlertChecker($monitor, $logger, $notifier);

$alerts = $checker->checkAll();

echo "检查完成,发现 " . count($alerts) . " 个告警\n";

foreach ($alerts as $alert) {
    echo "[{$alert['level']}] {$alert['message']}\n";
}

// A non-zero exit status lets cron wrappers or supervisors detect that alerts fired.
exit(count($alerts) > 0 ? 1 : 0);

场景二:Shell 监控脚本

bash
#!/bin/bash
# Intended install path: /opt/rabbitmq/scripts/monitor.sh
# (kept as a comment: as a bare line the script would try to execute that path)

# Connection settings; each one can be overridden from the environment.
RABBITMQ_HOST="${RABBITMQ_HOST:-localhost}"
RABBITMQ_PORT="${RABBITMQ_PORT:-15672}"
RABBITMQ_USER="${RABBITMQ_USER:-admin}"
RABBITMQ_PASS="${RABBITMQ_PASS:-admin123}"

# Base URL of the management HTTP API.
API_URL="http://${RABBITMQ_HOST}:${RABBITMQ_PORT}/api"

# Destinations for the run log and for raised alerts.
LOG_FILE="/var/log/rabbitmq/monitor.log"
ALERT_FILE="/var/log/rabbitmq/alerts.log"

# Append one timestamped line ("[YYYY-MM-DD HH:MM:SS] <message>") to $LOG_FILE.
#   $1 - message text
log() {
    local stamp
    stamp=$(date '+%Y-%m-%d %H:%M:%S')
    printf '[%s] %s\n' "$stamp" "$1" >> "$LOG_FILE"
}

# Record an alert in $ALERT_FILE and mirror it into the run log via log().
#   $1 - severity label (e.g. WARNING, CRITICAL)
#   $2 - alert text
alert() {
    local level="$1" message="$2" stamp
    stamp=$(date '+%Y-%m-%d %H:%M:%S')
    printf '[%s] %s - %s\n' "$level" "$stamp" "$message" >> "$ALERT_FILE"
    log "ALERT [${level}]: ${message}"
}

# Probe /api/overview and raise a CRITICAL alert unless it answers HTTP 200.
# Returns 0 when the API is reachable, 1 otherwise.
check_rabbitmq_status() {
    local response
    # --max-time bounds the probe so a hung API cannot block the cron job;
    # on a transport failure curl reports the status code as 000.
    response=$(curl -s --max-time 10 -o /dev/null -w "%{http_code}" -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/overview")

    if [ "$response" != "200" ]; then
        alert "CRITICAL" "RabbitMQ API 返回 HTTP ${response}"
        return 1
    fi

    log "RabbitMQ status OK"
    return 0
}

# Compare the first node's memory usage (mem_used / mem_limit) against the
# 70% warning and 85% critical thresholds.
# Returns 1 without alerting when the metrics cannot be read.
# NOTE(review): only the first entry of /api/nodes is inspected -- confirm
# whether multi-node clusters need every node checked.
check_memory() {
    local nodes_json mem_used mem_limit usage
    nodes_json=$(curl -s --max-time 10 -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes")

    # One request and one parse, so both values come from the same snapshot.
    read -r mem_used mem_limit < <(
        printf '%s' "$nodes_json" |
            python3 -c "import sys,json; n=json.load(sys.stdin)[0]; print(n['mem_used'], n['mem_limit'])" 2>/dev/null
    )

    # Empty or non-numeric values would break bc; a zero limit would divide by zero.
    if ! [[ "$mem_used" =~ ^[0-9]+$ && "$mem_limit" =~ ^[0-9]+$ ]] || [ "$mem_limit" -eq 0 ]; then
        log "无法获取内存指标,跳过内存检查"
        return 1
    fi

    usage=$(echo "scale=2; ${mem_used} * 100 / ${mem_limit}" | bc)

    if (( $(echo "${usage} > 85" | bc -l) )); then
        alert "CRITICAL" "内存使用率过高: ${usage}%"
    elif (( $(echo "${usage} > 70" | bc -l) )); then
        alert "WARNING" "内存使用率较高: ${usage}%"
    else
        log "内存使用率正常: ${usage}%"
    fi
}

# Compare the first node's free disk space (whole GB, rounded down) against
# the 10 GB warning and 5 GB critical thresholds.
# Returns 1 without alerting when the metric cannot be read.
check_disk() {
    local disk_free disk_gb
    disk_free=$(curl -s --max-time 10 -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/nodes" | python3 -c "import sys,json; print(json.load(sys.stdin)[0]['disk_free'])" 2>/dev/null)

    # An empty or non-numeric value would evaluate to 0 in $(( )) and raise a
    # false "disk full" CRITICAL alert whenever the API call fails.
    if ! [[ "$disk_free" =~ ^[0-9]+$ ]]; then
        log "无法获取磁盘指标,跳过磁盘检查"
        return 1
    fi

    disk_gb=$((disk_free / 1024 / 1024 / 1024))

    if [ "$disk_gb" -lt 5 ]; then
        alert "CRITICAL" "磁盘空间不足: ${disk_gb}GB"
    elif [ "$disk_gb" -lt 10 ]; then
        alert "WARNING" "磁盘空间较低: ${disk_gb}GB"
    else
        log "磁盘空间正常: ${disk_gb}GB"
    fi
}

# Raise alerts for queues with a large backlog (>50000 warning, >100000
# critical) or with more than 1000 messages and no consumer.
# Returns 1 without alerting when the queue list cannot be fetched.
check_queues() {
    local queues_json
    queues_json=$(curl -s --max-time 10 -u "${RABBITMQ_USER}:${RABBITMQ_PASS}" "${API_URL}/queues")

    if [ -z "$queues_json" ]; then
        log "无法获取队列指标,跳过队列检查"
        return 1
    fi

    # The Python snippet prints "LEVEL|message|queue name". The queue name goes
    # last so that a "|" inside a name ends up in the final read field intact.
    echo "$queues_json" | python3 -c "
import sys, json
try:
    queues = json.load(sys.stdin)
except ValueError:
    queues = None
if not isinstance(queues, list):
    # Error responses are JSON objects, not lists; do not iterate over them.
    print('unexpected /queues payload', file=sys.stderr)
    sys.exit(1)
for q in queues:
    messages = q.get('messages', 0)
    consumers = q.get('consumers', 0)
    name = q.get('name', 'unknown')

    if messages > 100000:
        print(f'CRITICAL|消息堆积严重: {messages}条|{name}')
    elif messages > 50000:
        print(f'WARNING|消息堆积: {messages}条|{name}')

    if consumers == 0 and messages > 1000:
        print(f'WARNING|无消费者但有 {messages} 条消息|{name}')
" | while IFS='|' read -r level message queue_name; do
        alert "$level" "队列 ${queue_name}: ${message}"
    done
}

# Entry point: run the reachability probe first and the metric checks only
# when it succeeds. With the API down those checks have no data to work on and
# would only produce parse errors or false alerts on top of the CRITICAL one
# already raised by check_rabbitmq_status.
main() {
    log "开始监控检查"

    if check_rabbitmq_status; then
        check_memory
        check_disk
        check_queues
    else
        log "RabbitMQ API 不可用,跳过后续检查"
    fi

    log "监控检查完成"
}

main "$@"

场景三:健康检查 API

php
<?php

require_once __DIR__ . '/RabbitMQMonitor.php';

// The response is always JSON, whatever the health outcome.
header('Content-Type: application/json');

// Reads one setting from the environment, falling back when it is unset or empty.
$fromEnv = static function ($name, $fallback) {
    return getenv($name) ?: $fallback;
};

$config = [
    'host' => $fromEnv('RABBITMQ_HOST', 'localhost'),
    'port' => $fromEnv('RABBITMQ_PORT', 15672),
    'user' => $fromEnv('RABBITMQ_USER', 'admin'),
    'password' => $fromEnv('RABBITMQ_PASSWORD', 'admin123'),
];

$monitor = new RabbitMQMonitor($config);
$health = $monitor->healthCheck();

// Only a fully healthy broker answers 200; 'degraded' and 'unhealthy' both
// map to 503 so a load balancer takes the node out of rotation.
if ($health['status'] === 'healthy') {
    http_response_code(200);
} else {
    http_response_code(503);
}

echo json_encode($health, JSON_PRETTY_PRINT);

常见问题与解决方案

问题一:脚本执行超时

现象:脚本执行时间过长。

解决方案

php
// Per-request cURL timeout, in seconds, handed to the monitor.
$monitorOptions = ['timeout' => 10];

// Overall cap on script execution time, in seconds.
// NOTE(review): on non-Windows systems set_time_limit() does not count time
// spent waiting on network I/O, so the cURL timeout above is what actually
// bounds a slow API call -- confirm against the PHP manual for your platform.
set_time_limit(30);
$monitor = new RabbitMQMonitor($monitorOptions);

问题二:日志文件过大

现象:日志文件占用过多磁盘空间。

解决方案

bash
# Save this block as /etc/logrotate.d/rabbitmq-monitor.
# To rotate manually, run: logrotate /etc/logrotate.d/rabbitmq-monitor
#
# NOTE(review): the glob covers every *.log file in /var/log/rabbitmq -- confirm
# it does not overlap with a logrotate rule already installed for the broker's
# own logs in that directory.
/var/log/rabbitmq/*.log {
    # rotate once a day and keep seven compressed generations
    daily
    rotate 7
    compress
    # do not complain about a missing log, and skip empty ones
    missingok
    notifempty
}

问题三:告警风暴

现象:同一告警重复发送多次。

解决方案

php
/**
 * Suppresses repeats of the same alert inside a cool-down window, using a
 * small JSON file (fingerprint => last-sent Unix time) as shared state
 * between runs.
 */
class AlertDeduplicator
{
    /** @var string Path of the JSON cache file. */
    private $cacheFile;

    /** @var int Seconds during which an identical alert is suppressed. */
    private $cooldownPeriod;

    /**
     * @param string $cacheFile      Cache location.
     * @param int    $cooldownPeriod Suppression window in seconds.
     *
     * NOTE(review): the default lives in a world-writable directory; confirm a
     * dedicated state directory is not required in production.
     */
    public function __construct($cacheFile = '/tmp/rabbitmq_alerts_cache.json', $cooldownPeriod = 3600)
    {
        $this->cacheFile = $cacheFile;
        $this->cooldownPeriod = (int) $cooldownPeriod;
    }

    /**
     * Decides whether the alert should be delivered now, and records the send
     * time when it should.
     *
     * @param array $alert Alert array; 'level', 'type', 'vhost', 'queue' and
     *                     'node' (when present) identify it.
     *
     * @return bool False when an identical alert was sent within the cool-down.
     */
    public function shouldSend(array $alert)
    {
        $now = time();
        $cooldown = $this->cooldownPeriod;

        // Drop expired entries first so the cache file cannot grow without bound;
        // whatever remains is, by construction, still inside the cool-down.
        $cache = array_filter(
            $this->loadCache(),
            static function ($sentAt) use ($now, $cooldown) {
                return is_int($sentAt) && ($now - $sentAt) < $cooldown;
            }
        );

        $key = $this->fingerprint($alert);
        if (isset($cache[$key])) {
            return false;
        }

        $cache[$key] = $now;
        $this->saveCache($cache);

        return true;
    }

    /**
     * Builds the identity of an alert.
     *
     * The level is part of the identity so an escalation from warning to
     * critical is not swallowed by the earlier warning; the vhost keeps
     * same-named queues in different vhosts apart. Encoding the parts as a
     * JSON list keeps field boundaries unambiguous.
     */
    private function fingerprint(array $alert)
    {
        return md5(json_encode([
            $alert['level'] ?? '',
            $alert['type'] ?? '',
            $alert['vhost'] ?? '',
            $alert['queue'] ?? '',
            $alert['node'] ?? '',
        ]));
    }

    /** Reads the cache; a missing, unreadable or malformed file counts as empty. */
    private function loadCache()
    {
        if (!file_exists($this->cacheFile)) {
            return [];
        }

        $raw = file_get_contents($this->cacheFile);
        $decoded = $raw === false ? null : json_decode($raw, true);

        return is_array($decoded) ? $decoded : [];
    }

    /** Overwrites the cache file with the given map. */
    private function saveCache(array $cache)
    {
        file_put_contents($this->cacheFile, json_encode($cache));
    }
}

最佳实践

1. 脚本部署结构

/opt/rabbitmq/
├── scripts/
│   ├── monitor.php
│   ├── health_check.php
│   └── alert_check.sh
├── lib/
│   ├── RabbitMQMonitor.php
│   ├── AlertChecker.php
│   ├── Notifier.php
│   └── Logger.php
├── config/
│   └── config.php
└── logs/
    ├── monitor.log
    └── alerts.log

2. 定时任务配置

cron
*/1 * * * * /opt/rabbitmq/scripts/monitor.php >> /var/log/rabbitmq/cron.log 2>&1
*/5 * * * * /opt/rabbitmq/scripts/alert_check.sh >> /var/log/rabbitmq/cron.log 2>&1

3. 监控指标输出格式

json
{
    "timestamp": "2024-01-15T10:30:00Z",
    "status": "healthy",
    "metrics": {
        "connections": 150,
        "queues": 25,
        "messages": 1234,
        "memory_usage_percent": 45.5
    },
    "alerts": []
}

相关链接