Appearance
消息追踪插件
概述
消息追踪插件(rabbitmq_tracing)是 RabbitMQ 提供的一个强大的消息追踪和调试工具。它可以记录消息的发布和消费情况,帮助开发者排查消息丢失、消息流向等问题,是生产环境中调试和监控的重要工具。
核心知识点
消息追踪架构
mermaid
graph TB
subgraph 生产者
P[消息生产者]
end
subgraph RabbitMQ
E[Exchange]
Q[Queue]
T[Tracing 插件]
L[日志存储]
end
subgraph 消费者
C[消息消费者]
end
P -->|1. 发布消息| E
E -->|2. 记录发布日志| T
T -->|3. 存储| L
E -->|4. 路由| Q
Q -->|5. 记录消费日志| T
T -->|6. 存储| L
Q -->|7. 投递| C
style T fill:#e1f5fe
style L fill:#fff3e0
追踪日志格式
mermaid
sequenceDiagram
participant P as 生产者
participant T as Tracing
participant Q as 队列
participant C as 消费者
Note over P,T: 发布追踪
P->>T: publish<br/>exchange=x<br/>routing_key=y
T-->>T: 记录日志
Note over T,C: 消费追踪
Q->>T: deliver<br/>queue=z<br/>consumer_tag=tag
T-->>T: 记录日志
C->>T: ack<br/>delivery_tag=1
T-->>T: 记录确认
核心功能
| 功能 | 说明 |
|---|---|
| 发布追踪 | 记录消息发布到交换器的信息 |
| 投递追踪 | 记录消息从队列投递到消费者的信息 |
| 确认追踪 | 记录消费者确认消息的信息 |
| 日志存储 | 将追踪日志存储到文件或数据库 |
PHP 代码示例
安装和配置追踪插件
php
<?php
class TracingPluginManager
{
/**
 * Enables the rabbitmq_tracing plugin and restarts the broker service.
 *
 * The shell commands run in order; the first one that exits with a
 * non-zero status aborts the sequence.
 *
 * @return void
 *
 * @throws RuntimeException When a command exits with a non-zero status.
 */
public function install()
{
    print "安装消息追踪插件...\n";

    $steps = [
        'rabbitmq-plugins enable rabbitmq_tracing',
        'systemctl restart rabbitmq-server',
    ];

    for ($i = 0, $total = count($steps); $i < $total; $i++) {
        $step = $steps[$i];
        exec($step, $captured, $exitStatus);
        if ($exitStatus === 0) {
            continue;
        }
        throw new RuntimeException("命令执行失败: {$step}");
    }

    print "消息追踪插件安装完成\n";
}
/**
 * Reports whether the rabbitmq_tracing plugin shows up as enabled.
 *
 * Runs `rabbitmq-plugins list -e` and scans its output. The status column
 * printed by that command looks like "[E*]", "[E ]" or "[e*]" (explicitly or
 * implicitly enabled, optionally running), so the marker is matched with a
 * pattern rather than the literal "[E]", which never appears for a running
 * plugin.
 *
 * @return bool True when a line marks rabbitmq_tracing as enabled; false
 *              otherwise, including when the command itself fails.
 */
public function isEnabled()
{
    $output = [];
    $returnCode = 0;
    exec('rabbitmq-plugins list -e', $output, $returnCode);

    if ($returnCode !== 0) {
        // The CLI failed, so its output says nothing about the plugin state.
        return false;
    }

    foreach ($output as $line) {
        // "[" + optional padding + E/e + optional status flag + "]" + plugin name.
        if (preg_match('/\[\s*[Ee][^\]]*\]\s+rabbitmq_tracing\b/', $line) === 1) {
            return true;
        }
    }

    return false;
}
}
追踪配置管理
php
<?php
/**
 * Manages trace definitions through the rabbitmq_tracing HTTP API.
 *
 * Every trace URL uses the "%2f" path segment (URL-encoded "/"), i.e. the
 * methods here always address that one virtual host.
 */
class TracingConfigManager
{
    /** @var object HTTP client exposing get(), put() and delete(). */
    private $apiClient;

    /**
     * @param object|null $apiClient Optional client to use instead of the
     *                               default one; must expose get($endpoint),
     *                               put($endpoint, $data) and delete($endpoint).
     *                               When omitted, a RabbitMQApiClient for
     *                               localhost:15672 with guest/guest is built.
     */
    public function __construct($apiClient = null)
    {
        if ($apiClient === null) {
            $apiClient = new RabbitMQApiClient(
                'localhost',
                15672,
                'guest',
                'guest'
            );
        }
        $this->apiClient = $apiClient;
    }

    /**
     * Creates (PUTs) a trace definition and prints a confirmation line.
     *
     * @param string $name     Trace name; URL-encoded before use in the path.
     * @param string $pattern  Routing pattern sent as "pattern".
     * @param int    $maxBytes Value sent as "max_bytes".
     *
     * @return mixed Whatever the API client returns for the PUT.
     */
    public function createTracing($name, $pattern = '#', $maxBytes = 104857600)
    {
        // NOTE(review): the plugin's documented size option appears to be
        // "max_payload_bytes" (per-message payload truncation); confirm that
        // "max_bytes" is honoured by the broker before relying on it.
        $config = [
            'name' => $name,
            'pattern' => $pattern,
            'max_bytes' => $maxBytes,
            'format' => 'json'
        ];
        $response = $this->apiClient->put($this->tracePath($name), $config);
        echo "追踪已创建: {$name}\n";
        return $response;
    }

    /**
     * Deletes a trace definition and prints a confirmation line.
     *
     * @param string $name Trace name.
     *
     * @return mixed Whatever the API client returns for the DELETE.
     */
    public function deleteTracing($name)
    {
        $response = $this->apiClient->delete($this->tracePath($name));
        echo "追踪已删除: {$name}\n";
        return $response;
    }

    /**
     * Lists the trace definitions under the "%2f" path.
     *
     * @return mixed Whatever the API client returns for the GET.
     */
    public function listTracings()
    {
        return $this->apiClient->get('/api/traces/%2f');
    }

    /**
     * Fetches one trace definition.
     *
     * @param string $name Trace name.
     *
     * @return mixed Whatever the API client returns for the GET.
     */
    public function getTracing($name)
    {
        return $this->apiClient->get($this->tracePath($name));
    }

    /**
     * Fetches the log file written by a trace.
     *
     * The plugin serves trace output under /api/trace-files/<file>, where the
     * file is named after the trace with a ".log" suffix; there is no "/get"
     * sub-resource below /api/traces/<vhost>/<name>.
     *
     * @param string $name Trace name (without the ".log" suffix).
     *
     * @return mixed Whatever the API client returns for the GET.
     */
    public function getTracingFile($name)
    {
        return $this->apiClient->get('/api/trace-files/' . rawurlencode($name . '.log'));
    }

    /**
     * Builds the API path of a single trace, encoding the name so that
     * spaces, slashes and other reserved characters cannot break the URL.
     *
     * @param string $name Trace name.
     *
     * @return string Path starting with "/api/traces/%2f/".
     */
    private function tracePath($name)
    {
        return '/api/traces/%2f/' . rawurlencode((string) $name);
    }
}
class RabbitMQApiClient
{
private $host;
private $port;
private $user;
private $password;
/**
 * Stores the connection settings used to build every request.
 *
 * @param string $host     Host name placed in the request URL.
 * @param int    $port     Port placed in the request URL.
 * @param string $user     User name sent with each request.
 * @param string $password Password sent with each request.
 */
public function __construct($host, $port, $user, $password)
{
    list($this->host, $this->port, $this->user, $this->password) = [
        $host,
        $port,
        $user,
        $password,
    ];
}
/**
 * Sends a GET request to the given endpoint.
 *
 * @param string $endpoint Path appended to the base URL.
 *
 * @return mixed Result of request().
 */
public function get($endpoint)
{
    $verb = 'GET';

    return $this->request($verb, $endpoint);
}
/**
 * Sends a PUT request, optionally carrying a payload.
 *
 * @param string $endpoint Path appended to the base URL.
 * @param array  $data     Payload handed on to request().
 *
 * @return mixed Result of request().
 */
public function put($endpoint, $data = [])
{
    $verb = 'PUT';

    return $this->request($verb, $endpoint, $data);
}
/**
 * Sends a DELETE request to the given endpoint.
 *
 * @param string $endpoint Path appended to the base URL.
 *
 * @return mixed Result of request().
 */
public function delete($endpoint)
{
    $verb = 'DELETE';

    return $this->request($verb, $endpoint);
}
/**
 * Performs one HTTP request against the configured host and decodes the reply.
 *
 * @param string $method   HTTP verb, passed to cURL as the custom request method.
 * @param string $endpoint Path appended to "http://host:port".
 * @param array  $data     Optional payload; sent as a JSON body when non-empty.
 *
 * @return mixed Decoded JSON (objects become associative arrays), null for an
 *               empty body, or the raw body string when the reply is not
 *               valid JSON (for example a plain-text log file).
 *
 * @throws RuntimeException When the transfer itself fails or the HTTP status
 *                          is 400 or higher.
 */
private function request($method, $endpoint, $data = [])
{
    $url = "http://{$this->host}:{$this->port}{$endpoint}";
    $ch = curl_init();

    try {
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_USERPWD, "{$this->user}:{$this->password}");
        curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
        curl_setopt($ch, CURLOPT_TIMEOUT, 30);
        if (!empty($data)) {
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
            curl_setopt($ch, CURLOPT_HTTPHEADER, ['Content-Type: application/json']);
        }

        $response = curl_exec($ch);
        if ($response === false) {
            // Connection refused, timeout, DNS failure...: without this check the
            // status code is 0 and the failure would be reported as a null result.
            throw new RuntimeException('API 请求失败: ' . curl_error($ch));
        }
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
    } finally {
        // Release the handle on every path, including the exception above.
        curl_close($ch);
    }

    if ($httpCode >= 400) {
        throw new RuntimeException("API 请求失败: HTTP {$httpCode}");
    }

    if ($response === '') {
        return null;
    }

    $decoded = json_decode($response, true);
    if ($decoded === null && json_last_error() !== JSON_ERROR_NONE) {
        // Not JSON (e.g. a multi-line trace log): hand back the body untouched
        // instead of discarding it.
        return $response;
    }

    return $decoded;
}
}
消息追踪分析器
php
<?php
class MessageTracer
{
private $configManager;
private $traceName = 'message_trace';
/**
 * Creates the tracer together with its own TracingConfigManager.
 */
public function __construct()
{
    $manager = new TracingConfigManager();
    $this->configManager = $manager;
}
/**
 * Creates the trace named by $traceName and prints a confirmation line.
 *
 * @param string $pattern Pattern forwarded to createTracing().
 *
 * @return void
 */
public function startTracing($pattern = '#')
{
    $manager = $this->configManager;
    $manager->createTracing($this->traceName, $pattern);

    print "消息追踪已启动,模式: {$pattern}\n";
}
/**
 * Deletes the trace named by $traceName and prints a confirmation line.
 *
 * @return void
 */
public function stopTracing()
{
    $manager = $this->configManager;
    $manager->deleteTracing($this->traceName);

    print "消息追踪已停止\n";
}
/**
 * Downloads the trace file for $traceName and returns its parsed entries.
 *
 * @return array List of entries as produced by parseTraceLogs().
 */
public function getTraceLogs()
{
    return $this->parseTraceLogs(
        $this->configManager->getTracingFile($this->traceName)
    );
}
/**
 * Turns the trace-file payload into a list of formatted log entries.
 *
 * Accepts either the raw file text (one JSON document per line) or data the
 * HTTP client has already decoded: a list of entries, or a single entry when
 * the file held exactly one line. Blank lines and lines that do not decode
 * to an array are skipped.
 *
 * @param string|array|null $rawLogs Trace payload as returned by the API client.
 *
 * @return array List of entries shaped by formatLogEntry().
 */
private function parseTraceLogs($rawLogs)
{
    if (empty($rawLogs)) {
        return [];
    }

    if (is_array($rawLogs)) {
        // A decoded single entry is an associative array; wrap it so both
        // shapes can be walked the same way.
        $entries = isset($rawLogs[0]) ? $rawLogs : [$rawLogs];
    } else {
        $entries = [];
        foreach (explode("\n", (string) $rawLogs) as $line) {
            $line = trim($line);
            if ($line === '') {
                continue;
            }
            $entries[] = json_decode($line, true);
        }
    }

    $logs = [];
    foreach ($entries as $entry) {
        // Scalars and failed decodes carry no fields worth formatting.
        if (is_array($entry) && !empty($entry)) {
            $logs[] = $this->formatLogEntry($entry);
        }
    }

    return $logs;
}
/**
 * Normalises one decoded trace entry into the shape used across this file.
 *
 * Besides the original seven keys, the result carries "delivery_tag", "user"
 * and "connection": other code in this file reads them from formatted
 * entries, and dropping them here made those lookups always come back empty.
 *
 * The rabbitmq_tracing JSON log labels events "published" / "received" and
 * lists routing keys under "routing_keys"; those are mapped onto the
 * "publish" / "deliver" / "routing_key" names the rest of this file compares
 * against. Entries that already use the short names pass through unchanged.
 *
 * @param array $log Decoded entry.
 *
 * @return array Entry with keys timestamp, type, exchange, queue,
 *               routing_key, payload, properties, delivery_tag, user,
 *               connection (missing values are null; properties defaults to []).
 */
private function formatLogEntry($log)
{
    $typeAliases = ['published' => 'publish', 'received' => 'deliver'];
    $type = $log['type'] ?? null;
    if (is_string($type) && isset($typeAliases[$type])) {
        $type = $typeAliases[$type];
    }

    $routingKey = $log['routing_key'] ?? null;
    if ($routingKey === null && isset($log['routing_keys'][0])) {
        // Fall back to the first key of the plural list.
        $routingKey = $log['routing_keys'][0];
    }

    return [
        'timestamp' => $log['timestamp'] ?? null,
        'type' => $type,
        'exchange' => $log['exchange'] ?? null,
        'queue' => $log['queue'] ?? null,
        'routing_key' => $routingKey,
        'payload' => $log['payload'] ?? null,
        'properties' => $log['properties'] ?? [],
        'delivery_tag' => $log['delivery_tag'] ?? null,
        'user' => $log['user'] ?? null,
        'connection' => $log['connection'] ?? null
    ];
}
/**
 * Collects every trace entry whose correlation_id equals $messageId and
 * returns them as a flow summary.
 *
 * @param mixed $messageId Value compared (strictly) with properties.correlation_id.
 *
 * @return array Summary built by buildFlowDiagram().
 */
public function analyzeMessageFlow($messageId)
{
    $matching = array_filter(
        $this->getTraceLogs(),
        function ($entry) use ($messageId) {
            return ($entry['properties']['correlation_id'] ?? null) === $messageId;
        }
    );

    return $this->buildFlowDiagram(array_values($matching));
}
/**
 * Orders the given entries by their timestamp strings and wraps them in a
 * summary.
 *
 * @param array $flow Entries belonging to one message.
 *
 * @return array Keys: message_id (correlation_id of the earliest entry, or
 *               null when there is none), total_steps, steps.
 */
private function buildFlowDiagram($flow)
{
    $byTimestamp = function ($left, $right) {
        return strcmp($left['timestamp'], $right['timestamp']);
    };
    usort($flow, $byTimestamp);

    $summary = [];
    $summary['message_id'] = $flow[0]['properties']['correlation_id'] ?? null;
    $summary['total_steps'] = count($flow);
    $summary['steps'] = $flow;

    return $summary;
}
}
消息丢失检测
php
<?php
class MessageLossDetector
{
private $tracer;
/**
 * Creates the detector together with its own MessageTracer.
 */
public function __construct()
{
    $tracer = new MessageTracer();
    $this->tracer = $tracer;
}
/**
 * Reports messages that were published within the recent window but have no
 * matching "deliver" entry.
 *
 * Entries are paired by properties.message_id; entries older than the window,
 * or without a message id, are ignored.
 *
 * @param int $timeRange Window length in seconds, counted back from now.
 *
 * @return array Keys: total_published, total_consumed, lost_count,
 *               lost_messages (list of message_id / published_at / exchange /
 *               routing_key).
 */
public function detectLostMessages($timeRange = 3600)
{
    $entries = $this->tracer->getTraceLogs();
    $oldestAllowed = time() - $timeRange;

    $publishedById = [];
    $deliveredById = [];
    foreach ($entries as $entry) {
        if (strtotime($entry['timestamp']) < $oldestAllowed) {
            continue;
        }

        $id = $entry['properties']['message_id'] ?? null;
        if (!$id) {
            continue;
        }

        $kind = $entry['type'];
        if ($kind === 'publish') {
            $publishedById[$id] = $entry;
        } elseif ($kind === 'deliver') {
            $deliveredById[$id] = $entry;
        }
    }

    // Whatever was published but never delivered counts as lost.
    $missing = [];
    foreach (array_diff_key($publishedById, $deliveredById) as $id => $entry) {
        $missing[] = [
            'message_id' => $id,
            'published_at' => $entry['timestamp'],
            'exchange' => $entry['exchange'],
            'routing_key' => $entry['routing_key'],
        ];
    }

    return [
        'total_published' => count($publishedById),
        'total_consumed' => count($deliveredById),
        'lost_count' => count($missing),
        'lost_messages' => $missing,
    ];
}
/**
 * Reports deliveries that have no "ack" entry with the same delivery tag.
 *
 * Entries without a (truthy) delivery_tag are ignored. Entries are keyed by
 * the tag alone.
 *
 * @return array Keys: total_delivered, total_acked, unacked_count,
 *               unacked_messages (list of delivery_tag / delivered_at / queue).
 */
public function detectUnackedMessages()
{
    $deliveriesByTag = [];
    $acksByTag = [];

    foreach ($this->tracer->getTraceLogs() as $entry) {
        $tag = $entry['delivery_tag'] ?? null;
        if (!$tag) {
            continue;
        }

        $kind = $entry['type'];
        if ($kind === 'deliver') {
            $deliveriesByTag[$tag] = $entry;
        } elseif ($kind === 'ack') {
            $acksByTag[$tag] = $entry;
        }
    }

    // Deliveries whose tag never shows up in an ack are still outstanding.
    $pending = [];
    foreach (array_diff_key($deliveriesByTag, $acksByTag) as $tag => $entry) {
        $pending[] = [
            'delivery_tag' => $tag,
            'delivered_at' => $entry['timestamp'],
            'queue' => $entry['queue'],
        ];
    }

    return [
        'total_delivered' => count($deliveriesByTag),
        'total_acked' => count($acksByTag),
        'unacked_count' => count($pending),
        'unacked_messages' => $pending,
    ];
}
}
完整示例:消息追踪系统
php
<?php
class TracingSystem
{
private $tracer;
private $lossDetector;
/**
 * Creates the system with a fresh MessageTracer and MessageLossDetector.
 */
public function __construct()
{
    $tracer = new MessageTracer();
    $detector = new MessageLossDetector();

    $this->tracer = $tracer;
    $this->lossDetector = $detector;
}
/**
 * Starts tracing with the tracer's default pattern and prints a status line.
 *
 * @return void
 */
public function startMonitoring()
{
    $tracer = $this->tracer;
    $tracer->startTracing();

    print "消息追踪监控已启动\n";
}
/**
 * Stops tracing and prints a status line.
 *
 * @return void
 */
public function stopMonitoring()
{
    $tracer = $this->tracer;
    $tracer->stopTracing();

    print "消息追踪监控已停止\n";
}
/**
 * Combines the lost-message and unacked-message results into one report.
 *
 * @return array Keys: generated_at ("Y-m-d H:i:s"), summary (counts),
 *               details (the lost and unacked message lists).
 */
public function generateReport()
{
    $lost = $this->lossDetector->detectLostMessages();
    $unacked = $this->lossDetector->detectUnackedMessages();

    $summary = [
        'total_published' => $lost['total_published'],
        'total_consumed' => $lost['total_consumed'],
        'lost_messages' => $lost['lost_count'],
        'unacked_messages' => $unacked['unacked_count'],
    ];
    $details = [
        'lost_messages' => $lost['lost_messages'],
        'unacked_messages' => $unacked['unacked_messages'],
    ];

    return [
        'generated_at' => date('Y-m-d H:i:s'),
        'summary' => $summary,
        'details' => $details,
    ];
}
/**
 * Returns the flow summary for one message.
 *
 * @param mixed $messageId Identifier handed to analyzeMessageFlow().
 *
 * @return array Result of the tracer's analyzeMessageFlow().
 */
public function traceMessage($messageId)
{
    $flow = $this->tracer->analyzeMessageFlow($messageId);

    return $flow;
}
}
实际应用场景
1. 订单消息追踪
php
<?php
class OrderMessageTracer
{
private $tracer;
/**
 * Creates the order tracer together with its own MessageTracer.
 */
public function __construct()
{
    $tracer = new MessageTracer();
    $this->tracer = $tracer;
}
/**
 * Returns the message flow recorded for an order.
 *
 * The lookup id is the order id prefixed with "order_".
 *
 * @param mixed $orderId Order identifier.
 *
 * @return array Result of the tracer's analyzeMessageFlow().
 */
public function traceOrderMessage($orderId)
{
    $lookupId = 'order_' . $orderId;

    return $this->tracer->analyzeMessageFlow($lookupId);
}
/**
 * Checks that an order's flow contains a publish, a deliver and an ack step.
 *
 * @param mixed $orderId Order identifier.
 *
 * @return array Keys: order_id, flow, issues (one message per missing step),
 *               has_issues.
 */
public function findOrderMessageIssues($orderId)
{
    $flow = $this->traceOrderMessage($orderId);
    $seenTypes = array_column($flow['steps'], 'type');

    $problems = [];
    foreach (['publish', 'deliver', 'ack'] as $required) {
        if (in_array($required, $seenTypes)) {
            continue;
        }
        $problems[] = "缺少步骤: {$required}";
    }

    return [
        'order_id' => $orderId,
        'flow' => $flow,
        'issues' => $problems,
        'has_issues' => count($problems) > 0,
    ];
}
}
2. 性能分析
php
<?php
class PerformanceAnalyzer
{
private $tracer;
/**
 * Creates the analyzer together with its own MessageTracer.
 */
public function __construct()
{
    $tracer = new MessageTracer();
    $this->tracer = $tracer;
}
/**
 * Computes publish-to-deliver latency statistics for one queue.
 *
 * Messages are paired by properties.message_id. The queue filter is applied
 * to deliver entries only: a publish entry targets an exchange and carries no
 * queue, so filtering it by queue name would discard every publish time and
 * leave nothing to pair.
 *
 * Timestamps are parsed with strtotime(), so latencies are whole seconds.
 *
 * @param string $queueName Queue whose deliveries are measured.
 *
 * @return array|null Keys count, min, max, avg, p50, p95, p99; null when no
 *                    published message was delivered from the queue.
 */
public function analyzeLatency($queueName)
{
    $logs = $this->tracer->getTraceLogs();
    $publishTimes = [];
    $deliverTimes = [];

    foreach ($logs as $log) {
        $type = $log['type'] ?? null;
        $isPublish = $type === 'publish';
        $isDeliver = $type === 'deliver';
        if (!$isPublish && !$isDeliver) {
            continue;
        }
        if ($isDeliver && ($log['queue'] ?? null) !== $queueName) {
            continue;
        }

        $messageId = $log['properties']['message_id'] ?? null;
        if (!$messageId) {
            continue;
        }

        $timestamp = strtotime((string) ($log['timestamp'] ?? ''));
        if ($timestamp === false) {
            // An unparseable timestamp cannot contribute a meaningful latency.
            continue;
        }

        if ($isPublish) {
            $publishTimes[$messageId] = $timestamp;
        } else {
            $deliverTimes[$messageId] = $timestamp;
        }
    }

    $latencies = [];
    foreach ($publishTimes as $messageId => $publishTime) {
        if (isset($deliverTimes[$messageId])) {
            $latencies[] = $deliverTimes[$messageId] - $publishTime;
        }
    }

    if (empty($latencies)) {
        return null;
    }

    return [
        'count' => count($latencies),
        'min' => min($latencies),
        'max' => max($latencies),
        'avg' => array_sum($latencies) / count($latencies),
        'p50' => $this->percentile($latencies, 50),
        'p95' => $this->percentile($latencies, 95),
        'p99' => $this->percentile($latencies, 99)
    ];
}
/**
 * Returns the requested percentile of a list, using the nearest-rank rule.
 *
 * @param array     $data       Values to rank (the caller's array is not modified).
 * @param int|float $percentile Percentile between 0 and 100.
 *
 * @return mixed The value at the computed rank, or 0 when that rank does not
 *               exist (for example for an empty list).
 */
private function percentile($data, $percentile)
{
    $ordered = $data;
    sort($ordered);

    $rank = ceil(($percentile / 100) * count($ordered)) - 1;
    if (isset($ordered[$rank])) {
        return $ordered[$rank];
    }

    return 0;
}
}
3. 审计日志
php
<?php
class AuditLogger
{
private $tracer;
/**
 * Creates the audit logger together with its own MessageTracer.
 */
public function __construct()
{
    $tracer = new MessageTracer();
    $this->tracer = $tracer;
}
/**
 * Builds audit records for trace entries inside a time range (inclusive).
 *
 * @param int $startTime Lower bound as a Unix timestamp.
 * @param int $endTime   Upper bound as a Unix timestamp.
 *
 * @return array List of records with keys timestamp, action, exchange,
 *               queue, routing_key, user, connection; the last two fall back
 *               to "unknown" when the entry does not carry them.
 */
public function generateAuditLog($startTime, $endTime)
{
    $records = [];

    foreach ($this->tracer->getTraceLogs() as $entry) {
        $when = strtotime($entry['timestamp']);
        if ($when < $startTime || $when > $endTime) {
            continue;
        }

        $records[] = [
            'timestamp' => $entry['timestamp'],
            'action' => $entry['type'],
            'exchange' => $entry['exchange'],
            'queue' => $entry['queue'],
            'routing_key' => $entry['routing_key'],
            'user' => $entry['user'] ?? 'unknown',
            'connection' => $entry['connection'] ?? 'unknown',
        ];
    }

    return $records;
}
/**
 * Writes audit records to a CSV file and prints a confirmation line.
 *
 * The file is created or truncated. Columns: Timestamp, Action, Exchange,
 * Queue, Routing Key, User. Fields missing from a record are written empty.
 *
 * @param array  $auditLog Records as produced by generateAuditLog().
 * @param string $filename Destination path.
 *
 * @return void
 *
 * @throws RuntimeException When the file cannot be opened for writing.
 */
public function exportToCsv($auditLog, $filename)
{
    $fp = fopen($filename, 'w');
    if ($fp === false) {
        // Without this check every fputcsv() below would operate on false.
        throw new RuntimeException("无法打开文件进行写入: {$filename}");
    }

    try {
        fputcsv($fp, ['Timestamp', 'Action', 'Exchange', 'Queue', 'Routing Key', 'User']);
        foreach ($auditLog as $entry) {
            fputcsv($fp, [
                $entry['timestamp'] ?? '',
                $entry['action'] ?? '',
                $entry['exchange'] ?? '',
                $entry['queue'] ?? '',
                $entry['routing_key'] ?? '',
                $entry['user'] ?? ''
            ]);
        }
    } finally {
        // Close the handle even if writing a row throws.
        fclose($fp);
    }

    echo "审计日志已导出到: {$filename}\n";
}
}
常见问题与解决方案
问题 1:追踪日志过大
原因:长时间追踪产生大量日志
解决方案:
php
<?php
class TraceLogManager
{
private $configManager;
private $maxLogSize = 104857600;
/**
 * Archives and recreates a trace once its log file exceeds $maxLogSize.
 *
 * Nothing happens when the trace cannot be fetched or is still below the
 * limit. Otherwise the log is archived, the trace is deleted, and it is
 * recreated with the pattern it had before (falling back to the default
 * pattern when the fetched definition carries none).
 *
 * @param string $traceName Name of the trace to rotate.
 *
 * @return void
 */
public function rotateTraceLog($traceName)
{
    // The class declares no constructor, so the manager is created on first
    // use; without this the first call fails on a null property.
    if ($this->configManager === null) {
        $this->configManager = new TracingConfigManager();
    }

    $trace = $this->configManager->getTracing($traceName);
    if (!$trace) {
        return;
    }

    // NOTE(review): assumes the definition reports its size under
    // info.file_size — confirm against the API response.
    $currentSize = $trace['info']['file_size'] ?? 0;
    if ($currentSize <= $this->maxLogSize) {
        return;
    }

    $this->archiveTraceLog($traceName);
    $this->configManager->deleteTracing($traceName);
    // Keep the trace filtering on the same pattern after rotation.
    $this->configManager->createTracing($traceName, $trace['pattern'] ?? '#');
}
/**
 * Saves the current trace log to a timestamped file in the archive directory.
 *
 * Failures are raised instead of ignored: the caller deletes the trace right
 * after archiving, so reporting success on a failed write would lose the log.
 *
 * @param string $traceName Name of the trace whose log is archived.
 *
 * @return void
 *
 * @throws RuntimeException When the archive directory or file cannot be written.
 */
private function archiveTraceLog($traceName)
{
    if ($this->configManager === null) {
        $this->configManager = new TracingConfigManager();
    }

    $logs = $this->configManager->getTracingFile($traceName);
    if (is_array($logs)) {
        // The API client may hand back decoded JSON; store it as JSON text.
        $logs = json_encode($logs);
    } elseif ($logs === null) {
        $logs = '';
    }

    $archiveDir = '/var/log/rabbitmq/tracing';
    if (!is_dir($archiveDir) && !mkdir($archiveDir, 0750, true) && !is_dir($archiveDir)) {
        throw new RuntimeException("无法创建归档目录: {$archiveDir}");
    }

    $archiveFile = "/var/log/rabbitmq/tracing/{$traceName}_" . date('YmdHis') . ".json";
    if (file_put_contents($archiveFile, $logs) === false) {
        throw new RuntimeException("追踪日志归档失败: {$archiveFile}");
    }

    echo "追踪日志已归档: {$archiveFile}\n";
}
/**
 * Deletes archived trace logs older than the retention period.
 *
 * Looks at the *.json files in the archive directory and removes those whose
 * modification time is older than $daysToKeep days. A line is printed only
 * for files that were actually removed.
 *
 * @param int $daysToKeep Retention period in days.
 *
 * @return void
 */
public function cleanOldArchives($daysToKeep = 7)
{
    $archiveDir = '/var/log/rabbitmq/tracing';
    $files = glob("{$archiveDir}/*.json");
    if ($files === false) {
        // glob() signals an error with false; there is nothing to iterate.
        return;
    }

    $cutoffTime = time() - ($daysToKeep * 86400);
    foreach ($files as $file) {
        $modifiedAt = filemtime($file);
        if ($modifiedAt === false || $modifiedAt >= $cutoffTime) {
            continue;
        }
        if (unlink($file)) {
            echo "已删除旧归档: {$file}\n";
        }
    }
}
}
问题 2:追踪影响性能
原因:追踪增加系统开销
解决方案:
php
<?php
class SmartTracer
{
private $configManager;
private $samplingRate = 0.1;
/**
 * Randomly decides whether the current message should be traced.
 *
 * @return bool True when a random draw in [0, 1] falls below $samplingRate.
 */
public function shouldTrace()
{
    $draw = mt_rand() / mt_getrandmax();

    return $draw < $this->samplingRate;
}
/**
 * Stores the sampling rate and creates a trace with the sampling pattern.
 *
 * The rate is stored through setSamplingRate(), so it is clamped to [0, 1].
 * The pattern comes from buildSamplingPattern(); the rate itself is not sent
 * to the broker.
 *
 * @param string    $traceName    Name of the trace to create.
 * @param int|float $samplingRate Desired sampling rate.
 *
 * @return void
 */
public function startSamplingTrace($traceName, $samplingRate = 0.1)
{
    // The class declares no constructor, so the manager is created on first
    // use; without this the call fails on a null property.
    if ($this->configManager === null) {
        $this->configManager = new TracingConfigManager();
    }

    $this->setSamplingRate($samplingRate);
    $pattern = $this->buildSamplingPattern();
    $this->configManager->createTracing($traceName, $pattern);
}
/**
 * Returns the pattern used for sampling traces.
 *
 * Currently a fixed "#"; the configured sampling rate is not consulted.
 *
 * @return string
 */
private function buildSamplingPattern()
{
    $pattern = '#';

    return $pattern;
}
/**
 * Stores the sampling rate, clamped to the range [0, 1].
 *
 * @param int|float $rate Requested rate.
 *
 * @return void
 */
public function setSamplingRate($rate)
{
    $atLeastZero = max(0, $rate);
    $this->samplingRate = min(1, $atLeastZero);
}
}
问题 3:敏感信息泄露
原因:追踪日志包含消息内容
解决方案:
php
<?php
class SecureTracer
{
private $sensitiveFields = ['password', 'token', 'secret', 'credit_card'];
private $tracer;
/**
 * Returns the trace entries with sensitive payload fields redacted.
 *
 * @return array Entries from the tracer, each passed through sanitizeLog();
 *               keys are preserved.
 */
public function getSanitizedLogs()
{
    // The class declares no constructor, so the tracer is created on first
    // use; without this the call fails on a null property.
    if ($this->tracer === null) {
        $this->tracer = new MessageTracer();
    }

    $sanitized = [];
    foreach ($this->tracer->getTraceLogs() as $key => $log) {
        $sanitized[$key] = $this->sanitizeLog($log);
    }

    return $sanitized;
}
/**
 * Replaces an entry's payload with its sanitized form.
 *
 * Entries without a payload (or with a null one) are returned unchanged.
 *
 * @param array $log Trace entry.
 *
 * @return array The entry, with "payload" sanitized when present.
 */
private function sanitizeLog($log)
{
    if (!isset($log['payload'])) {
        return $log;
    }

    $cleaned = $this->sanitizePayload($log['payload']);
    $log['payload'] = $cleaned;

    return $log;
}
/**
 * Redacts sensitive fields inside a JSON payload.
 *
 * Sensitive keys are matched case-insensitively and at any nesting depth, so
 * a secret inside a nested object or a list of objects is redacted as well.
 * Payloads that are not strings, are not valid JSON, or decode to an empty
 * value are replaced wholesale.
 *
 * @param mixed $payload Raw payload, normally a JSON string.
 *
 * @return string Re-encoded JSON with redacted fields, or "[REDACTED]".
 */
private function sanitizePayload($payload)
{
    if (!is_string($payload)) {
        return '[REDACTED]';
    }

    $data = json_decode($payload, true);
    if (!$data) {
        return '[REDACTED]';
    }

    if (is_array($data)) {
        $data = $this->redactSensitiveFields($data);
    }

    return json_encode($data);
}

/**
 * Walks a decoded payload and masks every value stored under a sensitive key.
 *
 * @param array $data Decoded JSON object or list.
 *
 * @return array Copy of $data with sensitive values replaced.
 */
private function redactSensitiveFields(array $data)
{
    $sensitive = array_map('strtolower', $this->sensitiveFields);

    foreach ($data as $key => $value) {
        if (in_array(strtolower((string) $key), $sensitive, true)) {
            $data[$key] = '***REDACTED***';
        } elseif (is_array($value)) {
            $data[$key] = $this->redactSensitiveFields($value);
        }
    }

    return $data;
}
/**
 * Registers one more payload field name to redact.
 *
 * @param string $field Field name appended to the sensitive list.
 *
 * @return void
 */
public function addSensitiveField($field)
{
    array_push($this->sensitiveFields, $field);
}
}
最佳实践建议
1. 定期清理追踪日志
php
<?php
class TraceCleanupScheduler
{
private $logManager;
/**
 * Creates a TraceLogManager and removes archives older than seven days.
 *
 * The cleanup runs once, immediately, each time this method is called.
 *
 * @return void
 */
public function scheduleCleanup()
{
    $manager = new TraceLogManager();
    $this->logManager = $manager;

    $manager->cleanOldArchives(7);
}
}
2. 追踪告警配置
php
<?php
class TraceAlertConfig
{
private $alerts = [];
/**
 * Registers (or replaces) an alert rule.
 *
 * @param string    $name      Rule name; an existing rule with the same name is overwritten.
 * @param string    $condition Key of the metric the rule watches.
 * @param int|float $threshold Value the metric must exceed to trigger.
 *
 * @return void
 */
public function addAlert($name, $condition, $threshold)
{
    $rule = [];
    $rule['condition'] = $condition;
    $rule['threshold'] = $threshold;

    $this->alerts[$name] = $rule;
}
/**
 * Evaluates every registered rule against the given metrics.
 *
 * A rule triggers when its metric is strictly greater than its threshold;
 * a metric that is absent counts as 0.
 *
 * @param array $metrics Metric values keyed by metric name.
 *
 * @return array List of triggered rules (name, condition, value, threshold).
 */
public function checkAlerts($metrics)
{
    $fired = [];

    foreach ($this->alerts as $ruleName => $rule) {
        $metricKey = $rule['condition'];
        $limit = $rule['threshold'];
        $observed = $metrics[$metricKey] ?? 0;

        if (!($observed > $limit)) {
            continue;
        }

        $fired[] = [
            'name' => $ruleName,
            'condition' => $metricKey,
            'value' => $observed,
            'threshold' => $limit,
        ];
    }

    return $fired;
}
}
3. 追踪数据导出
php
<?php
/**
 * Ships trace entries to external stores (Elasticsearch, Amazon S3).
 */
class TraceExporter
{
    /**
     * Indexes every entry as its own document.
     *
     * @param array       $logs   Trace entries; each becomes one document body.
     * @param string      $index  Target index name.
     * @param object|null $client Optional client exposing index(array $params).
     *                            When omitted, one is built with
     *                            Elasticsearch\ClientBuilder, because the
     *                            client class cannot be instantiated without
     *                            constructor arguments.
     *
     * @return void
     */
    public function exportToElasticsearch($logs, $index = 'rabbitmq_traces', $client = null)
    {
        if ($client === null) {
            $client = Elasticsearch\ClientBuilder::create()->build();
        }

        foreach ($logs as $log) {
            $params = [
                'index' => $index,
                'body' => $log
            ];
            $client->index($params);
        }
    }

    /**
     * Uploads all entries as one JSON document.
     *
     * @param array       $logs   Trace entries to serialise.
     * @param string      $bucket Destination bucket.
     * @param string      $key    Destination object key.
     * @param object|null $s3     Optional client exposing putObject(array $args).
     *                            When omitted, an Aws\S3\S3Client is created
     *                            with the configuration its constructor
     *                            requires; the region comes from AWS_REGION or
     *                            AWS_DEFAULT_REGION, defaulting to us-east-1.
     *
     * @return void
     *
     * @throws RuntimeException When the entries cannot be encoded as JSON.
     */
    public function exportToS3($logs, $bucket, $key, $s3 = null)
    {
        $body = json_encode($logs);
        if ($body === false) {
            // Fail before uploading rather than storing an empty object.
            throw new RuntimeException('JSON 编码失败: ' . json_last_error_msg());
        }

        if ($s3 === null) {
            $region = getenv('AWS_REGION') ?: (getenv('AWS_DEFAULT_REGION') ?: 'us-east-1');
            $s3 = new Aws\S3\S3Client([
                'version' => 'latest',
                'region' => $region
            ]);
        }

        $s3->putObject([
            'Bucket' => $bucket,
            'Key' => $key,
            'Body' => $body
        ]);
    }
}