Appearance
主题模式
概述
主题模式(Topic Pattern)使用 Topic 交换器实现基于模式的消息路由。通过通配符匹配路由键,可以实现灵活的消息过滤和分发。这种模式比 Direct 交换器更灵活,适用于需要多条件过滤的场景。
核心知识点
架构图
mermaid
graph TB
subgraph 生产者
P[Producer]
end
subgraph RabbitMQ
E[Topic Exchange]
Q1[Queue: *.orange.*]
Q2[Queue: *.*.rabbit]
Q3[Queue: lazy.#]
end
subgraph 消费者
C1[橙色动物处理器]
C2[兔子处理器]
C3[懒惰动物处理器]
end
P -->|quick.orange.rabbit| E
P -->|lazy.orange.elephant| E
P -->|quick.brown.fox| E
E -->|匹配| Q1
E -->|匹配| Q2
E -->|匹配| Q3
Q1 --> C1
Q2 --> C2
Q3 --> C3
style E fill:#e1bee7

通配符规则
| 通配符 | 说明 | 示例 |
|---|---|---|
| `*` | 匹配一个单词 | `*.orange.*` 匹配 `quick.orange.rabbit` |
| # | 匹配零个或多个单词 | lazy.# 匹配 lazy、lazy.orange、lazy.orange.rabbit |
匹配示例
mermaid
graph LR
subgraph 路由键
K1[quick.orange.rabbit]
K2[quick.orange.fox]
K3[lazy.orange.elephant]
K4[lazy.brown.fox]
K5[lazy.pink.rabbit]
K6[quick.brown.fox]
end
subgraph 绑定模式
B1["*.orange.*"]
B2["*.*.rabbit"]
B3["lazy.#"]
end
K1 --> B1
K1 --> B2
K2 --> B1
K3 --> B1
K3 --> B3
K4 --> B3
K5 --> B2
K5 --> B3

PHP 代码示例
主题发布者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class TopicPublisher
{
private $connection;
private $channel;
private $exchangeName = 'topic_exchange';
/**
 * Opens the AMQP connection and channel, then declares the topic exchange.
 *
 * Connects to localhost:5672 with the guest/guest credentials. The exchange
 * flags passed to exchange_declare are, in order: passive = false,
 * durable = true, auto_delete = false.
 */
public function __construct()
{
    $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $this->channel = $this->connection->channel();

    $passive = false;
    $durable = true;
    $autoDelete = false;
    $this->channel->exchange_declare(
        $this->exchangeName,
        'topic',
        $passive,
        $durable,
        $autoDelete
    );
}
/**
 * Publishes a message to the topic exchange under the given routing key.
 *
 * The payload sent to the broker is a JSON envelope with three fields:
 * "content" (the caller's message), "routing_key" and "timestamp".
 *
 * @param string $routingKey Dot-separated routing key used by the exchange.
 * @param mixed  $message    Any JSON-serialisable value.
 */
public function publish($routingKey, $message)
{
    $payload = json_encode([
        'content' => $message,
        'routing_key' => $routingKey,
        'timestamp' => time()
    ]);

    $msg = new AMQPMessage($payload, ['content_type' => 'application/json']);
    $this->channel->basic_publish($msg, $this->exchangeName, $routingKey);

    // Log without \uXXXX escaping so non-ASCII content stays readable; this
    // matches the JSON_UNESCAPED_UNICODE flag the consumer uses for its output.
    echo " [x] 发送消息 [{$routingKey}]: " . json_encode($message, JSON_UNESCAPED_UNICODE) . "\n";
}
/**
 * Releases the AMQP resources held by this publisher.
 */
public function close()
{
    // The channel is closed before the connection it was opened on.
    $this->channel->close();
    $this->connection->close();
}
}

主题消费者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
class TopicConsumer
{
private $connection;
private $channel;
private $exchangeName = 'topic_exchange';
private $queueName;
private $bindingKeys;
/**
 * Connects to the broker, declares the topic exchange and a private queue,
 * and binds that queue once per binding key.
 *
 * @param array $bindingKeys Binding patterns for the queue (may contain the
 *                           "*" and "#" wildcards).
 */
public function __construct(array $bindingKeys)
{
    $this->bindingKeys = $bindingKeys;

    $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $this->channel = $this->connection->channel();

    // Same declaration as the publisher: passive = false, durable = true, auto_delete = false.
    $this->channel->exchange_declare($this->exchangeName, 'topic', false, true, false);

    // An empty name lets the broker generate one; the flags are, in order:
    // passive = false, durable = false, exclusive = true, auto_delete = true.
    $declared = $this->channel->queue_declare('', false, false, true, true);
    list($this->queueName) = $declared;

    foreach ($bindingKeys as $bindingKey) {
        $this->channel->queue_bind($this->queueName, $this->exchangeName, $bindingKey);
    }

    $keys = implode(', ', $bindingKeys);
    echo " [x] 消费者已启动,绑定模式: {$keys}\n";
}
/**
 * Consumes messages from the bound queue until the channel closes.
 *
 * Each delivery is printed together with its routing key and then
 * acknowledged. Deliveries are acknowledged manually and the prefetch count
 * is 1, so the broker hands over one unacknowledged message at a time.
 */
public function consume()
{
    $callback = function ($message) {
        $routingKey = $message->delivery_info['routing_key'];
        $decoded = json_decode($message->body, true);

        // A topic exchange can receive messages from any publisher. When the
        // body is not the expected JSON envelope, show the raw body instead of
        // indexing into null (which raises a warning and prints "null").
        if (is_array($decoded) && array_key_exists('content', $decoded)) {
            $content = $decoded['content'];
        } else {
            $content = $message->body;
        }

        echo sprintf(
            " [x] 接收 [%s]: %s\n",
            $routingKey,
            json_encode($content, JSON_UNESCAPED_UNICODE)
        );
        $message->ack();
    };

    // Prefetch size unlimited, prefetch count 1, not applied globally.
    $this->channel->basic_qos(null, 1, null);

    // Arguments after the queue name: consumer tag '', no_local, no_ack,
    // exclusive and nowait all false, then the delivery callback.
    $this->channel->basic_consume(
        $this->queueName,
        '',
        false,
        false,
        false,
        false,
        $callback
    );

    while ($this->channel->is_open()) {
        $this->channel->wait();
    }
}
/**
 * Releases the AMQP resources held by this consumer.
 */
public function close()
{
    // The channel is closed before the connection it was opened on.
    $this->channel->close();
    $this->connection->close();
}
}

完整示例:日志系统
php
<?php
/**
 * Logger that publishes each entry to the topic exchange under a
 * "<facility>.<level>" routing key.
 */
class TopicLogSystem
{
    /** @var TopicPublisher Publisher every log entry is sent through. */
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TopicPublisher();
    }

    /**
     * Publishes one log entry.
     *
     * @param string $facility First word of the routing key (e.g. "kern").
     * @param string $level    Second word of the routing key (e.g. "error").
     * @param mixed  $message  Log message.
     * @param array  $context  Extra data attached to the entry.
     */
    public function log($facility, $level, $message, array $context = [])
    {
        $routingKey = "{$facility}.{$level}";
        $entry = [
            'facility' => $facility,
            'level' => $level,
            'message' => $message,
            'context' => $context,
            'timestamp' => time()
        ];

        $this->publisher->publish($routingKey, $entry);
    }

    /** Publishes under "kern.info". */
    public function kernInfo($message, array $context = [])
    {
        $this->log('kern', 'info', $message, $context);
    }

    /** Publishes under "kern.error". */
    public function kernError($message, array $context = [])
    {
        $this->log('kern', 'error', $message, $context);
    }

    /** Publishes under "auth.info". */
    public function authInfo($message, array $context = [])
    {
        $this->log('auth', 'info', $message, $context);
    }

    /** Publishes under "auth.error". */
    public function authError($message, array $context = [])
    {
        $this->log('auth', 'error', $message, $context);
    }
}
/**
 * Consumer bound with "#", so it receives every message on the exchange.
 */
class AllLogsConsumer
{
    /**
     * @var TopicConsumer Declared explicitly: assigning to an undeclared
     *                    property is deprecated since PHP 8.2.
     */
    private $consumer;

    public function __construct()
    {
        $this->consumer = new TopicConsumer(['#']);
    }

    /**
     * Prints a banner and starts consuming.
     */
    public function start()
    {
        echo "监听所有日志...\n";
        $this->consumer->consume();
    }
}
/**
 * Consumer bound with "*.error": two-word routing keys whose second word is
 * "error".
 */
class ErrorLogsConsumer
{
    /**
     * @var TopicConsumer Declared explicitly: assigning to an undeclared
     *                    property is deprecated since PHP 8.2.
     */
    private $consumer;

    public function __construct()
    {
        $this->consumer = new TopicConsumer(['*.error']);
    }

    /**
     * Prints a banner and starts consuming.
     */
    public function start()
    {
        echo "监听所有错误日志...\n";
        $this->consumer->consume();
    }
}
class KernLogsConsumer
{
/**
 * @var TopicConsumer Declared explicitly: assigning to an undeclared property
 *                    is deprecated since PHP 8.2.
 */
private $consumer;

/**
 * Binds with "kern.*": two-word routing keys whose first word is "kern".
 */
public function __construct()
{
    $this->consumer = new TopicConsumer(['kern.*']);
}

/**
 * Prints a banner and starts consuming.
 */
public function start()
{
    echo "监听内核日志...\n";
    $this->consumer->consume();
}
}

多维度消息路由
php
<?php
/**
 * Publishes events under three-word "<region>.<service>.<event>" routing keys.
 */
class MultiDimensionalRouter
{
    /** @var TopicPublisher Publisher every event is sent through. */
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TopicPublisher();
    }

    /**
     * Publishes one event.
     *
     * @param string $region  First word of the routing key.
     * @param string $service Second word of the routing key.
     * @param string $event   Third word of the routing key.
     * @param mixed  $data    Event payload.
     */
    public function route($region, $service, $event, $data)
    {
        $routingKey = "{$region}.{$service}.{$event}";
        $envelope = [
            'region' => $region,
            'service' => $service,
            'event' => $event,
            'data' => $data,
            'timestamp' => time()
        ];

        $this->publisher->publish($routingKey, $envelope);
    }

    /** Publishes under "us-east.user.created". */
    public function usEastUserServiceCreated($userData)
    {
        $this->route('us-east', 'user', 'created', $userData);
    }

    /** Publishes under "us-west.order.completed". */
    public function usWestOrderCompleted($orderData)
    {
        $this->route('us-west', 'order', 'completed', $orderData);
    }

    /** Publishes under "eu-central.payment.processed". */
    public function euCentralPaymentProcessed($paymentData)
    {
        $this->route('eu-central', 'payment', 'processed', $paymentData);
    }
}
/**
 * Consumer for every three-word routing key that starts with a given region.
 */
class RegionConsumer
{
    /**
     * @var TopicConsumer Declared explicitly: assigning to an undeclared
     *                    property is deprecated since PHP 8.2.
     */
    private $consumer;

    /**
     * @param string $region First word of the binding pattern "<region>.*.*".
     */
    public function __construct($region)
    {
        $this->consumer = new TopicConsumer(["{$region}.*.*"]);
    }

    /**
     * Starts consuming.
     */
    public function start()
    {
        $this->consumer->consume();
    }
}
/**
 * Consumer for every three-word routing key whose middle word is a given
 * service.
 */
class ServiceConsumer
{
    /**
     * @var TopicConsumer Declared explicitly: assigning to an undeclared
     *                    property is deprecated since PHP 8.2.
     */
    private $consumer;

    /**
     * @param string $service Middle word of the binding pattern "*.<service>.*".
     */
    public function __construct($service)
    {
        $this->consumer = new TopicConsumer(["*.{$service}.*"]);
    }

    /**
     * Starts consuming.
     */
    public function start()
    {
        $this->consumer->consume();
    }
}
class EventConsumer
{
/**
 * @var TopicConsumer Declared explicitly: assigning to an undeclared property
 *                    is deprecated since PHP 8.2.
 */
private $consumer;

/**
 * @param string $event Last word of the binding pattern "*.*.<event>".
 */
public function __construct($event)
{
    $this->consumer = new TopicConsumer(["*.*.{$event}"]);
}

/**
 * Starts consuming.
 */
public function start()
{
    $this->consumer->consume();
}
}

物联网设备消息路由
php
<?php
/**
 * Publishes device messages under four-word
 * "<building>.<floor>.<deviceType>.<deviceId>" routing keys.
 */
class IoTDeviceRouter
{
    /** @var TopicPublisher Publisher every device message is sent through. */
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TopicPublisher();
    }

    /**
     * Publishes one device message.
     *
     * @param string $building   First word of the routing key.
     * @param string $floor      Second word of the routing key.
     * @param string $deviceType Third word of the routing key.
     * @param string $deviceId   Fourth word of the routing key.
     * @param mixed  $data       Device payload.
     */
    public function deviceMessage($building, $floor, $deviceType, $deviceId, $data)
    {
        $routingKey = "{$building}.{$floor}.{$deviceType}.{$deviceId}";
        $envelope = [
            'building' => $building,
            'floor' => $floor,
            'device_type' => $deviceType,
            'device_id' => $deviceId,
            'data' => $data,
            'timestamp' => time()
        ];

        $this->publisher->publish($routingKey, $envelope);
    }

    /** Publishes a reading with device type "temperature". */
    public function temperatureReading($building, $floor, $deviceId, $temperature)
    {
        $reading = ['temperature' => $temperature];
        $this->deviceMessage($building, $floor, 'temperature', $deviceId, $reading);
    }

    /** Publishes a reading with device type "motion". */
    public function motionDetection($building, $floor, $deviceId, $detected)
    {
        $reading = ['motion_detected' => $detected];
        $this->deviceMessage($building, $floor, 'motion', $deviceId, $reading);
    }
}
/**
 * Consumer for every device message of one building ("<building>.#").
 */
class BuildingConsumer
{
    /**
     * @var TopicConsumer Declared explicitly: assigning to an undeclared
     *                    property is deprecated since PHP 8.2.
     */
    private $consumer;

    /** @var string Building name, kept so start() can print it. */
    private $building;

    /**
     * @param string $building First word of the routing keys to receive.
     */
    public function __construct($building)
    {
        $this->building = $building;
        $this->consumer = new TopicConsumer(["{$building}.#"]);
    }

    /**
     * Prints a banner naming the building and starts consuming.
     */
    public function start()
    {
        // The constructor argument is not in scope here, so the banner reads
        // the stored property instead of an undefined local variable.
        echo "监听建筑物 {$this->building} 的所有设备消息...\n";
        $this->consumer->consume();
    }
}
class DeviceTypeConsumer
{
/**
 * @var TopicConsumer Declared explicitly: assigning to an undeclared property
 *                    is deprecated since PHP 8.2.
 */
private $consumer;

/** @var string Device type, kept so start() can print it. */
private $deviceType;

/**
 * @param string $deviceType Third word of the binding pattern
 *                           "*.*.<deviceType>.*".
 */
public function __construct($deviceType)
{
    $this->deviceType = $deviceType;
    $this->consumer = new TopicConsumer(["*.*.{$deviceType}.*"]);
}

/**
 * Prints a banner naming the device type and starts consuming.
 */
public function start()
{
    // The constructor argument is not in scope here, so the banner reads the
    // stored property instead of an undefined local variable.
    echo "监听所有 {$this->deviceType} 设备消息...\n";
    $this->consumer->consume();
}
}

实际应用场景
1. 多租户系统
php
<?php
/**
 * Publishes tenant events under four-word
 * "tenant.<tenantId>.<module>.<action>" routing keys.
 */
class TenantEventRouter
{
    /** @var TopicPublisher Publisher every tenant event is sent through. */
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TopicPublisher();
    }

    /**
     * Publishes one tenant event.
     *
     * @param mixed  $tenantId Second word of the routing key.
     * @param string $module   Third word of the routing key.
     * @param string $action   Fourth word of the routing key.
     * @param mixed  $data     Event payload.
     */
    public function tenantEvent($tenantId, $module, $action, $data)
    {
        $routingKey = "tenant.{$tenantId}.{$module}.{$action}";
        $envelope = [
            'tenant_id' => $tenantId,
            'module' => $module,
            'action' => $action,
            'data' => $data,
            'timestamp' => time()
        ];

        $this->publisher->publish($routingKey, $envelope);
    }

    /** Publishes a "user" / "created" event for the tenant. */
    public function tenantUserCreated($tenantId, $userData)
    {
        $this->tenantEvent($tenantId, 'user', 'created', $userData);
    }

    /** Publishes an "order" / "placed" event for the tenant. */
    public function tenantOrderPlaced($tenantId, $orderData)
    {
        $this->tenantEvent($tenantId, 'order', 'placed', $orderData);
    }
}
/**
 * Consumer for every event of one tenant.
 */
class TenantConsumer
{
    /**
     * @var TopicConsumer Declared explicitly: assigning to an undeclared
     *                    property is deprecated since PHP 8.2.
     */
    private $consumer;

    /**
     * @param mixed $tenantId Tenant whose events should be received.
     */
    public function __construct($tenantId)
    {
        // Tenant events are published as "tenant.<id>.<module>.<action>",
        // i.e. four words. "*" matches exactly one word, so the former
        // three-word pattern "tenant.<id>.*" never matched any of them.
        // "#" matches all remaining words.
        $this->consumer = new TopicConsumer(["tenant.{$tenantId}.#"]);
    }

    /**
     * Starts consuming.
     */
    public function start()
    {
        $this->consumer->consume();
    }
}
class ModuleConsumer
{
/**
 * @var TopicConsumer Declared explicitly: assigning to an undeclared property
 *                    is deprecated since PHP 8.2.
 */
private $consumer;

/**
 * @param string $module Third word of the binding pattern
 *                       "tenant.*.<module>.*".
 */
public function __construct($module)
{
    $this->consumer = new TopicConsumer(["tenant.*.{$module}.*"]);
}

/**
 * Starts consuming.
 */
public function start()
{
    $this->consumer->consume();
}
}

2. 微服务事件总线
php
<?php
/**
 * Publishes service events under four-word
 * "service.<service>.<event>.v<version>" routing keys.
 */
class ServiceEventBus
{
    /** @var TopicPublisher Publisher every service event is sent through. */
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TopicPublisher();
    }

    /**
     * Publishes one service event.
     *
     * @param string $service Second word of the routing key.
     * @param string $event   Third word of the routing key.
     * @param mixed  $version Version; appears as "v<version>" in the key.
     * @param mixed  $data    Event payload.
     */
    public function emit($service, $event, $version, $data)
    {
        $routingKey = "service.{$service}.{$event}.v{$version}";
        $envelope = [
            'service' => $service,
            'event' => $event,
            'version' => $version,
            'data' => $data,
            'timestamp' => time()
        ];

        $this->publisher->publish($routingKey, $envelope);
    }

    /** Publishes under "service.user.created.v1". */
    public function userServiceCreated($userData)
    {
        $this->emit('user', 'created', 1, $userData);
    }

    /** Publishes under "service.order.completed.v1". */
    public function orderServiceCompleted($orderData)
    {
        $this->emit('order', 'completed', 1, $orderData);
    }
}
class ServiceEventConsumer
{
/**
 * @var TopicConsumer Declared explicitly: assigning to an undeclared property
 *                    is deprecated since PHP 8.2.
 */
private $consumer;

/**
 * Binds with "service.<service>.<event>.*"; the trailing "*" stands for the
 * fourth word of the routing key.
 *
 * @param string $service Service name (second word of the pattern).
 * @param string $event   Event name (third word); defaults to "*", which
 *                        matches any single word.
 */
public function __construct($service, $event = '*')
{
    $this->consumer = new TopicConsumer(["service.{$service}.{$event}.*"]);
}

/**
 * Starts consuming.
 */
public function start()
{
    $this->consumer->consume();
}
}

3. 地理位置路由
php
<?php
/**
 * Publishes location updates under four-word
 * "geo.<country>.<state>.<city>" routing keys.
 */
class GeoLocationRouter
{
    /** @var TopicPublisher Publisher every location update is sent through. */
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TopicPublisher();
    }

    /**
     * Publishes one location update.
     *
     * @param string $country Second word of the routing key.
     * @param string $state   Third word of the routing key.
     * @param string $city    Fourth word of the routing key. Must be a
     *                        concrete name: wildcards only have meaning in
     *                        binding keys, not in published routing keys.
     * @param mixed  $data    Update payload.
     */
    public function locationUpdate($country, $state, $city, $data)
    {
        $routingKey = "geo.{$country}.{$state}.{$city}";
        $envelope = [
            'country' => $country,
            'state' => $state,
            'city' => $city,
            'data' => $data,
            'timestamp' => time()
        ];

        $this->publisher->publish($routingKey, $envelope);
    }

    /**
     * Publishes an update for Beijing under "geo.china.beijing.beijing".
     */
    public function chinaBeijingUpdate($data)
    {
        // Previously the city word was a literal "*". On the publishing side
        // that is not a wildcard, so a consumer bound to a concrete city
        // (e.g. "geo.china.beijing.beijing") never received the update.
        $this->locationUpdate('china', 'beijing', 'beijing', $data);
    }
}
/**
 * Consumer for every location update of one country ("geo.<country>.#").
 */
class CountryConsumer
{
    /**
     * @var TopicConsumer Declared explicitly: assigning to an undeclared
     *                    property is deprecated since PHP 8.2.
     */
    private $consumer;

    /**
     * @param string $country Second word of the routing keys to receive.
     */
    public function __construct($country)
    {
        $this->consumer = new TopicConsumer(["geo.{$country}.#"]);
    }

    /**
     * Starts consuming.
     */
    public function start()
    {
        $this->consumer->consume();
    }
}
class CityConsumer
{
/**
 * @var TopicConsumer Declared explicitly: assigning to an undeclared property
 *                    is deprecated since PHP 8.2.
 */
private $consumer;

/**
 * Binds with "geo.<country>.<state>.<city>".
 *
 * @param string $country Second word of the binding key.
 * @param string $state   Third word of the binding key.
 * @param string $city    Fourth word of the binding key.
 */
public function __construct($country, $state, $city)
{
    $this->consumer = new TopicConsumer(["geo.{$country}.{$state}.{$city}"]);
}

/**
 * Starts consuming.
 */
public function start()
{
    $this->consumer->consume();
}
}

常见问题与解决方案
问题 1:路由键层级不一致
解决方案:
php
<?php
class RoutingKeyNormalizer
{
/**
 * Builds a routing key from raw parts.
 *
 * In each part, every character other than an ASCII letter or digit is
 * replaced by "_" and the result is lower-cased; the parts are then joined
 * with ".".
 *
 * @param iterable $parts Raw words of the routing key.
 * @return string Normalised, dot-separated routing key.
 */
public static function normalize($parts)
{
    $segments = [];
    foreach ($parts as $raw) {
        $sanitized = preg_replace('/[^a-z0-9]/i', '_', $raw);
        $segments[] = strtolower($sanitized);
    }

    return implode('.', $segments);
}
/**
 * Checks that a routing key has exactly the expected number of words.
 *
 * @param string $routingKey    Dot-separated routing key.
 * @param int    $expectedDepth Required number of words (compared strictly).
 * @return bool Always true when the depth matches.
 * @throws InvalidArgumentException When the number of words differs.
 */
public static function validate($routingKey, $expectedDepth)
{
    // A key with N separators has N + 1 words.
    $depth = substr_count($routingKey, '.') + 1;
    if ($depth === $expectedDepth) {
        return true;
    }

    throw new InvalidArgumentException(
        "路由键层级不正确: {$routingKey}。期望 {$expectedDepth} 层"
    );
}
}

问题 2:绑定模式过于宽泛
解决方案:
php
<?php
class BindingPatternAnalyzer
{
/**
 * Scores how specific a binding pattern is.
 *
 * Each "#" word adds 0, each "*" word adds 1 and every literal word adds 10.
 * A pattern whose total is below 5 is reported as too broad.
 *
 * @param string $pattern Dot-separated binding pattern.
 * @return array Keys: "pattern", "specificity", "wildcards" (with "hash" and
 *               "star" counts) and "is_too_broad".
 */
public function analyze($pattern)
{
    $wildcardWeights = ['#' => 0, '*' => 1];
    $literalWeight = 10;

    $score = 0;
    foreach (explode('.', $pattern) as $word) {
        $score += isset($wildcardWeights[$word]) ? $wildcardWeights[$word] : $literalWeight;
    }

    $wildcardCounts = [
        'hash' => substr_count($pattern, '#'),
        'star' => substr_count($pattern, '*')
    ];

    return [
        'pattern' => $pattern,
        'specificity' => $score,
        'wildcards' => $wildcardCounts,
        'is_too_broad' => $score < 5
    ];
}
/**
 * Returns advice for a binding pattern based on analyze().
 *
 * @param string $pattern Dot-separated binding pattern.
 * @return array ['status' => 'ok'] when the pattern is specific enough,
 *               otherwise an array with "warning" and "suggestion" texts.
 */
public function suggestImprovement($pattern)
{
    $report = $this->analyze($pattern);
    if (!$report['is_too_broad']) {
        return ['status' => 'ok'];
    }

    return [
        'warning' => '绑定模式过于宽泛,可能匹配过多消息',
        'suggestion' => '考虑使用更具体的模式替代通配符'
    ];
}
}

问题 3:消息无法匹配
解决方案:
php
<?php
class TopicMatcher
{
/**
 * Tells whether a routing key matches a binding pattern.
 *
 * Both strings are split on "." and compared word by word; in the pattern,
 * "*" stands for exactly one word and "#" for zero or more words.
 *
 * @param string $routingKey Dot-separated routing key.
 * @param string $pattern    Dot-separated binding pattern.
 * @return bool True when the whole key matches the whole pattern.
 */
public function match($routingKey, $pattern)
{
    $keyWords = explode('.', $routingKey);
    $patternWords = explode('.', $pattern);

    return $this->matchParts($keyWords, $patternWords, 0, 0);
}
/**
 * Recursive worker behind match(): matches the key words from $keyIndex
 * onward against the pattern words from $patternIndex onward.
 *
 * @param array $keyParts     Words of the routing key.
 * @param array $patternParts Words of the binding pattern.
 * @param int   $keyIndex     Position of the next key word to consume.
 * @param int   $patternIndex Position of the next pattern word to apply.
 * @return bool True when the remaining key matches the remaining pattern.
 */
private function matchParts($keyParts, $patternParts, $keyIndex, $patternIndex)
{
    $keyCount = count($keyParts);
    $patternCount = count($patternParts);

    // Pattern used up: a match only if the key is used up as well.
    if ($patternIndex >= $patternCount) {
        return $keyIndex >= $keyCount;
    }

    // Key used up: whatever is left of the pattern must be "#" only,
    // because "#" is the one word that can match nothing.
    if ($keyIndex >= $keyCount) {
        $position = $patternIndex;
        while ($position < $patternCount) {
            if ($patternParts[$position] !== '#') {
                return false;
            }
            $position++;
        }
        return true;
    }

    $current = $patternParts[$patternIndex];

    if ($current === '#') {
        // Let "#" swallow 0, 1, ... up to all remaining key words and see
        // whether the rest of the pattern matches from any of those points.
        $resumeAt = $keyIndex;
        while ($resumeAt <= $keyCount) {
            if ($this->matchParts($keyParts, $patternParts, $resumeAt, $patternIndex + 1)) {
                return true;
            }
            $resumeAt++;
        }
        return false;
    }

    // "*" accepts any single word; a literal must be identical.
    if ($current !== '*' && $current !== $keyParts[$keyIndex]) {
        return false;
    }

    return $this->matchParts($keyParts, $patternParts, $keyIndex + 1, $patternIndex + 1);
}
/**
 * Returns the patterns that match a routing key, in their original order.
 *
 * @param string $routingKey Dot-separated routing key.
 * @param array  $patterns   Binding patterns to test.
 * @return array Matching patterns, re-indexed from 0.
 */
public function findMatchingPatterns($routingKey, array $patterns)
{
    $hits = array_filter($patterns, function ($candidate) use ($routingKey) {
        return $this->match($routingKey, $candidate);
    });

    // array_filter keeps the input keys; the result is a plain list.
    return array_values($hits);
}
}

最佳实践建议
1. 路由键设计规范
php
<?php
class TopicKeyBuilder
{
const SEPARATOR = '.';
/**
 * Builds a routing key by normalising each part and joining the results
 * with the separator.
 *
 * @param array $parts Raw words of the routing key.
 * @return string Normalised routing key.
 */
public static function build(array $parts)
{
    $words = [];
    foreach ($parts as $part) {
        $words[] = self::normalizePart($part);
    }

    return implode(self::SEPARATOR, $words);
}
/**
 * Lower-cases one word and replaces every character outside a-z, 0-9, "_"
 * and "-" with "_".
 *
 * @param string $part Raw word.
 * @return string Normalised word.
 */
private static function normalizePart($part)
{
    return preg_replace('/[^a-z0-9_-]/', '_', strtolower($part));
}
/**
 * Splits a routing key into its words.
 *
 * @param string $routingKey Routing key using the class separator.
 * @return array Words of the key, in order.
 */
public static function parse($routingKey)
{
    $words = explode(self::SEPARATOR, $routingKey);

    return $words;
}
}

2. 模式注册表
php
<?php
class TopicPatternRegistry
{
private $patterns = [];
/**
 * Stores a binding pattern under a name, replacing any entry that already
 * uses that name.
 *
 * @param string $name        Registry key.
 * @param string $pattern     Binding pattern.
 * @param string $description Human-readable description.
 */
public function register($name, $pattern, $description)
{
    // compact() yields the keys in the listed order: name, pattern, description.
    $entry = compact('name', 'pattern', 'description');
    $entry['registered_at'] = time();

    $this->patterns[$name] = $entry;
}
/**
 * Looks up a registered pattern by name.
 *
 * @param string $name Registry key.
 * @return array|null The stored entry, or null when the name is unknown.
 */
public function getPattern($name)
{
    if (isset($this->patterns[$name])) {
        return $this->patterns[$name];
    }

    return null;
}
/**
 * Returns every registered entry.
 *
 * @return array Entries keyed by their registered name.
 */
public function getAllPatterns()
{
    $all = $this->patterns;

    return $all;
}
/**
 * Returns the registered entries whose pattern matches a routing key.
 *
 * @param string $routingKey Dot-separated routing key.
 * @return array Matching entries, keyed by their registered name.
 */
public function findPatternsForRoutingKey($routingKey)
{
    $matcher = new TopicMatcher();

    // array_filter keeps the keys, so the result stays indexed by name.
    return array_filter($this->patterns, function ($info) use ($matcher, $routingKey) {
        return $matcher->match($routingKey, $info['pattern']);
    });
}
}

3. 消息追踪
php
<?php
/**
 * Records published messages in per-day Redis lists named
 * "topic:trace:<Ymd>".
 */
class TopicMessageTracer
{
    /** @var Redis Client connected to 127.0.0.1:6379. */
    private $redis;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
    }

    /**
     * Pushes one trace record onto today's list and resets the list's expiry.
     *
     * @param string $routingKey Routing key the message was published with.
     * @param mixed  $message    JSON-serialisable message content.
     */
    public function trace($routingKey, $message)
    {
        $key = "topic:trace:" . date('Ymd');
        $record = json_encode([
            'routing_key' => $routingKey,
            'message' => $message,
            'timestamp' => time()
        ]);

        $this->redis->lPush($key, $record);

        // 86400 seconds per day, kept for 7 days after the latest write.
        $this->redis->expire($key, 86400 * 7);
    }

    /**
     * Reads all trace records stored for one day.
     *
     * @param string|null $date Day in "Ymd" format; today when null.
     * @return array Decoded records, in the order the list returns them.
     */
    public function getTrace($date = null)
    {
        if ($date === null) {
            $date = date('Ymd');
        }
        $key = "topic:trace:{$date}";
        $rawItems = $this->redis->lRange($key, 0, -1);

        return array_map(function ($item) {
            return json_decode($item, true);
        }, $rawItems);
    }
}