Skip to content

发布/订阅模式

概述

发布/订阅模式(Publish/Subscribe Pattern)是一种一对多的消息传递模式。生产者(发布者)将消息发送到交换器,交换器将消息广播到所有绑定的队列,每个消费者(订阅者)都能收到消息的副本。这种模式适用于广播通知、实时更新等场景。

核心知识点

架构图

mermaid
graph TB
    subgraph 发布者
        P[Publisher]
    end

    subgraph RabbitMQ
        E[Fanout Exchange]
        Q1[Queue 1]
        Q2[Queue 2]
        Q3[Queue 3]
    end

    subgraph 订阅者
        C1[Subscriber 1]
        C2[Subscriber 2]
        C3[Subscriber 3]
    end

    P -->|发布消息| E
    E -->|广播| Q1
    E -->|广播| Q2
    E -->|广播| Q3
    Q1 --> C1
    Q2 --> C2
    Q3 --> C3

    style E fill:#ffcdd2
    style P fill:#e1f5fe
    style C1 fill:#e8f5e9
    style C2 fill:#e8f5e9
    style C3 fill:#e8f5e9

工作流程

mermaid
sequenceDiagram
    participant P as 发布者
    participant E as Fanout Exchange
    participant Q1 as 队列1
    participant Q2 as 队列2
    participant C1 as 订阅者1
    participant C2 as 订阅者2

    Note over E: Fanout 交换器<br/>广播到所有绑定队列

    P->>E: 发布消息
    E->>Q1: 复制消息
    E->>Q2: 复制消息

    Q1->>C1: 投递消息
    Q2->>C2: 投递消息

交换器类型

类型说明使用场景
Fanout广播到所有绑定队列发布/订阅、广播通知
Direct精确匹配路由键点对点消息
Topic模式匹配路由键多条件过滤
Headers基于消息头匹配复杂条件路由

PHP 代码示例

发布者实现

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class PubSubPublisher
{
    private $connection;
    private $channel;
    private $exchangeName = 'pubsub_exchange';

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            'fanout',
            false,
            true,
            false
        );
    }

    public function publish($message)
    {
        $msg = new AMQPMessage(
            json_encode([
                'content' => $message,
                'timestamp' => time(),
                'source' => gethostname()
            ]),
            ['content_type' => 'application/json']
        );

        $this->channel->basic_publish($msg, $this->exchangeName);

        echo " [x] 消息已广播: {$message}\n";
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

订阅者实现

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

class PubSubSubscriber
{
    private $connection;
    private $channel;
    private $exchangeName = 'pubsub_exchange';
    private $queueName;

    public function __construct($subscriberId = null)
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            'fanout',
            false,
            true,
            false
        );

        list($this->queueName, ,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            true
        );

        $this->channel->queue_bind(
            $this->queueName,
            $this->exchangeName
        );

        $id = $subscriberId ?? getmypid();
        echo " [x] 订阅者 {$id} 已启动,队列: {$this->queueName}\n";
    }

    public function subscribe(callable $callback)
    {
        $wrapper = function ($message) use ($callback) {
            $body = json_decode($message->body, true);

            call_user_func($callback, $body);

            $message->ack();
        };

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $wrapper
        );

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

完整示例:新闻发布系统

php
<?php

class NewsPublisher
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new PubSubPublisher();
    }

    public function publishNews($title, $content, $category = 'general')
    {
        $news = [
            'id' => uniqid('news_'),
            'title' => $title,
            'content' => $content,
            'category' => $category,
            'published_at' => time()
        ];

        $this->publisher->publish($news);

        return $news['id'];
    }

    public function publishBreakingNews($title, $content)
    {
        return $this->publishNews($title, $content, 'breaking');
    }

    public function publishSportsNews($title, $content)
    {
        return $this->publishNews($title, $content, 'sports');
    }
}

class NewsSubscriber
{
    private $subscriber;
    private $name;

    public function __construct($name)
    {
        $this->name = $name;
        $this->subscriber = new PubSubSubscriber($name);
    }

    public function start()
    {
        echo "新闻订阅者 {$this->name} 开始监听...\n";

        $this->subscriber->subscribe(function ($news) {
            $this->displayNews($news);
        });
    }

    private function displayNews($news)
    {
        echo sprintf(
            "\n[%s] %s 收到新闻:\n",
            date('Y-m-d H:i:s', $news['timestamp']),
            $this->name
        );
        echo "标题: {$news['title']}\n";
        echo "分类: {$news['category']}\n";
        echo "内容: {$news['content']}\n";
    }
}

广播通知系统

php
<?php

class NotificationBroadcaster
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new PubSubPublisher();
    }

    public function broadcast($type, $title, $message, $data = [])
    {
        $notification = [
            'id' => uniqid('notif_'),
            'type' => $type,
            'title' => $title,
            'message' => $message,
            'data' => $data,
            'timestamp' => time()
        ];

        $this->publisher->publish($notification);

        return $notification['id'];
    }

    public function systemAlert($title, $message)
    {
        return $this->broadcast('system_alert', $title, $message);
    }

    public function maintenanceNotice($title, $message, $scheduledTime)
    {
        return $this->broadcast(
            'maintenance',
            $title,
            $message,
            ['scheduled_time' => $scheduledTime]
        );
    }

    public function featureAnnouncement($title, $message, $features)
    {
        return $this->broadcast(
            'feature',
            $title,
            $message,
            ['features' => $features]
        );
    }
}

class NotificationReceiver
{
    private $subscriber;
    private $filters = [];

    public function __construct()
    {
        $this->subscriber = new PubSubSubscriber();
    }

    public function addFilter($type)
    {
        $this->filters[] = $type;
    }

    public function start()
    {
        $this->subscriber->subscribe(function ($notification) {
            if (!empty($this->filters) &&
                !in_array($notification['type'], $this->filters)) {
                return;
            }

            $this->handleNotification($notification);
        });
    }

    private function handleNotification($notification)
    {
        echo sprintf(
            "[%s] %s: %s\n",
            $notification['type'],
            $notification['title'],
            $notification['message']
        );
    }
}

实时数据推送

php
<?php

class RealtimeDataPublisher
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new PubSubPublisher();
    }

    public function pushMetrics($metrics)
    {
        $this->publisher->publish([
            'type' => 'metrics',
            'data' => $metrics,
            'timestamp' => time()
        ]);
    }

    public function pushAlert($alert)
    {
        $this->publisher->publish([
            'type' => 'alert',
            'data' => $alert,
            'timestamp' => time()
        ]);
    }

    public function pushUpdate($entity, $action, $data)
    {
        $this->publisher->publish([
            'type' => 'update',
            'entity' => $entity,
            'action' => $action,
            'data' => $data,
            'timestamp' => time()
        ]);
    }
}

class RealtimeDataClient
{
    private $subscriber;

    public function __construct()
    {
        $this->subscriber = new PubSubSubscriber();
    }

    public function start()
    {
        $this->subscriber->subscribe(function ($message) {
            switch ($message['type']) {
                case 'metrics':
                    $this->handleMetrics($message['data']);
                    break;
                case 'alert':
                    $this->handleAlert($message['data']);
                    break;
                case 'update':
                    $this->handleUpdate($message);
                    break;
            }
        });
    }

    private function handleMetrics($metrics)
    {
        echo "收到指标数据: " . json_encode($metrics) . "\n";
    }

    private function handleAlert($alert)
    {
        echo "收到告警: " . json_encode($alert) . "\n";
    }

    private function handleUpdate($message)
    {
        echo sprintf(
            "收到更新: %s %s\n",
            $message['entity'],
            $message['action']
        );
    }
}

实际应用场景

1. 系统公告广播

php
<?php

class SystemAnnouncement
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new PubSubPublisher();
    }

    public function announce($title, $content, $priority = 'normal')
    {
        $announcement = [
            'id' => uniqid('ann_'),
            'title' => $title,
            'content' => $content,
            'priority' => $priority,
            'created_at' => time()
        ];

        $this->publisher->publish($announcement);

        return $announcement;
    }

    public function emergency($title, $content)
    {
        return $this->announce($title, $content, 'emergency');
    }

    public function scheduled($title, $content)
    {
        return $this->announce($title, $content, 'scheduled');
    }
}

2. 缓存失效通知

php
<?php

class CacheInvalidationPublisher
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new PubSubPublisher();
    }

    public function invalidate($keys)
    {
        $this->publisher->publish([
            'action' => 'invalidate',
            'keys' => (array)$keys,
            'timestamp' => time()
        ]);
    }

    public function invalidatePattern($pattern)
    {
        $this->publisher->publish([
            'action' => 'invalidate_pattern',
            'pattern' => $pattern,
            'timestamp' => time()
        ]);
    }

    public function clearAll()
    {
        $this->publisher->publish([
            'action' => 'clear_all',
            'timestamp' => time()
        ]);
    }
}

class CacheInvalidationHandler
{
    private $cache;

    public function __construct()
    {
        $this->cache = new Redis();
        $this->cache->connect('127.0.0.1', 6379);
    }

    public function handle($message)
    {
        switch ($message['action']) {
            case 'invalidate':
                foreach ($message['keys'] as $key) {
                    $this->cache->del($key);
                }
                break;

            case 'invalidate_pattern':
                $keys = $this->cache->keys($message['pattern']);
                foreach ($keys as $key) {
                    $this->cache->del($key);
                }
                break;

            case 'clear_all':
                $this->cache->flushDB();
                break;
        }
    }
}

3. 配置更新广播

php
<?php

class ConfigUpdateBroadcaster
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new PubSubPublisher();
    }

    public function updateConfig($key, $value)
    {
        $this->publisher->publish([
            'type' => 'config_update',
            'key' => $key,
            'value' => $value,
            'timestamp' => time()
        ]);
    }

    public function reloadConfig($section = null)
    {
        $this->publisher->publish([
            'type' => 'config_reload',
            'section' => $section,
            'timestamp' => time()
        ]);
    }
}

class ConfigUpdateHandler
{
    private $config;

    public function __construct()
    {
        $this->config = [];
    }

    public function handle($message)
    {
        switch ($message['type']) {
            case 'config_update':
                $this->config[$message['key']] = $message['value'];
                echo "配置已更新: {$message['key']}\n";
                break;

            case 'config_reload':
                $this->reloadConfig($message['section']);
                echo "配置已重新加载\n";
                break;
        }
    }

    private function reloadConfig($section)
    {
        // 重新加载配置
    }
}

常见问题与解决方案

问题 1:消息重复消费

原因:多个订阅者收到相同消息

解决方案

php
<?php

class DeduplicationSubscriber
{
    private $processedIds = [];
    private $maxSize = 10000;

    public function subscribe($message)
    {
        $id = $message['id'] ?? null;

        if (!$id) {
            return;
        }

        if (in_array($id, $this->processedIds)) {
            echo "跳过重复消息: {$id}\n";
            return;
        }

        $this->processedIds[] = $id;

        if (count($this->processedIds) > $this->maxSize) {
            $this->processedIds = array_slice($this->processedIds, -$this->maxSize);
        }

        $this->processMessage($message);
    }

    private function processMessage($message)
    {
        // 处理消息
    }
}

问题 2:订阅者离线错过消息

原因:临时队列在订阅者断开后删除

解决方案

php
<?php

class DurableSubscriber
{
    private $connection;
    private $channel;
    private $exchangeName = 'pubsub_exchange';
    private $queueName;
    private $subscriberId;

    public function __construct($subscriberId)
    {
        $this->subscriberId = $subscriberId;
        $this->queueName = "subscriber_{$subscriberId}_queue";

        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            'fanout',
            false,
            true,
            false
        );

        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );

        $this->channel->queue_bind(
            $this->queueName,
            $this->exchangeName
        );
    }

    public function subscribe(callable $callback)
    {
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            function ($message) use ($callback) {
                call_user_func($callback, json_decode($message->body, true));
                $message->ack();
            }
        );

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }
}

问题 3:广播风暴

原因:大量消息同时广播

解决方案

php
<?php

class RateLimitedPublisher
{
    private $publisher;
    private $rateLimit = 100;
    private $interval = 1;
    private $messageCount = 0;
    private $lastReset;

    public function __construct()
    {
        $this->publisher = new PubSubPublisher();
        $this->lastReset = time();
    }

    public function publish($message)
    {
        $this->checkRateLimit();

        $this->publisher->publish($message);
        $this->messageCount++;
    }

    private function checkRateLimit()
    {
        $now = time();

        if ($now - $this->lastReset >= $this->interval) {
            $this->messageCount = 0;
            $this->lastReset = $now;
        }

        if ($this->messageCount >= $this->rateLimit) {
            $sleepTime = $this->interval - ($now - $this->lastReset);
            if ($sleepTime > 0) {
                sleep($sleepTime);
            }
            $this->messageCount = 0;
            $this->lastReset = time();
        }
    }
}

最佳实践建议

1. 消息格式标准化

php
<?php

class BroadcastMessage
{
    public static function create($type, $payload)
    {
        return [
            'id' => bin2hex(random_bytes(16)),
            'type' => $type,
            'payload' => $payload,
            'timestamp' => time(),
            'source' => gethostname(),
            'version' => '1.0'
        ];
    }

    public static function validate($message)
    {
        $required = ['id', 'type', 'payload', 'timestamp'];

        foreach ($required as $field) {
            if (!isset($message[$field])) {
                throw new InvalidArgumentException("缺少必需字段: {$field}");
            }
        }

        return true;
    }
}

2. 订阅者管理

php
<?php

class SubscriberRegistry
{
    private $subscribers = [];

    public function register($subscriberId, $filters = [])
    {
        $this->subscribers[$subscriberId] = [
            'id' => $subscriberId,
            'filters' => $filters,
            'registered_at' => time(),
            'status' => 'active'
        ];
    }

    public function unregister($subscriberId)
    {
        unset($this->subscribers[$subscriberId]);
    }

    public function getActiveSubscribers()
    {
        return array_filter($this->subscribers, function ($sub) {
            return $sub['status'] === 'active';
        });
    }

    public function getSubscriberCount()
    {
        return count($this->getActiveSubscribers());
    }
}

3. 消息确认机制

php
<?php

class ReliablePubSub
{
    public function publishWithConfirm($message)
    {
        $this->channel->confirm_select();

        $msg = new AMQPMessage(json_encode($message));

        $this->channel->basic_publish($msg, $this->exchangeName);

        try {
            $this->channel->wait_for_pending_acks(5);
            return true;
        } catch (Exception $e) {
            return false;
        }
    }
}

相关链接