Appearance
消息推送系统
概述
消息推送系统是现代应用不可或缺的功能,包括站内信、邮件通知、短信验证码、App 推送等多种渠道。通过 RabbitMQ 构建统一的消息推送平台,实现多渠道消息的可靠投递、状态追踪和失败重试。
业务背景与需求
场景描述
某综合平台需要支持多种消息推送场景:
| 推送渠道 | 使用场景 | 特点 | 量级 |
|---|---|---|---|
| 站内信 | 系统通知、活动公告 | 实时性要求高 | 100万/天 |
| 邮件 | 订单确认、账单通知 | 可靠性要求高 | 50万/天 |
| 短信 | 验证码、告警通知 | 实时性+可靠性 | 20万/天 |
| App推送 | 营销活动、状态更新 | 需要设备在线 | 200万/天 |
| 微信模板 | 服务通知 | 需要用户授权 | 30万/天 |
需求分析
| 需求项 | 描述 |
|---|---|
| 多渠道支持 | 统一接口支持多种推送渠道 |
| 模板管理 | 支持消息模板和变量替换 |
| 定时发送 | 支持指定时间发送 |
| 状态追踪 | 追踪消息发送状态 |
| 失败重试 | 自动重试失败的推送 |
| 频率控制 | 防止用户被过度打扰 |
| 投诉处理 | 处理用户投诉和退订 |
架构设计
整体架构图
mermaid
graph TB
subgraph "业务系统"
A[订单系统]
B[用户系统]
C[营销系统]
D[告警系统]
end
subgraph "推送网关"
E[推送API]
F[消息路由器]
G[模板引擎]
H[频率控制器]
end
subgraph "RabbitMQ"
I[推送交换机<br/>push.exchange]
subgraph "渠道队列"
J[站内信队列<br/>push.inbox]
K[邮件队列<br/>push.email]
L[短信队列<br/>push.sms]
M[App推送队列<br/>push.app]
N[微信队列<br/>push.wechat]
end
O[定时队列<br/>push.scheduled]
P[重试队列<br/>push.retry]
Q[死信队列<br/>push.dlq]
end
subgraph "推送服务"
R[站内信服务]
S[邮件服务]
T[短信服务]
U[App推送服务]
V[微信服务]
end
subgraph "第三方平台"
W[SMTP服务器]
X[短信网关]
Y[APNS/FCM]
Z[微信API]
end
subgraph "状态追踪"
AA[状态存储]
AB[回调处理]
end
A --> E
B --> E
C --> E
D --> E
E --> F
F --> G
G --> H
H --> I
I --> J
I --> K
I --> L
I --> M
I --> N
I --> O
J --> R
K --> S
L --> T
M --> U
N --> V
O --> I
S --> W
T --> X
U --> Y
V --> Z
J -.-> P
K -.-> P
L -.-> P
M -.-> P
N -.-> P
P -.-> Q
P --> I
R --> AA
S --> AA
T --> AA
U --> AA
V --> AA
W --> AB
X --> AB
Y --> AB
Z --> AB
AB --> AA消息推送流程
mermaid
sequenceDiagram
participant App as 业务应用
participant Gateway as 推送网关
participant MQ as RabbitMQ
participant Service as 推送服务
participant ThirdParty as 第三方平台
participant DB as 状态存储
App->>Gateway: 发送推送请求
Gateway->>Gateway: 参数校验
Gateway->>Gateway: 模板渲染
Gateway->>Gateway: 频率检查
alt 频率超限
Gateway-->>App: 返回频率限制错误
else 正常
Gateway->>DB: 记录推送状态(pending)
Gateway->>MQ: 发布推送消息
Gateway-->>App: 返回消息ID
MQ->>Service: 投递消息
Service->>Service: 处理推送逻辑
Service->>ThirdParty: 调用第三方API
alt 发送成功
ThirdParty-->>Service: 返回成功
Service->>MQ: ACK确认
Service->>DB: 更新状态(success)
else 发送失败
ThirdParty-->>Service: 返回失败
Service->>Service: 判断是否重试
alt 需要重试
Service->>MQ: NACK(重新入队)
else 不再重试
Service->>MQ: ACK(进入死信)
Service->>DB: 更新状态(failed)
end
end
end消息状态流转
mermaid
stateDiagram-v2
[*] --> pending: 创建推送
pending --> sent: 发送成功
pending --> retrying: 发送失败(可重试)
pending --> failed: 发送失败(不可重试)
retrying --> sent: 重试成功
retrying --> failed: 重试次数耗尽
sent --> delivered: 已送达
sent --> opened: 已打开
sent --> clicked: 已点击
delivered --> opened
opened --> clicked
failed --> [*]
clicked --> [*]
note right of pending: 等待发送
note right of sent: 已发送
note right of delivered: 第三方确认
note right of failed: 最终失败PHP 代码实现
推送消息结构定义
php
<?php
namespace App\Messaging\Push;
class PushMessage
{
public string $messageId;
public string $channel;
public string $templateCode;
public array $recipients;
public array $content;
public array $options;
public int $priority;
public ?int $scheduledAt;
public int $maxRetries;
public int $currentRetry;
public int $createdAt;
public string $traceId;
public const CHANNEL_INBOX = 'inbox';
public const CHANNEL_EMAIL = 'email';
public const CHANNEL_SMS = 'sms';
public const CHANNEL_APP = 'app';
public const CHANNEL_WECHAT = 'wechat';
public const PRIORITY_LOW = 1;
public const PRIORITY_NORMAL = 5;
public const PRIORITY_HIGH = 8;
public const PRIORITY_URGENT = 10;
public function __construct(
string $channel,
array $recipients,
array $content = [],
string $templateCode = '',
array $options = []
) {
$this->messageId = $this->generateMessageId();
$this->channel = $channel;
$this->recipients = $recipients;
$this->content = $content;
$this->templateCode = $templateCode;
$this->options = array_merge([
'priority' => self::PRIORITY_NORMAL,
'scheduled_at' => null,
'max_retries' => 3,
'ttl' => 86400,
], $options);
$this->priority = $this->options['priority'];
$this->scheduledAt = $this->options['scheduled_at'];
$this->maxRetries = $this->options['max_retries'];
$this->currentRetry = 0;
$this->createdAt = time();
$this->traceId = $this->generateTraceId();
}
private function generateMessageId(): string
{
return sprintf('msg_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
}
private function generateTraceId(): string
{
return bin2hex(random_bytes(16));
}
public function incrementRetry(): self
{
$this->currentRetry++;
return $this;
}
public function canRetry(): bool
{
return $this->currentRetry < $this->maxRetries;
}
public function isScheduled(): bool
{
return $this->scheduledAt !== null && $this->scheduledAt > time();
}
public function isExpired(): bool
{
return time() > ($this->createdAt + $this->options['ttl']);
}
public function toArray(): array
{
return [
'message_id' => $this->messageId,
'channel' => $this->channel,
'template_code' => $this->templateCode,
'recipients' => $this->recipients,
'content' => $this->content,
'options' => $this->options,
'priority' => $this->priority,
'scheduled_at' => $this->scheduledAt,
'max_retries' => $this->maxRetries,
'current_retry' => $this->currentRetry,
'created_at' => $this->createdAt,
'trace_id' => $this->traceId,
];
}
public static function fromArray(array $data): self
{
$message = new self(
$data['channel'],
$data['recipients'],
$data['content'] ?? [],
$data['template_code'] ?? '',
$data['options'] ?? []
);
$message->messageId = $data['message_id'];
$message->priority = $data['priority'];
$message->scheduledAt = $data['scheduled_at'] ?? null;
$message->maxRetries = $data['max_retries'];
$message->currentRetry = $data['current_retry'] ?? 0;
$message->createdAt = $data['created_at'];
$message->traceId = $data['trace_id'] ?? '';
return $message;
}
}推送网关实现
php
<?php
namespace App\Messaging\Push;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use App\Services\Push\PushService;
class PushGateway
{
private AMQPStreamConnection $connection;
private $channel;
private PushService $pushService;
private string $exchangeName = 'push.exchange';
private string $scheduledExchange = 'push.scheduled.exchange';
private string $retryExchange = 'push.retry.exchange';
private string $deadLetterExchange = 'push.dlx';
private const CHANNEL_QUEUES = [
PushMessage::CHANNEL_INBOX => 'push.inbox',
PushMessage::CHANNEL_EMAIL => 'push.email',
PushMessage::CHANNEL_SMS => 'push.sms',
PushMessage::CHANNEL_APP => 'push.app',
PushMessage::CHANNEL_WECHAT => 'push.wechat',
];
public function __construct(
AMQPStreamConnection $connection,
PushService $pushService
) {
$this->connection = $connection;
$this->channel = $connection->channel();
$this->pushService = $pushService;
$this->setupInfrastructure();
}
private function setupInfrastructure(): void
{
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::DIRECT,
false,
true,
false
);
$this->channel->exchange_declare(
$this->deadLetterExchange,
AMQPExchangeType::DIRECT,
false,
true,
false
);
foreach (self::CHANNEL_QUEUES as $channel => $queueName) {
$args = [
'x-dead-letter-exchange' => ['S', $this->deadLetterExchange],
'x-dead-letter-routing-key' => ['S', 'push.failed'],
'x-max-priority' => ['I', 10],
'x-message-ttl' => ['I', 86400000],
];
$this->channel->queue_declare(
$queueName,
false,
true,
false,
false,
false,
$args
);
$this->channel->queue_bind($queueName, $this->exchangeName, $queueName);
}
$this->channel->queue_declare('push.dlq', false, true, false, false);
$this->channel->queue_bind('push.dlq', $this->deadLetterExchange, 'push.failed');
}
public function send(PushMessage $message): array
{
$result = $this->pushService->validateAndPrepare($message);
if (!$result['success']) {
return $result;
}
if ($message->isScheduled()) {
$this->scheduleMessage($message);
} else {
$this->publishMessage($message);
}
return ['success' => true, 'message_id' => $message->messageId];
}
private function publishMessage(PushMessage $message): void
{
$queueName = self::CHANNEL_QUEUES[$message->channel] ?? null;
if (!$queueName) {
throw new \InvalidArgumentException("Unknown channel: {$message->channel}");
}
$amqpMessage = new AMQPMessage(
json_encode($message->toArray()),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'priority' => $message->priority,
'message_id' => $message->messageId,
'timestamp' => time(),
]
);
$this->channel->basic_publish(
$amqpMessage,
$this->exchangeName,
$queueName
);
}
private function scheduleMessage(PushMessage $message): void
{
$delay = ($message->scheduledAt - time()) * 1000;
$amqpMessage = new AMQPMessage(
json_encode($message->toArray()),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'expiration' => strval($delay),
]
);
$this->channel->basic_publish(
$amqpMessage,
'',
'push.scheduled'
);
}
public function close(): void
{
if ($this->channel) {
$this->channel->close();
}
}
}渠道消费者实现
php
<?php
namespace App\Consumers\Push;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use App\Messaging\Push\PushMessage;
use App\Services\Push\PushService;
abstract class PushConsumer
{
protected AMQPStreamConnection $connection;
protected $channel;
protected PushService $pushService;
protected string $queueName;
protected int $prefetchCount = 10;
protected bool $running = true;
public function __construct(
AMQPStreamConnection $connection,
PushService $pushService
) {
$this->connection = $connection;
$this->channel = $connection->channel();
$this->pushService = $pushService;
$this->setup();
}
protected function setup(): void
{
$this->channel->basic_qos(null, $this->prefetchCount, null);
}
public function consume(): void
{
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
[$this, 'processMessage']
);
while ($this->running && count($this->channel->callbacks)) {
$this->channel->wait(null, true);
if (!$this->running) break;
usleep(100000);
}
}
public function processMessage(AMQPMessage $message): void
{
$data = json_decode($message->body, true);
$pushMessage = PushMessage::fromArray($data);
try {
if ($pushMessage->isExpired()) {
$this->pushService->updateStatus($pushMessage->messageId, 'expired');
$message->ack();
return;
}
$result = $this->doPush($pushMessage);
if ($result['success']) {
$this->pushService->updateStatus($pushMessage->messageId, 'sent', $result);
$message->ack();
} else {
$this->handleFailure($message, $pushMessage, $result['error']);
}
} catch (\Exception $e) {
$this->handleException($message, $pushMessage, $e);
}
}
abstract protected function doPush(PushMessage $message): array;
protected function handleFailure(AMQPMessage $message, PushMessage $pushMessage, \Exception $error): void
{
if ($pushMessage->canRetry() && $this->shouldRetry($error)) {
$this->pushService->updateStatus(
$pushMessage->messageId,
'retrying',
['retry_count' => $pushMessage->currentRetry + 1]
);
$message->nack(false, true);
} else {
$this->pushService->updateStatus(
$pushMessage->messageId,
'failed',
['error' => $error->getMessage()]
);
$message->nack(false, false);
}
}
protected function handleException(AMQPMessage $message, PushMessage $pushMessage, \Exception $e): void
{
error_log(sprintf('[PushConsumer] Error: %s - %s', $pushMessage->messageId, $e->getMessage()));
$message->nack(false, true);
}
protected function shouldRetry(\Exception $error): bool
{
return true;
}
public function stop(): void
{
$this->running = false;
}
public function close(): void
{
$this->stop();
if ($this->channel) {
$this->channel->close();
}
}
}
class EmailConsumer extends PushConsumer
{
protected string $queueName = 'push.email';
private EmailSender $emailSender;
public function __construct(
AMQPStreamConnection $connection,
PushService $pushService,
EmailSender $emailSender
) {
parent::__construct($connection, $pushService);
$this->emailSender = $emailSender;
}
protected function doPush(PushMessage $message): array
{
$content = $message->content;
foreach ($message->recipients as $recipient) {
$result = $this->emailSender->send([
'to' => $recipient['email'],
'to_name' => $recipient['name'] ?? '',
'subject' => $content['subject'],
'body' => $content['body'],
]);
if (!$result['success']) {
return $result;
}
}
return ['success' => true];
}
}
class SmsConsumer extends PushConsumer
{
protected string $queueName = 'push.sms';
private SmsSender $smsSender;
public function __construct(
AMQPStreamConnection $connection,
PushService $pushService,
SmsSender $smsSender
) {
parent::__construct($connection, $pushService);
$this->smsSender = $smsSender;
}
protected function doPush(PushMessage $message): array
{
$content = $message->content;
foreach ($message->recipients as $recipient) {
$result = $this->smsSender->send([
'phone' => $recipient['phone'],
'content' => $content['body'],
'template_id' => $content['template_id'] ?? null,
]);
if (!$result['success']) {
return $result;
}
}
return ['success' => true];
}
protected function shouldRetry(\Exception $error): bool
{
$nonRetryableCodes = ['INVALID_PHONE', 'BLACKLISTED'];
foreach ($nonRetryableCodes as $code) {
if (strpos($error->getMessage(), $code) !== false) {
return false;
}
}
return true;
}
}频率控制器实现
php
<?php
namespace App\Services\Push;
class FrequencyLimiter
{
private $redis;
private array $limits;
public function __construct($redis, array $limits = [])
{
$this->redis = $redis;
$this->limits = array_merge([
'inbox' => ['per_minute' => 10, 'per_hour' => 100, 'per_day' => 500],
'email' => ['per_hour' => 20, 'per_day' => 100],
'sms' => ['per_minute' => 1, 'per_hour' => 10, 'per_day' => 30],
'app' => ['per_minute' => 5, 'per_hour' => 50, 'per_day' => 200],
], $limits);
}
public function check(array $recipient, string $channel): bool
{
$identifier = $this->getIdentifier($recipient, $channel);
$limits = $this->limits[$channel] ?? [];
foreach ($limits as $period => $maxCount) {
$key = $this->getKey($identifier, $channel, $period);
$ttl = $this->getTtl($period);
$current = (int) $this->redis->incr($key);
if ($current === 1) {
$this->redis->expire($key, $ttl);
}
if ($current > $maxCount) {
return false;
}
}
return true;
}
private function getIdentifier(array $recipient, string $channel): string
{
switch ($channel) {
case 'inbox':
return 'user:' . $recipient['user_id'];
case 'email':
return 'email:' . $recipient['email'];
case 'sms':
return 'phone:' . $recipient['phone'];
case 'app':
return 'device:' . $recipient['device_token'];
default:
return 'unknown:' . md5(json_encode($recipient));
}
}
private function getKey(string $identifier, string $channel, string $period): string
{
return sprintf('push:limit:%s:%s:%s', $channel, $identifier, $period);
}
private function getTtl(string $period): int
{
$ttls = [
'per_minute' => 60,
'per_hour' => 3600,
'per_day' => 86400,
];
return $ttls[$period] ?? 3600;
}
}完整使用示例
php
<?php
require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Messaging\Push\{PushMessage, PushGateway};
use App\Services\Push\{PushService, FrequencyLimiter};
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$redis = new Redis();
$redis->connect('localhost', 6379);
$pushService = new PushService();
$frequencyLimiter = new FrequencyLimiter($redis);
$gateway = new PushGateway($connection, $pushService);
$emailMessage = new PushMessage(
PushMessage::CHANNEL_EMAIL,
[
['email' => 'user@example.com', 'name' => '张三'],
],
[
'subject' => '订单确认通知',
'body' => '<h1>您的订单已确认</h1><p>订单号: ORD123456</p>',
],
'',
['priority' => PushMessage::PRIORITY_HIGH]
);
$result = $gateway->send($emailMessage);
echo "邮件发送结果: " . json_encode($result) . "\n";
$smsMessage = new PushMessage(
PushMessage::CHANNEL_SMS,
[
['phone' => '13800138000'],
],
[
'body' => '您的验证码是: 123456,5分钟内有效。',
'template_id' => 'SMS_VERIFY_CODE',
],
'',
['priority' => PushMessage::PRIORITY_URGENT]
);
$result = $gateway->send($smsMessage);
echo "短信发送结果: " . json_encode($result) . "\n";
$scheduledTime = time() + 3600;
$scheduledMessage = new PushMessage(
PushMessage::CHANNEL_EMAIL,
[
['email' => 'user@example.com', 'name' => '张三'],
],
[
'subject' => '活动提醒',
'body' => '<p>您关注的活动即将开始</p>',
],
'',
['scheduled_at' => $scheduledTime]
);
$result = $gateway->send($scheduledMessage);
echo "定时邮件发送结果: " . json_encode($result) . "\n";
$gateway->close();
$connection->close();关键技术点解析
1. 多渠道路由
通过交换机和队列的绑定实现消息路由:
php
private const CHANNEL_QUEUES = [
'inbox' => 'push.inbox',
'email' => 'push.email',
'sms' => 'push.sms',
'app' => 'push.app',
'wechat' => 'push.wechat',
];2. 优先级队列
php
$args = [
'x-max-priority' => ['I', 10],
];- 验证码等紧急消息使用高优先级
- 营销消息使用低优先级
- 确保重要消息优先处理
3. 定时发送
通过消息 TTL 实现延迟发送:
php
$delay = ($message->scheduledAt - time()) * 1000;
$amqpMessage->set('expiration', strval($delay));4. 频率控制
使用 Redis 实现多维度频率限制:
php
$limits = [
'per_minute' => 10,
'per_hour' => 100,
'per_day' => 500,
];性能优化建议
| 优化项 | 建议 | 说明 |
|---|---|---|
| 批量发送 | 合并相同渠道消息 | 减少网络请求 |
| 连接复用 | 保持长连接 | 避免频繁建连 |
| 异步处理 | 使用独立消费者进程 | 不阻塞主业务 |
| 缓存模板 | Redis 缓存模板 | 减少数据库查询 |
常见问题与解决方案
1. 消息堆积
解决方案: 增加消费者数量,优化处理逻辑
2. 第三方限流
解决方案: 实现令牌桶限流,控制发送速率
3. 用户投诉
解决方案: 提供退订功能,记录用户偏好
4. 消息丢失
解决方案: 开启消息持久化,使用 publisher confirms
