Appearance
Fanout Exchange
概述
Fanout Exchange(扇出交换机)是 RabbitMQ 中最简单的交换机类型。它会将接收到的所有消息广播到所有绑定的队列中,忽略 routing key。
核心原理
Fanout Exchange 的工作方式类似于广播:所有绑定到该交换机的队列都会收到消息的副本。这种机制非常适合发布/订阅模式。
mermaid
graph LR
P[生产者] -->|消息| E[Fanout Exchange]
E -->|广播副本| Q1[队列A]
E -->|广播副本| Q2[队列B]
E -->|广播副本| Q3[队列C]
Q1 --> C1[消费者1]
Q2 --> C2[消费者2]
Q3 --> C3[消费者3]
style E fill:#f9f,stroke:#333特点
- 忽略 routing key: 即使消息设置了 routing key,也会被忽略
- 一对多广播: 一条消息会被复制到所有绑定的队列
- 高性能: 不需要路由匹配,性能最优
PHP 代码示例
生产者 - 发布广播消息
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'notifications';
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, true, false);
$messageBody = json_encode([
'type' => 'system_maintenance',
'title' => '系统维护通知',
'content' => '系统将于今晚 22:00 进行维护',
'timestamp' => time()
]);
$message = new AMQPMessage(
$messageBody,
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);
// routing key 会被忽略
$channel->basic_publish($message, $exchangeName);
echo "广播消息已发送\n";
$channel->close();
$connection->close();消费者 - 订阅广播
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'notifications';
$channel->exchange_declare($exchangeName, AMQPExchangeType::FANOUT, false, true, false);
// 创建临时队列,自动生成队列名
list($queueName, ,) = $channel->queue_declare('', false, false, true, false);
// 绑定到交换机(无需 routing key)
$channel->queue_bind($queueName, $exchangeName);
echo "临时队列: {$queueName}\n";
echo "等待广播消息...\n";
$callback = function (AMQPMessage $msg) {
$data = json_decode($msg->getBody(), true);
echo "收到广播消息:\n";
echo " 类型: {$data['type']}\n";
echo " 标题: {$data['title']}\n";
echo " 内容: {$data['content']}\n";
echo "-------------------\n";
$msg->ack();
};
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();持久化队列订阅
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class NotificationSubscriber
{
private $channel;
private $exchangeName;
private $queueName;
public function __construct($channel, $exchangeName, $queueName)
{
$this->channel = $channel;
$this->exchangeName = $exchangeName;
$this->queueName = $queueName;
$this->setup();
}
private function setup()
{
// 声明持久化交换机
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::FANOUT,
false,
true,
false
);
// 声明持久化队列
$this->channel->queue_declare(
$this->queueName,
false,
true,
false,
false
);
// 绑定队列到交换机
$this->channel->queue_bind($this->queueName, $this->exchangeName);
}
public function consume($callback)
{
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
}实际应用场景
1. 实时通知系统
php
<?php
class NotificationBroadcaster
{
private $channel;
private $exchangeName = 'realtime_notifications';
public function __construct($channel)
{
$this->channel = $channel;
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::FANOUT,
false,
true,
false
);
}
public function broadcast($type, $data)
{
$message = new AMQPMessage(
json_encode([
'type' => $type,
'data' => $data,
'timestamp' => time()
]),
['content_type' => 'application/json']
);
$this->channel->basic_publish($message, $this->exchangeName);
}
public function notifyAll($title, $message)
{
$this->broadcast('notification', [
'title' => $title,
'message' => $message
]);
}
public function alertAll($level, $message)
{
$this->broadcast('alert', [
'level' => $level,
'message' => $message
]);
}
}
// WebSocket 服务器订阅通知
class WebSocketNotificationHandler
{
private $connections = [];
public function subscribe($channel)
{
list($queueName,) = $channel->queue_declare('', false, false, true, false);
$channel->queue_bind($queueName, 'realtime_notifications');
$callback = function ($msg) {
$data = json_decode($msg->getBody(), true);
$this->pushToAllConnections($data);
$msg->ack();
};
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
}
public function addConnection($conn)
{
$this->connections[] = $conn;
}
private function pushToAllConnections($data)
{
foreach ($this->connections as $conn) {
$conn->send(json_encode($data));
}
}
}2. 缓存失效通知
php
<?php
class CacheInvalidationPublisher
{
private $channel;
private $exchangeName = 'cache_invalidation';
public function __construct($channel)
{
$this->channel = $channel;
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::FANOUT,
false,
true,
false
);
}
public function invalidate($cacheKeys)
{
$message = new AMQPMessage(
json_encode([
'keys' => (array) $cacheKeys,
'timestamp' => time()
]),
['content_type' => 'application/json']
);
$this->channel->basic_publish($message, $this->exchangeName);
}
public function invalidatePattern($pattern)
{
$message = new AMQPMessage(
json_encode([
'pattern' => $pattern,
'timestamp' => time()
]),
['content_type' => 'application/json']
);
$this->channel->basic_publish($message, $this->exchangeName);
}
}
// 各服务订阅缓存失效事件
class CacheInvalidationSubscriber
{
private $cache;
public function __construct($cache)
{
$this->cache = $cache;
}
public function subscribe($channel)
{
$queueName = 'cache_invalidation_' . gethostname();
$channel->exchange_declare('cache_invalidation', AMQPExchangeType::FANOUT, false, true, false);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, 'cache_invalidation');
$callback = function ($msg) {
$data = json_decode($msg->getBody(), true);
if (isset($data['keys'])) {
foreach ($data['keys'] as $key) {
$this->cache->delete($key);
}
}
if (isset($data['pattern'])) {
$this->cache->deleteByPattern($data['pattern']);
}
$msg->ack();
};
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
}
}3. 配置更新广播
php
<?php
class ConfigUpdateBroadcaster
{
private $channel;
private $exchangeName = 'config_updates';
public function __construct($channel)
{
$this->channel = $channel;
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::FANOUT,
false,
true,
false
);
}
public function broadcastUpdate($configKey, $newValue, $version)
{
$message = new AMQPMessage(
json_encode([
'action' => 'update',
'key' => $configKey,
'value' => $newValue,
'version' => $version,
'timestamp' => time()
]),
['content_type' => 'application/json']
);
$this->channel->basic_publish($message, $this->exchangeName);
}
public function broadcastReload($reason)
{
$message = new AMQPMessage(
json_encode([
'action' => 'reload',
'reason' => $reason,
'timestamp' => time()
]),
['content_type' => 'application/json']
);
$this->channel->basic_publish($message, $this->exchangeName);
}
}常见问题与解决方案
问题 1: 消息丢失
症状: 部分消费者未收到消息
原因: 临时队列在消费者断开后自动删除
解决方案: 使用持久化队列
php
<?php
// 临时队列(消费者断开后删除)
list($queueName,) = $channel->queue_declare('', false, false, true, false);
// 持久化队列
$channel->queue_declare('persistent-queue', false, true, false, false);问题 2: 重复消费
症状: 同一服务多个实例重复处理消息
解决方案: 这是正常行为,每个队列独立消费。如果需要去重,在业务层处理:
php
<?php
$callback = function ($msg) {
$data = json_decode($msg->getBody(), true);
$messageId = $data['message_id'] ?? null;
// 使用 Redis 或数据库进行幂等性检查
if ($this->isProcessed($messageId)) {
echo "消息已处理,跳过\n";
$msg->ack();
return;
}
$this->process($data);
$this->markAsProcessed($messageId);
$msg->ack();
};最佳实践建议
- 使用持久化队列: 对于重要通知,使用持久化队列避免消息丢失
- 临时队列用于临时订阅: 对于 WebSocket 等实时推送场景,使用临时队列
- 消息幂等性: 消费者需要处理重复消息
- 监控队列数量: 大量队列会影响性能
- 合理使用场景: Fanout 适合广播,不适合点对点通信
