Appearance
消息格式设计
概述
消息格式设计是 RabbitMQ 系统设计的核心环节。良好的消息格式能够提高系统的互操作性、可维护性和可扩展性。本文档介绍消息格式设计的最佳实践。
核心设计原则
1. 标准化原则
使用统一的消息格式标准,便于跨系统交互。
json
{
"message_id": "uuid-v4",
"timestamp": 1699999999,
"version": "1.0",
"source": "service-name",
"type": "event-type",
"data": {},
"metadata": {}
}2. 可扩展原则
消息格式应支持向后兼容的扩展。
json
{
"version": "1.0",
"data": {
"order_id": "12345",
"amount": 99.99
},
"metadata": {
"trace_id": "abc123",
"span_id": "def456"
}
}3. 自描述原则
消息应包含足够的信息,便于消费者理解和处理。
json
{
"message_id": "550e8400-e29b-41d4-a716-446655440000",
"type": "order.created",
"timestamp": "2024-01-15T10:30:00Z",
"source": "order-service",
"content_type": "application/json",
"data": { ... }
}4. 安全性原则
敏感信息应加密或脱敏处理。
json
{
"data": {
"user_id": "12345",
"card_number": "****-****-****-1234"
}
}消息结构设计
标准消息信封
php
<?php
namespace App\Messaging\Message;
class MessageEnvelope
{
private string $messageId;
private string $timestamp;
private string $version;
private string $source;
private string $type;
private array $data;
private array $metadata;
public function __construct(
string $type,
array $data,
array $metadata = []
) {
$this->messageId = $this->generateMessageId();
$this->timestamp = $this->formatTimestamp(time());
$this->version = '1.0';
$this->source = config('app.name');
$this->type = $type;
$this->data = $data;
$this->metadata = $metadata;
}
public function toArray(): array
{
return [
'message_id' => $this->messageId,
'timestamp' => $this->timestamp,
'version' => $this->version,
'source' => $this->source,
'type' => $this->type,
'data' => $this->data,
'metadata' => $this->metadata,
];
}
public function toJson(): string
{
return json_encode(
$this->toArray(),
JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES
);
}
public static function fromArray(array $data): self
{
$envelope = new self($data['type'], $data['data'], $data['metadata'] ?? []);
$envelope->messageId = $data['message_id'];
$envelope->timestamp = $data['timestamp'];
$envelope->version = $data['version'] ?? '1.0';
$envelope->source = $data['source'] ?? '';
return $envelope;
}
public static function fromJson(string $json): self
{
return self::fromArray(json_decode($json, true));
}
private function generateMessageId(): string
{
return sprintf(
'%04x%04x-%04x-%04x-%04x-%04x%04x%04x',
mt_rand(0, 0xffff),
mt_rand(0, 0xffff),
mt_rand(0, 0xffff),
mt_rand(0, 0x0fff) | 0x4000,
mt_rand(0, 0x3fff) | 0x8000,
mt_rand(0, 0xffff),
mt_rand(0, 0xffff),
mt_rand(0, 0xffff)
);
}
private function formatTimestamp(int $timestamp): string
{
return gmdate('Y-m-d\TH:i:s\Z', $timestamp);
}
public function getMessageId(): string
{
return $this->messageId;
}
public function getType(): string
{
return $this->type;
}
public function getData(): array
{
return $this->data;
}
public function getMetadata(): array
{
return $this->metadata;
}
}消息属性设计
php
<?php
namespace App\Messaging\Message;
use PhpAmqpLib\Message\AMQPMessage;
class MessageBuilder
{
private array $properties = [];
private array $headers = [];
private string $body = '';
public function __construct()
{
$this->properties = [
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'priority' => 0,
];
}
public function setBody(array $data): self
{
$this->body = json_encode($data, JSON_UNESCAPED_UNICODE);
return $this;
}
public function setContentType(string $type): self
{
$this->properties['content_type'] = $type;
return $this;
}
public function setPriority(int $priority): self
{
$this->properties['priority'] = min(max($priority, 0), 9);
return $this;
}
public function setExpiration(int $ttlMs): self
{
$this->properties['expiration'] = (string) $ttlMs;
return $this;
}
public function setMessageId(string $id): self
{
$this->properties['message_id'] = $id;
return $this;
}
public function setCorrelationId(string $id): self
{
$this->properties['correlation_id'] = $id;
return $this;
}
public function setReplyTo(string $queue): self
{
$this->properties['reply_to'] = $queue;
return $this;
}
public function setTimestamp(int $timestamp): self
{
$this->properties['timestamp'] = $timestamp;
return $this;
}
public function setType(string $type): self
{
$this->properties['type'] = $type;
return $this;
}
public function setUserId(string $userId): self
{
$this->properties['user_id'] = $userId;
return $this;
}
public function setAppId(string $appId): self
{
$this->properties['app_id'] = $appId;
return $this;
}
public function addHeader(string $key, $value): self
{
$this->headers[$key] = $value;
return $this;
}
public function setTraceContext(string $traceId, string $spanId): self
{
$this->headers['x-trace-id'] = $traceId;
$this->headers['x-span-id'] = $spanId;
return $this;
}
public function setRetryCount(int $count): self
{
$this->headers['x-retry-count'] = $count;
return $this;
}
public function build(): AMQPMessage
{
if (!empty($this->headers)) {
$this->properties['application_headers'] = new \PhpAmqpLib\Wire\AMQPTable($this->headers);
}
return new AMQPMessage($this->body, $this->properties);
}
}PHP 代码示例
正确做法:标准化消息格式
php
<?php
namespace App\Messaging\Producer;
use App\Messaging\Message\MessageEnvelope;
use App\Messaging\Message\MessageBuilder;
use PhpAmqpLib\Message\AMQPMessage;
class OrderEventProducer
{
private $channel;
public function publishOrderCreated(array $order): void
{
$envelope = new MessageEnvelope(
'order.created',
[
'order_id' => $order['id'],
'user_id' => $order['user_id'],
'amount' => $order['amount'],
'currency' => $order['currency'] ?? 'CNY',
'items' => $order['items'],
'shipping_address' => $order['shipping_address'],
],
[
'trace_id' => $this->getTraceId(),
'user_agent' => request()->userAgent(),
'ip' => request()->ip(),
]
);
$message = (new MessageBuilder())
->setBody($envelope->toArray())
->setMessageId($envelope->getMessageId())
->setType('order.created')
->setTimestamp(time())
->setAppId(config('app.name'))
->setTraceContext($this->getTraceId(), $this->getSpanId())
->build();
$this->channel->basic_publish(
$message,
'order.events',
'order.created'
);
}
public function publishOrderPaid(string $orderId, array $payment): void
{
$envelope = new MessageEnvelope(
'order.paid',
[
'order_id' => $orderId,
'payment_id' => $payment['id'],
'payment_method' => $payment['method'],
'paid_amount' => $payment['amount'],
'paid_at' => $payment['paid_at'],
],
[
'trace_id' => $this->getTraceId(),
]
);
$message = (new MessageBuilder())
->setBody($envelope->toArray())
->setMessageId($envelope->getMessageId())
->setType('order.paid')
->setTimestamp(time())
->setAppId(config('app.name'))
->build();
$this->channel->basic_publish(
$message,
'order.events',
'order.paid'
);
}
private function getTraceId(): string
{
return request()->header('X-Trace-Id', $this->generateTraceId());
}
private function getSpanId(): string
{
return bin2hex(random_bytes(8));
}
private function generateTraceId(): string
{
return bin2hex(random_bytes(16));
}
}消费者处理示例
php
<?php
namespace App\Messaging\Consumer;
use App\Messaging\Message\MessageEnvelope;
use PhpAmqpLib\Message\AMQPMessage;
class OrderEventConsumer
{
public function handle(AMQPMessage $message): void
{
$envelope = MessageEnvelope::fromJson($message->body);
$this->validateMessage($envelope);
$this->logMessage($envelope, $message);
switch ($envelope->getType()) {
case 'order.created':
$this->handleOrderCreated($envelope);
break;
case 'order.paid':
$this->handleOrderPaid($envelope);
break;
case 'order.cancelled':
$this->handleOrderCancelled($envelope);
break;
default:
$this->handleUnknownType($envelope);
}
$message->ack();
}
private function validateMessage(MessageEnvelope $envelope): void
{
if (empty($envelope->getMessageId())) {
throw new \InvalidArgumentException('Missing message_id');
}
if (empty($envelope->getType())) {
throw new \InvalidArgumentException('Missing message type');
}
if (empty($envelope->getData())) {
throw new \InvalidArgumentException('Missing message data');
}
}
private function logMessage(MessageEnvelope $envelope, AMQPMessage $message): void
{
$headers = $message->get('application_headers');
logger()->info('Processing message', [
'message_id' => $envelope->getMessageId(),
'type' => $envelope->getType(),
'source' => $envelope->source,
'trace_id' => $headers ? $headers->getNativeData()['x-trace-id'] ?? null : null,
]);
}
private function handleOrderCreated(MessageEnvelope $envelope): void
{
$data = $envelope->getData();
// 处理订单创建事件
}
private function handleOrderPaid(MessageEnvelope $envelope): void
{
$data = $envelope->getData();
// 处理订单支付事件
}
private function handleOrderCancelled(MessageEnvelope $envelope): void
{
$data = $envelope->getData();
// 处理订单取消事件
}
private function handleUnknownType(MessageEnvelope $envelope): void
{
logger()->warning('Unknown message type', [
'message_id' => $envelope->getMessageId(),
'type' => $envelope->getType(),
]);
}
}错误做法:不规范的消息格式
php
<?php
class BadMessageProducer
{
public function publishOrderCreated(array $order): void
{
// 错误1:直接发送原始数据,无消息ID
$message = new AMQPMessage(json_encode($order));
// 错误2:无消息类型标识
// 错误3:无时间戳
// 错误4:无来源标识
// 错误5:无追踪信息
$this->channel->basic_publish($message, '', 'order_queue');
}
public function publishOrderPaid(array $data): void
{
// 错误6:消息格式不一致
// 有时用 order_id,有时用 orderId
$message = new AMQPMessage(json_encode([
'orderId' => $data['order_id'],
'paid' => true,
]));
$this->channel->basic_publish($message, '', 'order_queue');
}
public function publishSensitiveData(array $user): void
{
// 错误7:明文传输敏感信息
$message = new AMQPMessage(json_encode([
'user_id' => $user['id'],
'password' => $user['password'],
'credit_card' => $user['credit_card'],
]));
$this->channel->basic_publish($message, '', 'user_queue');
}
}消息类型设计
事件消息
php
<?php
class EventMessage
{
public static function create(string $eventType, array $payload): array
{
return [
'message_id' => self::generateId(),
'timestamp' => gmdate('Y-m-d\TH:i:s\Z'),
'version' => '1.0',
'type' => $eventType,
'source' => config('app.name'),
'specversion' => '1.0',
'data' => $payload,
];
}
}
// 使用示例
$event = EventMessage::create('order.created', [
'order_id' => '12345',
'user_id' => '67890',
'amount' => 99.99,
]);命令消息
php
<?php
class CommandMessage
{
public static function create(string $command, array $params): array
{
return [
'message_id' => self::generateId(),
'timestamp' => gmdate('Y-m-d\TH:i:s\Z'),
'command' => $command,
'params' => $params,
'reply_to' => null,
'correlation_id' => null,
];
}
}
// 使用示例
$command = CommandMessage::create('inventory.deduct', [
'order_id' => '12345',
'sku' => 'SKU-001',
'quantity' => 1,
]);RPC 消息
php
<?php
class RpcMessage
{
public static function createRequest(string $method, array $params): array
{
return [
'message_id' => self::generateId(),
'timestamp' => time(),
'method' => $method,
'params' => $params,
];
}
public static function createResponse(string $correlationId, $result, ?string $error = null): array
{
return [
'message_id' => self::generateId(),
'correlation_id' => $correlationId,
'timestamp' => time(),
'result' => $result,
'error' => $error,
];
}
}版本兼容性设计
版本化消息格式
php
<?php
namespace App\Messaging\Message;
class VersionedMessage
{
private static array $formats = [
'1.0' => [
'required' => ['message_id', 'type', 'data'],
'optional' => ['timestamp', 'source', 'metadata'],
],
'1.1' => [
'required' => ['message_id', 'type', 'data', 'timestamp'],
'optional' => ['source', 'metadata', 'trace_context'],
],
'2.0' => [
'required' => ['id', 'specversion', 'type', 'source', 'time', 'data'],
'optional' => ['subject', 'datacontenttype', 'dataschema', 'traceparent'],
],
];
public static function create(string $version, string $type, array $data): array
{
$format = self::$formats[$version] ?? self::$formats['1.0'];
$message = ['version' => $version];
if ($version === '2.0') {
$message['id'] = self::generateId();
$message['specversion'] = '1.0';
$message['type'] = $type;
$message['source'] = config('app.name');
$message['time'] = gmdate('Y-m-d\TH:i:s\Z');
$message['data'] = $data;
} else {
foreach ($format['required'] as $field) {
$message[$field] = match ($field) {
'message_id' => self::generateId(),
'type' => $type,
'data' => $data,
'timestamp' => time(),
'source' => config('app.name'),
default => null,
};
}
}
return $message;
}
public static function normalize(array $message, string $targetVersion = '1.0'): array
{
$sourceVersion = $message['version'] ?? '1.0';
if ($sourceVersion === $targetVersion) {
return $message;
}
return self::convert($message, $sourceVersion, $targetVersion);
}
private static function convert(array $message, string $from, string $to): array
{
// 版本转换逻辑
return $message;
}
}实际应用场景
场景一:跨服务事件传递
php
<?php
class CrossServiceEventPublisher
{
public function publishUserRegistered(array $user): void
{
$event = [
'specversion' => '1.0',
'type' => 'com.example.user.registered',
'source' => '/services/user-service',
'id' => $this->generateId(),
'time' => gmdate('Y-m-d\TH:i:s\Z'),
'datacontenttype' => 'application/json',
'data' => [
'userId' => $user['id'],
'email' => $user['email'],
'registeredAt' => $user['created_at'],
],
'traceparent' => $this->getTraceParent(),
];
$this->publish($event);
}
}场景二:批量消息处理
php
<?php
class BatchMessageFormat
{
public static function createBatch(array $messages): array
{
return [
'batch_id' => self::generateId(),
'batch_size' => count($messages),
'created_at' => gmdate('Y-m-d\TH:i:s\Z'),
'messages' => $messages,
];
}
public static function createBatchAck(string $batchId, array $results): array
{
return [
'batch_id' => $batchId,
'processed_at' => gmdate('Y-m-d\TH:i:s\Z'),
'success_count' => count(array_filter($results)),
'failure_count' => count($results) - count(array_filter($results)),
'results' => $results,
];
}
}最佳实践建议清单
消息设计
- [ ] 使用标准化的消息信封格式
- [ ] 包含唯一的消息ID
- [ ] 包含时间戳信息
- [ ] 包含消息类型标识
- [ ] 包含来源标识
- [ ] 数据与元数据分离
安全性
- [ ] 敏感信息加密处理
- [ ] 大数据压缩传输
- [ ] 验证消息完整性
- [ ] 控制消息大小
可维护性
- [ ] 版本化消息格式
- [ ] 向后兼容设计
- [ ] 完善的文档说明
- [ ] 合理的命名规范
可追踪性
- [ ] 包含追踪ID
- [ ] 记录消息链路
- [ ] 支持日志关联
生产环境注意事项
消息大小控制
- 单条消息建议不超过 64KB
- 大数据考虑压缩或分片
- 避免传输不必要的字段
序列化选择
- JSON:通用性好,调试方便
- MessagePack:性能更好,体积更小
- Protobuf:强类型,跨语言支持
压缩策略
- 大于 1KB 的消息考虑压缩
- 选择合适的压缩算法(gzip、lz4)
- 在消息头标注压缩类型
敏感数据处理
- 传输前加密敏感字段
- 使用安全的序列化方式
- 避免在日志中记录敏感信息
