Skip to content

磁盘问题诊断

按述

RabbitMQ 磁盘问题可能导致消息持久化失败、服务阻塞甚至数据丢失。本文档将详细介绍磁盘问题的诊断方法和解决方案。

磁盘使用分析

1. RabbitMQ 磁盘数据组成

┌─────────────────────────────────────────────────────────────┐
│                  RabbitMQ 磁盘数据组成                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  /var/lib/rabbitmq/mnesia/                                  │
│  ├── rabbit@hostname/                                       │
│  │   ├── msg_store_persistent/  # 持久化消息存储            │
│  │   ├── msg_store_transient/   # 临时消息存储              │
│  │   ├── queues/                # 队列数据                  │
│  │   ├── recovery.dets         # 恢复数据                   │
│  │   └── ...                                               │
│  ├── rabbit@hostname-sasl.log  # SASL日志                   │
│  └── cluster_nodes.config       # 集群节点配置              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2. 磁盘空间占用因素

┌─────────────────────────────────────────────────────────────┐
│                  磁盘空间占用因素                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  因素              │  说明           │  影响程度            │
│  ─────────────────────────────────────────────────────────  │
│  持久化消息        │  消息体数据     │  高                  │
│  队列索引          │  消息索引       │  中                  │
│  消息元数据        │  消息属性       │  低                  │
│  日志文件          │  运行日志       │  中                  │
│  配置数据          │  Mnesia数据     │  低                  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

诊断步骤

步骤1:检查磁盘空间

bash
# 查看磁盘空间
df -h /var/lib/rabbitmq

# 查看RabbitMQ数据目录大小
du -sh /var/lib/rabbitmq/*

# 查看详细目录大小
du -sh /var/lib/rabbitmq/mnesia/*/*

步骤2:检查磁盘告警

bash
# 查看磁盘告警状态
rabbitmqctl status | grep -A 5 "disk_free_limit"

# 查看磁盘空间限制
rabbitmqctl eval 'rabbit_disk_monitor:get_disk_free_limit().'

# 查看当前磁盘空间
rabbitmqctl eval 'rabbit_disk_monitor:get_disk_free().'

# 查看告警
rabbitmqctl eval 'rabbit_alarm:get_alarms().'

步骤3:分析磁盘IO

bash
# 查看磁盘IO统计
iostat -x 1 10

# 查看进程IO
iotop -p $(pgrep -d',' -f rabbitmq)

# 查看磁盘读写
vmstat -d

步骤4:分析消息存储

bash
# 查看消息存储目录
ls -la /var/lib/rabbitmq/mnesia/rabbit@*/msg_store_persistent/

# 查看队列数据大小
du -sh /var/lib/rabbitmq/mnesia/rabbit@*/queues/*

# 查看消息数量
rabbitmqctl list_queues name messages messages_persistent

PHP 磁盘诊断工具

php
<?php

class RabbitMQDiskDiagnostics
{
    private $apiUrl;
    private $user;
    private $password;
    private $dataDir = '/var/lib/rabbitmq';

    public function __construct(
        string $host = 'localhost',
        int $port = 15672,
        string $user = 'guest',
        string $password = 'guest'
    ) {
        $this->apiUrl = "http://{$host}:{$port}/api";
        $this->user = $user;
        $this->password = $password;
    }

    private function request(string $endpoint): array
    {
        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $this->apiUrl . $endpoint,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_USERPWD => $this->user . ':' . $this->password,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_TIMEOUT => 10,
        ]);
        
        $response = curl_exec($ch);
        curl_close($ch);
        
        return json_decode($response, true);
    }

    public function getDiskOverview(): array
    {
        $nodes = $this->request('/nodes');
        
        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'nodes' => [],
            'alerts' => [],
        ];
        
        foreach ($nodes as $node) {
            $diskFree = $node['disk_free'] ?? 0;
            $diskLimit = $node['disk_free_limit'] ?? 0;
            $diskAlarm = $node['disk_free_alarm'] ?? false;
            
            $result['nodes'][] = [
                'name' => $node['name'],
                'disk_free' => $this->formatBytes($diskFree),
                'disk_free_bytes' => $diskFree,
                'disk_limit' => $this->formatBytes($diskLimit),
                'disk_limit_bytes' => $diskLimit,
                'disk_alarm' => $diskAlarm,
                'disk_percent' => $diskLimit > 0 ? 
                    round(($diskFree / $diskLimit) * 100, 2) : 0,
            ];
            
            if ($diskAlarm) {
                $result['alerts'][] = [
                    'level' => 'critical',
                    'node' => $node['name'],
                    'message' => "节点 {$node['name']} 触发磁盘告警",
                    'disk_free' => $this->formatBytes($diskFree),
                    'disk_limit' => $this->formatBytes($diskLimit),
                ];
            }
            
            if ($diskFree < $diskLimit * 2) {
                $result['alerts'][] = [
                    'level' => 'warning',
                    'node' => $node['name'],
                    'message' => "节点 {$node['name']} 磁盘空间即将不足",
                    'disk_free' => $this->formatBytes($diskFree),
                ];
            }
        }
        
        return $result;
    }

    public function getDataDirectoryAnalysis(): array
    {
        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'data_dir' => $this->dataDir,
            'total_size' => 0,
            'directories' => [],
        ];
        
        if (!is_dir($this->dataDir)) {
            $result['error'] = "数据目录不存在: {$this->dataDir}";
            return $result;
        }
        
        $iterator = new RecursiveIteratorIterator(
            new RecursiveDirectoryIterator($this->dataDir, RecursiveDirectoryIterator::SKIP_DOTS),
            RecursiveIteratorIterator::SELF_FIRST
        );
        
        $dirSizes = [];
        
        foreach ($iterator as $file) {
            if ($file->isDir()) {
                $path = $file->getPathname();
                $dirSizes[$path] = 0;
            }
        }
        
        foreach ($iterator as $file) {
            if ($file->isFile()) {
                $size = $file->getSize();
                $result['total_size'] += $size;
                
                $path = $file->getPath();
                foreach ($dirSizes as $dir => $s) {
                    if (strpos($path, $dir) === 0) {
                        $dirSizes[$dir] += $size;
                    }
                }
            }
        }
        
        arsort($dirSizes);
        
        foreach (array_slice($dirSizes, 0, 20, true) as $dir => $size) {
            $result['directories'][] = [
                'path' => $dir,
                'size' => $this->formatBytes($size),
                'size_bytes' => $size,
            ];
        }
        
        $result['total_size_formatted'] = $this->formatBytes($result['total_size']);
        
        return $result;
    }

    public function getMessageStoreAnalysis(): array
    {
        $queues = $this->request('/queues');
        
        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'total_messages' => 0,
            'total_persistent' => 0,
            'queues' => [],
        ];
        
        foreach ($queues as $queue) {
            $messages = $queue['messages'] ?? 0;
            $persistent = $queue['message_stats']['persistent'] ?? 0;
            
            $result['total_messages'] += $messages;
            $result['total_persistent'] += $persistent;
            
            if ($messages > 0) {
                $result['queues'][] = [
                    'name' => $queue['name'],
                    'vhost' => $queue['vhost'],
                    'messages' => $messages,
                    'messages_ready' => $queue['messages_ready'] ?? 0,
                    'messages_unacked' => $queue['messages_unacked'] ?? 0,
                    'durable' => $queue['durable'] ?? false,
                ];
            }
        }
        
        usort($result['queues'], function ($a, $b) {
            return $b['messages'] - $a['messages'];
        });
        
        return $result;
    }

    public function getIOStatistics(): array
    {
        $nodes = $this->request('/nodes');
        
        $result = [
            'timestamp' => date('Y-m-d H:i:s'),
            'nodes' => [],
        ];
        
        foreach ($nodes as $node) {
            $ioStats = $node['io_read_stats'] ?? [];
            $ioWriteStats = $node['io_write_stats'] ?? [];
            $ioSyncStats = $node['io_sync_stats'] ?? [];
            
            $result['nodes'][] = [
                'name' => $node['name'],
                'io_read' => [
                    'count' => $ioStats['read_count'] ?? 0,
                    'bytes' => $this->formatBytes($ioStats['read_bytes'] ?? 0),
                    'avg_time' => ($ioStats['read_time'] ?? 0) . ' μs',
                ],
                'io_write' => [
                    'count' => $ioWriteStats['write_count'] ?? 0,
                    'bytes' => $this->formatBytes($ioWriteStats['write_bytes'] ?? 0),
                    'avg_time' => ($ioWriteStats['write_time'] ?? 0) . ' μs',
                ],
                'io_sync' => [
                    'count' => $ioSyncStats['sync_count'] ?? 0,
                    'avg_time' => ($ioSyncStats['sync_time'] ?? 0) . ' μs',
                ],
            ];
        }
        
        return $result;
    }

    public function generateDiskReport(): string
    {
        $overview = $this->getDiskOverview();
        $dataDir = $this->getDataDirectoryAnalysis();
        $messages = $this->getMessageStoreAnalysis();
        $io = $this->getIOStatistics();
        
        $report = "=== RabbitMQ 磁盘诊断报告 ===\n";
        $report .= "生成时间: {$overview['timestamp']}\n\n";
        
        $report .= "【磁盘空间状态】\n";
        foreach ($overview['nodes'] as $node) {
            $report .= "节点: {$node['name']}\n";
            $report .= "  可用空间: {$node['disk_free']}\n";
            $report .= "  限制阈值: {$node['disk_limit']}\n";
            $report .= "  告警状态: " . ($node['disk_alarm'] ? '是' : '否') . "\n";
        }
        $report .= "\n";
        
        $report .= "【数据目录分析】\n";
        $report .= "数据目录: {$dataDir['data_dir']}\n";
        $report .= "总大小: {$dataDir['total_size_formatted']}\n";
        $report .= "TOP10 大目录:\n";
        foreach (array_slice($dataDir['directories'], 0, 10) as $dir) {
            $report .= "  {$dir['path']}: {$dir['size']}\n";
        }
        $report .= "\n";
        
        $report .= "【消息存储分析】\n";
        $report .= "总消息数: {$messages['total_messages']}\n";
        $report .= "TOP10 消息队列:\n";
        foreach (array_slice($messages['queues'], 0, 10) as $queue) {
            $report .= "  {$queue['name']}: {$queue['messages']} 消息\n";
        }
        $report .= "\n";
        
        $report .= "【IO统计】\n";
        foreach ($io['nodes'] as $node) {
            $report .= "节点: {$node['name']}\n";
            $report .= "  读: {$node['io_read']['count']} 次, {$node['io_read']['bytes']}\n";
            $report .= "  写: {$node['io_write']['count']} 次, {$node['io_write']['bytes']}\n";
            $report .= "  同步: {$node['io_sync']['count']} 次\n";
        }
        $report .= "\n";
        
        if (!empty($overview['alerts'])) {
            $report .= "【告警信息】\n";
            foreach ($overview['alerts'] as $alert) {
                $report .= "[{$alert['level']}] {$alert['message']}\n";
            }
        }
        
        return $report;
    }

    private function formatBytes(int $bytes): string
    {
        $units = ['B', 'KB', 'MB', 'GB', 'TB'];
        $i = 0;
        while ($bytes >= 1024 && $i < count($units) - 1) {
            $bytes /= 1024;
            $i++;
        }
        return round($bytes, 2) . ' ' . $units[$i];
    }
}

// 使用示例
$diagnostics = new RabbitMQDiskDiagnostics();
echo $diagnostics->generateDiskReport();

常见磁盘问题及解决方案

1. 磁盘空间不足

bash
# 查看磁盘使用
df -h /var/lib/rabbitmq

# 查看大文件
find /var/lib/rabbitmq -type f -size +100M -exec ls -lh {} \;

# 清理日志
find /var/log/rabbitmq -name "*.log" -mtime +7 -delete

# 清理旧消息(谨慎操作)
# rabbitmqctl purge_queue queue_name

2. 磁盘IO瓶颈

bash
# 查看IO等待
iostat -x 1

# 查看进程IO
iotop -p $(pgrep -d',' -f rabbitmq)

# 优化磁盘配置
# 使用SSD
# 调整IO调度器
echo noop > /sys/block/sda/queue/scheduler

3. 消息存储膨胀

php
<?php

class DiskOptimization
{
    private $connection;
    private $channel;

    public function __construct()
    {
        $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
            'localhost', 5672, 'guest', 'guest'
        );
        $this->channel = $this->connection->channel();
    }

    public function createLazyQueue(string $queueName): void
    {
        $args = new \PhpAmqpLib\Wire\AMQPTable([
            'x-queue-type' => 'lazy',
        ]);
        
        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            $args
        );
        
        echo "创建惰性队列: {$queueName}\n";
    }

    public function setQueueLimits(
        string $queueName,
        int $maxLength = 100000,
        int $maxBytes = 1073741824
    ): void {
        $args = new \PhpAmqpLib\Wire\AMQPTable([
            'x-max-length' => $maxLength,
            'x-max-length-bytes' => $maxBytes,
            'x-overflow' => 'reject-publish-dlx',
        ]);
        
        echo "设置队列限制: {$queueName}\n";
        echo "  最大消息数: {$maxLength}\n";
        echo "  最大字节数: {$maxBytes}\n";
    }

    public function enableMessageTTL(string $queueName, int $ttlMs): void
    {
        $args = new \PhpAmqpLib\Wire\AMQPTable([
            'x-message-ttl' => $ttlMs,
        ]);
        
        echo "设置消息TTL: {$queueName} = {$ttlMs}ms\n";
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

磁盘配置优化

1. 磁盘告警配置

bash
# rabbitmq.conf

# 磁盘空间限制(绝对值)
disk_free_limit.absolute = 10GB

# 或使用相对值
# disk_free_limit.relative = 2.0

# 磁盘检查间隔
disk_monitor_check_interval = 60000

2. 消息存储优化

bash
# 使用惰性队列策略
rabbitmqctl set_policy lazy-queues "^lazy\." '{"queue-type":"lazy"}' --apply-to queues

# 设置队列最大长度策略
rabbitmqctl set_policy max-length "^limited\." '{"max-length":100000}' --apply-to queues

# 设置消息TTL策略
rabbitmqctl set_policy message-ttl "^ttl\." '{"message-ttl":86400000}' --apply-to queues

3. IO优化配置

bash
# rabbitmq.conf

# 批量写入配置
msg_store_file_size_limit = 16777216

# IO线程池
msg_store_io_thread_pool_size = 4

磁盘监控脚本

bash
#!/bin/bash
# disk_monitor.sh

THRESHOLD_GB=10
LOG_FILE="/var/log/rabbitmq/disk_monitor.log"

log_message() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >> $LOG_FILE
}

get_disk_free_gb() {
    df -BG /var/lib/rabbitmq | tail -1 | awk '{print $4}' | tr -d 'G'
}

check_disk() {
    local disk_free=$(get_disk_free_gb)
    
    if [ "$disk_free" -lt "$THRESHOLD_GB" ]; then
        log_message "WARNING: 磁盘空间不足 ${disk_free}GB (阈值 ${THRESHOLD_GB}GB)"
        
        # 记录磁盘使用详情
        log_message "磁盘使用详情:"
        df -h /var/lib/rabbitmq >> $LOG_FILE
        
        # 记录大目录
        log_message "大目录TOP10:"
        du -sh /var/lib/rabbitmq/* 2>/dev/null | sort -rh | head -10 >> $LOG_FILE
        
        # 发送告警
        # send_alert "RabbitMQ磁盘空间告警: ${disk_free}GB"
    fi
}

while true; do
    check_disk
    sleep 300
done

注意事项

  1. 磁盘告警会阻塞发布:确保有足够磁盘空间
  2. 惰性队列增加IO:权衡内存和磁盘使用
  3. 定期清理旧数据:避免无限增长
  4. 监控IO性能:IO瓶颈影响整体性能
  5. 备份重要数据:定期备份消息数据

相关链接