Skip to content

消息结构详解

概述

在 RabbitMQ 中,消息由多个部分组成,包括消息体、属性和头部。理解消息结构对于正确使用 RabbitMQ 至关重要。

核心原理

消息组成结构

mermaid
graph TD
    subgraph AMQP 消息结构
        M[Message] --> B[Body 消息体]
        M --> P[Properties 属性]
        M --> H[Headers 头部]
        
        P --> P1[content_type]
        P --> P2[content_encoding]
        P --> P3[delivery_mode]
        P --> P4[priority]
        P --> P5[correlation_id]
        P --> P6[reply_to]
        P --> P7[expiration]
        P --> P8[message_id]
        P --> P9[timestamp]
        P --> P10[type]
        P --> P11[user_id]
        P --> P12[app_id]
        P --> P13[cluster_id]
        
        H --> H1[自定义头部]
    end
    
    style M fill:#f9f,stroke:#333

消息属性详解

属性类型说明
content_typestring消息体的 MIME 类型
content_encodingstring消息体的编码方式
delivery_modeint1=非持久化, 2=持久化
priorityint消息优先级 (0-9)
correlation_idstring关联 ID,用于 RPC
reply_tostring回复队列名称
expirationstring消息过期时间(毫秒)
message_idstring消息唯一标识
timestampint消息创建时间戳
typestring消息类型名称
user_idstring用户 ID
app_idstring应用程序 ID
cluster_idstring集群 ID

PHP 代码示例

创建基本消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'test-queue';
$channel->queue_declare($queueName, false, true, false, false);

// 创建基本消息
$messageBody = 'Hello, RabbitMQ!';
$message = new AMQPMessage($messageBody);

$channel->basic_publish($message, '', $queueName);

echo "基本消息已发送\n";

$channel->close();
$connection->close();

创建带属性的消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'test-queue';
$channel->queue_declare($queueName, false, true, false, false);

// 创建带完整属性的消息
$messageBody = json_encode([
    'event' => 'user_registered',
    'user_id' => 12345,
    'email' => 'user@example.com'
]);

$message = new AMQPMessage($messageBody, [
    'content_type' => 'application/json',
    'content_encoding' => 'UTF-8',
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'priority' => 5,
    'message_id' => uniqid('msg-', true),
    'timestamp' => time(),
    'type' => 'user_event',
    'user_id' => 'guest',
    'app_id' => 'user-service',
    'correlation_id' => 'req-12345',
    'expiration' => '60000'  // 60秒后过期
]);

$channel->basic_publish($message, '', $queueName);

echo "带属性的消息已发送\n";
echo "消息 ID: " . $message->get('message_id') . "\n";

$channel->close();
$connection->close();

创建带自定义 Headers 的消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'test-queue';
$channel->queue_declare($queueName, false, true, false, false);

// 创建带自定义 headers 的消息
$messageBody = json_encode([
    'order_id' => 'ORD-001',
    'amount' => 299.99
]);

$message = new AMQPMessage($messageBody, [
    'content_type' => 'application/json',
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'application_headers' => new AMQPTable([
        'x-custom-header-1' => 'value1',
        'x-custom-header-2' => 12345,
        'x-custom-header-3' => true,
        'x-source-service' => 'order-service',
        'x-trace-id' => 'trace-abc123',
        'x-span-id' => 'span-def456'
    ])
]);

$channel->basic_publish($message, '', $queueName);

echo "带自定义 headers 的消息已发送\n";

$channel->close();
$connection->close();

消费并读取消息属性

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'test-queue';
$channel->queue_declare($queueName, false, true, false, false);

echo "等待消息...\n";

$callback = function (AMQPMessage $msg) {
    echo "========== 收到消息 ==========\n";
    
    // 读取消息体
    echo "消息体: " . $msg->getBody() . "\n";
    
    // 读取基本属性
    echo "Content-Type: " . ($msg->get('content_type') ?? 'N/A') . "\n";
    echo "Content-Encoding: " . ($msg->get('content_encoding') ?? 'N/A') . "\n";
    echo "Delivery-Mode: " . ($msg->get('delivery_mode') ?? 'N/A') . "\n";
    echo "Priority: " . ($msg->get('priority') ?? 'N/A') . "\n";
    echo "Message-ID: " . ($msg->get('message_id') ?? 'N/A') . "\n";
    echo "Timestamp: " . ($msg->get('timestamp') ?? 'N/A') . "\n";
    echo "Type: " . ($msg->get('type') ?? 'N/A') . "\n";
    echo "App-ID: " . ($msg->get('app_id') ?? 'N/A') . "\n";
    echo "Correlation-ID: " . ($msg->get('correlation_id') ?? 'N/A') . "\n";
    echo "Expiration: " . ($msg->get('expiration') ?? 'N/A') . "\n";
    
    // 读取自定义 headers
    if ($msg->has('application_headers')) {
        $headers = $msg->get('application_headers')->getNativeData();
        echo "Custom Headers:\n";
        foreach ($headers as $key => $value) {
            echo "  {$key}: " . json_encode($value) . "\n";
        }
    }
    
    // 读取路由信息
    echo "Exchange: " . $msg->getExchange() . "\n";
    echo "Routing Key: " . $msg->getRoutingKey() . "\n";
    echo "Delivery Tag: " . $msg->getDeliveryTag() . "\n";
    
    echo "============================\n\n";
    
    $msg->ack();
};

$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

消息构建器类

php
<?php

use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class MessageBuilder
{
    private $body;
    private $properties = [];
    private $headers = [];
    
    public static function create()
    {
        return new self();
    }
    
    public function withBody($body)
    {
        $this->body = is_array($body) ? json_encode($body) : $body;
        return $this;
    }
    
    public function withJsonContentType()
    {
        $this->properties['content_type'] = 'application/json';
        return $this;
    }
    
    public function withPersistence()
    {
        $this->properties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
        return $this;
    }
    
    public function withPriority($priority)
    {
        $this->properties['priority'] = min(9, max(0, $priority));
        return $this;
    }
    
    public function withMessageId($id = null)
    {
        $this->properties['message_id'] = $id ?? uniqid('msg-', true);
        return $this;
    }
    
    public function withTimestamp($timestamp = null)
    {
        $this->properties['timestamp'] = $timestamp ?? time();
        return $this;
    }
    
    public function withType($type)
    {
        $this->properties['type'] = $type;
        return $this;
    }
    
    public function withAppId($appId)
    {
        $this->properties['app_id'] = $appId;
        return $this;
    }
    
    public function withCorrelationId($correlationId)
    {
        $this->properties['correlation_id'] = $correlationId;
        return $this;
    }
    
    public function withReplyTo($replyTo)
    {
        $this->properties['reply_to'] = $replyTo;
        return $this;
    }
    
    public function withExpiration($ms)
    {
        $this->properties['expiration'] = (string) $ms;
        return $this;
    }
    
    public function withHeader($key, $value)
    {
        $this->headers[$key] = $value;
        return $this;
    }
    
    public function withHeaders(array $headers)
    {
        $this->headers = array_merge($this->headers, $headers);
        return $this;
    }
    
    public function withTraceContext($traceId, $spanId)
    {
        $this->headers['x-trace-id'] = $traceId;
        $this->headers['x-span-id'] = $spanId;
        return $this;
    }
    
    public function build()
    {
        if (!empty($this->headers)) {
            $this->properties['application_headers'] = new AMQPTable($this->headers);
        }
        
        return new AMQPMessage($this->body, $this->properties);
    }
}

// 使用示例
$message = MessageBuilder::create()
    ->withBody(['order_id' => 'ORD-001', 'amount' => 299.99])
    ->withJsonContentType()
    ->withPersistence()
    ->withPriority(5)
    ->withMessageId()
    ->withTimestamp()
    ->withType('order_event')
    ->withAppId('order-service')
    ->withHeader('x-source', 'api-gateway')
    ->withTraceContext('trace-123', 'span-456')
    ->withExpiration(60000)
    ->build();

实际应用场景

1. RPC 消息

php
<?php

class RpcMessageFactory
{
    public static function createRequest($method, $params, $replyTo, $correlationId = null)
    {
        return MessageBuilder::create()
            ->withBody([
                'method' => $method,
                'params' => $params
            ])
            ->withJsonContentType()
            ->withPersistence()
            ->withCorrelationId($correlationId ?? uniqid('rpc-'))
            ->withReplyTo($replyTo)
            ->withTimestamp()
            ->build();
    }
    
    public static function createResponse($result, $correlationId, $success = true)
    {
        return MessageBuilder::create()
            ->withBody([
                'success' => $success,
                'result' => $result
            ])
            ->withJsonContentType()
            ->withCorrelationId($correlationId)
            ->withTimestamp()
            ->build();
    }
}

// RPC 客户端
$callbackQueue = 'rpc_reply_' . uniqid();
list($queueName,) = $channel->queue_declare($callbackQueue, false, false, true, true);

$request = RpcMessageFactory::createRequest(
    'getUserInfo',
    ['user_id' => 123],
    $callbackQueue
);

$channel->basic_publish($request, '', 'rpc_queue');

2. 事件消息

php
<?php

class EventMessageFactory
{
    public static function create($eventType, $data, $metadata = [])
    {
        return MessageBuilder::create()
            ->withBody([
                'event_id' => uniqid('evt-'),
                'event_type' => $eventType,
                'data' => $data,
                'metadata' => array_merge([
                    'timestamp' => time(),
                    'source' => gethostname()
                ], $metadata)
            ])
            ->withJsonContentType()
            ->withPersistence()
            ->withMessageId()
            ->withType($eventType)
            ->withTimestamp()
            ->build();
    }
}

// 发送事件
$event = EventMessageFactory::create(
    'user.registered',
    ['user_id' => 123, 'email' => 'user@example.com'],
    ['ip' => '192.168.1.1', 'user_agent' => 'Mozilla/5.0']
);

$channel->basic_publish($event, 'events', 'user.registered');

3. 分布式追踪

php
<?php

class TracedMessageFactory
{
    public static function create($body, $traceContext = null)
    {
        $builder = MessageBuilder::create()
            ->withBody($body)
            ->withJsonContentType()
            ->withPersistence()
            ->withTimestamp();
        
        if ($traceContext) {
            $builder->withTraceContext(
                $traceContext['trace_id'],
                $traceContext['span_id']
            );
        }
        
        return $builder->build();
    }
    
    public static function extractTraceContext(AMQPMessage $msg)
    {
        if (!$msg->has('application_headers')) {
            return null;
        }
        
        $headers = $msg->get('application_headers')->getNativeData();
        
        return [
            'trace_id' => $headers['x-trace-id'] ?? null,
            'span_id' => $headers['x-span-id'] ?? null
        ];
    }
}

常见问题与解决方案

问题 1: 消息体编码问题

症状: 中文或特殊字符显示乱码

解决方案:

php
<?php

// 正确设置编码
$message = new AMQPMessage(
    json_encode($data, JSON_UNESCAPED_UNICODE),
    [
        'content_type' => 'application/json',
        'content_encoding' => 'UTF-8'
    ]
);

问题 2: Headers 读取失败

症状: 无法读取自定义 headers

解决方案:

php
<?php

// 正确读取 headers
if ($msg->has('application_headers')) {
    $headers = $msg->get('application_headers')->getNativeData();
    $customValue = $headers['x-custom-key'] ?? 'default';
}

问题 3: 消息大小限制

症状: 大消息发送失败

解决方案:

php
<?php

// 检查消息大小
$maxSize = 128 * 1024 * 1024;  // 128MB (默认限制)
$messageBody = json_encode($data);

if (strlen($messageBody) > $maxSize) {
    // 分片或压缩处理
    $compressed = gzencode($messageBody);
    $message = new AMQPMessage($compressed, [
        'content_type' => 'application/json',
        'content_encoding' => 'gzip'
    ]);
}

最佳实践建议

  1. 统一消息格式: 使用 JSON 格式,便于跨语言处理
  2. 设置消息 ID: 便于追踪和去重
  3. 添加时间戳: 便于问题排查
  4. 合理使用 Headers: 存放元数据,不放在消息体中
  5. 消息压缩: 大消息使用压缩

相关链接