Skip to content

内存泄漏陷阱

概述

内存泄漏是 RabbitMQ 应用程序中常见但难以发现的问题。本文档分析内存泄漏的常见原因、检测方法和解决方案。

内存泄漏来源

┌─────────────────────────────────────────────────────────────────────────┐
│                        内存泄漏来源                                      │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐         │
│  │   应用层泄漏     │  │   连接层泄漏     │  │   消费层泄漏     │         │
│  │                 │  │                 │  │                 │         │
│  │ • 对象未释放    │  │ • 连接未关闭    │  │ • 消息未确认    │         │
│  │ • 缓存无限增长  │  │ • 通道泄漏      │  │ • 预取过多      │         │
│  │ • 循环引用      │  │ • 心跳积压      │  │ • 处理阻塞      │         │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘         │
│                                                                         │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐         │
│  │   队列层泄漏     │  │   服务端泄漏     │  │   系统层泄漏     │         │
│  │                 │  │                 │  │                 │         │
│  │ • 队列积压      │  │ • Erlang VM     │  │ • 文件描述符    │         │
│  │ • 死信堆积      │  │ • Mnesia 表     │  │ • Socket 泄漏   │         │
│  │ • 未消费消息    │  │ • 二进制堆      │  │ • 进程泄漏      │         │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘         │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

常见陷阱场景

陷阱1:连接和通道泄漏

php
<?php

/**
 * Anti-pattern example: a publisher that leaks one AMQP connection per call.
 *
 * The code is intentionally flawed for illustration; do not copy it into production.
 */
class ConnectionLeak
{
    /**
     * JSON-encodes $data and publishes it with exchange '' and routing key 'queue'.
     *
     * @param array $data Payload that becomes the message body.
     */
    public function publish(array $data): void
    {
        // Trap: a new connection is created on every call and is never closed
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();
        
        $channel->basic_publish(
            new AMQPMessage(json_encode($data)),
            '',
            'queue'
        );
        
        // Trap: nothing closes the connection when an exception occurs
        // If basic_publish throws, the connection is never closed
    }
}

/**
 * Anti-pattern example: opens a new channel on every request and keeps all of them.
 *
 * The code is intentionally flawed for illustration; do not copy it into production.
 */
class ChannelLeak
{
    // NOTE(review): never assigned in this snippet — presumably an open AMQP connection
    private $connection;
    // Every channel ever handed out; nothing removes entries from this list
    private array $channels = [];
    
    /**
     * Opens a new channel on the connection, records it, and returns it.
     *
     * @return mixed The newly opened channel.
     */
    public function getChannel()
    {
        // Trap: a new channel is created each time; existing ones are never reused
        $channel = $this->connection->channel();
        $this->channels[] = $channel;
        
        // Trap: the channel array grows without bound
        return $channel;
    }
}

陷阱2:消息未确认导致内存积压

php
<?php

/**
 * Anti-pattern example: a consumer that leaves skipped messages unacknowledged.
 *
 * The code is intentionally flawed for illustration; do not copy it into production.
 * NOTE(review): $this->channel and process() are used but not declared in this snippet.
 */
class UnackedMessageLeak
{
    /**
     * Registers a consumer callback on $queue.
     *
     * @param string $queue Name of the queue to consume from.
     */
    public function consume(string $queue): void
    {
        // Trap: the prefetch count is far too large
        $this->channel->basic_qos(null, 1000, null);
        
        $callback = function ($message) {
            $data = json_decode($message->body, true);
            
            // Trap: the message is skipped without being acknowledged
            if ($this->shouldSkip($data)) {
                return; // the message is never acknowledged
            }
            
            $this->process($data);
            $message->ack();
        };
        
        // presumably the fourth argument (false) is no_ack, i.e. manual acknowledgement — confirm against php-amqplib
        $this->channel->basic_consume($queue, '', false, false, false, false, $callback);
    }
    
    /**
     * Decides whether a decoded message should be skipped.
     *
     * @param mixed $data Decoded message body (unused here).
     * @return bool Always true in this example.
     */
    private function shouldSkip($data): bool
    {
        return true; // every message is skipped
    }
}

陷阱3:缓存无限增长

php
<?php

/**
 * Anti-pattern example: keeps every message body in an in-process array forever.
 *
 * The code is intentionally flawed for illustration; do not copy it into production.
 * NOTE(review): doProcess() is called but not declared in this snippet.
 */
class CacheLeak
{
    // Message bodies keyed by message id; entries are only ever added
    private array $cache = [];
    
    /**
     * Stores the message body under its message id, then processes the message.
     *
     * @param mixed $message Message object exposing get('message_id') and ->body.
     */
    public function process($message): void
    {
        $messageId = $message->get('message_id');
        
        // Trap: the cache grows without bound
        $this->cache[$messageId] = $message->body;
        
        // Trap: the cache is never cleaned up
        $this->doProcess($message);
    }
}

/**
 * Anti-pattern example: per-message state that is removed only on the success path.
 *
 * The code is intentionally flawed for illustration; do not copy it into production.
 */
class ProcessingStateLeak
{
    // In-flight state keyed by message id
    private array $processingStates = [];
    
    /**
     * Records the start time and a 'processing' status for the given message.
     *
     * @param string $messageId Identifier of the message being processed.
     */
    public function startProcessing(string $messageId): void
    {
        $this->processingStates[$messageId] = [
            'start_time' => time(),
            'status' => 'processing',
        ];
    }
    
    /**
     * Removes the recorded state for the given message.
     *
     * @param string $messageId Identifier of the message that finished.
     */
    public function finishProcessing(string $messageId): void
    {
        // Trap: if processing fails and this is never called, the state is never cleaned up
        unset($this->processingStates[$messageId]);
    }
}

class ProcessingStateLeak
{
    private array $processingStates = [];
    
    public function startProcessing(string $messageId): void
    {
        $this->processingStates[$messageId] = [
            'start_time' => time(),
            'status' => 'processing',
        ];
    }
    
    public function finishProcessing(string $messageId): void
    {
        // 陷阱:如果处理失败,状态永远不会清理
        unset($this->processingStates[$messageId]);
    }
}

陷阱4:消费者阻塞

php
<?php

/**
 * Anti-pattern example: a consumer whose callback blocks without any timeout.
 *
 * The code is intentionally flawed for illustration; do not copy it into production.
 * NOTE(review): $this->channel and waitForResponse() are used but not declared in this snippet.
 */
class BlockingConsumer
{
    /**
     * Registers a blocking callback on $queue and loops while the channel is consuming.
     *
     * @param string $queue Name of the queue to consume from.
     */
    public function consume(string $queue): void
    {
        $callback = function ($message) {
            // Trap: synchronous, blocking call
            $result = $this->callExternalService($message);
            
            // Trap: waits indefinitely
            $response = $this->waitForResponse();
            
            // The message is acknowledged only after both blocking calls return
            $message->ack();
        };
        
        $this->channel->basic_consume($queue, '', false, false, false, false, $callback);
        
        while ($this->channel->is_consuming()) {
            // wait() is called without a timeout argument
            $this->channel->wait();
        }
    }
    
    /**
     * Executes a cURL handle with the timeout set to 0.
     *
     * @param mixed $message The message being handled (unused here).
     * @return mixed Whatever curl_exec() returns.
     */
    private function callExternalService($message)
    {
        // May block for a long time
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_TIMEOUT, 0); // no timeout
        return curl_exec($ch);
    }
}

陷阱5:队列积压导致内存溢出

php
<?php

/**
 * Anti-pattern example: publishing into queues that have no consumer and no length limit.
 *
 * The code is intentionally flawed for illustration; do not copy it into production.
 * NOTE(review): $this->channel is used but not declared in this snippet.
 */
class QueueBacklogLeak
{
    /**
     * Publishes one million small JSON messages with routing key 'queue_without_consumer'.
     */
    public function publishWithoutConsumer(): void
    {
        // Trap: keeps publishing even though nothing consumes the queue
        for ($i = 0; $i < 1000000; $i++) {
            $message = new AMQPMessage(json_encode(['id' => $i]));
            $this->channel->basic_publish($message, '', 'queue_without_consumer');
        }
        
        // The queue backs up and memory keeps growing
    }
    
    /**
     * Declares 'unlimited_queue' without passing any queue arguments.
     */
    public function declareQueueWithoutLimits(): void
    {
        // Trap: the queue has no length limit
        $this->channel->queue_declare('unlimited_queue', false, true, false, false);
        
        // A maximum length should be set instead:
        // $this->channel->queue_declare(
        //     'limited_queue',
        //     false, true, false, false, false,
        //     new AMQPTable(['x-max-length' => 10000])
        // );
    }
}

正确做法示例

资源管理

php
<?php

namespace App\Messaging\Resource;

use PhpAmqpLib\Connection\AMQPStreamConnection;

/**
 * Process-wide owner of a single AMQP connection and the channels opened on it.
 *
 * The manager lazily opens the connection, reuses an open channel when one
 * exists, and closes everything it handed out in close() / __destruct().
 */
class ResourceManager
{
    private static ?ResourceManager $instance = null;
    private ?AMQPStreamConnection $connection = null;
    /** @var array Channels opened through this manager that have not been released. */
    private array $channels = [];
    private int $maxChannels = 16;
    private array $config;

    private function __construct(array $config)
    {
        $this->config = $config;
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @param array $config Connection settings; only honoured by the call that
     *                      creates the instance, later calls ignore it.
     */
    public static function getInstance(array $config = []): self
    {
        if (self::$instance === null) {
            self::$instance = new self($config);
        }

        return self::$instance;
    }

    /**
     * Returns a live connection, (re)connecting when none is open.
     *
     * @throws \InvalidArgumentException When required connection settings are missing.
     */
    public function getConnection(): AMQPStreamConnection
    {
        if ($this->connection === null || !$this->connection->isConnected()) {
            // Channels belong to the connection that opened them; after a
            // reconnect the old ones are unusable, so stop tracking them.
            $this->channels = [];
            $this->connection = $this->createConnection();
        }

        return $this->connection;
    }

    /**
     * Returns an open channel, reusing a tracked one when possible.
     *
     * Closed channels found along the way are dropped from the tracking list.
     *
     * @return mixed An open channel that is tracked by this manager.
     *
     * @throws \RuntimeException When the channel cap leaves no room for a new channel.
     */
    public function getChannel()
    {
        $connection = $this->getConnection();

        foreach ($this->channels as $index => $channel) {
            if ($channel->is_open()) {
                return $channel;
            }
            unset($this->channels[$index]);
        }

        // Never hand out an untracked channel: close() could not clean it up.
        if (count($this->channels) >= $this->maxChannels) {
            throw new \RuntimeException(
                sprintf('Channel limit of %d reached', $this->maxChannels)
            );
        }

        $channel = $connection->channel();
        $this->channels[] = $channel;

        return $channel;
    }

    /**
     * Closes the given channel (best effort) and stops tracking it.
     *
     * @param mixed $channel Channel previously obtained from getChannel(); null is tolerated.
     */
    public function releaseChannel($channel): void
    {
        if ($channel && $channel->is_open()) {
            try {
                $channel->close();
            } catch (\Exception $e) {
                // Best effort: a failed close must not prevent the bookkeeping below.
            }
        }

        $index = array_search($channel, $this->channels, true);
        if ($index !== false) {
            unset($this->channels[$index]);
        }
    }

    /**
     * Closes every tracked channel and then the connection, ignoring close errors.
     */
    public function close(): void
    {
        foreach ($this->channels as $channel) {
            try {
                if ($channel->is_open()) {
                    $channel->close();
                }
            } catch (\Exception $e) {
                // Best effort: keep closing the remaining resources.
            }
        }
        $this->channels = [];

        if ($this->connection) {
            try {
                if ($this->connection->isConnected()) {
                    $this->connection->close();
                }
            } catch (\Exception $e) {
                // Best effort: the reference is dropped below either way.
            }
            $this->connection = null;
        }
    }

    /**
     * Builds a new connection from the stored configuration.
     *
     * Required keys: host, port, user, password. Optional keys: vhost,
     * connection_timeout, read_write_timeout, heartbeat.
     *
     * @throws \InvalidArgumentException When a required key is missing.
     */
    private function createConnection(): AMQPStreamConnection
    {
        $missing = [];
        foreach (['host', 'port', 'user', 'password'] as $key) {
            if (!isset($this->config[$key])) {
                $missing[] = $key;
            }
        }
        if ($missing !== []) {
            throw new \InvalidArgumentException(
                'Missing AMQP connection config: ' . implode(', ', $missing)
            );
        }

        $heartbeat = $this->config['heartbeat'] ?? 60;
        // php-amqplib expects read_write_timeout to be at least twice the
        // heartbeat; derive a compatible default when it is not configured.
        $readWriteTimeout = $this->config['read_write_timeout'] ?? max(10.0, $heartbeat * 2.0);

        return new AMQPStreamConnection(
            $this->config['host'],
            $this->config['port'],
            $this->config['user'],
            $this->config['password'],
            $this->config['vhost'] ?? '/',
            false,
            'AMQPLAIN',
            null,
            'en_US',
            $this->config['connection_timeout'] ?? 3.0,
            $readWriteTimeout,
            null,
            true,
            $heartbeat
        );
    }

    public function __destruct()
    {
        $this->close();
    }
}

内存安全的消费者

php
<?php

namespace App\Messaging\Consumer;

use PhpAmqpLib\Message\AMQPMessage;

/**
 * Consumer that bounds its own footprint: limited prefetch, explicit ack/nack
 * for every delivery, and self-cancellation once a memory limit or a
 * processed-message threshold is reached.
 */
class MemorySafeConsumer
{
    private $channel;
    private int $prefetchCount;
    private int $processedCount = 0;
    private int $memoryLimit;
    private int $restartThreshold;

    /**
     * @param mixed $channel          Open AMQP channel used for consuming.
     * @param int   $prefetchCount    Maximum number of unacknowledged deliveries.
     * @param int   $memoryLimitMB    Memory usage (MB) above which the consumer cancels itself.
     * @param int   $restartThreshold Number of handled messages after which the consumer cancels itself.
     */
    public function __construct(
        $channel,
        int $prefetchCount = 10,
        int $memoryLimitMB = 512,
        int $restartThreshold = 10000
    ) {
        $this->channel = $channel;
        $this->prefetchCount = $prefetchCount;
        $this->memoryLimit = $memoryLimitMB * 1024 * 1024;
        $this->restartThreshold = $restartThreshold;
    }

    /**
     * Consumes $queue until the consumer is cancelled.
     *
     * @param string   $queue   Queue to consume from.
     * @param callable $handler Called as $handler($decodedBody, $message); returning
     *                          false nacks (requeues) the message, anything else acks it.
     *
     * @throws \Throwable Any non-timeout failure raised while waiting for deliveries.
     */
    public function consume(string $queue, callable $handler): void
    {
        $this->channel->basic_qos(null, $this->prefetchCount, null);

        $callback = function (AMQPMessage $message) use ($handler, $queue) {
            $this->processMessage($message, $handler, $queue);
        };

        $this->channel->basic_consume(
            $queue,
            $this->getConsumerTag(),
            false,
            false,
            false,
            false,
            $callback
        );

        while ($this->channel->is_consuming()) {
            $this->checkMemoryUsage();

            // The memory check may just have cancelled the consumer; waiting
            // again would only block until the timeout.
            if (!$this->channel->is_consuming()) {
                break;
            }

            try {
                $this->channel->wait(null, false, 30);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                // Idle for 30s: loop again so the memory check keeps running.
                continue;
            }
            // Every other failure propagates. Swallowing it would spin this
            // loop forever on a dead connection.
        }
    }

    /**
     * Runs the handler for one delivery and acknowledges or requeues it.
     */
    private function processMessage(
        AMQPMessage $message,
        callable $handler,
        string $queue
    ): void {
        try {
            $data = json_decode($message->body, true);

            $result = $handler($data, $message);

            if ($result !== false) {
                $message->ack();
            } else {
                $message->nack(false, true);
            }

            $this->processedCount++;

            $this->checkRestartThreshold();
        } catch (\Throwable $e) {
            // \Throwable rather than \Exception: an \Error raised by the handler
            // must not leave the delivery unacknowledged.
            // NOTE(review): requeueing unconditionally can redeliver a poison
            // message forever; consider a dead-letter exchange.
            $message->nack(false, true);
        }
    }

    /**
     * Cancels the consumer when allocated memory exceeds the configured limit.
     */
    private function checkMemoryUsage(): void
    {
        $memoryUsage = memory_get_usage(true);

        if ($memoryUsage > $this->memoryLimit) {
            $this->triggerGracefulRestart();
        }
    }

    /**
     * Cancels the consumer once enough messages have been handled.
     */
    private function checkRestartThreshold(): void
    {
        if ($this->processedCount >= $this->restartThreshold) {
            $this->triggerGracefulRestart();
        }
    }

    /**
     * Cancels this consumer so that the consume() loop ends.
     *
     * Restarting the process is left to the caller or a process manager.
     */
    private function triggerGracefulRestart(): void
    {
        $this->channel->basic_cancel($this->getConsumerTag());
    }

    /**
     * Builds the consumer tag from the host name and the process id.
     */
    private function getConsumerTag(): string
    {
        $host = gethostname();

        // gethostname() returns false on failure; keep the tag readable.
        return ($host !== false ? $host : 'unknown') . '-' . getmypid();
    }
}

有界缓存实现

php
<?php

namespace App\Messaging\Cache;

/**
 * In-memory cache with a hard entry limit and a per-entry time to live.
 *
 * When a new key would exceed the limit, expired entries are purged first and,
 * if that is not enough, the entries closest to expiry are evicted.
 */
class BoundedCache
{
    /** @var array Entries keyed by cache key: ['value' => mixed, 'expires_at' => int]. */
    private array $cache = [];
    private int $maxSize;
    private int $ttl;

    /**
     * @param int $maxSize Maximum number of entries; values below 1 are treated as 1.
     * @param int $ttl     Lifetime of an entry in seconds.
     */
    public function __construct(int $maxSize = 10000, int $ttl = 3600)
    {
        $this->maxSize = max(1, $maxSize);
        $this->ttl = $ttl;
    }

    /**
     * Stores $value under $key, evicting other entries if the cache is full.
     *
     * @param mixed $value
     */
    public function set(string $key, $value): void
    {
        // Overwriting an existing key does not grow the cache, so room is
        // only needed for keys that are not stored yet.
        if (!isset($this->cache[$key])) {
            $this->evictIfNeeded();
        }

        $this->cache[$key] = [
            'value' => $value,
            'expires_at' => time() + $this->ttl,
        ];
    }

    /**
     * Returns the stored value, or null when the key is absent or expired.
     *
     * @return mixed
     */
    public function get(string $key)
    {
        if (!$this->isLive($key)) {
            return null;
        }

        return $this->cache[$key]['value'];
    }

    /**
     * Tells whether a non-expired entry exists, even if its value is null.
     */
    public function has(string $key): bool
    {
        return $this->isLive($key);
    }

    public function delete(string $key): void
    {
        unset($this->cache[$key]);
    }

    public function clear(): void
    {
        $this->cache = [];
    }

    /**
     * Number of stored entries, including expired ones not yet purged.
     */
    public function size(): int
    {
        return count($this->cache);
    }

    /**
     * Checks that $key is stored and not expired; expired entries are removed.
     */
    private function isLive(string $key): bool
    {
        if (!isset($this->cache[$key])) {
            return false;
        }

        if ($this->cache[$key]['expires_at'] < time()) {
            unset($this->cache[$key]);
            return false;
        }

        return true;
    }

    private function evictIfNeeded(): void
    {
        if (count($this->cache) >= $this->maxSize) {
            $this->evictOldest();
        }
    }

    /**
     * Frees space: purges expired entries, then evicts the entries closest to
     * expiry if the cache is still full.
     */
    private function evictOldest(): void
    {
        $now = time();

        foreach ($this->cache as $key => $item) {
            if ($item['expires_at'] < $now) {
                unset($this->cache[$key]);
            }
        }

        if (count($this->cache) < $this->maxSize) {
            return;
        }

        // At least one entry must go: 10% of a small limit truncates to zero,
        // which would let the cache grow without bound.
        $evictCount = max(1, (int) ($this->maxSize * 0.1));
        // Also remove any overshoot above the limit.
        $evictCount = max($evictCount, count($this->cache) - $this->maxSize + 1);

        // Sort a key => expiry map so the cache itself keeps its order.
        $expiries = [];
        foreach ($this->cache as $key => $item) {
            $expiries[$key] = $item['expires_at'];
        }
        asort($expiries);

        foreach (array_slice(array_keys($expiries), 0, $evictCount) as $key) {
            unset($this->cache[$key]);
        }
    }
}

内存监控

php
<?php

namespace App\Messaging\Monitor;

/**
 * Reports the PHP process's memory usage against warning and critical thresholds.
 */
class MemoryMonitor
{
    private int $warningThreshold;
    private int $criticalThreshold;
    private $logger;

    /**
     * @param int   $warningThresholdMB  Usage (MB) above which the status is 'warning'.
     * @param int   $criticalThresholdMB Usage (MB) above which the status is 'critical'.
     * @param mixed $logger              Optional object exposing warning($message, $context).
     */
    public function __construct(
        int $warningThresholdMB = 256,
        int $criticalThresholdMB = 512,
        $logger = null
    ) {
        $bytesPerMb = 1024 * 1024;

        $this->warningThreshold = $warningThresholdMB * $bytesPerMb;
        $this->criticalThreshold = $criticalThresholdMB * $bytesPerMb;
        $this->logger = $logger;
    }

    /**
     * Takes a snapshot of current, peak and configured-limit memory figures.
     *
     * Calls the logger's warning() whenever the status is not 'normal' and a
     * logger was supplied.
     *
     * @return array Keys: status, usage_bytes, usage_mb, peak_bytes, peak_mb,
     *               limit_bytes, limit_mb, usage_percent.
     */
    public function check(): array
    {
        $current = memory_get_usage(true);
        $highest = memory_get_peak_usage(true);
        $ceiling = $this->getMemoryLimit();

        if ($current > $this->criticalThreshold) {
            $level = 'critical';
        } else {
            $level = $current > $this->warningThreshold ? 'warning' : 'normal';
        }

        $percent = 0;
        if ($ceiling > 0) {
            $percent = round(($current / $ceiling) * 100, 2);
        }

        $report = [
            'status' => $level,
            'usage_bytes' => $current,
            'usage_mb' => round($current / 1024 / 1024, 2),
            'peak_bytes' => $highest,
            'peak_mb' => round($highest / 1024 / 1024, 2),
            'limit_bytes' => $ceiling,
            'limit_mb' => round($ceiling / 1024 / 1024, 2),
            'usage_percent' => $percent,
        ];

        $shouldLog = $level !== 'normal' && $this->logger;
        if ($shouldLog) {
            $this->logger->warning('Memory usage alert', $report);
        }

        return $report;
    }

    /**
     * True when current usage is above the warning threshold.
     */
    public function isOverThreshold(): bool
    {
        return $this->getUsage() > $this->warningThreshold;
    }

    /**
     * True when current usage is above the critical threshold.
     */
    public function isCritical(): bool
    {
        return $this->getUsage() > $this->criticalThreshold;
    }

    /**
     * Memory currently allocated from the system, in bytes.
     */
    public function getUsage(): int
    {
        return memory_get_usage(true);
    }

    /**
     * Peak memory allocated from the system, in bytes.
     */
    public function getPeakUsage(): int
    {
        return memory_get_peak_usage(true);
    }

    /**
     * Converts the memory_limit ini value to bytes.
     *
     * '-1' maps to PHP_INT_MAX; a trailing G, M or K (any case) is applied as
     * a binary multiplier; anything else is cast to int as-is.
     */
    private function getMemoryLimit(): int
    {
        $raw = ini_get('memory_limit');

        if ($raw === '-1') {
            return PHP_INT_MAX;
        }

        $multipliers = [
            'G' => 1024 * 1024 * 1024,
            'M' => 1024 * 1024,
            'K' => 1024,
        ];

        $suffix = strtoupper(substr($raw, -1));
        $number = (int) substr($raw, 0, -1);

        if (isset($multipliers[$suffix])) {
            return $number * $multipliers[$suffix];
        }

        return (int) $raw;
    }
}

最佳实践建议清单

资源管理

  • [ ] 使用连接池复用连接
  • [ ] 及时关闭不再使用的资源
  • [ ] 使用 try-finally 确保资源释放
  • [ ] 实现资源清理的析构函数

内存管理

  • [ ] 使用有界数据结构
  • [ ] 定期清理缓存
  • [ ] 监控内存使用
  • [ ] 设置内存限制

消费者设计

  • [ ] 设置合理的预取数量
  • [ ] 确保消息正确确认
  • [ ] 避免阻塞操作
  • [ ] 实现优雅重启

监控告警

  • [ ] 监控内存使用趋势
  • [ ] 配置内存告警
  • [ ] 监控未确认消息数
  • [ ] 监控连接和通道数

生产环境注意事项

  1. 内存限制

    • 设置 PHP 内存限制
    • 监控内存使用趋势
    • 配置内存告警阈值
  2. 定期重启

    • 设置处理消息数阈值
    • 定期重启消费者进程
    • 使用进程管理器
  3. 资源清理

    • 实现资源清理钩子
    • 处理异常情况下的清理
    • 使用 finally 块
  4. 监控分析

    • 使用内存分析工具
    • 定期检查内存泄漏
    • 分析内存快照

相关链接