Skip to content

内存问题诊断

概述

RabbitMQ 内存问题是生产环境中常见的故障原因,可能导致服务变慢、触发流控甚至崩溃。本文档将详细介绍内存问题的诊断方法和解决方案。

内存使用分析

1. RabbitMQ 内存组成

┌─────────────────────────────────────────────────────────────┐
│                  RabbitMQ 内存组成                           │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                    Total Memory                      │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │   │
│  │  │  Queue   │ │ Connection│ │  Binary  │            │   │
│  │  │  Index   │ │  Buffers  │ │  Data    │            │   │
│  │  └──────────┘ └──────────┘ └──────────┘            │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐            │   │
│  │  │ Message  │ │  Plugin  │ │  Erlang  │            │   │
│  │  │  Store   │ │  Memory  │ │   VM     │            │   │
│  │  └──────────┘ └──────────┘ └──────────┘            │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2. 内存使用分布

┌─────────────────────────────────────────────────────────────┐
│                 典型内存使用分布                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  组件              │  占比      │  说明                     │
│  ─────────────────────────────────────────────────────────  │
│  消息存储          │  30-50%   │  队列中的消息              │
│  连接缓冲区        │  10-20%   │  连接和通道缓冲            │
│  二进制数据        │  10-20%   │  消息体和元数据            │
│  队列索引          │  5-10%    │  队列索引数据              │
│  插件              │  5-10%    │  管理插件等                │
│  Erlang VM         │  5-10%    │  运行时开销                │
│  其他              │  5-10%    │  其他组件                  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

诊断步骤

步骤1:查看整体内存状态

bash
# 查看内存使用概览
rabbitmqctl status | grep -A 30 "Memory"

# 使用 rabbitmq-diagnostics
rabbitmq-diagnostics memory_breakdown

# 查看内存水位
rabbitmqctl status | grep -A 5 "memory_high_watermark"

步骤2:分析内存分布

bash
# 查看详细内存分布
rabbitmqctl eval 'rabbit_memory:memory_breakdown().'

# 查看各组件内存使用
rabbitmqctl eval '
  [{total, rabbit_memory:total_memory()},
   {connections, rabbit_memory:connection_memory()},
   {queues, rabbit_memory:queue_memory()},
   {binary, rabbit_memory:binary_memory()}].'

# 查看队列内存占用
rabbitmqctl list_queues name memory | sort -k2 -n -r | head -20

步骤3:检查内存告警

bash
# 查看告警状态
rabbitmqctl list_node_names | while read node; do
    echo "节点: $node"
    rabbitmqctl -n $node eval 'rabbit_alarm:get_alarms().'
done

# 查看内存限制
rabbitmqctl eval 'rabbit_vm:memory_high_watermark().'

# 查看内存使用率
rabbitmqctl status | grep "Memory use"

步骤4:分析进程内存

bash
# 查看Erlang进程内存
rabbitmqctl eval '
  processes() |> 
  lists:map(fun(P) -> {P, process_info(P, memory)} end) |>
  lists:sort(fun({_, {memory, A}}, {_, {memory, B}}) -> A > B end) |>
  lists:sublist(10).'

# 查看ETS表内存
rabbitmqctl eval '
  ets:all() |>
  lists:map(fun(T) -> {T, ets:info(T, memory)} end) |>
  lists:sort(fun({_, A}, {_, B}) -> A > B end) |>
  lists:sublist(10).'

PHP 内存诊断工具

php
<?php

class RabbitMQMemoryDiagnostics
{
    private $apiUrl;
    private $user;
    private $password;

    public function __construct(
        string $host = 'localhost',
        int $port = 15672,
        string $user = 'guest',
        string $password = 'guest'
    ) {
        $this->apiUrl = "http://{$host}:{$port}/api";
        $this->user = $user;
        $this->password = $password;
    }

    private function request(string $endpoint): array
    {
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $this->apiUrl . $endpoint,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => $this->user . ':' . $this->password,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
        ]);
        
        $response = curl_exec($ch);
        curl_close($ch);
        
        return json_decode($response, true);
    }

    public function getMemoryOverview(): array
    {
        $overview = $this->request('/overview');
        $nodes = $this->request('/nodes');
        
        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'cluster_name' => $overview['cluster_name'] ?? 'unknown',
            'nodes' => [],
            'total_memory' => 0,
            'alerts' => [],
        ];
        
        foreach ($nodes as $node) {
            $memUsed = $node['mem_used'] ?? 0;
            $memLimit = $node['mem_limit'] ?? 1;
            $memPercent = round($memUsed / $memLimit * 100, 2);
            
            $result['nodes'][] = [
                'name' => $node['name'],
                'mem_used' => $this->formatBytes($memUsed),
                'mem_limit' => $this->formatBytes($memLimit),
                'mem_percent' => $memPercent,
                'mem_alarm' => $node['mem_alarm'] ?? false,
                'partitions' => count($node['partitions'] ?? []),
            ];
            
            $result['total_memory'] += $memUsed;
            
            if ($memPercent > 80) {
                $result['alerts'][] = [
                    'level' => $memPercent > 90 ? 'critical' : 'warning',
                    'node' => $node['name'],
                    'message' => "节点 {$node['name']} 内存使用率 {$memPercent}%",
                    'recommendation' => $this->getRecommendation($memPercent),
                ];
            }
            
            if ($node['mem_alarm'] ?? false) {
                $result['alerts'][] = [
                    'level' => 'critical',
                    'node' => $node['name'],
                    'message' => "节点 {$node['name']} 触发内存告警",
                    'recommendation' => '立即处理,考虑扩容或清理消息',
                ];
            }
        }
        
        $result['total_memory_formatted'] = $this->formatBytes($result['total_memory']);
        
        return $result;
    }

    public function getQueueMemoryAnalysis(): array
    {
        $queues = $this->request('/queues');
        
        $result = [
            'total_queues' => count($queues),
            'total_memory' => 0,
            'top_memory_queues' => [],
            'queues_without_consumers' => [],
            'queues_with_backlog' => [],
        ];
        
        $queueMemory = [];
        
        foreach ($queues as $queue) {
            $memory = $queue['memory'] ?? 0;
            $messages = $queue['messages'] ?? 0;
            $consumers = $queue['consumers'] ?? 0;
            
            $result['total_memory'] += $memory;
            
            $queueMemory[] = [
                'name' => $queue['name'],
                'vhost' => $queue['vhost'],
                'memory' => $memory,
                'memory_formatted' => $this->formatBytes($memory),
                'messages' => $messages,
                'consumers' => $consumers,
            ];
            
            if ($consumers === 0 && $messages > 0) {
                $result['queues_without_consumers'][] = [
                    'name' => $queue['name'],
                    'messages' => $messages,
                    'memory' => $this->formatBytes($memory),
                ];
            }
            
            if ($messages > 10000) {
                $result['queues_with_backlog'][] = [
                    'name' => $queue['name'],
                    'messages' => $messages,
                    'memory' => $this->formatBytes($memory),
                ];
            }
        }
        
        usort($queueMemory, function ($a, $b) {
            return $b['memory'] - $a['memory'];
        });
        
        $result['top_memory_queues'] = array_slice($queueMemory, 0, 20);
        $result['total_memory_formatted'] = $this->formatBytes($result['total_memory']);
        
        return $result;
    }

    public function getConnectionMemoryAnalysis(): array
    {
        $connections = $this->request('/connections');
        
        $result = [
            'total_connections' => count($connections),
            'total_memory' => 0,
            'by_user' => [],
            'by_peer_host' => [],
            'top_memory_connections' => [],
        ];
        
        $connMemory = [];
        
        foreach ($connections as $conn) {
            $recvOct = $conn['recv_oct'] ?? 0;
            $sendOct = $conn['send_oct'] ?? 0;
            $channels = $conn['channels'] ?? 0;
            
            $estimatedMemory = $recvOct + $sendOct + ($channels * 10240);
            
            $result['total_memory'] += $estimatedMemory;
            
            $user = $conn['user'] ?? 'unknown';
            $peerHost = $conn['peer_host'] ?? 'unknown';
            
            $result['by_user'][$user] = ($result['by_user'][$user] ?? 0) + 1;
            $result['by_peer_host'][$peerHost] = ($result['by_peer_host'][$peerHost] ?? 0) + 1;
            
            $connMemory[] = [
                'name' => $conn['name'],
                'user' => $user,
                'peer_host' => $peerHost,
                'channels' => $channels,
                'recv_oct' => $this->formatBytes($recvOct),
                'send_oct' => $this->formatBytes($sendOct),
                'estimated_memory' => $this->formatBytes($estimatedMemory),
            ];
        }
        
        usort($connMemory, function ($a, $b) {
            return strcmp($b['estimated_memory'], $a['estimated_memory']);
        });
        
        $result['top_memory_connections'] = array_slice($connMemory, 0, 10);
        $result['total_memory_formatted'] = $this->formatBytes($result['total_memory']);
        
        return $result;
    }

    public function generateMemoryReport(): string
    {
        $overview = $this->getMemoryOverview();
        $queues = $this->getQueueMemoryAnalysis();
        $connections = $this->getConnectionMemoryAnalysis();
        
        $report = "=== RabbitMQ 内存诊断报告 ===\n";
        $report .= "生成时间: {$overview['timestamp']}\n\n";
        
        $report .= "【节点内存状态】\n";
        foreach ($overview['nodes'] as $node) {
            $report .= "节点: {$node['name']}\n";
            $report .= "  已用: {$node['mem_used']} / {$node['mem_limit']} ({$node['mem_percent']}%)\n";
            $report .= "  告警: " . ($node['mem_alarm'] ? '是' : '否') . "\n";
        }
        $report .= "\n";
        
        $report .= "【队列内存分析】\n";
        $report .= "总队列数: {$queues['total_queues']}\n";
        $report .= "队列总内存: {$queues['total_memory_formatted']}\n";
        $report .= "TOP5 内存占用队列:\n";
        foreach (array_slice($queues['top_memory_queues'], 0, 5) as $queue) {
            $report .= "  {$queue['name']}: {$queue['memory_formatted']} ({$queue['messages']} 消息)\n";
        }
        $report .= "\n";
        
        $report .= "【连接内存分析】\n";
        $report .= "总连接数: {$connections['total_connections']}\n";
        $report .= "连接总内存: {$connections['total_memory_formatted']}\n";
        $report .= "按用户统计:\n";
        foreach ($connections['by_user'] as $user => $count) {
            $report .= "  {$user}: {$count} 连接\n";
        }
        $report .= "\n";
        
        if (!empty($overview['alerts'])) {
            $report .= "【告警信息】\n";
            foreach ($overview['alerts'] as $alert) {
                $report .= "[{$alert['level']}] {$alert['message']}\n";
                $report .= "  建议: {$alert['recommendation']}\n";
            }
        }
        
        if (!empty($queues['queues_without_consumers'])) {
            $report .= "\n【无消费者队列】\n";
            foreach (array_slice($queues['queues_without_consumers'], 0, 5) as $queue) {
                $report .= "  {$queue['name']}: {$queue['messages']} 消息, {$queue['memory']}\n";
            }
        }
        
        return $report;
    }

    private function formatBytes(int $bytes): string
    {
        $units = ['B', 'KB', 'MB', 'GB', 'TB'];
        $i = 0;
        while ($bytes >= 1024 && $i < count($units) - 1) {
            $bytes /= 1024;
            $i++;
        }
        return round($bytes, 2) . ' ' . $units[$i];
    }

    private function getRecommendation(float $memPercent): string
    {
        if ($memPercent > 90) {
            return '紧急:立即清理消息或扩容内存';
        } elseif ($memPercent > 80) {
            return '警告:考虑增加消费者或清理积压消息';
        } else {
            return '关注:持续监控内存使用趋势';
        }
    }
}

// 使用示例
$diagnostics = new RabbitMQMemoryDiagnostics();
echo $diagnostics->generateMemoryReport();

常见内存问题及解决方案

1. 消息积压导致内存过高

php
<?php

class MemoryOptimization
{
    private $connection;
    private $channel;

    public function __construct()
    {
        $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
            'localhost', 5672, 'guest', 'guest'
        );
        $this->channel = $this->connection->channel();
    }

    public function convertToLazyQueue(string $queueName): void
    {
        $policy = [
            'name' => 'lazy-' . $queueName,
            'pattern' => '^' . preg_quote($queueName) . '$',
            'definition' => [
                'queue-type' => 'lazy',
            ],
            'priority' => 1,
            'apply-to' => 'queues',
        ];
        
        echo "建议使用以下命令设置策略:\n";
        echo "rabbitmqctl set_policy {$policy['name']} \"{$policy['pattern']}\" '" . 
             json_encode($policy['definition']) . "' --apply-to queues\n";
    }

    public function setQueueMaxLength(string $queueName, int $maxLength): void
    {
        $args = new \PhpAmqpLib\Wire\AMQPTable([
            'x-max-length' => $maxLength,
            'x-overflow' => 'reject-publish-dlx',
        ]);
        
        echo "创建队列时使用参数:\n";
        echo "x-max-length: {$maxLength}\n";
        echo "x-overflow: reject-publish-dlx\n";
    }

    public function setQueueTTL(string $queueName, int $ttlMs): void
    {
        echo "建议设置队列TTL: {$ttlMs}ms\n";
        echo "创建队列时使用参数 x-message-ttl: {$ttlMs}\n";
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

2. 连接数过多导致内存过高

bash
# 查看连接数
rabbitmqctl list_connections | wc -l

# 查看每个用户的连接数
rabbitmqctl list_connections user | sort | uniq -c | sort -rn

# 限制最大连接数
# 在 rabbitmq.conf 中配置
# connection_max = 10000

3. 内存告警处理

bash
# 临时提高内存限制
rabbitmqctl eval 'application:set_env(rabbit, vm_memory_high_watermark, 0.6).'

# 触发GC
rabbitmqctl eval 'erlang:garbage_collect().'

# 查看阻塞的发布者
rabbitmqctl list_connections name state | grep blocked

内存配置优化

1. 内存水位配置

bash
# rabbitmq.conf

# 内存高水位(默认0.4,即40%)
vm_memory_high_watermark.relative = 0.6

# 或使用绝对值
# vm_memory_high_watermark.absolute = 4GB

# 流控触发阈值(默认0.05,即在高水位的95%触发)
vm_memory_high_watermark_paging_ratio = 0.75

2. 队列内存优化

bash
# 使用惰性队列减少内存占用
# 设置策略
rabbitmqctl set_policy lazy-queues "^lazy\." '{"queue-type":"lazy"}' --apply-to queues

# 设置队列最大长度
rabbitmqctl set_policy max-length "^limited\." '{"max-length":100000}' --apply-to queues

# 设置消息TTL
rabbitmqctl set_policy message-ttl "^ttl\." '{"message-ttl":86400000}' --apply-to queues

3. Erlang VM 内存优化

bash
# rabbitmq.conf

# Erlang VM 内存分配策略
scheduler_busy_wait_threshold = 1000

# GC 配置
# 在 advanced.config 中配置

内存监控脚本

bash
#!/bin/bash
# memory_monitor.sh

THRESHOLD=80
LOG_FILE="/var/log/rabbitmq/memory_monitor.log"

log_message() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >> $LOG_FILE
}

get_memory_percent() {
    rabbitmqctl status 2>/dev/null | grep -oP 'Memory use:.*?\K[0-9.]+(?=%)'
}

check_memory() {
    local mem_percent=$(get_memory_percent)
    
    if (( $(echo "$mem_percent > $THRESHOLD" | bc -l) )); then
        log_message "WARNING: 内存使用率 ${mem_percent}% 超过阈值 ${THRESHOLD}%"
        
        # 记录内存详情
        log_message "队列内存TOP5:"
        rabbitmqctl list_queues name memory 2>/dev/null | \
            sort -k2 -rn | head -5 >> $LOG_FILE
        
        # 发送告警
        # send_alert "RabbitMQ内存告警: ${mem_percent}%"
    fi
}

while true; do
    check_memory
    sleep 60
done

注意事项

  1. 内存告警是保护机制:不要盲目提高内存限制
  2. 惰性队列有代价:会增加磁盘IO
  3. 监控要持续:内存问题往往是渐进的
  4. 预防优于治疗:提前设置队列限制
  5. 测试内存恢复:确保内存下降后服务能恢复

相关链接