Skip to content

RabbitMQ 内存使用分析

概述

理解 RabbitMQ 的内存使用模式对于系统调优和故障排查至关重要。本文将深入分析 RabbitMQ 的内存组成、监控方法和优化策略。

核心知识点

内存组成结构

┌─────────────────────────────────────────────────────────────┐
│                   RabbitMQ 内存组成                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                    Total Memory                      │   │
│  │  ┌─────────────────────────────────────────────┐    │   │
│  │  │              Processes Memory                │    │   │
│  │  │  (Erlang 进程堆、栈、邮箱)                    │    │   │
│  │  └─────────────────────────────────────────────┘    │   │
│  │  ┌─────────────────────────────────────────────┐    │   │
│  │  │               Binary Memory                  │    │   │
│  │  │  (消息体、大块数据)                           │    │   │
│  │  └─────────────────────────────────────────────┘    │   │
│  │  ┌─────────────────────────────────────────────┐    │   │
│  │  │                ETS Memory                    │    │   │
│  │  │  (队列索引、路由表、Mnesia)                   │    │   │
│  │  └─────────────────────────────────────────────┘    │   │
│  │  ┌─────────────────────────────────────────────┐    │   │
│  │  │                 Code Memory                  │    │   │
│  │  │  (Erlang 代码段)                             │    │   │
│  │  └─────────────────────────────────────────────┘    │   │
│  │  ┌─────────────────────────────────────────────┐    │   │
│  │  │                 Atom Memory                  │    │   │
│  │  │  (原子表)                                    │    │   │
│  │  └─────────────────────────────────────────────┘    │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

内存类型详解

内存类型说明占比优化重点
processesErlang 进程内存20-40%进程数量、堆大小
binary二进制数据30-60%消息大小、数量
etsETS 表内存10-30%队列数量、索引
code代码段5-10%通常固定
atom原子表<1%通常固定

内存分配器

┌─────────────────────────────────────────────────────────────┐
│                    内存分配器架构                            │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   应用请求 ──▶ ┌─────────────────────────────────────────┐  │
│                │           Allocator                      │  │
│                │  ┌─────────────────────────────────┐    │  │
│                │  │         Carrier                 │    │  │
│                │  │  ┌─────┐ ┌─────┐ ┌─────┐      │    │  │
│                │  │  │Block│ │Block│ │Block│ ...  │    │  │
│                │  │  └─────┘ └─────┘ └─────┘      │    │  │
│                │  └─────────────────────────────────┘    │  │
│                │  ┌─────────────────────────────────┐    │  │
│                │  │         Carrier                 │    │  │
│                │  └─────────────────────────────────┘    │  │
│                └─────────────────────────────────────────┘  │
│                                                             │
│   Carrier: 从操作系统分配的大块内存                          │
│   Block: 分配给应用的实际内存块                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

队列内存模型

┌─────────────────────────────────────────────────────────────┐
│                    队列内存占用                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  普通队列:                                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  消息索引 (ETS)  +  消息体 (Binary/磁盘)             │   │
│  │  ~100 bytes/消息  +  消息大小                        │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  懒队列:                                                    │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  消息索引 (ETS)  +  消息体 (磁盘)                    │   │
│  │  ~100 bytes/消息  +  极少内存                        │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  内存估算:                                                  │
│  - 每条消息索引:~100-200 bytes                             │
│  - 每个队列基础:~10-50 KB                                  │
│  - 每个连接:~100 KB                                        │
│  - 每个通道:~50 KB                                         │
│                                                             │
└─────────────────────────────────────────────────────────────┘

配置示例

内存监控配置

ini
# /etc/rabbitmq/rabbitmq.conf

# 内存水位线(相对值)
vm_memory_high_watermark.relative = 0.6

# 内存水位线(绝对值)
# vm_memory_high_watermark.absolute = 4GB

# 分页阈值比例
vm_memory_high_watermark_paging_ratio = 0.75

# 内存计算策略
# total_memory: 使用系统总内存
# legacy: 使用 Erlang VM 报告的内存
memory_calculation_strategy = total_memory

内存分配器配置

bash
# /etc/rabbitmq/rabbitmq-env.conf

# Binary 分配器优化
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+MBas ageffcbf +MMBcs 512"

# Heap 分配器优化
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS +MHas ageffcbf"

# ETS 分配器优化
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="$RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS +MSs ageffcbf +MMscs 512"

PHP 代码示例

内存分析工具类

php
<?php

namespace App\RabbitMQ\Memory;

use PhpAmqpLib\Connection\AMQPStreamConnection;

class MemoryAnalyzer
{
    private AMQPStreamConnection $connection;
    private string $apiHost;
    private int $apiPort;
    private string $apiUser;
    private string $apiPass;
    
    public function __construct(
        AMQPStreamConnection $connection,
        string $apiHost = 'localhost',
        int $apiPort = 15672,
        string $apiUser = 'guest',
        string $apiPass = 'guest'
    ) {
        $this->connection = $connection;
        $this->apiHost = $apiHost;
        $this->apiPort = $apiPort;
        $this->apiUser = $apiUser;
        $this->apiPass = $apiPass;
    }
    
    public function getMemoryBreakdown(): array
    {
        $nodeInfo = $this->apiRequest('/api/nodes');
        
        if (empty($nodeInfo)) {
            return ['error' => 'Unable to fetch node information'];
        }
        
        $node = $nodeInfo[0];
        
        return [
            'total' => $node['mem_used'] ?? 0,
            'total_human' => $this->formatBytes($node['mem_used'] ?? 0),
            'limit' => $node['mem_limit'] ?? 0,
            'limit_human' => $this->formatBytes($node['mem_limit'] ?? 0),
            'usage_percent' => $this->calculateUsagePercent(
                $node['mem_used'] ?? 0,
                $node['mem_limit'] ?? 1
            ),
            'breakdown' => $this->parseMemoryBreakdown($node['memory'] ?? []),
            'alarm' => $node['mem_alarm'] ?? false,
        ];
    }

    public function getQueueMemoryUsage(): array
    {
        $queues = $this->apiRequest('/api/queues?columns=name,memory,messages');
        
        $result = [];
        $totalMemory = 0;
        
        foreach ($queues as $queue) {
            $result[$queue['name']] = [
                'memory' => $queue['memory'] ?? 0,
                'memory_human' => $this->formatBytes($queue['memory'] ?? 0),
                'messages' => $queue['messages'] ?? 0,
                'bytes_per_message' => $this->calculateBytesPerMessage(
                    $queue['memory'] ?? 0,
                    $queue['messages'] ?? 0
                ),
            ];
            $totalMemory += $queue['memory'] ?? 0;
        }
        
        uasort($result, function ($a, $b) {
            return $b['memory'] <=> $a['memory'];
        });
        
        return [
            'queues' => $result,
            'total_memory' => $totalMemory,
            'total_memory_human' => $this->formatBytes($totalMemory),
            'queue_count' => count($result),
        ];
    }

    public function getConnectionMemoryUsage(): array
    {
        $connections = $this->apiRequest('/api/connections?columns=client_properties,memory,channels');
        
        $result = [];
        $totalMemory = 0;
        
        foreach ($connections as $conn) {
            $clientName = $conn['client_properties']['connection_name'] ?? 'unknown';
            $result[$clientName] = [
                'memory' => $conn['memory'] ?? 0,
                'memory_human' => $this->formatBytes($conn['memory'] ?? 0),
                'channels' => $conn['channels'] ?? 0,
            ];
            $totalMemory += $conn['memory'] ?? 0;
        }
        
        return [
            'connections' => $result,
            'total_memory' => $totalMemory,
            'total_memory_human' => $this->formatBytes($totalMemory),
            'connection_count' => count($result),
        ];
    }

    public function getErlangMemoryInfo(): array
    {
        $memoryTypes = [
            'total' => 'erlang:memory(total).',
            'processes' => 'erlang:memory(processes).',
            'processes_used' => 'erlang:memory(processes_used).',
            'binary' => 'erlang:memory(binary).',
            'ets' => 'erlang:memory(ets).',
            'atom' => 'erlang:memory(atom).',
            'atom_used' => 'erlang:memory(atom_used).',
            'code' => 'erlang:memory(code).',
        ];
        
        $result = [];
        
        foreach ($memoryTypes as $type => $command) {
            $output = shell_exec("rabbitmqctl eval '{$command}' 2>/dev/null");
            $result[$type] = $this->parseErlangValue($output);
        }
        
        return [
            'memory' => $result,
            'percentages' => $this->calculateMemoryPercentages($result),
        ];
    }

    public function analyzeMemoryTrend(int $samples = 10, int $intervalMs = 1000): array
    {
        $trend = [];
        
        for ($i = 0; $i < $samples; $i++) {
            $breakdown = $this->getMemoryBreakdown();
            
            $trend[] = [
                'timestamp' => microtime(true),
                'total' => $breakdown['total'],
                'binary' => $breakdown['breakdown']['binary'] ?? 0,
                'processes' => $breakdown['breakdown']['processes'] ?? 0,
                'ets' => $breakdown['breakdown']['ets'] ?? 0,
            ];
            
            if ($i < $samples - 1) {
                usleep($intervalMs * 1000);
            }
        }
        
        return [
            'samples' => $trend,
            'analysis' => $this->analyzeTrendData($trend),
        ];
    }

    public function getMemoryRecommendations(): array
    {
        $breakdown = $this->getMemoryBreakdown();
        $queueMemory = $this->getQueueMemoryUsage();
        $recommendations = [];
        
        if ($breakdown['usage_percent'] > 80) {
            $recommendations[] = [
                'priority' => 'critical',
                'category' => 'memory',
                'issue' => '内存使用率过高',
                'current' => $breakdown['usage_percent'] . '%',
                'recommendation' => '增加内存或减少消息积压',
            ];
        }
        
        $binaryPercent = ($breakdown['breakdown']['binary'] ?? 0) / ($breakdown['total'] ?: 1) * 100;
        if ($binaryPercent > 50) {
            $recommendations[] = [
                'priority' => 'high',
                'category' => 'binary',
                'issue' => '二进制内存占比过高',
                'current' => round($binaryPercent, 2) . '%',
                'recommendation' => '考虑使用懒队列或减少消息大小',
            ];
        }
        
        $etsPercent = ($breakdown['breakdown']['ets'] ?? 0) / ($breakdown['total'] ?: 1) * 100;
        if ($etsPercent > 30) {
            $recommendations[] = [
                'priority' => 'medium',
                'category' => 'ets',
                'issue' => 'ETS 内存占比过高',
                'current' => round($etsPercent, 2) . '%',
                'recommendation' => '减少队列数量或优化队列索引',
            ];
        }
        
        return $recommendations;
    }

    private function apiRequest(string $endpoint): array
    {
        $url = "http://{$this->apiHost}:{$this->apiPort}{$endpoint}";
        
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_USERPWD, "{$this->apiUser}:{$this->apiPass}");
        curl_setopt($ch, CURLOPT_TIMEOUT, 10);
        
        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);
        
        if ($httpCode !== 200) {
            return [];
        }
        
        return json_decode($response, true) ?: [];
    }

    private function parseMemoryBreakdown(array $memory): array
    {
        $breakdown = [];
        
        $types = [
            'binary' => 'binary',
            'code' => 'code',
            'ets' => 'ets',
            'msg_index' => 'msg_index',
            'other_proc' => 'other_proc',
            'other_system' => 'other_system',
            'plugins' => 'plugins',
            'queue_index' => 'queue_index',
            'queue_slave_procs' => 'queue_slave_procs',
            'total' => 'total',
        ];
        
        foreach ($types as $key => $type) {
            if (isset($memory[$type])) {
                $breakdown[$key] = $memory[$type];
            }
        }
        
        return $breakdown;
    }

    private function parseErlangValue(?string $output): int
    {
        if ($output && preg_match('/(\d+)/', $output, $matches)) {
            return (int) $matches[1];
        }
        return 0;
    }

    private function calculateMemoryPercentages(array $memory): array
    {
        $total = $memory['total'] ?: 1;
        
        $percentages = [];
        foreach ($memory as $type => $value) {
            if ($type !== 'total') {
                $percentages[$type] = round($value / $total * 100, 2);
            }
        }
        
        return $percentages;
    }

    private function calculateUsagePercent(int $used, int $limit): float
    {
        if ($limit === 0) {
            return 0;
        }
        return round($used / $limit * 100, 2);
    }

    private function calculateBytesPerMessage(int $memory, int $messages): float
    {
        if ($messages === 0) {
            return 0;
        }
        return round($memory / $messages, 2);
    }

    private function analyzeTrendData(array $trend): array
    {
        if (count($trend) < 2) {
            return [];
        }
        
        $totals = array_column($trend, 'total');
        $first = $totals[0];
        $last = $totals[count($totals) - 1];
        
        $growth = $last - $first;
        $growthRate = $growth / count($trend);
        
        return [
            'start_memory' => $first,
            'end_memory' => $last,
            'growth' => $growth,
            'growth_human' => $this->formatBytes(abs($growth)),
            'growth_direction' => $growth > 0 ? 'increasing' : ($growth < 0 ? 'decreasing' : 'stable'),
            'growth_rate_per_sample' => $this->formatBytes(abs($growthRate)),
        ];
    }

    private function formatBytes(int $bytes): string
    {
        $units = ['B', 'KB', 'MB', 'GB', 'TB'];
        $i = 0;
        
        while ($bytes >= 1024 && $i < count($units) - 1) {
            $bytes /= 1024;
            $i++;
        }
        
        return round($bytes, 2) . ' ' . $units[$i];
    }
}

内存监控服务

php
<?php

namespace App\RabbitMQ\Memory;

class MemoryMonitorService
{
    private MemoryAnalyzer $analyzer;
    private array $thresholds;
    
    public function __construct(MemoryAnalyzer $analyzer, array $thresholds = [])
    {
        $this->analyzer = $analyzer;
        $this->thresholds = array_merge([
            'warning' => 70,
            'critical' => 85,
            'emergency' => 95,
        ], $thresholds);
    }
    
    public function check(): array
    {
        $breakdown = $this->analyzer->getMemoryBreakdown();
        $usage = $breakdown['usage_percent'];
        
        $status = 'normal';
        $alerts = [];
        
        if ($usage >= $this->thresholds['emergency']) {
            $status = 'emergency';
            $alerts[] = $this->createAlert('emergency', $usage);
        } elseif ($usage >= $this->thresholds['critical']) {
            $status = 'critical';
            $alerts[] = $this->createAlert('critical', $usage);
        } elseif ($usage >= $this->thresholds['warning']) {
            $status = 'warning';
            $alerts[] = $this->createAlert('warning', $usage);
        }
        
        return [
            'status' => $status,
            'usage_percent' => $usage,
            'breakdown' => $breakdown,
            'alerts' => $alerts,
            'timestamp' => date('Y-m-d H:i:s'),
        ];
    }

    public function getDetailedReport(): array
    {
        return [
            'memory' => $this->analyzer->getMemoryBreakdown(),
            'queues' => $this->analyzer->getQueueMemoryUsage(),
            'connections' => $this->analyzer->getConnectionMemoryUsage(),
            'erlang' => $this->analyzer->getErlangMemoryInfo(),
            'recommendations' => $this->analyzer->getMemoryRecommendations(),
        ];
    }

    public function startContinuousMonitoring(int $intervalSeconds = 60): void
    {
        while (true) {
            $check = $this->check();
            
            if ($check['status'] !== 'normal') {
                $this->handleAlert($check);
            }
            
            $this->logMemoryStatus($check);
            
            sleep($intervalSeconds);
        }
    }

    private function createAlert(string $level, float $usage): array
    {
        return [
            'level' => $level,
            'message' => "内存使用率达到 {$usage}%",
            'timestamp' => date('Y-m-d H:i:s'),
            'threshold' => $this->thresholds[$level],
        ];
    }

    private function handleAlert(array $check): void
    {
        // 发送告警通知
        error_log("RabbitMQ Memory Alert: " . json_encode($check));
    }

    private function logMemoryStatus(array $check): void
    {
        // 记录内存状态
        $logEntry = sprintf(
            "[%s] Memory: %.2f%% - Status: %s\n",
            $check['timestamp'],
            $check['usage_percent'],
            $check['status']
        );
        file_put_contents('/var/log/rabbitmq_memory.log', $logEntry, FILE_APPEND);
    }
}

内存使用报告生成器

php
<?php

namespace App\RabbitMQ\Memory;

class MemoryReportGenerator
{
    private MemoryAnalyzer $analyzer;
    
    public function __construct(MemoryAnalyzer $analyzer)
    {
        $this->analyzer = $analyzer;
    }
    
    public function generateReport(string $format = 'text'): string
    {
        $data = $this->gatherData();
        
        switch ($format) {
            case 'json':
                return json_encode($data, JSON_PRETTY_PRINT);
            case 'html':
                return $this->generateHtmlReport($data);
            case 'text':
            default:
                return $this->generateTextReport($data);
        }
    }

    private function gatherData(): array
    {
        return [
            'generated_at' => date('Y-m-d H:i:s'),
            'memory_breakdown' => $this->analyzer->getMemoryBreakdown(),
            'queue_memory' => $this->analyzer->getQueueMemoryUsage(),
            'connection_memory' => $this->analyzer->getConnectionMemoryUsage(),
            'erlang_memory' => $this->analyzer->getErlangMemoryInfo(),
            'recommendations' => $this->analyzer->getMemoryRecommendations(),
        ];
    }

    private function generateTextReport(array $data): string
    {
        $report = [];
        $report[] = "=" * 60;
        $report[] = "RabbitMQ Memory Analysis Report";
        $report[] = "Generated: " . $data['generated_at'];
        $report[] = "=" * 60;
        $report[] = "";
        
        $breakdown = $data['memory_breakdown'];
        $report[] = "## Memory Overview";
        $report[] = "Total Used: " . $breakdown['total_human'];
        $report[] = "Memory Limit: " . $breakdown['limit_human'];
        $report[] = "Usage: " . $breakdown['usage_percent'] . "%";
        $report[] = "Alarm Status: " . ($breakdown['alarm'] ? 'ACTIVE' : 'Normal');
        $report[] = "";
        
        $report[] = "## Memory Breakdown";
        foreach ($breakdown['breakdown'] as $type => $value) {
            $percent = round($value / ($breakdown['total'] ?: 1) * 100, 2);
            $report[] = sprintf("  %-20s: %s (%.2f%%)", $type, $this->formatBytes($value), $percent);
        }
        $report[] = "";
        
        $queueMemory = $data['queue_memory'];
        $report[] = "## Top 10 Queues by Memory";
        $topQueues = array_slice($queueMemory['queues'], 0, 10, true);
        foreach ($topQueues as $name => $info) {
            $report[] = sprintf("  %-40s: %s (%d messages)", 
                $name, $info['memory_human'], $info['messages']);
        }
        $report[] = "";
        
        if (!empty($data['recommendations'])) {
            $report[] = "## Recommendations";
            foreach ($data['recommendations'] as $rec) {
                $report[] = "  [{$rec['priority']}] {$rec['issue']}";
                $report[] = "    Current: {$rec['current']}";
                $report[] = "    Action: {$rec['recommendation']}";
            }
        }
        
        return implode("\n", $report);
    }

    private function generateHtmlReport(array $data): string
    {
        $html = '<!DOCTYPE html><html><head><title>RabbitMQ Memory Report</title>';
        $html .= '<style>body{font-family:Arial,sans-serif;margin:20px;}';
        $html .= 'table{border-collapse:collapse;width:100%;margin:10px 0;}';
        $html .= 'th,td{border:1px solid #ddd;padding:8px;text-align:left;}';
        $html .= 'th{background-color:#4CAF50;color:white;}';
        $html .= '.warning{background-color:#fff3cd;}';
        $html .= '.critical{background-color:#f8d7da;}</style></head><body>';
        
        $html .= '<h1>RabbitMQ Memory Analysis Report</h1>';
        $html .= '<p>Generated: ' . $data['generated_at'] . '</p>';
        
        $breakdown = $data['memory_breakdown'];
        $html .= '<h2>Memory Overview</h2>';
        $html .= '<table><tr><th>Metric</th><th>Value</th></tr>';
        $html .= '<tr><td>Total Used</td><td>' . $breakdown['total_human'] . '</td></tr>';
        $html .= '<tr><td>Memory Limit</td><td>' . $breakdown['limit_human'] . '</td></tr>';
        $html .= '<tr><td>Usage</td><td>' . $breakdown['usage_percent'] . '%</td></tr>';
        $html .= '</table>';
        
        $html .= '<h2>Memory Breakdown</h2>';
        $html .= '<table><tr><th>Type</th><th>Size</th><th>Percentage</th></tr>';
        foreach ($breakdown['breakdown'] as $type => $value) {
            $percent = round($value / ($breakdown['total'] ?: 1) * 100, 2);
            $html .= '<tr><td>' . $type . '</td><td>' . $this->formatBytes($value) . '</td><td>' . $percent . '%</td></tr>';
        }
        $html .= '</table>';
        
        $html .= '</body></html>';
        
        return $html;
    }

    private function formatBytes(int $bytes): string
    {
        $units = ['B', 'KB', 'MB', 'GB', 'TB'];
        $i = 0;
        while ($bytes >= 1024 && $i < count($units) - 1) {
            $bytes /= 1024;
            $i++;
        }
        return round($bytes, 2) . ' ' . $units[$i];
    }
}

实际应用场景

场景一:内存泄漏排查

php
<?php

class MemoryLeakDetector
{
    private MemoryAnalyzer $analyzer;
    private array $baseline = [];
    
    public function setBaseline(): void
    {
        $this->baseline = $this->analyzer->getMemoryBreakdown();
    }

    public function detectLeak(): array
    {
        $current = $this->analyzer->getMemoryBreakdown();
        
        $growth = $current['total'] - $this->baseline['total'];
        
        return [
            'baseline' => $this->baseline['total_human'],
            'current' => $current['total_human'],
            'growth' => $this->analyzer->formatBytes(abs($growth)),
            'potential_leak' => $growth > 100 * 1024 * 1024,
        ];
    }
}

场景二:容量规划

php
<?php

class MemoryCapacityPlanner
{
    public function estimateRequirements(array $params): array
    {
        $connections = $params['connections'] ?? 100;
        $channels = $params['channels'] ?? 500;
        $queues = $params['queues'] ?? 100;
        $messagesPerQueue = $params['messages_per_queue'] ?? 10000;
        $avgMessageSize = $params['avg_message_size'] ?? 1024;
        
        $connectionMemory = $connections * 100 * 1024;
        $channelMemory = $channels * 50 * 1024;
        $queueBaseMemory = $queues * 30 * 1024;
        $messageIndexMemory = $queues * $messagesPerQueue * 150;
        $messageBodyMemory = $queues * $messagesPerQueue * $avgMessageSize * 0.1;
        
        $totalEstimate = $connectionMemory + $channelMemory + $queueBaseMemory + 
                         $messageIndexMemory + $messageBodyMemory;
        
        return [
            'estimated_memory' => $totalEstimate,
            'estimated_memory_human' => $this->formatBytes($totalEstimate),
            'recommended_memory' => $totalEstimate * 2,
            'recommended_memory_human' => $this->formatBytes($totalEstimate * 2),
            'breakdown' => [
                'connections' => $this->formatBytes($connectionMemory),
                'channels' => $this->formatBytes($channelMemory),
                'queue_base' => $this->formatBytes($queueBaseMemory),
                'message_index' => $this->formatBytes($messageIndexMemory),
                'message_body' => $this->formatBytes($messageBodyMemory),
            ],
        ];
    }

    private function formatBytes(int $bytes): string
    {
        $units = ['B', 'KB', 'MB', 'GB'];
        $i = 0;
        while ($bytes >= 1024 && $i < count($units) - 1) {
            $bytes /= 1024;
            $i++;
        }
        return round($bytes, 2) . ' ' . $units[$i];
    }
}

常见问题与解决方案

问题一:内存持续增长

诊断步骤

bash
# 查看内存详情
rabbitmqctl status | grep -A 50 memory

# 查看队列内存
rabbitmqctl list_queues name memory messages

解决方案

  1. 检查是否有消费者断开
  2. 检查队列是否设置了 TTL
  3. 考虑使用懒队列

问题二:Binary 内存过高

解决方案

bash
# 强制 GC
rabbitmqctl eval 'erlang:garbage_collect().'

# 查看大 binary
rabbitmqctl eval 'erlang:memory(binary).'

最佳实践建议

内存监控

指标告警阈值处理建议
总内存使用率> 80%扩容或清理
Binary 占比> 50%检查消息积压
ETS 占比> 30%减少队列数

内存优化

场景优化方法
大量消息使用懒队列
大消息压缩或分片
大量队列合并或清理

相关链接