Appearance
延迟队列
概述
延迟队列(Delay Queue)是一种特殊的消息队列,允许消息在指定的时间后才被消费者处理。这在定时任务、延迟处理、消息重试等场景中非常有用。
核心原理
RabbitMQ 本身不直接支持延迟队列,但可以通过以下方式实现:
- 消息 TTL + 死信队列: 设置消息的 TTL,消息过期后进入死信队列
- 延迟插件: 使用 rabbitmq_delayed_message_exchange 插件
mermaid
graph TD
subgraph 发送阶段
P[生产者] --> E[交换机]
E --> Q[延迟队列<br/>x-message-ttl: 10000ms<br/>x-dead-letter-exchange: dlx]
end
subgraph 延迟阶段
Q -->|等待10秒| DLQ[死信队列]
end
subgraph 消费阶段
DLQ --> C[消费者]
end
style Q fill:#f9f,stroke:#333
style DLQ fill:#90EE90,stroke:#333实现方式对比
| 方式 | 优点 | 缺点 | 精度 |
|---|---|---|---|
| TTL + 死信队列 | 无需插件 | 精度较低(毫秒级) | 毫秒 |
| 延迟消息插件 | 精度高,支持毫秒 | 需要安装插件 | 毫秒 |
PHP 代码示例
方式一:TTL + 死信队列实现延迟队列
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 1. 声明死信交换机
$dlxExchange = 'delay_dlx';
$channel->exchange_declare($dlxExchange, 'direct', false, true, false);
// 2. 声明死信队列
$dlqQueue = 'delay_dlq';
$channel->queue_declare($dlqQueue, false, true, false, false);
$channel->queue_bind($dlqQueue, $dlxExchange, 'delayed');
// 3. 声明延迟队列(带 TTL 和死信交换机配置)
$delayQueue = 'delay_queue';
$channel->queue_declare(
$delayQueue,
false,
true,
false,
false,
false,
new AMQPTable([
'x-message-ttl' => 5000, // 5秒延迟
'x-dead-letter-exchange' => $dlxExchange,
'x-dead-letter-routing-key' => 'delayed'
])
);
echo "延迟队列已创建,延迟时间: 5秒\n";
// 发送延迟消息
$message = new AMQPMessage(
json_encode([
'task' => 'send_reminder',
'user_id' => 123,
'data' => '提醒内容'
]),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);
$channel->basic_publish($message, '', $delayQueue);
echo "延迟消息已发送,5秒后将投递到死信队列\n";
$channel->close();
$connection->close();消费死信队列消息
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$dlqQueue = 'delay_dlq';
echo "监听死信队列,等待延迟消息...\n";
$callback = function (AMQPMessage $msg) {
$data = json_decode($msg->getBody(), true);
echo "收到延迟消息:\n";
echo " 任务: {$data['task']}\n";
echo " 用户ID: {$data['user_id']}\n";
echo " 原始发送时间: " . date('Y-m-d H:i:s') . "\n";
echo " 现在时间: " . date('Y-m-d H:i:s') . "\n";
echo "-------------------\n";
// 处理延迟任务
processDelayedTask($data);
$msg->ack();
};
$channel->basic_consume($dlqQueue, '', false, false, false, false, $callback);
function processDelayedTask($data)
{
echo "执行任务: {$data['task']}\n";
}
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();封装的延迟队列类
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class DelayQueue
{
private $channel;
private $delayQueueName;
private $dlxExchange;
private $dlqQueueName;
private $delayTime;
public function __construct($channel, $delayTimeMs, $queueName = 'delay_queue')
{
$this->channel = $channel;
$this->delayTime = $delayTimeMs;
$this->dlxExchange = $queueName . '_dlx';
$this->dlqQueueName = $queueName . '_dlq';
$this->delayQueueName = $queueName;
$this->setup();
}
private function setup()
{
// 声明死信交换机
$this->channel->exchange_declare(
$this->dlxExchange,
'direct',
false,
true,
false
);
// 声明死信队列
$this->channel->queue_declare(
$this->dlqQueueName,
false,
true,
false,
false
);
$this->channel->queue_bind(
$this->dlqQueueName,
$this->dlxExchange,
'delayed'
);
// 声明延迟队列
$this->channel->queue_declare(
$this->delayQueueName,
false,
true,
false,
false,
false,
new AMQPTable([
'x-message-ttl' => $this->delayTime,
'x-dead-letter-exchange' => $this->dlxExchange,
'x-dead-letter-routing-key' => 'delayed'
])
);
}
public function publish($data, $delayMs = null)
{
$delayMs = $delayMs ?? $this->delayTime;
// 动态设置延迟时间
$queueName = $this->delayQueueName . '_' . $delayMs;
$this->channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable([
'x-message-ttl' => $delayMs,
'x-dead-letter-exchange' => $this->dlxExchange,
'x-dead-letter-routing-key' => 'delayed'
])
);
$message = new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);
$this->channel->basic_publish($message, '', $queueName);
}
public function consume(callable $handler)
{
$callback = function ($msg) use ($handler) {
$data = json_decode($msg->getBody(), true);
$handler($data);
$msg->ack();
};
$this->channel->basic_consume(
$this->dlqQueueName,
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
}实际应用场景
1. 订单超时取消
php
<?php
class OrderTimeoutService
{
private $delayQueue;
public function __construct($channel)
{
$this->delayQueue = new DelayQueue($channel, 1800000, 'order_timeout'); // 30分钟
}
public function scheduleOrderTimeout($orderId, $timeoutMinutes = 30)
{
$delayMs = $timeoutMinutes * 60 * 1000;
$this->delayQueue->publish([
'action' => 'cancel_order',
'order_id' => $orderId,
'reason' => 'payment_timeout',
'scheduled_at' => time()
], $delayMs);
echo "订单 {$orderId} 已设置超时取消: {$timeoutMinutes}分钟后\n";
}
public function processTimeouts()
{
$this->delayQueue->consume(function ($data) {
if ($data['action'] === 'cancel_order') {
$this->cancelOrder($data['order_id'], $data['reason']);
}
});
}
private function cancelOrder($orderId, $reason)
{
echo "取消订单: {$orderId}, 原因: {$reason}\n";
// 更新订单状态为已取消
}
}
// 使用示例
$delayQueue = new DelayQueue($channel, 1800000, 'order_timeout');
$service = new OrderTimeoutService($channel);
// 下单时设置超时取消
$service->scheduleOrderTimeout('ORD-001', 30);
// 启动超时处理服务
$service->processTimeouts();2. 消息重试机制
php
<?php
class RetryQueue
{
private $channel;
private $maxRetries;
public function __construct($channel, $maxRetries = 3)
{
$this->channel = $channel;
$this->maxRetries = $maxRetries;
$this->setup();
}
private function setup()
{
// 主队列
$this->channel->queue_declare('main_queue', false, true, false, false);
// 重试队列(延迟)
$this->channel->queue_declare(
'retry_queue',
false,
true,
false,
false,
false,
new AMQPTable([
'x-message-ttl' => 5000, // 5秒后重试
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => 'main_queue'
])
);
// 死信队列(超过最大重试次数)
$this->channel->queue_declare('failed_queue', false, true, false, false);
}
public function publish($data, $retryCount = 0)
{
$message = new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'headers' => ['x-retry-count' => $retryCount]
]
);
$this->channel->basic_publish($message, '', 'main_queue');
}
public function consume(callable $handler)
{
$callback = function ($msg) {
$data = json_decode($msg->getBody(), true);
$retryCount = $msg->get('x-retry-count') ?? 0;
try {
$handler($data);
$msg->ack();
} catch (Exception $e) {
if ($retryCount < $this->maxRetries) {
// 重新发布到重试队列
$this->republish($data, $retryCount + 1);
$msg->ack();
} else {
// 发送到死信队列
$this->publishToFailed($data, $e->getMessage());
$msg->ack();
}
}
};
$this->channel->basic_consume('main_queue', '', false, false, false, false, $callback);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
private function republish($data, $retryCount)
{
$message = new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'headers' => ['x-retry-count' => $retryCount]
]
);
$this->channel->basic_publish($message, '', 'retry_queue');
echo "消息已重新投递,重试次数: {$retryCount}\n";
}
private function publishToFailed($data, $error)
{
$data['error'] = $error;
$data['failed_at'] = time();
$message = new AMQPMessage(
json_encode($data),
['content_type' => 'application/json']
);
$this->channel->basic_publish($message, '', 'failed_queue');
echo "消息已发送到死信队列\n";
}
}3. 定时提醒
php
<?php
class ReminderService
{
private $channel;
private $reminderDelays = [
'1min' => 60000,
'10min' => 600000,
'1hour' => 3600000,
'1day' => 86400000
];
public function __construct($channel)
{
$this->channel = $channel;
$this->setupQueues();
}
private function setupQueues()
{
$this->channel->exchange_declare('reminder_dlx', 'direct', false, true, false);
foreach ($this->reminderDelays as $name => $delay) {
$queueName = "reminder_{$name}";
$this->channel->queue_declare($queueName, false, true, false, false, false,
new AMQPTable([
'x-message-ttl' => $delay,
'x-dead-letter-exchange' => 'reminder_dlx',
'x-dead-letter-routing-key' => $name
])
);
$this->channel->queue_bind($queueName, 'reminder_dlx', $name);
}
}
public function scheduleReminder($userId, $message, $delay)
{
$delayMs = $this->reminderDelays[$delay] ?? $delay;
$message = new AMQPMessage(
json_encode([
'user_id' => $userId,
'message' => $message,
'scheduled_at' => time()
]),
['content_type' => 'application/json']
);
$queueName = "reminder_{$delay}";
$this->channel->basic_publish($message, '', $queueName);
echo "已设置提醒: {$delay}\n";
}
public function processReminders()
{
foreach ($this->reminderDelays as $name => $delay) {
$queueName = "reminder_{$name}";
$this->processQueue($queueName, $name);
}
}
private function processQueue($queueName, $delayType)
{
echo "处理 {$delayType} 提醒队列\n";
$callback = function ($msg) {
$data = json_decode($msg->getBody(), true);
echo "发送提醒给用户 {$data['user_id']}: {$data['message']}\n";
// 发送实际的通知
$msg->ack();
};
$this->channel->basic_consume($queueName, '', false, false, false, false, $callback);
}
}常见问题与解决方案
问题 1: 延迟精度不高
症状: 消息实际投递时间与预期有误差
原因: 消息在队列中按顺序投递,精度受限于队列性能
解决方案:
php
<?php
// 使用更短的 TTL 会提高精度
// 但会增加队列维护开销
// 对于高精度需求,考虑使用延迟消息插件
// 需要安装 rabbitmq_delayed_message_exchange
$channel->exchange_declare(
'delayed_exchange',
'x-delayed-message',
false,
true,
false,
false,
new AMQPTable(['x-delayed-type' => 'direct'])
);问题 2: 消息重复消费
症状: 消费者收到重复消息
原因: 消费者处理失败后消息重新入队
解决方案:
php
<?php
// 使用幂等性处理
$callback = function ($msg) {
$data = json_decode($msg->getBody(), true);
$messageId = $data['message_id'] ?? null;
// 使用 Redis 检查是否已处理
if ($redis->exists("processed:{$messageId}")) {
$msg->ack();
return;
}
// 处理消息
processMessage($data);
// 标记为已处理
$redis->setex("processed:{$messageId}", 86400, '1');
$msg->ack();
};问题 3: 队列内存占用
症状: 延迟队列占用大量内存
原因: 大量消息在延迟队列中等待
解决方案:
php
<?php
// 设置队列最大长度
$channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
new AMQPTable([
'x-message-ttl' => 3600000,
'x-max-length' => 10000, // 最多10000条消息
'x-overflow' => 'reject-publish' // 超过则拒绝新消息
])
);最佳实践建议
- 合理设置 TTL: 根据业务需求设置合理的延迟时间
- 监控队列积压: 监控延迟队列的消息数量
- 幂等性处理: 消费者需要处理重复消息
- 失败处理: 设置死信队列处理失败消息
- 延迟插件: 对于高精度需求,安装延迟消息插件
