Appearance
RabbitMQ 日志分析工具
概述
日志分析是运维工作的重要组成部分,通过对 RabbitMQ 日志的分析可以发现问题、定位故障、优化性能。本文将介绍多种日志分析方法和工具,包括命令行工具、PHP 脚本和可视化方案。
核心知识点
日志分析目标
| 目标 | 说明 | 关键指标 |
|---|---|---|
| 故障排查 | 定位问题原因 | 错误日志、异常堆栈 |
| 性能分析 | 发现性能瓶颈 | 响应时间、吞吐量 |
| 安全审计 | 检测安全威胁 | 认证失败、异常访问 |
| 容量规划 | 预测资源需求 | 连接数、消息量 |
常用分析工具
| 工具 | 类型 | 适用场景 |
|---|---|---|
| grep/awk/sed | 命令行 | 快速过滤分析 |
| ELK Stack | 可视化平台 | 大规模日志分析 |
| Grafana Loki | 日志聚合 | 云原生环境 |
| PHP 脚本 | 自定义分析 | 特定需求分析 |
日志格式解析
RabbitMQ 支持两种日志格式:
文本格式:
2024-01-15 10:30:00.123 [info] <0.123.0> connection <0.456.0> accepted: 192.168.1.100:5672 -> 192.168.1.1:5672
JSON 格式:
json
{"time":"2024-01-15T10:30:00.123Z","level":"info","pid":"<0.123.0>","msg":"connection accepted"}
配置示例
PHP 日志分析类
php
<?php
/**
 * RabbitMQ log analyzer.
 *
 * Scans a directory of RabbitMQ log files (including rotated ".log.N" and
 * gzip-compressed ".gz" files), parses both the plain-text and the JSON
 * log formats, and aggregates errors, connection activity and queue
 * activity into summary reports.
 */
class RabbitMQLogAnalyzer
{
    // Optional single log file (kept for backward compatibility; scanning
    // is currently driven by $logDir).
    private $logFile;

    // Directory scanned for "*.log", "*.log.N" and "*.log[.N].gz" files.
    private $logDir;

    /**
     * @param string|null $logFile Optional specific log file.
     * @param string      $logDir  Directory containing RabbitMQ logs.
     */
    public function __construct($logFile = null, $logDir = '/var/log/rabbitmq')
    {
        $this->logFile = $logFile;
        $this->logDir = $logDir;
    }

    /**
     * Collect error/critical log entries from every log file in the window.
     *
     * @param int $hours Look-back window in hours.
     * @return array Parsed entries sorted newest-first.
     */
    public function analyzeErrors($hours = 24)
    {
        $errors = [];
        foreach ($this->getLogFiles() as $file) {
            $errors = array_merge($errors, $this->parseErrorsFromFile($file['path'], $hours));
        }
        // Timestamps are ISO-like strings, so a reversed strcmp sorts newest-first.
        usort($errors, function ($a, $b) {
            return strcmp($b['timestamp'], $a['timestamp']);
        });
        return $errors;
    }

    /**
     * Yield lines from a log file, transparently handling gzip compression.
     *
     * Bug fix: the previous code opened every file with fopen() but read
     * ".gz" files with gzgets(), which requires a handle from gzopen();
     * compressed files were effectively read as raw compressed bytes.
     *
     * @param string $filePath
     * @return \Generator yields one line (string) at a time
     */
    private function readLogLines($filePath)
    {
        $isGz = (bool) preg_match('/\.gz$/', $filePath);
        $handle = $isGz ? gzopen($filePath, 'r') : fopen($filePath, 'r');
        if ($handle === false) {
            return; // Unreadable file: yield nothing instead of warning repeatedly.
        }
        try {
            while (($line = $isGz ? gzgets($handle) : fgets($handle)) !== false) {
                yield $line;
            }
        } finally {
            // Always release the handle, even if the consumer stops early.
            $isGz ? gzclose($handle) : fclose($handle);
        }
    }

    /**
     * Extract error/critical entries newer than the threshold from one file.
     *
     * @param string $filePath
     * @param int    $hours
     * @return array
     */
    private function parseErrorsFromFile($filePath, $hours)
    {
        $errors = [];
        $threshold = time() - ($hours * 3600);
        if (!file_exists($filePath)) {
            return $errors;
        }
        foreach ($this->readLogLines($filePath) as $line) {
            $parsed = $this->parseLine($line);
            if ($parsed && in_array($parsed['level'], ['error', 'critical'], true)) {
                $logTime = strtotime($parsed['timestamp']);
                // Guard: skip entries whose timestamp cannot be parsed.
                if ($logTime !== false && $logTime >= $threshold) {
                    $errors[] = $parsed;
                }
            }
        }
        return $errors;
    }

    /**
     * Parse a single log line in either supported format.
     *
     * @param string $line
     * @return array|null Associative entry (timestamp/level/pid/message/raw)
     *                    or null when the line matches neither format.
     */
    public function parseLine($line)
    {
        // A line that is one JSON object is treated as the JSON log format.
        if (preg_match('/^\{.*\}$/', trim($line))) {
            return $this->parseJsonLine($line);
        }
        return $this->parseTextLine($line);
    }

    /**
     * Parse a JSON-format log line; missing fields fall back to defaults.
     */
    private function parseJsonLine($line)
    {
        $data = json_decode($line, true);
        if (!$data) {
            return null;
        }
        return [
            'timestamp' => $data['time'] ?? '',
            'level' => $data['level'] ?? 'info',
            'pid' => $data['pid'] ?? '',
            'message' => $data['msg'] ?? '',
            'raw' => $line,
            'metadata' => $data,
        ];
    }

    /**
     * Parse a plain-text log line of the form:
     *   "2024-01-15 10:30:00.123 [info] <0.123.0> message...".
     */
    private function parseTextLine($line)
    {
        $pattern = '/^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}\.\d+)\s+\[(\w+)\]\s+(<[^>]+>)\s+(.*)$/';
        if (preg_match($pattern, $line, $matches)) {
            return [
                'timestamp' => $matches[1],
                'level' => $matches[2],
                'pid' => $matches[3],
                'message' => $matches[4],
                'raw' => $line,
            ];
        }
        return null;
    }

    /**
     * Aggregate connection activity (total / successful / failed, plus
     * per-host and per-user counts) over the look-back window.
     *
     * @param int $hours
     * @return array
     */
    public function getConnectionStats($hours = 24)
    {
        $stats = [
            'total_connections' => 0,
            'successful' => 0,
            'failed' => 0,
            'by_host' => [],
            'by_user' => [],
        ];
        foreach ($this->getLogFiles() as $file) {
            $this->parseConnectionStats($file['path'], $hours, $stats);
        }
        return $stats;
    }

    /**
     * Scan one file for connection-related lines and update $stats in place.
     */
    private function parseConnectionStats($filePath, $hours, &$stats)
    {
        $threshold = time() - ($hours * 3600);
        if (!file_exists($filePath)) {
            return;
        }
        foreach ($this->readLogLines($filePath) as $line) {
            // Cheap substring pre-filter before the full parse.
            if (strpos($line, 'connection') === false) {
                continue;
            }
            $parsed = $this->parseLine($line);
            if (!$parsed) {
                continue;
            }
            $logTime = strtotime($parsed['timestamp']);
            if ($logTime === false || $logTime < $threshold) {
                continue;
            }
            $stats['total_connections']++;
            if (strpos($parsed['message'], 'accepted') !== false) {
                $stats['successful']++;
                // First IPv4 address in the message is taken as the client host.
                if (preg_match('/(\d+\.\d+\.\d+\.\d+)/', $parsed['message'], $m)) {
                    $host = $m[1];
                    $stats['by_host'][$host] = ($stats['by_host'][$host] ?? 0) + 1;
                }
            } elseif (strpos($parsed['message'], 'failed') !== false ||
                      strpos($parsed['message'], 'error') !== false) {
                $stats['failed']++;
            }
            if (preg_match('/user[:\s]+(\w+)/i', $parsed['message'], $m)) {
                $user = $m[1];
                $stats['by_user'][$user] = ($stats['by_user'][$user] ?? 0) + 1;
            }
        }
    }

    /**
     * Aggregate queue declare/delete counts and queue-related errors
     * over the look-back window.
     *
     * @param int $hours
     * @return array
     */
    public function getQueueStats($hours = 24)
    {
        $stats = [
            'created' => 0,
            'deleted' => 0,
            'by_name' => [],
            'errors' => [],
        ];
        foreach ($this->getLogFiles() as $file) {
            $this->parseQueueStats($file['path'], $hours, $stats);
        }
        return $stats;
    }

    /**
     * Scan one file for queue-related lines and update $stats in place.
     */
    private function parseQueueStats($filePath, $hours, &$stats)
    {
        $threshold = time() - ($hours * 3600);
        if (!file_exists($filePath)) {
            return;
        }
        foreach ($this->readLogLines($filePath) as $line) {
            if (strpos($line, 'queue') === false) {
                continue;
            }
            $parsed = $this->parseLine($line);
            if (!$parsed) {
                continue;
            }
            $logTime = strtotime($parsed['timestamp']);
            if ($logTime === false || $logTime < $threshold) {
                continue;
            }
            if (preg_match('/declaring queue[:\s]+([^\s,]+)/i', $parsed['message'], $m)) {
                $queueName = $m[1];
                $stats['created']++;
                $stats['by_name'][$queueName] = ($stats['by_name'][$queueName] ?? 0) + 1;
            }
            if (preg_match('/deleting queue[:\s]+([^\s,]+)/i', $parsed['message'], $m)) {
                $stats['deleted']++;
            }
            if (in_array($parsed['level'], ['error', 'warning'], true)) {
                if (preg_match('/queue[:\s]+([^\s,]+)/i', $parsed['message'], $m)) {
                    $stats['errors'][] = [
                        'queue' => $m[1],
                        'message' => $parsed['message'],
                        'timestamp' => $parsed['timestamp'],
                    ];
                }
            }
        }
    }

    /**
     * Case-insensitive substring search over all log files in the window.
     *
     * @param string      $pattern Substring to look for.
     * @param int         $hours   Look-back window in hours.
     * @param string|null $level   Optional exact level filter.
     * @return array Matching parsed entries.
     */
    public function searchLogs($pattern, $hours = 24, $level = null)
    {
        $results = [];
        $threshold = time() - ($hours * 3600);
        foreach ($this->getLogFiles() as $file) {
            $this->searchInFile($file['path'], $pattern, $threshold, $level, $results);
        }
        return $results;
    }

    /**
     * Scan one file for $pattern and append matches to $results in place.
     */
    private function searchInFile($filePath, $pattern, $threshold, $level, &$results)
    {
        if (!file_exists($filePath)) {
            return;
        }
        foreach ($this->readLogLines($filePath) as $line) {
            if (stripos($line, $pattern) === false) {
                continue;
            }
            $parsed = $this->parseLine($line);
            if (!$parsed) {
                continue;
            }
            $logTime = strtotime($parsed['timestamp']);
            if ($logTime === false || $logTime < $threshold) {
                continue;
            }
            if ($level === null || $parsed['level'] === $level) {
                $results[] = $parsed;
            }
        }
    }

    /**
     * List log files in $logDir (plain, rotated or gzipped), newest-modified
     * first. Each item carries name/path/size/modified.
     *
     * @return array
     */
    public function getLogFiles()
    {
        $files = [];
        if (!is_dir($this->logDir)) {
            return $files;
        }
        $iterator = new DirectoryIterator($this->logDir);
        foreach ($iterator as $file) {
            if ($file->isFile() && preg_match('/\.log(\.\d+)?(\.gz)?$/', $file->getFilename())) {
                $files[] = [
                    'name' => $file->getFilename(),
                    'path' => $file->getPathname(),
                    'size' => $file->getSize(),
                    'modified' => date('Y-m-d H:i:s', $file->getMTime()),
                ];
            }
        }
        // "Y-m-d H:i:s" strings compare chronologically; newest first.
        usort($files, function ($a, $b) {
            return strcmp($b['modified'], $a['modified']);
        });
        return $files;
    }

    /**
     * Build a combined report: errors, connection stats, queue stats and
     * operator recommendations for the given window.
     *
     * @param int $hours
     * @return array
     */
    public function generateSummaryReport($hours = 24)
    {
        $errors = $this->analyzeErrors($hours);
        $connections = $this->getConnectionStats($hours);
        $queues = $this->getQueueStats($hours);
        return [
            'period' => "Last {$hours} hours",
            'generated_at' => date('Y-m-d H:i:s'),
            'errors' => [
                'total' => count($errors),
                'critical' => count(array_filter($errors, fn($e) => $e['level'] === 'critical')),
                'by_type' => $this->categorizeErrors($errors),
            ],
            'connections' => $connections,
            'queues' => $queues,
            'recommendations' => $this->generateRecommendations($errors, $connections, $queues),
        ];
    }

    /**
     * Count errors per category, most frequent first.
     */
    private function categorizeErrors($errors)
    {
        $categories = [];
        foreach ($errors as $error) {
            $category = $this->categorizeError($error['message']);
            $categories[$category] = ($categories[$category] ?? 0) + 1;
        }
        arsort($categories);
        return $categories;
    }

    /**
     * Map an error message to a coarse category by keyword.
     * Note: checks are ordered, so "connection" wins over "auth" etc.
     */
    private function categorizeError($message)
    {
        if (stripos($message, 'connection') !== false) {
            return 'connection';
        }
        if (stripos($message, 'queue') !== false) {
            return 'queue';
        }
        if (stripos($message, 'memory') !== false) {
            return 'memory';
        }
        if (stripos($message, 'disk') !== false) {
            return 'disk';
        }
        if (stripos($message, 'channel') !== false) {
            return 'channel';
        }
        if (stripos($message, 'authentication') !== false || stripos($message, 'auth') !== false) {
            return 'authentication';
        }
        return 'other';
    }

    /**
     * Derive human-readable recommendations from the aggregated stats.
     */
    private function generateRecommendations($errors, $connections, $queues)
    {
        $recommendations = [];
        if (count($errors) > 10) {
            $recommendations[] = '检测到较多错误日志,建议检查系统状态';
        }
        // Flag when failures exceed 10% of successful connections.
        if ($connections['failed'] > $connections['successful'] * 0.1) {
            $recommendations[] = '连接失败率较高,建议检查网络和认证配置';
        }
        if (!empty($queues['errors'])) {
            $recommendations[] = '检测到队列相关错误,建议检查队列配置';
        }
        return $recommendations;
    }
}
?>
Shell 日志分析脚本
bash
#!/bin/bash
# /opt/rabbitmq/scripts/analyze_logs.sh
# Summarize RabbitMQ log activity for the last N hours (default: 24).
#
# NOTE: filtering with a single "Y-m-d" date only matches entries from that
# calendar day, so the report is day-granular rather than hour-granular.
LOG_DIR="/var/log/rabbitmq"
HOURS=${1:-24}
# Hoisted: compute the cutoff day once instead of in every pipeline.
SINCE_DAY="$(date -d "${HOURS} hours ago" '+%Y-%m-%d')"
echo "=== RabbitMQ 日志分析报告 ==="
echo "分析时间范围: 最近 ${HOURS} 小时"
echo ""
echo "1. 错误统计:"
echo "-------------------"
grep -h "\[error\]\|\[critical\]" "${LOG_DIR}"/*.log 2>/dev/null | \
    grep "${SINCE_DAY}" | \
    wc -l | xargs echo "错误总数:"
echo ""
echo "2. 连接统计:"
echo "-------------------"
grep -h "connection.*accepted" "${LOG_DIR}"/*.log 2>/dev/null | \
    grep "${SINCE_DAY}" | \
    wc -l | xargs echo "成功连接:"
grep -h "connection.*failed\|connection.*error" "${LOG_DIR}"/*.log 2>/dev/null | \
    grep "${SINCE_DAY}" | \
    wc -l | xargs echo "失败连接:"
echo ""
echo "3. 认证失败:"
echo "-------------------"
grep -h "authentication.*failed\|access_refused" "${LOG_DIR}"/*.log 2>/dev/null | \
    grep "${SINCE_DAY}" | \
    wc -l | xargs echo "认证失败次数:"
echo ""
echo "4. 最近 10 条错误:"
echo "-------------------"
grep -h "\[error\]\|\[critical\]" "${LOG_DIR}"/*.log 2>/dev/null | \
    grep "${SINCE_DAY}" | \
    tail -10
echo ""
echo "5. Top 10 活跃 IP:"
echo "-------------------"
grep -h "connection.*accepted" "${LOG_DIR}"/*.log 2>/dev/null | \
    grep "${SINCE_DAY}" | \
    grep -oE '([0-9]{1,3}\.){3}[0-9]{1,3}' | \
    sort | uniq -c | sort -rn | head -10
exit 0
ELK Stack 配置
yaml
# Single-node ELK stack for RabbitMQ log analysis (development sizing).
version: '3.8'
services:
  elasticsearch:
    image: elasticsearch:8.0.0
    environment:
      # Single-node discovery; security disabled — for local/dev use only.
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data
  logstash:
    image: logstash:8.0.0
    volumes:
      # Pipeline definition plus read-only access to the RabbitMQ logs.
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      - /var/log/rabbitmq:/var/log/rabbitmq:ro
    depends_on:
      - elasticsearch
  kibana:
    image: kibana:8.0.0
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
volumes:
  es_data:
Logstash 配置文件:
ruby
# Logstash pipeline: ingest RabbitMQ JSON logs and index them per day.
input {
  file {
    path => "/var/log/rabbitmq/*.log"
    start_position => "beginning"
    # /dev/null sincedb: re-read all files from scratch on every restart.
    sincedb_path => "/dev/null"
    codec => json {
      charset => "UTF-8"
    }
  }
}
filter {
  if [level] {
    mutate {
      add_field => { "log_level" => "%{level}" }
    }
  }
  # Use the log entry's own timestamp ("time" field) as the event time.
  date {
    match => [ "time", "ISO8601", "YYYY-MM-dd HH:mm:ss.SSS" ]
    target => "@timestamp"
  }
  # Extract the connection action and client address from the message text.
  grok {
    match => { "msg" => "connection %{WORD:connection_action}: %{IP:client_ip}:%{NUMBER:client_port}" }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "rabbitmq-%{+YYYY.MM.dd}"
  }
}
实际应用场景
场景一:实时错误监控
php
<?php
/**
 * Incrementally tails a RabbitMQ log file between monitor() calls and
 * raises an alert when the number of new error/critical entries in one
 * pass reaches the configured threshold.
 */
class RealtimeErrorMonitor
{
    // Analyzer used for line parsing (parseLine()).
    private $analyzer;
    // Byte offset reached by the previous monitor() pass.
    private $lastPosition = 0;
    // Number of new errors in a single pass that triggers an alert.
    private $alertThreshold = 5;
    // Alert window in seconds — currently unused by monitor(); kept for
    // backward compatibility. TODO(review): wire up or remove.
    private $alertWindow = 300;

    public function __construct(RabbitMQLogAnalyzer $analyzer)
    {
        $this->analyzer = $analyzer;
    }

    /**
     * Scan the portion of $logFile appended since the previous call.
     * Detects rotation/truncation (the file shrank) and restarts from the top.
     *
     * @param string $logFile Path to the live log file.
     * @return array Parsed error/critical entries found in the new portion.
     */
    public function monitor($logFile)
    {
        $errors = [];
        // Fix: guard missing file / failed open — the previous version
        // called filesize()/fseek()/fgets() unchecked and emitted warnings.
        if (!is_file($logFile)) {
            return $errors;
        }
        $currentPosition = filesize($logFile);
        if ($currentPosition !== false && $currentPosition < $this->lastPosition) {
            // File shrank: it was rotated or truncated, re-read from the start.
            $this->lastPosition = 0;
        }
        $handle = fopen($logFile, 'r');
        if ($handle === false) {
            return $errors;
        }
        fseek($handle, $this->lastPosition);
        while (($line = fgets($handle)) !== false) {
            $parsed = $this->analyzer->parseLine($line);
            if ($parsed && in_array($parsed['level'], ['error', 'critical'], true)) {
                $errors[] = $parsed;
            }
        }
        // Remember where we stopped so the next pass only reads new data.
        $this->lastPosition = ftell($handle);
        fclose($handle);
        if (count($errors) >= $this->alertThreshold) {
            $this->sendAlert($errors);
        }
        return $errors;
    }

    /**
     * Emit an alert for the collected errors via error_log().
     */
    private function sendAlert($errors)
    {
        $message = sprintf(
            "检测到 %d 条错误日志,请及时处理\n最近错误: %s",
            count($errors),
            $errors[0]['message'] ?? 'Unknown'
        );
        error_log("[RabbitMQ Alert] " . $message);
    }
}
?>
场景二:日志导出与归档
php
<?php
/**
 * Exports analyzer results to JSON/CSV and archives old log files.
 */
class LogExporter
{
    // Analyzer providing reports and the log-file listing.
    private $analyzer;

    public function __construct(RabbitMQLogAnalyzer $analyzer)
    {
        $this->analyzer = $analyzer;
    }

    /**
     * Write the summary report as pretty-printed JSON.
     *
     * @param string $outputFile Destination path.
     * @param int    $hours      Look-back window in hours.
     * @return string The destination path.
     * @throws RuntimeException When the file cannot be written.
     */
    public function exportToJson($outputFile, $hours = 24)
    {
        $report = $this->analyzer->generateSummaryReport($hours);
        // Fix: a failed write previously went unnoticed and still returned the path.
        if (file_put_contents($outputFile, json_encode($report, JSON_PRETTY_PRINT)) === false) {
            throw new RuntimeException("Unable to write JSON report to {$outputFile}");
        }
        return $outputFile;
    }

    /**
     * Write the analyzed errors as a CSV file with a header row.
     *
     * @param string $outputFile Destination path.
     * @param int    $hours      Look-back window in hours.
     * @return string The destination path.
     * @throws RuntimeException When the file cannot be opened for writing.
     */
    public function exportToCsv($outputFile, $hours = 24)
    {
        $errors = $this->analyzer->analyzeErrors($hours);
        $fp = fopen($outputFile, 'w');
        if ($fp === false) {
            throw new RuntimeException("Unable to open {$outputFile} for writing");
        }
        fputcsv($fp, ['Timestamp', 'Level', 'PID', 'Message']);
        foreach ($errors as $error) {
            fputcsv($fp, [
                $error['timestamp'],
                $error['level'],
                $error['pid'],
                $error['message'],
            ]);
        }
        fclose($fp);
        return $outputFile;
    }

    /**
     * Move log files older than $daysOld into $archiveDir.
     *
     * @param string $archiveDir Target directory (created if missing).
     * @param int    $daysOld    Minimum age in days for archiving.
     * @return array Names of the files that were moved.
     * @throws RuntimeException When the archive directory cannot be created.
     */
    public function archiveLogs($archiveDir, $daysOld = 30)
    {
        $files = $this->analyzer->getLogFiles();
        $threshold = time() - ($daysOld * 86400);
        $archived = [];
        // Fix: mkdir() failure was ignored, so every rename() would then fail.
        if (!is_dir($archiveDir) && !mkdir($archiveDir, 0755, true) && !is_dir($archiveDir)) {
            throw new RuntimeException("Unable to create archive directory {$archiveDir}");
        }
        foreach ($files as $file) {
            $mtime = filemtime($file['path']);
            // Skip files whose mtime cannot be read (e.g. deleted mid-scan).
            if ($mtime !== false && $mtime < $threshold) {
                $archivePath = $archiveDir . '/' . basename($file['path']);
                if (rename($file['path'], $archivePath)) {
                    $archived[] = $file['name'];
                }
            }
        }
        return $archived;
    }
}
?>
常见问题与解决方案
问题一:日志量过大分析缓慢
现象:分析大量日志时性能下降。
解决方案:
php
$analyzer->searchLogs('error', 1);
或使用时间范围过滤:
bash
grep "$(date '+%Y-%m-%d')" /var/log/rabbitmq/rabbit.log | grep error
问题二:JSON 解析失败
现象:JSON 格式日志解析出错。
解决方案:
bash
log.file.formatter = json
log.file.json.field_map = time level msg pid mfa line
问题三:无法追踪问题
现象:日志信息不足以定位问题。
解决方案:
bash
log.file.level = debug
log.connection.level = debug
log.channel.level = debug
最佳实践
1. 日志分析频率
| 分析类型 | 频率 | 工具 |
|---|---|---|
| 实时监控 | 持续 | 自定义脚本 |
| 日常检查 | 每日 | Shell 脚本 |
| 深度分析 | 每周 | ELK Stack |
| 归档报告 | 每月 | PHP 脚本 |
2. 关键指标监控
php
$report = $analyzer->generateSummaryReport(24);
if ($report['errors']['total'] > 100) {
trigger_alert('错误日志过多');
}
3. 日志保留策略
- 原始日志:30 天
- 分析报告:90 天
- 归档数据:1 年
