Appearance
RabbitMQ 消息模型(Message Model)
概述
消息模型是 RabbitMQ 中定义消息结构和行为的规范。它包括消息的组成、属性、生命周期以及消息传递的语义保证。理解消息模型对于构建可靠的消息系统至关重要,它决定了消息如何被创建、传输、存储和消费。
消息模型的核心组成
mermaid
graph TB
subgraph 消息模型
A[Message] --> B[Body 消息体]
A --> C[Properties 消息属性]
A --> D[Delivery 传递信息]
C --> C1[content_type]
C --> C2[delivery_mode]
C --> C3[priority]
C --> C4[expiration]
C --> C5[headers]
D --> D1[delivery_tag]
D --> D2[exchange]
D --> D3[routing_key]
D --> D4[redelivered]
end核心知识点
1. 消息结构
一条完整的 RabbitMQ 消息包含以下部分:
mermaid
graph LR
subgraph 消息组成
M[Message] --> B[Body]
M --> P[Properties]
M --> E[Envelope]
end
B --> B1["实际数据 (JSON/XML/Binary)"]
P --> P1["元数据"]
E --> E1["传递信息"]消息体(Body)
- 消息的实际内容
- 可以是任意格式(JSON、XML、二进制等)
- 最大大小受内存和磁盘限制
消息属性(Properties)
| 属性 | 类型 | 说明 |
|---|---|---|
| content_type | string | 消息体的 MIME 类型 |
| content_encoding | string | 消息体的编码方式 |
| headers | table | 自定义消息头 |
| delivery_mode | int | 1=非持久化,2=持久化 |
| priority | int | 消息优先级(0-9) |
| correlation_id | string | 关联 ID(RPC 场景) |
| reply_to | string | 回复队列名称 |
| expiration | string | 消息过期时间(毫秒) |
| message_id | string | 消息唯一标识 |
| timestamp | timestamp | 消息创建时间 |
| type | string | 消息类型标识 |
| user_id | string | 用户 ID |
| app_id | string | 应用标识 |
| cluster_id | string | 集群标识 |
2. 消息生命周期
mermaid
stateDiagram-v2
[*] --> Created: 生产者创建
Created --> Published: 发布到 Exchange
Published --> Routed: 路由到 Queue
Published --> Unroutable: 无匹配路由
Routed --> Ready: 进入队列等待
Ready --> Unacked: 消费者获取
Unacked --> Acked: 确认成功
Unacked --> Ready: NACK(requeue)
Unacked --> Dead: NACK/过期/拒绝
Acked --> [*]: 从队列删除
Dead --> [*]: 转死信队列
Unroutable --> [*]: 丢弃或返回3. 消息持久化
mermaid
graph TB
subgraph 持久化三要素
A[Exchange 持久化] --> D[durable=true]
B[Queue 持久化] --> D
C[Message 持久化] --> E[delivery_mode=2]
end
D --> F[Broker 重启后保留]
E --> F4. 消息确认机制
mermaid
sequenceDiagram
participant B as Broker
participant C as Consumer
B->>C: 投递消息
Note over C: 消息状态: Unacked
alt 自动确认
Note over C: auto_ack=true
C->>B: 自动确认
else 手动确认
Note over C: auto_ack=false
C->>C: 处理消息
alt 处理成功
C->>B: basic.ack
else 处理失败-重试
C->>B: basic.nack(requeue=true)
else 处理失败-丢弃
C->>B: basic.reject(requeue=false)
end
end
Note over B: 删除已确认消息5. 消息可靠性保证
mermaid
graph TB
subgraph 生产者端
A[Publisher Confirms]
B[事务机制]
end
subgraph Broker 端
C[消息持久化]
D[镜像队列]
end
subgraph 消费者端
E[手动确认]
F[幂等处理]
end
A --> G[可靠传递]
B --> G
C --> G
D --> G
E --> G
F --> G代码示例
基础消息创建
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class MessageBuilder
{
private $body;
private $properties = [];
private $headers = [];
public static function create(): self
{
return new self();
}
public function withBody($data): self
{
if (is_string($data)) {
$this->body = $data;
} else {
$this->body = json_encode($data);
$this->properties['content_type'] = 'application/json';
}
return $this;
}
public function withContentType(string $type): self
{
$this->properties['content_type'] = $type;
return $this;
}
public function persistent(): self
{
$this->properties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
return $this;
}
public function withPriority(int $priority): self
{
$this->properties['priority'] = min(9, max(0, $priority));
return $this;
}
public function withExpiration(int $milliseconds): self
{
$this->properties['expiration'] = (string)$milliseconds;
return $this;
}
public function withMessageId(string $id): self
{
$this->properties['message_id'] = $id;
return $this;
}
public function withCorrelationId(string $id): self
{
$this->properties['correlation_id'] = $id;
return $this;
}
public function withReplyTo(string $queue): self
{
$this->properties['reply_to'] = $queue;
return $this;
}
public function withTimestamp(?int $timestamp = null): self
{
$this->properties['timestamp'] = $timestamp ?? time();
return $this;
}
public function withType(string $type): self
{
$this->properties['type'] = $type;
return $this;
}
public function withUserId(string $userId): self
{
$this->properties['user_id'] = $userId;
return $this;
}
public function withAppId(string $appId): self
{
$this->properties['app_id'] = $appId;
return $this;
}
public function withHeader(string $key, $value): self
{
$this->headers[$key] = $value;
return $this;
}
public function withHeaders(array $headers): self
{
$this->headers = array_merge($this->headers, $headers);
return $this;
}
public function build(): AMQPMessage
{
if (!empty($this->headers)) {
$this->properties['application_headers'] = new AMQPTable($this->headers);
}
return new AMQPMessage($this->body, $this->properties);
}
}
$message = MessageBuilder::create()
->withBody(['order_id' => 1001, 'status' => 'created'])
->withContentType('application/json')
->persistent()
->withPriority(5)
->withMessageId(uniqid('msg_', true))
->withTimestamp()
->withType('order.created')
->withAppId('order_service')
->withHeader('trace_id', 'abc123')
->withHeader('source', 'web_frontend')
->build();
echo "Message created with body: " . $message->getBody() . "\n";消息属性详解
php
<?php
class MessagePropertiesDemo
{
public function createStandardMessage(): AMQPMessage
{
return new AMQPMessage(
json_encode(['data' => 'example']),
[
'content_type' => 'application/json',
'content_encoding' => 'utf-8',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'priority' => 5,
'message_id' => uniqid('msg_', true),
'timestamp' => time(),
'type' => 'standard',
'app_id' => 'demo_app'
]
);
}
public function createPriorityMessage(int $priority, string $body): AMQPMessage
{
return new AMQPMessage($body, [
'priority' => min(9, max(0, $priority)),
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
}
public function createTTLMessage(string $body, int $ttlMs): AMQPMessage
{
return new AMQPMessage($body, [
'expiration' => (string)$ttlMs,
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
}
public function createRPCRequest(string $body, string $replyTo, string $correlationId): AMQPMessage
{
return new AMQPMessage($body, [
'reply_to' => $replyTo,
'correlation_id' => $correlationId,
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
}
public function createHeadersMessage(string $body, array $headers): AMQPMessage
{
return new AMQPMessage($body, [
'application_headers' => new AMQPTable($headers),
'content_type' => 'application/json'
]);
}
public function createTracedMessage(string $body, string $traceId, string $spanId): AMQPMessage
{
return new AMQPMessage($body, [
'application_headers' => new AMQPTable([
'trace_id' => $traceId,
'span_id' => $spanId,
'timestamp' => microtime(true)
]),
'content_type' => 'application/json',
'message_id' => uniqid('msg_', true)
]);
}
}消息消费与处理
php
<?php
use PhpAmqpLib\Message\AMQPMessage;
class MessageHandler
{
public function handleMessage(AMQPMessage $message): void
{
echo "=== Message Received ===\n";
echo "Body: " . $message->getBody() . "\n";
echo "\n--- Properties ---\n";
$properties = $message->get_properties();
foreach ($properties as $key => $value) {
echo "{$key}: " . json_encode($value) . "\n";
}
echo "\n--- Headers ---\n";
if ($message->has('application_headers')) {
$headers = $message->get('application_headers');
if ($headers instanceof AMQPTable) {
foreach ($headers->getNativeData() as $key => $value) {
echo "{$key}: " . json_encode($value) . "\n";
}
}
}
echo "\n--- Delivery Info ---\n";
echo "Exchange: " . $message->getExchange() . "\n";
echo "Routing Key: " . $message->getRoutingKey() . "\n";
echo "Delivery Tag: " . $message->getDeliveryTag() . "\n";
echo "Redelivered: " . ($message->isRedelivery() ? 'Yes' : 'No') . "\n";
echo "\n--- Consumer Tag ---\n";
echo "Consumer Tag: " . $message->getConsumerTag() . "\n";
}
public function parseJsonMessage(AMQPMessage $message): array
{
$body = $message->getBody();
$contentType = $message->get('content_type');
if ($contentType === 'application/json') {
return json_decode($body, true);
}
return ['raw' => $body];
}
public function extractTraceContext(AMQPMessage $message): array
{
$headers = [];
if ($message->has('application_headers')) {
$appHeaders = $message->get('application_headers');
if ($appHeaders instanceof AMQPTable) {
$nativeData = $appHeaders->getNativeData();
$headers = [
'trace_id' => $nativeData['trace_id'] ?? null,
'span_id' => $nativeData['span_id'] ?? null
];
}
}
return $headers;
}
}消息确认机制
php
<?php
class MessageAcknowledgment
{
public function acknowledge(AMQPMessage $message): void
{
$message->ack();
echo "Message acknowledged: " . $message->getDeliveryTag() . "\n";
}
public function acknowledgeMultiple(AMQPMessage $message): void
{
$message->ack(true);
echo "Multiple messages acknowledged up to: " . $message->getDeliveryTag() . "\n";
}
public function rejectAndRequeue(AMQPMessage $message): void
{
$message->nack(true, true);
echo "Message rejected and requeued: " . $message->getDeliveryTag() . "\n";
}
public function rejectAndDiscard(AMQPMessage $message): void
{
$message->nack(false, false);
echo "Message rejected and discarded: " . $message->getDeliveryTag() . "\n";
}
public function rejectSingle(AMQPMessage $message): void
{
$message->reject(false);
echo "Message rejected: " . $message->getDeliveryTag() . "\n";
}
public function rejectWithDeadLetter(AMQPMessage $message): void
{
$message->reject(false);
echo "Message rejected, will go to DLQ: " . $message->getDeliveryTag() . "\n";
}
}消息幂等性处理
php
<?php
class IdempotentMessageProcessor
{
private $processedMessages;
public function __construct()
{
$this->processedMessages = [];
}
public function process(AMQPMessage $message, callable $processor): void
{
$messageId = $this->getMessageId($message);
if ($this->isProcessed($messageId)) {
echo "Message {$messageId} already processed, skipping\n";
$message->ack();
return;
}
try {
$processor($message);
$this->markAsProcessed($messageId);
$message->ack();
echo "Message {$messageId} processed successfully\n";
} catch (Exception $e) {
echo "Failed to process message {$messageId}: {$e->getMessage()}\n";
$message->nack(true);
}
}
private function getMessageId(AMQPMessage $message): string
{
$messageId = $message->get('message_id');
if ($messageId) {
return $messageId;
}
$body = $message->getBody();
$data = json_decode($body, true);
if (isset($data['message_id'])) {
return $data['message_id'];
}
return md5($body);
}
private function isProcessed(string $messageId): bool
{
return isset($this->processedMessages[$messageId]);
}
private function markAsProcessed(string $messageId): void
{
$this->processedMessages[$messageId] = time();
$this->cleanupOldEntries();
}
private function cleanupOldEntries(): void
{
$maxSize = 10000;
if (count($this->processedMessages) > $maxSize) {
$this->processedMessages = array_slice(
$this->processedMessages,
-$maxSize / 2,
null,
true
);
}
}
}消息重试机制
php
<?php
class RetryMessageProcessor
{
private $maxRetries = 3;
private $retryDelays = [1000, 5000, 15000, 60000];
public function processWithRetry(AMQPMessage $message, callable $processor): void
{
$data = json_decode($message->getBody(), true);
$retryCount = $data['_retry_count'] ?? 0;
try {
$processor($data);
$message->ack();
echo "Message processed successfully\n";
} catch (Exception $e) {
echo "Processing failed (attempt " . ($retryCount + 1) . "): {$e->getMessage()}\n";
if ($retryCount < $this->maxRetries) {
$this->scheduleRetry($message, $retryCount, $e->getMessage());
} else {
$this->sendToDeadLetterQueue($message, $e->getMessage());
}
}
}
private function scheduleRetry(AMQPMessage $message, int $retryCount, string $error): void
{
$data = json_decode($message->getBody(), true);
$data['_retry_count'] = $retryCount + 1;
$data['_last_error'] = $error;
$data['_last_retry_at'] = date('Y-m-d H:i:s');
$delay = $this->retryDelays[$retryCount] ?? end($this->retryDelays);
$newMessage = new AMQPMessage(
json_encode($data),
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'expiration' => (string)$delay
]
);
$channel = $message->getChannel();
$channel->basic_publish($newMessage, '', 'retry_queue');
$message->ack();
echo "Scheduled retry #{$retryCount} with delay {$delay}ms\n";
}
private function sendToDeadLetterQueue(AMQPMessage $message, string $error): void
{
$data = json_decode($message->getBody(), true);
$data['_dead_letter_reason'] = 'max_retries_exceeded';
$data['_dead_letter_error'] = $error;
$data['_dead_letter_at'] = date('Y-m-d H:i:s');
$dlqMessage = new AMQPMessage(
json_encode($data),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel = $message->getChannel();
$channel->basic_publish($dlqMessage, '', 'dead_letter_queue');
$message->ack();
echo "Sent to dead letter queue\n";
}
}实际应用场景
1. 事件消息模型
php
<?php
class EventMessageModel
{
public static function createEvent(string $eventType, array $data, array $metadata = []): AMQPMessage
{
$event = [
'event_id' => uniqid('evt_', true),
'event_type' => $eventType,
'data' => $data,
'metadata' => array_merge([
'timestamp' => time(),
'source' => gethostname(),
'version' => '1.0'
], $metadata)
];
return new AMQPMessage(
json_encode($event),
[
'content_type' => 'application/json',
'type' => $eventType,
'message_id' => $event['event_id'],
'timestamp' => time(),
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);
}
public static function parseEvent(AMQPMessage $message): array
{
$body = json_decode($message->getBody(), true);
return [
'event_id' => $body['event_id'] ?? $message->get('message_id'),
'event_type' => $body['event_type'] ?? $message->get('type'),
'data' => $body['data'] ?? [],
'metadata' => $body['metadata'] ?? [],
'timestamp' => $body['metadata']['timestamp'] ?? $message->get('timestamp')
];
}
}
$orderCreatedEvent = EventMessageModel::createEvent(
'order.created',
['order_id' => 1001, 'user_id' => 5001, 'amount' => 299.99],
['correlation_id' => 'req_123', 'trace_id' => 'trace_abc']
);2. 命令消息模型
php
<?php
class CommandMessageModel
{
public static function createCommand(
string $command,
array $payload,
?string $replyTo = null,
?string $correlationId = null
): AMQPMessage {
$cmd = [
'command_id' => uniqid('cmd_', true),
'command' => $command,
'payload' => $payload,
'created_at' => date('Y-m-d H:i:s')
];
$properties = [
'content_type' => 'application/json',
'type' => $command,
'message_id' => $cmd['command_id'],
'timestamp' => time(),
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
];
if ($replyTo) {
$properties['reply_to'] = $replyTo;
}
if ($correlationId) {
$properties['correlation_id'] = $correlationId;
}
return new AMQPMessage(json_encode($cmd), $properties);
}
public static function createResponse(
string $correlationId,
bool $success,
$result = null,
?string $error = null
): AMQPMessage {
$response = [
'success' => $success,
'result' => $result,
'error' => $error,
'timestamp' => time()
];
return new AMQPMessage(
json_encode($response),
[
'content_type' => 'application/json',
'correlation_id' => $correlationId,
'type' => 'response'
]
);
}
}
$createCommand = CommandMessageModel::createCommand(
'create_order',
['user_id' => 5001, 'items' => [['product_id' => 101, 'qty' => 2]]],
'reply_queue',
'req_123'
);3. 文档消息模型
php
<?php
class DocumentMessageModel
{
public static function createDocument(
string $documentType,
array $document,
array $options = []
): AMQPMessage {
$envelope = [
'document_id' => $document['id'] ?? uniqid('doc_', true),
'document_type' => $documentType,
'document' => $document,
'created_at' => date('Y-m-d H:i:s'),
'version' => $options['version'] ?? 1
];
$properties = [
'content_type' => 'application/json',
'type' => $documentType,
'message_id' => $envelope['document_id'],
'timestamp' => time(),
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
];
if (isset($options['priority'])) {
$properties['priority'] = $options['priority'];
}
if (isset($options['ttl'])) {
$properties['expiration'] = (string)$options['ttl'];
}
return new AMQPMessage(json_encode($envelope), $properties);
}
public static function parseDocument(AMQPMessage $message): array
{
$body = json_decode($message->getBody(), true);
return [
'document_id' => $body['document_id'],
'document_type' => $body['document_type'],
'document' => $body['document'],
'version' => $body['version'] ?? 1
];
}
}
$orderDocument = DocumentMessageModel::createDocument(
'order',
[
'id' => 'ORD-001',
'customer' => 'John Doe',
'items' => [['product' => 'Widget', 'price' => 29.99]]
],
['priority' => 5, 'version' => 2]
);常见问题与解决方案
1. 消息过大问题
问题原因:
- 消息体超过限制
- 内存不足
- 网络传输慢
解决方案:
php
<?php
class LargeMessageHandler
{
private $maxMessageSize = 16 * 1024 * 1024;
public function splitLargeMessage(array $data, int $chunkSize = 1024 * 1024): array
{
$encoded = json_encode($data);
if (strlen($encoded) <= $this->maxMessageSize) {
return [new AMQPMessage($encoded, [
'content_type' => 'application/json',
'headers' => new AMQPTable(['chunked' => false])
])];
}
$chunks = str_split($encoded, $chunkSize);
$messageId = uniqid('chunk_', true);
$totalChunks = count($chunks);
$messages = [];
foreach ($chunks as $index => $chunk) {
$messages[] = new AMQPMessage($chunk, [
'content_type' => 'application/octet-stream',
'message_id' => $messageId,
'headers' => new AMQPTable([
'chunked' => true,
'chunk_index' => $index,
'total_chunks' => $totalChunks
])
]);
}
return $messages;
}
public function reassembleChunks(array $messages): string
{
usort($messages, function ($a, $b) {
$headersA = $a->get('application_headers')->getNativeData();
$headersB = $b->get('application_headers')->getNativeData();
return $headersA['chunk_index'] <=> $headersB['chunk_index'];
});
$body = '';
foreach ($messages as $message) {
$body .= $message->getBody();
}
return $body;
}
}2. 消息格式兼容性
问题原因:
- 版本升级
- 不同服务使用不同格式
- 缺少版本控制
解决方案:
php
<?php
class VersionedMessageModel
{
private const CURRENT_VERSION = '2.0';
public static function create(array $data, string $version = null): AMQPMessage
{
$version = $version ?? self::CURRENT_VERSION;
$envelope = [
'version' => $version,
'data' => $data,
'created_at' => time()
];
return new AMQPMessage(
json_encode($envelope),
[
'content_type' => 'application/json',
'headers' => new AMQPTable(['version' => $version])
]
);
}
public static function parse(AMQPMessage $message): array
{
$body = json_decode($message->getBody(), true);
$version = $body['version'] ?? '1.0';
return self::migrate($body['data'], $version);
}
private static function migrate(array $data, string $fromVersion): array
{
$migrations = [
'1.0' => 'migrateV1ToV2',
'2.0' => null
];
$currentVersion = $fromVersion;
while ($currentVersion !== self::CURRENT_VERSION && isset($migrations[$currentVersion])) {
$migrationMethod = $migrations[$currentVersion];
if ($migrationMethod) {
$data = self::$migrationMethod($data);
}
$currentVersion = self::CURRENT_VERSION;
}
return $data;
}
private static function migrateV1ToV2(array $data): array
{
if (isset($data['order_id'])) {
$data['id'] = $data['order_id'];
unset($data['order_id']);
}
return $data;
}
}3. 消息追踪问题
问题原因:
- 缺少追踪信息
- 日志不完整
- 难以排查问题
解决方案:
php
<?php
class TracedMessageModel
{
public static function create(
array $data,
?string $traceId = null,
?string $spanId = null
): AMQPMessage {
$traceId = $traceId ?? self::generateTraceId();
$spanId = $spanId ?? self::generateSpanId();
return new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'message_id' => uniqid('msg_', true),
'timestamp' => time(),
'headers' => new AMQPTable([
'trace_id' => $traceId,
'span_id' => $spanId,
'parent_span_id' => null,
'sampled' => true
])
]
);
}
public static function createChild(
AMQPMessage $parent,
array $data
): AMQPMessage {
$parentHeaders = $parent->get('application_headers');
$parentData = $parentHeaders ? $parentHeaders->getNativeData() : [];
$traceId = $parentData['trace_id'] ?? self::generateTraceId();
$parentSpanId = $parentData['span_id'] ?? null;
$childSpanId = self::generateSpanId();
return new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'message_id' => uniqid('msg_', true),
'timestamp' => time(),
'headers' => new AMQPTable([
'trace_id' => $traceId,
'span_id' => $childSpanId,
'parent_span_id' => $parentSpanId,
'sampled' => $parentData['sampled'] ?? true
])
]
);
}
public static function extractTraceContext(AMQPMessage $message): array
{
$headers = $message->get('application_headers');
if (!$headers) {
return [];
}
$data = $headers->getNativeData();
return [
'trace_id' => $data['trace_id'] ?? null,
'span_id' => $data['span_id'] ?? null,
'parent_span_id' => $data['parent_span_id'] ?? null
];
}
private static function generateTraceId(): string
{
return bin2hex(random_bytes(16));
}
private static function generateSpanId(): string
{
return bin2hex(random_bytes(8));
}
}最佳实践建议
1. 消息设计原则
php
<?php
class MessageDesignPrinciples
{
public static function createWellDesignedMessage(array $data): AMQPMessage
{
return MessageBuilder::create()
->withBody($data)
->withContentType('application/json')
->persistent()
->withMessageId(self::generateMessageId())
->withTimestamp()
->withHeader('version', '1.0')
->withHeader('source', self::getSource())
->build();
}
private static function generateMessageId(): string
{
return sprintf(
'%s-%s-%s',
date('YmdHis'),
bin2hex(random_bytes(4)),
getmypid()
);
}
private static function getSource(): string
{
return gethostname() . ':' . getmypid();
}
}2. 消息验证
php
<?php
class MessageValidator
{
public static function validate(AMQPMessage $message, array $rules): array
{
$errors = [];
$body = json_decode($message->getBody(), true);
foreach ($rules as $field => $rule) {
if (!isset($body[$field]) && ($rule['required'] ?? false)) {
$errors[] = "Field '{$field}' is required";
continue;
}
if (isset($body[$field])) {
$value = $body[$field];
$type = $rule['type'] ?? 'string';
if (!self::validateType($value, $type)) {
$errors[] = "Field '{$field}' must be of type {$type}";
}
}
}
return $errors;
}
private static function validateType($value, string $type): bool
{
switch ($type) {
case 'string':
return is_string($value);
case 'integer':
return is_int($value);
case 'float':
return is_float($value) || is_int($value);
case 'boolean':
return is_bool($value);
case 'array':
return is_array($value);
default:
return true;
}
}
}3. 消息序列化
php
<?php
class MessageSerializer
{
public static function serialize(array $data, string $format = 'json'): string
{
switch ($format) {
case 'json':
return json_encode($data, JSON_UNESCAPED_UNICODE);
case 'msgpack':
return msgpack_pack($data);
case 'serialize':
return serialize($data);
default:
throw new InvalidArgumentException("Unsupported format: {$format}");
}
}
public static function deserialize(string $data, string $format = 'json')
{
switch ($format) {
case 'json':
return json_decode($data, true);
case 'msgpack':
return msgpack_unpack($data);
case 'serialize':
return unserialize($data);
default:
throw new InvalidArgumentException("Unsupported format: {$format}");
}
}
}