Skip to content

消息格式设计

概述

消息格式设计是 RabbitMQ 系统设计的核心环节。良好的消息格式能够提高系统的互操作性、可维护性和可扩展性。本文档介绍消息格式设计的最佳实践。

核心设计原则

1. 标准化原则

使用统一的消息格式标准,便于跨系统交互。

json
{
    "message_id": "uuid-v4",
    "timestamp": "2024-01-15T10:30:00Z",
    "version": "1.0",
    "source": "service-name",
    "type": "event-type",
    "data": {},
    "metadata": {}
}

2. 可扩展原则

消息格式应支持向后兼容的扩展:新增字段应设计为可选字段,消费者应忽略无法识别的字段,并通过 version 字段标识消息格式版本。

json
{
    "version": "1.0",
    "data": {
        "order_id": "12345",
        "amount": 99.99
    },
    "metadata": {
        "trace_id": "abc123",
        "span_id": "def456"
    }
}

3. 自描述原则

消息应包含足够的信息,便于消费者理解和处理。

json
{
    "message_id": "550e8400-e29b-41d4-a716-446655440000",
    "type": "order.created",
    "timestamp": "2024-01-15T10:30:00Z",
    "source": "order-service",
    "content_type": "application/json",
    "data": { "order_id": "12345" }
}

4. 安全性原则

敏感信息应加密或脱敏处理。

json
{
    "data": {
        "user_id": "12345",
        "card_number": "****-****-****-1234"
    }
}

消息结构设计

标准消息信封

php
<?php

namespace App\Messaging\Message;

/**
 * Standard message envelope.
 *
 * Wraps a typed payload together with a generated message ID, a UTC
 * timestamp (formatted as `Y-m-d\TH:i:s\Z`), a format version, the producing
 * application's name (read from `config('app.name')`) and free-form metadata.
 */
class MessageEnvelope
{
    private string $messageId;
    private string $timestamp;
    private string $version;
    private string $source;
    private string $type;
    private array $data;
    private array $metadata;

    /**
     * Creates a new envelope with a freshly generated ID and the current time.
     *
     * @param string $type     Message type identifier, e.g. "order.created".
     * @param array  $data     Message payload.
     * @param array  $metadata Optional metadata kept separate from the payload.
     */
    public function __construct(
        string $type,
        array $data,
        array $metadata = []
    ) {
        $this->messageId = $this->generateMessageId();
        $this->timestamp = $this->formatTimestamp(time());
        $this->version = '1.0';
        // Cast so that an unset app name does not violate the string property type.
        $this->source = (string) config('app.name');
        $this->type = $type;
        $this->data = $data;
        $this->metadata = $metadata;
    }

    /**
     * Returns the envelope as an associative array using snake_case keys.
     */
    public function toArray(): array
    {
        return [
            'message_id' => $this->messageId,
            'timestamp' => $this->timestamp,
            'version' => $this->version,
            'source' => $this->source,
            'type' => $this->type,
            'data' => $this->data,
            'metadata' => $this->metadata,
        ];
    }

    /**
     * Serializes the envelope to JSON.
     *
     * @throws \JsonException When the payload cannot be encoded (instead of
     *                        silently returning an empty string).
     */
    public function toJson(): string
    {
        return json_encode(
            $this->toArray(),
            JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES
        );
    }

    /**
     * Rebuilds an envelope from its array form.
     *
     * `message_id`, `timestamp`, `type` and `data` are required; `version`
     * defaults to "1.0", `source` to an empty string and `metadata` to [].
     *
     * @throws \InvalidArgumentException When a required field is missing or
     *                                   has an unusable type.
     */
    public static function fromArray(array $data): self
    {
        foreach (['message_id', 'type'] as $field) {
            if (!isset($data[$field]) || !is_string($data[$field])) {
                throw new \InvalidArgumentException("Envelope field \"{$field}\" must be a string");
            }
        }

        if (!isset($data['data']) || !is_array($data['data'])) {
            throw new \InvalidArgumentException('Envelope field "data" must be an array');
        }

        $timestamp = $data['timestamp'] ?? null;
        if (!is_string($timestamp) && !is_int($timestamp)) {
            throw new \InvalidArgumentException('Envelope field "timestamp" must be a string or integer');
        }

        $metadata = $data['metadata'] ?? [];
        if (!is_array($metadata)) {
            throw new \InvalidArgumentException('Envelope field "metadata" must be an array');
        }

        // The constructor generates a fresh ID/timestamp; both are overwritten
        // below with the values carried by the incoming message.
        $envelope = new self($data['type'], $data['data'], $metadata);
        $envelope->messageId = $data['message_id'];
        $envelope->timestamp = (string) $timestamp;
        $envelope->version = $data['version'] ?? '1.0';
        $envelope->source = $data['source'] ?? '';

        return $envelope;
    }

    /**
     * Rebuilds an envelope from a JSON string.
     *
     * @throws \JsonException            When the string is not valid JSON.
     * @throws \InvalidArgumentException When the JSON is not an object or
     *                                   lacks required fields.
     */
    public static function fromJson(string $json): self
    {
        $decoded = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

        if (!is_array($decoded)) {
            throw new \InvalidArgumentException('Envelope JSON must decode to an object');
        }

        return self::fromArray($decoded);
    }

    /**
     * Generates a random (version 4) UUID string from random_bytes().
     */
    private function generateMessageId(): string
    {
        $bytes = random_bytes(16);
        // Set the version nibble to 4 and the variant bits to 10xx.
        $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40);
        $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80);

        return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
    }

    /**
     * Formats a Unix timestamp as a UTC date-time string with a "Z" suffix.
     */
    private function formatTimestamp(int $timestamp): string
    {
        return gmdate('Y-m-d\TH:i:s\Z', $timestamp);
    }

    public function getMessageId(): string
    {
        return $this->messageId;
    }

    public function getTimestamp(): string
    {
        return $this->timestamp;
    }

    public function getVersion(): string
    {
        return $this->version;
    }

    public function getSource(): string
    {
        return $this->source;
    }

    public function getType(): string
    {
        return $this->type;
    }

    public function getData(): array
    {
        return $this->data;
    }

    public function getMetadata(): array
    {
        return $this->metadata;
    }
}

消息属性设计

php
<?php

namespace App\Messaging\Message;

use PhpAmqpLib\Message\AMQPMessage;

/**
 * Fluent builder for AMQPMessage instances.
 *
 * Collects AMQP properties and application headers, then produces the message
 * in build(). Defaults set by the constructor: JSON content type, persistent
 * delivery mode and priority 0.
 */
class MessageBuilder
{
    private array $properties = [];
    private array $headers = [];
    private string $body = '';

    public function __construct()
    {
        $this->properties = [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'priority' => 0,
        ];
    }

    /**
     * JSON-encodes the given data and uses it as the message body.
     *
     * @throws \JsonException When the data cannot be encoded; without this
     *                        flag a failed encode would yield an empty body.
     */
    public function setBody(array $data): self
    {
        $this->body = json_encode($data, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
        return $this;
    }

    public function setContentType(string $type): self
    {
        $this->properties['content_type'] = $type;
        return $this;
    }

    /**
     * Sets the message priority, clamped to the range 0..9.
     */
    public function setPriority(int $priority): self
    {
        $this->properties['priority'] = min(max($priority, 0), 9);
        return $this;
    }

    /**
     * Sets the per-message expiration; the value is stored as a string.
     *
     * @param int $ttlMs Time to live (named as milliseconds by the parameter).
     */
    public function setExpiration(int $ttlMs): self
    {
        $this->properties['expiration'] = (string) $ttlMs;
        return $this;
    }

    public function setMessageId(string $id): self
    {
        $this->properties['message_id'] = $id;
        return $this;
    }

    public function setCorrelationId(string $id): self
    {
        $this->properties['correlation_id'] = $id;
        return $this;
    }

    public function setReplyTo(string $queue): self
    {
        $this->properties['reply_to'] = $queue;
        return $this;
    }

    public function setTimestamp(int $timestamp): self
    {
        $this->properties['timestamp'] = $timestamp;
        return $this;
    }

    public function setType(string $type): self
    {
        $this->properties['type'] = $type;
        return $this;
    }

    public function setUserId(string $userId): self
    {
        $this->properties['user_id'] = $userId;
        return $this;
    }

    public function setAppId(string $appId): self
    {
        $this->properties['app_id'] = $appId;
        return $this;
    }

    /**
     * Adds (or overwrites) a single application header.
     *
     * @param mixed $value Header value, passed through to AMQPTable unchanged.
     */
    public function addHeader(string $key, $value): self
    {
        $this->headers[$key] = $value;
        return $this;
    }

    /**
     * Stores trace and span IDs in the "x-trace-id" / "x-span-id" headers.
     */
    public function setTraceContext(string $traceId, string $spanId): self
    {
        $this->headers['x-trace-id'] = $traceId;
        $this->headers['x-span-id'] = $spanId;
        return $this;
    }

    /**
     * Stores the retry counter in the "x-retry-count" header.
     */
    public function setRetryCount(int $count): self
    {
        $this->headers['x-retry-count'] = $count;
        return $this;
    }

    /**
     * Creates the AMQPMessage from the collected body, properties and headers.
     *
     * Works on a copy of the properties so that building does not alter the
     * builder's own state.
     */
    public function build(): AMQPMessage
    {
        $properties = $this->properties;

        if ($this->headers !== []) {
            $properties['application_headers'] = new \PhpAmqpLib\Wire\AMQPTable($this->headers);
        }

        return new AMQPMessage($this->body, $properties);
    }
}

PHP 代码示例

正确做法:标准化消息格式

php
<?php

namespace App\Messaging\Producer;

use App\Messaging\Message\MessageEnvelope;
use App\Messaging\Message\MessageBuilder;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Publishes order lifecycle events to the "order.events" exchange using the
 * standard envelope and message builder.
 */
class OrderEventProducer
{
    /** @var \PhpAmqpLib\Channel\AMQPChannel Channel used for publishing. */
    private $channel;

    /**
     * @param \PhpAmqpLib\Channel\AMQPChannel $channel Open channel to publish on.
     *        Without this the channel property is never assigned and every
     *        publish call would fail on a null channel.
     */
    public function __construct(\PhpAmqpLib\Channel\AMQPChannel $channel)
    {
        $this->channel = $channel;
    }

    /**
     * Publishes an "order.created" event.
     *
     * @param array $order Reads the keys id, user_id, amount, items and
     *                     shipping_address; currency defaults to "CNY".
     */
    public function publishOrderCreated(array $order): void
    {
        // Resolve the trace ID once so the envelope metadata and the AMQP
        // header carry the same value even when it has to be generated.
        $traceId = $this->getTraceId();

        $envelope = new MessageEnvelope(
            'order.created',
            [
                'order_id' => $order['id'],
                'user_id' => $order['user_id'],
                'amount' => $order['amount'],
                'currency' => $order['currency'] ?? 'CNY',
                'items' => $order['items'],
                'shipping_address' => $order['shipping_address'],
            ],
            [
                'trace_id' => $traceId,
                'user_agent' => request()->userAgent(),
                'ip' => request()->ip(),
            ]
        );

        $this->publish($envelope, $traceId);
    }

    /**
     * Publishes an "order.paid" event.
     *
     * @param string $orderId Identifier of the paid order.
     * @param array  $payment Reads the keys id, method, amount and paid_at.
     */
    public function publishOrderPaid(string $orderId, array $payment): void
    {
        $traceId = $this->getTraceId();

        $envelope = new MessageEnvelope(
            'order.paid',
            [
                'order_id' => $orderId,
                'payment_id' => $payment['id'],
                'payment_method' => $payment['method'],
                'paid_amount' => $payment['amount'],
                'paid_at' => $payment['paid_at'],
            ],
            [
                'trace_id' => $traceId,
            ]
        );

        $this->publish($envelope, $traceId);
    }

    /**
     * Builds the AMQP message for an envelope and publishes it, using the
     * envelope type as both the AMQP "type" property and the routing key.
     */
    private function publish(MessageEnvelope $envelope, string $traceId): void
    {
        $message = (new MessageBuilder())
            ->setBody($envelope->toArray())
            ->setMessageId($envelope->getMessageId())
            ->setType($envelope->getType())
            ->setTimestamp(time())
            ->setAppId((string) config('app.name'))
            ->setTraceContext($traceId, $this->getSpanId())
            ->build();

        $this->channel->basic_publish(
            $message,
            'order.events',
            $envelope->getType()
        );
    }

    /**
     * Returns the incoming X-Trace-Id request header, or a newly generated
     * trace ID when the header is absent, empty or not a plain string.
     */
    private function getTraceId(): string
    {
        $header = request()->header('X-Trace-Id');

        return is_string($header) && $header !== ''
            ? $header
            : $this->generateTraceId();
    }

    /**
     * Generates a span ID: 8 random bytes as 16 hex characters.
     */
    private function getSpanId(): string
    {
        return bin2hex(random_bytes(8));
    }

    /**
     * Generates a trace ID: 16 random bytes as 32 hex characters.
     */
    private function generateTraceId(): string
    {
        return bin2hex(random_bytes(16));
    }
}

消费者处理示例

php
<?php

namespace App\Messaging\Consumer;

use App\Messaging\Message\MessageEnvelope;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Consumes order events: decodes the envelope, validates and logs it,
 * dispatches on the message type and acknowledges the delivery.
 */
class OrderEventConsumer
{
    /**
     * Handles one delivery.
     *
     * Messages that cannot be decoded or fail validation are logged and
     * rejected without requeueing, so they are not left unacknowledged.
     * Exceptions thrown by the type-specific handlers still propagate and
     * leave the message unacknowledged.
     */
    public function handle(AMQPMessage $message): void
    {
        try {
            $envelope = MessageEnvelope::fromJson($message->body);
            $this->validateMessage($envelope);
        } catch (\InvalidArgumentException | \JsonException | \TypeError $e) {
            logger()->error('Rejecting malformed message', [
                'error' => $e->getMessage(),
            ]);
            $message->reject(false);

            return;
        }

        $this->logMessage($envelope, $message);

        match ($envelope->getType()) {
            'order.created' => $this->handleOrderCreated($envelope),
            'order.paid' => $this->handleOrderPaid($envelope),
            'order.cancelled' => $this->handleOrderCancelled($envelope),
            default => $this->handleUnknownType($envelope),
        };

        $message->ack();
    }

    /**
     * Ensures the envelope carries a message ID, a type and non-empty data.
     *
     * @throws \InvalidArgumentException When any of the three is empty.
     */
    private function validateMessage(MessageEnvelope $envelope): void
    {
        if (empty($envelope->getMessageId())) {
            throw new \InvalidArgumentException('Missing message_id');
        }

        if (empty($envelope->getType())) {
            throw new \InvalidArgumentException('Missing message type');
        }

        if (empty($envelope->getData())) {
            throw new \InvalidArgumentException('Missing message data');
        }
    }

    /**
     * Logs the message ID, type, source and (if present) the x-trace-id header.
     */
    private function logMessage(MessageEnvelope $envelope, AMQPMessage $message): void
    {
        // Check has() first: reading an unset property via get() throws.
        $traceId = null;
        if ($message->has('application_headers')) {
            $headers = $message->get('application_headers');
            $traceId = $headers->getNativeData()['x-trace-id'] ?? null;
        }

        logger()->info('Processing message', [
            'message_id' => $envelope->getMessageId(),
            'type' => $envelope->getType(),
            // The source property is private; read it through toArray().
            'source' => $envelope->toArray()['source'] ?? null,
            'trace_id' => $traceId,
        ]);
    }

    private function handleOrderCreated(MessageEnvelope $envelope): void
    {
        $data = $envelope->getData();

        // Handle the order-created event.
    }

    private function handleOrderPaid(MessageEnvelope $envelope): void
    {
        $data = $envelope->getData();

        // Handle the order-paid event.
    }

    private function handleOrderCancelled(MessageEnvelope $envelope): void
    {
        $data = $envelope->getData();

        // Handle the order-cancelled event.
    }

    /**
     * Logs a warning for a type that has no dedicated handler.
     */
    private function handleUnknownType(MessageEnvelope $envelope): void
    {
        logger()->warning('Unknown message type', [
            'message_id' => $envelope->getMessageId(),
            'type' => $envelope->getType(),
        ]);
    }
}

错误做法:不规范的消息格式

php
<?php

/**
 * Deliberate counter-example: a producer that publishes non-standard
 * messages. Each numbered comment marks one mistake being illustrated; the
 * mistakes are intentional and must not be "fixed" in this class.
 */
class BadMessageProducer
{
    public function publishOrderCreated(array $order): void
    {
        // Mistake 1: the raw data is sent directly, with no message ID.
        // Mistake 2: no message type identifier.
        // Mistake 3: no timestamp.
        // Mistake 4: no source identifier.
        // Mistake 5: no tracing information.
        $rawBody = json_encode($order);

        $this->channel->basic_publish(new AMQPMessage($rawBody), '', 'order_queue');
    }

    public function publishOrderPaid(array $data): void
    {
        // Mistake 6: inconsistent message format --
        // sometimes order_id is used, sometimes orderId.
        $payload = [
            'orderId' => $data['order_id'],
            'paid' => true,
        ];
        $message = new AMQPMessage(json_encode($payload));

        $this->channel->basic_publish($message, '', 'order_queue');
    }

    public function publishSensitiveData(array $user): void
    {
        // Mistake 7: sensitive information is transmitted in plain text.
        $payload = [
            'user_id' => $user['id'],
            'password' => $user['password'],
            'credit_card' => $user['credit_card'],
        ];
        $message = new AMQPMessage(json_encode($payload));

        $this->channel->basic_publish($message, '', 'user_queue');
    }
}

消息类型设计

事件消息

php
<?php

/**
 * Factory for event messages (something that has already happened).
 */
class EventMessage
{
    /**
     * Builds an event message array.
     *
     * @param string $eventType Event type identifier, e.g. "order.created".
     * @param array  $payload   Event payload, stored under "data".
     *
     * @return array Message with a generated ID, a UTC timestamp, fixed
     *               "version" and "specversion" of "1.0", and the application
     *               name from config('app.name') as "source".
     */
    public static function create(string $eventType, array $payload): array
    {
        return [
            'message_id' => self::generateId(),
            'timestamp' => gmdate('Y-m-d\TH:i:s\Z'),
            'version' => '1.0',
            'type' => $eventType,
            'source' => config('app.name'),
            'specversion' => '1.0',
            'data' => $payload,
        ];
    }

    /**
     * Generates a random (version 4) UUID string from random_bytes().
     */
    private static function generateId(): string
    {
        $bytes = random_bytes(16);
        // Set the version nibble to 4 and the variant bits to 10xx.
        $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40);
        $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80);

        return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
    }
}

// Usage example
$event = EventMessage::create('order.created', [
    'order_id' => '12345',
    'user_id' => '67890',
    'amount' => 99.99,
]);

命令消息

php
<?php

/**
 * Factory for command messages (a request for another service to act).
 */
class CommandMessage
{
    /**
     * Builds a command message array.
     *
     * @param string      $command       Command name, e.g. "inventory.deduct".
     * @param array       $params        Command parameters.
     * @param string|null $replyTo       Optional value for "reply_to";
     *                                   null when omitted.
     * @param string|null $correlationId Optional value for "correlation_id";
     *                                   null when omitted.
     *
     * @return array Message with a generated ID and a UTC timestamp.
     */
    public static function create(
        string $command,
        array $params,
        ?string $replyTo = null,
        ?string $correlationId = null
    ): array {
        return [
            'message_id' => self::generateId(),
            'timestamp' => gmdate('Y-m-d\TH:i:s\Z'),
            'command' => $command,
            'params' => $params,
            'reply_to' => $replyTo,
            'correlation_id' => $correlationId,
        ];
    }

    /**
     * Generates a random (version 4) UUID string from random_bytes().
     */
    private static function generateId(): string
    {
        $bytes = random_bytes(16);
        // Set the version nibble to 4 and the variant bits to 10xx.
        $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40);
        $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80);

        return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
    }
}

// Usage example
$command = CommandMessage::create('inventory.deduct', [
    'order_id' => '12345',
    'sku' => 'SKU-001',
    'quantity' => 1,
]);

RPC 消息

php
<?php

/**
 * Factory for RPC request and response messages.
 *
 * Both message kinds carry a generated ID and a Unix timestamp (integer
 * seconds from time()).
 */
class RpcMessage
{
    /**
     * Builds an RPC request.
     *
     * @param string $method Name of the remote method to invoke.
     * @param array  $params Arguments for the method.
     */
    public static function createRequest(string $method, array $params): array
    {
        return [
            'message_id' => self::generateId(),
            'timestamp' => time(),
            'method' => $method,
            'params' => $params,
        ];
    }

    /**
     * Builds an RPC response.
     *
     * @param string      $correlationId Stored as "correlation_id" so the
     *                                   response can be matched to a request.
     * @param mixed       $result        Result value, stored unchanged.
     * @param string|null $error         Error description, or null.
     */
    public static function createResponse(string $correlationId, $result, ?string $error = null): array
    {
        return [
            'message_id' => self::generateId(),
            'correlation_id' => $correlationId,
            'timestamp' => time(),
            'result' => $result,
            'error' => $error,
        ];
    }

    /**
     * Generates a random (version 4) UUID string from random_bytes().
     */
    private static function generateId(): string
    {
        $bytes = random_bytes(16);
        // Set the version nibble to 4 and the variant bits to 10xx.
        $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40);
        $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80);

        return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
    }
}

版本兼容性设计

版本化消息格式

php
<?php

namespace App\Messaging\Message;

/**
 * Creates and normalizes messages for several format versions.
 *
 * Versions 1.0 and 1.1 use the message_id/type/data layout; version 2.0 uses
 * the id/specversion/type/source/time/data layout.
 */
class VersionedMessage
{
    /** Required and optional field names per supported format version. */
    private static array $formats = [
        '1.0' => [
            'required' => ['message_id', 'type', 'data'],
            'optional' => ['timestamp', 'source', 'metadata'],
        ],
        '1.1' => [
            'required' => ['message_id', 'type', 'data', 'timestamp'],
            'optional' => ['source', 'metadata', 'trace_context'],
        ],
        '2.0' => [
            'required' => ['id', 'specversion', 'type', 'source', 'time', 'data'],
            'optional' => ['subject', 'datacontenttype', 'dataschema', 'traceparent'],
        ],
    ];

    /**
     * Builds a message containing the required fields of the given version.
     *
     * An unknown version falls back to the 1.0 layout; the returned "version"
     * field then reads "1.0" so that it matches the fields actually present.
     *
     * @param string $version Requested format version ("1.0", "1.1" or "2.0").
     * @param string $type    Message type identifier.
     * @param array  $data    Message payload.
     */
    public static function create(string $version, string $type, array $data): array
    {
        $resolvedVersion = isset(self::$formats[$version]) ? $version : '1.0';
        $format = self::$formats[$resolvedVersion];

        $message = ['version' => $resolvedVersion];

        if ($resolvedVersion === '2.0') {
            $message['id'] = self::generateId();
            $message['specversion'] = '1.0';
            $message['type'] = $type;
            $message['source'] = config('app.name');
            $message['time'] = gmdate('Y-m-d\TH:i:s\Z');
            $message['data'] = $data;
        } else {
            foreach ($format['required'] as $field) {
                $message[$field] = match ($field) {
                    'message_id' => self::generateId(),
                    'type' => $type,
                    'data' => $data,
                    'timestamp' => time(),
                    'source' => config('app.name'),
                    default => null,
                };
            }
        }

        return $message;
    }

    /**
     * Returns the message in the target version.
     *
     * A message without a "version" field is treated as 1.0. When source and
     * target versions already match, the message is returned unchanged.
     */
    public static function normalize(array $message, string $targetVersion = '1.0'): array
    {
        $sourceVersion = $message['version'] ?? '1.0';

        if ($sourceVersion === $targetVersion) {
            return $message;
        }

        return self::convert($message, $sourceVersion, $targetVersion);
    }

    /**
     * Placeholder for version conversion.
     *
     * NOTE(review): no conversion is implemented yet; the message is returned
     * unchanged, so normalize() does not actually change the format when the
     * versions differ.
     */
    private static function convert(array $message, string $from, string $to): array
    {
        // Version conversion logic goes here.
        return $message;
    }

    /**
     * Generates a random (version 4) UUID string from random_bytes().
     */
    private static function generateId(): string
    {
        $bytes = random_bytes(16);
        // Set the version nibble to 4 and the variant bits to 10xx.
        $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40);
        $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80);

        return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
    }
}

实际应用场景

场景一:跨服务事件传递

php
<?php

/**
 * Publishes cross-service events using CloudEvents-style attribute names
 * (specversion, type, source, id, time, datacontenttype, data).
 *
 * NOTE(review): generateId(), getTraceParent() and publish() are called on
 * $this but are not defined in this snippet -- confirm where they come from.
 */
class CrossServiceEventPublisher
{
    /**
     * Publishes a "com.example.user.registered" event.
     *
     * @param array $user Reads the keys id, email and created_at.
     */
    public function publishUserRegistered(array $user): void
    {
        // Static attributes first; dynamic ones are appended in the same
        // key order as the final event layout.
        $cloudEvent = [
            'specversion' => '1.0',
            'type' => 'com.example.user.registered',
            'source' => '/services/user-service',
        ];
        $cloudEvent['id'] = $this->generateId();
        $cloudEvent['time'] = gmdate('Y-m-d\TH:i:s\Z');
        $cloudEvent['datacontenttype'] = 'application/json';
        $cloudEvent['data'] = [
            'userId' => $user['id'],
            'email' => $user['email'],
            'registeredAt' => $user['created_at'],
        ];
        $cloudEvent['traceparent'] = $this->getTraceParent();

        $this->publish($cloudEvent);
    }
}

场景二:批量消息处理

php
<?php

/**
 * Factory for batch messages and their acknowledgements.
 */
class BatchMessageFormat
{
    /**
     * Wraps a list of messages in a batch with a generated batch ID, the
     * message count and a UTC creation time.
     *
     * @param array $messages Messages to include, stored unchanged.
     */
    public static function createBatch(array $messages): array
    {
        return [
            'batch_id' => self::generateId(),
            'batch_size' => count($messages),
            'created_at' => gmdate('Y-m-d\TH:i:s\Z'),
            'messages' => $messages,
        ];
    }

    /**
     * Builds the acknowledgement for a processed batch.
     *
     * A result counts as a success when it is truthy (array_filter without a
     * callback); every other result counts as a failure.
     * NOTE(review): a non-empty array result is truthy even if it describes a
     * failure -- confirm the shape callers pass in $results.
     *
     * @param string $batchId ID of the batch being acknowledged.
     * @param array  $results Per-message results, stored unchanged.
     */
    public static function createBatchAck(string $batchId, array $results): array
    {
        // Count successes once and derive the failures from it.
        $successCount = count(array_filter($results));

        return [
            'batch_id' => $batchId,
            'processed_at' => gmdate('Y-m-d\TH:i:s\Z'),
            'success_count' => $successCount,
            'failure_count' => count($results) - $successCount,
            'results' => $results,
        ];
    }

    /**
     * Generates a random (version 4) UUID string from random_bytes().
     */
    private static function generateId(): string
    {
        $bytes = random_bytes(16);
        // Set the version nibble to 4 and the variant bits to 10xx.
        $bytes[6] = chr((ord($bytes[6]) & 0x0f) | 0x40);
        $bytes[8] = chr((ord($bytes[8]) & 0x3f) | 0x80);

        return vsprintf('%s%s-%s-%s-%s-%s%s%s', str_split(bin2hex($bytes), 4));
    }
}

最佳实践建议清单

消息设计

  • [ ] 使用标准化的消息信封格式
  • [ ] 包含唯一的消息ID
  • [ ] 包含时间戳信息
  • [ ] 包含消息类型标识
  • [ ] 包含来源标识
  • [ ] 数据与元数据分离

安全性

  • [ ] 敏感信息加密处理
  • [ ] 大数据压缩传输
  • [ ] 验证消息完整性
  • [ ] 控制消息大小

可维护性

  • [ ] 版本化消息格式
  • [ ] 向后兼容设计
  • [ ] 完善的文档说明
  • [ ] 合理的命名规范

可追踪性

  • [ ] 包含追踪ID
  • [ ] 记录消息链路
  • [ ] 支持日志关联

生产环境注意事项

  1. 消息大小控制

    • 单条消息建议不超过 64KB
    • 大数据考虑压缩或分片
    • 避免传输不必要的字段
  2. 序列化选择

    • JSON:通用性好,调试方便
    • MessagePack:性能更好,体积更小
    • Protobuf:强类型,跨语言支持
  3. 压缩策略

    • 大于 1KB 的消息考虑压缩
    • 选择合适的压缩算法(gzip、lz4)
    • 在消息头标注压缩类型
  4. 敏感数据处理

    • 传输前加密敏感字段
    • 使用安全的序列化方式
    • 避免在日志中记录敏感信息

相关链接