Appearance
消息 TTL
概述
消息 TTL(Time To Live)是消息的生存时间,超过指定时间后消息会被自动删除或转移到死信队列。TTL 可以在队列级别或消息级别设置。
核心原理
TTL 设置方式
mermaid
graph TD
subgraph TTL 设置方式
Q[队列级别 TTL] --> |x-message-ttl| E1[队列中所有消息统一过期]
M[消息级别 TTL] --> |expiration| E2[每条消息独立过期]
B[两者同时设置] --> |取较小值| E3[实际 TTL]
end
subgraph 过期后处理
E1 --> D[删除消息]
E2 --> D
E3 --> D
D --> |配置了死信| DLX[死信队列]
D --> |未配置| R[直接丢弃]
end
style E1 fill:#87CEEB
style E2 fill:#90EE90
style E3 fill:#DDA0DDTTL 对比
| 设置方式 | 参数 | 范围 | 优先级 |
|---|---|---|---|
| 队列 TTL | x-message-ttl | 队列中所有消息 | 低 |
| 消息 TTL | expiration | 单条消息 | 高 |
| 策略 TTL | policy | 队列级别 | 最低 |
过期时间计算
- 队列 TTL:消息入队时开始计时
- 消息 TTL:消息发送时开始计时
- 同时设置:取较小值
PHP 代码示例
队列级别 TTL
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'ttl-queue';
// 声明带 TTL 的队列
$channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable([
'x-message-ttl' => 60000 // 60秒后过期
])
);
echo "队列 TTL 已设置: 60秒\n";
// 发送消息(无需设置 expiration)
$message = new AMQPMessage(
json_encode(['data' => 'test']),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($message, '', $queueName);
echo "消息已发送到 TTL 队列\n";
$channel->close();
$connection->close();消息级别 TTL
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'message-ttl-queue';
$channel->queue_declare($queueName, false, true, false, false);
// 发送不同 TTL 的消息
$messages = [
['body' => '10秒后过期', 'ttl' => 10000],
['body' => '30秒后过期', 'ttl' => 30000],
['body' => '60秒后过期', 'ttl' => 60000],
];
foreach ($messages as $msgData) {
$message = new AMQPMessage(
json_encode(['message' => $msgData['body']]),
[
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'expiration' => (string)$msgData['ttl'] // 毫秒,字符串类型
]
);
$channel->basic_publish($message, '', $queueName);
echo "消息已发送: {$msgData['body']}\n";
}
$channel->close();
$connection->close();TTL + 死信队列
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明死信交换机
$dlxExchange = 'dlx';
$channel->exchange_declare($dlxExchange, 'direct', false, true, false);
// 声明死信队列
$dlqQueue = 'expired_messages';
$channel->queue_declare($dlqQueue, false, true, false, false);
$channel->queue_bind($dlqQueue, $dlxExchange, 'expired');
// 声明带 TTL 和死信的队列
$queueName = 'ttl_with_dlq';
$channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable([
'x-message-ttl' => 30000, // 30秒 TTL
'x-dead-letter-exchange' => $dlxExchange,
'x-dead-letter-routing-key' => 'expired'
])
);
echo "TTL 队列已创建,过期消息将进入死信队列\n";
// 发送消息
$message = new AMQPMessage(
json_encode(['data' => 'will expire']),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_publish($message, '', $queueName);
echo "消息已发送,30秒后将进入死信队列\n";
$channel->close();
$connection->close();TTL 工具类
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class TtlMessageHelper
{
private $channel;
public function __construct($channel)
{
$this->channel = $channel;
}
public function declareQueueWithTtl($queueName, $ttlMs, array $options = [])
{
$arguments = ['x-message-ttl' => $ttlMs];
if (isset($options['dlx'])) {
$arguments['x-dead-letter-exchange'] = $options['dlx'];
}
if (isset($options['dlk'])) {
$arguments['x-dead-letter-routing-key'] = $options['dlk'];
}
if (isset($options['maxLength'])) {
$arguments['x-max-length'] = $options['maxLength'];
}
$this->channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable($arguments)
);
return $queueName;
}
public function sendWithTtl($queueName, $data, $ttlMs = null)
{
$properties = [
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
];
if ($ttlMs !== null) {
$properties['expiration'] = (string)$ttlMs;
}
$message = new AMQPMessage(json_encode($data), $properties);
$this->channel->basic_publish($message, '', $queueName);
}
public function sendWithDeadline($queueName, $data, $deadlineTimestamp)
{
$ttlMs = max(0, ($deadlineTimestamp - time()) * 1000);
$this->sendWithTtl($queueName, $data, $ttlMs);
}
public function sendWithTimeout($queueName, $data, $timeoutSeconds)
{
$this->sendWithTtl($queueName, $data, $timeoutSeconds * 1000);
}
}实际应用场景
1. 订单超时取消
php
<?php
class OrderTimeoutService
{
private $ttlHelper;
private $queueName = 'order_timeout';
public function __construct($channel)
{
$this->ttlHelper = new TtlMessageHelper($channel);
$this->setup();
}
private function setup()
{
// 声明死信队列
$this->ttlHelper->declareQueueWithTtl('order_timeout_dlq', 0);
// 声明超时队列(30分钟)
$this->ttlHelper->declareQueueWithTtl($this->queueName, 1800000, [
'dlx' => '',
'dlk' => 'order_timeout_dlq'
]);
}
public function scheduleTimeout($orderId, $timeoutMinutes = 30)
{
$ttlMs = $timeoutMinutes * 60 * 1000;
$this->ttlHelper->sendWithTtl($this->queueName, [
'order_id' => $orderId,
'scheduled_at' => time(),
'timeout_minutes' => $timeoutMinutes
], $ttlMs);
echo "订单 {$orderId} 已设置 {$timeoutMinutes} 分钟超时\n";
}
public function processTimeouts()
{
$callback = function ($msg) {
$data = json_decode($msg->getBody(), true);
// 检查订单是否已支付
if (!$this->isOrderPaid($data['order_id'])) {
$this->cancelOrder($data['order_id']);
echo "订单 {$data['order_id']} 已超时取消\n";
}
$msg->ack();
};
$this->channel->basic_consume('order_timeout_dlq', '', false, false, false, false, $callback);
}
private function isOrderPaid($orderId)
{
// 检查订单支付状态
return false;
}
private function cancelOrder($orderId)
{
// 取消订单逻辑
}
}2. 验证码过期
php
<?php
class VerificationCodeService
{
private $ttlHelper;
private $queueName = 'verification_codes';
public function __construct($channel)
{
$this->ttlHelper = new TtlMessageHelper($channel);
$this->ttlHelper->declareQueueWithTtl($queueName, 300000); // 5分钟
}
public function sendCode($phone, $code)
{
$this->ttlHelper->sendWithTtl($this->queueName, [
'phone' => $phone,
'code' => $code,
'created_at' => time()
]);
echo "验证码已发送到 {$phone}\n";
}
public function verifyCode($phone, $code)
{
// 验证逻辑
// 由于设置了 TTL,过期的验证码会自动删除
}
}3. 缓存预热
php
<?php
class CacheWarmupService
{
private $ttlHelper;
private $queueName = 'cache_warmup';
public function __construct($channel)
{
$this->ttlHelper = new TtlMessageHelper($channel);
$this->ttlHelper->declareQueueWithTtl($queueName, 60000); // 1分钟
}
public function scheduleWarmup($cacheKey, $data, $delaySeconds = 0)
{
$ttlMs = max(0, $delaySeconds * 1000);
$this->ttlHelper->sendWithTtl($this->queueName, [
'cache_key' => $cacheKey,
'data' => $data,
'scheduled_at' => time()
], $ttlMs > 0 ? $ttlMs : null);
}
public function processWarmup()
{
$callback = function ($msg) {
$data = json_decode($msg->getBody(), true);
$this->warmCache($data['cache_key'], $data['data']);
$msg->ack();
};
$this->channel->basic_consume($this->queueName, '', false, false, false, false, $callback);
}
private function warmCache($key, $data)
{
// 预热缓存逻辑
echo "缓存预热: {$key}\n";
}
}常见问题与解决方案
问题 1: 消息未过期
症状: 消息超过 TTL 时间仍未被删除
原因: 消息在队列头部被阻塞,TTL 只在消息到达队列头部时检查
解决方案:
php
<?php
// 使用死信队列确保过期消息被处理
$channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable([
'x-message-ttl' => 60000,
'x-dead-letter-exchange' => 'dlx', // 配置死信交换机
'x-dead-letter-routing-key' => 'expired'
])
);问题 2: TTL 单位错误
症状: 消息过期时间不符合预期
原因: TTL 单位是毫秒,容易混淆
解决方案:
php
<?php
// 正确的单位转换
$seconds = 60;
$ttlMs = $seconds * 1000; // 转换为毫秒
// 队列 TTL
new AMQPTable(['x-message-ttl' => $ttlMs]);
// 消息 TTL(必须是字符串)
$message = new AMQPMessage($body, [
'expiration' => (string)$ttlMs
]);问题 3: 消息 TTL 和队列 TTL 冲突
症状: 不确定实际 TTL 是多少
解决方案:
php
<?php
// 同时设置时,取较小值
$queueTtl = 60000; // 队列 TTL: 60秒
$messageTtl = 30000; // 消息 TTL: 30秒
// 实际 TTL = min(queueTtl, messageTtl) = 30秒
// 建议:只设置一种 TTL,避免混淆
// 方案一:只设置队列 TTL(统一过期时间)
// 方案二:只设置消息 TTL(灵活过期时间)最佳实践建议
- 统一 TTL 策略: 选择队列 TTL 或消息 TTL,避免混用
- 配置死信队列: 过期消息转移到死信队列便于处理
- 合理设置时间: 根据业务需求设置合理的 TTL
- 监控过期消息: 监控死信队列的消息数量
- 单位注意: TTL 单位是毫秒
