Skip to content

消息追踪插件

概述

消息追踪插件(rabbitmq_tracing)是 RabbitMQ 提供的一个强大的消息追踪和调试工具。它可以记录消息的发布和消费情况,帮助开发者排查消息丢失、消息流向等问题,是生产环境中调试和监控的重要工具。

核心知识点

消息追踪架构

mermaid
graph TB
    subgraph 生产者
        P[消息生产者]
    end

    subgraph RabbitMQ
        E[Exchange]
        Q[Queue]
        T[Tracing 插件]
        L[日志存储]
    end

    subgraph 消费者
        C[消息消费者]
    end

    P -->|1. 发布消息| E
    E -->|2. 记录发布日志| T
    T -->|3. 存储| L
    E -->|4. 路由| Q
    Q -->|5. 记录消费日志| T
    T -->|6. 存储| L
    Q -->|7. 投递| C

    style T fill:#e1f5fe
    style L fill:#fff3e0

追踪日志格式

mermaid
sequenceDiagram
    participant P as 生产者
    participant T as Tracing
    participant Q as 队列
    participant C as 消费者

    Note over P,T: 发布追踪
    P->>T: publish<br/>exchange=x<br/>routing_key=y
    T-->>T: 记录日志

    Note over T,C: 消费追踪
    Q->>T: deliver<br/>queue=z<br/>consumer_tag=tag
    T-->>T: 记录日志

    C->>T: ack<br/>delivery_tag=1
    T-->>T: 记录确认

核心功能

功能说明
发布追踪记录消息发布到交换器的信息
投递追踪记录消息从队列投递到消费者的信息
确认追踪记录消费者确认消息的信息
日志存储将追踪日志存储到文件或数据库

PHP 代码示例

安装和配置追踪插件

php
<?php

class TracingPluginManager
{
    public function install()
    {
        echo "安装消息追踪插件...\n";

        $commands = [
            'rabbitmq-plugins enable rabbitmq_tracing',
            'systemctl restart rabbitmq-server'
        ];

        foreach ($commands as $command) {
            exec($command, $output, $returnCode);

            if ($returnCode !== 0) {
                throw new RuntimeException("命令执行失败: {$command}");
            }
        }

        echo "消息追踪插件安装完成\n";
    }

    public function isEnabled()
    {
        exec('rabbitmq-plugins list -e', $output);

        foreach ($output as $line) {
            if (strpos($line, 'rabbitmq_tracing') !== false && strpos($line, '[E]') !== false) {
                return true;
            }
        }

        return false;
    }
}

追踪配置管理

php
<?php

class TracingConfigManager
{
    private $apiClient;

    public function __construct()
    {
        $this->apiClient = new RabbitMQApiClient(
            'localhost',
            15672,
            'guest',
            'guest'
        );
    }

    public function createTracing($name, $pattern = '#', $maxBytes = 104857600)
    {
        $config = [
            'name' => $name,
            'pattern' => $pattern,
            'max_bytes' => $maxBytes,
            'format' => 'json'
        ];

        $response = $this->apiClient->put(
            "/api/traces/%2f/{$name}",
            $config
        );

        echo "追踪已创建: {$name}\n";

        return $response;
    }

    public function deleteTracing($name)
    {
        $response = $this->apiClient->delete(
            "/api/traces/%2f/{$name}"
        );

        echo "追踪已删除: {$name}\n";

        return $response;
    }

    public function listTracings()
    {
        return $this->apiClient->get('/api/traces/%2f');
    }

    public function getTracing($name)
    {
        return $this->apiClient->get("/api/traces/%2f/{$name}");
    }

    public function getTracingFile($name)
    {
        return $this->apiClient->get("/api/traces/%2f/{$name}/get");
    }
}

class RabbitMQApiClient
{
    private $host;
    private $port;
    private $user;
    private $password;

    public function __construct($host, $port, $user, $password)
    {
        $this->host = $host;
        $this->port = $port;
        $this->user = $user;
        $this->password = $password;
    }

    public function get($endpoint)
    {
        return $this->request('GET', $endpoint);
    }

    public function put($endpoint, $data = [])
    {
        return $this->request('PUT', $endpoint, $data);
    }

    public function delete($endpoint)
    {
        return $this->request('DELETE', $endpoint);
    }

    private function request($method, $endpoint, $data = [])
    {
        $url = "http://{$this->host}:{$this->port}{$endpoint}";

        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_USERPWD, "{$this->user}:{$this->password}");
        curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
        curl_setopt($ch, CURLOPT_TIMEOUT, 30);

        if (!empty($data)) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
            curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
        }

        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        if ($httpCode >= 400) {
            throw new RuntimeException("API 请求失败: HTTP {$httpCode}");
        }

        return json_decode($response, true);
    }
}

消息追踪分析器

php
<?php

class MessageTracer
{
    private $configManager;
    private $traceName = 'message_trace';

    public function __construct()
    {
        $this->configManager = new TracingConfigManager();
    }

    public function startTracing($pattern = '#')
    {
        $this->configManager->createTracing($this->traceName, $pattern);

        echo "消息追踪已启动,模式: {$pattern}\n";
    }

    public function stopTracing()
    {
        $this->configManager->deleteTracing($this->traceName);

        echo "消息追踪已停止\n";
    }

    public function getTraceLogs()
    {
        $logs = $this->configManager->getTracingFile($this->traceName);

        return $this->parseTraceLogs($logs);
    }

    private function parseTraceLogs($rawLogs)
    {
        $logs = [];

        if (empty($rawLogs)) {
            return $logs;
        }

        $lines = explode("\n", $rawLogs);

        foreach ($lines as $line) {
            if (empty(trim($line))) {
                continue;
            }

            $log = json_decode($line, true);

            if ($log) {
                $logs[] = $this->formatLogEntry($log);
            }
        }

        return $logs;
    }

    private function formatLogEntry($log)
    {
        return [
            'timestamp' => $log['timestamp'] ?? null,
            'type' => $log['type'] ?? null,
            'exchange' => $log['exchange'] ?? null,
            'queue' => $log['queue'] ?? null,
            'routing_key' => $log['routing_key'] ?? null,
            'payload' => $log['payload'] ?? null,
            'properties' => $log['properties'] ?? []
        ];
    }

    public function analyzeMessageFlow($messageId)
    {
        $logs = $this->getTraceLogs();

        $flow = [];

        foreach ($logs as $log) {
            $properties = $log['properties'];
            $correlationId = $properties['correlation_id'] ?? null;

            if ($correlationId === $messageId) {
                $flow[] = $log;
            }
        }

        return $this->buildFlowDiagram($flow);
    }

    private function buildFlowDiagram($flow)
    {
        usort($flow, function ($a, $b) {
            return strcmp($a['timestamp'], $b['timestamp']);
        });

        return [
            'message_id' => $flow[0]['properties']['correlation_id'] ?? null,
            'total_steps' => count($flow),
            'steps' => $flow
        ];
    }
}

消息丢失检测

php
<?php

class MessageLossDetector
{
    private $tracer;

    public function __construct()
    {
        $this->tracer = new MessageTracer();
    }

    public function detectLostMessages($timeRange = 3600)
    {
        $logs = $this->tracer->getTraceLogs();

        $published = [];
        $consumed = [];

        $cutoffTime = time() - $timeRange;

        foreach ($logs as $log) {
            $timestamp = strtotime($log['timestamp']);

            if ($timestamp < $cutoffTime) {
                continue;
            }

            $messageId = $log['properties']['message_id'] ?? null;

            if (!$messageId) {
                continue;
            }

            if ($log['type'] === 'publish') {
                $published[$messageId] = $log;
            } elseif ($log['type'] === 'deliver') {
                $consumed[$messageId] = $log;
            }
        }

        $lost = [];

        foreach ($published as $messageId => $publishLog) {
            if (!isset($consumed[$messageId])) {
                $lost[] = [
                    'message_id' => $messageId,
                    'published_at' => $publishLog['timestamp'],
                    'exchange' => $publishLog['exchange'],
                    'routing_key' => $publishLog['routing_key']
                ];
            }
        }

        return [
            'total_published' => count($published),
            'total_consumed' => count($consumed),
            'lost_count' => count($lost),
            'lost_messages' => $lost
        ];
    }

    public function detectUnackedMessages()
    {
        $logs = $this->tracer->getTraceLogs();

        $delivered = [];
        $acked = [];

        foreach ($logs as $log) {
            $deliveryTag = $log['delivery_tag'] ?? null;

            if (!$deliveryTag) {
                continue;
            }

            if ($log['type'] === 'deliver') {
                $delivered[$deliveryTag] = $log;
            } elseif ($log['type'] === 'ack') {
                $acked[$deliveryTag] = $log;
            }
        }

        $unacked = [];

        foreach ($delivered as $tag => $deliverLog) {
            if (!isset($acked[$tag])) {
                $unacked[] = [
                    'delivery_tag' => $tag,
                    'delivered_at' => $deliverLog['timestamp'],
                    'queue' => $deliverLog['queue']
                ];
            }
        }

        return [
            'total_delivered' => count($delivered),
            'total_acked' => count($acked),
            'unacked_count' => count($unacked),
            'unacked_messages' => $unacked
        ];
    }
}

完整示例:消息追踪系统

php
<?php

class TracingSystem
{
    private $tracer;
    private $lossDetector;

    public function __construct()
    {
        $this->tracer = new MessageTracer();
        $this->lossDetector = new MessageLossDetector();
    }

    public function startMonitoring()
    {
        $this->tracer->startTracing();

        echo "消息追踪监控已启动\n";
    }

    public function stopMonitoring()
    {
        $this->tracer->stopTracing();

        echo "消息追踪监控已停止\n";
    }

    public function generateReport()
    {
        $lostMessages = $this->lossDetector->detectLostMessages();
        $unackedMessages = $this->lossDetector->detectUnackedMessages();

        $report = [
            'generated_at' => date('Y-m-d H:i:s'),
            'summary' => [
                'total_published' => $lostMessages['total_published'],
                'total_consumed' => $lostMessages['total_consumed'],
                'lost_messages' => $lostMessages['lost_count'],
                'unacked_messages' => $unackedMessages['unacked_count']
            ],
            'details' => [
                'lost_messages' => $lostMessages['lost_messages'],
                'unacked_messages' => $unackedMessages['unacked_messages']
            ]
        ];

        return $report;
    }

    public function traceMessage($messageId)
    {
        return $this->tracer->analyzeMessageFlow($messageId);
    }
}

实际应用场景

1. 订单消息追踪

php
<?php

class OrderMessageTracer
{
    private $tracer;

    public function __construct()
    {
        $this->tracer = new MessageTracer();
    }

    public function traceOrderMessage($orderId)
    {
        return $this->tracer->analyzeMessageFlow("order_{$orderId}");
    }

    public function findOrderMessageIssues($orderId)
    {
        $flow = $this->traceOrderMessage($orderId);

        $issues = [];

        $expectedSteps = ['publish', 'deliver', 'ack'];

        $actualSteps = array_column($flow['steps'], 'type');

        foreach ($expectedSteps as $step) {
            if (!in_array($step, $actualSteps)) {
                $issues[] = "缺少步骤: {$step}";
            }
        }

        return [
            'order_id' => $orderId,
            'flow' => $flow,
            'issues' => $issues,
            'has_issues' => !empty($issues)
        ];
    }
}

2. 性能分析

php
<?php

class PerformanceAnalyzer
{
    private $tracer;

    public function __construct()
    {
        $this->tracer = new MessageTracer();
    }

    public function analyzeLatency($queueName)
    {
        $logs = $this->tracer->getTraceLogs();

        $latencies = [];

        $publishTimes = [];
        $deliverTimes = [];

        foreach ($logs as $log) {
            if ($log['queue'] !== $queueName) {
                continue;
            }

            $messageId = $log['properties']['message_id'] ?? null;

            if (!$messageId) {
                continue;
            }

            $timestamp = strtotime($log['timestamp']);

            if ($log['type'] === 'publish') {
                $publishTimes[$messageId] = $timestamp;
            } elseif ($log['type'] === 'deliver') {
                $deliverTimes[$messageId] = $timestamp;
            }
        }

        foreach ($publishTimes as $messageId => $publishTime) {
            if (isset($deliverTimes[$messageId])) {
                $latencies[] = $deliverTimes[$messageId] - $publishTime;
            }
        }

        if (empty($latencies)) {
            return null;
        }

        return [
            'count' => count($latencies),
            'min' => min($latencies),
            'max' => max($latencies),
            'avg' => array_sum($latencies) / count($latencies),
            'p50' => $this->percentile($latencies, 50),
            'p95' => $this->percentile($latencies, 95),
            'p99' => $this->percentile($latencies, 99)
        ];
    }

    private function percentile($data, $percentile)
    {
        sort($data);

        $index = ceil(($percentile / 100) * count($data)) - 1;

        return $data[$index] ?? 0;
    }
}

3. 审计日志

php
<?php

class AuditLogger
{
    private $tracer;

    public function __construct()
    {
        $this->tracer = new MessageTracer();
    }

    public function generateAuditLog($startTime, $endTime)
    {
        $logs = $this->tracer->getTraceLogs();

        $auditLog = [];

        foreach ($logs as $log) {
            $timestamp = strtotime($log['timestamp']);

            if ($timestamp >= $startTime && $timestamp <= $endTime) {
                $auditLog[] = [
                    'timestamp' => $log['timestamp'],
                    'action' => $log['type'],
                    'exchange' => $log['exchange'],
                    'queue' => $log['queue'],
                    'routing_key' => $log['routing_key'],
                    'user' => $log['user'] ?? 'unknown',
                    'connection' => $log['connection'] ?? 'unknown'
                ];
            }
        }

        return $auditLog;
    }

    public function exportToCsv($auditLog, $filename)
    {
        $fp = fopen($filename, 'w');

        fputcsv($fp, ['Timestamp', 'Action', 'Exchange', 'Queue', 'Routing Key', 'User']);

        foreach ($auditLog as $entry) {
            fputcsv($fp, [
                $entry['timestamp'],
                $entry['action'],
                $entry['exchange'],
                $entry['queue'],
                $entry['routing_key'],
                $entry['user']
            ]);
        }

        fclose($fp);

        echo "审计日志已导出到: {$filename}\n";
    }
}

常见问题与解决方案

问题 1:追踪日志过大

原因:长时间追踪产生大量日志

解决方案

php
<?php

class TraceLogManager
{
    private $configManager;
    private $maxLogSize = 104857600;

    public function rotateTraceLog($traceName)
    {
        $trace = $this->configManager->getTracing($traceName);

        if (!$trace) {
            return;
        }

        $currentSize = $trace['info']['file_size'] ?? 0;

        if ($currentSize > $this->maxLogSize) {
            $this->archiveTraceLog($traceName);
            $this->configManager->deleteTracing($traceName);
            $this->configManager->createTracing($traceName);
        }
    }

    private function archiveTraceLog($traceName)
    {
        $logs = $this->configManager->getTracingFile($traceName);

        $archiveFile = "/var/log/rabbitmq/tracing/{$traceName}_" . date('YmdHis') . ".json";

        file_put_contents($archiveFile, $logs);

        echo "追踪日志已归档: {$archiveFile}\n";
    }

    public function cleanOldArchives($daysToKeep = 7)
    {
        $archiveDir = '/var/log/rabbitmq/tracing';

        $files = glob("{$archiveDir}/*.json");

        $cutoffTime = time() - ($daysToKeep * 86400);

        foreach ($files as $file) {
            if (filemtime($file) < $cutoffTime) {
                unlink($file);
                echo "已删除旧归档: {$file}\n";
            }
        }
    }
}

问题 2:追踪影响性能

原因:追踪增加系统开销

解决方案

php
<?php

class SmartTracer
{
    private $configManager;
    private $samplingRate = 0.1;

    public function shouldTrace()
    {
        return mt_rand() / mt_getrandmax() < $this->samplingRate;
    }

    public function startSamplingTrace($traceName, $samplingRate = 0.1)
    {
        $this->samplingRate = $samplingRate;

        $pattern = $this->buildSamplingPattern();

        $this->configManager->createTracing($traceName, $pattern);
    }

    private function buildSamplingPattern()
    {
        return '#';
    }

    public function setSamplingRate($rate)
    {
        $this->samplingRate = min(1, max(0, $rate));
    }
}

问题 3:敏感信息泄露

原因:追踪日志包含消息内容

解决方案

php
<?php

class SecureTracer
{
    private $sensitiveFields = ['password', 'token', 'secret', 'credit_card'];
    private $tracer;

    public function getSanitizedLogs()
    {
        $logs = $this->tracer->getTraceLogs();

        return array_map([$this, 'sanitizeLog'], $logs);
    }

    private function sanitizeLog($log)
    {
        if (isset($log['payload'])) {
            $log['payload'] = $this->sanitizePayload($log['payload']);
        }

        return $log;
    }

    private function sanitizePayload($payload)
    {
        $data = json_decode($payload, true);

        if (!$data) {
            return '[REDACTED]';
        }

        foreach ($this->sensitiveFields as $field) {
            if (isset($data[$field])) {
                $data[$field] = '***REDACTED***';
            }
        }

        return json_encode($data);
    }

    public function addSensitiveField($field)
    {
        $this->sensitiveFields[] = $field;
    }
}

最佳实践建议

1. 定期清理追踪日志

php
<?php

class TraceCleanupScheduler
{
    private $logManager;

    public function scheduleCleanup()
    {
        $this->logManager = new TraceLogManager();

        $this->logManager->cleanOldArchives(7);
    }
}

2. 追踪告警配置

php
<?php

class TraceAlertConfig
{
    private $alerts = [];

    public function addAlert($name, $condition, $threshold)
    {
        $this->alerts[$name] = [
            'condition' => $condition,
            'threshold' => $threshold
        ];
    }

    public function checkAlerts($metrics)
    {
        $triggered = [];

        foreach ($this->alerts as $name => $config) {
            $value = $metrics[$config['condition']] ?? 0;

            if ($value > $config['threshold']) {
                $triggered[] = [
                    'name' => $name,
                    'condition' => $config['condition'],
                    'value' => $value,
                    'threshold' => $config['threshold']
                ];
            }
        }

        return $triggered;
    }
}

3. 追踪数据导出

php
<?php

class TraceExporter
{
    public function exportToElasticsearch($logs, $index = 'rabbitmq_traces')
    {
        $client = new Elasticsearch\Client();

        foreach ($logs as $log) {
            $params = [
                'index' => $index,
                'body' => $log
            ];

            $client->index($params);
        }
    }

    public function exportToS3($logs, $bucket, $key)
    {
        $s3 = new Aws\S3\S3Client();

        $s3->putObject([
            'Bucket' => $bucket,
            'Key' => $key,
            'Body' => json_encode($logs)
        ]);
    }
}

相关链接