Skip to content

RabbitMQ 交换机(Exchange)

概述

交换机(Exchange)是 RabbitMQ 消息路由的核心组件。生产者不直接将消息发送到队列,而是发送到交换机。交换机根据路由规则将消息分发到一个或多个队列。这种设计实现了生产者和队列之间的解耦,提供了灵活的消息路由能力。

交换机的核心作用

mermaid
graph TB
    subgraph 生产者
        P[Producer]
    end
    
    subgraph 交换机
        E[Exchange]
        E -->|路由规则| R1[Binding 1]
        E -->|路由规则| R2[Binding 2]
        E -->|路由规则| R3[Binding 3]
    end
    
    subgraph 队列
        Q1[Queue 1]
        Q2[Queue 2]
        Q3[Queue 3]
    end
    
    P -->|发布消息| E
    R1 --> Q1
    R2 --> Q2
    R3 --> Q3

交换机的主要职责:

  • 接收消息:接收生产者发送的消息
  • 路由分发:根据路由规则将消息分发到队列
  • 解耦生产者:生产者无需知道目标队列
  • 灵活路由:支持多种路由策略

核心知识点

1. 交换机类型

RabbitMQ 支持四种交换机类型:

mermaid
graph TB
    A[Exchange Types] --> B[Direct]
    A --> C[Fanout]
    A --> D[Topic]
    A --> E[Headers]
    
    B --> B1[精确匹配]
    B --> B2[点对点路由]
    
    C --> C1[广播]
    C --> C2[发布订阅]
    
    D --> D1[通配符匹配]
    D --> D2[多主题订阅]
    
    E --> E1[消息头匹配]
    E --> E2[复杂条件]

Direct Exchange(直连交换机)

mermaid
graph LR
    subgraph Direct Exchange
        E[Exchange]
        E -->|routing_key = info| Q1[Queue Info]
        E -->|routing_key = error| Q2[Queue Error]
        E -->|routing_key = warning| Q3[Queue Warning]
    end
    
    P[Producer] -->|info| E
    P -->|error| E

特点

  • 精确匹配 Routing Key
  • 最常用的交换机类型
  • 适合点对点消息传递

路由规则

消息 Routing Key = Binding Key → 消息路由到对应队列

Fanout Exchange(扇出交换机)

mermaid
graph LR
    subgraph Fanout Exchange
        E[Exchange]
        E --> Q1[Queue 1]
        E --> Q2[Queue 2]
        E --> Q3[Queue 3]
    end
    
    P[Producer] -->|忽略 Routing Key| E

特点

  • 广播消息到所有绑定队列
  • 忽略 Routing Key
  • 适合发布订阅模式

路由规则

所有绑定的队列都会收到消息

Topic Exchange(主题交换机)

mermaid
graph LR
    subgraph Topic Exchange
        E[Exchange]
        E -->|*.error| Q1[Error Queue]
        E -->|order.#| Q2[Order Queue]
        E -->|#.created| Q3[Created Queue]
    end
    
    P[Producer] -->|user.error| E
    P -->|order.created| E

特点

  • 支持通配符匹配
  • * 匹配一个单词
  • # 匹配零个或多个单词
  • 适合多条件路由

路由规则

user.error   匹配 *.error     → Q1
order.created 匹配 order.#    → Q2
order.created 匹配 #.created  → Q3

Headers Exchange(头交换机)

mermaid
graph LR
    subgraph Headers Exchange
        E[Exchange]
        E -->|x-match=all| Q1[Queue 1]
        E -->|x-match=any| Q2[Queue 2]
    end
    
    P[Producer] -->|headers| E

特点

  • 基于消息头匹配
  • 支持 x-match: all/any
  • 性能较低,较少使用

路由规则

x-match: all  → 所有头都匹配
x-match: any  → 任一头匹配

2. 交换机属性

属性说明
Name交换机名称
Type交换机类型
Durable是否持久化
Auto-delete最后一个绑定删除后是否自动删除
Internal是否为内部交换机
Arguments额外参数

3. 默认交换机

RabbitMQ 提供一个默认交换机:

mermaid
graph LR
    P[Producer] -->|routing_key = queue_name| D[Default Exchange]
    D -->|直接路由| Q[Queue]

特点

  • 名称为空字符串 ""
  • 类型为 Direct
  • 自动绑定到所有队列
  • Routing Key 等于队列名称

4. 路由流程

mermaid
flowchart TD
    A[消息到达 Exchange] --> B{查找绑定}
    B --> C{匹配 Routing Key}
    C -->|匹配成功| D[添加到队列]
    C -->|无匹配| E{备用交换机?}
    E -->|有| F[发送到备用交换机]
    E -->|无| G[丢弃或返回]
    D --> H[完成]
    F --> H
    G --> H

代码示例

Direct Exchange 示例

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class DirectExchangeExample
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function setup(): void
    {
        $this->channel->exchange_declare(
            'logs_direct',
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );
        
        $this->channel->queue_declare('logs_info', false, true, false, false);
        $this->channel->queue_declare('logs_error', false, true, false, false);
        $this->channel->queue_declare('logs_warning', false, true, false, false);
        
        $this->channel->queue_bind('logs_info', 'logs_direct', 'info');
        $this->channel->queue_bind('logs_error', 'logs_direct', 'error');
        $this->channel->queue_bind('logs_warning', 'logs_direct', 'warning');
        
        echo "Direct exchange setup completed\n";
    }
    
    public function publishLog(string $level, string $message): void
    {
        $msg = new AMQPMessage(json_encode([
            'level' => $level,
            'message' => $message,
            'timestamp' => date('Y-m-d H:i:s')
        ]), [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        
        $this->channel->basic_publish($msg, 'logs_direct', $level);
        
        echo "Log published with level: {$level}\n";
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$example = new DirectExchangeExample();
$example->setup();

$example->publishLog('info', 'System started');
$example->publishLog('warning', 'Memory usage high');
$example->publishLog('error', 'Database connection failed');

$example->close();

Fanout Exchange 示例

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class FanoutExchangeExample
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function setup(): void
    {
        $this->channel->exchange_declare(
            'notifications_fanout',
            AMQPExchangeType::FANOUT,
            false,
            true,
            false
        );
        
        $this->channel->queue_declare('notifications_email', false, true, false, false);
        $this->channel->queue_declare('notifications_sms', false, true, false, false);
        $this->channel->queue_declare('notifications_push', false, true, false, false);
        
        $this->channel->queue_bind('notifications_email', 'notifications_fanout');
        $this->channel->queue_bind('notifications_sms', 'notifications_fanout');
        $this->channel->queue_bind('notifications_push', 'notifications_fanout');
        
        echo "Fanout exchange setup completed\n";
    }
    
    public function broadcast(string $message): void
    {
        $msg = new AMQPMessage(json_encode([
            'message' => $message,
            'timestamp' => date('Y-m-d H:i:s')
        ]), [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        
        $this->channel->basic_publish($msg, 'notifications_fanout');
        
        echo "Message broadcasted to all queues\n";
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$example = new FanoutExchangeExample();
$example->setup();

$example->broadcast('New feature released!');

$example->close();

Topic Exchange 示例

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class TopicExchangeExample
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function setup(): void
    {
        $this->channel->exchange_declare(
            'events_topic',
            AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );
        
        $this->channel->queue_declare('all_errors', false, true, false, false);
        $this->channel->queue_declare('all_orders', false, true, false, false);
        $this->channel->queue_declare('user_events', false, true, false, false);
        $this->channel->queue_declare('all_events', false, true, false, false);
        
        $this->channel->queue_bind('all_errors', 'events_topic', '#.error');
        $this->channel->queue_bind('all_orders', 'events_topic', 'order.#');
        $this->channel->queue_bind('user_events', 'events_topic', 'user.*');
        $this->channel->queue_bind('all_events', 'events_topic', '#');
        
        echo "Topic exchange setup completed\n";
    }
    
    public function publishEvent(string $routingKey, array $data): void
    {
        $msg = new AMQPMessage(json_encode([
            'event' => $routingKey,
            'data' => $data,
            'timestamp' => date('Y-m-d H:i:s')
        ]), [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        
        $this->channel->basic_publish($msg, 'events_topic', $routingKey);
        
        echo "Event published: {$routingKey}\n";
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$example = new TopicExchangeExample();
$example->setup();

$example->publishEvent('order.created', ['order_id' => 1001]);
$example->publishEvent('order.paid', ['order_id' => 1001]);
$example->publishEvent('user.login', ['user_id' => 5001]);
$example->publishEvent('user.logout', ['user_id' => 5001]);
$example->publishEvent('payment.error', ['error' => 'timeout']);

$example->close();

Headers Exchange 示例

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

class HeadersExchangeExample
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function setup(): void
    {
        $this->channel->exchange_declare(
            'headers_exchange',
            AMQPExchangeType::HEADERS,
            false,
            true,
            false
        );
        
        $this->channel->queue_declare('high_priority_queue', false, true, false, false);
        $this->channel->queue_declare('marketing_queue', false, true, false, false);
        
        $this->channel->queue_bind(
            'high_priority_queue',
            'headers_exchange',
            '',
            false,
            new AMQPTable([
                'x-match' => 'all',
                'priority' => 'high',
                'type' => 'order'
            ])
        );
        
        $this->channel->queue_bind(
            'marketing_queue',
            'headers_exchange',
            '',
            false,
            new AMQPTable([
                'x-match' => 'any',
                'category' => 'marketing',
                'source' => 'campaign'
            ])
        );
        
        echo "Headers exchange setup completed\n";
    }
    
    public function publishWithHeaders(array $data, array $headers): void
    {
        $msg = new AMQPMessage(json_encode($data), [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'application_headers' => new AMQPTable($headers)
        ]);
        
        $this->channel->basic_publish($msg, 'headers_exchange');
        
        echo "Message published with headers: " . json_encode($headers) . "\n";
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$example = new HeadersExchangeExample();
$example->setup();

$example->publishWithHeaders(
    ['order_id' => 1001],
    ['priority' => 'high', 'type' => 'order']
);

$example->publishWithHeaders(
    ['campaign_id' => 'summer2024'],
    ['category' => 'marketing', 'source' => 'campaign']
);

$example->close();

备用交换机示例

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

class AlternateExchangeExample
{
    private $connection;
    private $channel;
    
    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        
        $this->channel = $this->connection->channel();
    }
    
    public function setup(): void
    {
        $this->channel->exchange_declare(
            'unrouted_exchange',
            AMQPExchangeType::FANOUT,
            false,
            true,
            false
        );
        
        $this->channel->queue_declare('unrouted_messages', false, true, false, false);
        $this->channel->queue_bind('unrouted_messages', 'unrouted_exchange');
        
        $this->channel->exchange_declare(
            'main_exchange',
            AMQPExchangeType::DIRECT,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'alternate-exchange' => 'unrouted_exchange'
            ])
        );
        
        $this->channel->queue_declare('orders_queue', false, true, false, false);
        $this->channel->queue_bind('orders_queue', 'main_exchange', 'order');
        
        echo "Alternate exchange setup completed\n";
    }
    
    public function publish(string $routingKey, string $message): void
    {
        $msg = new AMQPMessage($message, [
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        
        $this->channel->basic_publish($msg, 'main_exchange', $routingKey);
        
        echo "Message published with routing key: {$routingKey}\n";
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$example = new AlternateExchangeExample();
$example->setup();

$example->publish('order', 'This goes to orders_queue');
$example->publish('unknown', 'This goes to unrouted_messages');

$example->close();

实际应用场景

1. 日志系统

php
<?php

class LogSystemExchange
{
    private $channel;
    
    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
    }
    
    public function setup(): void
    {
        $this->channel->exchange_declare('logs', 'topic', false, true, false);
        
        $queues = [
            'logs_all' => '#',
            'logs_error' => '*.error',
            'logs_app' => 'app.#',
            'logs_system' => 'system.#'
        ];
        
        foreach ($queues as $queue => $pattern) {
            $this->channel->queue_declare($queue, false, true, false, false);
            $this->channel->queue_bind($queue, 'logs', $pattern);
        }
    }
    
    public function log(string $source, string $level, string $message): void
    {
        $routingKey = "{$source}.{$level}";
        
        $msg = new AMQPMessage(json_encode([
            'source' => $source,
            'level' => $level,
            'message' => $message,
            'timestamp' => time()
        ]), ['content_type' => 'application/json']);
        
        $this->channel->basic_publish($msg, 'logs', $routingKey);
    }
}

2. 订单事件系统

php
<?php

class OrderEventExchange
{
    private $channel;
    
    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
    }
    
    public function setup(): void
    {
        $this->channel->exchange_declare('order_events', 'topic', false, true, false);
        
        $bindings = [
            'inventory_service' => ['order.created', 'order.cancelled'],
            'payment_service' => ['order.created', 'order.paid'],
            'notification_service' => ['order.#'],
            'analytics_service' => ['order.#']
        ];
        
        foreach ($bindings as $queue => $patterns) {
            $this->channel->queue_declare($queue, false, true, false, false);
            foreach ($patterns as $pattern) {
                $this->channel->queue_bind($queue, 'order_events', $pattern);
            }
        }
    }
    
    public function emitOrderEvent(string $event, array $orderData): void
    {
        $msg = new AMQPMessage(json_encode([
            'event' => $event,
            'data' => $orderData,
            'timestamp' => time()
        ]), [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        
        $this->channel->basic_publish($msg, 'order_events', $event);
    }
}

3. 多租户消息系统

php
<?php

class MultiTenantExchange
{
    private $channel;
    
    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
    }
    
    public function setupTenant(string $tenantId): void
    {
        $exchangeName = "tenant_{$tenantId}";
        
        $this->channel->exchange_declare($exchangeName, 'topic', false, true, false);
        
        $queues = [
            "{$tenantId}_orders" => 'order.#',
            "{$tenantId}_notifications" => 'notification.#',
            "{$tenantId}_reports" => 'report.#'
        ];
        
        foreach ($queues as $queue => $pattern) {
            $this->channel->queue_declare($queue, false, true, false, false);
            $this->channel->queue_bind($queue, $exchangeName, $pattern);
        }
    }
    
    public function publishToTenant(string $tenantId, string $routingKey, string $message): void
    {
        $exchangeName = "tenant_{$tenantId}";
        
        $msg = new AMQPMessage($message, [
            'content_type' => 'application/json'
        ]);
        
        $this->channel->basic_publish($msg, $exchangeName, $routingKey);
    }
}

常见问题与解决方案

1. 消息路由失败

问题原因

  • 没有匹配的绑定
  • Routing Key 错误
  • Exchange 不存在

解决方案

php
<?php

class SafeExchangePublisher
{
    private $channel;
    
    public function publishSafe(
        string $exchange,
        string $routingKey,
        string $body,
        bool $mandatory = true
    ): bool {
        $msg = new AMQPMessage($body, [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        
        $this->channel->set_return_listener(function (
            $replyCode,
            $replyText,
            $exchange,
            $routingKey,
            $message
        ) {
            echo "Message returned: {$replyText}\n";
            echo "Exchange: {$exchange}, Routing Key: {$routingKey}\n";
        });
        
        $this->channel->basic_publish(
            $msg,
            $exchange,
            $routingKey,
            $mandatory
        );
        
        $this->channel->wait_for_pending_acks_returns(5);
        
        return true;
    }
}

2. 交换机不存在

问题原因

  • 交换机未声明
  • 交换机名称错误
  • 交换机被删除

解决方案

php
<?php

class ExchangeManager
{
    private $channel;
    private $declaredExchanges = [];
    
    public function ensureExchangeExists(
        string $name,
        string $type,
        bool $durable = true
    ): void {
        if (isset($this->declaredExchanges[$name])) {
            return;
        }
        
        try {
            $this->channel->exchange_declare(
                $name,
                $type,
                false,
                $durable,
                false,
                false,
                false,
                null,
                true
            );
            
            $this->declaredExchanges[$name] = true;
        } catch (Exception $e) {
            $this->channel->exchange_declare(
                $name,
                $type,
                false,
                $durable,
                false
            );
            
            $this->declaredExchanges[$name] = true;
        }
    }
}

3. 性能问题

问题原因

  • 交换机绑定过多
  • 路由规则复杂
  • Headers Exchange 使用不当

解决方案

php
<?php

class OptimizedExchangeConfig
{
    public static function getOptimalExchangeType(string $useCase): string
    {
        $mapping = [
            'point_to_point' => 'direct',
            'broadcast' => 'fanout',
            'multi_topic' => 'topic',
            'complex_routing' => 'headers'
        ];
        
        return $mapping[$useCase] ?? 'direct';
    }
    
    public static function optimizeBindings(array $bindings): array
    {
        $optimized = [];
        
        foreach ($bindings as $queue => $patterns) {
            if (count($patterns) > 10) {
                $optimized[$queue] = [self::mergePatterns($patterns)];
            } else {
                $optimized[$queue] = $patterns;
            }
        }
        
        return $optimized;
    }
    
    private static function mergePatterns(array $patterns): string
    {
        $prefixes = [];
        foreach ($patterns as $pattern) {
            $parts = explode('.', $pattern);
            if (count($parts) > 1) {
                $prefixes[$parts[0]] = true;
            }
        }
        
        if (count($prefixes) === 1) {
            return array_key_first($prefixes) . '.#';
        }
        
        return '#';
    }
}

最佳实践建议

1. 交换机命名规范

php
<?php

class ExchangeNamingConvention
{
    public static function generateName(
        string $domain,
        string $purpose,
        ?string $suffix = null
    ): string {
        $parts = [$domain, $purpose];
        
        if ($suffix) {
            $parts[] = $suffix;
        }
        
        return implode('.', $parts) . '.exchange';
    }
    
    public static function getDLXName(string $originalExchange): string
    {
        return $originalExchange . '.dlx';
    }
}

2. 交换机配置模板

php
<?php

class ExchangeTemplates
{
    public static function standardExchange(string $name, string $type): array
    {
        return [
            'name' => $name,
            'type' => $type,
            'durable' => true,
            'auto_delete' => false,
            'internal' => false,
            'arguments' => []
        ];
    }
    
    public static function exchangeWithDLX(string $name, string $type, string $dlxName): array
    {
        return [
            'name' => $name,
            'type' => $type,
            'durable' => true,
            'auto_delete' => false,
            'internal' => false,
            'arguments' => [
                'alternate-exchange' => $dlxName
            ]
        ];
    }
}

3. 交换机监控

php
<?php

class ExchangeMonitor
{
    private $apiUrl;
    private $credentials;
    
    public function __construct(string $host, string $user, string $password)
    {
        $this->apiUrl = "http://{$host}:15672/api";
        $this->credentials = base64_encode("{$user}:{$password}");
    }
    
    public function getExchangeStats(string $vhost, string $exchange): array
    {
        $url = "{$this->apiUrl}/exchanges/" . urlencode($vhost) . "/" . urlencode($exchange);
        
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_HTTPHEADER, [
            "Authorization: Basic {$this->credentials}"
        ]);
        
        $response = curl_exec($ch);
        curl_close($ch);
        
        return json_decode($response, true);
    }
    
    public function listExchanges(string $vhost): array
    {
        $url = "{$this->apiUrl}/exchanges/" . urlencode($vhost);
        
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_HTTPHEADER, [
            "Authorization: Basic {$this->credentials}"
        ]);
        
        $response = curl_exec($ch);
        curl_close($ch);
        
        return json_decode($response, true);
    }
}

相关链接