Skip to content

Topic Exchange

概述

Topic Exchange(主题交换机)是 RabbitMQ 中功能最强大的交换机类型之一。它支持通配符匹配 routing key,使得消息路由更加灵活,非常适合实现发布/订阅模式。

核心原理

Topic Exchange 使用类似正则表达式的模式匹配 routing key,支持两个通配符:

  • *(星号):恰好匹配一个单词
  • #(井号):匹配零个或多个单词

routing key 必须由点号 . 分隔的单词组成,例如 order.created.success

mermaid
graph LR
    P[生产者] -->|order.created| E[Topic Exchange]
    P2[生产者] -->|order.cancelled| E
    P3[生产者] -->|payment.completed| E
    
    E -->|order.*| Q1[订单日志队列]
    E -->|order.#| Q2[订单全量队列]
    E -->|#.completed| Q3[完成事件队列]
    E -->|payment.*| Q4[支付队列]
    
    style E fill:#f9f,stroke:#333

匹配规则示例

| Routing Key 模式 | 匹配 | 不匹配 |
| --- | --- | --- |
| `order.*` | `order.created`, `order.cancelled` | `order.created.success` |
| `order.#` | `order`, `order.created`, `order.created.success` | `payment.completed` |
| `#.completed` | `order.completed`, `payment.completed` | `order.cancelled` |
| `*.*.success` | `order.create.success`, `payment.process.success` | `order.success` |

PHP 代码示例

生产者 - 发送消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

// Connect to the local broker with the default guest account and open a channel.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Topic exchange declared with passive=false, durable=true, auto_delete=false.
$exchangeName = 'topic_events';
$channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, true, false);

// Every message is published as persistent JSON, so the properties are built once.
$properties = [
    'content_type' => 'application/json',
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
];

// Map of routing key => payload; the array key decides how the exchange routes the message.
$events = [
    'order.created' => ['order_id' => 'ORD-001', 'status' => 'created'],
    'order.completed.success' => ['order_id' => 'ORD-001', 'status' => 'completed'],
    'order.cancelled.user' => ['order_id' => 'ORD-002', 'status' => 'cancelled'],
    'payment.completed' => ['payment_id' => 'PAY-001', 'status' => 'completed'],
    'payment.failed.timeout' => ['payment_id' => 'PAY-002', 'error' => 'timeout'],
];

foreach ($events as $routingKey => $data) {
    $payload = json_encode($data);

    $channel->basic_publish(new AMQPMessage($payload, $properties), $exchangeName, $routingKey);
    echo "消息已发送 - Routing Key: {$routingKey}\n";
}

// Release the channel first, then the underlying connection.
$channel->close();
$connection->close();

消费者 - 订阅特定模式

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

// Connect to the local broker with the default guest account and open a channel.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Same exchange declaration as the producer (passive=false, durable=true, auto_delete=false).
$exchangeName = 'topic_events';
$channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, true, false);

// "order.#" matches "order" followed by zero or more further words.
$queueName = 'order-all-queue';
$bindingKey = 'order.#';

// Queue arguments: passive=false, durable=true, exclusive=false, auto_delete=false.
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $bindingKey);

echo "队列 {$queueName} 已绑定到模式: {$bindingKey}\n",
    "等待消息中...\n";

// Prints the routing key and the re-encoded JSON payload, then acknowledges the delivery.
$callback = static function (AMQPMessage $msg) {
    $routingKey = $msg->getRoutingKey();
    $decoded = json_decode($msg->getBody(), true);

    echo "收到消息\n",
        "  Routing Key: {$routingKey}\n",
        "  消息内容: " . json_encode($decoded, JSON_UNESCAPED_UNICODE) . "\n",
        "-------------------\n";

    $msg->ack();
};

// Prefetch count of 1: at most one unacknowledged message is delivered at a time.
$channel->basic_qos(null, 1, null);
// The fourth argument (no_ack) is false, so the callback must ack explicitly.
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

多模式绑定消费者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;

// Connect to the local broker with the default guest account and open a channel.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Same exchange declaration as the producer (passive=false, durable=true, auto_delete=false).
$exchangeName = 'topic_events';
$channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, true, false);

// One queue, several bindings: a message is delivered once if ANY binding matches.
// "#" (zero or more words) is used instead of "*" (exactly one word) so that keys of
// any depth are matched, e.g. "order.completed.success" as well as "payment.completed".
$queueName = 'completion-events-queue';
$bindingKeys = [
    '#.completed',      // every routing key whose last word is "completed"
    '#.success',        // every routing key whose last word is "success"
    '#.failed',         // every routing key whose last word is "failed"
                        // (does not match keys such as "payment.failed.timeout")
];

// Queue arguments: passive=false, durable=true, exclusive=false, auto_delete=false.
$channel->queue_declare($queueName, false, true, false, false);

foreach ($bindingKeys as $bindingKey) {
    $channel->queue_bind($queueName, $exchangeName, $bindingKey);
    echo "已绑定模式: {$bindingKey}\n";
}

echo "\n等待消息中...\n";

// Prints "[routing key] body" for each delivery and acknowledges it.
$callback = function ($msg) {
    echo sprintf(
        "[%s] %s\n",
        $msg->getRoutingKey(),
        $msg->getBody()
    );
    $msg->ack();
};

// Prefetch count of 1: at most one unacknowledged message is delivered at a time.
$channel->basic_qos(null, 1, null);
// The fourth argument (no_ack) is false, so the callback must ack explicitly.
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

实际应用场景

1. 微服务事件总线

php
<?php

/**
 * Publishes application events to a durable topic exchange.
 *
 * The lowercased event name is used as the routing key, so consumers can bind
 * with patterns such as "user.*" or "user.#".
 */
class EventBus
{
    /** Channel used to declare the exchange and publish (expected to be a php-amqplib channel). */
    private $channel;

    /** Name of the topic exchange all events are published to. */
    private $exchangeName = 'microservice_events';

    /**
     * Stores the channel and declares the topic exchange.
     *
     * @param mixed $channel Object exposing exchange_declare() and basic_publish().
     */
    public function __construct($channel)
    {
        $this->channel = $channel;
        // Class names are fully qualified so the snippet works without `use` imports.
        // Arguments after the type: passive=false, durable=true, auto_delete=false.
        $this->channel->exchange_declare(
            $this->exchangeName,
            \PhpAmqpLib\Exchange\AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );
    }

    /**
     * Wraps the payload in an envelope and publishes it as a persistent JSON message.
     *
     * @param string $event Dot-separated event name, e.g. "user.profile.updated".
     * @param mixed  $data  JSON-encodable payload.
     */
    public function emit($event, $data)
    {
        $routingKey = $this->buildRoutingKey($event);

        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode([
                'event' => $event,
                'data' => $data,
                'timestamp' => time(),
                'service' => gethostname()
            ]),
            [
                'content_type' => 'application/json',
                'delivery_mode' => \PhpAmqpLib\Message\AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]
        );

        $this->channel->basic_publish($message, $this->exchangeName, $routingKey);
    }

    /**
     * Derives the routing key from an event name by lowercasing it.
     *
     * The event name is already dot-separated, so no separator conversion is needed
     * (the previous str_replace('.', '.', ...) call was a no-op).
     *
     * @param string $event
     * @return string
     */
    private function buildRoutingKey($event)
    {
        return strtolower($event);
    }
}

// Usage example
$eventBus = new EventBus($channel);

// Emit user events: event name => payload, published in this order.
$userEvents = [
    'user.registered' => ['user_id' => 123, 'email' => 'user@example.com'],
    'user.profile.updated' => ['user_id' => 123, 'fields' => ['name', 'avatar']],
    'user.password.reset' => ['user_id' => 123, 'ip' => '192.168.1.1'],
];

foreach ($userEvents as $eventName => $payload) {
    $eventBus->emit($eventName, $payload);
}

2. 日志收集系统

php
<?php

/**
 * Publishes log records to the durable "logs" topic exchange.
 *
 * Records are routed with the key "<service>.<level>", so consumers can select
 * by service ("payment.*") or by level ("*.error").
 */
class LogCollector
{
    /** Channel used to declare the exchange and publish (expected to be a php-amqplib channel). */
    private $channel;

    /** Name of the topic exchange log records are published to. */
    private $exchangeName = 'logs';

    /**
     * Stores the channel and declares the topic exchange.
     *
     * @param mixed $channel Object exposing exchange_declare() and basic_publish().
     */
    public function __construct($channel)
    {
        $this->channel = $channel;
        // Class names are fully qualified so the snippet works without `use` imports.
        // Arguments after the type: passive=false, durable=true, auto_delete=false.
        $this->channel->exchange_declare(
            $this->exchangeName,
            \PhpAmqpLib\Exchange\AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );
    }

    /**
     * Publishes one log record as a JSON message.
     *
     * @param string $level   Log level; becomes the second word of the routing key.
     * @param string $service Service name; becomes the first word of the routing key.
     * @param string $message Log message text.
     * @param array  $context Optional structured context.
     */
    public function log($level, $service, $message, $context = [])
    {
        $routingKey = sprintf('%s.%s', $service, $level);

        $logData = [
            'level' => $level,
            'service' => $service,
            'message' => $message,
            'context' => $context,
            'timestamp' => date('Y-m-d H:i:s'),
            'hostname' => gethostname()
        ];

        // No delivery_mode is set here, unlike the persistent messages in the other examples.
        $msg = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($logData),
            ['content_type' => 'application/json']
        );

        $this->channel->basic_publish($msg, $this->exchangeName, $routingKey);
    }
}

// 日志消费者 - 收集所有错误日志
/**
 * Consumes error-level and critical-level records from the "logs" topic exchange.
 */
class ErrorLogConsumer
{
    /**
     * Declares the exchange and queue, binds the error/critical patterns and
     * registers the consumer callback. The caller still has to run the wait loop.
     *
     * @param mixed $channel Object exposing the php-amqplib channel methods used below.
     */
    public function subscribe($channel)
    {
        // Fully qualified so the snippet works without `use` imports.
        $channel->exchange_declare('logs', \PhpAmqpLib\Exchange\AMQPExchangeType::TOPIC, false, true, false);

        // Queue arguments: passive=false, durable=true, exclusive=false, auto_delete=false.
        $queueName = 'error-logs-queue';
        $channel->queue_declare($queueName, false, true, false, false);

        // "*" matches exactly one word, so these bindings select two-word keys
        // of the form "<service>.error" and "<service>.critical".
        $channel->queue_bind($queueName, 'logs', '*.error');
        $channel->queue_bind($queueName, 'logs', '*.critical');

        $callback = function ($msg) {
            $log = json_decode($msg->getBody(), true);

            if (is_array($log)) {
                $this->processError($log);
            } else {
                // A malformed body would otherwise be indexed as null in processError().
                error_log('Discarding malformed log message (body is not a JSON object)');
            }

            // Ack in both cases so an undecodable message is not left unacknowledged.
            $msg->ack();
        };

        // The fourth argument (no_ack) is false, so the callback must ack explicitly.
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
    }

    /**
     * Writes one decoded log record to the PHP error log.
     *
     * @param array $log Record with the keys "timestamp", "service", "level" and "message".
     */
    private function processError($log)
    {
        // Write to the error log; an alert could be sent from here instead.
        error_log(sprintf(
            "[%s] %s - %s: %s",
            $log['timestamp'],
            $log['service'],
            $log['level'],
            $log['message']
        ));
    }
}

3. 多租户消息分发

php
<?php

/**
 * Routes tenant-scoped events through a durable topic exchange.
 *
 * Routing keys have the form "tenant.<tenantId>.<eventType>".
 * NOTE(review): the patterns below assume the tenant id is a single word with no
 * ".", "*" or "#" in it - confirm that callers guarantee this.
 */
class TenantMessageRouter
{
    /** Channel used for all broker operations (expected to be a php-amqplib channel). */
    private $channel;

    /** Name of the topic exchange tenant events are published to. */
    private $exchangeName = 'tenant_events';

    /**
     * Stores the channel and declares the topic exchange.
     *
     * @param mixed $channel Object exposing the php-amqplib channel methods used below.
     */
    public function __construct($channel)
    {
        $this->channel = $channel;
        // Class names are fully qualified so the snippet works without `use` imports.
        // Arguments after the type: passive=false, durable=true, auto_delete=false.
        $this->channel->exchange_declare(
            $this->exchangeName,
            \PhpAmqpLib\Exchange\AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );
    }

    /**
     * Publishes a JSON message for one tenant.
     *
     * @param string $tenantId  Tenant identifier (second word of the routing key).
     * @param string $eventType Event type; may itself be dot-separated.
     * @param mixed  $data      JSON-encodable payload.
     */
    public function publish($tenantId, $eventType, $data)
    {
        $routingKey = sprintf('tenant.%s.%s', $tenantId, $eventType);

        $message = new \PhpAmqpLib\Message\AMQPMessage(
            json_encode($data),
            ['content_type' => 'application/json']
        );

        $this->channel->basic_publish($message, $this->exchangeName, $routingKey);
    }

    /**
     * Binds a queue to the events of a single tenant.
     *
     * @param string $tenantId     Tenant identifier.
     * @param string $queueName    Queue to declare and bind.
     * @param string $eventPattern Pattern for the event part; "#" selects every event.
     */
    public function subscribeTenant($tenantId, $queueName, $eventPattern = '#')
    {
        $this->bindQueue($queueName, sprintf('tenant.%s.%s', $tenantId, $eventPattern));
    }

    /**
     * Binds a queue to the events of every tenant ("*" stands for the tenant id word).
     *
     * @param string $queueName    Queue to declare and bind.
     * @param string $eventPattern Pattern for the event part; "#" selects every event.
     */
    public function subscribeAllTenants($queueName, $eventPattern = '#')
    {
        $this->bindQueue($queueName, sprintf('tenant.*.%s', $eventPattern));
    }

    /**
     * Declares the queue and binds it to the tenant exchange.
     *
     * Queue arguments: passive=false, durable=true, exclusive=false, auto_delete=false.
     *
     * @param string $queueName
     * @param string $bindingKey
     */
    private function bindQueue($queueName, $bindingKey)
    {
        $this->channel->queue_declare($queueName, false, true, false, false);
        $this->channel->queue_bind($queueName, $this->exchangeName, $bindingKey);
    }
}

常见问题与解决方案

问题 1: 通配符使用不当

症状: 消息无法被正确路由

解决方案: 理解 * 和 # 的区别

php
<?php

// Correct usage examples. Each element needs a trailing comma; without the
// commas the array literal is a parse error.
$bindings = [
    // Matches: user.created, user.deleted
    // Does not match: user.profile.updated
    'user.*',

    // Matches: user, user.created, user.profile.updated
    'user.#',

    // Matches: user.created, order.created
    '*.created',

    // Matches: created, user.created, api.v1.user.created
    '#.created',
];

问题 2: Routing Key 设计不合理

解决方案: 采用层级化命名规范

php
<?php

// 推荐的 Routing Key 设计
/**
 * Builds hierarchical routing keys of the form "domain.entity.action[.status]".
 */
class RoutingKeyBuilder
{
    /**
     * Joins the given parts with dots; the status segment is appended only when it is not null.
     *
     * @param string      $domain
     * @param string      $entity
     * @param string      $action
     * @param string|null $status Optional fourth segment.
     * @return string
     */
    public static function build($domain, $entity, $action, $status = null)
    {
        $segments = $status === null
            ? [$domain, $entity, $action]
            : [$domain, $entity, $action, $status];

        return implode('.', $segments);
    }
}

// 示例
// ecommerce.order.created
// ecommerce.order.payment.completed
// ecommerce.order.shipment.failed
// user.profile.updated.success

最佳实践建议

  1. 统一的 Routing Key 命名规范: 使用 领域.实体.动作[.状态] 格式
  2. 避免过度细分: 不要创建过多层级的 routing key
  3. 合理使用通配符: # 尽量放在末尾,避免性能问题
  4. 监控绑定数量: 大量绑定会影响性能
  5. 文档化事件: 维护事件字典,记录所有事件类型

相关链接