Appearance
Topic Exchange
概述
Topic Exchange(主题交换机)是 RabbitMQ 中功能最强大的交换机类型之一。它支持通配符匹配 routing key,使得消息路由更加灵活,非常适合实现发布/订阅模式。
核心原理
Topic Exchange 使用类似正则表达式的模式匹配 routing key,支持两个通配符:
*(星号):匹配一个单词#(井号):匹配零个或多个单词
routing key 必须由点号 . 分隔的单词组成,例如 order.created.success。
mermaid
graph LR
P[生产者] -->|order.created| E[Topic Exchange]
P2[生产者] -->|order.cancelled| E
P3[生产者] -->|payment.completed| E
E -->|order.*| Q1[订单日志队列]
E -->|order.#| Q2[订单全量队列]
E -->|#.completed| Q3[完成事件队列]
E -->|payment.*| Q4[支付队列]
style E fill:#f9f,stroke:#333匹配规则示例
| Routing Key 模式 | 匹配 | 不匹配 |
|---|---|---|
order.* | order.created, order.cancelled | order.created.success |
order.# | order, order.created, order.created.success | payment.completed |
#.completed | order.completed, payment.completed | order.cancelled |
*.*.success | order.create.success, payment.process.success | order.success |
PHP 代码示例
生产者 - 发送消息
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'topic_events';
$channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, true, false);
$events = [
'order.created' => ['order_id' => 'ORD-001', 'status' => 'created'],
'order.completed.success' => ['order_id' => 'ORD-001', 'status' => 'completed'],
'order.cancelled.user' => ['order_id' => 'ORD-002', 'status' => 'cancelled'],
'payment.completed' => ['payment_id' => 'PAY-001', 'status' => 'completed'],
'payment.failed.timeout' => ['payment_id' => 'PAY-002', 'error' => 'timeout'],
];
foreach ($events as $routingKey => $data) {
$message = new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);
$channel->basic_publish($message, $exchangeName, $routingKey);
echo "消息已发送 - Routing Key: {$routingKey}\n";
}
$channel->close();
$connection->close();消费者 - 订阅特定模式
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'topic_events';
$channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, true, false);
$queueName = 'order-all-queue';
$bindingKey = 'order.#';
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $bindingKey);
echo "队列 {$queueName} 已绑定到模式: {$bindingKey}\n";
echo "等待消息中...\n";
$callback = function (AMQPMessage $msg) {
$routingKey = $msg->getRoutingKey();
$body = json_decode($msg->getBody(), true);
echo "收到消息\n";
echo " Routing Key: {$routingKey}\n";
echo " 消息内容: " . json_encode($body, JSON_UNESCAPED_UNICODE) . "\n";
echo "-------------------\n";
$msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();多模式绑定消费者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'topic_events';
$channel->exchange_declare($exchangeName, AMQPExchangeType::TOPIC, false, true, false);
$queueName = 'completion-events-queue';
$bindingKeys = [
'*.completed', // 所有完成事件
'*.success', // 所有成功事件
'#.failed', // 所有失败事件
];
$channel->queue_declare($queueName, false, true, false, false);
foreach ($bindingKeys as $bindingKey) {
$channel->queue_bind($queueName, $exchangeName, $bindingKey);
echo "已绑定模式: {$bindingKey}\n";
}
echo "\n等待消息中...\n";
$callback = function ($msg) {
echo sprintf(
"[%s] %s\n",
$msg->getRoutingKey(),
$msg->getBody()
);
$msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();实际应用场景
1. 微服务事件总线
php
<?php
class EventBus
{
private $channel;
private $exchangeName = 'microservice_events';
public function __construct($channel)
{
$this->channel = $channel;
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::TOPIC,
false,
true,
false
);
}
public function emit($event, $data)
{
$routingKey = $this->buildRoutingKey($event);
$message = new AMQPMessage(
json_encode([
'event' => $event,
'data' => $data,
'timestamp' => time(),
'service' => gethostname()
]),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);
$this->channel->basic_publish($message, $this->exchangeName, $routingKey);
}
private function buildRoutingKey($event)
{
return str_replace('.', '.', strtolower($event));
}
}
// 使用示例
$eventBus = new EventBus($channel);
// 发送用户事件
$eventBus->emit('user.registered', ['user_id' => 123, 'email' => 'user@example.com']);
$eventBus->emit('user.profile.updated', ['user_id' => 123, 'fields' => ['name', 'avatar']]);
$eventBus->emit('user.password.reset', ['user_id' => 123, 'ip' => '192.168.1.1']);2. 日志收集系统
php
<?php
class LogCollector
{
private $channel;
private $exchangeName = 'logs';
public function __construct($channel)
{
$this->channel = $channel;
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::TOPIC,
false,
true,
false
);
}
public function log($level, $service, $message, $context = [])
{
$routingKey = sprintf('%s.%s', $service, $level);
$logData = [
'level' => $level,
'service' => $service,
'message' => $message,
'context' => $context,
'timestamp' => date('Y-m-d H:i:s'),
'hostname' => gethostname()
];
$msg = new AMQPMessage(
json_encode($logData),
['content_type' => 'application/json']
);
$this->channel->basic_publish($msg, $this->exchangeName, $routingKey);
}
}
// 日志消费者 - 收集所有错误日志
class ErrorLogConsumer
{
public function subscribe($channel)
{
$channel->exchange_declare('logs', AMQPExchangeType::TOPIC, false, true, false);
$queueName = 'error-logs-queue';
$channel->queue_declare($queueName, false, true, false, false);
// 订阅所有服务的错误和严重日志
$channel->queue_bind($queueName, 'logs', '*.error');
$channel->queue_bind($queueName, 'logs', '*.critical');
$callback = function ($msg) {
$log = json_decode($msg->getBody(), true);
$this->processError($log);
$msg->ack();
};
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
}
private function processError($log)
{
// 写入错误日志文件或发送告警
error_log(sprintf(
"[%s] %s - %s: %s",
$log['timestamp'],
$log['service'],
$log['level'],
$log['message']
));
}
}3. 多租户消息分发
php
<?php
class TenantMessageRouter
{
private $channel;
private $exchangeName = 'tenant_events';
public function __construct($channel)
{
$this->channel = $channel;
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::TOPIC,
false,
true,
false
);
}
public function publish($tenantId, $eventType, $data)
{
$routingKey = sprintf('tenant.%s.%s', $tenantId, $eventType);
$message = new AMQPMessage(
json_encode($data),
['content_type' => 'application/json']
);
$this->channel->basic_publish($message, $this->exchangeName, $routingKey);
}
public function subscribeTenant($tenantId, $queueName, $eventPattern = '#')
{
$bindingKey = sprintf('tenant.%s.%s', $tenantId, $eventPattern);
$this->channel->queue_declare($queueName, false, true, false, false);
$this->channel->queue_bind($queueName, $this->exchangeName, $bindingKey);
}
public function subscribeAllTenants($queueName, $eventPattern = '#')
{
$bindingKey = sprintf('tenant.*.%s', $eventPattern);
$this->channel->queue_declare($queueName, false, true, false, false);
$this->channel->queue_bind($queueName, $this->exchangeName, $bindingKey);
}
}常见问题与解决方案
问题 1: 通配符使用不当
症状: 消息无法被正确路由
解决方案: 理解 * 和 # 的区别
php
<?php
// 正确示例
$bindings = [
'user.*' // 匹配: user.created, user.deleted
// 不匹配: user.profile.updated
'user.#' // 匹配: user, user.created, user.profile.updated
'*.created' // 匹配: user.created, order.created
'#.created' // 匹配: created, user.created, api.v1.user.created
];问题 2: Routing Key 设计不合理
解决方案: 采用层级化命名规范
php
<?php
// 推荐的 Routing Key 设计
class RoutingKeyBuilder
{
public static function build($domain, $entity, $action, $status = null)
{
$parts = [$domain, $entity, $action];
if ($status !== null) {
$parts[] = $status;
}
return implode('.', $parts);
}
}
// 示例
// ecommerce.order.created
// ecommerce.order.payment.completed
// ecommerce.order.shipment.failed
// user.profile.updated.success最佳实践建议
- 统一的 Routing Key 命名规范: 使用
领域.实体.动作[.状态]格式 - 避免过度细分: 不要创建过多层级的 routing key
- 合理使用通配符:
#尽量放在末尾,避免性能问题 - 监控绑定数量: 大量绑定会影响性能
- 文档化事件: 维护事件字典,记录所有事件类型
