Skip to content

RabbitMQ 审计日志

概述

审计日志是记录系统中重要操作和事件的日志,用于安全审计、合规检查和问题追溯。RabbitMQ 提供了审计日志功能,可以记录用户操作、权限变更、资源配置等关键事件。本文将详细介绍 RabbitMQ 审计日志的配置和使用方法。

核心知识点

审计日志类型

| 类型 | 说明 | 记录内容 |
| --- | --- | --- |
| 用户管理 | 用户创建、删除、修改 | 用户名、操作类型、操作者 |
| 权限变更 | 权限授予、撤销 | 权限范围、操作对象 |
| 资源操作 | 队列、交换机操作 | 资源名称、操作类型 |
| 连接事件 | 连接建立、断开 | 客户端信息、认证结果 |
| 配置变更 | 策略、参数修改 | 配置内容、变更前后 |

审计日志要素

| 要素 | 说明 | 示例 |
| --- | --- | --- |
| 时间戳 | 操作发生时间 | 2024-01-15 10:30:00 |
| 操作者 | 执行操作的用户 | admin |
| 操作类型 | 具体操作动作 | CREATE_QUEUE |
| 操作对象 | 被操作的资源 | /vhost/queue_name |
| 结果 | 操作结果 | SUCCESS/FAILURE |
| 来源 IP | 操作来源地址 | 192.168.1.100 |

审计日志级别

| 级别 | 说明 |
| --- | --- |
| INFO | 正常操作记录 |
| WARNING | 可疑操作警告 |
| CRITICAL | 安全事件记录 |

配置示例

启用审计日志

bash
# Turn on a dedicated audit log at "info" level, written to its own file.
# NOTE(review): the log.audit.* keys do not appear in the documented rabbitmq.conf
# logging schema (log.file.*, log.console.*, log.exchange.*, ...) — confirm that the
# target RabbitMQ version/plugin accepts them before deploying this configuration.
log.audit.enabled = true
log.audit.level = info
log.audit.file = /var/log/rabbitmq/audit.log
# Rotation schedule; presumably "$D0" means daily at midnight — TODO confirm.
log.audit.rotation.date = $D0
# Number of rotated files to keep (90).
log.audit.rotation.count = 90

审计事件配置

bash
# Event types to record: user, permission, queue, exchange, policy, connection and
# vhost operations. The same names are used by the PHP logger's default event list.
# NOTE(review): neither the audit.events key nor the "+=" append operator appears in
# the documented rabbitmq.conf format — verify against the configuration reference.
audit.events = user.create, user.delete, user.update
audit.events += permission.grant, permission.revoke
audit.events += queue.create, queue.delete
audit.events += exchange.create, exchange.delete
audit.events += policy.create, policy.delete, policy.update
audit.events += connection.open, connection.close
audit.events += vhost.create, vhost.delete

PHP 审计日志管理类

php
<?php

/**
 * Application-side audit logger for RabbitMQ management operations.
 *
 * Every accepted event is appended to a log file as one JSON object per line
 * (timestamp, event_type, operator, object, result, source_ip, details).
 * The same file can be read back through the query/statistics helpers.
 */
class RabbitMQAuditLogger
{
    /** @var string Path of the JSON-lines audit log file. */
    private $logFile;

    /** @var bool When false, log() records nothing. */
    private $enabled;

    /** @var string[] Event types that log() accepts; anything else is dropped. */
    private $events;

    /** @var string[] Event types enabled for a freshly constructed logger. */
    private $defaultEvents = [
        'user.create', 'user.delete', 'user.update',
        'permission.grant', 'permission.revoke',
        'queue.create', 'queue.delete',
        'exchange.create', 'exchange.delete',
        'policy.create', 'policy.delete', 'policy.update',
        'connection.open', 'connection.close',
        'vhost.create', 'vhost.delete',
    ];

    /**
     * @param string $logFile Target log file; its directory is created if missing.
     * @param bool   $enabled Initial enabled state.
     */
    public function __construct($logFile = '/var/log/rabbitmq/audit.log', $enabled = true)
    {
        $this->logFile = $logFile;
        $this->enabled = $enabled;
        $this->events = $this->defaultEvents;

        $this->ensureLogDir();
    }

    /**
     * Append one audit entry to the log file.
     *
     * @param string $eventType Event name; must be in the enabled event list.
     * @param mixed  $operator  Who performed the operation.
     * @param mixed  $object    What was operated on.
     * @param mixed  $result    Truthy is stored as "SUCCESS", falsy as "FAILURE".
     * @param array  $details   Free-form extra data stored under "details".
     *
     * @return bool True when the entry was written; false when logging is disabled,
     *              the event type is not enabled, or encoding/writing failed.
     */
    public function log($eventType, $operator, $object, $result, $details = [])
    {
        if (!$this->enabled) {
            return false;
        }

        // Strict comparison: event names are strings, loose matching could accept 0/true.
        if (!in_array($eventType, $this->events, true)) {
            return false;
        }

        $entry = [
            'timestamp' => date('Y-m-d H:i:s'),
            'event_type' => $eventType,
            'operator' => $operator,
            'object' => $object,
            'result' => $result ? 'SUCCESS' : 'FAILURE',
            'source_ip' => $this->getClientIp(),
            'details' => $details,
        ];

        // JSON_PARTIAL_OUTPUT_ON_ERROR keeps the event even if one value cannot be
        // encoded (e.g. invalid UTF-8): that value becomes null instead of the whole
        // entry being lost and an empty line being written.
        $encoded = json_encode(
            $entry,
            JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES | JSON_PARTIAL_OUTPUT_ON_ERROR
        );

        if ($encoded === false) {
            return false;
        }

        // LOCK_EX prevents concurrent writers from interleaving partial lines.
        return file_put_contents($this->logFile, $encoded . "\n", FILE_APPEND | LOCK_EX) !== false;
    }

    /** Record a user creation together with the tags assigned to the user. */
    public function logUserCreate($operator, $username, $tags = [], $result = true)
    {
        return $this->log('user.create', $operator, $username, $result, [
            'tags' => $tags,
        ]);
    }

    /** Record a user deletion. */
    public function logUserDelete($operator, $username, $result = true)
    {
        return $this->log('user.delete', $operator, $username, $result);
    }

    /** Record a user modification together with the supplied change set. */
    public function logUserUpdate($operator, $username, $changes = [], $result = true)
    {
        return $this->log('user.update', $operator, $username, $result, [
            'changes' => $changes,
        ]);
    }

    /** Record a permission grant; the object is "<vhost>:<username>". */
    public function logPermissionGrant($operator, $username, $vhost, $permissions, $result = true)
    {
        return $this->log('permission.grant', $operator, "{$vhost}:{$username}", $result, [
            'vhost' => $vhost,
            'permissions' => $permissions,
        ]);
    }

    /** Record a permission revocation; the object is "<vhost>:<username>". */
    public function logPermissionRevoke($operator, $username, $vhost, $result = true)
    {
        return $this->log('permission.revoke', $operator, "{$vhost}:{$username}", $result, [
            'vhost' => $vhost,
        ]);
    }

    /** Record a queue creation; the object is "<vhost>/<queue>". */
    public function logQueueCreate($operator, $queueName, $vhost, $arguments = [], $result = true)
    {
        return $this->log('queue.create', $operator, "{$vhost}/{$queueName}", $result, [
            'vhost' => $vhost,
            'arguments' => $arguments,
        ]);
    }

    /** Record a queue deletion; the object is "<vhost>/<queue>". */
    public function logQueueDelete($operator, $queueName, $vhost, $result = true)
    {
        return $this->log('queue.delete', $operator, "{$vhost}/{$queueName}", $result, [
            'vhost' => $vhost,
        ]);
    }

    /** Record an exchange creation; the object is "<vhost>/<exchange>". */
    public function logExchangeCreate($operator, $exchangeName, $vhost, $type, $result = true)
    {
        return $this->log('exchange.create', $operator, "{$vhost}/{$exchangeName}", $result, [
            'vhost' => $vhost,
            'type' => $type,
        ]);
    }

    /** Record an exchange deletion; the object is "<vhost>/<exchange>". */
    public function logExchangeDelete($operator, $exchangeName, $vhost, $result = true)
    {
        return $this->log('exchange.delete', $operator, "{$vhost}/{$exchangeName}", $result, [
            'vhost' => $vhost,
        ]);
    }

    /** Record a policy creation; the object is "<vhost>/<policy>". */
    public function logPolicyCreate($operator, $policyName, $vhost, $pattern, $definition, $result = true)
    {
        return $this->log('policy.create', $operator, "{$vhost}/{$policyName}", $result, [
            'vhost' => $vhost,
            'pattern' => $pattern,
            'definition' => $definition,
        ]);
    }

    /**
     * Record a policy modification; the object is "<vhost>/<policy>".
     *
     * 'policy.update' is part of the default event list, so it gets a helper
     * shaped like logUserUpdate().
     */
    public function logPolicyUpdate($operator, $policyName, $vhost, $changes = [], $result = true)
    {
        return $this->log('policy.update', $operator, "{$vhost}/{$policyName}", $result, [
            'vhost' => $vhost,
            'changes' => $changes,
        ]);
    }

    /** Record a policy deletion; the object is "<vhost>/<policy>". */
    public function logPolicyDelete($operator, $policyName, $vhost, $result = true)
    {
        return $this->log('policy.delete', $operator, "{$vhost}/{$policyName}", $result, [
            'vhost' => $vhost,
        ]);
    }

    /** Record a connection being opened; the object is the client's peer_host. */
    public function logConnectionOpen($operator, $clientInfo, $result = true)
    {
        return $this->log('connection.open', $operator, $clientInfo['peer_host'] ?? 'unknown', $result, $clientInfo);
    }

    /** Record a connection being closed, with the close reason and client info. */
    public function logConnectionClose($operator, $clientInfo, $reason = '', $result = true)
    {
        return $this->log('connection.close', $operator, $clientInfo['peer_host'] ?? 'unknown', $result, [
            'reason' => $reason,
            'client_info' => $clientInfo,
        ]);
    }

    /** Record a virtual host creation. */
    public function logVhostCreate($operator, $vhostName, $result = true)
    {
        return $this->log('vhost.create', $operator, $vhostName, $result);
    }

    /** Record a virtual host deletion. */
    public function logVhostDelete($operator, $vhostName, $result = true)
    {
        return $this->log('vhost.delete', $operator, $vhostName, $result);
    }

    /**
     * Read the log file and return the entries matching the filters.
     *
     * Supported filter keys: event_type, operator, result (exact match) and
     * start_time / end_time (inclusive, compared as "Y-m-d H:i:s" strings).
     * Lines that are not JSON objects are skipped.
     *
     * @param array    $filters Filter map, see above.
     * @param int|null $limit   When positive, only the last $limit matches are
     *                          returned (the file is append-only, so these are the
     *                          newest ones); null or <= 0 means no limit.
     *
     * @return array[] Matching entries in file order.
     */
    public function query($filters = [], $limit = null)
    {
        $entries = [];

        if (!is_file($this->logFile)) {
            return $entries;
        }

        $handle = fopen($this->logFile, 'r');

        if ($handle === false) {
            // Unreadable file: behave like an empty log instead of failing in fgets().
            return $entries;
        }

        try {
            while (($line = fgets($handle)) !== false) {
                $entry = json_decode(trim($line), true);

                // is_array() rejects scalar JSON such as "123" that would otherwise
                // slip through as a truthy "entry".
                if (is_array($entry) && $this->matchesFilters($entry, $filters)) {
                    $entries[] = $entry;
                }
            }
        } finally {
            fclose($handle);
        }

        $limit = $limit === null ? 0 : (int) $limit;

        if ($limit > 0 && count($entries) > $limit) {
            $entries = array_slice($entries, -$limit);
        }

        return $entries;
    }

    /** Return at most $limit of the newest entries with the given event type. */
    public function queryByEventType($eventType, $limit = 100)
    {
        return $this->query(['event_type' => $eventType], $limit);
    }

    /** Return at most $limit of the newest entries recorded for the given operator. */
    public function queryByOperator($operator, $limit = 100)
    {
        return $this->query(['operator' => $operator], $limit);
    }

    /** Return all entries whose timestamp lies within [$startTime, $endTime]. */
    public function queryByTimeRange($startTime, $endTime)
    {
        return $this->query([
            'start_time' => $startTime,
            'end_time' => $endTime,
        ]);
    }

    /** Return all successful (truthy $result) or all failed entries. */
    public function queryByResult($result)
    {
        return $this->query(['result' => $result ? 'SUCCESS' : 'FAILURE']);
    }

    /**
     * Aggregate the entries of the last $hours hours.
     *
     * @param int $hours Size of the look-back window.
     *
     * @return array Keys: total_events, by_type, by_operator, by_result, failures.
     */
    public function getStatistics($hours = 24)
    {
        $threshold = date('Y-m-d H:i:s', time() - ($hours * 3600));

        $stats = [
            'total_events' => 0,
            'by_type' => [],
            'by_operator' => [],
            'by_result' => ['SUCCESS' => 0, 'FAILURE' => 0],
            'failures' => [],
        ];

        $entries = $this->query(['start_time' => $threshold]);

        foreach ($entries as $entry) {
            $stats['total_events']++;

            // Fall back to 'unknown' so hand-edited or partially encoded lines
            // cannot trigger undefined-index / null-offset notices.
            $type = $entry['event_type'] ?? 'unknown';
            $stats['by_type'][$type] = ($stats['by_type'][$type] ?? 0) + 1;

            $operator = $entry['operator'] ?? 'unknown';
            $stats['by_operator'][$operator] = ($stats['by_operator'][$operator] ?? 0) + 1;

            $result = $entry['result'] ?? 'unknown';
            $stats['by_result'][$result] = ($stats['by_result'][$result] ?? 0) + 1;

            if ($result === 'FAILURE') {
                $stats['failures'][] = $entry;
            }
        }

        return $stats;
    }

    /**
     * Build a plain-text summary of the last $hours hours.
     *
     * @return string Report text; lists at most the first ten failures.
     */
    public function generateAuditReport($hours = 24)
    {
        $stats = $this->getStatistics($hours);

        $report = "RabbitMQ 审计日志报告\n";
        $report .= str_repeat("=", 50) . "\n";
        $report .= "报告时间范围: 最近 {$hours} 小时\n";
        $report .= "生成时间: " . date('Y-m-d H:i:s') . "\n\n";

        $report .= "事件统计:\n";
        $report .= "- 总事件数: {$stats['total_events']}\n";
        $report .= "- 成功: {$stats['by_result']['SUCCESS']}\n";
        $report .= "- 失败: {$stats['by_result']['FAILURE']}\n\n";

        $report .= "按类型统计:\n";
        foreach ($stats['by_type'] as $type => $count) {
            $report .= "- {$type}: {$count}\n";
        }

        $report .= "\n按操作者统计:\n";
        foreach ($stats['by_operator'] as $operator => $count) {
            $report .= "- {$operator}: {$count}\n";
        }

        if (!empty($stats['failures'])) {
            $report .= "\n失败事件:\n";
            foreach (array_slice($stats['failures'], 0, 10) as $failure) {
                $failedType = $failure['event_type'] ?? 'unknown';
                $failedOperator = $failure['operator'] ?? 'unknown';
                $report .= "- [{$failure['timestamp']}] {$failedType} by {$failedOperator}\n";
            }
        }

        return $report;
    }

    /**
     * Flag suspicious patterns in the last $hours hours: more than 10 failures,
     * more than 100 events by one operator, or more than 20 user/queue/vhost deletions.
     *
     * @return array[] Each anomaly has the keys type, severity and message.
     */
    public function detectAnomalies($hours = 24)
    {
        $anomalies = [];
        $stats = $this->getStatistics($hours);

        if ($stats['by_result']['FAILURE'] > 10) {
            $anomalies[] = [
                'type' => 'high_failure_rate',
                'severity' => 'warning',
                'message' => "检测到 {$stats['by_result']['FAILURE']} 次失败操作",
            ];
        }

        foreach ($stats['by_operator'] as $operator => $count) {
            if ($count > 100) {
                $anomalies[] = [
                    'type' => 'high_activity',
                    'severity' => 'info',
                    'message' => "用户 {$operator} 操作频繁: {$count} 次",
                ];
            }
        }

        $deleteEvents = ($stats['by_type']['user.delete'] ?? 0) +
                        ($stats['by_type']['queue.delete'] ?? 0) +
                        ($stats['by_type']['vhost.delete'] ?? 0);

        if ($deleteEvents > 20) {
            $anomalies[] = [
                'type' => 'high_delete_rate',
                'severity' => 'warning',
                'message' => "检测到大量删除操作: {$deleteEvents} 次",
            ];
        }

        return $anomalies;
    }

    /** Enable or disable logging at runtime. */
    public function setEnabled($enabled)
    {
        $this->enabled = $enabled;
    }

    /** Replace the list of event types that log() accepts. */
    public function setEvents($events)
    {
        $this->events = $events;
    }

    /**
     * Check one decoded entry against the filter map used by query().
     * A field missing from the entry never satisfies a filter on that field.
     */
    private function matchesFilters($entry, $filters)
    {
        foreach (['event_type', 'operator', 'result'] as $field) {
            if (isset($filters[$field]) && ($entry[$field] ?? null) !== $filters[$field]) {
                return false;
            }
        }

        $timestamp = $entry['timestamp'] ?? '';

        if (isset($filters['start_time']) && $timestamp < $filters['start_time']) {
            return false;
        }

        if (isset($filters['end_time']) && $timestamp > $filters['end_time']) {
            return false;
        }

        return true;
    }

    /**
     * Determine the client address for the current request.
     *
     * Candidates are tried in order: Client-IP header, first X-Forwarded-For hop,
     * REMOTE_ADDR. Each one is trimmed and must be a syntactically valid IP, so
     * arbitrary header text never reaches the audit log.
     *
     * NOTE(review): both headers are client-controlled unless a trusted proxy
     * overwrites them; an audit trail should only honour them behind such a proxy.
     *
     * @return string A valid IP address, or 'unknown' when none is available.
     */
    private function getClientIp()
    {
        $candidates = [];

        if (!empty($_SERVER['HTTP_CLIENT_IP'])) {
            $candidates[] = $_SERVER['HTTP_CLIENT_IP'];
        }

        if (!empty($_SERVER['HTTP_X_FORWARDED_FOR'])) {
            $candidates[] = explode(',', $_SERVER['HTTP_X_FORWARDED_FOR'])[0];
        }

        if (!empty($_SERVER['REMOTE_ADDR'])) {
            $candidates[] = $_SERVER['REMOTE_ADDR'];
        }

        foreach ($candidates as $candidate) {
            $ip = trim((string) $candidate);

            if (filter_var($ip, FILTER_VALIDATE_IP) !== false) {
                return $ip;
            }
        }

        return 'unknown';
    }

    /** Create the log file's directory (recursively, mode 0755) when it does not exist. */
    private function ensureLogDir()
    {
        $dir = dirname($this->logFile);
        if (!is_dir($dir)) {
            mkdir($dir, 0755, true);
        }
    }
}

实际应用场景

场景一:用户操作审计

php
<?php

/**
 * Wraps user and permission operations of a RabbitMQ API client so that every
 * call is also written to the audit log, whether it succeeded or not.
 */
class UserOperationAuditor
{
    /** @var RabbitMQAuditLogger */
    private $auditLogger;

    /** @var object API client exposing createUser(), deleteUser() and setPermissions(). */
    private $rabbitmqApi;

    public function __construct(RabbitMQAuditLogger $auditLogger, $rabbitmqApi)
    {
        $this->auditLogger = $auditLogger;
        $this->rabbitmqApi = $rabbitmqApi;
    }

    /**
     * Create a user and audit the attempt.
     *
     * @param string $operator Who performs the operation.
     * @param string $username User to create.
     * @param string $password Password, passed to the API client only (never logged).
     * @param string $tags     Comma-separated tag list, e.g. "administrator,monitoring".
     *
     * @return mixed The API client's result, unchanged.
     */
    public function createUser($operator, $username, $password, $tags = '')
    {
        $result = $this->rabbitmqApi->createUser($username, $password, $tags);

        $this->auditLogger->logUserCreate(
            $operator,
            $username,
            $this->parseTags($tags),
            $this->isSuccess($result)
        );

        return $result;
    }

    /**
     * Delete a user and audit the attempt.
     *
     * @return mixed The API client's result, unchanged.
     */
    public function deleteUser($operator, $username)
    {
        $result = $this->rabbitmqApi->deleteUser($username);

        $this->auditLogger->logUserDelete(
            $operator,
            $username,
            $this->isSuccess($result)
        );

        return $result;
    }

    /**
     * Set a user's configure/write/read permissions on a vhost and audit the attempt.
     *
     * @return mixed The API client's result, unchanged.
     */
    public function grantPermission($operator, $username, $vhost, $configure, $write, $read)
    {
        $result = $this->rabbitmqApi->setPermissions($vhost, $username, $configure, $write, $read);

        $this->auditLogger->logPermissionGrant(
            $operator,
            $username,
            $vhost,
            ['configure' => $configure, 'write' => $write, 'read' => $read],
            $this->isSuccess($result)
        );

        return $result;
    }

    /**
     * Split a comma-separated tag string into a clean list.
     *
     * explode() on an empty string yields [''], which would be logged as one
     * empty tag; blanks are dropped and surrounding whitespace is trimmed.
     *
     * @return string[]
     */
    private function parseTags($tags)
    {
        $parsed = [];

        foreach (explode(',', (string) $tags) as $tag) {
            $tag = trim($tag);

            if ($tag !== '') {
                $parsed[] = $tag;
            }
        }

        return $parsed;
    }

    /**
     * Decide whether an API result denotes success.
     *
     * Presumably $result['code'] is the HTTP status of the management API call
     * (verify against the API client). The management API answers PUT requests
     * with 201 when a resource is created and 204 when it is updated, so any
     * 2xx status counts as success rather than 204 alone.
     *
     * @return bool
     */
    private function isSuccess($result)
    {
        if (!isset($result['code']) || !is_int($result['code'])) {
            return false;
        }

        return $result['code'] >= 200 && $result['code'] < 300;
    }
}

场景二:安全审计报告

php
<?php

/**
 * Builds a structured daily security report from the audit logger's statistics
 * and anomaly detection.
 */
class SecurityAuditReport
{
    /** @var RabbitMQAuditLogger Source of statistics and anomalies. */
    private $auditLogger;

    public function __construct(RabbitMQAuditLogger $auditLogger)
    {
        $this->auditLogger = $auditLogger;
    }

    /**
     * Assemble the report for the last 24 hours.
     *
     * @return array Report with summary, per-type breakdown, the five busiest
     *               operators, up to ten failures, anomalies and recommendations.
     */
    public function generateDailyReport()
    {
        $windowHours = 24;
        $stats = $this->auditLogger->getStatistics($windowHours);
        $anomalies = $this->auditLogger->detectAnomalies($windowHours);

        $report = [];
        $report['report_type'] = 'daily_security_audit';
        $report['generated_at'] = date('Y-m-d H:i:s');
        $report['period'] = '24 hours';
        $report['summary'] = [
            'total_events' => $stats['total_events'],
            'success_rate' => $this->calculateSuccessRate($stats),
            'unique_operators' => count($stats['by_operator']),
        ];
        $report['event_breakdown'] = $stats['by_type'];
        $report['top_operators'] = $this->getTopOperators($stats['by_operator'], 5);
        $report['failures'] = array_slice($stats['failures'], 0, 10);
        $report['anomalies'] = $anomalies;
        $report['recommendations'] = $this->generateRecommendations($stats, $anomalies);

        return $report;
    }

    /**
     * Percentage of successful events, rounded to two decimals.
     * An empty window counts as 100.
     */
    private function calculateSuccessRate($stats)
    {
        $succeeded = $stats['by_result']['SUCCESS'];
        $attempts = $succeeded + $stats['by_result']['FAILURE'];

        return $attempts === 0
            ? 100
            : round(($succeeded / $attempts) * 100, 2);
    }

    /** The $limit operators with the highest event counts, keyed by operator name. */
    private function getTopOperators($operators, $limit)
    {
        // $operators is a by-value copy, so sorting it leaves the caller's array untouched.
        arsort($operators);

        $busiest = array_slice($operators, 0, $limit, true);

        return $busiest;
    }

    /** Turn the statistics and anomalies into human-readable advice. */
    private function generateRecommendations($stats, $anomalies)
    {
        $advice = [];

        $failureCount = $stats['by_result']['FAILURE'];
        if ($failureCount > 10) {
            $advice[] = '建议检查失败操作的原因,可能存在配置问题';
        }

        foreach ($anomalies as $anomaly) {
            if ($anomaly['type'] !== 'high_delete_rate') {
                continue;
            }

            $advice[] = '检测到大量删除操作,建议确认是否有未授权操作';
        }

        $sawConnections = !empty($stats['by_type']['connection.open']);
        if (!$sawConnections) {
            $advice[] = '未检测到连接事件,建议确认审计日志配置';
        }

        return $advice;
    }
}

场景三:合规检查

php
<?php

/**
 * Runs a fixed set of compliance checks against the audit log and renders the
 * outcome as an array or as a plain-text report.
 */
class ComplianceChecker
{
    /** @var RabbitMQAuditLogger */
    private $auditLogger;

    /**
     * Thresholds used by the checks. Only max_failed_attempts is read by the
     * checks in this class; the other two entries are currently unused.
     *
     * @var array<string, int>
     */
    private $complianceRules = [
        'password_change_interval' => 90,
        'max_failed_attempts' => 5,
        'audit_log_retention' => 365,
    ];

    public function __construct(RabbitMQAuditLogger $auditLogger)
    {
        $this->auditLogger = $auditLogger;
    }

    /**
     * Run every check.
     *
     * @return array 'checks' maps a check name to its result (each with 'compliant'
     *               and 'message'); 'compliant' is true only if all checks passed.
     */
    public function checkCompliance()
    {
        $results = [
            'compliant' => true,
            'checks' => [],
        ];

        $results['checks']['audit_enabled'] = $this->checkAuditEnabled();
        $results['checks']['failed_attempts'] = $this->checkFailedAttempts();
        $results['checks']['user_management'] = $this->checkUserManagement();
        $results['checks']['permission_changes'] = $this->checkPermissionChanges();

        foreach ($results['checks'] as $check) {
            if (!$check['compliant']) {
                $results['compliant'] = false;
            }
        }

        return $results;
    }

    /** Passes when at least one audit event was recorded during the last hour. */
    private function checkAuditEnabled()
    {
        $stats = $this->auditLogger->getStatistics(1);
        $hasEvents = $stats['total_events'] > 0;

        return [
            'compliant' => $hasEvents,
            'message' => $hasEvents
                ? '审计日志正常记录'
                : '未检测到审计日志记录',
        ];
    }

    /** Passes when the failures of the last 24 hours do not exceed max_failed_attempts. */
    private function checkFailedAttempts()
    {
        $stats = $this->auditLogger->getStatistics(24);
        $failedCount = $stats['by_result']['FAILURE'];

        return [
            'compliant' => $failedCount <= $this->complianceRules['max_failed_attempts'],
            'message' => "24小时内失败操作: {$failedCount} 次",
            'threshold' => $this->complianceRules['max_failed_attempts'],
        ];
    }

    /** Informational check: reports how many users were created and deleted. */
    private function checkUserManagement()
    {
        return [
            'compliant' => true,
            'message' => sprintf(
                '用户创建: %d 次, 用户删除: %d 次',
                $this->countEvents('user.create'),
                $this->countEvents('user.delete')
            ),
        ];
    }

    /** Informational check: reports how many permissions were granted and revoked. */
    private function checkPermissionChanges()
    {
        return [
            'compliant' => true,
            'message' => sprintf(
                '权限授予: %d 次, 权限撤销: %d 次',
                $this->countEvents('permission.grant'),
                $this->countEvents('permission.revoke')
            ),
        ];
    }

    /**
     * Count all logged events of one type.
     *
     * queryByEventType() declares a default $limit of 100; an effectively
     * unbounded limit is requested explicitly so that the reported totals can
     * never be truncated to that default.
     *
     * @return int
     */
    private function countEvents($eventType)
    {
        return count($this->auditLogger->queryByEventType($eventType, PHP_INT_MAX));
    }

    /**
     * Render the check results as plain text, one "[✓]" / "[✗]" line per check.
     *
     * @return string
     */
    public function generateComplianceReport()
    {
        $check = $this->checkCompliance();

        $report = "RabbitMQ 合规检查报告\n";
        $report .= str_repeat("=", 50) . "\n";
        $report .= "检查时间: " . date('Y-m-d H:i:s') . "\n";
        $report .= "合规状态: " . ($check['compliant'] ? '合规' : '不合规') . "\n\n";

        foreach ($check['checks'] as $name => $result) {
            $status = $result['compliant'] ? '✓' : '✗';
            $report .= "[{$status}] {$name}: {$result['message']}\n";
        }

        return $report;
    }
}

常见问题与解决方案

问题一:审计日志丢失

现象:部分操作没有被记录。

解决方案

php
// Make sure logging is switched on.
$auditLogger->setEnabled(true);
// log() drops every event type that is missing from this list, so a shortened
// list is itself a cause of "lost" records. List every type that must be audited.
$auditLogger->setEvents([
    'user.create', 'user.delete', 'user.update',
    'permission.grant', 'permission.revoke',
    'queue.create', 'queue.delete',
    'exchange.create', 'exchange.delete',
    'policy.create', 'policy.delete', 'policy.update',
    'connection.open', 'connection.close',
    'vhost.create', 'vhost.delete',
]);

问题二:审计日志过大

现象:审计日志占用过多空间。

解决方案

bash
# Rotate the audit log by schedule and by size, keeping 90 rotated files.
# NOTE(review): log.audit.* keys are not in the documented rabbitmq.conf logging
# schema — confirm they are supported before use.
# Rotation schedule; presumably "$D0" means daily at midnight — TODO confirm.
log.audit.rotation.date = $D0
log.audit.rotation.count = 90
# Size threshold in bytes: 104857600 = 100 * 1024 * 1024 (100 MiB).
log.audit.rotation.size = 104857600

问题三:无法追溯操作者

现象:审计日志中操作者信息不完整。

解决方案

php
// Attach request context to the entry's details. REMOTE_ADDR and HTTP_USER_AGENT
// are not set under CLI or for clients that send no User-Agent header, so fall
// back to 'unknown' instead of raising an undefined-index warning.
$auditLogger->log('user.create', $currentUsername, $targetUser, true, [
    'source_ip' => $_SERVER['REMOTE_ADDR'] ?? 'unknown',
    'user_agent' => $_SERVER['HTTP_USER_AGENT'] ?? 'unknown',
]);

最佳实践

1. 审计事件选择

| 环境 | 推荐事件 |
| --- | --- |
| 生产环境 | 全部事件 |
| 测试环境 | 用户、权限事件 |
| 开发环境 | 连接、资源事件 |

2. 日志保留策略

bash
# Retention policy: keep 365 rotated audit log files.
# NOTE(review): log.audit.* keys are not in the documented rabbitmq.conf logging
# schema — confirm they are supported before use.
# Rotation schedule; presumably "$D0" means daily at midnight — TODO confirm.
log.audit.rotation.date = $D0
log.audit.rotation.count = 365

3. 定期审计

php
// Run the compliance checks and store the text report in a per-day file.
$checker = new ComplianceChecker($auditLogger);
$report = $checker->generateComplianceReport();

$reportPath = '/var/log/rabbitmq/compliance_' . date('Ymd') . '.log';
file_put_contents($reportPath, $report);

相关链接