Skip to content

RabbitMQ 管理界面监控

概述

RabbitMQ Management Plugin 提供了一个功能强大的 Web 管理界面,可以实时监控 RabbitMQ 的运行状态、管理队列、交换机、用户权限等。本文将详细介绍如何使用管理界面进行监控和日常运维操作。

核心知识点

管理界面功能概览

| 功能模块 | 说明 |
| --- | --- |
| Overview | 系统概览,显示关键指标 |
| Connections | 连接管理,查看和管理客户端连接 |
| Channels | 通道管理,查看和管理 AMQP 通道 |
| Exchanges | 交换机管理,创建、删除、查看交换机 |
| Queues | 队列管理,创建、删除、查看队列 |
| Admin | 系统管理,用户、权限、策略配置 |

启用管理插件

bash
# Enable the management plugin so the web UI becomes available.
rabbitmq-plugins enable rabbitmq_management

默认访问地址:http://localhost:15672

默认用户名/密码:guest/guest(仅限本地访问)

界面布局说明

Overview 页面

Overview 页面显示以下关键信息:

  1. 消息统计

    • Ready:等待消费的消息数
    • Unacked:已投递未确认的消息数
    • Total:消息总数
  2. 消息速率

    • Publish:消息发布速率
    • Confirm:发布确认(Publisher Confirm)速率,即 Broker 向生产者确认消息的速率
    • Consume:消息消费速率
    • Ack:消费者确认速率
  3. 资源使用

    • Memory:内存使用情况
    • Disk Space:磁盘空间使用情况
    • File Descriptors:文件描述符使用情况
    • Sockets:Socket 使用情况
    • Processes:Erlang 进程使用情况

配置示例

配置管理界面访问

bash
# Enable the management plugin.
rabbitmq-plugins enable rabbitmq_management

# Create the "admin" account, tag it as administrator, and grant it
# configure / write / read permissions (".*" each) on the "/" vhost.
# NOTE(review): "admin123" is a weak example password - choose a strong one.
rabbitmqctl add_user admin admin123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

配置文件设置

bash
# Management listener settings (configuration file entries, not shell commands).
management.listener.port = 15672
# 0.0.0.0 listens on every interface; with ssl = false the UI is plain HTTP.
management.listener.ip = 0.0.0.0
management.listener.ssl = false

# HTTP connection timeouts.
# NOTE(review): values are presumably milliseconds (300000 = 5 minutes) - confirm.
management.tcp.idle_timeout = 300000
management.tcp.inactivity_timeout = 300000
management.tcp.request_timeout = 300000

通过 PHP 访问管理界面 API

php
<?php

/**
 * Minimal client for the RabbitMQ management HTTP API (everything under /api/).
 *
 * Every public method returns an array of the form
 *   ['code' => <HTTP status code>, 'data' => <decoded JSON body, or null>]
 * and throws Exception when the HTTP transfer itself fails.
 *
 * Names that end up in the URL path (vhost, queue, exchange, user, policy,
 * connection) are percent-encoded by this class, so callers pass them raw:
 * the default vhost is passed as "/" and is sent as "%2F".
 */
class RabbitMQManagement
{
    private $baseUrl;
    private $username;
    private $password;

    /**
     * @param string     $host     Host name of the management listener.
     * @param int|string $port     Port of the management listener.
     * @param string     $username HTTP basic-auth user.
     * @param string     $password HTTP basic-auth password.
     */
    public function __construct($host = 'localhost', $port = 15672, $username = 'guest', $password = 'guest')
    {
        $this->baseUrl = "http://{$host}:{$port}";
        $this->username = $username;
        $this->password = $password;
    }

    /**
     * Build an API path from individual segments, percent-encoding each one.
     *
     * Without this, the default vhost "/" produces "queues///name" and
     * connection names (which contain spaces, ":" and ">") produce invalid URLs.
     *
     * A segment made up only of unreserved characters and percent-escapes
     * (for example "%2F") is passed through unchanged, so callers that already
     * encoded their arguments to work around the missing encoding keep working.
     * The trade-off: a name that literally contains such an escape sequence
     * cannot be addressed through this helper.
     *
     * @param string ...$segments Raw path segments.
     * @return string Segments joined with "/".
     */
    private function path(...$segments)
    {
        $encoded = [];
        foreach ($segments as $segment) {
            $segment = (string) $segment;
            $alreadyEncoded = preg_match('/^(?:[A-Za-z0-9._~-]|%[0-9A-Fa-f]{2})+$/', $segment) === 1;
            $encoded[] = $alreadyEncoded ? $segment : rawurlencode($segment);
        }

        return implode('/', $encoded);
    }

    /**
     * Perform one HTTP request against the API.
     *
     * @param string     $method   HTTP verb.
     * @param string     $endpoint Path below /api/, already encoded.
     * @param array|null $data     Request body; JSON-encoded when not null.
     * @return array ['code' => int, 'data' => mixed] - 'data' is null when the
     *               body is empty or is not valid JSON.
     * @throws Exception When cURL cannot be initialised or the transfer fails.
     */
    private function request($method, $endpoint, $data = null)
    {
        $url = "{$this->baseUrl}/api/{$endpoint}";

        $ch = curl_init();
        if ($ch === false) {
            throw new Exception('cURL Error: unable to initialise a cURL handle');
        }

        $options = [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->username}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 30,
            CURLOPT_CUSTOMREQUEST => $method,
        ];

        if ($data !== null) {
            $options[CURLOPT_POSTFIELDS] = json_encode($data);
        }

        curl_setopt_array($ch, $options);

        // Status code and error text must be read before the handle is closed.
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        $error = curl_error($ch);
        curl_close($ch);

        if ($response === false || $error !== '') {
            throw new Exception("cURL Error: {$error}");
        }

        return [
            'code' => $httpCode,
            'data' => json_decode($response, true),
        ];
    }

    /** GET /api/overview */
    public function getOverview()
    {
        return $this->request('GET', 'overview');
    }

    /** GET /api/nodes */
    public function getNodes()
    {
        return $this->request('GET', 'nodes');
    }

    /** GET /api/connections */
    public function getConnections()
    {
        return $this->request('GET', 'connections');
    }

    /**
     * DELETE /api/connections/{name}
     *
     * @param string $name Connection name exactly as reported by getConnections().
     */
    public function closeConnection($name)
    {
        return $this->request('DELETE', $this->path('connections', $name));
    }

    /** GET /api/channels */
    public function getChannels()
    {
        return $this->request('GET', 'channels');
    }

    /** GET /api/exchanges */
    public function getExchanges()
    {
        return $this->request('GET', 'exchanges');
    }

    /** GET /api/queues */
    public function getQueues()
    {
        return $this->request('GET', 'queues');
    }

    /** GET /api/queues/{vhost}/{name} */
    public function getQueue($vhost, $name)
    {
        return $this->request('GET', $this->path('queues', $vhost, $name));
    }

    /** DELETE /api/queues/{vhost}/{name}/contents */
    public function purgeQueue($vhost, $name)
    {
        return $this->request('DELETE', $this->path('queues', $vhost, $name, 'contents'));
    }

    /** DELETE /api/queues/{vhost}/{name} */
    public function deleteQueue($vhost, $name)
    {
        return $this->request('DELETE', $this->path('queues', $vhost, $name));
    }

    /**
     * POST /api/exchanges/{vhost}/{exchange}/publish
     *
     * The payload is JSON-encoded and sent as a string with delivery_mode 2
     * and content type application/json.
     *
     * @param string $vhost
     * @param string $exchange
     * @param string $routingKey
     * @param mixed  $payload    Any JSON-encodable value.
     */
    public function publishMessage($vhost, $exchange, $routingKey, $payload)
    {
        $data = [
            'properties' => [
                'delivery_mode' => 2,
                'content_type' => 'application/json',
            ],
            'routing_key' => $routingKey,
            'payload' => json_encode($payload),
            'payload_encoding' => 'string',
        ];

        return $this->request('POST', $this->path('exchanges', $vhost, $exchange, 'publish'), $data);
    }

    /** GET /api/users */
    public function getUsers()
    {
        return $this->request('GET', 'users');
    }

    /**
     * PUT /api/users/{username}
     *
     * @param string $username
     * @param string $password
     * @param string $tags     Tag string sent as-is in the request body.
     */
    public function createUser($username, $password, $tags = '')
    {
        $data = [
            'password' => $password,
            'tags' => $tags,
        ];

        return $this->request('PUT', $this->path('users', $username), $data);
    }

    /** DELETE /api/users/{username} */
    public function deleteUser($username)
    {
        return $this->request('DELETE', $this->path('users', $username));
    }

    /** GET /api/vhosts */
    public function getVhosts()
    {
        return $this->request('GET', 'vhosts');
    }

    /** GET /api/permissions */
    public function getPermissions()
    {
        return $this->request('GET', 'permissions');
    }

    /**
     * PUT /api/permissions/{vhost}/{username}
     *
     * @param string $vhost
     * @param string $username
     * @param string $configure Pattern sent as the "configure" permission.
     * @param string $write     Pattern sent as the "write" permission.
     * @param string $read      Pattern sent as the "read" permission.
     */
    public function setPermissions($vhost, $username, $configure, $write, $read)
    {
        $data = [
            'configure' => $configure,
            'write' => $write,
            'read' => $read,
        ];

        return $this->request('PUT', $this->path('permissions', $vhost, $username), $data);
    }

    /** GET /api/policies */
    public function getPolicies()
    {
        return $this->request('GET', 'policies');
    }

    /**
     * PUT /api/policies/{vhost}/{name}
     *
     * @param string $vhost
     * @param string $name
     * @param string $pattern
     * @param array  $definition
     * @param int    $priority
     */
    public function setPolicy($vhost, $name, $pattern, $definition, $priority = 0)
    {
        $data = [
            'pattern' => $pattern,
            'definition' => $definition,
            'priority' => $priority,
        ];

        return $this->request('PUT', $this->path('policies', $vhost, $name), $data);
    }

    /** DELETE /api/policies/{vhost}/{name} */
    public function deletePolicy($vhost, $name)
    {
        return $this->request('DELETE', $this->path('policies', $vhost, $name));
    }
}

队列监控脚本

php
<?php

/**
 * Derives per-queue health information from the queue list returned by
 * RabbitMQManagement::getQueues().
 */
class QueueMonitor
{
    private $management;

    public function __construct(RabbitMQManagement $management)
    {
        $this->management = $management;
    }

    /**
     * Fetch all queues and reduce each one to the fields used for monitoring.
     *
     * Missing counters default to 0 and a missing state to 'unknown'.
     * The 'consume' rate is taken from `consume_details` when the broker
     * reports it, otherwise from `deliver_get_details`; reading only
     * `consume_details` left the rate at 0 for every queue.
     *
     * @return array[] One entry per queue with keys name, vhost, messages,
     *                 messages_ready, messages_unacked, consumers, memory,
     *                 state and message_stats (publish / consume / ack rates).
     * @throws Exception When the API does not answer 200 with a JSON list.
     */
    public function getQueueStats()
    {
        $response = $this->management->getQueues();

        if ($response['code'] !== 200 || !is_array($response['data'])) {
            throw new Exception("Failed to get queues");
        }

        $stats = [];
        foreach ($response['data'] as $queue) {
            $messageStats = $queue['message_stats'] ?? [];

            $stats[] = [
                'name' => $queue['name'],
                'vhost' => $queue['vhost'],
                'messages' => $queue['messages'] ?? 0,
                'messages_ready' => $queue['messages_ready'] ?? 0,
                'messages_unacked' => $queue['messages_unacked'] ?? 0,
                'consumers' => $queue['consumers'] ?? 0,
                'memory' => $queue['memory'] ?? 0,
                'state' => $queue['state'] ?? 'unknown',
                'message_stats' => [
                    'publish' => $messageStats['publish_details']['rate'] ?? 0,
                    'consume' => $messageStats['consume_details']['rate']
                        ?? $messageStats['deliver_get_details']['rate']
                        ?? 0,
                    'ack' => $messageStats['ack_details']['rate'] ?? 0,
                ],
            ];
        }

        return $stats;
    }

    /**
     * List queues that violate at least one threshold or are not 'running'.
     *
     * @param array $thresholds Overrides for max_messages (default 10000),
     *                          min_consumers (default 1) and
     *                          max_memory_mb (default 100).
     * @return array[] Entries with keys queue, vhost and issues (list of
     *                 human-readable messages).
     * @throws Exception When the queue list cannot be fetched.
     */
    public function findProblematicQueues($thresholds = [])
    {
        $defaults = [
            'max_messages' => 10000,
            'min_consumers' => 1,
            'max_memory_mb' => 100,
        ];

        $thresholds = array_merge($defaults, $thresholds);
        $problems = [];

        foreach ($this->getQueueStats() as $queue) {
            $issues = [];

            if ($queue['messages'] > $thresholds['max_messages']) {
                $issues[] = "消息堆积过多: {$queue['messages']} 条";
            }

            if ($queue['consumers'] < $thresholds['min_consumers']) {
                $issues[] = "消费者数量不足: {$queue['consumers']}";
            }

            // 'memory' is divided by 1024 twice and compared against a megabyte limit.
            $memoryMB = $queue['memory'] / 1024 / 1024;
            if ($memoryMB > $thresholds['max_memory_mb']) {
                $issues[] = "内存占用过高: " . round($memoryMB, 2) . " MB";
            }

            if ($queue['state'] !== 'running') {
                $issues[] = "队列状态异常: {$queue['state']}";
            }

            if (!empty($issues)) {
                $problems[] = [
                    'queue' => $queue['name'],
                    'vhost' => $queue['vhost'],
                    'issues' => $issues,
                ];
            }
        }

        return $problems;
    }

    /**
     * Compare publish and consume rates per queue.
     *
     * 'lag' is publish rate minus consume rate; a positive lag is reported as
     * 'accumulating', anything else as 'stable'.
     *
     * @return array[] Entries with keys name, publish_rate, consume_rate,
     *                 lag and status.
     * @throws Exception When the queue list cannot be fetched.
     */
    public function getQueueMessageRates()
    {
        $rates = [];

        foreach ($this->getQueueStats() as $queue) {
            $publishRate = $queue['message_stats']['publish'];
            $consumeRate = $queue['message_stats']['consume'];
            $lag = $publishRate - $consumeRate;

            $rates[] = [
                'name' => $queue['name'],
                'publish_rate' => $publishRate,
                'consume_rate' => $consumeRate,
                'lag' => $lag,
                'status' => $lag > 0 ? 'accumulating' : 'stable',
            ];
        }

        return $rates;
    }
}

实际应用场景

场景一:自动化运维脚本

php
<?php

/**
 * Routine maintenance tasks built on top of RabbitMQManagement:
 * deleting long-idle empty queues, spotting under-consumed queues and
 * producing a summary report.
 */
class AutomatedOperations
{
    private $management;

    public function __construct(RabbitMQManagement $management)
    {
        $this->management = $management;
    }

    /**
     * Delete queues that have no consumers, no messages, and whose
     * `idle_since` timestamp is more than $maxAge seconds in the past.
     *
     * A queue is skipped when `idle_since` is missing or cannot be parsed:
     * strtotime() returns false in that case, and treating false as 0 would
     * make every such queue look decades old and get it deleted.
     * A queue is only reported as cleaned when the delete call answers 2xx.
     *
     * NOTE(review): `idle_since` is parsed with strtotime(), which applies
     * PHP's default timezone when the string carries none - confirm that
     * matches the timezone the broker reports in.
     *
     * @param int $maxAge Minimum idle time in seconds.
     * @return string[] Names of the queues that were deleted.
     * @throws Exception When the queue list cannot be fetched.
     */
    public function cleanupIdleQueues($maxAge = 3600)
    {
        $cleaned = [];
        $now = time();

        foreach ($this->fetchQueues() as $queue) {
            // Destructive path: a missing 'consumers' field is not treated as zero.
            $hasNoConsumers = ($queue['consumers'] ?? null) === 0;
            $isEmpty = ($queue['messages'] ?? 0) === 0;
            if (!$hasNoConsumers || !$isEmpty) {
                continue;
            }

            $idleSince = $queue['idle_since'] ?? null;
            if (!$idleSince) {
                continue;
            }

            $idleTime = strtotime((string) $idleSince);
            if ($idleTime === false || $now - $idleTime <= $maxAge) {
                continue;
            }

            $result = $this->management->deleteQueue($queue['vhost'], $queue['name']);
            if ($this->isSuccess($result)) {
                $cleaned[] = $queue['name'];
            }
        }

        return $cleaned;
    }

    /**
     * Report queues that hold messages but have too few consumers.
     *
     * 'no_consumers': at least one message and zero consumers.
     * 'insufficient_consumers': more than 1000 messages and fewer than two
     * consumers. Nothing is changed on the broker.
     *
     * @return array[] Entries with keys queue, issue, messages and, for
     *                 'insufficient_consumers', consumers.
     * @throws Exception When the queue list cannot be fetched.
     */
    public function balanceConsumers()
    {
        $report = [];

        foreach ($this->fetchQueues() as $queue) {
            $messages = $queue['messages'] ?? 0;
            $consumers = $queue['consumers'] ?? 0;

            if ($messages > 0 && $consumers === 0) {
                $report[] = [
                    'queue' => $queue['name'],
                    'issue' => 'no_consumers',
                    'messages' => $messages,
                ];
            } elseif ($messages > 1000 && $consumers < 2) {
                $report[] = [
                    'queue' => $queue['name'],
                    'issue' => 'insufficient_consumers',
                    'messages' => $messages,
                    'consumers' => $consumers,
                ];
            }
        }

        return $report;
    }

    /**
     * Build a snapshot of the broker: cluster name, version, totals and the
     * list of queues that currently hold messages.
     *
     * Cluster name and version fall back to 'unknown' when the overview does
     * not provide them; missing per-queue counters count as 0.
     *
     * @return array Keys: timestamp, cluster_name, rabbitmq_version,
     *               statistics (total_queues, total_connections,
     *               total_messages, total_consumers) and queues.
     * @throws Exception When the queue or connection list cannot be fetched.
     */
    public function generateReport()
    {
        $overview = $this->management->getOverview();
        $queues = $this->fetchQueues();

        $connections = $this->management->getConnections();
        if (($connections['code'] ?? 0) !== 200 || !is_array($connections['data'] ?? null)) {
            throw new Exception("Failed to get connections");
        }

        $report = [
            'timestamp' => date('Y-m-d H:i:s'),
            'cluster_name' => $overview['data']['cluster_name'] ?? 'unknown',
            'rabbitmq_version' => $overview['data']['rabbitmq_version'] ?? 'unknown',
            'statistics' => [
                'total_queues' => count($queues),
                'total_connections' => count($connections['data']),
                'total_messages' => 0,
                'total_consumers' => 0,
            ],
            'queues' => [],
        ];

        foreach ($queues as $queue) {
            $messages = $queue['messages'] ?? 0;
            $consumers = $queue['consumers'] ?? 0;

            $report['statistics']['total_messages'] += $messages;
            $report['statistics']['total_consumers'] += $consumers;

            if ($messages > 0) {
                $report['queues'][] = [
                    'name' => $queue['name'],
                    'messages' => $messages,
                    'consumers' => $consumers,
                ];
            }
        }

        return $report;
    }

    /**
     * Fetch the queue list and make sure it is usable.
     *
     * @return array[] Decoded queue objects.
     * @throws Exception When the API does not answer 200 with a JSON list.
     */
    private function fetchQueues()
    {
        $response = $this->management->getQueues();

        if (($response['code'] ?? 0) !== 200 || !is_array($response['data'] ?? null)) {
            throw new Exception("Failed to get queues");
        }

        return $response['data'];
    }

    /** Whether a management response carries a 2xx status code. */
    private function isSuccess($response)
    {
        $code = $response['code'] ?? 0;

        return $code >= 200 && $code < 300;
    }
}

场景二:连接监控与管理

php
<?php

/**
 * Summarises and polices client connections reported by
 * RabbitMQManagement::getConnections().
 */
class ConnectionMonitor
{
    private $management;

    public function __construct(RabbitMQManagement $management)
    {
        $this->management = $management;
    }

    /**
     * Count connections per client product and per peer host, and collect the
     * connections that report exactly zero channels.
     *
     * @return array Keys: total, by_client, by_host, idle_connections
     *               (entries with name, host, client, connected_at).
     * @throws Exception When the connection list cannot be fetched.
     */
    public function getConnectionStats()
    {
        $connections = $this->fetchConnections();

        $stats = [
            'total' => count($connections),
            'by_client' => [],
            'by_host' => [],
            'idle_connections' => [],
        ];

        foreach ($connections as $conn) {
            $client = $conn['client_properties']['product'] ?? 'unknown';
            $host = $conn['peer_host'] ?? 'unknown';

            $stats['by_client'][$client] = ($stats['by_client'][$client] ?? 0) + 1;
            $stats['by_host'][$host] = ($stats['by_host'][$host] ?? 0) + 1;

            // A connection without a 'channels' field is not counted as idle.
            if (($conn['channels'] ?? null) === 0) {
                $stats['idle_connections'][] = [
                    'name' => $conn['name'],
                    'host' => $host,
                    'client' => $client,
                    'connected_at' => $conn['connected_at'] ?? null,
                ];
            }
        }

        return $stats;
    }

    /**
     * Close zero-channel connections that were opened more than
     * $maxIdleTime seconds ago.
     *
     * The age is measured from `connected_at` (when the connection was
     * opened), not from the moment it became idle. Connections whose
     * `connected_at` is missing or unparseable are left alone; feeding the
     * raw value to strtotime() yielded false for the broker's millisecond
     * timestamps, which made every idle connection look old enough to close.
     * A connection is only reported as closed when the call answers 2xx.
     *
     * @param int $maxIdleTime Minimum connection age in seconds.
     * @return string[] Names of the connections that were closed.
     * @throws Exception When the connection list cannot be fetched.
     */
    public function closeIdleConnections($maxIdleTime = 3600)
    {
        $stats = $this->getConnectionStats();
        $closed = [];
        $now = time();

        foreach ($stats['idle_connections'] as $conn) {
            $connectedTime = $this->toUnixSeconds($conn['connected_at']);
            if ($connectedTime === null || $now - $connectedTime <= $maxIdleTime) {
                continue;
            }

            $result = $this->management->closeConnection($conn['name']);
            $code = $result['code'] ?? 0;
            if ($code >= 200 && $code < 300) {
                $closed[] = $conn['name'];
            }
        }

        return $closed;
    }

    /**
     * List connections with more than 100 channels or with `recv_cnt` /
     * `send_cnt` above 10,000,000.
     *
     * @return array[] Entries with keys name, peer_host, client and issues.
     * @throws Exception When the connection list cannot be fetched.
     */
    public function findProblematicConnections()
    {
        $problems = [];

        foreach ($this->fetchConnections() as $conn) {
            $issues = [];

            if (($conn['channels'] ?? 0) > 100) {
                $issues[] = "通道数量过多: {$conn['channels']}";
            }

            if (($conn['recv_cnt'] ?? 0) > 10000000) {
                $issues[] = "接收消息过多: {$conn['recv_cnt']}";
            }

            if (($conn['send_cnt'] ?? 0) > 10000000) {
                $issues[] = "发送消息过多: {$conn['send_cnt']}";
            }

            if (!empty($issues)) {
                $problems[] = [
                    'name' => $conn['name'],
                    'peer_host' => $conn['peer_host'] ?? 'unknown',
                    'client' => $conn['client_properties']['product'] ?? 'unknown',
                    'issues' => $issues,
                ];
            }
        }

        return $problems;
    }

    /**
     * Fetch the connection list and make sure it is usable.
     *
     * @return array[] Decoded connection objects.
     * @throws Exception When the API does not answer 200 with a JSON list.
     */
    private function fetchConnections()
    {
        $response = $this->management->getConnections();

        if (($response['code'] ?? 0) !== 200 || !is_array($response['data'] ?? null)) {
            throw new Exception("Failed to get connections");
        }

        return $response['data'];
    }

    /**
     * Normalise a `connected_at` value to Unix seconds.
     *
     * Numeric values of 1e11 or more are taken to be milliseconds (as
     * seconds that would be a date past the year 5000); smaller numeric
     * values are taken as seconds; anything else goes through strtotime().
     *
     * @param mixed $value Raw `connected_at` value.
     * @return int|null Unix timestamp in seconds, or null when the value is
     *                  empty or cannot be interpreted.
     */
    private function toUnixSeconds($value)
    {
        if (!$value) {
            return null;
        }

        if (is_numeric($value)) {
            $number = (float) $value;

            return (int) ($number >= 1e11 ? $number / 1000 : $number);
        }

        $parsed = strtotime((string) $value);

        return $parsed === false ? null : $parsed;
    }
}

常见问题与解决方案

问题一:无法访问管理界面

现象:浏览器无法打开管理界面。

解决方案

bash
# Show the management-related entries of the plugin list, then enable the plugin.
rabbitmq-plugins list | grep management
rabbitmq-plugins enable rabbitmq_management

# Filter the broker status output for management-related lines.
rabbitmqctl status | grep management

检查防火墙设置:

bash
# Permanently open TCP port 15672 (the management UI port used above),
# then reload firewalld so the rule takes effect.
firewall-cmd --add-port=15672/tcp --permanent
firewall-cmd --reload

问题二:guest 用户远程无法登录

现象:使用 guest/guest 远程登录失败。

原因:默认配置只允许本地登录。

解决方案

bash
# Create a separate "admin" account for remote logins instead of using guest,
# tag it as administrator and grant full permissions on the "/" vhost.
# NOTE(review): "admin123" is a weak example password - choose a strong one.
rabbitmqctl add_user admin admin123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

或修改配置允许远程 guest(不推荐生产环境):

bash
# Configuration file entry (not a shell command): lifts the local-only
# restriction so guest can log in remotely. Not recommended in production.
loopback_users = none

问题三:管理界面加载缓慢

现象:管理界面响应很慢。

原因:队列或连接数量过多,统计计算耗时。

解决方案

bash
# Configuration file entries (not shell commands).
management.rates_mode = basic
# Sample retention policies are keyed by group (global / basic / detailed) and
# retention window; the value is the sampling interval in seconds. There is no
# "max_seconds" key. The entries below keep at most one hour of samples.
management.sample_retention_policies.global.minute = 5
management.sample_retention_policies.global.hour = 60
management.sample_retention_policies.basic.minute = 5
management.sample_retention_policies.basic.hour = 60

最佳实践

1. 安全配置

bash
# Configuration file entries (not shell commands).
# Listen on the loopback address only and serve the management UI over TLS.
management.listener.ip = 127.0.0.1
management.listener.ssl = true
# Replace the /path/to/... values with the real CA bundle, certificate and key.
management.listener.ssl_opts.cacertfile = /path/to/cacert.pem
management.listener.ssl_opts.certfile = /path/to/cert.pem
management.listener.ssl_opts.keyfile = /path/to/key.pem

2. 性能优化

bash
# Configuration file entries (not shell commands).
# NOTE(review): http_log_dir turns on HTTP access logging; it aids auditing
# rather than performance - confirm it belongs in this section.
management.http_log_dir = /var/log/rabbitmq/management
management.rates_mode = basic
# Statistics granularity is a core broker setting, not a management.* key.
collect_statistics = coarse

3. 定期清理脚本

php
<?php

/**
 * Scheduled maintenance entry point: delete queues that have been idle for
 * more than two hours (7200 s), then write the current report as
 * pretty-printed JSON to /var/log/rabbitmq/daily_report.json.
 *
 * NOTE(review): the credentials are hard-coded example values - load them
 * from configuration or the environment before using this for real.
 *
 * @throws RuntimeException When the report cannot be encoded or written, so
 *                          a failed run no longer prints the success message.
 */
function cleanupRabbitMQ()
{
    $management = new RabbitMQManagement('localhost', 15672, 'admin', 'admin123');
    $ops = new AutomatedOperations($management);

    $cleanedQueues = $ops->cleanupIdleQueues(7200);
    echo "清理空闲队列: " . implode(', ', $cleanedQueues) . "\n";

    $reportPath = '/var/log/rabbitmq/daily_report.json';
    $report = $ops->generateReport();

    $json = json_encode($report, JSON_PRETTY_PRINT);
    if ($json === false) {
        throw new RuntimeException('Failed to encode report: ' . json_last_error_msg());
    }

    // file_put_contents() returns false on failure (missing directory, permissions, ...).
    if (file_put_contents($reportPath, $json) === false) {
        throw new RuntimeException("Failed to write report to {$reportPath}");
    }

    echo "日报已生成\n";
}

cleanupRabbitMQ();

相关链接