Skip to content

RabbitMQ 日志分析工具

概述

日志分析是运维工作的重要组成部分,通过对 RabbitMQ 日志的分析可以发现问题、定位故障、优化性能。本文将介绍多种日志分析方法和工具,包括命令行工具、PHP 脚本和可视化方案。

核心知识点

日志分析目标

目标 | 说明 | 关键指标
故障排查 | 定位问题原因 | 错误日志、异常堆栈
性能分析 | 发现性能瓶颈 | 响应时间、吞吐量
安全审计 | 检测安全威胁 | 认证失败、异常访问
容量规划 | 预测资源需求 | 连接数、消息量

常用分析工具

工具 | 类型 | 适用场景
grep/awk/sed | 命令行 | 快速过滤分析
ELK Stack | 可视化平台 | 大规模日志分析
Grafana Loki | 日志聚合 | 云原生环境
PHP 脚本 | 自定义分析 | 特定需求分析

日志格式解析

RabbitMQ 支持两种日志格式:

文本格式

2024-01-15 10:30:00.123 [info] <0.123.0> connection <0.456.0> accepted: 192.168.1.100:5672 -> 192.168.1.1:5672

JSON 格式

json
{"time":"2024-01-15T10:30:00.123Z","level":"info","pid":"<0.123.0>","msg":"connection accepted"}

配置示例

PHP 日志分析类

php
<?php

/**
 * Analyzes RabbitMQ log files for errors, connection/queue activity and
 * free-text searches, and produces an aggregate summary report.
 *
 * Supports both log formats RabbitMQ can emit (plain text and JSON) and
 * transparently reads gzip-compressed rotated files (*.log.N.gz).
 */
class RabbitMQLogAnalyzer
{
    /** @var string|null Optional single log file path (informational). */
    private $logFile;

    /** @var string Directory scanned for *.log / *.log.N / *.log.N.gz files. */
    private $logDir;

    /**
     * @param string|null $logFile Optional specific log file path.
     * @param string      $logDir  Directory containing RabbitMQ logs.
     */
    public function __construct($logFile = null, $logDir = '/var/log/rabbitmq')
    {
        $this->logFile = $logFile;
        $this->logDir = $logDir;
    }

    /**
     * Collect error/critical entries from all log files within the window.
     *
     * @param int $hours Look-back window in hours.
     * @return array<int, array> Parsed entries, newest first.
     */
    public function analyzeErrors($hours = 24)
    {
        $errors = [];

        foreach ($this->getLogFiles() as $file) {
            $errors = array_merge($errors, $this->parseErrorsFromFile($file['path'], $hours));
        }

        // Newest first; timestamps in both formats sort lexicographically.
        usort($errors, function ($a, $b) {
            return strcmp($b['timestamp'], $a['timestamp']);
        });

        return $errors;
    }

    /**
     * Stream lines from a log file, transparently handling .gz rotations.
     *
     * Fixes the original defect of calling gzgets() on a plain fopen()
     * handle: compressed files must be opened with gzopen(), read with
     * gzgets() and closed with gzclose(). Yields nothing when the file
     * is missing or cannot be opened.
     *
     * @param string $filePath
     * @return \Generator<int, string>
     */
    private function readLines($filePath)
    {
        if (!file_exists($filePath)) {
            return;
        }

        $isGz = (bool) preg_match('/\.gz$/', $filePath);
        $handle = $isGz ? gzopen($filePath, 'rb') : fopen($filePath, 'rb');

        if ($handle === false) {
            return;
        }

        try {
            while (($line = $isGz ? gzgets($handle) : fgets($handle)) !== false) {
                yield $line;
            }
        } finally {
            // Release the handle even if the consumer stops iterating early.
            $isGz ? gzclose($handle) : fclose($handle);
        }
    }

    /**
     * Extract error/critical entries newer than the threshold from one file.
     *
     * @param string $filePath
     * @param int    $hours
     * @return array<int, array>
     */
    private function parseErrorsFromFile($filePath, $hours)
    {
        $errors = [];
        $threshold = time() - ($hours * 3600);

        foreach ($this->readLines($filePath) as $line) {
            $parsed = $this->parseLine($line);

            if ($parsed === null || !in_array($parsed['level'], ['error', 'critical'], true)) {
                continue;
            }

            // strtotime() returns false on unparseable timestamps; skip those.
            $logTime = strtotime($parsed['timestamp']);

            if ($logTime !== false && $logTime >= $threshold) {
                $errors[] = $parsed;
            }
        }

        return $errors;
    }

    /**
     * Parse a single log line, auto-detecting JSON vs. text format.
     *
     * @param string $line
     * @return array|null Parsed fields, or null when the line is unrecognized.
     */
    public function parseLine($line)
    {
        // A line that is one complete {...} object is treated as JSON format.
        if (preg_match('/^\{.*\}$/', trim($line))) {
            return $this->parseJsonLine($line);
        }

        return $this->parseTextLine($line);
    }

    /**
     * Parse a JSON-formatted log line into the common entry shape.
     *
     * @param string $line
     * @return array|null
     */
    private function parseJsonLine($line)
    {
        $data = json_decode($line, true);

        // Only object-shaped JSON is a valid log entry.
        if (!is_array($data)) {
            return null;
        }

        return [
            'timestamp' => $data['time'] ?? '',
            'level' => $data['level'] ?? 'info',
            'pid' => $data['pid'] ?? '',
            'message' => $data['msg'] ?? '',
            'raw' => $line,
            'metadata' => $data,
        ];
    }

    /**
     * Parse a text-formatted log line:
     *   2024-01-15 10:30:00.123 [info] <0.123.0> message...
     *
     * @param string $line
     * @return array|null
     */
    private function parseTextLine($line)
    {
        $pattern = '/^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\.\d+)\s+\[(\w+)\]\s+(<[^>]+>)\s+(.*)$/';

        if (preg_match($pattern, $line, $matches)) {
            return [
                'timestamp' => $matches[1],
                'level' => $matches[2],
                'pid' => $matches[3],
                'message' => $matches[4],
                'raw' => $line,
            ];
        }

        return null;
    }

    /**
     * Aggregate connection events (accepted/failed, per host, per user).
     *
     * @param int $hours Look-back window in hours.
     * @return array{total_connections:int,successful:int,failed:int,by_host:array,by_user:array}
     */
    public function getConnectionStats($hours = 24)
    {
        $stats = [
            'total_connections' => 0,
            'successful' => 0,
            'failed' => 0,
            'by_host' => [],
            'by_user' => [],
        ];

        foreach ($this->getLogFiles() as $file) {
            $this->parseConnectionStats($file['path'], $hours, $stats);
        }

        return $stats;
    }

    /**
     * Accumulate connection statistics from one file into $stats.
     */
    private function parseConnectionStats($filePath, $hours, &$stats)
    {
        $threshold = time() - ($hours * 3600);

        foreach ($this->readLines($filePath) as $line) {
            // Cheap substring pre-filter before the regex-based parse.
            if (strpos($line, 'connection') === false) {
                continue;
            }

            $parsed = $this->parseLine($line);

            if ($parsed === null) {
                continue;
            }

            $logTime = strtotime($parsed['timestamp']);

            if ($logTime === false || $logTime < $threshold) {
                continue;
            }

            $stats['total_connections']++;

            if (strpos($parsed['message'], 'accepted') !== false) {
                $stats['successful']++;

                // First IPv4 address in the message is taken as the client.
                if (preg_match('/(\d+\.\d+\.\d+\.\d+)/', $parsed['message'], $m)) {
                    $host = $m[1];
                    $stats['by_host'][$host] = ($stats['by_host'][$host] ?? 0) + 1;
                }
            } elseif (strpos($parsed['message'], 'failed') !== false ||
                      strpos($parsed['message'], 'error') !== false) {
                $stats['failed']++;
            }

            if (preg_match('/user[:\s]+(\w+)/i', $parsed['message'], $m)) {
                $user = $m[1];
                $stats['by_user'][$user] = ($stats['by_user'][$user] ?? 0) + 1;
            }
        }
    }

    /**
     * Aggregate queue lifecycle events (declare/delete) and queue errors.
     *
     * @param int $hours Look-back window in hours.
     * @return array{created:int,deleted:int,by_name:array,errors:array}
     */
    public function getQueueStats($hours = 24)
    {
        $stats = [
            'created' => 0,
            'deleted' => 0,
            'by_name' => [],
            'errors' => [],
        ];

        foreach ($this->getLogFiles() as $file) {
            $this->parseQueueStats($file['path'], $hours, $stats);
        }

        return $stats;
    }

    /**
     * Accumulate queue statistics from one file into $stats.
     */
    private function parseQueueStats($filePath, $hours, &$stats)
    {
        $threshold = time() - ($hours * 3600);

        foreach ($this->readLines($filePath) as $line) {
            if (strpos($line, 'queue') === false) {
                continue;
            }

            $parsed = $this->parseLine($line);

            if ($parsed === null) {
                continue;
            }

            $logTime = strtotime($parsed['timestamp']);

            if ($logTime === false || $logTime < $threshold) {
                continue;
            }

            if (preg_match('/declaring queue[:\s]+([^\s,]+)/i', $parsed['message'], $m)) {
                $queueName = $m[1];
                $stats['created']++;
                $stats['by_name'][$queueName] = ($stats['by_name'][$queueName] ?? 0) + 1;
            }

            if (preg_match('/deleting queue[:\s]+([^\s,]+)/i', $parsed['message'], $m)) {
                $stats['deleted']++;
            }

            if (in_array($parsed['level'], ['error', 'warning'], true)) {
                if (preg_match('/queue[:\s]+([^\s,]+)/i', $parsed['message'], $m)) {
                    $stats['errors'][] = [
                        'queue' => $m[1],
                        'message' => $parsed['message'],
                        'timestamp' => $parsed['timestamp'],
                    ];
                }
            }
        }
    }

    /**
     * Case-insensitive substring search across all log files.
     *
     * @param string      $pattern Literal substring to search for.
     * @param int         $hours   Look-back window in hours.
     * @param string|null $level   Optional exact level filter.
     * @return array<int, array> Matching parsed entries.
     */
    public function searchLogs($pattern, $hours = 24, $level = null)
    {
        $results = [];
        $threshold = time() - ($hours * 3600);

        foreach ($this->getLogFiles() as $file) {
            $this->searchInFile($file['path'], $pattern, $threshold, $level, $results);
        }

        return $results;
    }

    /**
     * Append matches from one file into $results.
     */
    private function searchInFile($filePath, $pattern, $threshold, $level, &$results)
    {
        foreach ($this->readLines($filePath) as $line) {
            if (stripos($line, $pattern) === false) {
                continue;
            }

            $parsed = $this->parseLine($line);

            if ($parsed === null) {
                continue;
            }

            $logTime = strtotime($parsed['timestamp']);

            if ($logTime !== false && $logTime >= $threshold) {
                if ($level === null || $parsed['level'] === $level) {
                    $results[] = $parsed;
                }
            }
        }
    }

    /**
     * List log files in the configured directory, newest-modified first.
     *
     * Matches rabbit.log, rotated rabbit.log.N and compressed rabbit.log.N.gz.
     *
     * @return array<int, array{name:string,path:string,size:int,modified:string}>
     */
    public function getLogFiles()
    {
        $files = [];

        if (!is_dir($this->logDir)) {
            return $files;
        }

        $iterator = new DirectoryIterator($this->logDir);

        foreach ($iterator as $file) {
            if ($file->isFile() && preg_match('/\.log(\.\d+)?(\.gz)?$/', $file->getFilename())) {
                $files[] = [
                    'name' => $file->getFilename(),
                    'path' => $file->getPathname(),
                    'size' => $file->getSize(),
                    'modified' => date('Y-m-d H:i:s', $file->getMTime()),
                ];
            }
        }

        // 'Y-m-d H:i:s' strings sort lexicographically == chronologically.
        usort($files, function ($a, $b) {
            return strcmp($b['modified'], $a['modified']);
        });

        return $files;
    }

    /**
     * Build a combined report: errors, connection stats, queue stats and
     * heuristic recommendations for the given window.
     *
     * @param int $hours Look-back window in hours.
     * @return array
     */
    public function generateSummaryReport($hours = 24)
    {
        $errors = $this->analyzeErrors($hours);
        $connections = $this->getConnectionStats($hours);
        $queues = $this->getQueueStats($hours);

        $report = [
            'period' => "Last {$hours} hours",
            'generated_at' => date('Y-m-d H:i:s'),
            'errors' => [
                'total' => count($errors),
                'critical' => count(array_filter($errors, fn($e) => $e['level'] === 'critical')),
                'by_type' => $this->categorizeErrors($errors),
            ],
            'connections' => $connections,
            'queues' => $queues,
            'recommendations' => $this->generateRecommendations($errors, $connections, $queues),
        ];

        return $report;
    }

    /**
     * Count errors per category, most frequent first.
     *
     * @param array $errors
     * @return array<string, int>
     */
    private function categorizeErrors($errors)
    {
        $categories = [];

        foreach ($errors as $error) {
            $category = $this->categorizeError($error['message']);
            $categories[$category] = ($categories[$category] ?? 0) + 1;
        }

        arsort($categories);

        return $categories;
    }

    /**
     * Map an error message to a coarse category by keyword.
     * First match wins, so order matters (e.g. "connection" before "auth").
     *
     * @param string $message
     * @return string
     */
    private function categorizeError($message)
    {
        if (stripos($message, 'connection') !== false) {
            return 'connection';
        }
        if (stripos($message, 'queue') !== false) {
            return 'queue';
        }
        if (stripos($message, 'memory') !== false) {
            return 'memory';
        }
        if (stripos($message, 'disk') !== false) {
            return 'disk';
        }
        if (stripos($message, 'channel') !== false) {
            return 'channel';
        }
        if (stripos($message, 'authentication') !== false || stripos($message, 'auth') !== false) {
            return 'authentication';
        }

        return 'other';
    }

    /**
     * Produce human-readable recommendations from simple heuristics.
     *
     * @return array<int, string>
     */
    private function generateRecommendations($errors, $connections, $queues)
    {
        $recommendations = [];

        if (count($errors) > 10) {
            $recommendations[] = '检测到较多错误日志,建议检查系统状态';
        }

        // Flag when failures exceed 10% of successful connections.
        if ($connections['failed'] > $connections['successful'] * 0.1) {
            $recommendations[] = '连接失败率较高,建议检查网络和认证配置';
        }

        if (!empty($queues['errors'])) {
            $recommendations[] = '检测到队列相关错误,建议检查队列配置';
        }

        return $recommendations;
    }
}

Shell 日志分析脚本

bash
#!/bin/bash
# /opt/rabbitmq/scripts/analyze_logs.sh
#
# Quick RabbitMQ log triage: error counts, connection stats, authentication
# failures, recent errors and top client IPs for the last N hours (default 24).
#
# NOTE: filtering is by calendar date, so the window is approximate — every
# whole day touching the requested range is included.

LOG_DIR="/var/log/rabbitmq"
HOURS=${1:-24}

# Build an extended-regex alternation of every calendar date the window can
# touch. The original script matched only the single date HOURS ago, which
# silently dropped all of today's lines.
DAYS=$(( (HOURS + 23) / 24 ))
DATE_FILTER=$(for ((i = 0; i <= DAYS; i++)); do date -d "${i} days ago" '+%Y-%m-%d'; done | paste -sd'|' -)

echo "=== RabbitMQ 日志分析报告 ==="
echo "分析时间范围: 最近 ${HOURS} 小时"
echo ""

echo "1. 错误统计:"
echo "-------------------"
grep -h "\[error\]\|\[critical\]" ${LOG_DIR}/*.log 2>/dev/null | \
    grep -E "${DATE_FILTER}" | \
    wc -l | xargs echo "错误总数:"

echo ""
echo "2. 连接统计:"
echo "-------------------"
grep -h "connection.*accepted" ${LOG_DIR}/*.log 2>/dev/null | \
    grep -E "${DATE_FILTER}" | \
    wc -l | xargs echo "成功连接:"

grep -h "connection.*failed\|connection.*error" ${LOG_DIR}/*.log 2>/dev/null | \
    grep -E "${DATE_FILTER}" | \
    wc -l | xargs echo "失败连接:"

echo ""
echo "3. 认证失败:"
echo "-------------------"
grep -h "authentication.*failed\|access_refused" ${LOG_DIR}/*.log 2>/dev/null | \
    grep -E "${DATE_FILTER}" | \
    wc -l | xargs echo "认证失败次数:"

echo ""
echo "4. 最近 10 条错误:"
echo "-------------------"
grep -h "\[error\]\|\[critical\]" ${LOG_DIR}/*.log 2>/dev/null | \
    grep -E "${DATE_FILTER}" | \
    tail -10

echo ""
echo "5. Top 10 活跃 IP:"
echo "-------------------"
grep -h "connection.*accepted" ${LOG_DIR}/*.log 2>/dev/null | \
    grep -E "${DATE_FILTER}" | \
    grep -oE '([0-9]{1,3}\.){3}[0-9]{1,3}' | \
    sort | uniq -c | sort -rn | head -10

ELK Stack 配置

yaml
# Minimal single-node ELK stack for RabbitMQ log analysis.
# NOTE: security (xpack) is disabled — for local/dev use only.
version: '3.8'

services:
  elasticsearch:
    image: elasticsearch:8.0.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
    volumes:
      # Persist index data across container restarts.
      - es_data:/usr/share/elasticsearch/data

  logstash:
    image: logstash:8.0.0
    volumes:
      # Pipeline definition (see the Logstash config section below).
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      # RabbitMQ logs mounted read-only from the host.
      - /var/log/rabbitmq:/var/log/rabbitmq:ro
    depends_on:
      - elasticsearch

  kibana:
    image: kibana:8.0.0
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

volumes:
  es_data:

Logstash 配置文件:

ruby
# Logstash pipeline for RabbitMQ JSON-formatted logs.
input {
  file {
    path => "/var/log/rabbitmq/*.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    # Assumes the broker is configured with log.file.formatter = json;
    # plain-text lines will be tagged _jsonparsefailure instead of parsed.
    codec => json {
      charset => "UTF-8"
    }
  }
}

filter {
  if [level] {
    mutate {
      add_field => { "log_level" => "%{level}" }
    }
  }

  date {
    # "yyyy" (calendar year), not "YYYY" (Joda week-based year): week-year
    # produces wrong dates around the New Year boundary.
    match => [ "time", "ISO8601", "yyyy-MM-dd HH:mm:ss.SSS" ]
    target => "@timestamp"
  }

  grok {
    # Extract the client endpoint from connection events; non-matching
    # messages just get a harmless _grokparsefailure tag.
    match => { "msg" => "connection %{WORD:connection_action}: %{IP:client_ip}:%{NUMBER:client_port}" }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    # Daily indices; lowercase yyyy for calendar year (see note above).
    index => "rabbitmq-%{+yyyy.MM.dd}"
  }
}

实际应用场景

场景一:实时错误监控

php
<?php

/**
 * Tails a log file incrementally between calls and raises an alert when the
 * number of newly appended error/critical entries reaches a threshold.
 *
 * Intended to be invoked periodically (e.g. from cron or a daemon loop) on
 * the same instance, so $lastPosition carries over between calls.
 */
class RealtimeErrorMonitor
{
    /** @var RabbitMQLogAnalyzer Parser used to interpret each line. */
    private $analyzer;

    /** @var int Byte offset of the last read position in the log file. */
    private $lastPosition = 0;

    /** @var int Number of new errors in one pass that triggers an alert. */
    private $alertThreshold = 5;

    /** @var int Alert window in seconds (reserved for future rate limiting). */
    private $alertWindow = 300;

    public function __construct(RabbitMQLogAnalyzer $analyzer)
    {
        $this->analyzer = $analyzer;
    }

    /**
     * Read lines appended since the previous call and collect errors.
     *
     * @param string $logFile Path to the live log file.
     * @return array<int, array> Newly observed error/critical entries.
     */
    public function monitor($logFile)
    {
        $errors = [];

        // Guard against a missing file: the original called filesize() and
        // fopen() unchecked, emitting warnings and fataling in fgets(false).
        if (!is_file($logFile)) {
            return $errors;
        }

        // filesize() results are cached per-request; refresh for a live file.
        clearstatcache(true, $logFile);
        $currentSize = filesize($logFile);

        // If the file shrank it was rotated/truncated: restart from the top.
        if ($currentSize !== false && $currentSize < $this->lastPosition) {
            $this->lastPosition = 0;
        }

        $handle = fopen($logFile, 'r');

        if ($handle === false) {
            return $errors;
        }

        fseek($handle, $this->lastPosition);

        while (($line = fgets($handle)) !== false) {
            $parsed = $this->analyzer->parseLine($line);

            if ($parsed && in_array($parsed['level'], ['error', 'critical'], true)) {
                $errors[] = $parsed;
            }
        }

        // Remember where we stopped so the next pass reads only new lines.
        $this->lastPosition = ftell($handle);
        fclose($handle);

        if (count($errors) >= $this->alertThreshold) {
            $this->sendAlert($errors);
        }

        return $errors;
    }

    /**
     * Emit an alert for a batch of errors. Currently logs via error_log();
     * replace with mail/webhook integration as needed.
     *
     * @param array $errors Parsed error entries (newest batch).
     */
    private function sendAlert($errors)
    {
        $message = sprintf(
            "检测到 %d 条错误日志,请及时处理\n最近错误: %s",
            count($errors),
            $errors[0]['message'] ?? 'Unknown'
        );

        error_log("[RabbitMQ Alert] " . $message);
    }
}

场景二:日志导出与归档

php
<?php

/**
 * Exports analyzer output to JSON/CSV files and moves aged log files into an
 * archive directory.
 */
class LogExporter
{
    /** @var RabbitMQLogAnalyzer Source of reports, errors and file listings. */
    private $analyzer;

    public function __construct(RabbitMQLogAnalyzer $analyzer)
    {
        $this->analyzer = $analyzer;
    }

    /**
     * Write the summary report as pretty-printed JSON.
     *
     * @param string $outputFile Destination path.
     * @param int    $hours      Look-back window for the report.
     * @return string The output file path.
     * @throws RuntimeException When the file cannot be written.
     */
    public function exportToJson($outputFile, $hours = 24)
    {
        $report = $this->analyzer->generateSummaryReport($hours);

        // JSON_UNESCAPED_UNICODE keeps the Chinese recommendation strings
        // human-readable instead of \uXXXX escapes.
        $json = json_encode($report, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE);

        if (file_put_contents($outputFile, $json) === false) {
            throw new RuntimeException("Unable to write report to {$outputFile}");
        }

        return $outputFile;
    }

    /**
     * Write the error list as CSV with a header row.
     *
     * @param string $outputFile Destination path.
     * @param int    $hours      Look-back window for errors.
     * @return string The output file path.
     * @throws RuntimeException When the file cannot be opened for writing.
     */
    public function exportToCsv($outputFile, $hours = 24)
    {
        $errors = $this->analyzer->analyzeErrors($hours);

        $fp = fopen($outputFile, 'w');

        if ($fp === false) {
            throw new RuntimeException("Unable to open {$outputFile} for writing");
        }

        fputcsv($fp, ['Timestamp', 'Level', 'PID', 'Message']);

        foreach ($errors as $error) {
            fputcsv($fp, [
                $error['timestamp'],
                $error['level'],
                $error['pid'],
                $error['message'],
            ]);
        }

        fclose($fp);

        return $outputFile;
    }

    /**
     * Move log files older than $daysOld days into $archiveDir.
     *
     * @param string $archiveDir Destination directory (created if missing).
     * @param int    $daysOld    Minimum age in days for archiving.
     * @return array<int, string> Names of the files that were moved.
     */
    public function archiveLogs($archiveDir, $daysOld = 30)
    {
        $threshold = time() - ($daysOld * 86400);
        $archived = [];

        // The re-check after mkdir() tolerates a concurrent creator.
        if (!is_dir($archiveDir) && !mkdir($archiveDir, 0755, true) && !is_dir($archiveDir)) {
            return $archived;
        }

        foreach ($this->analyzer->getLogFiles() as $file) {
            $mtime = filemtime($file['path']);

            if ($mtime !== false && $mtime < $threshold) {
                $archivePath = $archiveDir . '/' . basename($file['path']);

                if (rename($file['path'], $archivePath)) {
                    $archived[] = $file['name'];
                }
            }
        }

        return $archived;
    }
}

常见问题与解决方案

问题一:日志量过大分析缓慢

现象:分析大量日志时性能下降。

解决方案

php
// Narrow the window to the last hour to keep scans fast on large log sets.
$analyzer->searchLogs('error', 1);

或使用时间范围过滤:

bash
grep "$(date '+%Y-%m-%d')" /var/log/rabbitmq/rabbit.log | grep error

问题二:JSON 解析失败

现象:JSON 格式日志解析出错。

解决方案

ini
log.file.formatter = json
log.file.json.field_map = time level msg pid mfa line

问题三:无法追踪问题

现象:日志信息不足以定位问题。

解决方案

ini
log.file.level = debug
log.connection.level = debug
log.channel.level = debug

最佳实践

1. 日志分析频率

分析类型 | 频率 | 工具
实时监控 | 持续 | 自定义脚本
日常检查 | 每日 | Shell 脚本
深度分析 | 每周 | ELK Stack
归档报告 | 每月 | PHP 脚本

2. 关键指标监控

php
// Daily health check: pull a 24-hour summary and alert on error volume.
$report = $analyzer->generateSummaryReport(24);

// trigger_alert() is a project-specific notifier — assumed defined elsewhere.
if ($report['errors']['total'] > 100) {
    trigger_alert('错误日志过多');
}

3. 日志保留策略

  • 原始日志:30 天
  • 分析报告:90 天
  • 归档数据:1 年

相关链接