Skip to content

延迟消息插件

概述

延迟消息插件(rabbitmq_delayed_message_exchange)是 RabbitMQ 的一个重要插件,它允许开发者实现消息的延迟投递功能。通过这个插件,可以在消息发送时指定延迟时间,消息将在指定时间后才被投递到队列中。

核心知识点

延迟消息原理

mermaid
graph TB
    subgraph 生产者
        P[消息生产者]
    end

    subgraph RabbitMQ
        E[Delayed Exchange<br/>x-delayed-message]
        MQ[内部存储]
        Q[目标队列]
    end

    subgraph 消费者
        C[消息消费者]
    end

    P -->|发送消息<br/>x-delay=5000ms| E
    E -->|暂存消息| MQ
    MQ -->|延迟到期| Q
    Q -->|消费| C

    style E fill:#e1f5fe
    style MQ fill:#fff3e0

工作流程

mermaid
sequenceDiagram
    participant P as 生产者
    participant E as Delayed Exchange
    participant S as 内部存储
    participant Q as 目标队列
    participant C as 消费者

    Note over P: 发送延迟消息
    P->>E: 消息(x-delay=5000ms)
    E->>S: 存储消息,设置定时器

    Note over S: 等待延迟时间...
    S-->>S: 5秒后

    Note over S: 延迟到期
    S->>Q: 投递到目标队列
    Q->>C: 消费消息

核心概念

概念说明
x-delayed-message延迟交换器类型
x-delay消息延迟时间(毫秒)
x-delayed-type实际的交换器类型(direct/topic/fanout)

PHP 代码示例

安装和启用插件

php
<?php

class DelayedMessagePluginInstaller
{
    public function install()
    {
        echo "安装延迟消息插件...\n";

        $commands = [
            'curl -L https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.10.2/rabbitmq_delayed_message_exchange-3.10.2.ez -o /usr/lib/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez',
            'rabbitmq-plugins enable rabbitmq_delayed_message_exchange',
            'systemctl restart rabbitmq-server'
        ];

        foreach ($commands as $command) {
            exec($command, $output, $returnCode);

            if ($returnCode !== 0) {
                throw new RuntimeException("命令执行失败: {$command}");
            }
        }

        echo "延迟消息插件安装完成\n";
    }

    public function isEnabled()
    {
        exec('rabbitmq-plugins list -e', $output);

        foreach ($output as $line) {
            if (strpos($line, 'rabbitmq_delayed_message_exchange') !== false) {
                return true;
            }
        }

        return false;
    }
}

延迟消息生产者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class DelayedMessagePublisher
{
    private $connection;
    private $channel;
    private $exchangeName = 'delayed_exchange';

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->declareDelayedExchange();
    }

    private function declareDelayedExchange()
    {
        $args = new AMQPTable([
            'x-delayed-type' => 'direct'
        ]);

        $this->channel->exchange_declare(
            $this->exchangeName,
            'x-delayed-message',
            false,
            true,
            false,
            false,
            false,
            $args
        );
    }

    public function publishDelayed($message, $delayMs, $routingKey = '')
    {
        $headers = new AMQPTable([
            'x-delay' => $delayMs
        ]);

        $msg = new AMQPMessage(
            json_encode([
                'content' => $message,
                'delay_ms' => $delayMs,
                'created_at' => time(),
                'scheduled_at' => time() + ($delayMs / 1000)
            ]),
            [
                'content_type' => 'application/json',
                'application_headers' => $headers
            ]
        );

        $this->channel->basic_publish(
            $msg,
            $this->exchangeName,
            $routingKey
        );

        $delaySec = $delayMs / 1000;

        echo sprintf(
            "消息已发送,延迟 %.1f 秒后投递\n",
            $delaySec
        );
    }

    public function publishAtTime($message, $timestamp, $routingKey = '')
    {
        $delayMs = max(0, ($timestamp - time()) * 1000);

        $this->publishDelayed($message, $delayMs, $routingKey);
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

延迟消息消费者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class DelayedMessageConsumer
{
    private $connection;
    private $channel;
    private $exchangeName = 'delayed_exchange';
    private $queueName = 'delayed_queue';

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->setup();
    }

    private function setup()
    {
        $args = new AMQPTable([
            'x-delayed-type' => 'direct'
        ]);

        $this->channel->exchange_declare(
            $this->exchangeName,
            'x-delayed-message',
            false,
            true,
            false,
            false,
            false,
            $args
        );

        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );

        $this->channel->queue_bind(
            $this->queueName,
            $this->exchangeName,
            ''
        );
    }

    public function consume()
    {
        $callback = function (AMQPMessage $message) {
            $body = json_decode($message->body, true);

            $delayMs = $body['delay_ms'];
            $actualDelay = time() - $body['created_at'];
            $expectedDelay = $delayMs / 1000;

            echo sprintf(
                "[%s] 收到延迟消息\n",
                date('Y-m-d H:i:s')
            );
            echo sprintf(
                "  预期延迟: %.1f 秒\n",
                $expectedDelay
            );
            echo sprintf(
                "  实际延迟: %.1f 秒\n",
                $actualDelay
            );
            echo sprintf(
                "  内容: %s\n",
                $body['content']
            );

            $message->ack();
        };

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        echo "延迟消息消费者已启动\n";

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

完整示例:订单超时取消

php
<?php

class OrderTimeoutHandler
{
    private $publisher;
    private $consumer;

    const TIMEOUT_SECONDS = 1800;

    public function __construct()
    {
        $this->publisher = new DelayedMessagePublisher();
    }

    public function createOrder($orderId, $orderData)
    {
        $order = $this->saveOrder($orderId, $orderData);

        $this->scheduleTimeoutCheck($orderId);

        return $order;
    }

    private function saveOrder($orderId, $orderData)
    {
        echo "订单已创建: {$orderId}\n";

        return [
            'order_id' => $orderId,
            'status' => 'pending',
            'created_at' => time(),
            'data' => $orderData
        ];
    }

    private function scheduleTimeoutCheck($orderId)
    {
        $delayMs = self::TIMEOUT_SECONDS * 1000;

        $this->publisher->publishDelayed(
            [
                'order_id' => $orderId,
                'action' => 'timeout_check'
            ],
            $delayMs,
            'order_timeout'
        );

        echo sprintf(
            "已安排订单 %s 在 %d 秒后进行超时检查\n",
            $orderId,
            self::TIMEOUT_SECONDS
        );
    }

    public function processTimeoutCheck($orderId)
    {
        $order = $this->getOrder($orderId);

        if ($order && $order['status'] === 'pending') {
            $this->cancelOrder($orderId);

            echo "订单 {$orderId} 已超时取消\n";
        } else {
            echo "订单 {$orderId} 已处理,跳过超时检查\n";
        }
    }

    private function getOrder($orderId)
    {
        return [
            'order_id' => $orderId,
            'status' => 'pending'
        ];
    }

    private function cancelOrder($orderId)
    {
        echo "取消订单: {$orderId}\n";
    }

    public function payOrder($orderId)
    {
        echo "订单 {$orderId} 已支付\n";
    }
}

class OrderTimeoutConsumer
{
    private $handler;

    public function __construct()
    {
        $this->handler = new OrderTimeoutHandler();
    }

    public function start()
    {
        $consumer = new DelayedMessageConsumer();

        $callback = function ($message) {
            $body = json_decode($message->body, true);

            if ($body['content']['action'] === 'timeout_check') {
                $this->handler->processTimeoutCheck(
                    $body['content']['order_id']
                );
            }

            $message->ack();
        };

        echo "订单超时检查消费者已启动\n";
    }
}

延迟任务调度器

php
<?php

class DelayedTaskScheduler
{
    private $publisher;
    private $tasks = [];

    public function __construct()
    {
        $this->publisher = new DelayedMessagePublisher();
    }

    public function schedule($taskId, $taskName, $payload, $delaySeconds)
    {
        $task = [
            'task_id' => $taskId,
            'task_name' => $taskName,
            'payload' => $payload,
            'scheduled_at' => time() + $delaySeconds
        ];

        $this->tasks[$taskId] = $task;

        $this->publisher->publishDelayed(
            $task,
            $delaySeconds * 1000,
            'tasks'
        );

        echo "任务 {$taskName} 已安排在 {$delaySeconds} 秒后执行\n";

        return $taskId;
    }

    public function scheduleAt($taskId, $taskName, $payload, $timestamp)
    {
        $delaySeconds = max(0, $timestamp - time());

        return $this->schedule($taskId, $taskName, $payload, $delaySeconds);
    }

    public function scheduleCron($taskId, $taskName, $payload, $cronExpression)
    {
        $nextRun = $this->parseCronExpression($cronExpression);

        return $this->scheduleAt($taskId, $taskName, $payload, $nextRun);
    }

    public function cancel($taskId)
    {
        if (isset($this->tasks[$taskId])) {
            unset($this->tasks[$taskId]);
            echo "任务 {$taskId} 已取消\n";
            return true;
        }

        return false;
    }

    public function getScheduledTasks()
    {
        return $this->tasks;
    }

    private function parseCronExpression($expression)
    {
        return time() + 60;
    }
}

class DelayedTaskWorker
{
    private $handlers = [];

    public function registerHandler($taskName, callable $handler)
    {
        $this->handlers[$taskName] = $handler;
    }

    public function process($task)
    {
        $taskName = $task['task_name'];

        if (!isset($this->handlers[$taskName])) {
            throw new RuntimeException("未注册的任务处理器: {$taskName}");
        }

        return call_user_func(
            $this->handlers[$taskName],
            $task['payload']
        );
    }
}

实际应用场景

1. 用户注册验证邮件

php
<?php

class UserVerificationScheduler
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new DelayedMessagePublisher();
    }

    public function scheduleVerification($userId, $email)
    {
        $this->publisher->publishDelayed(
            [
                'user_id' => $userId,
                'email' => $email,
                'action' => 'send_verification'
            ],
            5000,
            'user_verification'
        );

        $this->scheduleReminder($userId, $email);
    }

    private function scheduleReminder($userId, $email)
    {
        $this->publisher->publishDelayed(
            [
                'user_id' => $userId,
                'email' => $email,
                'action' => 'send_reminder'
            ],
            24 * 60 * 60 * 1000,
            'user_verification'
        );
    }
}

2. 优惠券过期提醒

php
<?php

class CouponExpiryScheduler
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new DelayedMessagePublisher();
    }

    public function scheduleExpiryReminder($couponId, $userId, $expiryTime)
    {
        $reminderTime = $expiryTime - 24 * 60 * 60;

        $delayMs = max(0, ($reminderTime - time()) * 1000);

        if ($delayMs > 0) {
            $this->publisher->publishDelayed(
                [
                    'coupon_id' => $couponId,
                    'user_id' => $userId,
                    'action' => 'expiry_reminder'
                ],
                $delayMs,
                'coupon_reminder'
            );
        }
    }

    public function scheduleExpiryNotification($couponId, $userId, $expiryTime)
    {
        $delayMs = max(0, ($expiryTime - time()) * 1000);

        if ($delayMs > 0) {
            $this->publisher->publishDelayed(
                [
                    'coupon_id' => $couponId,
                    'user_id' => $userId,
                    'action' => 'expired'
                ],
                $delayMs,
                'coupon_reminder'
            );
        }
    }
}

3. 分布式重试机制

php
<?php

class RetryScheduler
{
    private $publisher;

    const MAX_RETRIES = 5;
    const BASE_DELAY_MS = 1000;

    public function __construct()
    {
        $this->publisher = new DelayedMessagePublisher();
    }

    public function scheduleRetry($taskName, $payload, $attempt = 1)
    {
        if ($attempt > self::MAX_RETRIES) {
            throw new RuntimeException("超过最大重试次数");
        }

        $delayMs = $this->calculateBackoffDelay($attempt);

        $this->publisher->publishDelayed(
            [
                'task_name' => $taskName,
                'payload' => $payload,
                'attempt' => $attempt,
                'max_retries' => self::MAX_RETRIES
            ],
            $delayMs,
            'retry_tasks'
        );

        echo sprintf(
            "任务 %s 第 %d 次重试已安排,延迟 %d 毫秒\n",
            $taskName,
            $attempt,
            $delayMs
        );
    }

    private function calculateBackoffDelay($attempt)
    {
        return self::BASE_DELAY_MS * pow(2, $attempt - 1);
    }

    public function handleRetry($task, callable $handler)
    {
        try {
            return call_user_func($handler, $task['payload']);
        } catch (Exception $e) {
            if ($task['attempt'] < $task['max_retries']) {
                $this->scheduleRetry(
                    $task['task_name'],
                    $task['payload'],
                    $task['attempt'] + 1
                );
            } else {
                $this->handleMaxRetriesExceeded($task, $e);
            }

            throw $e;
        }
    }

    private function handleMaxRetriesExceeded($task, Exception $e)
    {
        error_log(sprintf(
            "任务 %s 重试次数耗尽: %s",
            $task['task_name'],
            $e->getMessage()
        ));
    }
}

常见问题与解决方案

问题 1:延迟消息堆积

原因:大量延迟消息占用内存

解决方案

php
<?php

class DelayedMessageMonitor
{
    private $connection;
    private $channel;

    public function getDelayedMessageCount($exchangeName)
    {
        $this->channel->exchange_declare(
            $exchangeName,
            'x-delayed-message',
            false,
            true,
            false
        );

        return 0;
    }

    public function getDelayedMessageStats()
    {
        return [
            'total_delayed' => 0,
            'oldest_delay' => null,
            'newest_delay' => null
        ];
    }

    public function checkDelayedMessageBacklog($threshold = 10000)
    {
        $stats = $this->getDelayedMessageStats();

        if ($stats['total_delayed'] > $threshold) {
            $this->alertBacklog($stats);
        }
    }

    private function alertBacklog($stats)
    {
        error_log("延迟消息堆积警告: " . json_encode($stats));
    }
}

问题 2:延迟时间不准确

原因:服务器时间不同步或消息处理延迟

解决方案

php
<?php

class AccurateDelayedPublisher
{
    private $publisher;

    public function publishWithAccuracy($message, $targetTimestamp)
    {
        $currentTime = time();
        $serverTime = $this->getServerTime();

        $timeDiff = $serverTime - $currentTime;

        $adjustedTimestamp = $targetTimestamp - $timeDiff;
        $delayMs = max(0, ($adjustedTimestamp - $serverTime) * 1000);

        $this->publisher->publishDelayed(
            array_merge($message, [
                'original_target_time' => $targetTimestamp,
                'adjusted_for_clock_drift' => $timeDiff
            ]),
            $delayMs
        );
    }

    private function getServerTime()
    {
        return time();
    }
}

问题 3:插件不可用时的降级方案

解决方案

php
<?php

class FallbackDelayedPublisher
{
    private $usePlugin = false;
    private $publisher;
    private $redis;

    public function __construct()
    {
        $this->checkPluginAvailability();

        if ($this->usePlugin) {
            $this->publisher = new DelayedMessagePublisher();
        } else {
            $this->redis = new Redis();
            $this->redis->connect('127.0.0.1', 6379);
        }
    }

    private function checkPluginAvailability()
    {
        $installer = new DelayedMessagePluginInstaller();
        $this->usePlugin = $installer->isEnabled();
    }

    public function publishDelayed($message, $delayMs, $routingKey = '')
    {
        if ($this->usePlugin) {
            $this->publisher->publishDelayed($message, $delayMs, $routingKey);
        } else {
            $this->publishWithRedis($message, $delayMs, $routingKey);
        }
    }

    private function publishWithRedis($message, $delayMs, $routingKey)
    {
        $executeAt = time() + ($delayMs / 1000);

        $task = [
            'message' => $message,
            'routing_key' => $routingKey,
            'execute_at' => $executeAt
        ];

        $this->redis->zAdd(
            'delayed_tasks',
            $executeAt,
            json_encode($task)
        );
    }
}

最佳实践建议

1. 合理设置延迟时间

php
<?php

class DelayTimeCalculator
{
    const MIN_DELAY_MS = 0;
    const MAX_DELAY_MS = 86400000;

    public function calculateDelay($targetTime, $currentTime = null)
    {
        $currentTime = $currentTime ?? time();

        $delaySeconds = $targetTime - $currentTime;
        $delayMs = $delaySeconds * 1000;

        return max(self::MIN_DELAY_MS, min($delayMs, self::MAX_DELAY_MS));
    }

    public function validateDelay($delayMs)
    {
        if ($delayMs < self::MIN_DELAY_MS) {
            throw new InvalidArgumentException("延迟时间不能为负数");
        }

        if ($delayMs > self::MAX_DELAY_MS) {
            throw new InvalidArgumentException("延迟时间不能超过 24 小时");
        }

        return true;
    }
}

2. 延迟消息追踪

php
<?php

class DelayedMessageTracker
{
    private $redis;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
    }

    public function track($messageId, $message, $delayMs)
    {
        $data = [
            'message_id' => $messageId,
            'message' => $message,
            'delay_ms' => $delayMs,
            'created_at' => time(),
            'expected_delivery' => time() + ($delayMs / 1000),
            'status' => 'pending'
        ];

        $this->redis->hSet(
            'delayed_messages',
            $messageId,
            json_encode($data)
        );
    }

    public function markDelivered($messageId)
    {
        $data = $this->get($messageId);

        if ($data) {
            $data['status'] = 'delivered';
            $data['delivered_at'] = time();
            $data['actual_delay_ms'] = ($data['delivered_at'] - $data['created_at']) * 1000;

            $this->redis->hSet(
                'delayed_messages',
                $messageId,
                json_encode($data)
            );
        }
    }

    public function get($messageId)
    {
        $data = $this->redis->hGet('delayed_messages', $messageId);

        return $data ? json_decode($data, true) : null;
    }

    public function getPendingMessages()
    {
        $all = $this->redis->hGetAll('delayed_messages');

        $pending = [];

        foreach ($all as $id => $data) {
            $message = json_decode($data, true);

            if ($message['status'] === 'pending') {
                $pending[] = $message;
            }
        }

        return $pending;
    }
}

3. 监控和告警

php
<?php

class DelayedMessageAlert
{
    private $thresholds = [
        'pending_count' => 10000,
        'avg_delay_accuracy' => 5,
        'failed_rate' => 0.01
    ];

    public function checkAndAlert()
    {
        $metrics = $this->collectMetrics();

        $alerts = [];

        if ($metrics['pending_count'] > $this->thresholds['pending_count']) {
            $alerts[] = [
                'type' => 'high_pending',
                'message' => sprintf(
                    '延迟消息堆积: %d 条',
                    $metrics['pending_count']
                )
            ];
        }

        if ($metrics['avg_delay_accuracy'] > $this->thresholds['avg_delay_accuracy']) {
            $alerts[] = [
                'type' => 'delay_inaccuracy',
                'message' => sprintf(
                    '延迟精度下降: 平均偏差 %.2f 秒',
                    $metrics['avg_delay_accuracy']
                )
            ];
        }

        if ($metrics['failed_rate'] > $this->thresholds['failed_rate']) {
            $alerts[] = [
                'type' => 'high_failure',
                'message' => sprintf(
                    '延迟消息失败率过高: %.2f%%',
                    $metrics['failed_rate'] * 100
                )
            ];
        }

        if (!empty($alerts)) {
            $this->sendAlerts($alerts);
        }

        return $alerts;
    }

    private function collectMetrics()
    {
        return [
            'pending_count' => 0,
            'avg_delay_accuracy' => 0,
            'failed_rate' => 0
        ];
    }

    private function sendAlerts($alerts)
    {
        foreach ($alerts as $alert) {
            error_log("[ALERT] " . $alert['message']);
        }
    }
}

相关链接