Appearance
消息属性
概述
消息属性(Message Properties)是 AMQP 消息的元数据,用于描述消息的特征和行为。正确使用消息属性可以实现消息持久化、优先级、过期时间等功能。
核心原理
消息属性分类
mermaid
graph TD
subgraph 消息属性
P[Properties] --> C[内容描述]
P --> D[投递控制]
P --> I[标识信息]
P --> T[追踪信息]
C --> C1[content_type]
C --> C2[content_encoding]
D --> D1[delivery_mode]
D --> D2[priority]
D --> D3[expiration]
I --> I1[message_id]
I --> I2[correlation_id]
I --> I3[type]
T --> T1[timestamp]
T --> T2[user_id]
T --> T3[app_id]
end
style P fill:#f9f,stroke:#333属性详解
| 属性 | 类型 | 必填 | 说明 |
|---|---|---|---|
| content_type | string | 否 | 消息体 MIME 类型 |
| content_encoding | string | 否 | 消息体编码方式 |
| headers | table | 否 | 自定义头部 |
| delivery_mode | int | 否 | 持久化模式 |
| priority | int | 否 | 优先级 (0-9) |
| correlation_id | string | 否 | 关联 ID |
| reply_to | string | 否 | 回复队列 |
| expiration | string | 否 | 过期时间(毫秒) |
| message_id | string | 否 | 消息 ID |
| timestamp | int | 否 | 时间戳 |
| type | string | 否 | 消息类型 |
| user_id | string | 否 | 用户 ID |
| app_id | string | 否 | 应用 ID |
| cluster_id | string | 否 | 集群 ID |
PHP 代码示例
内容描述属性
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'content-queue';
$channel->queue_declare($queueName, false, true, false, false);
// JSON 内容
$jsonMessage = new AMQPMessage(
json_encode(['name' => '张三', 'age' => 25]),
[
'content_type' => 'application/json',
'content_encoding' => 'UTF-8'
]
);
// XML 内容
$xmlMessage = new AMQPMessage(
'<?xml version="1.0" encoding="UTF-8"?><user><name>张三</name></user>',
[
'content_type' => 'application/xml',
'content_encoding' => 'UTF-8'
]
);
// 纯文本内容
$textMessage = new AMQPMessage(
'这是一条纯文本消息',
[
'content_type' => 'text/plain',
'content_encoding' => 'UTF-8'
]
);
// 二进制内容
$binaryData = file_get_contents('image.png');
$binaryMessage = new AMQPMessage(
$binaryData,
[
'content_type' => 'image/png',
'content_encoding' => 'binary'
]
);
$channel->basic_publish($jsonMessage, '', $queueName);
$channel->basic_publish($xmlMessage, '', $queueName);
$channel->basic_publish($textMessage, '', $queueName);
$channel->basic_publish($binaryMessage, '', $queueName);
echo "不同内容类型的消息已发送\n";
$channel->close();
$connection->close();投递控制属性
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明优先级队列
$queueName = 'delivery-queue';
$channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable(['x-max-priority' => 10])
);
// 持久化消息
$persistentMessage = new AMQPMessage(
json_encode(['type' => 'persistent']),
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);
// 非持久化消息
$transientMessage = new AMQPMessage(
json_encode(['type' => 'transient']),
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT
]
);
// 高优先级消息
$highPriorityMessage = new AMQPMessage(
json_encode(['type' => 'urgent']),
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'priority' => 10
]
);
// 带过期时间的消息
$ttlMessage = new AMQPMessage(
json_encode(['type' => 'ttl']),
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'expiration' => '60000' // 60秒后过期
]
);
$channel->basic_publish($persistentMessage, '', $queueName);
$channel->basic_publish($transientMessage, '', $queueName);
$channel->basic_publish($highPriorityMessage, '', $queueName);
$channel->basic_publish($ttlMessage, '', $queueName);
echo "不同投递控制的消息已发送\n";
$channel->close();
$connection->close();标识信息属性
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'identity-queue';
$channel->queue_declare($queueName, false, true, false, false);
// 完整标识信息
$message = new AMQPMessage(
json_encode([
'event' => 'order_created',
'order_id' => 'ORD-001'
]),
[
'content_type' => 'application/json',
'message_id' => 'msg-' . uniqid(),
'correlation_id' => 'req-' . uniqid(),
'type' => 'order_event',
'user_id' => 'guest',
'app_id' => 'order-service',
'timestamp' => time()
]
);
$channel->basic_publish($message, '', $queueName);
echo "带完整标识的消息已发送\n";
echo "Message ID: " . $message->get('message_id') . "\n";
echo "Correlation ID: " . $message->get('correlation_id') . "\n";
$channel->close();
$connection->close();RPC 相关属性
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// RPC 请求消息
$rpcQueue = 'rpc_queue';
$channel->queue_declare($rpcQueue, false, false, false, false);
// 创建临时回调队列
list($callbackQueue,) = $channel->queue_declare('', false, false, true, true);
$correlationId = uniqid('rpc-');
$requestMessage = new AMQPMessage(
json_encode(['method' => 'getUser', 'params' => ['id' => 123]]),
[
'content_type' => 'application/json',
'correlation_id' => $correlationId,
'reply_to' => $callbackQueue
]
);
$channel->basic_publish($requestMessage, '', $rpcQueue);
echo "RPC 请求已发送\n";
echo "Correlation ID: {$correlationId}\n";
echo "Reply To: {$callbackQueue}\n";
// 等待响应
$response = null;
$channel->basic_consume($callbackQueue, '', false, true, false, false,
function ($msg) use ($correlationId, &$response) {
if ($msg->get('correlation_id') === $correlationId) {
$response = json_decode($msg->getBody(), true);
}
}
);
while (!$response) {
$channel->wait();
}
echo "收到响应: " . json_encode($response) . "\n";
$channel->close();
$connection->close();消息属性读取器
php
<?php
use PhpAmqpLib\Message\AMQPMessage;
class MessagePropertyReader
{
private $message;
public function __construct(AMQPMessage $message)
{
$this->message = $message;
}
public function getContentType()
{
return $this->message->get('content_type');
}
public function getContentEncoding()
{
return $this->message->get('content_encoding');
}
public function isPersistent()
{
return $this->message->get('delivery_mode') === AMQPMessage::DELIVERY_MODE_PERSISTENT;
}
public function getPriority()
{
return $this->message->get('priority') ?? 0;
}
public function getMessageId()
{
return $this->message->get('message_id');
}
public function getCorrelationId()
{
return $this->message->get('correlation_id');
}
public function getReplyTo()
{
return $this->message->get('reply_to');
}
public function getExpiration()
{
return $this->message->get('expiration');
}
public function getExpirationSeconds()
{
$ms = $this->getExpiration();
return $ms ? (int)($ms / 1000) : null;
}
public function getTimestamp()
{
return $this->message->get('timestamp');
}
public function getDateTime()
{
$timestamp = $this->getTimestamp();
return $timestamp ? date('Y-m-d H:i:s', $timestamp) : null;
}
public function getType()
{
return $this->message->get('type');
}
public function getUserId()
{
return $this->message->get('user_id');
}
public function getAppId()
{
return $this->message->get('app_id');
}
public function getHeader($key, $default = null)
{
if (!$this->message->has('application_headers')) {
return $default;
}
$headers = $this->message->get('application_headers')->getNativeData();
return $headers[$key] ?? $default;
}
public function getAllHeaders()
{
if (!$this->message->has('application_headers')) {
return [];
}
return $this->message->get('application_headers')->getNativeData();
}
public function getBody()
{
return $this->message->getBody();
}
public function getBodyAsJson($assoc = true)
{
return json_decode($this->message->getBody(), $assoc);
}
public function toArray()
{
return [
'content_type' => $this->getContentType(),
'content_encoding' => $this->getContentEncoding(),
'delivery_mode' => $this->isPersistent() ? 'persistent' : 'transient',
'priority' => $this->getPriority(),
'message_id' => $this->getMessageId(),
'correlation_id' => $this->getCorrelationId(),
'reply_to' => $this->getReplyTo(),
'expiration' => $this->getExpiration(),
'timestamp' => $this->getTimestamp(),
'datetime' => $this->getDateTime(),
'type' => $this->getType(),
'user_id' => $this->getUserId(),
'app_id' => $this->getAppId(),
'headers' => $this->getAllHeaders(),
'body' => $this->getBody()
];
}
}实际应用场景
1. 消息追踪
php
<?php
class TraceableMessage
{
public static function create($body, $traceContext = null)
{
$properties = [
'content_type' => 'application/json',
'message_id' => self::generateMessageId(),
'timestamp' => time(),
'app_id' => config('app.name')
];
if ($traceContext) {
$properties['correlation_id'] = $traceContext['correlation_id'] ?? null;
$properties['headers'] = [
'x-trace-id' => $traceContext['trace_id'] ?? null,
'x-span-id' => $traceContext['span_id'] ?? null,
'x-parent-span-id' => $traceContext['parent_span_id'] ?? null
];
}
return new AMQPMessage(json_encode($body), $properties);
}
private static function generateMessageId()
{
return sprintf(
'%s-%s-%s',
date('YmdHis'),
gethostname(),
uniqid()
);
}
}2. 消息过期处理
php
<?php
class ExpiringMessage
{
public static function create($body, $ttlSeconds)
{
return new AMQPMessage(
json_encode($body),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'expiration' => (string)($ttlSeconds * 1000),
'timestamp' => time()
]
);
}
public static function createWithDeadline($body, $deadlineTimestamp)
{
$ttl = max(0, $deadlineTimestamp - time());
return self::create($body, $ttl);
}
}
// 使用示例
$message = ExpiringMessage::create(['task' => 'send_email'], 300); // 5分钟后过期
$message = ExpiringMessage::createWithDeadline(['task' => 'process'], strtotime('2024-12-31 23:59:59'));3. 消息分类
php
<?php
class TypedMessage
{
const TYPE_ORDER_CREATED = 'order.created';
const TYPE_ORDER_CANCELLED = 'order.cancelled';
const TYPE_PAYMENT_COMPLETED = 'payment.completed';
public static function create($type, $data)
{
return new AMQPMessage(
json_encode([
'type' => $type,
'data' => $data,
'timestamp' => time()
]),
[
'content_type' => 'application/json',
'type' => $type,
'message_id' => uniqid($type . '-'),
'timestamp' => time()
]
);
}
public static function createOrderEvent($eventType, $orderData)
{
return self::create('order.' . $eventType, $orderData);
}
public static function createPaymentEvent($eventType, $paymentData)
{
return self::create('payment.' . $eventType, $paymentData);
}
}常见问题与解决方案
问题 1: 属性丢失
症状: 发送的消息属性在消费端读取不到
原因: 属性名错误或未正确设置
解决方案:
php
<?php
// 确保使用正确的属性名
$message = new AMQPMessage($body, [
'content_type' => 'application/json', // 正确
'contentType' => 'application/json', // 错误,会被忽略
]);
// 读取时使用正确的属性名
$contentType = $msg->get('content_type'); // 正确问题 2: 时间戳格式
症状: 时间戳显示不正确
原因: RabbitMQ 使用 Unix 时间戳(秒)
解决方案:
php
<?php
// 正确设置时间戳(秒级)
$message = new AMQPMessage($body, [
'timestamp' => time() // Unix 时间戳(秒)
]);
// 读取并格式化
$timestamp = $msg->get('timestamp');
$datetime = date('Y-m-d H:i:s', $timestamp);问题 3: 过期时间单位
症状: 消息过期时间不符合预期
原因: expiration 属性单位是毫秒,需要字符串类型
解决方案:
php
<?php
// 正确设置过期时间(毫秒,字符串)
$message = new AMQPMessage($body, [
'expiration' => '60000' // 60秒,字符串类型
]);
// 错误示例
$message = new AMQPMessage($body, [
'expiration' => 60000 // 错误,应该是字符串
]);最佳实践建议
- 统一内容类型: 使用 JSON 格式,设置
content_type - 设置消息 ID: 便于追踪和去重
- 添加时间戳: 便于问题排查
- 合理使用过期: 避免过期消息堆积
- 类型化消息: 使用
type属性分类消息
