Skip to content

RabbitMQ 资源限制配置

概述

RabbitMQ 资源限制是保障消息队列稳定运行的重要机制。通过合理配置内存、磁盘、连接数、队列长度等限制,可以防止单个用户或应用占用过多资源,确保整个系统的可用性和稳定性。本文档详细介绍各类资源限制的配置方法和最佳实践。

核心知识点

资源限制类型

RabbitMQ 资源限制体系
├── 全局限制
│   ├── 内存水位线(vm_memory_high_watermark)
│   ├── 磁盘空间限制(disk_free_limit)
│   └── 最大连接数(connection_max)
├── 虚拟主机限制
│   ├── 最大队列数(max-queues)
│   └── 最大连接数(max-connections)
├── 队列限制
│   ├── 最大长度(x-max-length)
│   ├── 最大字节数(x-max-length-bytes)
│   └── 消息 TTL(x-message-ttl)
└── 用户限制
    └── 最大通道数(channel_max)

内存管理机制

RabbitMQ 内存管理流程:

内存使用情况


┌─────────────────────────────────────────┐
│ vm_memory_high_watermark_paging_ratio   │
│ (开始将消息写入磁盘)                    │
│ 默认:0.75 × watermark                   │
└──────────────────┬──────────────────────┘


┌─────────────────────────────────────────┐
│ vm_memory_high_watermark                │
│ (阻塞发布者)                           │
│ 默认:0.4(40% 物理内存)                 │
└─────────────────────────────────────────┘

限制优先级

资源限制检查顺序:
1. 连接级别限制(channel_max)
2. 虚拟主机限制(max-queues, max-connections)
3. 队列级别限制(max-length, TTL)
4. 全局内存限制
5. 全局磁盘限制

配置示例

rabbitmq.conf 全局资源限制

ini
# ========================================
# 内存限制配置
# ========================================

# 内存水位线(相对值)
# 默认:0.4(40% 物理内存)
# 当内存使用超过此阈值时,阻塞发布者
vm_memory_high_watermark.relative = 0.6

# 内存水位线(绝对值)
# 单位:字节,或使用 KB、MB、GB 后缀
# vm_memory_high_watermark.absolute = 4GB

# 内存分页比例
# 当内存达到 watermark × paging_ratio 时,开始将消息写入磁盘
# 默认:0.75
vm_memory_high_watermark_paging_ratio = 0.75

# ========================================
# 磁盘空间限制配置
# ========================================

# 磁盘空间限制(绝对值)
# 当磁盘空间低于此值时,阻塞发布者
# 默认:50MB
disk_free_limit.absolute = 2GB

# 磁盘空间限制(相对内存)
# disk_free_limit.relative = 2.0
# 表示磁盘空间至少是内存的 2 倍

# ========================================
# 连接与通道限制
# ========================================

# 最大连接数
# 默认:无限制
connection_max = 1000

# 最大通道数(每个连接)
# 默认:2047
channel_max = 2048

# 心跳超时(秒)
# 默认:60
heartbeat = 30

# ========================================
# 队列默认限制
# ========================================

# 默认消息 TTL(毫秒)
# 应用于队列中的消息
# queue_default_ttl = 86400000

# 默认队列最大长度
# queue_max_length = 100000

# ========================================
# IO 与性能限制
# ========================================

# 最大文件描述符数
# 默认:自动检测
# file_handle_limit = 65536

# 消息存储刷新间隔
# msg_store_flush_interval = 100

# ========================================
# 统计收集限制
# ========================================

# 统计数据收集间隔(毫秒)
# collect_statistics_interval = 5000

# 统计收集级别
# none:不收集
# coarse:粗粒度(默认)
# fine:细粒度
collect_statistics = coarse

advanced.config 高级限制配置

erlang
[
  {rabbit, [
    %% 内存管理
    {vm_memory_high_watermark, 0.6},
    {vm_memory_high_watermark_paging_ratio, 0.75},
    
    %% 磁盘限制
    {disk_free_limit, {absolute, 2147483648}},  %% 2GB
    
    %% 连接限制
    {connection_max, 1000},
    {channel_max, 2048},
    
    %% 消息存储配置
    {msg_store_file_size_limit, 16777216},  %% 16MB
    
    %% 队列索引配置
    {queue_index_embed_msgs_below, 4096},  %% 4KB
    
    %% 队列懒惰模式阈值
    {lazy_queue_explicit_permission_run_mode, lazy},
    
    %% 统计配置
    {collect_statistics, coarse},
    {collect_statistics_interval, 5000}
  ]},
  
  {rabbitmq_management, [
    %% 管理界面统计限制
    {rates_mode, basic},
    
    %% 采样保留策略
    {sample_retention_policies, [
      {global, [{605, 5}, {3660, 60}, {29400, 600}, {86400, 1800}]},
      {basic, [{605, 5}, {3600, 60}]},
      {detailed, [{10, 5}]}
    ]}
  ]}
].

队列级别限制配置

php
<?php

// 声明队列时设置限制
$queueArgs = [
    // 消息 TTL(毫秒)
    'x-message-ttl' => 86400000,  // 24小时
    
    // 队列最大长度
    'x-max-length' => 100000,
    
    // 队列最大字节数
    'x-max-length-bytes' => 1073741824,  // 1GB
    
    // 溢出行为
    // 'drop-head':删除头部消息(默认)
    // 'reject-publish':拒绝新消息
    // 'reject-publish-dlx':拒绝并转发到死信交换器
    'x-overflow' => 'reject-publish',
    
    // 队列模式
    // 'default':默认模式
    // 'lazy':惰性模式,消息存储在磁盘
    'x-queue-mode' => 'lazy',
    
    // 队列版本(RabbitMQ 3.10+)
    'x-queue-version' => 2,
    
    // 单消费者模式
    'x-single-active-consumer' => true,
    
    // 死信交换器
    'x-dead-letter-exchange' => 'dlx.exchange',
    'x-dead-letter-routing-key' => 'dead.letter',
];

// 使用 php-amqplib 声明队列
$channel->queue_declare(
    'my_queue',
    false,  // passive
    true,   // durable
    false,  // exclusive
    false,  // auto_delete
    false,  // nowait
    new PhpAmqpLib\Wire\AMQPTable($queueArgs)
);

虚拟主机限制配置

bash
# ========================================
# 设置虚拟主机限制
# ========================================

# 设置最大队列数
rabbitmqctl set_vhost_limits -p /production '{"max-queues": 500}'

# 设置最大连接数
rabbitmqctl set_vhost_limits -p /production '{"max-connections": 200}'

# 同时设置多个限制
rabbitmqctl set_vhost_limits -p /production '{"max-queues": 500, "max-connections": 200}'

# ========================================
# 查询虚拟主机限制
# ========================================

# 查看特定虚拟主机的限制
rabbitmqctl list_vhost_limits /production

# 查看所有虚拟主机的限制
rabbitmqctl list_vhost_limits

# ========================================
# 清除虚拟主机限制
# ========================================

rabbitmqctl clear_vhost_limits -p /production

PHP 代码示例

资源限制管理类

php
<?php

class RabbitMQResourceLimitManager
{
    private string $host;
    private int $apiPort;
    private string $username;
    private string $password;

    public function __construct(
        string $host = 'localhost',
        int $apiPort = 15672,
        string $username = 'guest',
        string $password = 'guest'
    ) {
        $this->host = $host;
        $this->apiPort = $apiPort;
        $this->username = $username;
        $this->password = $password;
    }

    private function request(string $endpoint, string $method = 'GET', $data = null): array
    {
        $url = "http://{$this->host}:{$this->apiPort}/api/{$endpoint}";
        
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "{$this->username}:{$this->password}",
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_CUSTOMREQUEST => $method,
        ]);
        
        if ($data !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        }
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        return [
            'status' => $httpCode,
            'data' => json_decode($response, true),
        ];
    }

    public function getNodeStatus(): array
    {
        $response = $this->request('nodes');
        
        if ($response['status'] !== 200 || empty($response['data'])) {
            return [];
        }
        
        $node = $response['data'][0];
        
        return [
            'name' => $node['name'],
            'memory_limit' => $node['mem_limit'] ?? 0,
            'memory_used' => $node['mem_used'] ?? 0,
            'memory_alarm' => $node['mem_alarm'] ?? false,
            'disk_free' => $node['disk_free'] ?? 0,
            'disk_free_limit' => $node['disk_free_limit'] ?? 0,
            'disk_alarm' => $node['disk_free_alarm'] ?? false,
            'fd_total' => $node['fd_total'] ?? 0,
            'fd_used' => $node['fd_used'] ?? 0,
            'sockets_total' => $node['sockets_total'] ?? 0,
            'sockets_used' => $node['sockets_used'] ?? 0,
            'proc_total' => $node['proc_total'] ?? 0,
            'proc_used' => $node['proc_used'] ?? 0,
        ];
    }

    public function getOverview(): array
    {
        $response = $this->request('overview');
        
        if ($response['status'] !== 200) {
            return [];
        }
        
        $data = $response['data'];
        
        return [
            'connections' => $data['object_totals']['connections'] ?? 0,
            'channels' => $data['object_totals']['channels'] ?? 0,
            'queues' => $data['object_totals']['queues'] ?? 0,
            'consumers' => $data['object_totals']['consumers'] ?? 0,
            'messages' => $data['queue_totals']['messages'] ?? 0,
            'message_rates' => $data['message_stats'] ?? [],
        ];
    }

    public function setVHostLimits(string $vhost, array $limits): bool
    {
        $vhostEncoded = urlencode($vhost);
        $response = $this->request(
            "vhost-limits/{$vhostEncoded}",
            'PUT',
            $limits
        );
        
        return in_array($response['status'], [201, 204]);
    }

    public function getVHostLimits(string $vhost): array
    {
        $vhostEncoded = urlencode($vhost);
        $response = $this->request("vhost-limits/{$vhostEncoded}");
        
        return $response['data'] ?? [];
    }

    public function getResourceUsage(): array
    {
        $node = $this->getNodeStatus();
        $overview = $this->getOverview();
        
        $memoryUsage = 0;
        if ($node['memory_limit'] > 0) {
            $memoryUsage = ($node['memory_used'] / $node['memory_limit']) * 100;
        }
        
        $diskUsage = 0;
        if ($node['disk_free_limit'] > 0) {
            $diskUsage = (($node['disk_free_limit'] - $node['disk_free']) / $node['disk_free_limit']) * 100;
        }
        
        $fdUsage = 0;
        if ($node['fd_total'] > 0) {
            $fdUsage = ($node['fd_used'] / $node['fd_total']) * 100;
        }
        
        return [
            'memory' => [
                'used' => $node['memory_used'],
                'limit' => $node['memory_limit'],
                'usage_percent' => $memoryUsage,
                'alarm' => $node['memory_alarm'],
            ],
            'disk' => [
                'free' => $node['disk_free'],
                'limit' => $node['disk_free_limit'],
                'usage_percent' => $diskUsage,
                'alarm' => $node['disk_alarm'],
            ],
            'file_descriptors' => [
                'used' => $node['fd_used'],
                'total' => $node['fd_total'],
                'usage_percent' => $fdUsage,
            ],
            'connections' => $overview['connections'],
            'channels' => $overview['channels'],
            'queues' => $overview['queues'],
            'messages' => $overview['messages'],
        ];
    }

    public function printResourceReport(): void
    {
        $usage = $this->getResourceUsage();
        
        echo "RabbitMQ 资源使用报告\n";
        echo "生成时间: " . date('Y-m-d H:i:s') . "\n";
        echo str_repeat("=", 60) . "\n\n";
        
        echo "内存使用:\n";
        echo "  已使用: " . $this->formatBytes($usage['memory']['used']) . "\n";
        echo "  限制: " . $this->formatBytes($usage['memory']['limit']) . "\n";
        echo "  使用率: " . round($usage['memory']['usage_percent'], 2) . "%\n";
        echo "  告警: " . ($usage['memory']['alarm'] ? '⚠️ 是' : '✅ 否') . "\n\n";
        
        echo "磁盘空间:\n";
        echo "  可用: " . $this->formatBytes($usage['disk']['free']) . "\n";
        echo "  限制: " . $this->formatBytes($usage['disk']['limit']) . "\n";
        echo "  告警: " . ($usage['disk']['alarm'] ? '⚠️ 是' : '✅ 否') . "\n\n";
        
        echo "文件描述符:\n";
        echo "  已使用: {$usage['file_descriptors']['used']}\n";
        echo "  总数: {$usage['file_descriptors']['total']}\n";
        echo "  使用率: " . round($usage['file_descriptors']['usage_percent'], 2) . "%\n\n";
        
        echo "连接统计:\n";
        echo "  连接数: {$usage['connections']}\n";
        echo "  通道数: {$usage['channels']}\n";
        echo "  队列数: {$usage['queues']}\n";
        echo "  消息数: {$usage['messages']}\n";
    }

    private function formatBytes(int $bytes): string
    {
        $units = ['B', 'KB', 'MB', 'GB', 'TB'];
        $i = 0;
        while ($bytes >= 1024 && $i < count($units) - 1) {
            $bytes /= 1024;
            $i++;
        }
        return round($bytes, 2) . ' ' . $units[$i];
    }
}

// 使用示例
$limitManager = new RabbitMQResourceLimitManager('localhost', 15672, 'admin', 'password');
$limitManager->printResourceReport();

队列限制管理类

php
<?php

class QueueLimitManager
{
    private PhpAmqpLib\Connection\AMQPStreamConnection $connection;
    private PhpAmqpLib\Channel\AMQPChannel $channel;

    public function __construct(PhpAmqpLib\Connection\AMQPStreamConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
    }

    public function declareQueueWithLimits(
        string $queueName,
        array $limits = []
    ): array {
        $defaults = [
            'durable' => true,
            'message_ttl' => null,
            'max_length' => null,
            'max_bytes' => null,
            'overflow' => 'drop-head',
            'queue_mode' => 'default',
            'dead_letter_exchange' => null,
            'dead_letter_routing_key' => null,
        ];
        
        $config = array_merge($defaults, $limits);
        
        $arguments = [];
        
        if ($config['message_ttl'] !== null) {
            $arguments['x-message-ttl'] = $config['message_ttl'];
        }
        
        if ($config['max_length'] !== null) {
            $arguments['x-max-length'] = $config['max_length'];
        }
        
        if ($config['max_bytes'] !== null) {
            $arguments['x-max-length-bytes'] = $config['max_bytes'];
        }
        
        $arguments['x-overflow'] = $config['overflow'];
        $arguments['x-queue-mode'] = $config['queue_mode'];
        
        if ($config['dead_letter_exchange'] !== null) {
            $arguments['x-dead-letter-exchange'] = $config['dead_letter_exchange'];
        }
        
        if ($config['dead_letter_routing_key'] !== null) {
            $arguments['x-dead-letter-routing-key'] = $config['dead_letter_routing_key'];
        }
        
        [$queue, $messageCount, $consumerCount] = $this->channel->queue_declare(
            $queueName,
            false,
            $config['durable'],
            false,
            false,
            false,
            new PhpAmqpLib\Wire\AMQPTable($arguments)
        );
        
        return [
            'queue' => $queue,
            'message_count' => $messageCount,
            'consumer_count' => $consumerCount,
            'config' => $config,
        ];
    }

    public function createLimitedQueue(string $name, string $type = 'standard'): array
    {
        $presets = [
            'standard' => [
                'max_length' => 10000,
                'message_ttl' => 86400000,
            ],
            'high_volume' => [
                'max_length' => 100000,
                'max_bytes' => 1073741824,
                'queue_mode' => 'lazy',
            ],
            'transient' => [
                'message_ttl' => 3600000,
                'max_length' => 1000,
                'overflow' => 'drop-head',
            ],
            'critical' => [
                'max_length' => 5000,
                'overflow' => 'reject-publish',
                'dead_letter_exchange' => 'dlx.critical',
            ],
        ];
        
        $config = $presets[$type] ?? $presets['standard'];
        
        return $this->declareQueueWithLimits($name, $config);
    }

    public function getQueueInfo(string $queueName, string $vhost = '/'): ?array
    {
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => "http://localhost:15672/api/queues/" . urlencode($vhost) . "/" . urlencode($queueName),
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => "guest:guest",
        ]);
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        if ($httpCode !== 200) {
            return null;
        }
        
        return json_decode($response, true);
    }
}

// 使用示例
$connection = new PhpAmqpLib\Connection\AMQPStreamConnection(
    'localhost', 5672, 'guest', 'guest'
);
$queueManager = new QueueLimitManager($connection);

// 创建标准队列
$queueManager->createLimitedQueue('orders.standard', 'standard');

// 创建高容量队列
$queueManager->createLimitedQueue('logs.high_volume', 'high_volume');

// 创建临时队列
$queueManager->createLimitedQueue('cache.transient', 'transient');

// 创建关键队列
$queueManager->createLimitedQueue('payments.critical', 'critical');

资源监控告警类

php
<?php

class ResourceMonitor
{
    private RabbitMQResourceLimitManager $limitManager;
    private array $thresholds;

    public function __construct(
        RabbitMQResourceLimitManager $limitManager,
        array $thresholds = []
    ) {
        $this->limitManager = $limitManager;
        $this->thresholds = array_merge([
            'memory_warning' => 70,
            'memory_critical' => 85,
            'disk_warning' => 80,
            'disk_critical' => 90,
            'fd_warning' => 70,
            'fd_critical' => 85,
            'queue_message_warning' => 10000,
            'queue_message_critical' => 100000,
        ], $thresholds);
    }

    public function checkHealth(): array
    {
        $usage = $this->limitManager->getResourceUsage();
        $alerts = [];
        
        $memoryUsage = $usage['memory']['usage_percent'];
        if ($memoryUsage >= $this->thresholds['memory_critical']) {
            $alerts[] = [
                'level' => 'critical',
                'resource' => 'memory',
                'message' => "内存使用率过高: " . round($memoryUsage, 2) . "%",
                'value' => $memoryUsage,
            ];
        } elseif ($memoryUsage >= $this->thresholds['memory_warning']) {
            $alerts[] = [
                'level' => 'warning',
                'resource' => 'memory',
                'message' => "内存使用率较高: " . round($memoryUsage, 2) . "%",
                'value' => $memoryUsage,
            ];
        }
        
        if ($usage['memory']['alarm']) {
            $alerts[] = [
                'level' => 'critical',
                'resource' => 'memory',
                'message' => "内存告警已触发!",
                'value' => true,
            ];
        }
        
        if ($usage['disk']['alarm']) {
            $alerts[] = [
                'level' => 'critical',
                'resource' => 'disk',
                'message' => "磁盘空间告警已触发!",
                'value' => true,
            ];
        }
        
        $fdUsage = $usage['file_descriptors']['usage_percent'];
        if ($fdUsage >= $this->thresholds['fd_critical']) {
            $alerts[] = [
                'level' => 'critical',
                'resource' => 'file_descriptors',
                'message' => "文件描述符使用率过高: " . round($fdUsage, 2) . "%",
                'value' => $fdUsage,
            ];
        } elseif ($fdUsage >= $this->thresholds['fd_warning']) {
            $alerts[] = [
                'level' => 'warning',
                'resource' => 'file_descriptors',
                'message' => "文件描述符使用率较高: " . round($fdUsage, 2) . "%",
                'value' => $fdUsage,
            ];
        }
        
        $messageCount = $usage['messages'];
        if ($messageCount >= $this->thresholds['queue_message_critical']) {
            $alerts[] = [
                'level' => 'critical',
                'resource' => 'messages',
                'message' => "消息堆积严重: {$messageCount} 条",
                'value' => $messageCount,
            ];
        } elseif ($messageCount >= $this->thresholds['queue_message_warning']) {
            $alerts[] = [
                'level' => 'warning',
                'resource' => 'messages',
                'message' => "消息堆积较多: {$messageCount} 条",
                'value' => $messageCount,
            ];
        }
        
        return [
            'healthy' => empty(array_filter($alerts, fn($a) => $a['level'] === 'critical')),
            'alerts' => $alerts,
            'usage' => $usage,
        ];
    }

    public function printHealthReport(): void
    {
        $health = $this->checkHealth();
        
        echo "RabbitMQ 健康检查报告\n";
        echo "生成时间: " . date('Y-m-d H:i:s') . "\n";
        echo str_repeat("=", 60) . "\n\n";
        
        $status = $health['healthy'] ? '✅ 健康' : '❌ 异常';
        echo "整体状态: {$status}\n\n";
        
        if (!empty($health['alerts'])) {
            echo "告警信息:\n";
            echo str_repeat("-", 40) . "\n";
            
            foreach ($health['alerts'] as $alert) {
                $icon = $alert['level'] === 'critical' ? '🔴' : '🟡';
                echo "{$icon} [{$alert['level']}] {$alert['message']}\n";
            }
        } else {
            echo "✅ 无告警\n";
        }
    }
}

// 使用示例
$monitor = new ResourceMonitor($limitManager, [
    'memory_warning' => 60,
    'memory_critical' => 80,
]);
$monitor->printHealthReport();

实际应用场景

场景一:生产环境资源限制配置

ini
# production-rabbitmq.conf

# 内存配置(8GB 内存服务器)
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.75

# 磁盘配置
disk_free_limit.absolute = 4GB

# 连接配置
connection_max = 2000
channel_max = 2048
heartbeat = 30

# 统计配置
collect_statistics = coarse
collect_statistics_interval = 10000
bash
# 虚拟主机限制
rabbitmqctl set_vhost_limits -p /production '{"max-queues": 500, "max-connections": 500}'
rabbitmqctl set_vhost_limits -p /production '{"max-queues": 200, "max-connections": 200}'

场景二:高吞吐量队列配置

php
<?php

class HighThroughputQueueSetup
{
    private QueueLimitManager $queueManager;

    public function setupOrderQueue(): void
    {
        $this->queueManager->declareQueueWithLimits('orders.processing', [
            'max_length' => 100000,
            'max_bytes' => 5368709120,  // 5GB
            'queue_mode' => 'lazy',
            'overflow' => 'reject-publish',
            'dead_letter_exchange' => 'dlx.orders',
        ]);
    }

    public function setupLogQueue(): void
    {
        $this->queueManager->declareQueueWithLimits('logs.events', [
            'max_length' => 500000,
            'message_ttl' => 604800000,  // 7天
            'queue_mode' => 'lazy',
            'overflow' => 'drop-head',
        ]);
    }
}

场景三:租户资源配额

php
<?php

class TenantResourceQuota
{
    private RabbitMQResourceLimitManager $limitManager;

    private array $quotas = [
        'free' => [
            'max_queues' => 10,
            'max_connections' => 5,
            'queue_max_length' => 1000,
        ],
        'basic' => [
            'max_queues' => 50,
            'max_connections' => 20,
            'queue_max_length' => 10000,
        ],
        'pro' => [
            'max_queues' => 200,
            'max_connections' => 100,
            'queue_max_length' => 100000,
        ],
        'enterprise' => [
            'max_queues' => 1000,
            'max_connections' => 500,
            'queue_max_length' => 1000000,
        ],
    ];

    public function applyQuota(string $tenantId, string $plan): void
    {
        $quota = $this->quotas[$plan] ?? $this->quotas['free'];
        $vhost = "/tenant_{$tenantId}";
        
        $this->limitManager->setVHostLimits($vhost, [
            'max-queues' => $quota['max_queues'],
            'max-connections' => $quota['max_connections'],
        ]);
    }
}

常见问题与解决方案

问题一:内存告警频繁触发

原因分析

  • 内存水位线设置过低
  • 消息堆积过多
  • 消费者处理速度慢

解决方案

ini
# 调整内存水位线
vm_memory_high_watermark.relative = 0.6

# 调整分页比例
vm_memory_high_watermark_paging_ratio = 0.5

# 使用惰性队列
# 在队列声明时设置 x-queue-mode: lazy
bash
# 查看内存使用详情
rabbitmqctl status | grep -A 20 memory

# 查看队列消息数
rabbitmqctl list_queues name messages memory

问题二:磁盘空间告警

原因分析

  • 磁盘空间限制设置过高
  • 持久化消息过多
  • 日志文件过大

解决方案

ini
# 调整磁盘限制
disk_free_limit.absolute = 2GB

# 或使用相对值
disk_free_limit.relative = 1.5
bash
# 清理旧日志
find /var/log/rabbitmq -name "*.log" -mtime +30 -delete

# 查看磁盘使用
df -h /var/lib/rabbitmq

# 清理消息(谨慎操作)
rabbitmqctl purge_queue queue_name

问题三:连接数耗尽

原因分析

  • 连接泄漏(未正确关闭)
  • 最大连接数设置过低
  • 客户端连接池配置不当

解决方案

ini
# 增加最大连接数
connection_max = 5000

# 设置心跳
heartbeat = 30
php
<?php
// 正确管理连接
class ConnectionManager
{
    private static ?PhpAmqpLib\Connection\AMQPStreamConnection $connection = null;

    public static function getConnection(): PhpAmqpLib\Connection\AMQPStreamConnection
    {
        if (self::$connection === null || !self::$connection->isConnected()) {
            self::$connection = new PhpAmqpLib\Connection\AMQPStreamConnection(
                'localhost', 5672, 'guest', 'guest',
                '/', false, 'AMQPLAIN', null, 'en_US', 30.0
            );
        }
        return self::$connection;
    }

    public static function close(): void
    {
        if (self::$connection !== null) {
            self::$connection->close();
            self::$connection = null;
        }
    }
}

register_shutdown_function([ConnectionManager::class, 'close']);

最佳实践建议

1. 内存配置建议

服务器内存推荐水位线分页比例说明
4GB0.50.75小型服务器
8GB0.60.75中型服务器
16GB+0.70.75大型服务器

2. 队列限制建议

php
<?php

class QueueLimitPresets
{
    public const STANDARD = [
        'max_length' => 10000,
        'message_ttl' => 86400000,  // 1天
    ];

    public const HIGH_VOLUME = [
        'max_length' => 100000,
        'queue_mode' => 'lazy',
    ];

    public const CRITICAL = [
        'max_length' => 5000,
        'overflow' => 'reject-publish',
        'dead_letter_exchange' => 'dlx.critical',
    ];

    public const TRANSIENT = [
        'message_ttl' => 3600000,  // 1小时
        'overflow' => 'drop-head',
    ];
}

3. 监控脚本

bash
#!/bin/bash
# monitor-resources.sh

API_URL="http://localhost:15672/api"
AUTH="admin:password"

# 获取内存使用
memory_alarm=$(curl -s -u $AUTH $API_URL/nodes | jq '.[0].mem_alarm')

if [ "$memory_alarm" = "true" ]; then
    echo "CRITICAL: Memory alarm triggered!"
    # 发送告警
fi

# 获取磁盘使用
disk_alarm=$(curl -s -u $AUTH $API_URL/nodes | jq '.[0].disk_free_alarm')

if [ "$disk_alarm" = "true" ]; then
    echo "CRITICAL: Disk alarm triggered!"
    # 发送告警
fi

# 获取队列消息数
total_messages=$(curl -s -u $AUTH $API_URL/overview | jq '.queue_totals.messages')

if [ "$total_messages" -gt 100000 ]; then
    echo "WARNING: High message count: $total_messages"
fi

4. 资源限制检查清单

□ 内存水位线配置合理
□ 磁盘空间限制配置
□ 最大连接数设置
□ 虚拟主机限制配置
□ 队列长度限制设置
□ 消息 TTL 配置
□ 死信队列配置
□ 监控告警配置
□ 定期检查资源使用
□ 制定应急预案

相关链接