Skip to content

Fanout Exchange

概述

Fanout Exchange(扇出交换机)是 RabbitMQ 中最简单的交换机类型。它会将接收到的所有消息广播到所有绑定的队列中,忽略 routing key。

核心原理

Fanout Exchange 的工作方式类似于广播:所有绑定到该交换机的队列都会收到消息的副本。这种机制非常适合发布/订阅模式。

mermaid
graph LR
    P[生产者] -->|消息| E[Fanout Exchange]
    E -->|广播副本| Q1[队列A]
    E -->|广播副本| Q2[队列B]
    E -->|广播副本| Q3[队列C]
    
    Q1 --> C1[消费者1]
    Q2 --> C2[消费者2]
    Q3 --> C3[消费者3]
    
    style E fill:#f9f,stroke:#333

特点

  • 忽略 routing key: 即使消息设置了 routing key,也会被忽略
  • 一对多广播: 一条消息会被复制到所有绑定的队列
  • 高性能: 不需要路由匹配,性能最优

PHP 代码示例

生产者 - 发布广播消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$exchangeName = 'notifications';
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, true, false);

$messageBody = json_encode([
    'type' => 'system_maintenance',
    'title' => '系统维护通知',
    'content' => '系统将于今晚 22:00 进行维护',
    'timestamp' => time()
]);

$message = new AMQPMessage(
    $messageBody,
    [
        'content_type' => 'application/json',
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
    ]
);

// routing key 会被忽略
$channel->basic_publish($message, $exchangeName);

echo "广播消息已发送\n";

$channel->close();
$connection->close();

消费者 - 订阅广播

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$exchangeName = 'notifications';
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, true, false);

// 创建临时队列,自动生成队列名
list($queueName, ,) = $channel->queue_declare('', false, false, true, false);

// 绑定到交换机(无需 routing key)
$channel->queue_bind($queueName, $exchangeName);

echo "临时队列: {$queueName}\n";
echo "等待广播消息...\n";

$callback = function (AMQPMessage $msg) {
    $data = json_decode($msg->getBody(), true);
    
    echo "收到广播消息:\n";
    echo "  类型: {$data['type']}\n";
    echo "  标题: {$data['title']}\n";
    echo "  内容: {$data['content']}\n";
    echo "-------------------\n";
    
    $msg->ack();
};

$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

持久化队列订阅

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class NotificationSubscriber
{
    private $channel;
    private $exchangeName;
    private $queueName;
    
    public function __construct($channel, $exchangeName, $queueName)
    {
        $this->channel = $channel;
        $this->exchangeName = $exchangeName;
        $this->queueName = $queueName;
        
        $this->setup();
    }
    
    private function setup()
    {
        // 声明持久化交换机
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::FANOUT,
            false,
            true,
            false
        );
        
        // 声明持久化队列
        $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false
        );
        
        // 绑定队列到交换机
        $this->channel->queue_bind($this->queueName, $this->exchangeName);
    }
    
    public function consume($callback)
    {
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
}

实际应用场景

1. 实时通知系统

php
<?php

class NotificationBroadcaster
{
    private $channel;
    private $exchangeName = 'realtime_notifications';
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::FANOUT,
            false,
            true,
            false
        );
    }
    
    public function broadcast($type, $data)
    {
        $message = new AMQPMessage(
            json_encode([
                'type' => $type,
                'data' => $data,
                'timestamp' => time()
            ]),
            ['content_type' => 'application/json']
        );
        
        $this->channel->basic_publish($message, $this->exchangeName);
    }
    
    public function notifyAll($title, $message)
    {
        $this->broadcast('notification', [
            'title' => $title,
            'message' => $message
        ]);
    }
    
    public function alertAll($level, $message)
    {
        $this->broadcast('alert', [
            'level' => $level,
            'message' => $message
        ]);
    }
}

// WebSocket 服务器订阅通知
class WebSocketNotificationHandler
{
    private $connections = [];
    
    public function subscribe($channel)
    {
        list($queueName,) = $channel->queue_declare('', false, false, true, false);
        $channel->queue_bind($queueName, 'realtime_notifications');
        
        $callback = function ($msg) {
            $data = json_decode($msg->getBody(), true);
            $this->pushToAllConnections($data);
            $msg->ack();
        };
        
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
    }
    
    public function addConnection($conn)
    {
        $this->connections[] = $conn;
    }
    
    private function pushToAllConnections($data)
    {
        foreach ($this->connections as $conn) {
            $conn->send(json_encode($data));
        }
    }
}

2. 缓存失效通知

php
<?php

class CacheInvalidationPublisher
{
    private $channel;
    private $exchangeName = 'cache_invalidation';
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::FANOUT,
            false,
            true,
            false
        );
    }
    
    public function invalidate($cacheKeys)
    {
        $message = new AMQPMessage(
            json_encode([
                'keys' => (array) $cacheKeys,
                'timestamp' => time()
            ]),
            ['content_type' => 'application/json']
        );
        
        $this->channel->basic_publish($message, $this->exchangeName);
    }
    
    public function invalidatePattern($pattern)
    {
        $message = new AMQPMessage(
            json_encode([
                'pattern' => $pattern,
                'timestamp' => time()
            ]),
            ['content_type' => 'application/json']
        );
        
        $this->channel->basic_publish($message, $this->exchangeName);
    }
}

// 各服务订阅缓存失效事件
class CacheInvalidationSubscriber
{
    private $cache;
    
    public function __construct($cache)
    {
        $this->cache = $cache;
    }
    
    public function subscribe($channel)
    {
        $queueName = 'cache_invalidation_' . gethostname();
        
        $channel->exchange_declare('cache_invalidation', AMQPExchangeType::FANOUT, false, true, false);
        $channel->queue_declare($queueName, false, true, false, false);
        $channel->queue_bind($queueName, 'cache_invalidation');
        
        $callback = function ($msg) {
            $data = json_decode($msg->getBody(), true);
            
            if (isset($data['keys'])) {
                foreach ($data['keys'] as $key) {
                    $this->cache->delete($key);
                }
            }
            
            if (isset($data['pattern'])) {
                $this->cache->deleteByPattern($data['pattern']);
            }
            
            $msg->ack();
        };
        
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
    }
}

3. 配置更新广播

php
<?php

class ConfigUpdateBroadcaster
{
    private $channel;
    private $exchangeName = 'config_updates';
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::FANOUT,
            false,
            true,
            false
        );
    }
    
    public function broadcastUpdate($configKey, $newValue, $version)
    {
        $message = new AMQPMessage(
            json_encode([
                'action' => 'update',
                'key' => $configKey,
                'value' => $newValue,
                'version' => $version,
                'timestamp' => time()
            ]),
            ['content_type' => 'application/json']
        );
        
        $this->channel->basic_publish($message, $this->exchangeName);
    }
    
    public function broadcastReload($reason)
    {
        $message = new AMQPMessage(
            json_encode([
                'action' => 'reload',
                'reason' => $reason,
                'timestamp' => time()
            ]),
            ['content_type' => 'application/json']
        );
        
        $this->channel->basic_publish($message, $this->exchangeName);
    }
}

常见问题与解决方案

问题 1: 消息丢失

症状: 部分消费者未收到消息

原因: 临时队列在消费者断开后自动删除

解决方案: 使用持久化队列

php
<?php

// 临时队列(消费者断开后删除)
list($queueName,) = $channel->queue_declare('', false, false, true, false);

// 持久化队列
$channel->queue_declare('persistent-queue', false, true, false, false);

问题 2: 重复消费

症状: 同一服务多个实例重复处理消息

解决方案: 这是正常行为,每个队列独立消费。如果需要去重,在业务层处理:

php
<?php

$callback = function ($msg) {
    $data = json_decode($msg->getBody(), true);
    $messageId = $data['message_id'] ?? null;
    
    // 使用 Redis 或数据库进行幂等性检查
    if ($this->isProcessed($messageId)) {
        echo "消息已处理,跳过\n";
        $msg->ack();
        return;
    }
    
    $this->process($data);
    $this->markAsProcessed($messageId);
    
    $msg->ack();
};

最佳实践建议

  1. 使用持久化队列: 对于重要通知,使用持久化队列避免消息丢失
  2. 临时队列用于临时订阅: 对于 WebSocket 等实时推送场景,使用临时队列
  3. 消息幂等性: 消费者需要处理重复消息
  4. 监控队列数量: 大量队列会影响性能
  5. 合理使用场景: Fanout 适合广播,不适合点对点通信

相关链接