Appearance
延迟消息插件
概述
延迟消息插件(rabbitmq_delayed_message_exchange)是 RabbitMQ 的一个重要插件,它允许开发者实现消息的延迟投递功能。通过这个插件,可以在消息发送时指定延迟时间,消息将在指定时间后才被投递到队列中。
核心知识点
延迟消息原理
mermaid
graph TB
subgraph 生产者
P[消息生产者]
end
subgraph RabbitMQ
E[Delayed Exchange<br/>x-delayed-message]
MQ[内部存储]
Q[目标队列]
end
subgraph 消费者
C[消息消费者]
end
P -->|发送消息<br/>x-delay=5000ms| E
E -->|暂存消息| MQ
MQ -->|延迟到期| Q
Q -->|消费| C
style E fill:#e1f5fe
style MQ fill:#fff3e0工作流程
mermaid
sequenceDiagram
participant P as 生产者
participant E as Delayed Exchange
participant S as 内部存储
participant Q as 目标队列
participant C as 消费者
Note over P: 发送延迟消息
P->>E: 消息(x-delay=5000ms)
E->>S: 存储消息,设置定时器
Note over S: 等待延迟时间...
S-->>S: 5秒后
Note over S: 延迟到期
S->>Q: 投递到目标队列
Q->>C: 消费消息核心概念
| 概念 | 说明 |
|---|---|
| x-delayed-message | 延迟交换器类型 |
| x-delay | 消息延迟时间(毫秒) |
| x-delayed-type | 实际的交换器类型(direct/topic/fanout) |
PHP 代码示例
安装和启用插件
php
<?php
class DelayedMessagePluginInstaller
{
public function install()
{
echo "安装延迟消息插件...\n";
$commands = [
'curl -L https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.10.2/rabbitmq_delayed_message_exchange-3.10.2.ez -o /usr/lib/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez',
'rabbitmq-plugins enable rabbitmq_delayed_message_exchange',
'systemctl restart rabbitmq-server'
];
foreach ($commands as $command) {
exec($command, $output, $returnCode);
if ($returnCode !== 0) {
throw new RuntimeException("命令执行失败: {$command}");
}
}
echo "延迟消息插件安装完成\n";
}
public function isEnabled()
{
exec('rabbitmq-plugins list -e', $output);
foreach ($output as $line) {
if (strpos($line, 'rabbitmq_delayed_message_exchange') !== false) {
return true;
}
}
return false;
}
}延迟消息生产者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class DelayedMessagePublisher
{
private $connection;
private $channel;
private $exchangeName = 'delayed_exchange';
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->declareDelayedExchange();
}
private function declareDelayedExchange()
{
$args = new AMQPTable([
'x-delayed-type' => 'direct'
]);
$this->channel->exchange_declare(
$this->exchangeName,
'x-delayed-message',
false,
true,
false,
false,
false,
$args
);
}
public function publishDelayed($message, $delayMs, $routingKey = '')
{
$headers = new AMQPTable([
'x-delay' => $delayMs
]);
$msg = new AMQPMessage(
json_encode([
'content' => $message,
'delay_ms' => $delayMs,
'created_at' => time(),
'scheduled_at' => time() + ($delayMs / 1000)
]),
[
'content_type' => 'application/json',
'application_headers' => $headers
]
);
$this->channel->basic_publish(
$msg,
$this->exchangeName,
$routingKey
);
$delaySec = $delayMs / 1000;
echo sprintf(
"消息已发送,延迟 %.1f 秒后投递\n",
$delaySec
);
}
public function publishAtTime($message, $timestamp, $routingKey = '')
{
$delayMs = max(0, ($timestamp - time()) * 1000);
$this->publishDelayed($message, $delayMs, $routingKey);
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}延迟消息消费者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class DelayedMessageConsumer
{
private $connection;
private $channel;
private $exchangeName = 'delayed_exchange';
private $queueName = 'delayed_queue';
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->setup();
}
private function setup()
{
$args = new AMQPTable([
'x-delayed-type' => 'direct'
]);
$this->channel->exchange_declare(
$this->exchangeName,
'x-delayed-message',
false,
true,
false,
false,
false,
$args
);
$this->channel->queue_declare(
$this->queueName,
false,
true,
false,
false
);
$this->channel->queue_bind(
$this->queueName,
$this->exchangeName,
''
);
}
public function consume()
{
$callback = function (AMQPMessage $message) {
$body = json_decode($message->body, true);
$delayMs = $body['delay_ms'];
$actualDelay = time() - $body['created_at'];
$expectedDelay = $delayMs / 1000;
echo sprintf(
"[%s] 收到延迟消息\n",
date('Y-m-d H:i:s')
);
echo sprintf(
" 预期延迟: %.1f 秒\n",
$expectedDelay
);
echo sprintf(
" 实际延迟: %.1f 秒\n",
$actualDelay
);
echo sprintf(
" 内容: %s\n",
$body['content']
);
$message->ack();
};
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
echo "延迟消息消费者已启动\n";
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}完整示例:订单超时取消
php
<?php
class OrderTimeoutHandler
{
private $publisher;
private $consumer;
const TIMEOUT_SECONDS = 1800;
public function __construct()
{
$this->publisher = new DelayedMessagePublisher();
}
public function createOrder($orderId, $orderData)
{
$order = $this->saveOrder($orderId, $orderData);
$this->scheduleTimeoutCheck($orderId);
return $order;
}
private function saveOrder($orderId, $orderData)
{
echo "订单已创建: {$orderId}\n";
return [
'order_id' => $orderId,
'status' => 'pending',
'created_at' => time(),
'data' => $orderData
];
}
private function scheduleTimeoutCheck($orderId)
{
$delayMs = self::TIMEOUT_SECONDS * 1000;
$this->publisher->publishDelayed(
[
'order_id' => $orderId,
'action' => 'timeout_check'
],
$delayMs,
'order_timeout'
);
echo sprintf(
"已安排订单 %s 在 %d 秒后进行超时检查\n",
$orderId,
self::TIMEOUT_SECONDS
);
}
public function processTimeoutCheck($orderId)
{
$order = $this->getOrder($orderId);
if ($order && $order['status'] === 'pending') {
$this->cancelOrder($orderId);
echo "订单 {$orderId} 已超时取消\n";
} else {
echo "订单 {$orderId} 已处理,跳过超时检查\n";
}
}
private function getOrder($orderId)
{
return [
'order_id' => $orderId,
'status' => 'pending'
];
}
private function cancelOrder($orderId)
{
echo "取消订单: {$orderId}\n";
}
public function payOrder($orderId)
{
echo "订单 {$orderId} 已支付\n";
}
}
class OrderTimeoutConsumer
{
private $handler;
public function __construct()
{
$this->handler = new OrderTimeoutHandler();
}
public function start()
{
$consumer = new DelayedMessageConsumer();
$callback = function ($message) {
$body = json_decode($message->body, true);
if ($body['content']['action'] === 'timeout_check') {
$this->handler->processTimeoutCheck(
$body['content']['order_id']
);
}
$message->ack();
};
echo "订单超时检查消费者已启动\n";
}
}延迟任务调度器
php
<?php
class DelayedTaskScheduler
{
private $publisher;
private $tasks = [];
public function __construct()
{
$this->publisher = new DelayedMessagePublisher();
}
public function schedule($taskId, $taskName, $payload, $delaySeconds)
{
$task = [
'task_id' => $taskId,
'task_name' => $taskName,
'payload' => $payload,
'scheduled_at' => time() + $delaySeconds
];
$this->tasks[$taskId] = $task;
$this->publisher->publishDelayed(
$task,
$delaySeconds * 1000,
'tasks'
);
echo "任务 {$taskName} 已安排在 {$delaySeconds} 秒后执行\n";
return $taskId;
}
public function scheduleAt($taskId, $taskName, $payload, $timestamp)
{
$delaySeconds = max(0, $timestamp - time());
return $this->schedule($taskId, $taskName, $payload, $delaySeconds);
}
public function scheduleCron($taskId, $taskName, $payload, $cronExpression)
{
$nextRun = $this->parseCronExpression($cronExpression);
return $this->scheduleAt($taskId, $taskName, $payload, $nextRun);
}
public function cancel($taskId)
{
if (isset($this->tasks[$taskId])) {
unset($this->tasks[$taskId]);
echo "任务 {$taskId} 已取消\n";
return true;
}
return false;
}
public function getScheduledTasks()
{
return $this->tasks;
}
private function parseCronExpression($expression)
{
return time() + 60;
}
}
class DelayedTaskWorker
{
private $handlers = [];
public function registerHandler($taskName, callable $handler)
{
$this->handlers[$taskName] = $handler;
}
public function process($task)
{
$taskName = $task['task_name'];
if (!isset($this->handlers[$taskName])) {
throw new RuntimeException("未注册的任务处理器: {$taskName}");
}
return call_user_func(
$this->handlers[$taskName],
$task['payload']
);
}
}实际应用场景
1. 用户注册验证邮件
php
<?php
class UserVerificationScheduler
{
private $publisher;
public function __construct()
{
$this->publisher = new DelayedMessagePublisher();
}
public function scheduleVerification($userId, $email)
{
$this->publisher->publishDelayed(
[
'user_id' => $userId,
'email' => $email,
'action' => 'send_verification'
],
5000,
'user_verification'
);
$this->scheduleReminder($userId, $email);
}
private function scheduleReminder($userId, $email)
{
$this->publisher->publishDelayed(
[
'user_id' => $userId,
'email' => $email,
'action' => 'send_reminder'
],
24 * 60 * 60 * 1000,
'user_verification'
);
}
}2. 优惠券过期提醒
php
<?php
class CouponExpiryScheduler
{
private $publisher;
public function __construct()
{
$this->publisher = new DelayedMessagePublisher();
}
public function scheduleExpiryReminder($couponId, $userId, $expiryTime)
{
$reminderTime = $expiryTime - 24 * 60 * 60;
$delayMs = max(0, ($reminderTime - time()) * 1000);
if ($delayMs > 0) {
$this->publisher->publishDelayed(
[
'coupon_id' => $couponId,
'user_id' => $userId,
'action' => 'expiry_reminder'
],
$delayMs,
'coupon_reminder'
);
}
}
public function scheduleExpiryNotification($couponId, $userId, $expiryTime)
{
$delayMs = max(0, ($expiryTime - time()) * 1000);
if ($delayMs > 0) {
$this->publisher->publishDelayed(
[
'coupon_id' => $couponId,
'user_id' => $userId,
'action' => 'expired'
],
$delayMs,
'coupon_reminder'
);
}
}
}3. 分布式重试机制
php
<?php
class RetryScheduler
{
private $publisher;
const MAX_RETRIES = 5;
const BASE_DELAY_MS = 1000;
public function __construct()
{
$this->publisher = new DelayedMessagePublisher();
}
public function scheduleRetry($taskName, $payload, $attempt = 1)
{
if ($attempt > self::MAX_RETRIES) {
throw new RuntimeException("超过最大重试次数");
}
$delayMs = $this->calculateBackoffDelay($attempt);
$this->publisher->publishDelayed(
[
'task_name' => $taskName,
'payload' => $payload,
'attempt' => $attempt,
'max_retries' => self::MAX_RETRIES
],
$delayMs,
'retry_tasks'
);
echo sprintf(
"任务 %s 第 %d 次重试已安排,延迟 %d 毫秒\n",
$taskName,
$attempt,
$delayMs
);
}
private function calculateBackoffDelay($attempt)
{
return self::BASE_DELAY_MS * pow(2, $attempt - 1);
}
public function handleRetry($task, callable $handler)
{
try {
return call_user_func($handler, $task['payload']);
} catch (Exception $e) {
if ($task['attempt'] < $task['max_retries']) {
$this->scheduleRetry(
$task['task_name'],
$task['payload'],
$task['attempt'] + 1
);
} else {
$this->handleMaxRetriesExceeded($task, $e);
}
throw $e;
}
}
private function handleMaxRetriesExceeded($task, Exception $e)
{
error_log(sprintf(
"任务 %s 重试次数耗尽: %s",
$task['task_name'],
$e->getMessage()
));
}
}常见问题与解决方案
问题 1:延迟消息堆积
原因:大量延迟消息占用内存
解决方案:
php
<?php
class DelayedMessageMonitor
{
private $connection;
private $channel;
public function getDelayedMessageCount($exchangeName)
{
$this->channel->exchange_declare(
$exchangeName,
'x-delayed-message',
false,
true,
false
);
return 0;
}
public function getDelayedMessageStats()
{
return [
'total_delayed' => 0,
'oldest_delay' => null,
'newest_delay' => null
];
}
public function checkDelayedMessageBacklog($threshold = 10000)
{
$stats = $this->getDelayedMessageStats();
if ($stats['total_delayed'] > $threshold) {
$this->alertBacklog($stats);
}
}
private function alertBacklog($stats)
{
error_log("延迟消息堆积警告: " . json_encode($stats));
}
}问题 2:延迟时间不准确
原因:服务器时间不同步或消息处理延迟
解决方案:
php
<?php
class AccurateDelayedPublisher
{
private $publisher;
public function publishWithAccuracy($message, $targetTimestamp)
{
$currentTime = time();
$serverTime = $this->getServerTime();
$timeDiff = $serverTime - $currentTime;
$adjustedTimestamp = $targetTimestamp - $timeDiff;
$delayMs = max(0, ($adjustedTimestamp - $serverTime) * 1000);
$this->publisher->publishDelayed(
array_merge($message, [
'original_target_time' => $targetTimestamp,
'adjusted_for_clock_drift' => $timeDiff
]),
$delayMs
);
}
private function getServerTime()
{
return time();
}
}问题 3:插件不可用时的降级方案
解决方案:
php
<?php
class FallbackDelayedPublisher
{
private $usePlugin = false;
private $publisher;
private $redis;
public function __construct()
{
$this->checkPluginAvailability();
if ($this->usePlugin) {
$this->publisher = new DelayedMessagePublisher();
} else {
$this->redis = new Redis();
$this->redis->connect('127.0.0.1', 6379);
}
}
private function checkPluginAvailability()
{
$installer = new DelayedMessagePluginInstaller();
$this->usePlugin = $installer->isEnabled();
}
public function publishDelayed($message, $delayMs, $routingKey = '')
{
if ($this->usePlugin) {
$this->publisher->publishDelayed($message, $delayMs, $routingKey);
} else {
$this->publishWithRedis($message, $delayMs, $routingKey);
}
}
private function publishWithRedis($message, $delayMs, $routingKey)
{
$executeAt = time() + ($delayMs / 1000);
$task = [
'message' => $message,
'routing_key' => $routingKey,
'execute_at' => $executeAt
];
$this->redis->zAdd(
'delayed_tasks',
$executeAt,
json_encode($task)
);
}
}最佳实践建议
1. 合理设置延迟时间
php
<?php
class DelayTimeCalculator
{
const MIN_DELAY_MS = 0;
const MAX_DELAY_MS = 86400000;
public function calculateDelay($targetTime, $currentTime = null)
{
$currentTime = $currentTime ?? time();
$delaySeconds = $targetTime - $currentTime;
$delayMs = $delaySeconds * 1000;
return max(self::MIN_DELAY_MS, min($delayMs, self::MAX_DELAY_MS));
}
public function validateDelay($delayMs)
{
if ($delayMs < self::MIN_DELAY_MS) {
throw new InvalidArgumentException("延迟时间不能为负数");
}
if ($delayMs > self::MAX_DELAY_MS) {
throw new InvalidArgumentException("延迟时间不能超过 24 小时");
}
return true;
}
}2. 延迟消息追踪
php
<?php
class DelayedMessageTracker
{
private $redis;
public function __construct()
{
$this->redis = new Redis();
$this->redis->connect('127.0.0.1', 6379);
}
public function track($messageId, $message, $delayMs)
{
$data = [
'message_id' => $messageId,
'message' => $message,
'delay_ms' => $delayMs,
'created_at' => time(),
'expected_delivery' => time() + ($delayMs / 1000),
'status' => 'pending'
];
$this->redis->hSet(
'delayed_messages',
$messageId,
json_encode($data)
);
}
public function markDelivered($messageId)
{
$data = $this->get($messageId);
if ($data) {
$data['status'] = 'delivered';
$data['delivered_at'] = time();
$data['actual_delay_ms'] = ($data['delivered_at'] - $data['created_at']) * 1000;
$this->redis->hSet(
'delayed_messages',
$messageId,
json_encode($data)
);
}
}
public function get($messageId)
{
$data = $this->redis->hGet('delayed_messages', $messageId);
return $data ? json_decode($data, true) : null;
}
public function getPendingMessages()
{
$all = $this->redis->hGetAll('delayed_messages');
$pending = [];
foreach ($all as $id => $data) {
$message = json_decode($data, true);
if ($message['status'] === 'pending') {
$pending[] = $message;
}
}
return $pending;
}
}3. 监控和告警
php
<?php
class DelayedMessageAlert
{
private $thresholds = [
'pending_count' => 10000,
'avg_delay_accuracy' => 5,
'failed_rate' => 0.01
];
public function checkAndAlert()
{
$metrics = $this->collectMetrics();
$alerts = [];
if ($metrics['pending_count'] > $this->thresholds['pending_count']) {
$alerts[] = [
'type' => 'high_pending',
'message' => sprintf(
'延迟消息堆积: %d 条',
$metrics['pending_count']
)
];
}
if ($metrics['avg_delay_accuracy'] > $this->thresholds['avg_delay_accuracy']) {
$alerts[] = [
'type' => 'delay_inaccuracy',
'message' => sprintf(
'延迟精度下降: 平均偏差 %.2f 秒',
$metrics['avg_delay_accuracy']
)
];
}
if ($metrics['failed_rate'] > $this->thresholds['failed_rate']) {
$alerts[] = [
'type' => 'high_failure',
'message' => sprintf(
'延迟消息失败率过高: %.2f%%',
$metrics['failed_rate'] * 100
)
];
}
if (!empty($alerts)) {
$this->sendAlerts($alerts);
}
return $alerts;
}
private function collectMetrics()
{
return [
'pending_count' => 0,
'avg_delay_accuracy' => 0,
'failed_rate' => 0
];
}
private function sendAlerts($alerts)
{
foreach ($alerts as $alert) {
error_log("[ALERT] " . $alert['message']);
}
}
}