Skip to content

RabbitMQ 消息模型(Message Model)

概述

消息模型是 RabbitMQ 中定义消息结构和行为的规范。它包括消息的组成、属性、生命周期以及消息传递的语义保证。理解消息模型对于构建可靠的消息系统至关重要,它决定了消息如何被创建、传输、存储和消费。

消息模型的核心组成

mermaid
graph TB
    subgraph 消息模型
        A[Message] --> B[Body 消息体]
        A --> C[Properties 消息属性]
        A --> D[Delivery 传递信息]
        
        C --> C1[content_type]
        C --> C2[delivery_mode]
        C --> C3[priority]
        C --> C4[expiration]
        C --> C5[headers]
        
        D --> D1[delivery_tag]
        D --> D2[exchange]
        D --> D3[routing_key]
        D --> D4[redelivered]
    end

核心知识点

1. 消息结构

一条完整的 RabbitMQ 消息包含以下部分:

mermaid
graph LR
    subgraph 消息组成
        M[Message] --> B[Body]
        M --> P[Properties]
        M --> E[Envelope]
    end
    
    B --> B1["实际数据 (JSON/XML/Binary)"]
    P --> P1["元数据"]
    E --> E1["传递信息"]

消息体(Body)

  • 消息的实际内容
  • 可以是任意格式(JSON、XML、二进制等)
  • 最大大小受内存和磁盘限制

消息属性(Properties)

属性类型说明
content_typestring消息体的 MIME 类型
content_encodingstring消息体的编码方式
headerstable自定义消息头
delivery_modeint1=非持久化,2=持久化
priorityint消息优先级(0-9)
correlation_idstring关联 ID(RPC 场景)
reply_tostring回复队列名称
expirationstring消息过期时间(毫秒)
message_idstring消息唯一标识
timestamptimestamp消息创建时间
typestring消息类型标识
user_idstring用户 ID
app_idstring应用标识
cluster_idstring集群标识

2. 消息生命周期

mermaid
stateDiagram-v2
    [*] --> Created: 生产者创建
    Created --> Published: 发布到 Exchange
    Published --> Routed: 路由到 Queue
    Published --> Unroutable: 无匹配路由
    
    Routed --> Ready: 进入队列等待
    Ready --> Unacked: 消费者获取
    Unacked --> Acked: 确认成功
    Unacked --> Ready: NACK(requeue)
    Unacked --> Dead: NACK/过期/拒绝
    
    Acked --> [*]: 从队列删除
    Dead --> [*]: 转死信队列
    Unroutable --> [*]: 丢弃或返回

3. 消息持久化

mermaid
graph TB
    subgraph 持久化三要素
        A[Exchange 持久化] --> D[durable=true]
        B[Queue 持久化] --> D
        C[Message 持久化] --> E[delivery_mode=2]
    end
    
    D --> F[Broker 重启后保留]
    E --> F

4. 消息确认机制

mermaid
sequenceDiagram
    participant B as Broker
    participant C as Consumer
    
    B->>C: 投递消息
    Note over C: 消息状态: Unacked
    
    alt 自动确认
        Note over C: auto_ack=true
        C->>B: 自动确认
    else 手动确认
        Note over C: auto_ack=false
        C->>C: 处理消息
        alt 处理成功
            C->>B: basic.ack
        else 处理失败-重试
            C->>B: basic.nack(requeue=true)
        else 处理失败-丢弃
            C->>B: basic.reject(requeue=false)
        end
    end
    
    Note over B: 删除已确认消息

5. 消息可靠性保证

mermaid
graph TB
    subgraph 生产者端
        A[Publisher Confirms]
        B[事务机制]
    end
    
    subgraph Broker 端
        C[消息持久化]
        D[镜像队列]
    end
    
    subgraph 消费者端
        E[手动确认]
        F[幂等处理]
    end
    
    A --> G[可靠传递]
    B --> G
    C --> G
    D --> G
    E --> G
    F --> G

代码示例

基础消息创建

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class MessageBuilder
{
    private $body;
    private $properties = [];
    private $headers = [];
    
    public static function create(): self
    {
        return new self();
    }
    
    public function withBody($data): self
    {
        if (is_string($data)) {
            $this->body = $data;
        } else {
            $this->body = json_encode($data);
            $this->properties['content_type'] = 'application/json';
        }
        return $this;
    }
    
    public function withContentType(string $type): self
    {
        $this->properties['content_type'] = $type;
        return $this;
    }
    
    public function persistent(): self
    {
        $this->properties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
        return $this;
    }
    
    public function withPriority(int $priority): self
    {
        $this->properties['priority'] = min(9, max(0, $priority));
        return $this;
    }
    
    public function withExpiration(int $milliseconds): self
    {
        $this->properties['expiration'] = (string)$milliseconds;
        return $this;
    }
    
    public function withMessageId(string $id): self
    {
        $this->properties['message_id'] = $id;
        return $this;
    }
    
    public function withCorrelationId(string $id): self
    {
        $this->properties['correlation_id'] = $id;
        return $this;
    }
    
    public function withReplyTo(string $queue): self
    {
        $this->properties['reply_to'] = $queue;
        return $this;
    }
    
    public function withTimestamp(?int $timestamp = null): self
    {
        $this->properties['timestamp'] = $timestamp ?? time();
        return $this;
    }
    
    public function withType(string $type): self
    {
        $this->properties['type'] = $type;
        return $this;
    }
    
    public function withUserId(string $userId): self
    {
        $this->properties['user_id'] = $userId;
        return $this;
    }
    
    public function withAppId(string $appId): self
    {
        $this->properties['app_id'] = $appId;
        return $this;
    }
    
    public function withHeader(string $key, $value): self
    {
        $this->headers[$key] = $value;
        return $this;
    }
    
    public function withHeaders(array $headers): self
    {
        $this->headers = array_merge($this->headers, $headers);
        return $this;
    }
    
    public function build(): AMQPMessage
    {
        if (!empty($this->headers)) {
            $this->properties['application_headers'] = new AMQPTable($this->headers);
        }
        
        return new AMQPMessage($this->body, $this->properties);
    }
}

$message = MessageBuilder::create()
    ->withBody(['order_id' => 1001, 'status' => 'created'])
    ->withContentType('application/json')
    ->persistent()
    ->withPriority(5)
    ->withMessageId(uniqid('msg_', true))
    ->withTimestamp()
    ->withType('order.created')
    ->withAppId('order_service')
    ->withHeader('trace_id', 'abc123')
    ->withHeader('source', 'web_frontend')
    ->build();

echo "Message created with body: " . $message->getBody() . "\n";

消息属性详解

php
<?php

class MessagePropertiesDemo
{
    public function createStandardMessage(): AMQPMessage
    {
        return new AMQPMessage(
            json_encode(['data' => 'example']),
            [
                'content_type' => 'application/json',
                'content_encoding' => 'utf-8',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'priority' => 5,
                'message_id' => uniqid('msg_', true),
                'timestamp' => time(),
                'type' => 'standard',
                'app_id' => 'demo_app'
            ]
        );
    }
    
    public function createPriorityMessage(int $priority, string $body): AMQPMessage
    {
        return new AMQPMessage($body, [
            'priority' => min(9, max(0, $priority)),
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
    }
    
    public function createTTLMessage(string $body, int $ttlMs): AMQPMessage
    {
        return new AMQPMessage($body, [
            'expiration' => (string)$ttlMs,
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
    }
    
    public function createRPCRequest(string $body, string $replyTo, string $correlationId): AMQPMessage
    {
        return new AMQPMessage($body, [
            'reply_to' => $replyTo,
            'correlation_id' => $correlationId,
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
    }
    
    public function createHeadersMessage(string $body, array $headers): AMQPMessage
    {
        return new AMQPMessage($body, [
            'application_headers' => new AMQPTable($headers),
            'content_type' => 'application/json'
        ]);
    }
    
    public function createTracedMessage(string $body, string $traceId, string $spanId): AMQPMessage
    {
        return new AMQPMessage($body, [
            'application_headers' => new AMQPTable([
                'trace_id' => $traceId,
                'span_id' => $spanId,
                'timestamp' => microtime(true)
            ]),
            'content_type' => 'application/json',
            'message_id' => uniqid('msg_', true)
        ]);
    }
}

消息消费与处理

php
<?php

use PhpAmqpLib\Message\AMQPMessage;

class MessageHandler
{
    public function handleMessage(AMQPMessage $message): void
    {
        echo "=== Message Received ===\n";
        
        echo "Body: " . $message->getBody() . "\n";
        
        echo "\n--- Properties ---\n";
        $properties = $message->get_properties();
        foreach ($properties as $key => $value) {
            echo "{$key}: " . json_encode($value) . "\n";
        }
        
        echo "\n--- Headers ---\n";
        if ($message->has('application_headers')) {
            $headers = $message->get('application_headers');
            if ($headers instanceof AMQPTable) {
                foreach ($headers->getNativeData() as $key => $value) {
                    echo "{$key}: " . json_encode($value) . "\n";
                }
            }
        }
        
        echo "\n--- Delivery Info ---\n";
        echo "Exchange: " . $message->getExchange() . "\n";
        echo "Routing Key: " . $message->getRoutingKey() . "\n";
        echo "Delivery Tag: " . $message->getDeliveryTag() . "\n";
        echo "Redelivered: " . ($message->isRedelivery() ? 'Yes' : 'No') . "\n";
        
        echo "\n--- Consumer Tag ---\n";
        echo "Consumer Tag: " . $message->getConsumerTag() . "\n";
    }
    
    public function parseJsonMessage(AMQPMessage $message): array
    {
        $body = $message->getBody();
        $contentType = $message->get('content_type');
        
        if ($contentType === 'application/json') {
            return json_decode($body, true);
        }
        
        return ['raw' => $body];
    }
    
    public function extractTraceContext(AMQPMessage $message): array
    {
        $headers = [];
        
        if ($message->has('application_headers')) {
            $appHeaders = $message->get('application_headers');
            if ($appHeaders instanceof AMQPTable) {
                $nativeData = $appHeaders->getNativeData();
                $headers = [
                    'trace_id' => $nativeData['trace_id'] ?? null,
                    'span_id' => $nativeData['span_id'] ?? null
                ];
            }
        }
        
        return $headers;
    }
}

消息确认机制

php
<?php

class MessageAcknowledgment
{
    public function acknowledge(AMQPMessage $message): void
    {
        $message->ack();
        echo "Message acknowledged: " . $message->getDeliveryTag() . "\n";
    }
    
    public function acknowledgeMultiple(AMQPMessage $message): void
    {
        $message->ack(true);
        echo "Multiple messages acknowledged up to: " . $message->getDeliveryTag() . "\n";
    }
    
    public function rejectAndRequeue(AMQPMessage $message): void
    {
        $message->nack(true, true);
        echo "Message rejected and requeued: " . $message->getDeliveryTag() . "\n";
    }
    
    public function rejectAndDiscard(AMQPMessage $message): void
    {
        $message->nack(false, false);
        echo "Message rejected and discarded: " . $message->getDeliveryTag() . "\n";
    }
    
    public function rejectSingle(AMQPMessage $message): void
    {
        $message->reject(false);
        echo "Message rejected: " . $message->getDeliveryTag() . "\n";
    }
    
    public function rejectWithDeadLetter(AMQPMessage $message): void
    {
        $message->reject(false);
        echo "Message rejected, will go to DLQ: " . $message->getDeliveryTag() . "\n";
    }
}

消息幂等性处理

php
<?php

class IdempotentMessageProcessor
{
    private $processedMessages;
    
    public function __construct()
    {
        $this->processedMessages = [];
    }
    
    public function process(AMQPMessage $message, callable $processor): void
    {
        $messageId = $this->getMessageId($message);
        
        if ($this->isProcessed($messageId)) {
            echo "Message {$messageId} already processed, skipping\n";
            $message->ack();
            return;
        }
        
        try {
            $processor($message);
            
            $this->markAsProcessed($messageId);
            $message->ack();
            
            echo "Message {$messageId} processed successfully\n";
        } catch (Exception $e) {
            echo "Failed to process message {$messageId}: {$e->getMessage()}\n";
            $message->nack(true);
        }
    }
    
    private function getMessageId(AMQPMessage $message): string
    {
        $messageId = $message->get('message_id');
        
        if ($messageId) {
            return $messageId;
        }
        
        $body = $message->getBody();
        $data = json_decode($body, true);
        
        if (isset($data['message_id'])) {
            return $data['message_id'];
        }
        
        return md5($body);
    }
    
    private function isProcessed(string $messageId): bool
    {
        return isset($this->processedMessages[$messageId]);
    }
    
    private function markAsProcessed(string $messageId): void
    {
        $this->processedMessages[$messageId] = time();
        
        $this->cleanupOldEntries();
    }
    
    private function cleanupOldEntries(): void
    {
        $maxSize = 10000;
        
        if (count($this->processedMessages) > $maxSize) {
            $this->processedMessages = array_slice(
                $this->processedMessages,
                -$maxSize / 2,
                null,
                true
            );
        }
    }
}

消息重试机制

php
<?php

class RetryMessageProcessor
{
    private $maxRetries = 3;
    private $retryDelays = [1000, 5000, 15000, 60000];
    
    public function processWithRetry(AMQPMessage $message, callable $processor): void
    {
        $data = json_decode($message->getBody(), true);
        $retryCount = $data['_retry_count'] ?? 0;
        
        try {
            $processor($data);
            $message->ack();
            echo "Message processed successfully\n";
        } catch (Exception $e) {
            echo "Processing failed (attempt " . ($retryCount + 1) . "): {$e->getMessage()}\n";
            
            if ($retryCount < $this->maxRetries) {
                $this->scheduleRetry($message, $retryCount, $e->getMessage());
            } else {
                $this->sendToDeadLetterQueue($message, $e->getMessage());
            }
        }
    }
    
    private function scheduleRetry(AMQPMessage $message, int $retryCount, string $error): void
    {
        $data = json_decode($message->getBody(), true);
        $data['_retry_count'] = $retryCount + 1;
        $data['_last_error'] = $error;
        $data['_last_retry_at'] = date('Y-m-d H:i:s');
        
        $delay = $this->retryDelays[$retryCount] ?? end($this->retryDelays);
        
        $newMessage = new AMQPMessage(
            json_encode($data),
            [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'expiration' => (string)$delay
            ]
        );
        
        $channel = $message->getChannel();
        $channel->basic_publish($newMessage, '', 'retry_queue');
        
        $message->ack();
        
        echo "Scheduled retry #{$retryCount} with delay {$delay}ms\n";
    }
    
    private function sendToDeadLetterQueue(AMQPMessage $message, string $error): void
    {
        $data = json_decode($message->getBody(), true);
        $data['_dead_letter_reason'] = 'max_retries_exceeded';
        $data['_dead_letter_error'] = $error;
        $data['_dead_letter_at'] = date('Y-m-d H:i:s');
        
        $dlqMessage = new AMQPMessage(
            json_encode($data),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        
        $channel = $message->getChannel();
        $channel->basic_publish($dlqMessage, '', 'dead_letter_queue');
        
        $message->ack();
        
        echo "Sent to dead letter queue\n";
    }
}

实际应用场景

1. 事件消息模型

php
<?php

class EventMessageModel
{
    public static function createEvent(string $eventType, array $data, array $metadata = []): AMQPMessage
    {
        $event = [
            'event_id' => uniqid('evt_', true),
            'event_type' => $eventType,
            'data' => $data,
            'metadata' => array_merge([
                'timestamp' => time(),
                'source' => gethostname(),
                'version' => '1.0'
            ], $metadata)
        ];
        
        return new AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                'type' => $eventType,
                'message_id' => $event['event_id'],
                'timestamp' => time(),
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]
        );
    }
    
    public static function parseEvent(AMQPMessage $message): array
    {
        $body = json_decode($message->getBody(), true);
        
        return [
            'event_id' => $body['event_id'] ?? $message->get('message_id'),
            'event_type' => $body['event_type'] ?? $message->get('type'),
            'data' => $body['data'] ?? [],
            'metadata' => $body['metadata'] ?? [],
            'timestamp' => $body['metadata']['timestamp'] ?? $message->get('timestamp')
        ];
    }
}

$orderCreatedEvent = EventMessageModel::createEvent(
    'order.created',
    ['order_id' => 1001, 'user_id' => 5001, 'amount' => 299.99],
    ['correlation_id' => 'req_123', 'trace_id' => 'trace_abc']
);

2. 命令消息模型

php
<?php

class CommandMessageModel
{
    public static function createCommand(
        string $command,
        array $payload,
        ?string $replyTo = null,
        ?string $correlationId = null
    ): AMQPMessage {
        $cmd = [
            'command_id' => uniqid('cmd_', true),
            'command' => $command,
            'payload' => $payload,
            'created_at' => date('Y-m-d H:i:s')
        ];
        
        $properties = [
            'content_type' => 'application/json',
            'type' => $command,
            'message_id' => $cmd['command_id'],
            'timestamp' => time(),
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ];
        
        if ($replyTo) {
            $properties['reply_to'] = $replyTo;
        }
        
        if ($correlationId) {
            $properties['correlation_id'] = $correlationId;
        }
        
        return new AMQPMessage(json_encode($cmd), $properties);
    }
    
    public static function createResponse(
        string $correlationId,
        bool $success,
        $result = null,
        ?string $error = null
    ): AMQPMessage {
        $response = [
            'success' => $success,
            'result' => $result,
            'error' => $error,
            'timestamp' => time()
        ];
        
        return new AMQPMessage(
            json_encode($response),
            [
                'content_type' => 'application/json',
                'correlation_id' => $correlationId,
                'type' => 'response'
            ]
        );
    }
}

$createCommand = CommandMessageModel::createCommand(
    'create_order',
    ['user_id' => 5001, 'items' => [['product_id' => 101, 'qty' => 2]]],
    'reply_queue',
    'req_123'
);

3. 文档消息模型

php
<?php

class DocumentMessageModel
{
    public static function createDocument(
        string $documentType,
        array $document,
        array $options = []
    ): AMQPMessage {
        $envelope = [
            'document_id' => $document['id'] ?? uniqid('doc_', true),
            'document_type' => $documentType,
            'document' => $document,
            'created_at' => date('Y-m-d H:i:s'),
            'version' => $options['version'] ?? 1
        ];
        
        $properties = [
            'content_type' => 'application/json',
            'type' => $documentType,
            'message_id' => $envelope['document_id'],
            'timestamp' => time(),
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ];
        
        if (isset($options['priority'])) {
            $properties['priority'] = $options['priority'];
        }
        
        if (isset($options['ttl'])) {
            $properties['expiration'] = (string)$options['ttl'];
        }
        
        return new AMQPMessage(json_encode($envelope), $properties);
    }
    
    public static function parseDocument(AMQPMessage $message): array
    {
        $body = json_decode($message->getBody(), true);
        
        return [
            'document_id' => $body['document_id'],
            'document_type' => $body['document_type'],
            'document' => $body['document'],
            'version' => $body['version'] ?? 1
        ];
    }
}

$orderDocument = DocumentMessageModel::createDocument(
    'order',
    [
        'id' => 'ORD-001',
        'customer' => 'John Doe',
        'items' => [['product' => 'Widget', 'price' => 29.99]]
    ],
    ['priority' => 5, 'version' => 2]
);

常见问题与解决方案

1. 消息过大问题

问题原因

  • 消息体超过限制
  • 内存不足
  • 网络传输慢

解决方案

php
<?php

class LargeMessageHandler
{
    private $maxMessageSize = 16 * 1024 * 1024;
    
    public function splitLargeMessage(array $data, int $chunkSize = 1024 * 1024): array
    {
        $encoded = json_encode($data);
        
        if (strlen($encoded) <= $this->maxMessageSize) {
            return [new AMQPMessage($encoded, [
                'content_type' => 'application/json',
                'headers' => new AMQPTable(['chunked' => false])
            ])];
        }
        
        $chunks = str_split($encoded, $chunkSize);
        $messageId = uniqid('chunk_', true);
        $totalChunks = count($chunks);
        
        $messages = [];
        foreach ($chunks as $index => $chunk) {
            $messages[] = new AMQPMessage($chunk, [
                'content_type' => 'application/octet-stream',
                'message_id' => $messageId,
                'headers' => new AMQPTable([
                    'chunked' => true,
                    'chunk_index' => $index,
                    'total_chunks' => $totalChunks
                ])
            ]);
        }
        
        return $messages;
    }
    
    public function reassembleChunks(array $messages): string
    {
        usort($messages, function ($a, $b) {
            $headersA = $a->get('application_headers')->getNativeData();
            $headersB = $b->get('application_headers')->getNativeData();
            return $headersA['chunk_index'] <=> $headersB['chunk_index'];
        });
        
        $body = '';
        foreach ($messages as $message) {
            $body .= $message->getBody();
        }
        
        return $body;
    }
}

2. 消息格式兼容性

问题原因

  • 版本升级
  • 不同服务使用不同格式
  • 缺少版本控制

解决方案

php
<?php

class VersionedMessageModel
{
    private const CURRENT_VERSION = '2.0';
    
    public static function create(array $data, string $version = null): AMQPMessage
    {
        $version = $version ?? self::CURRENT_VERSION;
        
        $envelope = [
            'version' => $version,
            'data' => $data,
            'created_at' => time()
        ];
        
        return new AMQPMessage(
            json_encode($envelope),
            [
                'content_type' => 'application/json',
                'headers' => new AMQPTable(['version' => $version])
            ]
        );
    }
    
    public static function parse(AMQPMessage $message): array
    {
        $body = json_decode($message->getBody(), true);
        $version = $body['version'] ?? '1.0';
        
        return self::migrate($body['data'], $version);
    }
    
    private static function migrate(array $data, string $fromVersion): array
    {
        $migrations = [
            '1.0' => 'migrateV1ToV2',
            '2.0' => null
        ];
        
        $currentVersion = $fromVersion;
        
        while ($currentVersion !== self::CURRENT_VERSION && isset($migrations[$currentVersion])) {
            $migrationMethod = $migrations[$currentVersion];
            if ($migrationMethod) {
                $data = self::$migrationMethod($data);
            }
            $currentVersion = self::CURRENT_VERSION;
        }
        
        return $data;
    }
    
    private static function migrateV1ToV2(array $data): array
    {
        if (isset($data['order_id'])) {
            $data['id'] = $data['order_id'];
            unset($data['order_id']);
        }
        
        return $data;
    }
}

3. 消息追踪问题

问题原因

  • 缺少追踪信息
  • 日志不完整
  • 难以排查问题

解决方案

php
<?php

class TracedMessageModel
{
    public static function create(
        array $data,
        ?string $traceId = null,
        ?string $spanId = null
    ): AMQPMessage {
        $traceId = $traceId ?? self::generateTraceId();
        $spanId = $spanId ?? self::generateSpanId();
        
        return new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'message_id' => uniqid('msg_', true),
                'timestamp' => time(),
                'headers' => new AMQPTable([
                    'trace_id' => $traceId,
                    'span_id' => $spanId,
                    'parent_span_id' => null,
                    'sampled' => true
                ])
            ]
        );
    }
    
    public static function createChild(
        AMQPMessage $parent,
        array $data
    ): AMQPMessage {
        $parentHeaders = $parent->get('application_headers');
        $parentData = $parentHeaders ? $parentHeaders->getNativeData() : [];
        
        $traceId = $parentData['trace_id'] ?? self::generateTraceId();
        $parentSpanId = $parentData['span_id'] ?? null;
        $childSpanId = self::generateSpanId();
        
        return new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'message_id' => uniqid('msg_', true),
                'timestamp' => time(),
                'headers' => new AMQPTable([
                    'trace_id' => $traceId,
                    'span_id' => $childSpanId,
                    'parent_span_id' => $parentSpanId,
                    'sampled' => $parentData['sampled'] ?? true
                ])
            ]
        );
    }
    
    public static function extractTraceContext(AMQPMessage $message): array
    {
        $headers = $message->get('application_headers');
        
        if (!$headers) {
            return [];
        }
        
        $data = $headers->getNativeData();
        
        return [
            'trace_id' => $data['trace_id'] ?? null,
            'span_id' => $data['span_id'] ?? null,
            'parent_span_id' => $data['parent_span_id'] ?? null
        ];
    }
    
    private static function generateTraceId(): string
    {
        return bin2hex(random_bytes(16));
    }
    
    private static function generateSpanId(): string
    {
        return bin2hex(random_bytes(8));
    }
}

最佳实践建议

1. 消息设计原则

php
<?php

class MessageDesignPrinciples
{
    public static function createWellDesignedMessage(array $data): AMQPMessage
    {
        return MessageBuilder::create()
            ->withBody($data)
            ->withContentType('application/json')
            ->persistent()
            ->withMessageId(self::generateMessageId())
            ->withTimestamp()
            ->withHeader('version', '1.0')
            ->withHeader('source', self::getSource())
            ->build();
    }
    
    private static function generateMessageId(): string
    {
        return sprintf(
            '%s-%s-%s',
            date('YmdHis'),
            bin2hex(random_bytes(4)),
            getmypid()
        );
    }
    
    private static function getSource(): string
    {
        return gethostname() . ':' . getmypid();
    }
}

2. 消息验证

php
<?php

class MessageValidator
{
    public static function validate(AMQPMessage $message, array $rules): array
    {
        $errors = [];
        $body = json_decode($message->getBody(), true);
        
        foreach ($rules as $field => $rule) {
            if (!isset($body[$field]) && ($rule['required'] ?? false)) {
                $errors[] = "Field '{$field}' is required";
                continue;
            }
            
            if (isset($body[$field])) {
                $value = $body[$field];
                $type = $rule['type'] ?? 'string';
                
                if (!self::validateType($value, $type)) {
                    $errors[] = "Field '{$field}' must be of type {$type}";
                }
            }
        }
        
        return $errors;
    }
    
    private static function validateType($value, string $type): bool
    {
        switch ($type) {
            case 'string':
                return is_string($value);
            case 'integer':
                return is_int($value);
            case 'float':
                return is_float($value) || is_int($value);
            case 'boolean':
                return is_bool($value);
            case 'array':
                return is_array($value);
            default:
                return true;
        }
    }
}

3. 消息序列化

php
<?php

class MessageSerializer
{
    public static function serialize(array $data, string $format = 'json'): string
    {
        switch ($format) {
            case 'json':
                return json_encode($data, JSON_UNESCAPED_UNICODE);
            case 'msgpack':
                return msgpack_pack($data);
            case 'serialize':
                return serialize($data);
            default:
                throw new InvalidArgumentException("Unsupported format: {$format}");
        }
    }
    
    public static function deserialize(string $data, string $format = 'json')
    {
        switch ($format) {
            case 'json':
                return json_decode($data, true);
            case 'msgpack':
                return msgpack_unpack($data);
            case 'serialize':
                return unserialize($data);
            default:
                throw new InvalidArgumentException("Unsupported format: {$format}");
        }
    }
}

相关链接