Appearance
发布/订阅模式
概述
发布/订阅模式(Publish/Subscribe Pattern)是一种一对多的消息传递模式。生产者(发布者)将消息发送到交换器,交换器将消息广播到所有绑定的队列,每个消费者(订阅者)都能收到消息的副本。这种模式适用于广播通知、实时更新等场景。
核心知识点
架构图
mermaid
graph TB
subgraph 发布者
P[Publisher]
end
subgraph RabbitMQ
E[Fanout Exchange]
Q1[Queue 1]
Q2[Queue 2]
Q3[Queue 3]
end
subgraph 订阅者
C1[Subscriber 1]
C2[Subscriber 2]
C3[Subscriber 3]
end
P -->|发布消息| E
E -->|广播| Q1
E -->|广播| Q2
E -->|广播| Q3
Q1 --> C1
Q2 --> C2
Q3 --> C3
style E fill:#ffcdd2
style P fill:#e1f5fe
style C1 fill:#e8f5e9
style C2 fill:#e8f5e9
style C3 fill:#e8f5e9
工作流程
mermaid
sequenceDiagram
participant P as 发布者
participant E as Fanout Exchange
participant Q1 as 队列1
participant Q2 as 队列2
participant C1 as 订阅者1
participant C2 as 订阅者2
Note over E: Fanout 交换器<br/>广播到所有绑定队列
P->>E: 发布消息
E->>Q1: 复制消息
E->>Q2: 复制消息
Q1->>C1: 投递消息
Q2->>C2: 投递消息
交换器类型
| 类型 | 说明 | 使用场景 |
|---|---|---|
| Fanout | 广播到所有绑定队列 | 发布/订阅、广播通知 |
| Direct | 精确匹配路由键 | 点对点消息 |
| Topic | 模式匹配路由键 | 多条件过滤 |
| Headers | 基于消息头匹配 | 复杂条件路由 |
PHP 代码示例
发布者实现
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Publishes messages to a durable fanout exchange so that every bound
 * queue receives its own copy.
 *
 * Each payload is wrapped in a JSON envelope with three keys:
 * `content` (the caller's payload), `timestamp` (Unix time of publishing)
 * and `source` (the publishing host's name).
 */
class PubSubPublisher
{
    private $connection;
    private $channel;
    private $exchangeName = 'pubsub_exchange';

    /**
     * Opens a connection to the local broker and declares the exchange.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        // Arguments after the type: passive=false, durable=true, auto_delete=false.
        $this->channel->exchange_declare($this->exchangeName, 'fanout', false, true, false);
    }

    /**
     * Broadcasts one message to every queue bound to the exchange.
     *
     * @param mixed $message Any JSON-encodable payload (string, array, ...).
     *
     * @throws \InvalidArgumentException When the payload cannot be JSON-encoded.
     */
    public function publish($message)
    {
        $body = json_encode([
            'content' => $message,
            'timestamp' => time(),
            'source' => gethostname(),
        ]);
        if ($body === false) {
            // Previously a failed encode was published as an empty body.
            throw new \InvalidArgumentException('消息无法编码为 JSON: ' . json_last_error_msg());
        }

        $msg = new AMQPMessage($body, [
            'content_type' => 'application/json',
            // Persistent, so copies routed to durable queues survive a broker restart.
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ]);
        $this->channel->basic_publish($msg, $this->exchangeName);

        // Arrays cannot be interpolated into a string (it prints "Array" plus
        // a warning), so they are rendered as JSON for the log line.
        $preview = is_array($message) ? json_encode($message, JSON_UNESCAPED_UNICODE) : $message;
        echo " [x] 消息已广播: {$preview}\n";
    }

    /**
     * Closes the channel and then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}
订阅者实现
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
/**
 * Subscribes to the fanout exchange through a private, broker-named queue.
 *
 * The queue is exclusive and auto-deleted, so it only exists while this
 * subscriber is connected.
 */
class PubSubSubscriber
{
    private $connection;
    private $channel;
    private $exchangeName = 'pubsub_exchange';
    private $queueName;

    /**
     * Connects, declares the exchange, and creates and binds a temporary queue.
     *
     * @param mixed $subscriberId Label used in the start-up log line only;
     *                            defaults to the current process id.
     */
    public function __construct($subscriberId = null)
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        // passive=false, durable=true, auto_delete=false
        $this->channel->exchange_declare($this->exchangeName, 'fanout', false, true, false);

        // An empty name lets the broker generate one; the queue is
        // non-durable, exclusive and auto-deleted.
        $declared = $this->channel->queue_declare('', false, false, true, true);
        $this->queueName = $declared[0];

        $this->channel->queue_bind($this->queueName, $this->exchangeName);

        $id = $subscriberId ?? getmypid();
        echo " [x] 订阅者 {$id} 已启动,队列: {$this->queueName}\n";
    }

    /**
     * Consumes messages until the channel closes.
     *
     * The callback receives the decoded JSON body, i.e. the whole envelope
     * written by the publisher. The message is acknowledged after the
     * callback returns. A body that is not valid JSON is rejected without
     * requeue and the callback is not invoked for it.
     *
     * @param callable $callback Invoked with the decoded message body.
     */
    public function subscribe(callable $callback)
    {
        $wrapper = function ($message) use ($callback) {
            $body = json_decode($message->body, true);
            if (json_last_error() !== JSON_ERROR_NONE) {
                // Requeueing would redeliver the same unparsable bytes forever.
                echo " [!] 丢弃无法解析的消息\n";
                $message->reject(false);
                return;
            }

            call_user_func($callback, $body);
            $message->ack();
        };

        // Deliver at most one unacknowledged message at a time.
        $this->channel->basic_qos(null, 1, null);
        // no_local=false, no_ack=false (manual ack), exclusive=false, nowait=false
        $this->channel->basic_consume($this->queueName, '', false, false, false, false, $wrapper);

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Closes the channel and then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}
完整示例:新闻发布系统
php
<?php
/**
 * Publishes news items through a PubSubPublisher.
 */
class NewsPublisher
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new PubSubPublisher();
    }

    /**
     * Broadcasts one news item and returns its generated id.
     *
     * @param string $title
     * @param string $content
     * @param string $category Defaults to 'general'.
     *
     * @return string Id prefixed with "news_".
     */
    public function publishNews($title, $content, $category = 'general')
    {
        $newsId = uniqid('news_');

        $this->publisher->publish([
            'id' => $newsId,
            'title' => $title,
            'content' => $content,
            'category' => $category,
            'published_at' => time(),
        ]);

        return $newsId;
    }

    /**
     * Shortcut for publishNews() with the 'breaking' category.
     */
    public function publishBreakingNews($title, $content)
    {
        $category = 'breaking';

        return $this->publishNews($title, $content, $category);
    }

    /**
     * Shortcut for publishNews() with the 'sports' category.
     */
    public function publishSportsNews($title, $content)
    {
        $category = 'sports';

        return $this->publishNews($title, $content, $category);
    }
}
/**
 * Prints every news item received through a PubSubSubscriber.
 */
class NewsSubscriber
{
    private $subscriber;
    private $name;

    /**
     * @param string $name Display name; also passed to the subscriber as its id.
     */
    public function __construct($name)
    {
        $this->name = $name;
        $this->subscriber = new PubSubSubscriber($name);
    }

    /**
     * Starts consuming; does not return until the subscriber stops.
     */
    public function start()
    {
        echo "新闻订阅者 {$this->name} 开始监听...\n";
        $this->subscriber->subscribe(function ($news) {
            $this->displayNews($news);
        });
    }

    /**
     * Prints one news item.
     *
     * Accepts either the bare item built by NewsPublisher or the envelope
     * that PubSubPublisher wraps around it ('content' / 'timestamp' /
     * 'source'). The previous version read the item's fields from the top
     * level of the envelope, where they do not exist.
     *
     * @param array $news Decoded message.
     */
    private function displayNews($news)
    {
        // In an envelope 'content' is the item array; in a bare item it is
        // the article text, so the array check tells the two shapes apart.
        $wrapped = isset($news['content'], $news['source']) && is_array($news['content']);
        $item = $wrapped ? $news['content'] : $news;

        if (isset($news['timestamp'])) {
            $when = $news['timestamp'];
        } elseif (isset($item['published_at'])) {
            $when = $item['published_at'];
        } else {
            $when = time();
        }

        $title = $item['title'] ?? '';
        $category = $item['category'] ?? '';
        $content = $item['content'] ?? '';

        echo sprintf("\n[%s] %s 收到新闻:\n", date('Y-m-d H:i:s', $when), $this->name);
        echo "标题: {$title}\n";
        echo "分类: {$category}\n";
        echo "内容: {$content}\n";
    }
}
广播通知系统
php
<?php
/**
 * Broadcasts typed notifications through a PubSubPublisher.
 */
class NotificationBroadcaster
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new PubSubPublisher();
    }

    /**
     * Publishes a notification and returns its generated id.
     *
     * @param string $type    Notification type, e.g. 'system_alert'.
     * @param string $title
     * @param string $message
     * @param array  $data    Optional extra fields.
     *
     * @return string Id prefixed with "notif_".
     */
    public function broadcast($type, $title, $message, $data = [])
    {
        $notificationId = uniqid('notif_');

        $this->publisher->publish([
            'id' => $notificationId,
            'type' => $type,
            'title' => $title,
            'message' => $message,
            'data' => $data,
            'timestamp' => time(),
        ]);

        return $notificationId;
    }

    /**
     * Broadcasts a 'system_alert' notification with no extra data.
     */
    public function systemAlert($title, $message)
    {
        return $this->broadcast('system_alert', $title, $message);
    }

    /**
     * Broadcasts a 'maintenance' notification carrying the scheduled time.
     */
    public function maintenanceNotice($title, $message, $scheduledTime)
    {
        $extra = ['scheduled_time' => $scheduledTime];

        return $this->broadcast('maintenance', $title, $message, $extra);
    }

    /**
     * Broadcasts a 'feature' notification carrying the feature list.
     */
    public function featureAnnouncement($title, $message, $features)
    {
        $extra = ['features' => $features];

        return $this->broadcast('feature', $title, $message, $extra);
    }
}
/**
 * Receives broadcast notifications and prints those matching the
 * configured type filters (all of them when no filter is set).
 */
class NotificationReceiver
{
    private $subscriber;
    private $filters = [];

    public function __construct()
    {
        $this->subscriber = new PubSubSubscriber();
    }

    /**
     * Adds a notification type to the allow-list.
     *
     * @param string $type
     */
    public function addFilter($type)
    {
        $this->filters[] = $type;
    }

    /**
     * Starts consuming; does not return until the subscriber stops.
     */
    public function start()
    {
        $this->subscriber->subscribe(function ($notification) {
            $this->dispatch($notification);
        });
    }

    /**
     * Unwraps, filters and prints one received message.
     *
     * Accepts either a bare notification or the envelope that
     * PubSubPublisher wraps around it. The previous version looked for
     * 'type' at the top level of the envelope, so with any filter set no
     * notification ever passed.
     *
     * @param array $notification Decoded message.
     */
    private function dispatch($notification)
    {
        $wrapped = isset($notification['content'], $notification['source'])
            && is_array($notification['content']);
        $payload = $wrapped ? $notification['content'] : $notification;

        $type = $payload['type'] ?? null;
        // Strict comparison so that e.g. 0 does not match every string filter.
        if (!empty($this->filters) && !in_array($type, $this->filters, true)) {
            return;
        }

        $this->handleNotification($payload);
    }

    /**
     * Prints "[type] title: message" for one notification.
     *
     * @param array $notification Unwrapped notification.
     */
    private function handleNotification($notification)
    {
        echo sprintf(
            "[%s] %s: %s\n",
            $notification['type'] ?? '',
            $notification['title'] ?? '',
            $notification['message'] ?? ''
        );
    }
}
实时数据推送
php
<?php
/**
 * Pushes typed real-time messages (metrics, alerts, entity updates)
 * through a PubSubPublisher.
 */
class RealtimeDataPublisher
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new PubSubPublisher();
    }

    /**
     * Publishes a 'metrics' message carrying $metrics under 'data'.
     */
    public function pushMetrics($metrics)
    {
        $this->emit('metrics', ['data' => $metrics]);
    }

    /**
     * Publishes an 'alert' message carrying $alert under 'data'.
     */
    public function pushAlert($alert)
    {
        $this->emit('alert', ['data' => $alert]);
    }

    /**
     * Publishes an 'update' message describing an action on an entity.
     */
    public function pushUpdate($entity, $action, $data)
    {
        $this->emit('update', [
            'entity' => $entity,
            'action' => $action,
            'data' => $data,
        ]);
    }

    /**
     * Publishes $fields with 'type' placed first and 'timestamp' last.
     *
     * @param string $type
     * @param array  $fields Message-specific fields.
     */
    private function emit($type, array $fields)
    {
        $head = ['type' => $type];
        $tail = ['timestamp' => time()];

        $this->publisher->publish($head + $fields + $tail);
    }
}
/**
 * Receives real-time messages and routes them by their 'type' field.
 */
class RealtimeDataClient
{
    private $subscriber;

    public function __construct()
    {
        $this->subscriber = new PubSubSubscriber();
    }

    /**
     * Starts consuming; does not return until the subscriber stops.
     */
    public function start()
    {
        $this->subscriber->subscribe(function ($message) {
            $this->route($message);
        });
    }

    /**
     * Unwraps one received message and calls the handler for its type.
     *
     * Accepts either a bare message or the envelope that PubSubPublisher
     * wraps around it. The previous version switched on a top-level 'type'
     * that the envelope does not have, so no handler ever ran. Messages of
     * an unknown type are ignored.
     *
     * @param array $message Decoded message.
     */
    private function route($message)
    {
        $wrapped = isset($message['content'], $message['source'])
            && is_array($message['content']);
        $payload = $wrapped ? $message['content'] : $message;

        switch ($payload['type'] ?? null) {
            case 'metrics':
                $this->handleMetrics($payload['data'] ?? null);
                break;
            case 'alert':
                $this->handleAlert($payload['data'] ?? null);
                break;
            case 'update':
                $this->handleUpdate($payload);
                break;
        }
    }

    /**
     * Prints the metrics as JSON.
     */
    private function handleMetrics($metrics)
    {
        echo "收到指标数据: " . json_encode($metrics) . "\n";
    }

    /**
     * Prints the alert as JSON.
     */
    private function handleAlert($alert)
    {
        echo "收到告警: " . json_encode($alert) . "\n";
    }

    /**
     * Prints the entity and action of an update message.
     *
     * @param array $message Unwrapped 'update' message.
     */
    private function handleUpdate($message)
    {
        echo sprintf(
            "收到更新: %s %s\n",
            $message['entity'] ?? '',
            $message['action'] ?? ''
        );
    }
}
实际应用场景
1. 系统公告广播
php
<?php
/**
 * Broadcasts system announcements through a PubSubPublisher.
 */
class SystemAnnouncement
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new PubSubPublisher();
    }

    /**
     * Publishes an announcement and returns the record that was sent.
     *
     * @param string $title
     * @param string $content
     * @param string $priority Defaults to 'normal'.
     *
     * @return array The published announcement, including its generated id.
     */
    public function announce($title, $content, $priority = 'normal')
    {
        $record = ['id' => uniqid('ann_')];
        $record['title'] = $title;
        $record['content'] = $content;
        $record['priority'] = $priority;
        $record['created_at'] = time();

        $this->publisher->publish($record);

        return $record;
    }

    /**
     * Publishes an announcement with priority 'emergency'.
     */
    public function emergency($title, $content)
    {
        $priority = 'emergency';

        return $this->announce($title, $content, $priority);
    }

    /**
     * Publishes an announcement with priority 'scheduled'.
     */
    public function scheduled($title, $content)
    {
        $priority = 'scheduled';

        return $this->announce($title, $content, $priority);
    }
}
2. 缓存失效通知
php
<?php
/**
 * Broadcasts cache-invalidation commands through a PubSubPublisher.
 */
class CacheInvalidationPublisher
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new PubSubPublisher();
    }

    /**
     * Asks receivers to delete specific keys.
     *
     * @param string|array $keys A single key or a list of keys.
     */
    public function invalidate($keys)
    {
        $this->send('invalidate', ['keys' => (array)$keys]);
    }

    /**
     * Asks receivers to delete every key matching a pattern.
     *
     * @param string $pattern
     */
    public function invalidatePattern($pattern)
    {
        $this->send('invalidate_pattern', ['pattern' => $pattern]);
    }

    /**
     * Asks receivers to clear their whole cache.
     */
    public function clearAll()
    {
        $this->send('clear_all');
    }

    /**
     * Publishes a command with 'action' first and 'timestamp' last.
     *
     * @param string $action
     * @param array  $extra Action-specific fields.
     */
    private function send($action, array $extra = [])
    {
        $command = ['action' => $action];
        foreach ($extra as $field => $value) {
            $command[$field] = $value;
        }
        $command['timestamp'] = time();

        $this->publisher->publish($command);
    }
}
/**
 * Applies cache-invalidation commands to a local Redis instance.
 */
class CacheInvalidationHandler
{
    private $cache;

    /**
     * Connects to Redis on 127.0.0.1:6379.
     */
    public function __construct()
    {
        $this->cache = new Redis();
        $this->cache->connect('127.0.0.1', 6379);
    }

    /**
     * Executes one invalidation command.
     *
     * Accepts either the bare command or the envelope that PubSubPublisher
     * wraps around it. Supported actions:
     *  - 'invalidate':         delete the listed 'keys';
     *  - 'invalidate_pattern': delete every key matching 'pattern';
     *  - 'clear_all':          flush the current database.
     * Unknown or missing actions are ignored.
     *
     * @param array $message Decoded message.
     */
    public function handle($message)
    {
        $wrapped = isset($message['content'], $message['source'])
            && is_array($message['content']);
        $command = $wrapped ? $message['content'] : $message;

        switch ($command['action'] ?? null) {
            case 'invalidate':
                $keys = (array) ($command['keys'] ?? []);
                if (!empty($keys)) {
                    // One DEL with all keys instead of one round trip per key.
                    $this->cache->del($keys);
                }
                break;
            case 'invalidate_pattern':
                if (isset($command['pattern'])) {
                    $this->deleteByPattern($command['pattern']);
                }
                break;
            case 'clear_all':
                $this->cache->flushDB();
                break;
        }
    }

    /**
     * Deletes every key matching $pattern using incremental SCAN.
     *
     * KEYS walks the whole keyspace in one blocking command; SCAN does the
     * same work in bounded batches.
     *
     * @param string $pattern Glob-style key pattern.
     */
    private function deleteByPattern($pattern)
    {
        $cursor = null;
        do {
            // A batch may be empty while the cursor is still non-zero.
            $batch = $this->cache->scan($cursor, $pattern, 1000);
            if (!empty($batch)) {
                $this->cache->del($batch);
            }
        } while ($cursor > 0);
    }
}
3. 配置更新广播
php
<?php
/**
 * Broadcasts configuration changes through a PubSubPublisher.
 */
class ConfigUpdateBroadcaster
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new PubSubPublisher();
    }

    /**
     * Announces a new value for a single configuration key.
     *
     * @param string $key
     * @param mixed  $value
     */
    public function updateConfig($key, $value)
    {
        $event = [
            'type' => 'config_update',
            'key' => $key,
            'value' => $value,
        ];
        $event['timestamp'] = time();

        $this->publisher->publish($event);
    }

    /**
     * Asks receivers to reload configuration.
     *
     * @param string|null $section Section to reload; null is sent as-is.
     */
    public function reloadConfig($section = null)
    {
        $event = [
            'type' => 'config_reload',
            'section' => $section,
        ];
        $event['timestamp'] = time();

        $this->publisher->publish($event);
    }
}
/**
 * Applies broadcast configuration changes to an in-memory config array.
 */
class ConfigUpdateHandler
{
    private $config;

    public function __construct()
    {
        $this->config = [];
    }

    /**
     * Processes one configuration message.
     *
     * Accepts either the bare message or the envelope that PubSubPublisher
     * wraps around it. 'config_update' stores 'value' under 'key';
     * 'config_reload' calls reloadConfig() with the optional 'section'.
     * Other types are ignored.
     *
     * @param array $message Decoded message.
     */
    public function handle($message)
    {
        $wrapped = isset($message['content'], $message['source'])
            && is_array($message['content']);
        $event = $wrapped ? $message['content'] : $message;

        switch ($event['type'] ?? null) {
            case 'config_update':
                if (!isset($event['key'])) {
                    // Nothing to store without a key.
                    break;
                }
                $this->config[$event['key']] = $event['value'] ?? null;
                echo "配置已更新: {$event['key']}\n";
                break;
            case 'config_reload':
                $this->reloadConfig($event['section'] ?? null);
                echo "配置已重新加载\n";
                break;
        }
    }

    /**
     * Placeholder for reloading configuration; currently does nothing.
     *
     * @param string|null $section
     */
    private function reloadConfig($section)
    {
        // Reload the configuration here.
    }
}
常见问题与解决方案
问题 1:消息重复消费
原因:同一订阅者可能多次收到同一条消息,例如消息处理后尚未确认(ack)连接就断开导致重新投递,或发布者重试发送。(不同订阅者各自收到一份副本属于发布/订阅模式的正常行为,并非重复消费。)
解决方案:
php
<?php
/**
 * Drops messages whose id has already been seen by this process.
 *
 * Remembers at most $maxSize ids; when the limit is exceeded the oldest
 * id is forgotten.
 */
class DeduplicationSubscriber
{
    // Seen ids stored as array keys (id => true) in insertion order, so
    // membership is a hash lookup instead of a linear in_array() scan.
    private $processedIds = [];
    private $maxSize = 10000;

    /**
     * Processes a message unless its id was seen before.
     *
     * The id is read from the bare message or, when the message is the
     * envelope produced by PubSubPublisher, from its 'content'. Messages
     * with no usable id are ignored. An id of 0 or '0' is valid; the
     * previous truthiness check dropped it.
     *
     * @param array $message Decoded message.
     */
    public function subscribe($message)
    {
        $wrapped = isset($message['content'], $message['source'])
            && is_array($message['content']);
        $payload = $wrapped ? $message['content'] : $message;

        $id = $payload['id'] ?? null;
        if (!is_scalar($id)) {
            return;
        }
        $key = (string) $id;
        if ($key === '') {
            return;
        }

        if (isset($this->processedIds[$key])) {
            echo "跳过重复消息: {$id}\n";
            return;
        }

        $this->processedIds[$key] = true;
        if (count($this->processedIds) > $this->maxSize) {
            // Forget the oldest id (the first key in insertion order).
            reset($this->processedIds);
            unset($this->processedIds[key($this->processedIds)]);
        }

        $this->processMessage($message);
    }

    /**
     * Placeholder for the real message handling; currently does nothing.
     *
     * @param array $message The message exactly as received.
     */
    private function processMessage($message)
    {
        // Handle the message here.
    }
}
问题 2:订阅者离线错过消息
原因:临时队列在订阅者断开后删除
解决方案:
php
<?php
/**
 * Subscriber backed by a named, durable queue, so messages published
 * while it is offline wait in the queue until it reconnects.
 */
class DurableSubscriber
{
    private $connection;
    private $channel;
    private $exchangeName = 'pubsub_exchange';
    private $queueName;
    private $subscriberId;

    /**
     * Connects, declares the exchange, and declares and binds this
     * subscriber's own durable queue.
     *
     * @param string|int $subscriberId Stable id; the queue is named
     *                                 "subscriber_{id}_queue".
     */
    public function __construct($subscriberId)
    {
        $this->subscriberId = $subscriberId;
        $this->queueName = "subscriber_{$subscriberId}_queue";

        // Fully qualified so this snippet also works as a standalone file
        // that has no `use` statement for the class.
        $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        // passive=false, durable=true, auto_delete=false
        $this->channel->exchange_declare($this->exchangeName, 'fanout', false, true, false);

        // passive=false, durable=true, exclusive=false, auto_delete=false:
        // the queue outlives this connection.
        $this->channel->queue_declare($this->queueName, false, true, false, false);

        $this->channel->queue_bind($this->queueName, $this->exchangeName);
    }

    /**
     * Consumes messages until the channel closes.
     *
     * The callback receives the decoded JSON body; the message is
     * acknowledged after the callback returns.
     *
     * @param callable $callback Invoked with the decoded message body.
     */
    public function subscribe(callable $callback)
    {
        // A durable queue can hold a large backlog after downtime; without
        // a prefetch limit the broker would push all of it at once.
        $this->channel->basic_qos(null, 1, null);

        // no_local=false, no_ack=false (manual ack), exclusive=false, nowait=false
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            function ($message) use ($callback) {
                call_user_func($callback, json_decode($message->body, true));
                $message->ack();
            }
        );

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Closes the channel and then the connection; the durable queue stays
     * on the broker.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}
问题 3:广播风暴
原因:大量消息同时广播
解决方案:
php
<?php
/**
 * Wraps a publisher and limits it to $rateLimit messages per $interval
 * seconds, sleeping when the limit is reached.
 */
class RateLimitedPublisher
{
    private $publisher;
    private $rateLimit = 100;
    private $interval = 1;
    private $messageCount = 0;
    private $lastReset;

    /**
     * @param int         $rateLimit Maximum messages per window (default 100).
     * @param int         $interval  Window length in whole seconds (default 1).
     * @param object|null $publisher Object with a publish($message) method;
     *                               a new PubSubPublisher when omitted.
     *
     * @throws \InvalidArgumentException When a limit is not a positive integer.
     */
    public function __construct($rateLimit = 100, $interval = 1, $publisher = null)
    {
        // Validate first so bad arguments fail before a connection is opened.
        if ((int) $rateLimit < 1 || (int) $interval < 1) {
            throw new \InvalidArgumentException('rateLimit 和 interval 必须为正整数');
        }

        $this->rateLimit = (int) $rateLimit;
        $this->interval = (int) $interval;
        $this->publisher = $publisher !== null ? $publisher : new PubSubPublisher();
        $this->lastReset = time();
    }

    /**
     * Publishes a message, first waiting if the current window is full.
     *
     * @param mixed $message Payload passed through to the wrapped publisher.
     */
    public function publish($message)
    {
        $this->checkRateLimit();
        $this->publisher->publish($message);
        $this->messageCount++;
    }

    /**
     * Starts a new window when the current one has elapsed; otherwise, if
     * the window is full, sleeps until it ends and then starts a new one.
     */
    private function checkRateLimit()
    {
        $now = time();
        if ($now - $this->lastReset >= $this->interval) {
            $this->messageCount = 0;
            $this->lastReset = $now;
        }

        if ($this->messageCount >= $this->rateLimit) {
            // Whole seconds left in the current window.
            $sleepTime = $this->interval - ($now - $this->lastReset);
            if ($sleepTime > 0) {
                sleep($sleepTime);
            }
            $this->messageCount = 0;
            $this->lastReset = time();
        }
    }
}
最佳实践建议
1. 消息格式标准化
php
<?php
/**
 * Factory and validator for the standard broadcast message format.
 */
class BroadcastMessage
{
    /**
     * Builds a message with a random 32-hex-character id.
     *
     * @param string $type
     * @param mixed  $payload Any value, including null.
     *
     * @return array Keys: id, type, payload, timestamp, source, version.
     */
    public static function create($type, $payload)
    {
        return [
            'id' => bin2hex(random_bytes(16)),
            'type' => $type,
            'payload' => $payload,
            'timestamp' => time(),
            'source' => gethostname(),
            'version' => '1.0',
        ];
    }

    /**
     * Checks that a message carries all required fields.
     *
     * 'id', 'type' and 'timestamp' must be present and non-null.
     * 'payload' must be present but may be null, so that messages built by
     * create($type, null) validate; the previous isset() check rejected them.
     *
     * @param mixed $message Decoded message.
     *
     * @return bool Always true when validation passes.
     *
     * @throws InvalidArgumentException When the message is not an array or
     *                                  a required field is missing.
     */
    public static function validate($message)
    {
        if (!is_array($message)) {
            throw new InvalidArgumentException('消息必须是数组');
        }

        foreach (['id', 'type', 'payload', 'timestamp'] as $field) {
            $present = $field === 'payload'
                ? array_key_exists($field, $message)
                : isset($message[$field]);
            if (!$present) {
                throw new InvalidArgumentException("缺少必需字段: {$field}");
            }
        }

        return true;
    }
}
2. 订阅者管理
php
<?php
/**
 * In-memory registry of subscribers, keyed by subscriber id.
 */
class SubscriberRegistry
{
    private $subscribers = [];

    /**
     * Adds a subscriber as 'active', or replaces an existing entry.
     *
     * @param string|int $subscriberId
     * @param array      $filters Stored with the entry as-is.
     */
    public function register($subscriberId, $filters = [])
    {
        $entry = [];
        $entry['id'] = $subscriberId;
        $entry['filters'] = $filters;
        $entry['registered_at'] = time();
        $entry['status'] = 'active';

        $this->subscribers[$subscriberId] = $entry;
    }

    /**
     * Removes a subscriber; unknown ids are ignored.
     *
     * @param string|int $subscriberId
     */
    public function unregister($subscriberId)
    {
        unset($this->subscribers[$subscriberId]);
    }

    /**
     * Returns the entries whose status is 'active', keyed by subscriber id.
     *
     * @return array
     */
    public function getActiveSubscribers()
    {
        $active = [];
        foreach ($this->subscribers as $subscriberId => $entry) {
            if ($entry['status'] !== 'active') {
                continue;
            }
            $active[$subscriberId] = $entry;
        }

        return $active;
    }

    /**
     * Returns the number of active subscribers.
     *
     * @return int
     */
    public function getSubscriberCount()
    {
        $active = $this->getActiveSubscribers();

        return count($active);
    }
}
3. 消息确认机制
php
<?php
/**
 * Publisher that uses RabbitMQ publisher confirms to report whether the
 * broker accepted each message.
 *
 * The original snippet used $this->channel and $this->exchangeName without
 * ever defining them; the constructor below sets them up.
 */
class ReliablePubSub
{
    private $connection;
    private $channel;
    private $exchangeName;

    // Set by the nack handler when the broker refuses a message.
    private $nacked = false;

    /**
     * Connects, declares the fanout exchange, and puts the channel into
     * confirm mode once.
     *
     * @param string $exchangeName Exchange to publish to.
     * @param string $host
     * @param int    $port
     * @param string $user
     * @param string $password
     */
    public function __construct(
        $exchangeName = 'pubsub_exchange',
        $host = 'localhost',
        $port = 5672,
        $user = 'guest',
        $password = 'guest'
    ) {
        $this->exchangeName = $exchangeName;
        $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection($host, $port, $user, $password);
        $this->channel = $this->connection->channel();

        // passive=false, durable=true, auto_delete=false
        $this->channel->exchange_declare($this->exchangeName, 'fanout', false, true, false);

        // Confirm mode is a per-channel setting, so it is enabled here
        // rather than on every publish.
        $this->channel->confirm_select();
        $this->channel->set_nack_handler(function () {
            $this->nacked = true;
        });
    }

    /**
     * Publishes a message and waits up to 5 seconds for the broker's confirm.
     *
     * @param mixed $message JSON-encodable payload.
     *
     * @return bool True when the broker acknowledged the message; false
     *              when it was nacked or waiting failed (e.g. timed out).
     */
    public function publishWithConfirm($message)
    {
        $this->nacked = false;

        $msg = new \PhpAmqpLib\Message\AMQPMessage(json_encode($message));
        $this->channel->basic_publish($msg, $this->exchangeName);

        try {
            $this->channel->wait_for_pending_acks(5);
        } catch (\Exception $e) {
            return false;
        }

        // Waiting also returns normally after a nack, so the flag set by
        // the nack handler decides the result.
        return !$this->nacked;
    }

    /**
     * Closes the channel and then the connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}