Appearance
消息结构详解
概述
在 RabbitMQ 中,消息由多个部分组成,包括消息体、属性和头部。理解消息结构对于正确使用 RabbitMQ 至关重要。
核心原理
消息组成结构
mermaid
graph TD
subgraph AMQP 消息结构
M[Message] --> B[Body 消息体]
M --> P[Properties 属性]
M --> H[Headers 头部]
P --> P1[content_type]
P --> P2[content_encoding]
P --> P3[delivery_mode]
P --> P4[priority]
P --> P5[correlation_id]
P --> P6[reply_to]
P --> P7[expiration]
P --> P8[message_id]
P --> P9[timestamp]
P --> P10[type]
P --> P11[user_id]
P --> P12[app_id]
P --> P13[cluster_id]
H --> H1[自定义头部]
end
style M fill:#f9f,stroke:#333消息属性详解
| 属性 | 类型 | 说明 |
|---|---|---|
| content_type | string | 消息体的 MIME 类型 |
| content_encoding | string | 消息体的编码方式 |
| delivery_mode | int | 1=非持久化, 2=持久化 |
| priority | int | 消息优先级 (0-9) |
| correlation_id | string | 关联 ID,用于 RPC |
| reply_to | string | 回复队列名称 |
| expiration | string | 消息过期时间(毫秒) |
| message_id | string | 消息唯一标识 |
| timestamp | int | 消息创建时间戳 |
| type | string | 消息类型名称 |
| user_id | string | 用户 ID |
| app_id | string | 应用程序 ID |
| cluster_id | string | 集群 ID |
PHP 代码示例
创建基本消息
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'test-queue';
$channel->queue_declare($queueName, false, true, false, false);
// 创建基本消息
$messageBody = 'Hello, RabbitMQ!';
$message = new AMQPMessage($messageBody);
$channel->basic_publish($message, '', $queueName);
echo "基本消息已发送\n";
$channel->close();
$connection->close();创建带属性的消息
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'test-queue';
$channel->queue_declare($queueName, false, true, false, false);
// 创建带完整属性的消息
$messageBody = json_encode([
'event' => 'user_registered',
'user_id' => 12345,
'email' => 'user@example.com'
]);
$message = new AMQPMessage($messageBody, [
'content_type' => 'application/json',
'content_encoding' => 'UTF-8',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'priority' => 5,
'message_id' => uniqid('msg-', true),
'timestamp' => time(),
'type' => 'user_event',
'user_id' => 'guest',
'app_id' => 'user-service',
'correlation_id' => 'req-12345',
'expiration' => '60000' // 60秒后过期
]);
$channel->basic_publish($message, '', $queueName);
echo "带属性的消息已发送\n";
echo "消息 ID: " . $message->get('message_id') . "\n";
$channel->close();
$connection->close();创建带自定义 Headers 的消息
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'test-queue';
$channel->queue_declare($queueName, false, true, false, false);
// 创建带自定义 headers 的消息
$messageBody = json_encode([
'order_id' => 'ORD-001',
'amount' => 299.99
]);
$message = new AMQPMessage($messageBody, [
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'application_headers' => new AMQPTable([
'x-custom-header-1' => 'value1',
'x-custom-header-2' => 12345,
'x-custom-header-3' => true,
'x-source-service' => 'order-service',
'x-trace-id' => 'trace-abc123',
'x-span-id' => 'span-def456'
])
]);
$channel->basic_publish($message, '', $queueName);
echo "带自定义 headers 的消息已发送\n";
$channel->close();
$connection->close();消费并读取消息属性
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'test-queue';
$channel->queue_declare($queueName, false, true, false, false);
echo "等待消息...\n";
$callback = function (AMQPMessage $msg) {
echo "========== 收到消息 ==========\n";
// 读取消息体
echo "消息体: " . $msg->getBody() . "\n";
// 读取基本属性
echo "Content-Type: " . ($msg->get('content_type') ?? 'N/A') . "\n";
echo "Content-Encoding: " . ($msg->get('content_encoding') ?? 'N/A') . "\n";
echo "Delivery-Mode: " . ($msg->get('delivery_mode') ?? 'N/A') . "\n";
echo "Priority: " . ($msg->get('priority') ?? 'N/A') . "\n";
echo "Message-ID: " . ($msg->get('message_id') ?? 'N/A') . "\n";
echo "Timestamp: " . ($msg->get('timestamp') ?? 'N/A') . "\n";
echo "Type: " . ($msg->get('type') ?? 'N/A') . "\n";
echo "App-ID: " . ($msg->get('app_id') ?? 'N/A') . "\n";
echo "Correlation-ID: " . ($msg->get('correlation_id') ?? 'N/A') . "\n";
echo "Expiration: " . ($msg->get('expiration') ?? 'N/A') . "\n";
// 读取自定义 headers
if ($msg->has('application_headers')) {
$headers = $msg->get('application_headers')->getNativeData();
echo "Custom Headers:\n";
foreach ($headers as $key => $value) {
echo " {$key}: " . json_encode($value) . "\n";
}
}
// 读取路由信息
echo "Exchange: " . $msg->getExchange() . "\n";
echo "Routing Key: " . $msg->getRoutingKey() . "\n";
echo "Delivery Tag: " . $msg->getDeliveryTag() . "\n";
echo "============================\n\n";
$msg->ack();
};
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();消息构建器类
php
<?php
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class MessageBuilder
{
private $body;
private $properties = [];
private $headers = [];
public static function create()
{
return new self();
}
public function withBody($body)
{
$this->body = is_array($body) ? json_encode($body) : $body;
return $this;
}
public function withJsonContentType()
{
$this->properties['content_type'] = 'application/json';
return $this;
}
public function withPersistence()
{
$this->properties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
return $this;
}
public function withPriority($priority)
{
$this->properties['priority'] = min(9, max(0, $priority));
return $this;
}
public function withMessageId($id = null)
{
$this->properties['message_id'] = $id ?? uniqid('msg-', true);
return $this;
}
public function withTimestamp($timestamp = null)
{
$this->properties['timestamp'] = $timestamp ?? time();
return $this;
}
public function withType($type)
{
$this->properties['type'] = $type;
return $this;
}
public function withAppId($appId)
{
$this->properties['app_id'] = $appId;
return $this;
}
public function withCorrelationId($correlationId)
{
$this->properties['correlation_id'] = $correlationId;
return $this;
}
public function withReplyTo($replyTo)
{
$this->properties['reply_to'] = $replyTo;
return $this;
}
public function withExpiration($ms)
{
$this->properties['expiration'] = (string) $ms;
return $this;
}
public function withHeader($key, $value)
{
$this->headers[$key] = $value;
return $this;
}
public function withHeaders(array $headers)
{
$this->headers = array_merge($this->headers, $headers);
return $this;
}
public function withTraceContext($traceId, $spanId)
{
$this->headers['x-trace-id'] = $traceId;
$this->headers['x-span-id'] = $spanId;
return $this;
}
public function build()
{
if (!empty($this->headers)) {
$this->properties['application_headers'] = new AMQPTable($this->headers);
}
return new AMQPMessage($this->body, $this->properties);
}
}
// 使用示例
$message = MessageBuilder::create()
->withBody(['order_id' => 'ORD-001', 'amount' => 299.99])
->withJsonContentType()
->withPersistence()
->withPriority(5)
->withMessageId()
->withTimestamp()
->withType('order_event')
->withAppId('order-service')
->withHeader('x-source', 'api-gateway')
->withTraceContext('trace-123', 'span-456')
->withExpiration(60000)
->build();实际应用场景
1. RPC 消息
php
<?php
class RpcMessageFactory
{
public static function createRequest($method, $params, $replyTo, $correlationId = null)
{
return MessageBuilder::create()
->withBody([
'method' => $method,
'params' => $params
])
->withJsonContentType()
->withPersistence()
->withCorrelationId($correlationId ?? uniqid('rpc-'))
->withReplyTo($replyTo)
->withTimestamp()
->build();
}
public static function createResponse($result, $correlationId, $success = true)
{
return MessageBuilder::create()
->withBody([
'success' => $success,
'result' => $result
])
->withJsonContentType()
->withCorrelationId($correlationId)
->withTimestamp()
->build();
}
}
// RPC 客户端
$callbackQueue = 'rpc_reply_' . uniqid();
list($queueName,) = $channel->queue_declare($callbackQueue, false, false, true, true);
$request = RpcMessageFactory::createRequest(
'getUserInfo',
['user_id' => 123],
$callbackQueue
);
$channel->basic_publish($request, '', 'rpc_queue');2. 事件消息
php
<?php
class EventMessageFactory
{
public static function create($eventType, $data, $metadata = [])
{
return MessageBuilder::create()
->withBody([
'event_id' => uniqid('evt-'),
'event_type' => $eventType,
'data' => $data,
'metadata' => array_merge([
'timestamp' => time(),
'source' => gethostname()
], $metadata)
])
->withJsonContentType()
->withPersistence()
->withMessageId()
->withType($eventType)
->withTimestamp()
->build();
}
}
// 发送事件
$event = EventMessageFactory::create(
'user.registered',
['user_id' => 123, 'email' => 'user@example.com'],
['ip' => '192.168.1.1', 'user_agent' => 'Mozilla/5.0']
);
$channel->basic_publish($event, 'events', 'user.registered');3. 分布式追踪
php
<?php
class TracedMessageFactory
{
public static function create($body, $traceContext = null)
{
$builder = MessageBuilder::create()
->withBody($body)
->withJsonContentType()
->withPersistence()
->withTimestamp();
if ($traceContext) {
$builder->withTraceContext(
$traceContext['trace_id'],
$traceContext['span_id']
);
}
return $builder->build();
}
public static function extractTraceContext(AMQPMessage $msg)
{
if (!$msg->has('application_headers')) {
return null;
}
$headers = $msg->get('application_headers')->getNativeData();
return [
'trace_id' => $headers['x-trace-id'] ?? null,
'span_id' => $headers['x-span-id'] ?? null
];
}
}常见问题与解决方案
问题 1: 消息体编码问题
症状: 中文或特殊字符显示乱码
解决方案:
php
<?php
// 正确设置编码
$message = new AMQPMessage(
json_encode($data, JSON_UNESCAPED_UNICODE),
[
'content_type' => 'application/json',
'content_encoding' => 'UTF-8'
]
);问题 2: Headers 读取失败
症状: 无法读取自定义 headers
解决方案:
php
<?php
// 正确读取 headers
if ($msg->has('application_headers')) {
$headers = $msg->get('application_headers')->getNativeData();
$customValue = $headers['x-custom-key'] ?? 'default';
}问题 3: 消息大小限制
症状: 大消息发送失败
解决方案:
php
<?php
// 检查消息大小
$maxSize = 128 * 1024 * 1024; // 128MB (默认限制)
$messageBody = json_encode($data);
if (strlen($messageBody) > $maxSize) {
// 分片或压缩处理
$compressed = gzencode($messageBody);
$message = new AMQPMessage($compressed, [
'content_type' => 'application/json',
'content_encoding' => 'gzip'
]);
}最佳实践建议
- 统一消息格式: 使用 JSON 格式,便于跨语言处理
- 设置消息 ID: 便于追踪和去重
- 添加时间戳: 便于问题排查
- 合理使用 Headers: 存放元数据,不放在消息体中
- 消息压缩: 大消息使用压缩
