Skip to content

日志分析技巧

概述

RabbitMQ 日志是排查问题的重要依据,通过分析日志可以快速定位问题原因。本文档将介绍 RabbitMQ 日志的结构、分析方法及常见日志模式。

日志文件位置

┌─────────────────────────────────────────────────────────────┐
│                    RabbitMQ 日志文件                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  /var/log/rabbitmq/                                         │
│  ├── rabbit@hostname.log      # 主日志文件                  │
│  ├── rabbit@hostname-sasl.log # Erlang VM日志               │
│  └── log/                    # 日志目录(可配置)            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

日志级别说明

┌─────────────────────────────────────────────────────────────┐
│                    日志级别                                  │
├─────────────────────────────────────────────────────────────┤
│  级别      │  说明           │  使用场景                    │
├─────────────────────────────────────────────────────────────┤
│  debug    │  调试信息       │  开发调试                    │
│  info     │  一般信息       │  正常操作记录                │
│  warning  │  警告信息       │  潜在问题                    │
│  error    │  错误信息       │  错误但可恢复                │
│  critical │  严重错误       │  系统级故障                  │
└─────────────────────────────────────────────────────────────┘

日志格式分析

1. 标准日志格式

=INFO REPORT==== 15-Jan-2024::10:30:45 ===
    accepting AMQP connection <0.1234.0> (192.168.1.100:54321 -> 192.168.1.1:5672)

格式解析

  • =INFO REPORT==== - 日志级别
  • 15-Jan-2024::10:30:45 - 时间戳
  • accepting AMQP connection - 事件描述
  • <0.1234.0> - Erlang进程ID
  • IP地址和端口信息

2. 常见日志模式

连接相关日志

bash
# 连接建立
accepting AMQP connection <0.1234.0> (192.168.1.100:54321 -> 192.168.1.1:5672)

# 连接关闭
closing AMQP connection <0.1234.0> (192.168.1.100:54321 -> 192.168.1.1:5672)

# 连接超时
closing AMQP connection <0.1234.0>: client unexpectedly closed TCP connection

# 心跳超时
closing AMQP connection <0.1234.0>: missed heartbeats from client

通道相关日志

bash
# 通道创建
opening channel #1 on connection <0.1234.0>

# 通道关闭
closing channel #1 on connection <0.1234.0>

# 通道错误
channel error on connection <0.1234.0>, channel 1:
{amqp_error, precondition_failed, "inequivalent arg 'durable' for queue 'test' in vhost '/'"

队列相关日志

bash
# 队列声明
declaring queue 'orders.queue' on vhost '/'

# 队列删除
deleting queue 'orders.queue' on vhost '/'

# 队列镜像
Synchronising: queue 'orders.queue' on node 'rabbit@node2'

资源告警日志

bash
# 内存告警
memory alarm on node 'rabbit@node1', setting resource alarm

# 磁盘告警
disk resource limit alarm set on node 'rabbit@node1'

# 流控触发
flow control: blocking publisher on connection <0.1234.0>

集群相关日志

bash
# 节点加入
node 'rabbit@node2' started

# 网络分区
partition detected: [rabbit@node1, rabbit@node2]

# 节点断开
node 'rabbit@node2' down: connection_closed

日志分析命令

1. 基本查看命令

bash
# 查看最新日志
tail -f /var/log/rabbitmq/rabbit@*.log

# 查看最近100行
tail -100 /var/log/rabbitmq/rabbit@*.log

# 实时监控错误
tail -f /var/log/rabbitmq/rabbit@*.log | grep -i error

# 查看特定时间段的日志
sed -n '/15-Jan-2024::10:00/,/15-Jan-2024::11:00/p' /var/log/rabbitmq/rabbit@*.log

2. 错误日志分析

bash
# 统计错误类型
grep -i "error\|exception\|failed" /var/log/rabbitmq/rabbit@*.log | \
  awk '{print $1, $2, $3}' | sort | uniq -c | sort -rn

# 查找特定错误
grep "precondition_failed" /var/log/rabbitmq/rabbit@*.log

# 查找连接错误
grep -i "connection.*error\|connection.*closed\|connection.*failed" /var/log/rabbitmq/rabbit@*.log

# 查找通道错误
grep -i "channel.*error\|channel.*closed" /var/log/rabbitmq/rabbit@*.log

3. 连接分析

bash
# 统计连接来源IP
grep "accepting AMQP connection" /var/log/rabbitmq/rabbit@*.log | \
  grep -oP '\(\K[0-9.]+' | sort | uniq -c | sort -rn

# 统计连接关闭原因
grep "closing AMQP connection" /var/log/rabbitmq/rabbit@*.log | \
  sed 's/.*closing AMQP connection [^:]*: //' | sort | uniq -c | sort -rn

# 统计心跳超时
grep "missed heartbeats" /var/log/rabbitmq/rabbit@*.log | wc -l

4. 性能分析

bash
# 查找慢操作
grep -E "took [0-9]+ms|slow operation" /var/log/rabbitmq/rabbit@*.log

# 查找流控日志
grep "flow control" /var/log/rabbitmq/rabbit@*.log

# 查找资源告警
grep -i "alarm\|resource limit" /var/log/rabbitmq/rabbit@*.log

5. 集群分析

bash
# 查找网络分区
grep -i "partition" /var/log/rabbitmq/rabbit@*.log

# 查找节点状态变化
grep -i "node.*started\|node.*down\|node.*joined" /var/log/rabbitmq/rabbit@*.log

# 查找同步问题
grep -i "synchronising\|sync" /var/log/rabbitmq/rabbit@*.log

PHP 日志分析工具

php
<?php

class RabbitMQLogAnalyzer
{
    private $logFile;
    private $patterns = [
        'connection_open' => '/accepting AMQP connection <([0-9.]+)> \(([^)]+)\)/',
        'connection_close' => '/closing AMQP connection <([0-9.]+)>/',
        'channel_error' => '/channel error on connection <([0-9.]+)>, channel (\d+)/',
        'queue_declare' => '/declaring queue \'([^\']+)\'/',
        'memory_alarm' => '/memory alarm on node/',
        'disk_alarm' => '/disk resource limit alarm/',
        'partition' => '/partition detected/',
        'heartbeat_missed' => '/missed heartbeats from client/',
        'flow_control' => '/flow control/',
    ];

    public function __construct(string $logFile)
    {
        $this->logFile = $logFile;
    }

    public function analyze(int $lines = 10000): array
    {
        $result = [
            'total_lines' => 0,
            'errors' => [],
            'warnings' => [],
            'connections' => [
                'opened' => 0,
                'closed' => 0,
                'by_ip' => [],
            ],
            'channels' => [
                'errors' => 0,
                'details' => [],
            ],
            'resources' => [
                'memory_alarms' => 0,
                'disk_alarms' => 0,
                'flow_controls' => 0,
            ],
            'cluster' => [
                'partitions' => 0,
            ],
            'timeline' => [],
        ];

        $handle = fopen($this->logFile, 'r');
        if (!$handle) {
            throw new \RuntimeException("无法打开日志文件: {$this->logFile}");
        }

        $lineCount = 0;
        while (($line = fgets($handle)) !== false && $lineCount < $lines) {
            $lineCount++;
            $result['total_lines']++;

            $this->analyzeLine($line, $result);
        }

        fclose($handle);

        return $result;
    }

    private function analyzeLine(string $line, array &$result): void
    {
        if (preg_match('/=ERROR REPORT====/', $line)) {
            $result['errors'][] = trim($line);
        }

        if (preg_match('/=WARNING REPORT====/', $line)) {
            $result['warnings'][] = trim($line);
        }

        if (preg_match($this->patterns['connection_open'], $line, $matches)) {
            $result['connections']['opened']++;
            $ip = explode(':', $matches[2])[0];
            $result['connections']['by_ip'][$ip] = ($result['connections']['by_ip'][$ip] ?? 0) + 1;
        }

        if (preg_match($this->patterns['connection_close'], $line)) {
            $result['connections']['closed']++;
        }

        if (preg_match($this->patterns['channel_error'], $line, $matches)) {
            $result['channels']['errors']++;
            $result['channels']['details'][] = [
                'connection' => $matches[1],
                'channel' => $matches[2],
                'line' => trim($line),
            ];
        }

        if (preg_match($this->patterns['memory_alarm'], $line)) {
            $result['resources']['memory_alarms']++;
        }

        if (preg_match($this->patterns['disk_alarm'], $line)) {
            $result['resources']['disk_alarms']++;
        }

        if (preg_match($this->patterns['flow_control'], $line)) {
            $result['resources']['flow_controls']++;
        }

        if (preg_match($this->patterns['partition'], $line)) {
            $result['cluster']['partitions']++;
        }

        if (preg_match('/(\d{2}-\w{3}-\d{4}::\d{2}:\d{2}:\d{2})/', $line, $matches)) {
            $timestamp = $matches[1];
            if (!isset($result['timeline'][$timestamp])) {
                $result['timeline'][$timestamp] = [
                    'errors' => 0,
                    'warnings' => 0,
                    'connections' => 0,
                ];
            }

            if (strpos($line, 'ERROR') !== false) {
                $result['timeline'][$timestamp]['errors']++;
            }
            if (strpos($line, 'WARNING') !== false) {
                $result['timeline'][$timestamp]['warnings']++;
            }
            if (strpos($line, 'AMQP connection') !== false) {
                $result['timeline'][$timestamp]['connections']++;
            }
        }
    }

    public function findErrors(string $pattern = null, int $context = 2): array
    {
        $errors = [];
        $handle = fopen($this->logFile, 'r');
        
        $lines = [];
        while (($line = fgets($handle)) !== false) {
            $lines[] = $line;
        }
        fclose($handle);

        foreach ($lines as $index => $line) {
            if (strpos($line, 'ERROR') !== false) {
                if ($pattern && strpos($line, $pattern) === false) {
                    continue;
                }

                $error = [
                    'line_number' => $index + 1,
                    'line' => trim($line),
                    'context' => [],
                ];

                for ($i = max(0, $index - $context); $i <= min(count($lines) - 1, $index + $context); $i++) {
                    $error['context'][] = trim($lines[$i]);
                }

                $errors[] = $error;
            }
        }

        return $errors;
    }

    public function getStatistics(int $hours = 24): array
    {
        $stats = [
            'connections_opened' => 0,
            'connections_closed' => 0,
            'errors' => 0,
            'warnings' => 0,
            'top_ips' => [],
            'error_types' => [],
        ];

        $startTime = time() - ($hours * 3600);
        $ipCounts = [];
        $errorTypes = [];

        $handle = fopen($this->logFile, 'r');
        while (($line = fgets($handle)) !== false) {
            if (preg_match('/(\d{2}-\w{3}-\d{4}::\d{2}:\d{2}:\d{2})/', $line, $matches)) {
                $logTime = strtotime(str_replace('-', ' ', $matches[1]));
                
                if ($logTime < $startTime) {
                    continue;
                }

                if (strpos($line, 'accepting AMQP connection') !== false) {
                    $stats['connections_opened']++;
                    if (preg_match('/\(([0-9.]+):/', $line, $ipMatch)) {
                        $ip = $ipMatch[1];
                        $ipCounts[$ip] = ($ipCounts[$ip] ?? 0) + 1;
                    }
                }

                if (strpos($line, 'closing AMQP connection') !== false) {
                    $stats['connections_closed']++;
                }

                if (strpos($line, 'ERROR') !== false) {
                    $stats['errors']++;
                    if (preg_match('/\{([^}]+)\}/', $line, $errorMatch)) {
                        $errorType = $errorMatch[1];
                        $errorTypes[$errorType] = ($errorTypes[$errorTypes] ?? 0) + 1;
                    }
                }

                if (strpos($line, 'WARNING') !== false) {
                    $stats['warnings']++;
                }
            }
        }
        fclose($handle);

        arsort($ipCounts);
        $stats['top_ips'] = array_slice($ipCounts, 0, 10, true);

        arsort($errorTypes);
        $stats['error_types'] = array_slice($errorTypes, 0, 10, true);

        return $stats;
    }

    public function generateReport(): string
    {
        $analysis = $this->analyze();
        $stats = $this->getStatistics();

        $report = "=== RabbitMQ 日志分析报告 ===\n";
        $report .= "生成时间: " . date('Y-m-d H:i:s') . "\n\n";

        $report .= "【基本信息】\n";
        $report .= "分析行数: {$analysis['total_lines']}\n";
        $report .= "错误数量: " . count($analysis['errors']) . "\n";
        $report .= "警告数量: " . count($analysis['warnings']) . "\n\n";

        $report .= "【连接统计】\n";
        $report .= "连接打开: {$analysis['connections']['opened']}\n";
        $report .= "连接关闭: {$analysis['connections']['closed']}\n";
        $report .= "TOP10 来源IP:\n";
        foreach ($stats['top_ips'] as $ip => $count) {
            $report .= "  {$ip}: {$count}\n";
        }
        $report .= "\n";

        $report .= "【通道统计】\n";
        $report .= "通道错误: {$analysis['channels']['errors']}\n\n";

        $report .= "【资源告警】\n";
        $report .= "内存告警: {$analysis['resources']['memory_alarms']}\n";
        $report .= "磁盘告警: {$analysis['resources']['disk_alarms']}\n";
        $report .= "流控触发: {$analysis['resources']['flow_controls']}\n\n";

        $report .= "【集群状态】\n";
        $report .= "网络分区: {$analysis['cluster']['partitions']}\n\n";

        if (!empty($analysis['errors'])) {
            $report .= "【最近错误】\n";
            foreach (array_slice($analysis['errors'], 0, 5) as $error) {
                $report .= "  {$error}\n";
            }
        }

        return $report;
    }
}

// 使用示例
$analyzer = new RabbitMQLogAnalyzer('/var/log/rabbitmq/rabbit@localhost.log');

echo $analyzer->generateReport();

$errors = $analyzer->findErrors('connection', 3);
echo "\n连接相关错误:\n";
foreach ($errors as $error) {
    echo "行 {$error['line_number']}: {$error['line']}\n";
}

日志配置优化

1. 日志级别配置

bash
# rabbitmq.conf

# 设置日志级别
log.console.level = info
log.file.level = info

# 日志格式
log.file.formatter = text
log.file.rotation.date = $D0
log.file.rotation.size = 104857600

# 日志位置
log.file = /var/log/rabbitmq/rabbit.log

2. 日志轮转配置

bash
# /etc/logrotate.d/rabbitmq

/var/log/rabbitmq/*.log {
    daily
    missingok
    rotate 30
    compress
    delaycompress
    notifempty
    create 640 rabbitmq rabbitmq
    sharedscripts
    postrotate
        /sbin/service rabbitmq-server rotate-logs > /dev/null 2>&1 || true
    endscript
}

常见日志问题排查

1. 连接频繁断开

bash
# 排查步骤
# 1. 搜索连接关闭日志
grep "closing AMQP connection" /var/log/rabbitmq/rabbit@*.log | head -20

# 2. 分析关闭原因
grep "closing AMQP connection" /var/log/rabbitmq/rabbit@*.log | \
  sed 's/.*: //' | sort | uniq -c | sort -rn

# 3. 检查心跳超时
grep "missed heartbeats" /var/log/rabbitmq/rabbit@*.log

2. 通道错误

bash
# 排查步骤
# 1. 搜索通道错误
grep "channel error" /var/log/rabbitmq/rabbit@*.log

# 2. 分析错误类型
grep "channel error" /var/log/rabbitmq/rabbit@*.log | \
  grep -oP '\{amqp_error,[^,]+' | sort | uniq -c

# 3. 检查参数冲突
grep "precondition_failed" /var/log/rabbitmq/rabbit@*.log

3. 内存告警

bash
# 排查步骤
# 1. 搜索内存告警
grep "memory alarm" /var/log/rabbitmq/rabbit@*.log

# 2. 分析触发时间
grep "memory alarm" /var/log/rabbitmq/rabbit@*.log | \
  grep -oP '\d{2}-\w{3}-\d{4}::\d{2}:\d{2}:\d{2}'

# 3. 检查流控
grep "flow control" /var/log/rabbitmq/rabbit@*.log

注意事项

  1. 日志文件可能很大:使用 tail、grep 等命令过滤
  2. 敏感信息保护:日志中可能包含敏感信息,注意权限
  3. 日志轮转配置:避免磁盘空间被日志占满
  4. 实时监控:重要日志应配置实时告警
  5. 日志归档:定期归档历史日志便于追溯

相关链接