Appearance
RabbitMQ 管理界面监控
概述
RabbitMQ Management Plugin 提供了一个功能强大的 Web 管理界面,可以实时监控 RabbitMQ 的运行状态、管理队列、交换机、用户权限等。本文将详细介绍如何使用管理界面进行监控和日常运维操作。
核心知识点
管理界面功能概览
| 功能模块 | 说明 |
|---|---|
| Overview | 系统概览,显示关键指标 |
| Connections | 连接管理,查看和管理客户端连接 |
| Channels | 通道管理,查看和管理 AMQP 通道 |
| Exchanges | 交换机管理,创建、删除、查看交换机 |
| Queues | 队列管理,创建、删除、查看队列 |
| Admin | 系统管理,用户、权限、策略配置 |
启用管理插件
bash
rabbitmq-plugins enable rabbitmq_management
默认访问地址:http://localhost:15672
默认用户名/密码:guest/guest(仅限本地访问)
界面布局说明
Overview 页面
Overview 页面显示以下关键信息:
消息统计
- Ready:等待消费的消息数
- Unacked:已投递未确认的消息数
- Total:消息总数
消息速率
- Publish:消息发布速率
- Confirm:消息确认速率
- Consume:消息消费速率
- Ack:消费者确认速率
资源使用
- Memory:内存使用情况
- Disk Space:磁盘空间使用情况
- File Descriptors:文件描述符使用情况
- Sockets:Socket 使用情况
- Processes:Erlang 进程使用情况
配置示例
配置管理界面访问
bash
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl add_user admin admin123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
配置文件设置
bash
management.listener.port = 15672
management.listener.ip = 0.0.0.0
management.listener.ssl = false
management.tcp.idle_timeout = 300000
management.tcp.inactivity_timeout = 300000
management.tcp.request_timeout = 300000
通过 PHP 访问管理界面 API
php
<?php
class RabbitMQManagement
{
private $baseUrl;
private $username;
private $password;
/**
 * Stores the endpoint and credentials used by every management API call.
 *
 * @param string     $host     Host part of the management API base URL.
 * @param int|string $port     Port part of the management API base URL.
 * @param string     $username User name sent with each request.
 * @param string     $password Password sent with each request.
 */
public function __construct($host = 'localhost', $port = 15672, $username = 'guest', $password = 'guest')
{
    $this->password = $password;
    $this->username = $username;
    // Plain HTTP only: the scheme is fixed here.
    $this->baseUrl = 'http://' . $host . ':' . $port;
}
/**
 * Performs one HTTP call against "<baseUrl>/api/<endpoint>" and decodes the reply.
 *
 * @param string     $method   HTTP verb, handed to cURL as the custom request method.
 * @param string     $endpoint Path below "/api/", without a leading slash.
 * @param mixed|null $data     Request body; JSON-encoded and attached when not null.
 *
 * @return array Two keys: 'code' (HTTP status) and 'data' (body decoded with
 *               json_decode(..., true)).
 *
 * @throws Exception When cURL reports a non-empty error message for the transfer.
 */
private function request($method, $endpoint, $data = null)
{
    $curlOptions = [
        CURLOPT_URL => $this->baseUrl . '/api/' . $endpoint,
        CURLOPT_RETURNTRANSFER => true,
        CURLOPT_USERPWD => $this->username . ':' . $this->password,
        CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
        CURLOPT_TIMEOUT => 30,
        CURLOPT_CUSTOMREQUEST => $method,
    ];
    if (null !== $data) {
        $curlOptions[CURLOPT_POSTFIELDS] = json_encode($data);
    }

    $handle = curl_init();
    curl_setopt_array($handle, $curlOptions);
    $body = curl_exec($handle);
    $status = curl_getinfo($handle, CURLINFO_HTTP_CODE);
    $failure = curl_error($handle);
    // The handle is released before the error check so it is closed on both paths.
    curl_close($handle);

    if ($failure) {
        throw new Exception("cURL Error: {$failure}");
    }

    return [
        'code' => $status,
        'data' => json_decode($body, true),
    ];
}
/**
 * Issues GET /api/overview.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function getOverview()
{
    $result = $this->request('GET', 'overview');

    return $result;
}
/**
 * Issues GET /api/nodes.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function getNodes()
{
    $result = $this->request('GET', 'nodes');

    return $result;
}
/**
 * Issues GET /api/connections.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function getConnections()
{
    $result = $this->request('GET', 'connections');

    return $result;
}
/**
 * Closes a client connection via DELETE /api/connections/{name}.
 *
 * The name is percent-encoded before being placed in the URL path, because
 * connection names can contain characters (spaces, '>', '/') that are not
 * valid inside a single path segment. Pass the raw, unencoded name.
 *
 * @param string $name Connection name, e.g. the 'name' field of an entry
 *                     returned by getConnections().
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function closeConnection($name)
{
    return $this->request('DELETE', 'connections/' . rawurlencode($name));
}
/**
 * Issues GET /api/channels.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function getChannels()
{
    $result = $this->request('GET', 'channels');

    return $result;
}
/**
 * Issues GET /api/exchanges.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function getExchanges()
{
    $result = $this->request('GET', 'exchanges');

    return $result;
}
/**
 * Issues GET /api/queues.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function getQueues()
{
    $result = $this->request('GET', 'queues');

    return $result;
}
/**
 * Fetches a single queue via GET /api/queues/{vhost}/{name}.
 *
 * Both path segments are percent-encoded, so the default virtual host "/"
 * is sent as "%2F" instead of collapsing into an empty path segment.
 * Pass raw, unencoded values.
 *
 * @param string $vhost Virtual host name (e.g. "/").
 * @param string $name  Queue name.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function getQueue($vhost, $name)
{
    $endpoint = 'queues/' . rawurlencode($vhost) . '/' . rawurlencode($name);

    return $this->request('GET', $endpoint);
}
/**
 * Removes all messages from a queue via DELETE /api/queues/{vhost}/{name}/contents.
 *
 * Both path segments are percent-encoded, so the default virtual host "/"
 * is sent as "%2F" instead of collapsing into an empty path segment.
 * Pass raw, unencoded values.
 *
 * @param string $vhost Virtual host name (e.g. "/").
 * @param string $name  Queue name.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function purgeQueue($vhost, $name)
{
    $endpoint = 'queues/' . rawurlencode($vhost) . '/' . rawurlencode($name) . '/contents';

    return $this->request('DELETE', $endpoint);
}
/**
 * Deletes a queue via DELETE /api/queues/{vhost}/{name}.
 *
 * Both path segments are percent-encoded, so the default virtual host "/"
 * is sent as "%2F" instead of collapsing into an empty path segment.
 * Pass raw, unencoded values.
 *
 * @param string $vhost Virtual host name (e.g. "/").
 * @param string $name  Queue name.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function deleteQueue($vhost, $name)
{
    $endpoint = 'queues/' . rawurlencode($vhost) . '/' . rawurlencode($name);

    return $this->request('DELETE', $endpoint);
}
/**
 * Publishes one message via POST /api/exchanges/{vhost}/{exchange}/publish.
 *
 * The payload is JSON-encoded and sent as a string with delivery_mode 2 and
 * content type "application/json". The vhost and exchange path segments are
 * percent-encoded, so the default virtual host "/" is sent as "%2F"; pass
 * raw, unencoded values.
 *
 * @param string $vhost      Virtual host name (e.g. "/").
 * @param string $exchange   Exchange name.
 * @param string $routingKey Routing key placed in the request body.
 * @param mixed  $payload    Any value json_encode() can serialise.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 *
 * @throws Exception When the payload cannot be JSON-encoded.
 */
public function publishMessage($vhost, $exchange, $routingKey, $payload)
{
    $encodedPayload = json_encode($payload);
    if ($encodedPayload === false) {
        // Without this guard the literal boolean false would be published.
        throw new Exception('Failed to JSON-encode payload: ' . json_last_error_msg());
    }

    $data = [
        'properties' => [
            'delivery_mode' => 2,
            'content_type' => 'application/json',
        ],
        'routing_key' => $routingKey,
        'payload' => $encodedPayload,
        'payload_encoding' => 'string',
    ];
    $endpoint = 'exchanges/' . rawurlencode($vhost) . '/' . rawurlencode($exchange) . '/publish';

    return $this->request('POST', $endpoint, $data);
}
/**
 * Issues GET /api/users.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function getUsers()
{
    $result = $this->request('GET', 'users');

    return $result;
}
/**
 * Creates or updates a user via PUT /api/users/{username}.
 *
 * The user name is percent-encoded before being placed in the URL path so
 * that names containing reserved characters address the intended user.
 * Pass the raw, unencoded name.
 *
 * @param string $username User name.
 * @param string $password Password sent in the request body.
 * @param string $tags     Tags sent in the request body (empty string by default).
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function createUser($username, $password, $tags = '')
{
    $data = [
        'password' => $password,
        'tags' => $tags,
    ];

    return $this->request('PUT', 'users/' . rawurlencode($username), $data);
}
/**
 * Deletes a user via DELETE /api/users/{username}.
 *
 * The user name is percent-encoded before being placed in the URL path so
 * that names containing reserved characters address the intended user.
 * Pass the raw, unencoded name.
 *
 * @param string $username User name.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function deleteUser($username)
{
    return $this->request('DELETE', 'users/' . rawurlencode($username));
}
/**
 * Issues GET /api/vhosts.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function getVhosts()
{
    $result = $this->request('GET', 'vhosts');

    return $result;
}
/**
 * Issues GET /api/permissions.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function getPermissions()
{
    $result = $this->request('GET', 'permissions');

    return $result;
}
/**
 * Sets a user's permissions on a virtual host via PUT /api/permissions/{vhost}/{username}.
 *
 * Both path segments are percent-encoded, so the default virtual host "/"
 * is sent as "%2F" instead of collapsing into an empty path segment.
 * Pass raw, unencoded values.
 *
 * @param string $vhost     Virtual host name (e.g. "/").
 * @param string $username  User name.
 * @param string $configure Value sent as the 'configure' field.
 * @param string $write     Value sent as the 'write' field.
 * @param string $read      Value sent as the 'read' field.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function setPermissions($vhost, $username, $configure, $write, $read)
{
    $data = [
        'configure' => $configure,
        'write' => $write,
        'read' => $read,
    ];
    $endpoint = 'permissions/' . rawurlencode($vhost) . '/' . rawurlencode($username);

    return $this->request('PUT', $endpoint, $data);
}
/**
 * Issues GET /api/policies.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function getPolicies()
{
    $result = $this->request('GET', 'policies');

    return $result;
}
/**
 * Creates or updates a policy via PUT /api/policies/{vhost}/{name}.
 *
 * Both path segments are percent-encoded, so the default virtual host "/"
 * is sent as "%2F" instead of collapsing into an empty path segment.
 * Pass raw, unencoded values.
 *
 * @param string $vhost      Virtual host name (e.g. "/").
 * @param string $name       Policy name.
 * @param string $pattern    Value sent as the 'pattern' field.
 * @param array  $definition Value sent as the 'definition' field.
 * @param int    $priority   Value sent as the 'priority' field (0 by default).
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function setPolicy($vhost, $name, $pattern, $definition, $priority = 0)
{
    $data = [
        'pattern' => $pattern,
        'definition' => $definition,
        'priority' => $priority,
    ];
    $endpoint = 'policies/' . rawurlencode($vhost) . '/' . rawurlencode($name);

    return $this->request('PUT', $endpoint, $data);
}
/**
 * Deletes a policy via DELETE /api/policies/{vhost}/{name}.
 *
 * Both path segments are percent-encoded, so the default virtual host "/"
 * is sent as "%2F" instead of collapsing into an empty path segment.
 * Pass raw, unencoded values.
 *
 * @param string $vhost Virtual host name (e.g. "/").
 * @param string $name  Policy name.
 *
 * @return array Result of request(): HTTP status under 'code', decoded body under 'data'.
 */
public function deletePolicy($vhost, $name)
{
    $endpoint = 'policies/' . rawurlencode($vhost) . '/' . rawurlencode($name);

    return $this->request('DELETE', $endpoint);
}
}
队列监控脚本
php
<?php
class QueueMonitor
{
private $management;
/**
 * Keeps the management API client that all queue queries go through.
 *
 * @param RabbitMQManagement $management Client used to fetch queue data.
 */
public function __construct(RabbitMQManagement $management)
{
$this->management = $management;
}
/**
 * Fetches all queues and reduces each one to a flat statistics record.
 *
 * Each record contains: name, vhost, messages, messages_ready,
 * messages_unacked, consumers, memory, state and a 'message_stats' array
 * with 'publish', 'consume' and 'ack' rates. Missing counters default to 0
 * and a missing state to 'unknown'.
 *
 * The 'consume' rate is read from 'deliver_get_details' (the rate at which
 * messages leave the queue towards consumers); 'consume_details' is kept as
 * a fallback only, because the management API does not report a statistic
 * under that name and the rate would otherwise always be 0.
 *
 * @return array List of per-queue statistics records.
 *
 * @throws Exception When the queue listing does not return HTTP 200 with a list body.
 */
public function getQueueStats()
{
    $response = $this->management->getQueues();
    if ($response['code'] !== 200 || !is_array($response['data'])) {
        throw new Exception("Failed to get queues");
    }

    $stats = [];
    foreach ($response['data'] as $queue) {
        $messageStats = $queue['message_stats'] ?? [];
        $stats[] = [
            'name' => $queue['name'],
            'vhost' => $queue['vhost'],
            'messages' => $queue['messages'] ?? 0,
            'messages_ready' => $queue['messages_ready'] ?? 0,
            'messages_unacked' => $queue['messages_unacked'] ?? 0,
            'consumers' => $queue['consumers'] ?? 0,
            'memory' => $queue['memory'] ?? 0,
            'state' => $queue['state'] ?? 'unknown',
            'message_stats' => [
                'publish' => $messageStats['publish_details']['rate'] ?? 0,
                'consume' => $messageStats['deliver_get_details']['rate']
                    ?? $messageStats['consume_details']['rate']
                    ?? 0,
                'ack' => $messageStats['ack_details']['rate'] ?? 0,
            ],
        ];
    }

    return $stats;
}
/**
 * Lists queues that violate any of the configured health thresholds.
 *
 * A queue is flagged when its message count exceeds 'max_messages', its
 * consumer count is below 'min_consumers', its memory use (bytes converted
 * to MB) exceeds 'max_memory_mb', or its state is not 'running'.
 *
 * @param array $thresholds Overrides for the defaults
 *                          (max_messages 10000, min_consumers 1, max_memory_mb 100).
 *
 * @return array List of ['queue' => name, 'vhost' => vhost, 'issues' => string[]].
 */
public function findProblematicQueues($thresholds = [])
{
    $limits = array_merge(
        [
            'max_messages' => 10000,
            'min_consumers' => 1,
            'max_memory_mb' => 100,
        ],
        $thresholds
    );

    $flagged = [];
    foreach ($this->getQueueStats() as $entry) {
        $findings = [];

        if ($entry['messages'] > $limits['max_messages']) {
            $findings[] = "消息堆积过多: {$entry['messages']} 条";
        }
        if ($entry['consumers'] < $limits['min_consumers']) {
            $findings[] = "消费者数量不足: {$entry['consumers']}";
        }

        // Memory is compared in megabytes; the stored value is divided by 1024 twice.
        $megabytes = $entry['memory'] / 1024 / 1024;
        if ($megabytes > $limits['max_memory_mb']) {
            $findings[] = "内存占用过高: " . round($megabytes, 2) . " MB";
        }
        if ($entry['state'] !== 'running') {
            $findings[] = "队列状态异常: {$entry['state']}";
        }

        if ($findings === []) {
            continue;
        }
        $flagged[] = [
            'queue' => $entry['name'],
            'vhost' => $entry['vhost'],
            'issues' => $findings,
        ];
    }

    return $flagged;
}
/**
 * Compares publish and consume rates for every queue.
 *
 * 'lag' is publish rate minus consume rate; a positive lag is labelled
 * 'accumulating', anything else 'stable'.
 *
 * @return array List of ['name', 'publish_rate', 'consume_rate', 'lag', 'status'].
 */
public function getQueueMessageRates()
{
    return array_map(
        static function ($entry) {
            $incoming = $entry['message_stats']['publish'];
            $outgoing = $entry['message_stats']['consume'];
            $backlogGrowth = $incoming - $outgoing;

            if ($backlogGrowth > 0) {
                $label = 'accumulating';
            } else {
                $label = 'stable';
            }

            return [
                'name' => $entry['name'],
                'publish_rate' => $incoming,
                'consume_rate' => $outgoing,
                'lag' => $backlogGrowth,
                'status' => $label,
            ];
        },
        $this->getQueueStats()
    );
}
}
实际应用场景
场景一:自动化运维脚本
php
<?php
class AutomatedOperations
{
private $management;
/**
 * Keeps the management API client used by all maintenance operations.
 *
 * @param RabbitMQManagement $management Client used to query and modify the broker.
 */
public function __construct(RabbitMQManagement $management)
{
$this->management = $management;
}
/**
 * Deletes queues that have no consumers, no messages and have been idle
 * for longer than $maxAge seconds.
 *
 * A queue is only considered when it reports exactly 0 consumers, 0 (or no)
 * messages and a parseable 'idle_since' timestamp. Queues whose timestamp
 * cannot be parsed are skipped rather than treated as idle since the epoch,
 * which would delete them unconditionally.
 *
 * NOTE(review): 'idle_since' is parsed with strtotime(), which applies PHP's
 * default timezone when the string carries no offset — confirm this matches
 * the broker's timestamp format.
 *
 * @param int $maxAge Minimum idle time in seconds before a queue is deleted.
 *
 * @return array Names of the queues whose delete request returned a 2xx status.
 *
 * @throws Exception When the queue listing does not return HTTP 200 with a list body.
 */
public function cleanupIdleQueues($maxAge = 3600)
{
    $response = $this->management->getQueues();
    if ($response['code'] !== 200 || !is_array($response['data'])) {
        throw new Exception("Failed to get queues");
    }

    $now = time();
    $cleaned = [];
    foreach ($response['data'] as $queue) {
        // A missing 'consumers' field must not count as "no consumers".
        if (($queue['consumers'] ?? null) !== 0 || ($queue['messages'] ?? 0) !== 0) {
            continue;
        }

        $idleSince = $queue['idle_since'] ?? null;
        if (!$idleSince) {
            continue;
        }

        $idleTime = strtotime($idleSince);
        if ($idleTime === false) {
            // Unparseable timestamp: never delete on a guess.
            continue;
        }
        if ($now - $idleTime <= $maxAge) {
            continue;
        }

        $result = $this->management->deleteQueue($queue['vhost'], $queue['name']);
        if ($result['code'] >= 200 && $result['code'] < 300) {
            $cleaned[] = $queue['name'];
        }
    }

    return $cleaned;
}
/**
 * Reports queues whose consumer count looks too low for their backlog.
 *
 * Despite its name this method changes nothing on the broker; it only
 * builds a report. A queue is listed as 'no_consumers' when it holds
 * messages but has no consumers, or as 'insufficient_consumers' when it
 * holds more than 1000 messages with fewer than 2 consumers.
 *
 * @return array List of report entries with 'queue', 'issue', 'messages'
 *               and, for 'insufficient_consumers', 'consumers'.
 *
 * @throws Exception When the queue listing does not return HTTP 200 with a list body.
 */
public function balanceConsumers()
{
    $response = $this->management->getQueues();
    if ($response['code'] !== 200 || !is_array($response['data'])) {
        throw new Exception("Failed to get queues");
    }

    $report = [];
    foreach ($response['data'] as $queue) {
        $messages = $queue['messages'] ?? 0;
        $consumers = $queue['consumers'] ?? 0;

        if ($messages > 0 && $consumers === 0) {
            $report[] = [
                'queue' => $queue['name'],
                'issue' => 'no_consumers',
                'messages' => $messages,
            ];
        } elseif ($messages > 1000 && $consumers < 2) {
            $report[] = [
                'queue' => $queue['name'],
                'issue' => 'insufficient_consumers',
                'messages' => $messages,
                'consumers' => $consumers,
            ];
        }
    }

    return $report;
}
/**
 * Builds a summary report from the overview, queue and connection listings.
 *
 * The report contains a timestamp, the cluster name and RabbitMQ version
 * ('unknown' when the overview does not provide them), totals for queues,
 * connections, messages and consumers, and a list of every queue that
 * currently holds at least one message. Missing per-queue 'messages' and
 * 'consumers' fields are treated as 0 throughout.
 *
 * @return array Report with keys 'timestamp', 'cluster_name',
 *               'rabbitmq_version', 'statistics' and 'queues'.
 *
 * @throws Exception When the queue or connection listing does not return
 *                   HTTP 200 with a list body.
 */
public function generateReport()
{
    $overview = $this->management->getOverview();
    $queues = $this->management->getQueues();
    $connections = $this->management->getConnections();

    // Counting or iterating an error body would produce a misleading report.
    if ($queues['code'] !== 200 || !is_array($queues['data'])) {
        throw new Exception("Failed to get queues");
    }
    if ($connections['code'] !== 200 || !is_array($connections['data'])) {
        throw new Exception("Failed to get connections");
    }

    $report = [
        'timestamp' => date('Y-m-d H:i:s'),
        'cluster_name' => $overview['data']['cluster_name'] ?? 'unknown',
        'rabbitmq_version' => $overview['data']['rabbitmq_version'] ?? 'unknown',
        'statistics' => [
            'total_queues' => count($queues['data']),
            'total_connections' => count($connections['data']),
            'total_messages' => 0,
            'total_consumers' => 0,
        ],
        'queues' => [],
    ];

    foreach ($queues['data'] as $queue) {
        $messages = $queue['messages'] ?? 0;
        $consumers = $queue['consumers'] ?? 0;

        $report['statistics']['total_messages'] += $messages;
        $report['statistics']['total_consumers'] += $consumers;

        if ($messages > 0) {
            $report['queues'][] = [
                'name' => $queue['name'],
                'messages' => $messages,
                'consumers' => $consumers,
            ];
        }
    }

    return $report;
}
}
场景二:连接监控与管理
php
<?php
class ConnectionMonitor
{
private $management;
/**
 * Keeps the management API client used for all connection queries.
 *
 * @param RabbitMQManagement $management Client used to list and close connections.
 */
public function __construct(RabbitMQManagement $management)
{
$this->management = $management;
}
/**
 * Summarises the current client connections.
 *
 * Connections are counted per client product and per peer host ('unknown'
 * when the field is absent). Connections reporting exactly 0 channels are
 * additionally collected under 'idle_connections'.
 *
 * @return array Keys: 'total', 'by_client', 'by_host', 'idle_connections'
 *               (each idle entry has 'name', 'host', 'client', 'connected_at').
 *
 * @throws Exception When the connection listing does not return HTTP 200.
 */
public function getConnectionStats()
{
    $response = $this->management->getConnections();
    if (200 !== $response['code']) {
        throw new Exception("Failed to get connections");
    }

    $connections = $response['data'];
    $total = count($connections);
    $perClient = [];
    $perHost = [];
    $withoutChannels = [];

    foreach ($connections as $conn) {
        $client = $conn['client_properties']['product'] ?? 'unknown';
        $host = $conn['peer_host'] ?? 'unknown';

        if (!isset($perClient[$client])) {
            $perClient[$client] = 0;
        }
        $perClient[$client]++;

        if (!isset($perHost[$host])) {
            $perHost[$host] = 0;
        }
        $perHost[$host]++;

        if ($conn['channels'] === 0) {
            $withoutChannels[] = [
                'name' => $conn['name'],
                'host' => $host,
                'client' => $client,
                'connected_at' => $conn['connected_at'] ?? null,
            ];
        }
    }

    return [
        'total' => $total,
        'by_client' => $perClient,
        'by_host' => $perHost,
        'idle_connections' => $withoutChannels,
    ];
}
/**
 * Closes zero-channel connections that were opened more than $maxIdleTime seconds ago.
 *
 * "Idle" here means "reports 0 channels" (see getConnectionStats()); the
 * age is measured from 'connected_at', not from the last activity.
 *
 * The management API reports 'connected_at' as a Unix timestamp in
 * milliseconds, which strtotime() cannot parse. Numeric values are therefore
 * converted from milliseconds; other values are parsed as date strings and
 * skipped when unparseable, so a bad timestamp never causes a connection to
 * be closed.
 *
 * @param int $maxIdleTime Minimum connection age in seconds before it is closed.
 *
 * @return array Names of the connections whose close request returned a 2xx status.
 *
 * @throws Exception When the connection listing cannot be fetched.
 */
public function closeIdleConnections($maxIdleTime = 3600)
{
    $stats = $this->getConnectionStats();
    $now = time();
    $closed = [];

    foreach ($stats['idle_connections'] as $conn) {
        $connectedAt = $conn['connected_at'];
        if (!$connectedAt) {
            continue;
        }

        if (is_numeric($connectedAt)) {
            // Epoch milliseconds -> epoch seconds.
            $connectedTime = (int) floor($connectedAt / 1000);
        } else {
            $connectedTime = strtotime((string) $connectedAt);
            if ($connectedTime === false) {
                continue;
            }
        }

        if ($now - $connectedTime <= $maxIdleTime) {
            continue;
        }

        $result = $this->management->closeConnection($conn['name']);
        if ($result['code'] >= 200 && $result['code'] < 300) {
            $closed[] = $conn['name'];
        }
    }

    return $closed;
}
/**
 * Lists connections whose channel or traffic counters exceed fixed limits.
 *
 * A connection is flagged when it has more than 100 channels, or when its
 * 'recv_cnt' or 'send_cnt' counter exceeds 10,000,000. Missing counters
 * are treated as 0.
 *
 * @return array List of ['name', 'peer_host', 'client', 'issues' => string[]].
 *
 * @throws Exception When the connection listing does not return HTTP 200 with a list body.
 */
public function findProblematicConnections()
{
    $response = $this->management->getConnections();
    if ($response['code'] !== 200 || !is_array($response['data'])) {
        throw new Exception("Failed to get connections");
    }

    $problems = [];
    foreach ($response['data'] as $conn) {
        $issues = [];

        if (($conn['channels'] ?? 0) > 100) {
            $issues[] = "通道数量过多: {$conn['channels']}";
        }
        if (($conn['recv_cnt'] ?? 0) > 10000000) {
            $issues[] = "接收消息过多: {$conn['recv_cnt']}";
        }
        if (($conn['send_cnt'] ?? 0) > 10000000) {
            $issues[] = "发送消息过多: {$conn['send_cnt']}";
        }

        if (!empty($issues)) {
            $problems[] = [
                'name' => $conn['name'],
                'peer_host' => $conn['peer_host'] ?? 'unknown',
                'client' => $conn['client_properties']['product'] ?? 'unknown',
                'issues' => $issues,
            ];
        }
    }

    return $problems;
}
}
常见问题与解决方案
问题一:无法访问管理界面
现象:浏览器无法打开管理界面。
解决方案:
bash
rabbitmq-plugins list | grep management
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl status | grep management
检查防火墙设置:
bash
firewall-cmd --add-port=15672/tcp --permanent
firewall-cmd --reload
问题二:guest 用户远程无法登录
现象:使用 guest/guest 远程登录失败。
原因:默认配置只允许本地登录。
解决方案:
bash
rabbitmqctl add_user admin admin123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
或修改配置允许远程 guest(不推荐生产环境):
bash
loopback_users = none
问题三:管理界面加载缓慢
现象:管理界面响应很慢。
原因:队列或连接数量过多,统计计算耗时。
解决方案:
bash
management.rates_mode = basic
management.sample_retention_policies.max_seconds = 3600
最佳实践
1. 安全配置
bash
management.listener.ip = 127.0.0.1
management.listener.ssl = true
management.listener.ssl_opts.cacertfile = /path/to/cacert.pem
management.listener.ssl_opts.certfile = /path/to/cert.pem
management.listener.ssl_opts.keyfile = /path/to/key.pem
2. 性能优化
bash
management.http_log_dir = /var/log/rabbitmq/management
management.rates_mode = basic
management.collect_statistics = coarse
3. 定期清理脚本
php
<?php
/**
 * Runs the periodic maintenance job: removes queues idle for more than
 * 7200 seconds, prints their names, then writes a JSON report to
 * /var/log/rabbitmq/daily_report.json.
 *
 * Success is only printed after the report has actually been written; a
 * failed encode or write raises an exception instead of being ignored.
 *
 * NOTE(review): the management credentials are hard-coded below — consider
 * loading them from configuration before using this outside an example.
 *
 * @throws Exception When the report cannot be encoded or written, or when an
 *                   underlying management call fails.
 */
function cleanupRabbitMQ()
{
    $management = new RabbitMQManagement('localhost', 15672, 'admin', 'admin123');
    $ops = new AutomatedOperations($management);

    $cleanedQueues = $ops->cleanupIdleQueues(7200);
    echo "清理空闲队列: " . implode(', ', $cleanedQueues) . "\n";

    $report = $ops->generateReport();
    $reportPath = '/var/log/rabbitmq/daily_report.json';

    $json = json_encode($report, JSON_PRETTY_PRINT);
    if ($json === false) {
        throw new Exception('Failed to encode report: ' . json_last_error_msg());
    }
    if (file_put_contents($reportPath, $json) === false) {
        throw new Exception("Failed to write report to {$reportPath}");
    }

    echo "日报已生成\n";
}
cleanupRabbitMQ();