Skip to content

延迟队列

概述

延迟队列(Delay Queue)是一种特殊的消息队列,允许消息在指定的时间后才被消费者处理。这在定时任务、延迟处理、消息重试等场景中非常有用。

核心原理

RabbitMQ 本身不直接支持延迟队列,但可以通过以下方式实现:

  1. 消息 TTL + 死信队列: 设置消息的 TTL,消息过期后进入死信队列
  2. 延迟插件: 使用 rabbitmq_delayed_message_exchange 插件
mermaid
graph TD
    subgraph 发送阶段
        P[生产者] --> E[交换机]
        E --> Q[延迟队列<br/>x-message-ttl: 10000ms<br/>x-dead-letter-exchange: dlx]
    end
    
    subgraph 延迟阶段
        Q -->|等待10秒| DLQ[死信队列]
    end
    
    subgraph 消费阶段
        DLQ --> C[消费者]
    end
    
    style Q fill:#f9f,stroke:#333
    style DLQ fill:#90EE90,stroke:#333

实现方式对比

方式优点缺点精度
TTL + 死信队列无需插件精度较低(毫秒级)毫秒
延迟消息插件精度高,支持毫秒需要安装插件毫秒

PHP 代码示例

方式一:TTL + 死信队列实现延迟队列

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 1. 声明死信交换机
$dlxExchange = 'delay_dlx';
$channel->exchange_declare($dlxExchange, 'direct', false, true, false);

// 2. 声明死信队列
$dlqQueue = 'delay_dlq';
$channel->queue_declare($dlqQueue, false, true, false, false);
$channel->queue_bind($dlqQueue, $dlxExchange, 'delayed');

// 3. 声明延迟队列(带 TTL 和死信交换机配置)
$delayQueue = 'delay_queue';
$channel->queue_declare(
    $delayQueue,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable([
        'x-message-ttl' => 5000,              // 5秒延迟
        'x-dead-letter-exchange' => $dlxExchange,
        'x-dead-letter-routing-key' => 'delayed'
    ])
);

echo "延迟队列已创建,延迟时间: 5秒\n";

// 发送延迟消息
$message = new AMQPMessage(
    json_encode([
        'task' => 'send_reminder',
        'user_id' => 123,
        'data' => '提醒内容'
    ]),
    [
        'content_type' => 'application/json',
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
    ]
);

$channel->basic_publish($message, '', $delayQueue);

echo "延迟消息已发送,5秒后将投递到死信队列\n";

$channel->close();
$connection->close();

消费死信队列消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$dlqQueue = 'delay_dlq';

echo "监听死信队列,等待延迟消息...\n";

$callback = function (AMQPMessage $msg) {
    $data = json_decode($msg->getBody(), true);
    
    echo "收到延迟消息:\n";
    echo "  任务: {$data['task']}\n";
    echo "  用户ID: {$data['user_id']}\n";
    echo "  原始发送时间: " . date('Y-m-d H:i:s') . "\n";
    echo "  现在时间: " . date('Y-m-d H:i:s') . "\n";
    echo "-------------------\n";
    
    // 处理延迟任务
    processDelayedTask($data);
    
    $msg->ack();
};

$channel->basic_consume($dlqQueue, '', false, false, false, false, $callback);

function processDelayedTask($data)
{
    echo "执行任务: {$data['task']}\n";
}

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

封装的延迟队列类

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class DelayQueue
{
    private $channel;
    private $delayQueueName;
    private $dlxExchange;
    private $dlqQueueName;
    private $delayTime;
    
    public function __construct($channel, $delayTimeMs, $queueName = 'delay_queue')
    {
        $this->channel = $channel;
        $this->delayTime = $delayTimeMs;
        
        $this->dlxExchange = $queueName . '_dlx';
        $this->dlqQueueName = $queueName . '_dlq';
        $this->delayQueueName = $queueName;
        
        $this->setup();
    }
    
    private function setup()
    {
        // 声明死信交换机
        $this->channel->exchange_declare(
            $this->dlxExchange,
            'direct',
            false,
            true,
            false
        );
        
        // 声明死信队列
        $this->channel->queue_declare(
            $this->dlqQueueName,
            false,
            true,
            false,
            false
        );
        $this->channel->queue_bind(
            $this->dlqQueueName,
            $this->dlxExchange,
            'delayed'
        );
        
        // 声明延迟队列
        $this->channel->queue_declare(
            $this->delayQueueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-message-ttl' => $this->delayTime,
                'x-dead-letter-exchange' => $this->dlxExchange,
                'x-dead-letter-routing-key' => 'delayed'
            ])
        );
    }
    
    public function publish($data, $delayMs = null)
    {
        $delayMs = $delayMs ?? $this->delayTime;
        
        // 动态设置延迟时间
        $queueName = $this->delayQueueName . '_' . $delayMs;
        
        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-message-ttl' => $delayMs,
                'x-dead-letter-exchange' => $this->dlxExchange,
                'x-dead-letter-routing-key' => 'delayed'
            ])
        );
        
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]
        );
        
        $this->channel->basic_publish($message, '', $queueName);
    }
    
    public function consume(callable $handler)
    {
        $callback = function ($msg) use ($handler) {
            $data = json_decode($msg->getBody(), true);
            $handler($data);
            $msg->ack();
        };
        
        $this->channel->basic_consume(
            $this->dlqQueueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
}

实际应用场景

1. 订单超时取消

php
<?php

class OrderTimeoutService
{
    private $delayQueue;
    
    public function __construct($channel)
    {
        $this->delayQueue = new DelayQueue($channel, 1800000, 'order_timeout'); // 30分钟
    }
    
    public function scheduleOrderTimeout($orderId, $timeoutMinutes = 30)
    {
        $delayMs = $timeoutMinutes * 60 * 1000;
        
        $this->delayQueue->publish([
            'action' => 'cancel_order',
            'order_id' => $orderId,
            'reason' => 'payment_timeout',
            'scheduled_at' => time()
        ], $delayMs);
        
        echo "订单 {$orderId} 已设置超时取消: {$timeoutMinutes}分钟后\n";
    }
    
    public function processTimeouts()
    {
        $this->delayQueue->consume(function ($data) {
            if ($data['action'] === 'cancel_order') {
                $this->cancelOrder($data['order_id'], $data['reason']);
            }
        });
    }
    
    private function cancelOrder($orderId, $reason)
    {
        echo "取消订单: {$orderId}, 原因: {$reason}\n";
        // 更新订单状态为已取消
    }
}

// 使用示例
$delayQueue = new DelayQueue($channel, 1800000, 'order_timeout');
$service = new OrderTimeoutService($channel);

// 下单时设置超时取消
$service->scheduleOrderTimeout('ORD-001', 30);

// 启动超时处理服务
$service->processTimeouts();

2. 消息重试机制

php
<?php

class RetryQueue
{
    private $channel;
    private $maxRetries;
    
    public function __construct($channel, $maxRetries = 3)
    {
        $this->channel = $channel;
        $this->maxRetries = $maxRetries;
        
        $this->setup();
    }
    
    private function setup()
    {
        // 主队列
        $this->channel->queue_declare('main_queue', false, true, false, false);
        
        // 重试队列(延迟)
        $this->channel->queue_declare(
            'retry_queue',
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'x-message-ttl' => 5000,  // 5秒后重试
                'x-dead-letter-exchange' => '',
                'x-dead-letter-routing-key' => 'main_queue'
            ])
        );
        
        // 死信队列(超过最大重试次数)
        $this->channel->queue_declare('failed_queue', false, true, false, false);
    }
    
    public function publish($data, $retryCount = 0)
    {
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'headers' => ['x-retry-count' => $retryCount]
            ]
        );
        
        $this->channel->basic_publish($message, '', 'main_queue');
    }
    
    public function consume(callable $handler)
    {
        $callback = function ($msg) {
            $data = json_decode($msg->getBody(), true);
            $retryCount = $msg->get('x-retry-count') ?? 0;
            
            try {
                $handler($data);
                $msg->ack();
            } catch (Exception $e) {
                if ($retryCount < $this->maxRetries) {
                    // 重新发布到重试队列
                    $this->republish($data, $retryCount + 1);
                    $msg->ack();
                } else {
                    // 发送到死信队列
                    $this->publishToFailed($data, $e->getMessage());
                    $msg->ack();
                }
            }
        };
        
        $this->channel->basic_consume('main_queue', '', false, false, false, false, $callback);
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    private function republish($data, $retryCount)
    {
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'headers' => ['x-retry-count' => $retryCount]
            ]
        );
        
        $this->channel->basic_publish($message, '', 'retry_queue');
        echo "消息已重新投递,重试次数: {$retryCount}\n";
    }
    
    private function publishToFailed($data, $error)
    {
        $data['error'] = $error;
        $data['failed_at'] = time();
        
        $message = new AMQPMessage(
            json_encode($data),
            ['content_type' => 'application/json']
        );
        
        $this->channel->basic_publish($message, '', 'failed_queue');
        echo "消息已发送到死信队列\n";
    }
}

3. 定时提醒

php
<?php

class ReminderService
{
    private $channel;
    private $reminderDelays = [
        '1min' => 60000,
        '10min' => 600000,
        '1hour' => 3600000,
        '1day' => 86400000
    ];
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->setupQueues();
    }
    
    private function setupQueues()
    {
        $this->channel->exchange_declare('reminder_dlx', 'direct', false, true, false);
        
        foreach ($this->reminderDelays as $name => $delay) {
            $queueName = "reminder_{$name}";
            $this->channel->queue_declare($queueName, false, true, false, false, false,
                new AMQPTable([
                    'x-message-ttl' => $delay,
                    'x-dead-letter-exchange' => 'reminder_dlx',
                    'x-dead-letter-routing-key' => $name
                ])
            );
            
            $this->channel->queue_bind($queueName, 'reminder_dlx', $name);
        }
    }
    
    public function scheduleReminder($userId, $message, $delay)
    {
        $delayMs = $this->reminderDelays[$delay] ?? $delay;
        
        $message = new AMQPMessage(
            json_encode([
                'user_id' => $userId,
                'message' => $message,
                'scheduled_at' => time()
            ]),
            ['content_type' => 'application/json']
        );
        
        $queueName = "reminder_{$delay}";
        $this->channel->basic_publish($message, '', $queueName);
        
        echo "已设置提醒: {$delay}\n";
    }
    
    public function processReminders()
    {
        foreach ($this->reminderDelays as $name => $delay) {
            $queueName = "reminder_{$name}";
            $this->processQueue($queueName, $name);
        }
    }
    
    private function processQueue($queueName, $delayType)
    {
        echo "处理 {$delayType} 提醒队列\n";
        
        $callback = function ($msg) {
            $data = json_decode($msg->getBody(), true);
            
            echo "发送提醒给用户 {$data['user_id']}: {$data['message']}\n";
            // 发送实际的通知
            
            $msg->ack();
        };
        
        $this->channel->basic_consume($queueName, '', false, false, false, false, $callback);
    }
}

常见问题与解决方案

问题 1: 延迟精度不高

症状: 消息实际投递时间与预期有误差

原因: 消息在队列中按顺序投递,精度受限于队列性能

解决方案:

php
<?php

// 使用更短的 TTL 会提高精度
// 但会增加队列维护开销

// 对于高精度需求,考虑使用延迟消息插件
// 需要安装 rabbitmq_delayed_message_exchange
$channel->exchange_declare(
    'delayed_exchange',
    'x-delayed-message',
    false,
    true,
    false,
    false,
    new AMQPTable(['x-delayed-type' => 'direct'])
);

问题 2: 消息重复消费

症状: 消费者收到重复消息

原因: 消费者处理失败后消息重新入队

解决方案:

php
<?php

// 使用幂等性处理
$callback = function ($msg) {
    $data = json_decode($msg->getBody(), true);
    $messageId = $data['message_id'] ?? null;
    
    // 使用 Redis 检查是否已处理
    if ($redis->exists("processed:{$messageId}")) {
        $msg->ack();
        return;
    }
    
    // 处理消息
    processMessage($data);
    
    // 标记为已处理
    $redis->setex("processed:{$messageId}", 86400, '1');
    
    $msg->ack();
};

问题 3: 队列内存占用

症状: 延迟队列占用大量内存

原因: 大量消息在延迟队列中等待

解决方案:

php
<?php

// 设置队列最大长度
$channel->queue_declare(
    $queueName,
    false,
    true,
    false,
    false,
    false,
    new AMQPTable([
        'x-message-ttl' => 3600000,
        'x-max-length' => 10000,           // 最多10000条消息
        'x-overflow' => 'reject-publish'  // 超过则拒绝新消息
    ])
);

最佳实践建议

  1. 合理设置 TTL: 根据业务需求设置合理的延迟时间
  2. 监控队列积压: 监控延迟队列的消息数量
  3. 幂等性处理: 消费者需要处理重复消息
  4. 失败处理: 设置死信队列处理失败消息
  5. 延迟插件: 对于高精度需求,安装延迟消息插件

相关链接