
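RabbitMQ Capacity Planning Guide

Overview

Capacity planning is key to keeping a RabbitMQ cluster stable. A sound plan avoids resource bottlenecks, protects quality of service, and leaves headroom for business growth. This document covers the methods, metrics, and best practices for RabbitMQ capacity planning.

Core Concepts

Capacity Planning Dimensions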

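| Dimension | Key metrics | Planning considerations |
|---|---|---|
| Message throughput | Messages per second | Publish rate, consume rate, peak multiplier |
| Message storage | Message size, message count | Persistence ratio, retention period |
| Connections | Connection count, channel count | Number of clients, connection pool size |
| Memory | Memory usage | Message caching, number of queues |
| Disk | Disk space, IOPS | Persistent messages, log storage |
| Network | Bandwidth, latency | Message size, inter-node cluster traffic |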

Performance Baseline Reference

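| Scenario | Throughput | Memory | Disk |
|---|---|---|---|
| Lightweight | < 10,000 messages/s | 2-4 GB | 20 GB SSD |
| Medium | 10,000-50,000 messages/s | 8-16 GB | 100 GB SSD |
| Large | 50,000-100,000 messages/s | 32-64 GB | 500 GB NVMe |
| Very large | > 100,000 messages/s | 128+ GB | 1 TB+ NVMe |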

Capacity Planning Formulas

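Memory estimate:
Total memory = base memory + message memory + connection memory + queue memory

Base memory = 512 MB (Erlang VM + RabbitMQ core)
Message memory = average message size × number of messages held in memory (the sum of all queue depths)
Connection memory = connections × 100 KB + channels × 50 KB
Queue memory = queues × 1 MB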
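
The memory formula can be turned into a small calculator. This is a minimal sketch, assuming that message memory is the average message size multiplied by the total number of messages buffered in memory; `estimateMemoryBytes` and the sample workload are hypothetical, and the constants are the rule-of-thumb values from the formula, not measurements.

```php
<?php
// Rough memory estimate using the rule-of-thumb constants from the formula.
// These constants are planning heuristics, not measured values.
function estimateMemoryBytes(
    int $avgMessageBytes,
    int $messagesInMemory,
    int $connections,
    int $channels,
    int $queues
): int {
    $base        = 512 * 1024 * 1024;                        // Erlang VM + RabbitMQ core
    $messages    = $avgMessageBytes * $messagesInMemory;     // buffered message bodies
    $connMemory  = $connections * 100 * 1024 + $channels * 50 * 1024;
    $queueMemory = $queues * 1024 * 1024;                    // 1 MB per queue

    return $base + $messages + $connMemory + $queueMemory;
}

// Hypothetical workload: 2 KB messages, 100,000 buffered, 200 connections,
// 2,000 channels, 100 queues.
$bytes = estimateMemoryBytes(2048, 100000, 200, 2000, 100);
printf("%.2f GiB\n", $bytes / 1024 ** 3); // prints "0.90 GiB"
```

For this workload the fixed base memory dominates the estimate; message memory only takes over once queue depth grows into the millions.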

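Disk estimate:
Total disk = message storage + log storage + reserved space

Message storage = average message size × daily message volume × retention days × persistence ratio
Log storage = 10 GB (estimate)
Reserved space = total disk × 30%, which gives total disk = (message storage + log storage) / 0.7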
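
Because the reserved space is defined as a fraction of the total, the total has to be solved for: total = (message storage + log storage) / (1 - reserve fraction). The sketch below works through that arithmetic; `estimateDiskBytes` and the sample figures are hypothetical, and the 10 GB log allowance and 30% reserve are the estimates from the formula.

```php
<?php
// Disk estimate: message storage plus logs, scaled so that the reserve
// fraction of the final total stays free.
function estimateDiskBytes(
    int $avgMessageBytes,
    int $dailyMessages,
    int $retentionDays,
    float $persistenceRatio,
    int $logBytes = 10 * 1024 ** 3,   // 10 GB log allowance (estimate)
    float $reserveFraction = 0.30     // share of the total kept free
): float {
    $messageStorage = $avgMessageBytes * $dailyMessages * $retentionDays * $persistenceRatio;

    return ($messageStorage + $logBytes) / (1 - $reserveFraction);
}

// Hypothetical workload: 2 KB messages at 5,000 messages/s, 7-day retention,
// 30% of messages persisted.
$total = estimateDiskBytes(2048, 5000 * 86400, 7, 0.3);
printf("%.0f GiB\n", $total / 1024 ** 3);
```

Retention days and the persistence ratio scale the result linearly, so they are usually the first parameters to revisit when the estimate looks too large.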

Configuration Examples

Prometheus Capacity Alert Rules

yaml
groups:
- name: rabbitmq_capacity
  rules:
  - alert: RabbitMQHighMemoryUsage
    expr: rabbitmq_process_resident_memory_bytes / rabbitmq_resident_memory_limit_bytes > 0.8
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "RabbitMQ memory usage is high"
      description: "Node {{ $labels.instance }} memory usage is {{ $value | humanizePercentage }}"

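  - alert: RabbitMQHighDiskUsage
    # Fires when free disk falls below 1.5x the configured free-space limit.
    # Metric names follow the rabbitmq_prometheus plugin; adjust for other exporters.
    expr: rabbitmq_disk_space_available_bytes / rabbitmq_disk_space_available_limit_bytes < 1.5
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "RabbitMQ disk space is low"
      description: "Node {{ $labels.instance }} free disk is {{ $value | humanize }}x the free-space limit"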

  - alert: RabbitMQQueueDepthHigh
    expr: rabbitmq_queue_messages > 100000
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "Queue backlog"
      description: "Queue {{ $labels.queue }} holds {{ $value }} messages"

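  - alert: RabbitMQConnectionLimit
    # Assumes both series are exported; metric names vary between exporters,
    # so confirm that they exist before enabling this rule.
    expr: rabbitmq_connections_total / rabbitmq_connection_max > 0.8
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Connection count is near the limit"
      description: "Connection usage is {{ $value | humanizePercentage }}"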

Grafana Capacity Planning Dashboard

json
{
  "dashboard": {
    "title": "RabbitMQ Capacity Planning",
    "panels": [
      {
        "title": "Message throughput trend",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(rabbitmq_messages_published_total[5m])",
            "legendFormat": "Publish rate"
          },
          {
            "expr": "rate(rabbitmq_messages_delivered_total[5m])",
            "legendFormat": "Consume rate"
          }
        ]
      },
      {
        "title": "Memory usage trend",
        "type": "graph",
        "targets": [
          {
            "expr": "rabbitmq_process_resident_memory_bytes",
            "legendFormat": "Used"
          },
          {
            "expr": "rabbitmq_resident_memory_limit_bytes",
            "legendFormat": "Limit"
          }
        ]
      },
      {
        "title": "Queue depth distribution",
        "type": "heatmap",
        "targets": [
          {
            "expr": "rabbitmq_queue_messages",
            "legendFormat": "{{ queue }}"
          }
        ]
      },
      {
        "title": "Connection count trend",
        "type": "graph",
        "targets": [
          {
            "expr": "rabbitmq_connections_total",
            "legendFormat": "Connections"
          },
          {
            "expr": "rabbitmq_channels_total",
            "legendFormat": "Channels"
          }
        ]
      }
    ]
  }
}

Kubernetes Resource Configuration

yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
  namespace: messaging
spec:
  serviceName: rabbitmq-headless
  replicas: 3
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
      - name: rabbitmq
        image: rabbitmq:3.12-management
        resources:
          requests:
            cpu: "2"
            memory: "8Gi"
          limits:
            cpu: "4"
            memory: "16Gi"
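        # The official 3.9+ images reject the deprecated
        # RABBITMQ_VM_MEMORY_HIGH_WATERMARK environment variable. Set the
        # watermark in rabbitmq.conf instead, for example:
        #   vm_memory_high_watermark.relative = 0.6
        #   vm_memory_high_watermark_paging_ratio = 0.75
        # and mount that file (e.g. from a ConfigMap) at /etc/rabbitmq/rabbitmq.conf.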
        volumeMounts:
        - name: rabbitmq-data
          mountPath: /var/lib/rabbitmq
  # Each replica needs its own volume: a single ReadWriteOnce claim cannot be
  # shared by three pods, so the StatefulSet creates one claim per pod.
  volumeClaimTemplates:
  - metadata:
      name: rabbitmq-data
    spec:
      accessModes:
      - ReadWriteOnce
      storageClassName: fast-ssd
      resources:
        requests:
          storage: 200Gi
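
The relative watermark only becomes meaningful once it is converted to bytes. The sketch below does that for the 16Gi container limit and the 0.6 / 0.75 values used in this manifest, assuming RabbitMQ computes the watermark from the container's memory limit; `watermarkBytes` is a hypothetical helper.

```php
<?php
// Convert a relative watermark into an absolute byte threshold.
function watermarkBytes(int $memoryLimitBytes, float $relativeWatermark): int
{
    return (int) floor($memoryLimitBytes * $relativeWatermark);
}

$limit     = 16 * 1024 ** 3;                 // container memory limit: 16 GiB
$watermark = watermarkBytes($limit, 0.6);    // memory alarm threshold
$paging    = watermarkBytes($watermark, 0.75); // paging starts at 75% of the watermark

printf("Memory alarm at about %.1f GiB\n", $watermark / 1024 ** 3); // 9.6 GiB
printf("Paging starts at about %.1f GiB\n", $paging / 1024 ** 3);   // 7.2 GiB
```

The gap between the watermark and the container limit is what protects the pod from being OOM-killed before RabbitMQ starts blocking publishers.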

PHP Code Examples

RabbitMQ Capacity Analyzer

php
<?php

namespace App\Services\RabbitMQ;

use PhpAmqpLib\Connection\AMQPStreamConnection;

class RabbitMQCapacityAnalyzer
{
    private $config;
    private $connection;
    private $channel;

    public function __construct(array $config)
    {
        $this->config = $config;
        $this->connect();
    }

    private function connect(): void
    {
        $this->connection = new AMQPStreamConnection(
            $this->config['host'],
            $this->config['port'],
            $this->config['user'],
            $this->config['password'],
            $this->config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
    }

    public function analyze(): array
    {
        return [
            'current_usage' => $this->getCurrentUsage(),
            'resource_limits' => $this->getResourceLimits(),
            'capacity_metrics' => $this->getCapacityMetrics(),
            'recommendations' => $this->getRecommendations(),
            'growth_projection' => $this->projectGrowth(),
        ];
    }

    public function getCurrentUsage(): array
    {
        $overview = $this->apiRequest('GET', '/overview');
        $nodes = $this->apiRequest('GET', '/nodes');
        $queues = $this->apiRequest('GET', '/queues');

        $usage = [
            'messages' => [
                'total' => $overview['queue_totals']['messages'] ?? 0,
                'ready' => $overview['queue_totals']['messages_ready'] ?? 0,
                'unacked' => $overview['queue_totals']['messages_unacknowledged'] ?? 0,
            ],
            'message_rates' => [
                'publish' => $overview['message_stats']['publish_details']['rate'] ?? 0,
                'deliver' => $overview['message_stats']['deliver_get_details']['rate'] ?? 0,
                'ack' => $overview['message_stats']['ack_details']['rate'] ?? 0,
            ],
            'connections' => $overview['object_totals']['connections'] ?? 0,
            'channels' => $overview['object_totals']['channels'] ?? 0,
            'queues' => $overview['object_totals']['queues'] ?? 0,
            'exchanges' => $overview['object_totals']['exchanges'] ?? 0,
            'consumers' => $overview['object_totals']['consumers'] ?? 0,
        ];

        foreach ($nodes as $node) {
            $usage['nodes'][$node['name']] = [
                'memory_used' => $node['mem_used'] ?? 0,
                'memory_limit' => $node['mem_limit'] ?? 0,
                'disk_free' => $node['disk_free'] ?? 0,
                'disk_limit' => $node['disk_free_limit'] ?? 0,
                'cpu_usage' => $this->calculateCpuUsage($node),
                'uptime' => $node['uptime'] ?? 0,
            ];
        }

        return $usage;
    }

    public function getResourceLimits(): array
    {
        $nodes = $this->apiRequest('GET', '/nodes');
        $limits = [];

        foreach ($nodes as $node) {
            $limits[$node['name']] = [
                'memory' => [
                    'limit' => $node['mem_limit'] ?? 0,
                    'watermark' => $node['mem_limit'] ?? 0,
                    'paging_ratio' => $this->getPagingRatio(),
                ],
                'disk' => [
                    'free' => $node['disk_free'] ?? 0,
                    'limit' => $node['disk_free_limit'] ?? 0,
                ],
                'file_descriptors' => [
                    // fd_used/fd_total are file descriptors; proc_used/proc_total count Erlang processes.
                    'used' => $node['fd_used'] ?? 0,
                    'limit' => $node['fd_total'] ?? 0,
                ],
                'sockets' => [
                    'used' => $node['sockets_used'] ?? 0,
                    'limit' => $node['sockets_total'] ?? 0,
                ],
            ];
        }

        return $limits;
    }

    public function getCapacityMetrics(): array
    {
        $usage = $this->getCurrentUsage();
        $limits = $this->getResourceLimits();

        $metrics = [
            'memory_utilization' => [],
            'disk_utilization' => [],
            'connection_utilization' => [],
            'throughput_analysis' => [],
        ];

        foreach ($usage['nodes'] ?? [] as $nodeName => $nodeUsage) {
            $nodeLimits = $limits[$nodeName] ?? [];

            $memoryUsed = $nodeUsage['memory_used'];
            // Fall back to 1 when the limit is missing or 0 to avoid division by zero.
            $memoryLimit = ($nodeLimits['memory']['limit'] ?? 0) ?: 1;
            $metrics['memory_utilization'][$nodeName] = [
                'percentage' => round(($memoryUsed / $memoryLimit) * 100, 2),
                'used_gb' => round($memoryUsed / 1024 / 1024 / 1024, 2),
                'limit_gb' => round($memoryLimit / 1024 / 1024 / 1024, 2),
            ];

            $diskFree = $nodeUsage['disk_free'];
            $diskLimit = $nodeLimits['disk']['limit'] ?? 0;
            $metrics['disk_utilization'][$nodeName] = [
                'free_gb' => round($diskFree / 1024 / 1024 / 1024, 2),
                'limit_gb' => round($diskLimit / 1024 / 1024 / 1024, 2),
                'available_ratio' => $diskLimit > 0 ? round($diskFree / $diskLimit, 2) : 0,
            ];
        }

        $metrics['throughput_analysis'] = [
            'publish_rate' => $usage['message_rates']['publish'],
            'consume_rate' => $usage['message_rates']['deliver'],
            'ack_rate' => $usage['message_rates']['ack'],
            'backlog_rate' => $usage['message_rates']['publish'] - $usage['message_rates']['deliver'],
        ];

        return $metrics;
    }

    public function getRecommendations(): array
    {
        $usage = $this->getCurrentUsage();
        $metrics = $this->getCapacityMetrics();
        $recommendations = [];

        foreach ($metrics['memory_utilization'] as $nodeName => $memory) {
            if ($memory['percentage'] > 80) {
                $recommendations[] = [
                    'type' => 'memory',
                    'severity' => 'high',
                    'node' => $nodeName,
                    'message' => "Memory usage is too high ({$memory['percentage']}%); add memory or trim queues",
                    'action' => 'Raise the memory limit or reduce the message backlog',
                ];
            } elseif ($memory['percentage'] > 60) {
                $recommendations[] = [
                    'type' => 'memory',
                    'severity' => 'medium',
                    'node' => $nodeName,
                    'message' => "Memory usage is elevated ({$memory['percentage']}%); keep monitoring",
                    'action' => 'Track the memory trend and plan for expansion',
                ];
            }
        }

        foreach ($metrics['disk_utilization'] as $nodeName => $disk) {
            if ($disk['available_ratio'] < 1.5) {
                $recommendations[] = [
                    'type' => 'disk',
                    'severity' => 'critical',
                    'node' => $nodeName,
                    'message' => "Disk space is low: {$disk['free_gb']} GB free",
                    'action' => 'Free up disk space or expand storage immediately',
                ];
            }
        }

        $backlogRate = $metrics['throughput_analysis']['backlog_rate'];
        if ($backlogRate > 1000) {
            $recommendations[] = [
                'type' => 'throughput',
                'severity' => 'high',
                'node' => 'cluster',
                'message' => "Publish rate exceeds consume rate; backlog is growing at {$backlogRate}/s",
                'action' => 'Add consumers or optimize consumer logic',
            ];
        }

        $messageCount = $usage['messages']['total'];
        if ($messageCount > 1000000) {
            $recommendations[] = [
                'type' => 'queue',
                'severity' => 'high',
                'node' => 'cluster',
                'message' => "Severe queue backlog: {$messageCount} messages queued",
                'action' => 'Check consumer health and add consuming capacity',
            ];
        }

        return $recommendations;
    }

    public function projectGrowth(int $days = 30): array
    {
        $history = $this->getHistoricalData($days);
        $projection = [];

        if (!empty($history['message_rates'])) {
            $growthRate = $this->calculateGrowthRate($history['message_rates']);
            $currentRate = end($history['message_rates']);

            $projection['message_rate'] = [
                'current' => $currentRate,
                'growth_rate' => $growthRate,
                'projected_30d' => $currentRate * (1 + $growthRate * 30),
                'projected_90d' => $currentRate * (1 + $growthRate * 90),
            ];
        }

        if (!empty($history['connection_counts'])) {
            $growthRate = $this->calculateGrowthRate($history['connection_counts']);
            $currentCount = end($history['connection_counts']);

            $projection['connections'] = [
                'current' => $currentCount,
                'growth_rate' => $growthRate,
                'projected_30d' => $currentCount * (1 + $growthRate * 30),
                'projected_90d' => $currentCount * (1 + $growthRate * 90),
            ];
        }

        if (!empty($history['memory_usage'])) {
            $growthRate = $this->calculateGrowthRate($history['memory_usage']);
            $currentMemory = end($history['memory_usage']);

            $projection['memory'] = [
                'current_gb' => round($currentMemory / 1024 / 1024 / 1024, 2),
                'growth_rate' => $growthRate,
                'projected_30d_gb' => round($currentMemory * (1 + $growthRate * 30) / 1024 / 1024 / 1024, 2),
                'projected_90d_gb' => round($currentMemory * (1 + $growthRate * 90) / 1024 / 1024 / 1024, 2),
            ];
        }

        $projection['capacity_alert'] = $this->calculateCapacityAlert($projection);

        return $projection;
    }

    private function getHistoricalData(int $days): array
    {
        $history = [
            'message_rates' => [],
            'connection_counts' => [],
            'memory_usage' => [],
        ];

        return $history;
    }

    private function calculateGrowthRate(array $data): float
    {
        if (count($data) < 2) {
            return 0;
        }

        $first = reset($data);
        $last = end($data);
        $count = count($data);

        if ($first == 0) {
            return 0;
        }

        return ($last - $first) / $first / $count;
    }

    private function calculateCapacityAlert(array $projection): array
    {
        $alerts = [];

        if (isset($projection['memory']['projected_30d_gb'])) {
            $projectedMemory = $projection['memory']['projected_30d_gb'];
            $currentLimit = $this->config['memory_limit_gb'] ?? 16;

            if ($projectedMemory > $currentLimit * 0.9) {
                $alerts[] = [
                    'type' => 'memory',
                    'timeframe' => '30 days',
                    'message' => "Memory is projected to reach its limit within 30 days",
                    'projected_usage' => $projectedMemory,
                    'current_limit' => $currentLimit,
                ];
            }
        }

        return $alerts;
    }

    public function calculateRequirements(array $params): array
    {
        $messagesPerSecond = $params['messages_per_second'];
        $avgMessageSize = $params['avg_message_size'];
        $peakMultiplier = $params['peak_multiplier'] ?? 3;
        $retentionDays = $params['retention_days'] ?? 7;
        $persistenceRatio = $params['persistence_ratio'] ?? 0.5;
        $connections = $params['connections'] ?? 100;
        $queues = $params['queues'] ?? 50;

        $peakMessagesPerSecond = $messagesPerSecond * $peakMultiplier;

        // Assumes roughly 10,000 messages buffered in memory and 10 channels per connection.
        $messageMemory = $avgMessageSize * 10000;
        $connectionMemory = $connections * 100 * 1024 + $connections * 10 * 50 * 1024;
        $queueMemory = $queues * 1024 * 1024;
        $baseMemory = 512 * 1024 * 1024;

        $totalMemory = $messageMemory + $connectionMemory + $queueMemory + $baseMemory;
        $recommendedMemory = $totalMemory * 2;

        $dailyMessages = $messagesPerSecond * 86400;
        $dailyStorage = $dailyMessages * $avgMessageSize * $persistenceRatio;
        $totalStorage = $dailyStorage * $retentionDays;
        $recommendedStorage = $totalStorage * 1.5;

        $recommendedCpu = max(2, ceil($peakMessagesPerSecond / 10000));

        return [
            'estimated' => [
                'memory_gb' => round($totalMemory / 1024 / 1024 / 1024, 2),
                'storage_gb' => round($totalStorage / 1024 / 1024 / 1024, 2),
                'cpu_cores' => $recommendedCpu,
            ],
            'recommended' => [
                'memory_gb' => round($recommendedMemory / 1024 / 1024 / 1024, 2),
                'storage_gb' => round($recommendedStorage / 1024 / 1024 / 1024, 2),
                'cpu_cores' => $recommendedCpu * 2,
            ],
            'peak_capacity' => [
                'messages_per_second' => $peakMessagesPerSecond,
                'daily_messages' => $dailyMessages,
            ],
            'cluster_recommendation' => $this->recommendClusterSize($peakMessagesPerSecond),
        ];
    }

    private function recommendClusterSize(int $peakMessagesPerSecond): array
    {
        if ($peakMessagesPerSecond < 10000) {
            return [
                'nodes' => 1,
                'type' => 'single',
                'note' => 'A single node is sufficient; configuring high availability is still recommended',
            ];
        } elseif ($peakMessagesPerSecond < 50000) {
            return [
                'nodes' => 3,
                'type' => 'cluster',
                'note' => '3-node cluster with replicated queues (prefer quorum queues; classic mirrored queues are deprecated)',
            ];
        } elseif ($peakMessagesPerSecond < 100000) {
            return [
                'nodes' => 5,
                'type' => 'cluster',
                'note' => '5-node cluster with quorum queues',
            ];
        } else {
            return [
                'nodes' => 7,
                'type' => 'cluster',
                'note' => '7-node cluster; consider a partitioned deployment',
            ];
        }
    }

    public function generateReport(): string
    {
        $analysis = $this->analyze();
        $report = "# RabbitMQ Capacity Planning Report\n\n";
        $report .= "Generated at: " . date('Y-m-d H:i:s') . "\n\n";

        $report .= "## Current Usage\n\n";
        $report .= "### Message Counts\n";
        $report .= "- Total messages: {$analysis['current_usage']['messages']['total']}\n";
        $report .= "- Ready messages: {$analysis['current_usage']['messages']['ready']}\n";
        $report .= "- Unacknowledged messages: {$analysis['current_usage']['messages']['unacked']}\n\n";

        $report .= "### Throughput\n";
        $report .= "- Publish rate: " . round($analysis['current_usage']['message_rates']['publish'], 2) . " messages/s\n";
        $report .= "- Consume rate: " . round($analysis['current_usage']['message_rates']['deliver'], 2) . " messages/s\n\n";

        $report .= "### Resource Usage\n";
        $report .= "- Connections: {$analysis['current_usage']['connections']}\n";
        $report .= "- Channels: {$analysis['current_usage']['channels']}\n";
        $report .= "- Queues: {$analysis['current_usage']['queues']}\n\n";

        $report .= "## Capacity Metrics\n\n";
        foreach ($analysis['capacity_metrics']['memory_utilization'] as $node => $memory) {
            $report .= "### Node {$node}\n";
            $report .= "- Memory utilization: {$memory['percentage']}%\n";
            $report .= "- Used: {$memory['used_gb']} GB\n";
            $report .= "- Limit: {$memory['limit_gb']} GB\n\n";
        }

        if (!empty($analysis['recommendations'])) {
            $report .= "## Recommendations\n\n";
            foreach ($analysis['recommendations'] as $rec) {
                $report .= "- [{$rec['severity']}] {$rec['message']}\n";
                $report .= "  - Action: {$rec['action']}\n\n";
            }
        }

        return $report;
    }

    private function calculateCpuUsage(array $node): float
    {
        return 0.0;
    }

    private function getPagingRatio(): float
    {
        return 0.75;
    }

    private function apiRequest(string $method, string $endpoint, ?array $data = null): array
    {
        $url = "http://{$this->config['host']}:15672/api{$endpoint}";
        
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_USERPWD, $this->config['user'] . ':' . $this->config['password']);
        curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
        
        if ($data !== null) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
            curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
        }
        
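        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        $error = curl_error($ch);
        curl_close($ch);

        // curl_exec() returns false on transport errors (DNS failure, refused connection, timeout).
        if ($response === false) {
            throw new \RuntimeException("API request failed: {$endpoint}, {$error}");
        }

        if ($httpCode >= 400) {
            throw new \RuntimeException("API request failed: {$endpoint}, HTTP {$httpCode}");
        }

        return json_decode($response, true) ?: [];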
    }

    public function close(): void
    {
        if ($this->channel) {
            $this->channel->close();
        }
        if ($this->connection) {
            $this->connection->close();
        }
    }

    public function __destruct()
    {
        $this->close();
    }
}
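
Note that `getHistoricalData()` is a stub that returns empty arrays, so `projectGrowth()` yields no projections until it is fed real samples. The standalone sketch below reproduces the same linear arithmetic on made-up data to show what the projection computes; `growthRate`, `project`, and the sample values are hypothetical.

```php
<?php
// Average relative growth per sample, as in calculateGrowthRate():
// (last - first) / first / number of samples.
function growthRate(array $samples): float
{
    $n = count($samples);
    if ($n < 2 || reset($samples) == 0) {
        return 0.0;
    }

    return (end($samples) - reset($samples)) / reset($samples) / $n;
}

// Linear extrapolation from the latest sample over $periods future samples.
function project(array $samples, int $periods): float
{
    return end($samples) * (1 + growthRate($samples) * $periods);
}

// Hypothetical publish rates (messages/s), one sample per day.
$rates = [1000, 1100, 1150, 1200];

printf("growth per day: %.1f%%\n", growthRate($rates) * 100);
printf("projected in 30 days: %.0f messages/s\n", project($rates, 30));
```

Linear extrapolation is a simplification: it overshoots for workloads that plateau and undershoots for compounding growth, so it is best treated as a first-order warning signal.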

Capacity Planning Command

php
<?php

namespace App\Console\Commands;

use App\Services\RabbitMQ\RabbitMQCapacityAnalyzer;
use Illuminate\Console\Command;

class RabbitMQCapacity extends Command
{
    protected $signature = 'rabbitmq:capacity 
                            {action : analyze|report|calculate}
                            {--messages= : Messages per second}
                            {--size= : Average message size in bytes}
                            {--connections= : Number of connections}
                            {--days=30 : Number of days to project}';

    protected $description = 'RabbitMQ capacity planning command';

    private $analyzer;

    public function handle()
    {
        $config = [
            'host' => config('rabbitmq.host', 'localhost'),
            'port' => config('rabbitmq.port', 5672),
            'user' => config('rabbitmq.user', 'guest'),
            'password' => config('rabbitmq.password', 'guest'),
            'memory_limit_gb' => config('rabbitmq.memory_limit_gb', 16),
        ];

        $this->analyzer = new RabbitMQCapacityAnalyzer($config);

        $action = $this->argument('action');

        switch ($action) {
            case 'analyze':
                $this->analyze();
                break;
            case 'report':
                $this->report();
                break;
            case 'calculate':
                $this->calculate();
                break;
            default:
                $this->error("Unknown action: {$action}");
        }
    }

    private function analyze(): void
    {
        $this->info('Analyzing current capacity usage...');

        $analysis = $this->analyzer->analyze();

        $this->info("\n=== Current Usage ===");
        $this->table(
            ['Metric', 'Value'],
            [
                ['Total messages', $analysis['current_usage']['messages']['total']],
                ['Ready messages', $analysis['current_usage']['messages']['ready']],
                ['Unacknowledged messages', $analysis['current_usage']['messages']['unacked']],
                ['Connections', $analysis['current_usage']['connections']],
                ['Channels', $analysis['current_usage']['channels']],
                ['Queues', $analysis['current_usage']['queues']],
            ]
        );

        $this->info("\n=== Throughput Analysis ===");
        $this->table(
            ['Metric', 'Rate (messages/s)'],
            [
                ['Publish rate', round($analysis['current_usage']['message_rates']['publish'], 2)],
                ['Consume rate', round($analysis['current_usage']['message_rates']['deliver'], 2)],
                ['Ack rate', round($analysis['current_usage']['message_rates']['ack'], 2)],
                ['Backlog rate', round($analysis['capacity_metrics']['throughput_analysis']['backlog_rate'], 2)],
            ]
        );

        if (!empty($analysis['recommendations'])) {
            $this->warn("\n=== Recommendations ===");
            foreach ($analysis['recommendations'] as $rec) {
                $severity = strtoupper($rec['severity']);
                $this->warn("[{$severity}] {$rec['message']}");
                $this->line("  → {$rec['action']}");
            }
        }
    }

    private function report(): void
    {
        $report = $this->analyzer->generateReport();
        
        $file = storage_path('rabbitmq_capacity_report_' . date('YmdHis') . '.md');
        file_put_contents($file, $report);

        $this->info("Report written to: {$file}");
        $this->info("\n" . $report);
    }

    private function calculate(): void
    {
        $messagesPerSecond = (int) $this->option('messages');
        $avgMessageSize = (int) $this->option('size');
        $connections = (int) $this->option('connections') ?: 100;

        if (!$messagesPerSecond || !$avgMessageSize) {
            $this->error('Please provide both --messages and --size');
            return;
        }

        $this->info('Calculating capacity requirements...');

        $requirements = $this->analyzer->calculateRequirements([
            'messages_per_second' => $messagesPerSecond,
            'avg_message_size' => $avgMessageSize,
            'peak_multiplier' => 3,
            'retention_days' => 7,
            'persistence_ratio' => 0.5,
            'connections' => $connections,
            'queues' => 50,
        ]);

        $this->info("\n=== Estimated Capacity Requirements ===");
        $this->table(
            ['Resource', 'Estimated', 'Recommended'],
            [
                ['Memory', $requirements['estimated']['memory_gb'] . ' GB', $requirements['recommended']['memory_gb'] . ' GB'],
                ['Storage', $requirements['estimated']['storage_gb'] . ' GB', $requirements['recommended']['storage_gb'] . ' GB'],
                ['CPU', $requirements['estimated']['cpu_cores'] . ' cores', $requirements['recommended']['cpu_cores'] . ' cores'],
            ]
        );

        $this->info("\n=== Cluster Recommendation ===");
        $cluster = $requirements['cluster_recommendation'];
        $this->info("Nodes: {$cluster['nodes']}");
        $this->info("Type: {$cluster['type']}");
        $this->info("Note: {$cluster['note']}");

        $this->info("\n=== Peak Capacity ===");
        $this->info("Peak message rate: {$requirements['peak_capacity']['messages_per_second']} messages/s");
        $this->info("Average daily messages: {$requirements['peak_capacity']['daily_messages']}");
    }
}

Practical Scenarios

Scenario 1: Capacity Planning for a New System

php
<?php

$analyzer = new RabbitMQCapacityAnalyzer($config);

$requirements = $analyzer->calculateRequirements([
    'messages_per_second' => 5000,
    'avg_message_size' => 2048,
    'peak_multiplier' => 3,
    'retention_days' => 7,
    'persistence_ratio' => 0.3,
    'connections' => 200,
    'queues' => 100,
]);

echo "Memory required: {$requirements['recommended']['memory_gb']} GB\n";
echo "Storage required: {$requirements['recommended']['storage_gb']} GB\n";
echo "CPU required: {$requirements['recommended']['cpu_cores']} cores\n";
echo "Recommended nodes: {$requirements['cluster_recommendation']['nodes']}\n";

Scenario 2: Capacity Alerting

php
<?php

$analyzer = new RabbitMQCapacityAnalyzer($config);

$metrics = $analyzer->getCapacityMetrics();

foreach ($metrics['memory_utilization'] as $node => $memory) {
    if ($memory['percentage'] > 80) {
        $alert = [
            'level' => 'critical',
            'node' => $node,
            'message' => "Memory usage too high: {$memory['percentage']}%",
            'timestamp' => time(),
        ];
        
        NotificationService::send($alert);
    }
}

$backlogRate = $metrics['throughput_analysis']['backlog_rate'];
if ($backlogRate > 1000) {
    $alert = [
        'level' => 'warning',
        'message' => "Backlog rate too high: {$backlogRate} messages/s",
        'timestamp' => time(),
    ];
    
    NotificationService::send($alert);
}
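
The backlog rate says how fast messages accumulate; dividing the remaining headroom by that rate gives a rough time-to-threshold, which is often more actionable in an alert than the rate alone. A minimal sketch, assuming the backlog rate stays constant; `secondsUntilDepth` is a hypothetical helper and not part of the analyzer class.

```php
<?php
// Seconds until a queue grows from $currentDepth to $threshold at a constant
// net backlog rate (publish rate minus deliver rate).
// Returns null when the backlog is not growing.
function secondsUntilDepth(int $currentDepth, int $threshold, float $backlogRate): ?float
{
    if ($currentDepth >= $threshold) {
        return 0.0; // already at or past the threshold
    }
    if ($backlogRate <= 0) {
        return null; // consumers are keeping up; the threshold is never reached
    }

    return ($threshold - $currentDepth) / $backlogRate;
}

// Hypothetical figures: 20,000 queued messages, a 100,000-message threshold,
// and a backlog growing by 1,000 messages/s.
$eta = secondsUntilDepth(20000, 100000, 1000.0);
echo $eta === null ? "Backlog is not growing\n" : "Threshold reached in {$eta} s\n";
```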

Scenario 3: Autoscaling Decisions

php
<?php

class AutoScalingService
{
    private $analyzer;
    private $thresholds;

    public function __construct(RabbitMQCapacityAnalyzer $analyzer)
    {
        $this->analyzer = $analyzer;
        $this->thresholds = [
            'memory_scale_up' => 0.8,
            'memory_scale_down' => 0.3,
            'queue_depth_scale_up' => 100000,
            'throughput_scale_up' => 0.9,
        ];
    }

    public function evaluate(): array
    {
        $analysis = $this->analyzer->analyze();
        $decision = ['action' => 'none', 'reason' => ''];

        $memoryUsage = $this->getAverageMemoryUsage($analysis);
        $queueDepth = $analysis['current_usage']['messages']['total'];
        $throughput = $analysis['capacity_metrics']['throughput_analysis'];

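        // Utilization is reported as a percentage (0-100); thresholds are fractions (0-1).
        if ($memoryUsage > $this->thresholds['memory_scale_up'] * 100) {
            $decision = [
                'action' => 'scale_up',
                'resource' => 'memory',
                'reason' => "Memory usage {$memoryUsage}% exceeds the threshold",
                'recommendation' => 'Add memory to the nodes or add a new node',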
            ];
        } elseif ($queueDepth > $this->thresholds['queue_depth_scale_up']) {
            $decision = [
                'action' => 'scale_up',
                'resource' => 'consumers',
                'reason' => "Queue depth {$queueDepth} exceeds the threshold",
                'recommendation' => 'Add consumer instances',
            ];
        } elseif ($throughput['backlog_rate'] > 1000) {
            $decision = [
                'action' => 'scale_up',
                'resource' => 'throughput',
                'reason' => "Consume rate is insufficient; backlog is growing at {$throughput['backlog_rate']}/s",
                'recommendation' => 'Optimize consumer logic or add consumers',
            ];
        }

        return $decision;
    }

    private function getAverageMemoryUsage(array $analysis): float
    {
        $usages = [];
        foreach ($analysis['capacity_metrics']['memory_utilization'] as $node => $memory) {
            $usages[] = $memory['percentage'];
        }
        return count($usages) > 0 ? array_sum($usages) / count($usages) : 0;
    }
}

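Common Problems and Solutions

Problem 1: Inaccurate Capacity Plan

Symptom: actual usage differs significantly from the plan

Cause: peaks, changes in message size, and business growth were not accounted for

Solution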

php
// Project growth from historical data
$projection = $analyzer->projectGrowth(90);

// Evaluate several scenarios (remaining parameters elided)
$scenarios = [
    'normal' => $analyzer->calculateRequirements(['messages_per_second' => 5000, /* ... */]),
    'peak' => $analyzer->calculateRequirements(['messages_per_second' => 15000, /* ... */]),
    'growth' => $analyzer->calculateRequirements(['messages_per_second' => 8000, /* ... */]),
];

// Use the largest requirement as the planning baseline
$maxMemory = max($scenarios['normal']['recommended']['memory_gb'],
                 $scenarios['peak']['recommended']['memory_gb'],
                 $scenarios['growth']['recommended']['memory_gb']);

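Problem 2: Traffic Bursts Exhaust Resources

Symptom: the system slows down or crashes during traffic bursts

Cause: not enough peak capacity was reserved

Solution

yaml
# Configure elastic resource limits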
resources:
  requests:
    cpu: "2"
    memory: "8Gi"
  limits:
    cpu: "8"
    memory: "32Gi"

# Configure autoscaling
autoscaling:
  enabled: true
  minReplicas: 3
  maxReplicas: 10
  targetCPUUtilizationPercentage: 70
  targetMemoryUtilizationPercentage: 75

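Problem 3: Insufficient Disk Space

Symptom: disk alarms fire and the service blocks publishers

Cause: too many persistent messages that are not cleaned up in time

Solution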

php
// Configure message TTL
$ttlConfig = [
    'x-message-ttl' => 86400000, // 1 day
    'x-expires' => 172800000,    // delete the queue after 2 days without use
];

// Periodic cleanup policy
$cleanupPolicy = [
    'name' => 'cleanup-policy',
    'pattern' => '^temp\.',
    'definition' => [
        'message-ttl' => 3600000, // 1 hour
        'max-length' => 100000,
        'overflow' => 'drop-head',
    ],
];
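
One way to apply a policy such as `$cleanupPolicy` is the management plugin's HTTP API, which accepts `PUT /api/policies/{vhost}/{name}`. The sketch below only builds the request so it can be inspected before sending; `buildPolicyRequest` and the base URL are illustrative assumptions, and the `apply-to` and `priority` values are chosen here rather than taken from the policy array.

```php
<?php
// Build the HTTP request for creating or updating a policy.
// The vhost and policy name are URL-encoded, so the default vhost "/" becomes %2F.
function buildPolicyRequest(string $baseUrl, string $vhost, array $policy): array
{
    $url = sprintf(
        '%s/api/policies/%s/%s',
        rtrim($baseUrl, '/'),
        rawurlencode($vhost),
        rawurlencode($policy['name'])
    );

    $body = json_encode([
        'pattern'    => $policy['pattern'],
        'definition' => $policy['definition'],
        'apply-to'   => 'queues', // assumption: the policy targets queues only
        'priority'   => 0,        // assumption: default priority
    ]);

    return ['method' => 'PUT', 'url' => $url, 'body' => $body];
}

$cleanupPolicy = [
    'name' => 'cleanup-policy',
    'pattern' => '^temp\.',
    'definition' => [
        'message-ttl' => 3600000,
        'max-length' => 100000,
        'overflow' => 'drop-head',
    ],
];

$request = buildPolicyRequest('http://localhost:15672', '/', $cleanupPolicy);
echo $request['method'], ' ', $request['url'], "\n";
```

The resulting method, URL, and JSON body can be sent with the same cURL options the analyzer's `apiRequest()` uses, with a `Content-Type: application/json` header.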

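Best Practices

Planning Phase

  1. Collect historical data

    • Analyze the past 3-6 months of data
    • Identify peaks and troughs
    • Calculate growth rates
  2. Define business scenarios

    • Normal load
    • Peak load
    • Business growth
  3. Reserve headroom

    • Memory: 30-50% headroom
    • Disk: 50% headroom
    • Network bandwidth: 40% headroom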
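
The headroom percentages above can be applied mechanically to raw estimates. This sketch assumes the headroom is added on top of the estimate (estimate × (1 + headroom)); `withHeadroom` and the sample estimates are hypothetical.

```php
<?php
// Add a fractional headroom on top of a raw estimate.
function withHeadroom(float $estimate, float $headroomFraction): float
{
    return $estimate * (1 + $headroomFraction);
}

// Hypothetical raw estimates, with the headroom fractions from the list above.
$plan = [
    'memory_gb'      => withHeadroom(8.0, 0.5),   // upper end of the 30-50% range
    'disk_gb'        => withHeadroom(200.0, 0.5),
    'bandwidth_mbps' => withHeadroom(500.0, 0.4),
];

foreach ($plan as $resource => $value) {
    printf("%s: %.0f\n", $resource, $value);
}
```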

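Implementation Phase

  1. Deploy in stages

    • Start with the minimum viable configuration
    • Scale gradually toward the target configuration
    • Keep monitoring and adjusting
  2. Configure elastic scaling

    • Set sensible scale-up and scale-down thresholds
    • Configure cooldown periods
    • Test autoscaling
  3. Build a monitoring system

    • Monitor key metrics in real time
    • Set capacity alerts
    • Generate reports regularly

Maintenance Phase

  1. Review regularly

    • Review capacity usage monthly
    • Update the plan quarterly
    • Audit capacity annually
  2. Analyze trends

    • Analyze usage trends
    • Forecast future demand
    • Plan expansion ahead of time
  3. Optimize cost

    • Identify idle resources
    • Optimize resource allocation
    • Balance performance and cost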
