Skip to content

主题模式

概述

主题模式(Topic Pattern)使用 Topic 交换器实现基于模式的消息路由。通过通配符匹配路由键,可以实现灵活的消息过滤和分发。这种模式比 Direct 交换器更灵活,适用于需要多条件过滤的场景。

核心知识点

架构图

mermaid
graph TB
    subgraph 生产者
        P[Producer]
    end

    subgraph RabbitMQ
        E[Topic Exchange]
        Q1[Queue: *.orange.*]
        Q2[Queue: *.*.rabbit]
        Q3[Queue: lazy.#]
    end

    subgraph 消费者
        C1[橙色动物处理器]
        C2[兔子处理器]
        C3[懒惰动物处理器]
    end

    P -->|quick.orange.rabbit| E
    P -->|lazy.orange.elephant| E
    P -->|quick.brown.fox| E

    E -->|匹配| Q1
    E -->|匹配| Q2
    E -->|匹配| Q3

    Q1 --> C1
    Q2 --> C2
    Q3 --> C3

    style E fill:#e1bee7

通配符规则

| 通配符 | 说明 | 示例 |
| --- | --- | --- |
| `*` | 匹配一个单词 | `*.orange.*` 匹配 `quick.orange.rabbit` |
| `#` | 匹配零个或多个单词 | `lazy.#` 匹配 `lazy`、`lazy.orange`、`lazy.orange.rabbit` |

匹配示例

mermaid
graph LR
    subgraph 路由键
        K1[quick.orange.rabbit]
        K2[quick.orange.fox]
        K3[lazy.orange.elephant]
        K4[lazy.brown.fox]
        K5[lazy.pink.rabbit]
        K6[quick.brown.fox]
    end

    subgraph 绑定模式
        B1["*.orange.*"]
        B2["*.*.rabbit"]
        B3["lazy.#"]
    end

    K1 --> B1
    K1 --> B2
    K2 --> B1
    K3 --> B1
    K3 --> B3
    K4 --> B3
    K5 --> B2
    K5 --> B3

PHP 代码示例

主题发布者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Publishes JSON-wrapped messages to the "topic_exchange" topic exchange.
 *
 * Every message is wrapped in an envelope containing the original content,
 * the routing key it was published with and the publish time.
 */
class TopicPublisher
{
    private $connection;
    private $channel;
    private $exchangeName = 'topic_exchange';

    /**
     * Connects to the broker at localhost:5672 with the guest account, opens
     * a channel and declares the topic exchange.
     */
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        // Positional flags after the exchange type: passive, durable, auto_delete.
        $this->channel->exchange_declare($this->exchangeName, 'topic', false, true, false);
    }

    /**
     * Publishes one message to the exchange and echoes a confirmation line.
     *
     * @param string $routingKey Dot-separated routing key used by the exchange for matching.
     * @param mixed  $message    Any JSON-encodable payload; stored under the "content" key.
     */
    public function publish($routingKey, $message)
    {
        $envelope = [
            'content' => $message,
            'routing_key' => $routingKey,
            'timestamp' => time()
        ];
        $properties = ['content_type' => 'application/json'];

        $amqpMessage = new AMQPMessage(json_encode($envelope), $properties);
        $this->channel->basic_publish($amqpMessage, $this->exchangeName, $routingKey);

        echo " [x] 发送消息 [{$routingKey}]: " . json_encode($message) . "\n";
    }

    /**
     * Closes the channel first, then the underlying connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

主题消费者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

/**
 * Consumes messages from the "topic_exchange" topic exchange through a
 * broker-named queue bound with one or more binding patterns.
 */
class TopicConsumer
{
    private $connection;
    private $channel;
    private $exchangeName = 'topic_exchange';
    private $queueName;
    private $bindingKeys;

    /**
     * Connects to the broker, declares the exchange and a fresh queue, then
     * binds the queue once per binding pattern.
     *
     * @param array $bindingKeys Binding patterns (may contain the "*" and "#" wildcards).
     */
    public function __construct(array $bindingKeys)
    {
        $this->bindingKeys = $bindingKeys;

        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();

        // Positional flags after the exchange type: passive, durable, auto_delete.
        $this->channel->exchange_declare($this->exchangeName, 'topic', false, true, false);

        // An empty name asks the broker to generate one; the first element of
        // the reply is that name. Flags: passive, durable, exclusive, auto_delete.
        list($this->queueName) = $this->channel->queue_declare('', false, false, true, true);

        foreach ($this->bindingKeys as $pattern) {
            $this->channel->queue_bind($this->queueName, $this->exchangeName, $pattern);
        }

        $keys = implode(', ', $this->bindingKeys);
        echo " [x] 消费者已启动,绑定模式: {$keys}\n";
    }

    /**
     * Registers the delivery handler and blocks, processing messages for as
     * long as the channel stays open.
     *
     * Each delivery is printed (routing key plus the envelope's "content"
     * field) and then acknowledged.
     */
    public function consume()
    {
        $onDelivery = function ($delivery) {
            $envelope = json_decode($delivery->body, true);
            $receivedKey = $delivery->delivery_info['routing_key'];
            $contentJson = json_encode($envelope['content'], JSON_UNESCAPED_UNICODE);

            printf(" [x] 接收 [%s]: %s\n", $receivedKey, $contentJson);

            $delivery->ack();
        };

        // Prefetch count of 1: at most one unacknowledged message at a time.
        $this->channel->basic_qos(null, 1, null);

        // Positional flags: consumer tag, no_local, no_ack, exclusive, nowait.
        $this->channel->basic_consume($this->queueName, '', false, false, false, false, $onDelivery);

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    /**
     * Closes the channel first, then the underlying connection.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

完整示例:日志系统

php
<?php

/**
 * Log facade that publishes log records to the topic exchange using
 * "<facility>.<level>" routing keys.
 */
class TopicLogSystem
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TopicPublisher();
    }

    /**
     * Publishes a single log record.
     *
     * @param string $facility First routing-key word (e.g. "kern", "auth").
     * @param string $level    Second routing-key word (e.g. "info", "error").
     * @param mixed  $message  Log message.
     * @param array  $context  Optional extra data attached to the record.
     */
    public function log($facility, $level, $message, array $context = [])
    {
        $routingKey = $facility . '.' . $level;

        $record = [
            'facility' => $facility,
            'level' => $level,
            'message' => $message,
            'context' => $context,
            'timestamp' => time()
        ];

        $this->publisher->publish($routingKey, $record);
    }

    /** Shortcut for log('kern', 'info', ...). */
    public function kernInfo($message, array $context = [])
    {
        $this->log('kern', 'info', $message, $context);
    }

    /** Shortcut for log('kern', 'error', ...). */
    public function kernError($message, array $context = [])
    {
        $this->log('kern', 'error', $message, $context);
    }

    /** Shortcut for log('auth', 'info', ...). */
    public function authInfo($message, array $context = [])
    {
        $this->log('auth', 'info', $message, $context);
    }

    /** Shortcut for log('auth', 'error', ...). */
    public function authError($message, array $context = [])
    {
        $this->log('auth', 'error', $message, $context);
    }
}

/**
 * Consumer that receives every log record by binding with the "#" pattern.
 */
class AllLogsConsumer
{
    // Declared explicitly: assigning to an undeclared property is deprecated
    // since PHP 8.2.
    private $consumer;

    public function __construct()
    {
        $this->consumer = new TopicConsumer(['#']);
    }

    /**
     * Prints a banner and blocks while consuming messages.
     */
    public function start()
    {
        echo "监听所有日志...\n";
        $this->consumer->consume();
    }
}

/**
 * Consumer that receives error-level records from any facility by binding
 * with the "*.error" pattern.
 */
class ErrorLogsConsumer
{
    // Declared explicitly: assigning to an undeclared property is deprecated
    // since PHP 8.2.
    private $consumer;

    public function __construct()
    {
        $this->consumer = new TopicConsumer(['*.error']);
    }

    /**
     * Prints a banner and blocks while consuming messages.
     */
    public function start()
    {
        echo "监听所有错误日志...\n";
        $this->consumer->consume();
    }
}

/**
 * Consumer that receives every level of "kern" facility records by binding
 * with the "kern.*" pattern.
 */
class KernLogsConsumer
{
    // Declared explicitly: assigning to an undeclared property is deprecated
    // since PHP 8.2.
    private $consumer;

    public function __construct()
    {
        $this->consumer = new TopicConsumer(['kern.*']);
    }

    /**
     * Prints a banner and blocks while consuming messages.
     */
    public function start()
    {
        echo "监听内核日志...\n";
        $this->consumer->consume();
    }
}

多维度消息路由

php
<?php

/**
 * Publishes events under three-word routing keys of the form
 * "<region>.<service>.<event>".
 */
class MultiDimensionalRouter
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TopicPublisher();
    }

    /**
     * Publishes an event; the three dimensions are joined with dots to form
     * the routing key and are also repeated inside the payload.
     *
     * @param string $region  First routing-key word.
     * @param string $service Second routing-key word.
     * @param string $event   Third routing-key word.
     * @param mixed  $data    Event data.
     */
    public function route($region, $service, $event, $data)
    {
        $routingKey = $region . '.' . $service . '.' . $event;

        $payload = [
            'region' => $region,
            'service' => $service,
            'event' => $event,
            'data' => $data,
            'timestamp' => time()
        ];

        $this->publisher->publish($routingKey, $payload);
    }

    /** Publishes under "us-east.user.created". */
    public function usEastUserServiceCreated($userData)
    {
        $this->route('us-east', 'user', 'created', $userData);
    }

    /** Publishes under "us-west.order.completed". */
    public function usWestOrderCompleted($orderData)
    {
        $this->route('us-west', 'order', 'completed', $orderData);
    }

    /** Publishes under "eu-central.payment.processed". */
    public function euCentralPaymentProcessed($paymentData)
    {
        $this->route('eu-central', 'payment', 'processed', $paymentData);
    }
}

/**
 * Consumer bound to "<region>.*.*": every service and event of one region.
 */
class RegionConsumer
{
    // Declared explicitly: assigning to an undeclared property is deprecated
    // since PHP 8.2.
    private $consumer;

    /**
     * @param string $region First routing-key word to subscribe to.
     */
    public function __construct($region)
    {
        $this->consumer = new TopicConsumer(["{$region}.*.*"]);
    }

    /**
     * Blocks while consuming messages.
     */
    public function start()
    {
        $this->consumer->consume();
    }
}

/**
 * Consumer bound to "*.<service>.*": one service across all regions and events.
 */
class ServiceConsumer
{
    // Declared explicitly: assigning to an undeclared property is deprecated
    // since PHP 8.2.
    private $consumer;

    /**
     * @param string $service Second routing-key word to subscribe to.
     */
    public function __construct($service)
    {
        $this->consumer = new TopicConsumer(["*.{$service}.*"]);
    }

    /**
     * Blocks while consuming messages.
     */
    public function start()
    {
        $this->consumer->consume();
    }
}

/**
 * Consumer bound to "*.*.<event>": one event type across all regions and services.
 */
class EventConsumer
{
    // Declared explicitly: assigning to an undeclared property is deprecated
    // since PHP 8.2.
    private $consumer;

    /**
     * @param string $event Third routing-key word to subscribe to.
     */
    public function __construct($event)
    {
        $this->consumer = new TopicConsumer(["*.*.{$event}"]);
    }

    /**
     * Blocks while consuming messages.
     */
    public function start()
    {
        $this->consumer->consume();
    }
}

物联网设备消息路由

php
<?php

/**
 * Publishes device messages under four-word routing keys of the form
 * "<building>.<floor>.<deviceType>.<deviceId>".
 */
class IoTDeviceRouter
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TopicPublisher();
    }

    /**
     * Publishes one device message; the four identifiers form the routing
     * key and are also repeated inside the payload.
     *
     * @param string $building   First routing-key word.
     * @param string $floor      Second routing-key word.
     * @param string $deviceType Third routing-key word.
     * @param string $deviceId   Fourth routing-key word.
     * @param mixed  $data       Device reading or event data.
     */
    public function deviceMessage($building, $floor, $deviceType, $deviceId, $data)
    {
        $routingKey = $building . '.' . $floor . '.' . $deviceType . '.' . $deviceId;

        $payload = [
            'building' => $building,
            'floor' => $floor,
            'device_type' => $deviceType,
            'device_id' => $deviceId,
            'data' => $data,
            'timestamp' => time()
        ];

        $this->publisher->publish($routingKey, $payload);
    }

    /** Publishes a "temperature" device message carrying the reading. */
    public function temperatureReading($building, $floor, $deviceId, $temperature)
    {
        $reading = ['temperature' => $temperature];

        $this->deviceMessage($building, $floor, 'temperature', $deviceId, $reading);
    }

    /** Publishes a "motion" device message carrying the detection flag. */
    public function motionDetection($building, $floor, $deviceId, $detected)
    {
        $reading = ['motion_detected' => $detected];

        $this->deviceMessage($building, $floor, 'motion', $deviceId, $reading);
    }
}

/**
 * Consumer bound to "<building>.#": every device message of one building.
 */
class BuildingConsumer
{
    // Declared explicitly: assigning to an undeclared property is deprecated
    // since PHP 8.2.
    private $consumer;

    // Kept so start() can name the building in its banner; the constructor
    // argument is not in scope there.
    private $building;

    /**
     * @param string $building First routing-key word to subscribe to.
     */
    public function __construct($building)
    {
        $this->building = $building;
        $this->consumer = new TopicConsumer(["{$building}.#"]);
    }

    /**
     * Prints a banner naming the building and blocks while consuming messages.
     */
    public function start()
    {
        echo "监听建筑物 {$this->building} 的所有设备消息...\n";
        $this->consumer->consume();
    }
}

/**
 * Consumer bound to "*.*.<deviceType>.*": one device type across all
 * buildings, floors and device ids.
 */
class DeviceTypeConsumer
{
    // Declared explicitly: assigning to an undeclared property is deprecated
    // since PHP 8.2.
    private $consumer;

    // Kept so start() can name the device type in its banner; the constructor
    // argument is not in scope there.
    private $deviceType;

    /**
     * @param string $deviceType Third routing-key word to subscribe to.
     */
    public function __construct($deviceType)
    {
        $this->deviceType = $deviceType;
        $this->consumer = new TopicConsumer(["*.*.{$deviceType}.*"]);
    }

    /**
     * Prints a banner naming the device type and blocks while consuming messages.
     */
    public function start()
    {
        echo "监听所有 {$this->deviceType} 设备消息...\n";
        $this->consumer->consume();
    }
}

实际应用场景

1. 多租户系统

php
<?php

/**
 * Publishes tenant-scoped events under four-word routing keys of the form
 * "tenant.<tenantId>.<module>.<action>".
 */
class TenantEventRouter
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TopicPublisher();
    }

    /**
     * Publishes one tenant event.
     *
     * @param mixed  $tenantId Second routing-key word.
     * @param string $module   Third routing-key word.
     * @param string $action   Fourth routing-key word.
     * @param mixed  $data     Event data.
     */
    public function tenantEvent($tenantId, $module, $action, $data)
    {
        $routingKey = 'tenant.' . $tenantId . '.' . $module . '.' . $action;

        $payload = [
            'tenant_id' => $tenantId,
            'module' => $module,
            'action' => $action,
            'data' => $data,
            'timestamp' => time()
        ];

        $this->publisher->publish($routingKey, $payload);
    }

    /** Publishes a "user" / "created" event for the tenant. */
    public function tenantUserCreated($tenantId, $userData)
    {
        $this->tenantEvent($tenantId, 'user', 'created', $userData);
    }

    /** Publishes an "order" / "placed" event for the tenant. */
    public function tenantOrderPlaced($tenantId, $orderData)
    {
        $this->tenantEvent($tenantId, 'order', 'placed', $orderData);
    }
}

/**
 * Consumer that receives every event published for a single tenant.
 */
class TenantConsumer
{
    // Declared explicitly: assigning to an undeclared property is deprecated
    // since PHP 8.2.
    private $consumer;

    /**
     * @param mixed $tenantId Tenant whose events should be received.
     */
    public function __construct($tenantId)
    {
        // "#" matches any number of trailing words. A single "*" would only
        // match three-word keys, while tenant events are published with
        // four-word keys ("tenant.<id>.<module>.<action>") and would never
        // be delivered.
        $this->consumer = new TopicConsumer(["tenant.{$tenantId}.#"]);
    }

    /**
     * Blocks while consuming messages.
     */
    public function start()
    {
        $this->consumer->consume();
    }
}

/**
 * Consumer bound to "tenant.*.<module>.*": every action of one module
 * across all tenants.
 */
class ModuleConsumer
{
    // Declared explicitly: assigning to an undeclared property is deprecated
    // since PHP 8.2.
    private $consumer;

    /**
     * @param string $module Third routing-key word to subscribe to.
     */
    public function __construct($module)
    {
        $this->consumer = new TopicConsumer(["tenant.*.{$module}.*"]);
    }

    /**
     * Blocks while consuming messages.
     */
    public function start()
    {
        $this->consumer->consume();
    }
}

2. 微服务事件总线

php
<?php

/**
 * Publishes versioned service events under routing keys of the form
 * "service.<service>.<event>.v<version>".
 */
class ServiceEventBus
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TopicPublisher();
    }

    /**
     * Publishes one service event.
     *
     * @param string $service Second routing-key word.
     * @param string $event   Third routing-key word.
     * @param mixed  $version Appended to "v" to form the fourth routing-key word.
     * @param mixed  $data    Event data.
     */
    public function emit($service, $event, $version, $data)
    {
        $routingKey = 'service.' . $service . '.' . $event . '.v' . $version;

        $payload = [
            'service' => $service,
            'event' => $event,
            'version' => $version,
            'data' => $data,
            'timestamp' => time()
        ];

        $this->publisher->publish($routingKey, $payload);
    }

    /** Emits version 1 of the user service's "created" event. */
    public function userServiceCreated($userData)
    {
        $this->emit('user', 'created', 1, $userData);
    }

    /** Emits version 1 of the order service's "completed" event. */
    public function orderServiceCompleted($orderData)
    {
        $this->emit('order', 'completed', 1, $orderData);
    }
}

/**
 * Consumer bound to "service.<service>.<event>.*": any version of the
 * selected event(s) of one service.
 */
class ServiceEventConsumer
{
    // Declared explicitly: assigning to an undeclared property is deprecated
    // since PHP 8.2.
    private $consumer;

    /**
     * @param string $service Second routing-key word to subscribe to.
     * @param string $event   Third routing-key word; the default "*" matches any single event name.
     */
    public function __construct($service, $event = '*')
    {
        $this->consumer = new TopicConsumer(["service.{$service}.{$event}.*"]);
    }

    /**
     * Blocks while consuming messages.
     */
    public function start()
    {
        $this->consumer->consume();
    }
}

3. 地理位置路由

php
<?php

/**
 * Publishes location updates under four-word routing keys of the form
 * "geo.<country>.<state>.<city>".
 */
class GeoLocationRouter
{
    private $publisher;

    public function __construct()
    {
        $this->publisher = new TopicPublisher();
    }

    /**
     * Publishes one location update.
     *
     * @param string $country Second routing-key word.
     * @param string $state   Third routing-key word.
     * @param string $city    Fourth routing-key word; must be a concrete value, not a wildcard.
     * @param mixed  $data    Update data.
     */
    public function locationUpdate($country, $state, $city, $data)
    {
        $routingKey = "geo.{$country}.{$state}.{$city}";

        $this->publisher->publish($routingKey, [
            'country' => $country,
            'state' => $state,
            'city' => $city,
            'data' => $data,
            'timestamp' => time()
        ]);
    }

    /**
     * Publishes an update for Beijing under "geo.china.beijing.beijing".
     */
    public function chinaBeijingUpdate($data)
    {
        // Wildcards are only interpreted in binding keys. A "*" placed in a
        // routing key is published as a literal word, so a concrete city
        // name is used here instead.
        $this->locationUpdate('china', 'beijing', 'beijing', $data);
    }
}

/**
 * Consumer bound to "geo.<country>.#": every location update of one country.
 */
class CountryConsumer
{
    // Declared explicitly: assigning to an undeclared property is deprecated
    // since PHP 8.2.
    private $consumer;

    /**
     * @param string $country Second routing-key word to subscribe to.
     */
    public function __construct($country)
    {
        $this->consumer = new TopicConsumer(["geo.{$country}.#"]);
    }

    /**
     * Blocks while consuming messages.
     */
    public function start()
    {
        $this->consumer->consume();
    }
}

/**
 * Consumer bound to "geo.<country>.<state>.<city>": updates for one city.
 */
class CityConsumer
{
    // Declared explicitly: assigning to an undeclared property is deprecated
    // since PHP 8.2.
    private $consumer;

    /**
     * @param string $country Second binding-key word.
     * @param string $state   Third binding-key word.
     * @param string $city    Fourth binding-key word.
     */
    public function __construct($country, $state, $city)
    {
        $this->consumer = new TopicConsumer(["geo.{$country}.{$state}.{$city}"]);
    }

    /**
     * Blocks while consuming messages.
     */
    public function start()
    {
        $this->consumer->consume();
    }
}

常见问题与解决方案

问题 1:路由键层级不一致

解决方案

php
<?php

/**
 * Helpers for building routing keys with a consistent shape.
 */
class RoutingKeyNormalizer
{
    /**
     * Joins the parts into a dot-separated routing key.
     *
     * In each part every character other than an ASCII letter or digit is
     * replaced by "_", and the result is lower-cased.
     *
     * @param iterable $parts Routing-key words in order.
     * @return string
     */
    public static function normalize($parts)
    {
        $segments = [];

        foreach ($parts as $rawSegment) {
            $sanitized = preg_replace('/[^a-z0-9]/i', '_', $rawSegment);
            $segments[] = strtolower($sanitized);
        }

        return implode('.', $segments);
    }

    /**
     * Checks that a routing key has exactly the expected number of words.
     *
     * @param string $routingKey    Dot-separated routing key.
     * @param int    $expectedDepth Required number of words.
     * @return bool Always true when the depth matches.
     * @throws InvalidArgumentException When the word count differs.
     */
    public static function validate($routingKey, $expectedDepth)
    {
        $depth = count(explode('.', $routingKey));

        if ($depth === $expectedDepth) {
            return true;
        }

        throw new InvalidArgumentException(
            "路由键层级不正确: {$routingKey}。期望 {$expectedDepth} 层"
        );
    }
}

问题 2:绑定模式过于宽泛

解决方案

php
<?php

/**
 * Scores binding patterns to flag ones that are likely to match too much.
 */
class BindingPatternAnalyzer
{
    /**
     * Computes a specificity score for a binding pattern.
     *
     * Each "#" word adds 0, each "*" word adds 1 and every other word adds
     * 10. A pattern scoring below 5 is reported as too broad.
     *
     * @param string $pattern Dot-separated binding pattern.
     * @return array Keys: pattern, specificity, wildcards (hash/star counts), is_too_broad.
     */
    public function analyze($pattern)
    {
        $score = 0;

        foreach (explode('.', $pattern) as $word) {
            if ($word === '#') {
                // Contributes nothing to specificity.
                continue;
            }

            $score += ($word === '*') ? 1 : 10;
        }

        $wildcardCounts = [
            'hash' => substr_count($pattern, '#'),
            'star' => substr_count($pattern, '*')
        ];

        return [
            'pattern' => $pattern,
            'specificity' => $score,
            'wildcards' => $wildcardCounts,
            'is_too_broad' => $score < 5
        ];
    }

    /**
     * Returns a warning with a suggestion when the pattern is too broad,
     * otherwise an "ok" status.
     *
     * @param string $pattern Dot-separated binding pattern.
     * @return array
     */
    public function suggestImprovement($pattern)
    {
        $report = $this->analyze($pattern);

        if (!$report['is_too_broad']) {
            return ['status' => 'ok'];
        }

        return [
            'warning' => '绑定模式过于宽泛,可能匹配过多消息',
            'suggestion' => '考虑使用更具体的模式替代通配符'
        ];
    }
}

问题 3:消息无法匹配

解决方案

php
<?php

/**
 * Local re-implementation of topic-exchange matching, useful for checking
 * which binding patterns a routing key would match.
 *
 * "*" stands for exactly one word and "#" for zero or more words.
 */
class TopicMatcher
{
    /**
     * Tests whether a routing key matches a binding pattern.
     *
     * @param string $routingKey Dot-separated routing key.
     * @param string $pattern    Dot-separated binding pattern.
     * @return bool
     */
    public function match($routingKey, $pattern)
    {
        $patternWords = explode('.', $pattern);
        $keyWords = explode('.', $routingKey);

        return $this->matchParts($keyWords, $patternWords, 0, 0);
    }

    /**
     * Recursively matches the key words from $keyIndex against the pattern
     * words from $patternIndex.
     */
    private function matchParts($keyParts, $patternParts, $keyIndex, $patternIndex)
    {
        $keyCount = count($keyParts);
        $patternCount = count($patternParts);

        // Pattern exhausted: a match only if the key is exhausted as well.
        if ($patternIndex >= $patternCount) {
            return $keyIndex >= $keyCount;
        }

        // Key exhausted: the rest of the pattern may only consist of "#",
        // since "#" is the only word that can match nothing.
        if ($keyIndex >= $keyCount) {
            foreach (array_slice($patternParts, $patternIndex) as $remaining) {
                if ($remaining !== '#') {
                    return false;
                }
            }

            return true;
        }

        $current = $patternParts[$patternIndex];

        if ($current === '#') {
            // Let "#" swallow 0, 1, ... up to all remaining key words and
            // try to match the rest of the pattern after each choice.
            $resumeAt = $keyIndex;
            while ($resumeAt <= $keyCount) {
                if ($this->matchParts($keyParts, $patternParts, $resumeAt, $patternIndex + 1)) {
                    return true;
                }
                $resumeAt++;
            }

            return false;
        }

        // A literal word must be equal; "*" accepts any single word.
        if ($current !== '*' && $current !== $keyParts[$keyIndex]) {
            return false;
        }

        return $this->matchParts($keyParts, $patternParts, $keyIndex + 1, $patternIndex + 1);
    }

    /**
     * Returns the patterns that match the routing key, in their original
     * order and re-indexed from zero.
     *
     * @param string $routingKey Dot-separated routing key.
     * @param array  $patterns   Binding patterns to test.
     * @return array
     */
    public function findMatchingPatterns($routingKey, array $patterns)
    {
        $hits = array_filter($patterns, function ($candidate) use ($routingKey) {
            return $this->match($routingKey, $candidate);
        });

        return array_values($hits);
    }
}

最佳实践建议

1. 路由键设计规范

php
<?php

/**
 * Builds and parses dot-separated routing keys.
 */
class TopicKeyBuilder
{
    const SEPARATOR = '.';

    /**
     * Normalizes each part and joins them with the separator.
     *
     * @param array $parts Routing-key words in order.
     * @return string
     */
    public static function build(array $parts)
    {
        $words = [];

        foreach ($parts as $index => $rawWord) {
            $words[$index] = self::normalizePart($rawWord);
        }

        return implode(self::SEPARATOR, $words);
    }

    /**
     * Lower-cases the part, then replaces every character other than a-z,
     * 0-9, "_" and "-" with "_".
     */
    private static function normalizePart($part)
    {
        return preg_replace('/[^a-z0-9_-]/', '_', strtolower($part));
    }

    /**
     * Splits a routing key into its words.
     *
     * @param string $routingKey Dot-separated routing key.
     * @return array
     */
    public static function parse($routingKey)
    {
        $words = explode(self::SEPARATOR, $routingKey);

        return $words;
    }
}

2. 模式注册表

php
<?php

/**
 * In-memory catalogue of named binding patterns.
 */
class TopicPatternRegistry
{
    private $patterns = [];

    /**
     * Stores (or overwrites) a pattern under the given name, stamped with
     * the registration time.
     *
     * @param string $name        Registry key.
     * @param string $pattern     Binding pattern.
     * @param string $description Human-readable description.
     */
    public function register($name, $pattern, $description)
    {
        $entry = compact('name', 'pattern', 'description');
        $entry['registered_at'] = time();

        $this->patterns[$name] = $entry;
    }

    /**
     * Returns the entry registered under the name, or null if there is none.
     *
     * @param string $name Registry key.
     * @return array|null
     */
    public function getPattern($name)
    {
        if (isset($this->patterns[$name])) {
            return $this->patterns[$name];
        }

        return null;
    }

    /**
     * Returns every registered entry, keyed by name.
     *
     * @return array
     */
    public function getAllPatterns()
    {
        return $this->patterns;
    }

    /**
     * Returns the registered entries whose pattern matches the routing key,
     * keyed by name in registration order.
     *
     * @param string $routingKey Dot-separated routing key.
     * @return array
     */
    public function findPatternsForRoutingKey($routingKey)
    {
        $matcher = new TopicMatcher();

        return array_filter($this->patterns, function ($entry) use ($matcher, $routingKey) {
            return $matcher->match($routingKey, $entry['pattern']);
        });
    }
}

3. 消息追踪

php
<?php

/**
 * Records published messages in per-day Redis lists for later inspection.
 */
class TopicMessageTracer
{
    private $redis;

    /**
     * Connects to the Redis server at 127.0.0.1:6379.
     */
    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
    }

    /**
     * Pushes a trace record onto the head of today's list and (re)sets the
     * list's expiry to seven days.
     *
     * @param string $routingKey Routing key the message was published with.
     * @param mixed  $message    JSON-encodable message.
     */
    public function trace($routingKey, $message)
    {
        $listKey = "topic:trace:" . date('Ymd');

        $record = json_encode([
            'routing_key' => $routingKey,
            'message' => $message,
            'timestamp' => time()
        ]);

        $this->redis->lPush($listKey, $record);

        // 86400 seconds per day, kept for 7 days.
        $this->redis->expire($listKey, 86400 * 7);
    }

    /**
     * Returns the decoded trace records for a day, in stored list order.
     *
     * @param string|null $date Day in "Ymd" format; defaults to today.
     * @return array
     */
    public function getTrace($date = null)
    {
        $date = $date ?? date('Ymd');
        $key = "topic:trace:{$date}";

        $rawRecords = $this->redis->lRange($key, 0, -1);

        return array_map(function ($encoded) {
            return json_decode($encoded, true);
        }, $rawRecords);
    }
}

相关链接