Skip to content

消息 TTL

概述

消息 TTL(Time To Live)是消息的生存时间,超过指定时间后消息会被自动删除或转移到死信队列。TTL 可以在队列级别或消息级别设置。

核心原理

TTL 设置方式

mermaid
graph TD
    subgraph TTL 设置方式
        Q[队列级别 TTL] --> |x-message-ttl| E1[队列中所有消息统一过期]
        M[消息级别 TTL] --> |expiration| E2[每条消息独立过期]
        B[两者同时设置] --> |取较小值| E3[实际 TTL]
    end
    
    subgraph 过期后处理
        E1 --> D[删除消息]
        E2 --> D
        E3 --> D
        D --> |配置了死信| DLX[死信队列]
        D --> |未配置| R[直接丢弃]
    end
    
    style E1 fill:#87CEEB
    style E2 fill:#90EE90
    style E3 fill:#DDA0DD

TTL 对比

设置方式参数范围优先级
队列 TTLx-message-ttl队列中所有消息
消息 TTLexpiration单条消息
策略 TTLpolicy队列级别最低

过期时间计算

  • 队列 TTL:消息入队时开始计时
  • 消息 TTL:消息发送时开始计时
  • 同时设置:取较小值

PHP 代码示例

队列级别 TTL

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'ttl-queue';

// 声明带 TTL 的队列
$channel->queue_declare(
    $queueName,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable([
        'x-message-ttl' => 60000  // 60秒后过期
    ])
);

echo "队列 TTL 已设置: 60秒\n";

// 发送消息(无需设置 expiration)
$message = new AMQPMessage(
    json_encode(['data' => 'test']),
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);

$channel->basic_publish($message, '', $queueName);

echo "消息已发送到 TTL 队列\n";

$channel->close();
$connection->close();

消息级别 TTL

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'message-ttl-queue';
$channel->queue_declare($queueName, false, true, false, false);

// 发送不同 TTL 的消息
$messages = [
    ['body' => '10秒后过期', 'ttl' => 10000],
    ['body' => '30秒后过期', 'ttl' => 30000],
    ['body' => '60秒后过期', 'ttl' => 60000],
];

foreach ($messages as $msgData) {
    $message = new AMQPMessage(
        json_encode(['message' => $msgData['body']]),
        [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'expiration' => (string)$msgData['ttl']  // 毫秒,字符串类型
        ]
    );
    
    $channel->basic_publish($message, '', $queueName);
    echo "消息已发送: {$msgData['body']}\n";
}

$channel->close();
$connection->close();

TTL + 死信队列

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明死信交换机
$dlxExchange = 'dlx';
$channel->exchange_declare($dlxExchange, 'direct', false, true, false);

// 声明死信队列
$dlqQueue = 'expired_messages';
$channel->queue_declare($dlqQueue, false, true, false, false);
$channel->queue_bind($dlqQueue, $dlxExchange, 'expired');

// 声明带 TTL 和死信的队列
$queueName = 'ttl_with_dlq';
$channel->queue_declare(
    $queueName,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable([
        'x-message-ttl' => 30000,              // 30秒 TTL
        'x-dead-letter-exchange' => $dlxExchange,
        'x-dead-letter-routing-key' => 'expired'
    ])
);

echo "TTL 队列已创建,过期消息将进入死信队列\n";

// 发送消息
$message = new AMQPMessage(
    json_encode(['data' => 'will expire']),
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);

$channel->basic_publish($message, '', $queueName);

echo "消息已发送,30秒后将进入死信队列\n";

$channel->close();
$connection->close();

TTL 工具类

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class TtlMessageHelper
{
    private $channel;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
    }
    
    public function declareQueueWithTtl($queueName, $ttlMs, array $options = [])
    {
        $arguments = ['x-message-ttl' => $ttlMs];
        
        if (isset($options['dlx'])) {
            $arguments['x-dead-letter-exchange'] = $options['dlx'];
        }
        
        if (isset($options['dlk'])) {
            $arguments['x-dead-letter-routing-key'] = $options['dlk'];
        }
        
        if (isset($options['maxLength'])) {
            $arguments['x-max-length'] = $options['maxLength'];
        }
        
        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable($arguments)
        );
        
        return $queueName;
    }
    
    public function sendWithTtl($queueName, $data, $ttlMs = null)
    {
        $properties = [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ];
        
        if ($ttlMs !== null) {
            $properties['expiration'] = (string)$ttlMs;
        }
        
        $message = new AMQPMessage(json_encode($data), $properties);
        $this->channel->basic_publish($message, '', $queueName);
    }
    
    public function sendWithDeadline($queueName, $data, $deadlineTimestamp)
    {
        $ttlMs = max(0, ($deadlineTimestamp - time()) * 1000);
        $this->sendWithTtl($queueName, $data, $ttlMs);
    }
    
    public function sendWithTimeout($queueName, $data, $timeoutSeconds)
    {
        $this->sendWithTtl($queueName, $data, $timeoutSeconds * 1000);
    }
}

实际应用场景

1. 订单超时取消

php
<?php

class OrderTimeoutService
{
    private $ttlHelper;
    private $queueName = 'order_timeout';
    
    public function __construct($channel)
    {
        $this->ttlHelper = new TtlMessageHelper($channel);
        $this->setup();
    }
    
    private function setup()
    {
        // 声明死信队列
        $this->ttlHelper->declareQueueWithTtl('order_timeout_dlq', 0);
        
        // 声明超时队列(30分钟)
        $this->ttlHelper->declareQueueWithTtl($this->queueName, 1800000, [
            'dlx' => '',
            'dlk' => 'order_timeout_dlq'
        ]);
    }
    
    public function scheduleTimeout($orderId, $timeoutMinutes = 30)
    {
        $ttlMs = $timeoutMinutes * 60 * 1000;
        
        $this->ttlHelper->sendWithTtl($this->queueName, [
            'order_id' => $orderId,
            'scheduled_at' => time(),
            'timeout_minutes' => $timeoutMinutes
        ], $ttlMs);
        
        echo "订单 {$orderId} 已设置 {$timeoutMinutes} 分钟超时\n";
    }
    
    public function processTimeouts()
    {
        $callback = function ($msg) {
            $data = json_decode($msg->getBody(), true);
            
            // 检查订单是否已支付
            if (!$this->isOrderPaid($data['order_id'])) {
                $this->cancelOrder($data['order_id']);
                echo "订单 {$data['order_id']} 已超时取消\n";
            }
            
            $msg->ack();
        };
        
        $this->channel->basic_consume('order_timeout_dlq', '', false, false, false, false, $callback);
    }
    
    private function isOrderPaid($orderId)
    {
        // 检查订单支付状态
        return false;
    }
    
    private function cancelOrder($orderId)
    {
        // 取消订单逻辑
    }
}

2. 验证码过期

php
<?php

class VerificationCodeService
{
    private $ttlHelper;
    private $queueName = 'verification_codes';
    
    public function __construct($channel)
    {
        $this->ttlHelper = new TtlMessageHelper($channel);
        $this->ttlHelper->declareQueueWithTtl($queueName, 300000);  // 5分钟
    }
    
    public function sendCode($phone, $code)
    {
        $this->ttlHelper->sendWithTtl($this->queueName, [
            'phone' => $phone,
            'code' => $code,
            'created_at' => time()
        ]);
        
        echo "验证码已发送到 {$phone}\n";
    }
    
    public function verifyCode($phone, $code)
    {
        // 验证逻辑
        // 由于设置了 TTL,过期的验证码会自动删除
    }
}

3. 缓存预热

php
<?php

class CacheWarmupService
{
    private $ttlHelper;
    private $queueName = 'cache_warmup';
    
    public function __construct($channel)
    {
        $this->ttlHelper = new TtlMessageHelper($channel);
        $this->ttlHelper->declareQueueWithTtl($queueName, 60000);  // 1分钟
    }
    
    public function scheduleWarmup($cacheKey, $data, $delaySeconds = 0)
    {
        $ttlMs = max(0, $delaySeconds * 1000);
        
        $this->ttlHelper->sendWithTtl($this->queueName, [
            'cache_key' => $cacheKey,
            'data' => $data,
            'scheduled_at' => time()
        ], $ttlMs > 0 ? $ttlMs : null);
    }
    
    public function processWarmup()
    {
        $callback = function ($msg) {
            $data = json_decode($msg->getBody(), true);
            $this->warmCache($data['cache_key'], $data['data']);
            $msg->ack();
        };
        
        $this->channel->basic_consume($this->queueName, '', false, false, false, false, $callback);
    }
    
    private function warmCache($key, $data)
    {
        // 预热缓存逻辑
        echo "缓存预热: {$key}\n";
    }
}

常见问题与解决方案

问题 1: 消息未过期

症状: 消息超过 TTL 时间仍未被删除

原因: 消息在队列头部被阻塞,TTL 只在消息到达队列头部时检查

解决方案:

php
<?php

// 使用死信队列确保过期消息被处理
$channel->queue_declare(
    $queueName,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable([
        'x-message-ttl' => 60000,
        'x-dead-letter-exchange' => 'dlx',  // 配置死信交换机
        'x-dead-letter-routing-key' => 'expired'
    ])
);

问题 2: TTL 单位错误

症状: 消息过期时间不符合预期

原因: TTL 单位是毫秒,容易混淆

解决方案:

php
<?php

// 正确的单位转换
$seconds = 60;
$ttlMs = $seconds * 1000;  // 转换为毫秒

// 队列 TTL
new AMQPTable(['x-message-ttl' => $ttlMs]);

// 消息 TTL(必须是字符串)
$message = new AMQPMessage($body, [
    'expiration' => (string)$ttlMs
]);

问题 3: 消息 TTL 和队列 TTL 冲突

症状: 不确定实际 TTL 是多少

解决方案:

php
<?php

// 同时设置时,取较小值
$queueTtl = 60000;   // 队列 TTL: 60秒
$messageTtl = 30000; // 消息 TTL: 30秒

// 实际 TTL = min(queueTtl, messageTtl) = 30秒

// 建议:只设置一种 TTL,避免混淆
// 方案一:只设置队列 TTL(统一过期时间)
// 方案二:只设置消息 TTL(灵活过期时间)

最佳实践建议

  1. 统一 TTL 策略: 选择队列 TTL 或消息 TTL,避免混用
  2. 配置死信队列: 过期消息转移到死信队列便于处理
  3. 合理设置时间: 根据业务需求设置合理的 TTL
  4. 监控过期消息: 监控死信队列的消息数量
  5. 单位注意: TTL 单位是毫秒

相关链接