Skip to content

消息属性

概述

消息属性(Message Properties)是 AMQP 消息的元数据,用于描述消息的特征和行为。正确使用消息属性可以实现消息持久化、优先级、过期时间等功能。

核心原理

消息属性分类

mermaid
graph TD
    subgraph 消息属性
        P[Properties] --> C[内容描述]
        P --> D[投递控制]
        P --> I[标识信息]
        P --> T[追踪信息]
        
        C --> C1[content_type]
        C --> C2[content_encoding]
        
        D --> D1[delivery_mode]
        D --> D2[priority]
        D --> D3[expiration]
        
        I --> I1[message_id]
        I --> I2[correlation_id]
        I --> I3[type]
        
        T --> T1[timestamp]
        T --> T2[user_id]
        T --> T3[app_id]
    end
    
    style P fill:#f9f,stroke:#333

属性详解

属性类型必填说明
content_typestring消息体 MIME 类型
content_encodingstring消息体编码方式
headerstable自定义头部
delivery_modeint持久化模式
priorityint优先级 (0-9)
correlation_idstring关联 ID
reply_tostring回复队列
expirationstring过期时间(毫秒)
message_idstring消息 ID
timestampint时间戳
typestring消息类型
user_idstring用户 ID
app_idstring应用 ID
cluster_idstring集群 ID

PHP 代码示例

内容描述属性

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'content-queue';
$channel->queue_declare($queueName, false, true, false, false);

// JSON 内容
$jsonMessage = new AMQPMessage(
    json_encode(['name' => '张三', 'age' => 25]),
    [
        'content_type' => 'application/json',
        'content_encoding' => 'UTF-8'
    ]
);

// XML 内容
$xmlMessage = new AMQPMessage(
    '<?xml version="1.0" encoding="UTF-8"?><user><name>张三</name></user>',
    [
        'content_type' => 'application/xml',
        'content_encoding' => 'UTF-8'
    ]
);

// 纯文本内容
$textMessage = new AMQPMessage(
    '这是一条纯文本消息',
    [
        'content_type' => 'text/plain',
        'content_encoding' => 'UTF-8'
    ]
);

// 二进制内容
$binaryData = file_get_contents('image.png');
$binaryMessage = new AMQPMessage(
    $binaryData,
    [
        'content_type' => 'image/png',
        'content_encoding' => 'binary'
    ]
);

$channel->basic_publish($jsonMessage, '', $queueName);
$channel->basic_publish($xmlMessage, '', $queueName);
$channel->basic_publish($textMessage, '', $queueName);
$channel->basic_publish($binaryMessage, '', $queueName);

echo "不同内容类型的消息已发送\n";

$channel->close();
$connection->close();

投递控制属性

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明优先级队列
$queueName = 'delivery-queue';
$channel->queue_declare(
    $queueName,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable(['x-max-priority' => 10])
);

// 持久化消息
$persistentMessage = new AMQPMessage(
    json_encode(['type' => 'persistent']),
    [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
    ]
);

// 非持久化消息
$transientMessage = new AMQPMessage(
    json_encode(['type' => 'transient']),
    [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT
    ]
);

// 高优先级消息
$highPriorityMessage = new AMQPMessage(
    json_encode(['type' => 'urgent']),
    [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'priority' => 10
    ]
);

// 带过期时间的消息
$ttlMessage = new AMQPMessage(
    json_encode(['type' => 'ttl']),
    [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'expiration' => '60000'  // 60秒后过期
    ]
);

$channel->basic_publish($persistentMessage, '', $queueName);
$channel->basic_publish($transientMessage, '', $queueName);
$channel->basic_publish($highPriorityMessage, '', $queueName);
$channel->basic_publish($ttlMessage, '', $queueName);

echo "不同投递控制的消息已发送\n";

$channel->close();
$connection->close();

标识信息属性

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'identity-queue';
$channel->queue_declare($queueName, false, true, false, false);

// 完整标识信息
$message = new AMQPMessage(
    json_encode([
        'event' => 'order_created',
        'order_id' => 'ORD-001'
    ]),
    [
        'content_type' => 'application/json',
        'message_id' => 'msg-' . uniqid(),
        'correlation_id' => 'req-' . uniqid(),
        'type' => 'order_event',
        'user_id' => 'guest',
        'app_id' => 'order-service',
        'timestamp' => time()
    ]
);

$channel->basic_publish($message, '', $queueName);

echo "带完整标识的消息已发送\n";
echo "Message ID: " . $message->get('message_id') . "\n";
echo "Correlation ID: " . $message->get('correlation_id') . "\n";

$channel->close();
$connection->close();

RPC 相关属性

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// RPC 请求消息
$rpcQueue = 'rpc_queue';
$channel->queue_declare($rpcQueue, false, false, false, false);

// 创建临时回调队列
list($callbackQueue,) = $channel->queue_declare('', false, false, true, true);

$correlationId = uniqid('rpc-');

$requestMessage = new AMQPMessage(
    json_encode(['method' => 'getUser', 'params' => ['id' => 123]]),
    [
        'content_type' => 'application/json',
        'correlation_id' => $correlationId,
        'reply_to' => $callbackQueue
    ]
);

$channel->basic_publish($requestMessage, '', $rpcQueue);

echo "RPC 请求已发送\n";
echo "Correlation ID: {$correlationId}\n";
echo "Reply To: {$callbackQueue}\n";

// 等待响应
$response = null;
$channel->basic_consume($callbackQueue, '', false, true, false, false, 
    function ($msg) use ($correlationId, &$response) {
        if ($msg->get('correlation_id') === $correlationId) {
            $response = json_decode($msg->getBody(), true);
        }
    }
);

while (!$response) {
    $channel->wait();
}

echo "收到响应: " . json_encode($response) . "\n";

$channel->close();
$connection->close();

消息属性读取器

php
<?php

use PhpAmqpLib\Message\AMQPMessage;

class MessagePropertyReader
{
    private $message;
    
    public function __construct(AMQPMessage $message)
    {
        $this->message = $message;
    }
    
    public function getContentType()
    {
        return $this->message->get('content_type');
    }
    
    public function getContentEncoding()
    {
        return $this->message->get('content_encoding');
    }
    
    public function isPersistent()
    {
        return $this->message->get('delivery_mode') === AMQPMessage::DELIVERY_MODE_PERSISTENT;
    }
    
    public function getPriority()
    {
        return $this->message->get('priority') ?? 0;
    }
    
    public function getMessageId()
    {
        return $this->message->get('message_id');
    }
    
    public function getCorrelationId()
    {
        return $this->message->get('correlation_id');
    }
    
    public function getReplyTo()
    {
        return $this->message->get('reply_to');
    }
    
    public function getExpiration()
    {
        return $this->message->get('expiration');
    }
    
    public function getExpirationSeconds()
    {
        $ms = $this->getExpiration();
        return $ms ? (int)($ms / 1000) : null;
    }
    
    public function getTimestamp()
    {
        return $this->message->get('timestamp');
    }
    
    public function getDateTime()
    {
        $timestamp = $this->getTimestamp();
        return $timestamp ? date('Y-m-d H:i:s', $timestamp) : null;
    }
    
    public function getType()
    {
        return $this->message->get('type');
    }
    
    public function getUserId()
    {
        return $this->message->get('user_id');
    }
    
    public function getAppId()
    {
        return $this->message->get('app_id');
    }
    
    public function getHeader($key, $default = null)
    {
        if (!$this->message->has('application_headers')) {
            return $default;
        }
        
        $headers = $this->message->get('application_headers')->getNativeData();
        return $headers[$key] ?? $default;
    }
    
    public function getAllHeaders()
    {
        if (!$this->message->has('application_headers')) {
            return [];
        }
        
        return $this->message->get('application_headers')->getNativeData();
    }
    
    public function getBody()
    {
        return $this->message->getBody();
    }
    
    public function getBodyAsJson($assoc = true)
    {
        return json_decode($this->message->getBody(), $assoc);
    }
    
    public function toArray()
    {
        return [
            'content_type' => $this->getContentType(),
            'content_encoding' => $this->getContentEncoding(),
            'delivery_mode' => $this->isPersistent() ? 'persistent' : 'transient',
            'priority' => $this->getPriority(),
            'message_id' => $this->getMessageId(),
            'correlation_id' => $this->getCorrelationId(),
            'reply_to' => $this->getReplyTo(),
            'expiration' => $this->getExpiration(),
            'timestamp' => $this->getTimestamp(),
            'datetime' => $this->getDateTime(),
            'type' => $this->getType(),
            'user_id' => $this->getUserId(),
            'app_id' => $this->getAppId(),
            'headers' => $this->getAllHeaders(),
            'body' => $this->getBody()
        ];
    }
}

实际应用场景

1. 消息追踪

php
<?php

class TraceableMessage
{
    public static function create($body, $traceContext = null)
    {
        $properties = [
            'content_type' => 'application/json',
            'message_id' => self::generateMessageId(),
            'timestamp' => time(),
            'app_id' => config('app.name')
        ];
        
        if ($traceContext) {
            $properties['correlation_id'] = $traceContext['correlation_id'] ?? null;
            $properties['headers'] = [
                'x-trace-id' => $traceContext['trace_id'] ?? null,
                'x-span-id' => $traceContext['span_id'] ?? null,
                'x-parent-span-id' => $traceContext['parent_span_id'] ?? null
            ];
        }
        
        return new AMQPMessage(json_encode($body), $properties);
    }
    
    private static function generateMessageId()
    {
        return sprintf(
            '%s-%s-%s',
            date('YmdHis'),
            gethostname(),
            uniqid()
        );
    }
}

2. 消息过期处理

php
<?php

class ExpiringMessage
{
    public static function create($body, $ttlSeconds)
    {
        return new AMQPMessage(
            json_encode($body),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'expiration' => (string)($ttlSeconds * 1000),
                'timestamp' => time()
            ]
        );
    }
    
    public static function createWithDeadline($body, $deadlineTimestamp)
    {
        $ttl = max(0, $deadlineTimestamp - time());
        return self::create($body, $ttl);
    }
}

// 使用示例
$message = ExpiringMessage::create(['task' => 'send_email'], 300);  // 5分钟后过期
$message = ExpiringMessage::createWithDeadline(['task' => 'process'], strtotime('2024-12-31 23:59:59'));

3. 消息分类

php
<?php

class TypedMessage
{
    const TYPE_ORDER_CREATED = 'order.created';
    const TYPE_ORDER_CANCELLED = 'order.cancelled';
    const TYPE_PAYMENT_COMPLETED = 'payment.completed';
    
    public static function create($type, $data)
    {
        return new AMQPMessage(
            json_encode([
                'type' => $type,
                'data' => $data,
                'timestamp' => time()
            ]),
            [
                'content_type' => 'application/json',
                'type' => $type,
                'message_id' => uniqid($type . '-'),
                'timestamp' => time()
            ]
        );
    }
    
    public static function createOrderEvent($eventType, $orderData)
    {
        return self::create('order.' . $eventType, $orderData);
    }
    
    public static function createPaymentEvent($eventType, $paymentData)
    {
        return self::create('payment.' . $eventType, $paymentData);
    }
}

常见问题与解决方案

问题 1: 属性丢失

症状: 发送的消息属性在消费端读取不到

原因: 属性名错误或未正确设置

解决方案:

php
<?php

// 确保使用正确的属性名
$message = new AMQPMessage($body, [
    'content_type' => 'application/json',  // 正确
    'contentType' => 'application/json',   // 错误,会被忽略
]);

// 读取时使用正确的属性名
$contentType = $msg->get('content_type');  // 正确

问题 2: 时间戳格式

症状: 时间戳显示不正确

原因: RabbitMQ 使用 Unix 时间戳(秒)

解决方案:

php
<?php

// 正确设置时间戳(秒级)
$message = new AMQPMessage($body, [
    'timestamp' => time()  // Unix 时间戳(秒)
]);

// 读取并格式化
$timestamp = $msg->get('timestamp');
$datetime = date('Y-m-d H:i:s', $timestamp);

问题 3: 过期时间单位

症状: 消息过期时间不符合预期

原因: expiration 属性单位是毫秒,需要字符串类型

解决方案:

php
<?php

// 正确设置过期时间(毫秒,字符串)
$message = new AMQPMessage($body, [
    'expiration' => '60000'  // 60秒,字符串类型
]);

// 错误示例
$message = new AMQPMessage($body, [
    'expiration' => 60000    // 错误,应该是字符串
]);

最佳实践建议

  1. 统一内容类型: 使用 JSON 格式,设置 content_type
  2. 设置消息 ID: 便于追踪和去重
  3. 添加时间戳: 便于问题排查
  4. 合理使用过期: 避免过期消息堆积
  5. 类型化消息: 使用 type 属性分类消息

相关链接