
Message Push System

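Overview

A message push system is an essential part of modern applications. It covers channels such as in-app messages, email notifications, SMS verification codes, and app push. This guide builds a unified push platform on RabbitMQ that delivers messages reliably across channels, tracks their status, and retries failed sends.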

Business Background and Requirements

Scenario

A general-purpose platform needs to support several push scenarios:

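| Channel | Use cases | Characteristics | Volume |
| --- | --- | --- | --- |
| In-app message | System notifications, event announcements | Needs low latency | 1,000,000/day |
| Email | Order confirmations, billing notices | Needs high reliability | 500,000/day |
| SMS | Verification codes, alerts | Needs low latency and high reliability | 200,000/day |
| App push | Marketing campaigns, status updates | Requires the device to be online | 2,000,000/day |
| WeChat template message | Service notifications | Requires user authorization | 300,000/day |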

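Requirements Analysis

| Requirement | Description |
| --- | --- |
| Multi-channel support | One interface that serves every push channel |
| Template management | Message templates with variable substitution |
| Scheduled sending | Send at a specified time |
| Status tracking | Track the delivery status of each message |
| Failure retry | Automatically retry failed pushes |
| Frequency control | Keep users from being contacted too often |
| Complaint handling | Handle user complaints and unsubscribe requests |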
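
The template management requirement (message templates with variable substitution) is not shown in code elsewhere in this guide. A minimal sketch of what the substitution step might look like follows. The `renderTemplate` function and the `{{name}}` placeholder syntax are assumptions made for illustration, not part of the original design.

```php
<?php

/**
 * Replace {{name}} placeholders in a template with values from $vars.
 * Hypothetical helper: the placeholder syntax is an assumption.
 */
function renderTemplate(string $template, array $vars): string
{
    return preg_replace_callback(
        '/\{\{\s*(\w+)\s*\}\}/',
        static function (array $m) use ($vars): string {
            // Fail loudly rather than send a message with a hole in it.
            if (!array_key_exists($m[1], $vars)) {
                throw new InvalidArgumentException("Missing template variable: {$m[1]}");
            }
            return (string) $vars[$m[1]];
        },
        $template
    );
}

echo renderTemplate(
    'Your code is {{code}}, valid for {{minutes}} minutes.',
    ['code' => '123456', 'minutes' => 5]
), "\n";
// prints: Your code is 123456, valid for 5 minutes.
```

Throwing on a missing variable is one possible policy. A gateway could instead reject the request during parameter validation, before anything is queued.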

Architecture

Overall Architecture Diagram

mermaid
graph TB
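    subgraph "Business systems"
        A[Order system]
        B[User system]
        C[Marketing system]
        D[Alerting system]
    end
    
    subgraph "Push gateway"
        E[Push API]
        F[Message router]
        G[Template engine]
        H[Frequency limiter]
    end
    
    subgraph "RabbitMQ"
        I[Push exchange<br/>push.exchange]
        
        subgraph "Channel queues"
            J[In-app queue<br/>push.inbox]
            K[Email queue<br/>push.email]
            L[SMS queue<br/>push.sms]
            M[App push queue<br/>push.app]
            N[WeChat queue<br/>push.wechat]
        end
        
        O[Scheduled queue<br/>push.scheduled]
        P[Retry queue<br/>push.retry]
        Q[Dead-letter queue<br/>push.dlq]
    end
    
    subgraph "Push services"
        R[In-app service]
        S[Email service]
        T[SMS service]
        U[App push service]
        V[WeChat service]
    end
    
    subgraph "Third-party platforms"
        W[SMTP server]
        X[SMS gateway]
        Y[APNS/FCM]
        Z[WeChat API]
    end
    
    subgraph "Status tracking"
        AA[Status store]
        AB[Callback handler]
    end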
    
    A --> E
    B --> E
    C --> E
    D --> E
    
    E --> F
    F --> G
    G --> H
    H --> I
    
    I --> J
    I --> K
    I --> L
    I --> M
    I --> N
    I --> O
    
    J --> R
    K --> S
    L --> T
    M --> U
    N --> V
    O --> I
    
    S --> W
    T --> X
    U --> Y
    V --> Z
    
    J -.-> P
    K -.-> P
    L -.-> P
    M -.-> P
    N -.-> P
    
    P -.-> Q
    P --> I
    
    R --> AA
    S --> AA
    T --> AA
    U --> AA
    V --> AA
    
    W --> AB
    X --> AB
    Y --> AB
    Z --> AB
    
    AB --> AA

Push Flow

mermaid
sequenceDiagram
    participant App as Business app
    participant Gateway as Push gateway
    participant MQ as RabbitMQ
    participant Service as Push service
    participant ThirdParty as Third-party platform
    participant DB as Status store
    
    App->>Gateway: Send push request
    Gateway->>Gateway: Validate parameters
    Gateway->>Gateway: Render template
    Gateway->>Gateway: Check frequency limit
    
    alt Frequency limit exceeded
        Gateway-->>App: Return rate-limit error
    else Within limit
        Gateway->>DB: Record status (pending)
        Gateway->>MQ: Publish push message
        Gateway-->>App: Return message ID
        
        MQ->>Service: Deliver message
        Service->>Service: Run push logic
        Service->>ThirdParty: Call third-party API
        
        alt Send succeeded
            ThirdParty-->>Service: Return success
            Service->>MQ: ACK
            Service->>DB: Update status (sent)
        else Send failed
            ThirdParty-->>Service: Return failure
            Service->>Service: Decide whether to retry
            alt Retry
                Service->>MQ: Requeue for retry
            else No more retries
                Service->>MQ: NACK (requeue=false, dead-letter)
                Service->>DB: Update status (failed)
            end
        end
    end

Message State Transitions

mermaid
stateDiagram-v2
    [*] --> pending: Push created
    pending --> sent: Send succeeded
    pending --> retrying: Send failed (retryable)
    pending --> failed: Send failed (not retryable)
    
    retrying --> sent: Retry succeeded
    retrying --> failed: Retries exhausted
    
    sent --> delivered: Delivered
    sent --> opened: Opened
    sent --> clicked: Clicked
    
    delivered --> opened
    opened --> clicked
    
    failed --> [*]
    clicked --> [*]
    
    note right of pending: Waiting to be sent
    note right of sent: Sent
    note right of delivered: Confirmed by the third party
    note right of failed: Final failure
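
The transitions in the state diagram can be written down as a lookup table, which a status store could consult before accepting an update. This is a sketch only: `PUSH_TRANSITIONS` and `canTransition` are hypothetical names. The `expired` status that the consumer code writes is not in the diagram and is left out here.

```php
<?php

// Allowed transitions, copied from the state diagram.
const PUSH_TRANSITIONS = [
    'pending'   => ['sent', 'retrying', 'failed'],
    'retrying'  => ['sent', 'failed'],
    'sent'      => ['delivered', 'opened', 'clicked'],
    'delivered' => ['opened'],
    'opened'    => ['clicked'],
    'failed'    => [], // terminal
    'clicked'   => [], // terminal
];

/** Return true if the diagram contains an edge from $from to $to. */
function canTransition(string $from, string $to): bool
{
    return in_array($to, PUSH_TRANSITIONS[$from] ?? [], true);
}

var_dump(canTransition('pending', 'sent')); // bool(true)
var_dump(canTransition('failed', 'sent'));  // bool(false)
```

Such a check can help when a late third-party callback arrives out of order, for example a `delivered` callback for a message already marked `clicked`.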

PHP Implementation

Push Message Structure

php
<?php

namespace App\Messaging\Push;

class PushMessage
{
    public string $messageId;
    public string $channel;
    public string $templateCode;
    public array $recipients;
    public array $content;
    public array $options;
    public int $priority;
    public ?int $scheduledAt;
    public int $maxRetries;
    public int $currentRetry;
    public int $createdAt;
    public string $traceId;

    public const CHANNEL_INBOX = 'inbox';
    public const CHANNEL_EMAIL = 'email';
    public const CHANNEL_SMS = 'sms';
    public const CHANNEL_APP = 'app';
    public const CHANNEL_WECHAT = 'wechat';

    public const PRIORITY_LOW = 1;
    public const PRIORITY_NORMAL = 5;
    public const PRIORITY_HIGH = 8;
    public const PRIORITY_URGENT = 10;

    public function __construct(
        string $channel,
        array $recipients,
        array $content = [],
        string $templateCode = '',
        array $options = []
    ) {
        $this->messageId = $this->generateMessageId();
        $this->channel = $channel;
        $this->recipients = $recipients;
        $this->content = $content;
        $this->templateCode = $templateCode;
        $this->options = array_merge([
            'priority' => self::PRIORITY_NORMAL,
            'scheduled_at' => null,
            'max_retries' => 3,
            'ttl' => 86400,
        ], $options);
        
        $this->priority = $this->options['priority'];
        $this->scheduledAt = $this->options['scheduled_at'];
        $this->maxRetries = $this->options['max_retries'];
        $this->currentRetry = 0;
        $this->createdAt = time();
        $this->traceId = $this->generateTraceId();
    }

    private function generateMessageId(): string
    {
        return sprintf('msg_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }

    private function generateTraceId(): string
    {
        return bin2hex(random_bytes(16));
    }

    public function incrementRetry(): self
    {
        $this->currentRetry++;
        return $this;
    }

    public function canRetry(): bool
    {
        return $this->currentRetry < $this->maxRetries;
    }

    public function isScheduled(): bool
    {
        return $this->scheduledAt !== null && $this->scheduledAt > time();
    }

    public function isExpired(): bool
    {
        return time() > ($this->createdAt + $this->options['ttl']);
    }

    public function toArray(): array
    {
        return [
            'message_id' => $this->messageId,
            'channel' => $this->channel,
            'template_code' => $this->templateCode,
            'recipients' => $this->recipients,
            'content' => $this->content,
            'options' => $this->options,
            'priority' => $this->priority,
            'scheduled_at' => $this->scheduledAt,
            'max_retries' => $this->maxRetries,
            'current_retry' => $this->currentRetry,
            'created_at' => $this->createdAt,
            'trace_id' => $this->traceId,
        ];
    }

    public static function fromArray(array $data): self
    {
        $message = new self(
            $data['channel'],
            $data['recipients'],
            $data['content'] ?? [],
            $data['template_code'] ?? '',
            $data['options'] ?? []
        );
        $message->messageId = $data['message_id'];
        $message->priority = $data['priority'];
        $message->scheduledAt = $data['scheduled_at'] ?? null;
        $message->maxRetries = $data['max_retries'];
        $message->currentRetry = $data['current_retry'] ?? 0;
        $message->createdAt = $data['created_at'];
        $message->traceId = $data['trace_id'] ?? '';
        return $message;
    }
}

Push Gateway

php
<?php

namespace App\Messaging\Push;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use App\Services\Push\PushService;

class PushGateway
{
    private AMQPStreamConnection $connection;
    private $channel;
    private PushService $pushService;
    private string $exchangeName = 'push.exchange';
    private string $scheduledExchange = 'push.scheduled.exchange';
    private string $retryExchange = 'push.retry.exchange';
    private string $deadLetterExchange = 'push.dlx';

    private const CHANNEL_QUEUES = [
        PushMessage::CHANNEL_INBOX => 'push.inbox',
        PushMessage::CHANNEL_EMAIL => 'push.email',
        PushMessage::CHANNEL_SMS => 'push.sms',
        PushMessage::CHANNEL_APP => 'push.app',
        PushMessage::CHANNEL_WECHAT => 'push.wechat',
    ];

    public function __construct(
        AMQPStreamConnection $connection,
        PushService $pushService
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->pushService = $pushService;
        $this->setupInfrastructure();
    }

    private function setupInfrastructure(): void
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );

        $this->channel->exchange_declare(
            $this->deadLetterExchange,
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );

        foreach (self::CHANNEL_QUEUES as $channel => $queueName) {
            $args = [
                'x-dead-letter-exchange' => ['S', $this->deadLetterExchange],
                'x-dead-letter-routing-key' => ['S', 'push.failed'],
                'x-max-priority' => ['I', 10],
                'x-message-ttl' => ['I', 86400000],
            ];

            $this->channel->queue_declare(
                $queueName,
                false,
                true,
                false,
                false,
                false,
                $args
            );

            $this->channel->queue_bind($queueName, $this->exchangeName, $queueName);
        }

        $this->channel->queue_declare('push.dlq', false, true, false, false);
        $this->channel->queue_bind('push.dlq', $this->deadLetterExchange, 'push.failed');
    }

    public function send(PushMessage $message): array
    {
        $result = $this->pushService->validateAndPrepare($message);

        if (!$result['success']) {
            return $result;
        }

        if ($message->isScheduled()) {
            $this->scheduleMessage($message);
        } else {
            $this->publishMessage($message);
        }

        return ['success' => true, 'message_id' => $message->messageId];
    }

    private function publishMessage(PushMessage $message): void
    {
        $queueName = self::CHANNEL_QUEUES[$message->channel] ?? null;
        
        if (!$queueName) {
            throw new \InvalidArgumentException("Unknown channel: {$message->channel}");
        }

        $amqpMessage = new AMQPMessage(
            json_encode($message->toArray()),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'priority' => $message->priority,
                'message_id' => $message->messageId,
                'timestamp' => time(),
            ]
        );

        $this->channel->basic_publish(
            $amqpMessage,
            $this->exchangeName,
            $queueName
        );
    }

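    private function scheduleMessage(PushMessage $message): void
    {
        $queueName = self::CHANNEL_QUEUES[$message->channel] ?? null;
        if (!$queueName) {
            throw new \InvalidArgumentException("Unknown channel: {$message->channel}");
        }

        // Holding queue with no consumers. An expired message is dead-lettered to
        // push.exchange under its original routing key (the channel queue name).
        // Declarations are idempotent; they could be hoisted into setupInfrastructure().
        $this->channel->exchange_declare($this->scheduledExchange, AMQPExchangeType::DIRECT, false, true, false);
        $this->channel->queue_declare('push.scheduled', false, true, false, false, false, [
            'x-dead-letter-exchange' => ['S', $this->exchangeName],
        ]);
        $this->channel->queue_bind('push.scheduled', $this->scheduledExchange, $queueName);

        // Clamp at zero: the scheduled time may pass between isScheduled() and here.
        $delay = max(0, $message->scheduledAt - time()) * 1000;

        $amqpMessage = new AMQPMessage(
            json_encode($message->toArray()),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'priority' => $message->priority,
                'message_id' => $message->messageId,
                'expiration' => strval($delay),
            ]
        );

        // Per-message TTL only expires at the head of the queue, so a long delay
        // queued first holds back shorter delays behind it.
        $this->channel->basic_publish($amqpMessage, $this->scheduledExchange, $queueName);
    }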

    public function close(): void
    {
        if ($this->channel) {
            $this->channel->close();
        }
    }
}

Channel Consumers

php
<?php

namespace App\Consumers\Push;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use App\Messaging\Push\PushMessage;
use App\Services\Push\PushService;

abstract class PushConsumer
{
    protected AMQPStreamConnection $connection;
    protected $channel;
    protected PushService $pushService;
    protected string $queueName;
    protected int $prefetchCount = 10;
    protected bool $running = true;

    public function __construct(
        AMQPStreamConnection $connection,
        PushService $pushService
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->pushService = $pushService;
        $this->setup();
    }

    protected function setup(): void
    {
        $this->channel->basic_qos(null, $this->prefetchCount, null);
    }

    public function consume(): void
    {
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            [$this, 'processMessage']
        );

        while ($this->running && count($this->channel->callbacks)) {
            $this->channel->wait(null, true);
            if (!$this->running) break;
            usleep(100000);
        }
    }

    public function processMessage(AMQPMessage $message): void
    {
        $data = json_decode($message->body, true);

        if (!is_array($data)) {
            // A malformed payload can never succeed; dead-letter it instead of requeueing.
            error_log('[PushConsumer] Invalid JSON payload, rejecting message');
            $message->nack(false, false);
            return;
        }

        $pushMessage = PushMessage::fromArray($data);

        try {
            // Drop messages whose TTL elapsed while they waited in the queue.
            if ($pushMessage->isExpired()) {
                $this->pushService->updateStatus($pushMessage->messageId, 'expired');
                $message->ack();
                return;
            }

            $result = $this->doPush($pushMessage);

            if ($result['success']) {
                $this->pushService->updateStatus($pushMessage->messageId, 'sent', $result);
                $message->ack();
            } else {
                $this->handleFailure($message, $pushMessage, $result['error']);
            }

        } catch (\Exception $e) {
            $this->handleException($message, $pushMessage, $e);
        }
    }

    abstract protected function doPush(PushMessage $message): array;

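    protected function handleFailure(AMQPMessage $message, PushMessage $pushMessage, \Exception $error): void
    {
        if ($pushMessage->canRetry() && $this->shouldRetry($error)) {
            // A plain nack with requeue redelivers the unchanged body, so current_retry
            // would never advance. Republish an updated copy, then ack the original.
            $pushMessage->incrementRetry();
            $this->pushService->updateStatus(
                $pushMessage->messageId,
                'retrying',
                ['retry_count' => $pushMessage->currentRetry]
            );
            $this->channel->basic_publish(
                new AMQPMessage(json_encode($pushMessage->toArray()), [
                    'content_type' => 'application/json',
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                    'priority' => $pushMessage->priority,
                    'message_id' => $pushMessage->messageId,
                ]),
                'push.exchange',
                $this->queueName
            );
            $message->ack();
        } else {
            $this->pushService->updateStatus(
                $pushMessage->messageId,
                'failed',
                ['error' => $error->getMessage()]
            );
            // requeue=false sends the message to the dead-letter exchange.
            $message->nack(false, false);
        }
    }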

    protected function handleException(AMQPMessage $message, PushMessage $pushMessage, \Exception $e): void
    {
        error_log(sprintf('[PushConsumer] Error: %s - %s', $pushMessage->messageId, $e->getMessage()));
        $message->nack(false, true);
    }

    protected function shouldRetry(\Exception $error): bool
    {
        return true;
    }

    public function stop(): void
    {
        $this->running = false;
    }

    public function close(): void
    {
        $this->stop();
        if ($this->channel) {
            $this->channel->close();
        }
    }
}

class EmailConsumer extends PushConsumer
{
    protected string $queueName = 'push.email';
    private EmailSender $emailSender;

    public function __construct(
        AMQPStreamConnection $connection,
        PushService $pushService,
        EmailSender $emailSender
    ) {
        parent::__construct($connection, $pushService);
        $this->emailSender = $emailSender;
    }

    protected function doPush(PushMessage $message): array
    {
        $content = $message->content;

        foreach ($message->recipients as $recipient) {
            $result = $this->emailSender->send([
                'to' => $recipient['email'],
                'to_name' => $recipient['name'] ?? '',
                'subject' => $content['subject'],
                'body' => $content['body'],
            ]);

            if (!$result['success']) {
                return $result;
            }
        }

        return ['success' => true];
    }
}

class SmsConsumer extends PushConsumer
{
    protected string $queueName = 'push.sms';
    private SmsSender $smsSender;

    public function __construct(
        AMQPStreamConnection $connection,
        PushService $pushService,
        SmsSender $smsSender
    ) {
        parent::__construct($connection, $pushService);
        $this->smsSender = $smsSender;
    }

    protected function doPush(PushMessage $message): array
    {
        $content = $message->content;

        foreach ($message->recipients as $recipient) {
            $result = $this->smsSender->send([
                'phone' => $recipient['phone'],
                'content' => $content['body'],
                'template_id' => $content['template_id'] ?? null,
            ]);

            if (!$result['success']) {
                return $result;
            }
        }

        return ['success' => true];
    }

    protected function shouldRetry(\Exception $error): bool
    {
        $nonRetryableCodes = ['INVALID_PHONE', 'BLACKLISTED'];
        foreach ($nonRetryableCodes as $code) {
            if (strpos($error->getMessage(), $code) !== false) {
                return false;
            }
        }
        return true;
    }
}
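
The architecture diagram includes a `push.retry` queue, but the consumer above retries immediately. If retries were routed through a delay queue instead, each attempt would need a delay. A minimal sketch of an exponential backoff calculation follows. `retryDelayMs` and its default values are assumptions for illustration, not values from the original design.

```php
<?php

/**
 * Delay in milliseconds before retry attempt $attempt (1-based).
 * Doubles from $baseMs on each attempt and is capped at $maxMs.
 * Both defaults are illustrative.
 */
function retryDelayMs(int $attempt, int $baseMs = 5000, int $maxMs = 300000): int
{
    if ($attempt < 1) {
        throw new InvalidArgumentException('attempt must be >= 1');
    }
    // Cap the exponent so the product stays well inside the integer range.
    $exponent = min($attempt - 1, 20);
    return min($maxMs, $baseMs * (2 ** $exponent));
}

foreach ([1, 2, 3, 10] as $attempt) {
    echo $attempt, ' => ', retryDelayMs($attempt), " ms\n";
}
// 1 => 5000 ms
// 2 => 10000 ms
// 3 => 20000 ms
// 10 => 300000 ms
```

The result could be used as the `expiration` property of a message published to a retry queue, the same per-message TTL mechanism that scheduled sending relies on.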

Frequency Limiter

php
<?php

namespace App\Services\Push;

class FrequencyLimiter
{
    private $redis;
    private array $limits;

    public function __construct($redis, array $limits = [])
    {
        $this->redis = $redis;
        $this->limits = array_merge([
            'inbox' => ['per_minute' => 10, 'per_hour' => 100, 'per_day' => 500],
            'email' => ['per_hour' => 20, 'per_day' => 100],
            'sms' => ['per_minute' => 1, 'per_hour' => 10, 'per_day' => 30],
            'app' => ['per_minute' => 5, 'per_hour' => 50, 'per_day' => 200],
        ], $limits);
    }

    public function check(array $recipient, string $channel): bool
    {
        $identifier = $this->getIdentifier($recipient, $channel);
        $limits = $this->limits[$channel] ?? [];

        foreach ($limits as $period => $maxCount) {
            $key = $this->getKey($identifier, $channel, $period);
            $ttl = $this->getTtl($period);

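            $current = (int) $this->redis->incr($key);

            // Set the expiry on first use, and repair keys left without one
            // (for example if a previous call died between INCR and EXPIRE).
            if ($current === 1 || $this->redis->ttl($key) === -1) {
                $this->redis->expire($key, $ttl);
            }

            // Note: a rejected attempt has already been counted at this point.
            if ($current > $maxCount) {
                return false;
            }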
        }

        return true;
    }

    private function getIdentifier(array $recipient, string $channel): string
    {
        switch ($channel) {
            case 'inbox':
                return 'user:' . $recipient['user_id'];
            case 'email':
                return 'email:' . $recipient['email'];
            case 'sms':
                return 'phone:' . $recipient['phone'];
            case 'app':
                return 'device:' . $recipient['device_token'];
            default:
                return 'unknown:' . md5(json_encode($recipient));
        }
    }

    private function getKey(string $identifier, string $channel, string $period): string
    {
        return sprintf('push:limit:%s:%s:%s', $channel, $identifier, $period);
    }

    private function getTtl(string $period): int
    {
        $ttls = [
            'per_minute' => 60,
            'per_hour' => 3600,
            'per_day' => 86400,
        ];
        return $ttls[$period] ?? 3600;
    }
}

Complete Usage Example

php
<?php

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Messaging\Push\{PushMessage, PushGateway};
use App\Services\Push\{PushService, FrequencyLimiter};

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$redis = new Redis();
$redis->connect('localhost', 6379);

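$pushService = new PushService();
// Note: the limiter is constructed here but never passed to the gateway in this
// example. How it is wired into PushService (not shown) is left to the application.
$frequencyLimiter = new FrequencyLimiter($redis);

$gateway = new PushGateway($connection, $pushService);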

// High-priority email sent immediately.
$emailMessage = new PushMessage(
    PushMessage::CHANNEL_EMAIL,
    [
        ['email' => 'user@example.com', 'name' => 'Zhang San'],
    ],
    [
        'subject' => 'Order confirmation',
        'body' => '<h1>Your order is confirmed</h1><p>Order number: ORD123456</p>',
    ],
    '',
    ['priority' => PushMessage::PRIORITY_HIGH]
);

$result = $gateway->send($emailMessage);
echo "Email send result: " . json_encode($result) . "\n";

// Urgent SMS carrying a verification code.
$smsMessage = new PushMessage(
    PushMessage::CHANNEL_SMS,
    [
        ['phone' => '13800138000'],
    ],
    [
        'body' => 'Your verification code is 123456, valid for 5 minutes.',
        'template_id' => 'SMS_VERIFY_CODE',
    ],
    '',
    ['priority' => PushMessage::PRIORITY_URGENT]
);

$result = $gateway->send($smsMessage);
echo "SMS send result: " . json_encode($result) . "\n";

// Email scheduled for one hour from now.
$scheduledTime = time() + 3600;
$scheduledMessage = new PushMessage(
    PushMessage::CHANNEL_EMAIL,
    [
        ['email' => 'user@example.com', 'name' => 'Zhang San'],
    ],
    [
        'subject' => 'Event reminder',
        'body' => '<p>An event you follow is about to start</p>',
    ],
    '',
    ['scheduled_at' => $scheduledTime]
);

$result = $gateway->send($scheduledMessage);
echo "Scheduled email send result: " . json_encode($result) . "\n";

$gateway->close();
$connection->close();

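Key Techniques

1. Multi-channel routing

`push.exchange` is a direct exchange, and each channel queue is bound to it with the queue's own name as the binding key. Publishing with that name as the routing key therefore delivers the message to exactly one channel queue: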

php
private const CHANNEL_QUEUES = [
    'inbox' => 'push.inbox',
    'email' => 'push.email',
    'sms' => 'push.sms',
    'app' => 'push.app',
    'wechat' => 'push.wechat',
];

2. Priority queues

php
$args = [
    'x-max-priority' => ['I', 10],
];
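  • Urgent messages such as verification codes use a high priority
  • Marketing messages use a low priority
  • Important messages are therefore handled first when the queue has a backlog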
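
The ordering effect can be illustrated without a broker using PHP's built-in `SplPriorityQueue`, which also dequeues higher priorities first. This is a sketch: the message type names and the `priorityFor` helper are assumptions, and the numeric values mirror the `PushMessage::PRIORITY_*` constants.

```php
<?php

// Illustrative mapping from message type to priority; the type names are assumptions.
const TYPE_PRIORITY = [
    'verification_code'  => 10, // PushMessage::PRIORITY_URGENT
    'order_confirmation' => 8,  // PushMessage::PRIORITY_HIGH
    'system_notice'      => 5,  // PushMessage::PRIORITY_NORMAL
    'marketing'          => 1,  // PushMessage::PRIORITY_LOW
];

/** Look up a priority and clamp it to the queue's x-max-priority. */
function priorityFor(string $type, int $maxPriority = 10): int
{
    $priority = TYPE_PRIORITY[$type] ?? 5;
    return max(0, min($maxPriority, $priority));
}

$queue = new SplPriorityQueue();
foreach (['marketing', 'verification_code', 'order_confirmation'] as $type) {
    $queue->insert($type, priorityFor($type));
}

$order = [];
while (!$queue->isEmpty()) {
    $order[] = $queue->extract();
}
echo implode(', ', $order), "\n";
// prints: verification_code, order_confirmation, marketing
```

In RabbitMQ, priority only reorders messages that are still waiting in the queue. Messages already prefetched by a consumer are not affected, so a small prefetch count tends to make priorities more visible.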

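3. Scheduled sending

Delayed delivery relies on a per-message TTL: the message waits in the `push.scheduled` queue, which has no consumers, and is routed back to the push exchange once the TTL expires. The TTL is the time remaining until the scheduled timestamp, in milliseconds: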

php
$delay = ($message->scheduledAt - time()) * 1000;
$amqpMessage->set('expiration', strval($delay));
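
One detail worth guarding against is a negative delay, which can occur if the scheduled time passes between the `isScheduled()` check and the delay calculation. A minimal sketch of a clamped calculation follows. `scheduleDelayMs` is a hypothetical helper, and passing the current time in as a parameter is a choice made here so the function can be tested.

```php
<?php

/**
 * Milliseconds to wait until $scheduledAt, never negative.
 * Both arguments are Unix timestamps in seconds.
 */
function scheduleDelayMs(int $scheduledAt, int $now): int
{
    return max(0, $scheduledAt - $now) * 1000;
}

// One hour ahead.
echo scheduleDelayMs(1700003600, 1700000000), "\n"; // prints: 3600000
// Already in the past: send without delay.
echo scheduleDelayMs(1700000000, 1700000005), "\n"; // prints: 0
```

The result would be passed to `strval()` before being set as the `expiration` property, since that property is a string.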

4. Frequency control

Redis counters enforce limits over several time windows at once (per minute, per hour, per day):

php
$limits = [
    'per_minute' => 10,
    'per_hour' => 100,
    'per_day' => 500,
];
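
The multi-window idea can be sketched without Redis. The version below checks every window first and only counts the send if all of them have room, so a rejected attempt does not consume quota. `InMemoryWindowLimiter` is a hypothetical class for illustration. It keeps state in a PHP array, so it only works within a single process, and a Redis-backed version would need an atomic script to get the same behavior.

```php
<?php

final class InMemoryWindowLimiter
{
    private const WINDOW_SECONDS = ['per_minute' => 60, 'per_hour' => 3600, 'per_day' => 86400];

    /** @var array<string, int> e.g. ['per_minute' => 1, 'per_hour' => 10] */
    private array $limits;

    /** @var array<string, array{count: int, expiresAt: int}> */
    private array $counters = [];

    public function __construct(array $limits)
    {
        $this->limits = $limits;
    }

    public function allow(string $identifier, int $now): bool
    {
        // Pass 1: reject if any window is already full, without counting the attempt.
        foreach ($this->limits as $period => $max) {
            if ($this->current($identifier, $period, $now) >= $max) {
                return false;
            }
        }
        // Pass 2: record the send in every window, starting a new window if needed.
        foreach ($this->limits as $period => $max) {
            $key = "$identifier:$period";
            if (!isset($this->counters[$key]) || $this->counters[$key]['expiresAt'] <= $now) {
                $ttl = self::WINDOW_SECONDS[$period] ?? 3600;
                $this->counters[$key] = ['count' => 0, 'expiresAt' => $now + $ttl];
            }
            $this->counters[$key]['count']++;
        }
        return true;
    }

    private function current(string $identifier, string $period, int $now): int
    {
        $entry = $this->counters["$identifier:$period"] ?? null;
        return ($entry !== null && $entry['expiresAt'] > $now) ? $entry['count'] : 0;
    }
}

$limiter = new InMemoryWindowLimiter(['per_minute' => 1, 'per_hour' => 2]);
var_dump($limiter->allow('phone:13800138000', 0));   // bool(true)
var_dump($limiter->allow('phone:13800138000', 30));  // bool(false): minute window is full
var_dump($limiter->allow('phone:13800138000', 61));  // bool(true): minute window expired
var_dump($limiter->allow('phone:13800138000', 130)); // bool(false): hour window is full
```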

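Performance Recommendations

| Optimization | Recommendation | Rationale |
| --- | --- | --- |
| Batch sending | Merge messages for the same channel | Fewer network requests |
| Connection reuse | Keep long-lived connections | Avoids repeated connection setup |
| Asynchronous processing | Run consumers as separate processes | Does not block the main business flow |
| Template caching | Cache templates in Redis | Fewer database queries |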

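Common Problems and Solutions

1. Message backlog

Solution: add consumers for the affected queue and optimize the processing logic.

2. Third-party rate limiting

Solution: implement token bucket rate limiting to control the send rate.

3. User complaints

Solution: provide an unsubscribe function and record user preferences.

4. Message loss

Solution: enable message persistence (durable queues plus persistent delivery mode, as in the gateway code) and use publisher confirms, which php-amqplib enables with `confirm_select()` on the channel.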
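
A minimal token bucket sketch follows, to show how the send rate toward a third-party API might be capped. `TokenBucket` is a hypothetical class, and the capacity and refill rate in the example are arbitrary. The current time is passed in explicitly to keep the example deterministic; a consumer would typically pass `microtime(true)`.

```php
<?php

final class TokenBucket
{
    private float $capacity;
    private float $refillPerSecond;
    private float $tokens;
    private float $lastRefill;

    public function __construct(float $capacity, float $refillPerSecond, float $now)
    {
        $this->capacity = $capacity;
        $this->refillPerSecond = $refillPerSecond;
        $this->tokens = $capacity; // start full, which allows an initial burst
        $this->lastRefill = $now;
    }

    /** Take $cost tokens if available; return false when the caller should wait. */
    public function tryConsume(float $now, float $cost = 1.0): bool
    {
        // Add the tokens earned since the last call, up to the bucket capacity.
        $elapsed = max(0.0, $now - $this->lastRefill);
        $this->tokens = min($this->capacity, $this->tokens + $elapsed * $this->refillPerSecond);
        $this->lastRefill = $now;

        if ($this->tokens < $cost) {
            return false;
        }
        $this->tokens -= $cost;
        return true;
    }
}

// Burst of 2, then 1 request per second.
$bucket = new TokenBucket(2.0, 1.0, 0.0);
var_dump($bucket->tryConsume(0.0)); // bool(true)
var_dump($bucket->tryConsume(0.0)); // bool(true)
var_dump($bucket->tryConsume(0.0)); // bool(false): bucket is empty
var_dump($bucket->tryConsume(1.0)); // bool(true): one token refilled after 1 second
```

When `tryConsume` returns false, a consumer could sleep briefly before calling the provider, or leave the message unacknowledged until a token is available.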
