Skip to content

路由模式

概述

路由模式(Routing Pattern)通过 Direct 交换器和路由键(Routing Key)实现消息的精确路由。生产者在发送消息时指定路由键,交换器根据路由键将消息路由到绑定了相应路由键的队列。这种模式适用于需要根据特定条件分发消息的场景。

核心知识点

架构图

mermaid
graph TB
    subgraph 生产者
        P[Producer]
    end

    subgraph RabbitMQ
        E[Direct Exchange]
        Q1[Queue: error]
        Q2[Queue: info]
        Q3[Queue: all]
    end

    subgraph 消费者
        C1[Error Handler]
        C2[Info Handler]
        C3[All Handler]
    end

    P -->|routing_key=error| E
    P -->|routing_key=info| E

    E -->|error| Q1
    E -->|info| Q2
    E -->|error,info| Q3

    Q1 --> C1
    Q2 --> C2
    Q3 --> C3

    style E fill:#fff9c4

工作流程

mermaid
sequenceDiagram
    participant P as 生产者
    participant E as Direct Exchange
    participant Q1 as 队列error
    participant Q2 as 队列info
    participant C as 消费者

    Note over E: 绑定关系:<br/>Q1 -> error<br/>Q2 -> info

    P->>E: 消息(routing_key=error)
    E->>Q1: 匹配成功
    Q1->>C: 消费消息

    P->>E: 消息(routing_key=info)
    E->>Q2: 匹配成功
    Q2->>C: 消费消息

    P->>E: 消息(routing_key=debug)
    Note over E: 无匹配队列,消息丢弃

路由键匹配规则

路由键绑定键是否匹配
errorerror
errorinfo
error*✗ (Direct 不支持通配符)
order.createdorder.created
order.createdorder.*

PHP 代码示例

路由发布者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RoutingPublisher
{
    private $connection;
    private $channel;
    private $exchangeName = 'routing_exchange';

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            'direct',
            false,
            true,
            false
        );
    }

    public function publish($routingKey, $message)
    {
        $msg = new AMQPMessage(
            json_encode([
                'content' => $message,
                'routing_key' => $routingKey,
                'timestamp' => time()
            ]),
            ['content_type' => 'application/json']
        );

        $this->channel->basic_publish(
            $msg,
            $this->exchangeName,
            $routingKey
        );

        echo " [x] 发送消息 [{$routingKey}]: {$message}\n";
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

路由消费者

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

class RoutingConsumer
{
    private $connection;
    private $channel;
    private $exchangeName = 'routing_exchange';
    private $queueName;
    private $bindingKeys;

    public function __construct(array $bindingKeys)
    {
        $this->bindingKeys = $bindingKeys;

        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            'direct',
            false,
            true,
            false
        );

        list($this->queueName, ,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            true
        );

        foreach ($bindingKeys as $bindingKey) {
            $this->channel->queue_bind(
                $this->queueName,
                $this->exchangeName,
                $bindingKey
            );
        }

        $keys = implode(', ', $bindingKeys);
        echo " [x] 消费者已启动,绑定键: {$keys}\n";
    }

    public function consume()
    {
        $callback = function ($message) {
            $body = json_decode($message->body, true);
            $routingKey = $message->delivery_info['routing_key'];

            echo sprintf(
                " [x] 接收 [%s]: %s\n",
                $routingKey,
                $body['content']
            );

            $message->ack();
        };

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

完整示例:日志系统

php
<?php

class LogRouter
{
    private $publisher;

    const LEVEL_DEBUG = 'log.debug';
    const LEVEL_INFO = 'log.info';
    const LEVEL_WARNING = 'log.warning';
    const LEVEL_ERROR = 'log.error';
    const LEVEL_CRITICAL = 'log.critical';

    public function __construct()
    {
        $this->publisher = new RoutingPublisher();
    }

    public function log($level, $message, array $context = [])
    {
        $logEntry = [
            'level' => $level,
            'message' => $message,
            'context' => $context,
            'timestamp' => time(),
            'hostname' => gethostname()
        ];

        $this->publisher->publish($level, $logEntry);
    }

    public function debug($message, array $context = [])
    {
        $this->log(self::LEVEL_DEBUG, $message, $context);
    }

    public function info($message, array $context = [])
    {
        $this->log(self::LEVEL_INFO, $message, $context);
    }

    public function warning($message, array $context = [])
    {
        $this->log(self::LEVEL_WARNING, $message, $context);
    }

    public function error($message, array $context = [])
    {
        $this->log(self::LEVEL_ERROR, $message, $context);
    }

    public function critical($message, array $context = [])
    {
        $this->log(self::LEVEL_CRITICAL, $message, $context);
    }
}

class LogConsumer
{
    private $consumer;

    public function __construct(array $levels)
    {
        $this->consumer = new RoutingConsumer($levels);
    }

    public function start()
    {
        $this->consumer->consume();
    }
}

$errorConsumer = new LogConsumer([
    LogRouter::LEVEL_ERROR,
    LogRouter::LEVEL_CRITICAL
]);

$allConsumer = new LogConsumer([
    LogRouter::LEVEL_DEBUG,
    LogRouter::LEVEL_INFO,
    LogRouter::LEVEL_WARNING,
    LogRouter::LEVEL_ERROR,
    LogRouter::LEVEL_CRITICAL
]);

订单事件路由

php
<?php

class OrderEventRouter
{
    private $publisher;

    const EVENT_CREATED = 'order.created';
    const EVENT_PAID = 'order.paid';
    const EVENT_SHIPPED = 'order.shipped';
    const EVENT_DELIVERED = 'order.delivered';
    const EVENT_CANCELLED = 'order.cancelled';

    public function __construct()
    {
        $this->publisher = new RoutingPublisher();
    }

    public function emit($event, $orderId, array $data = [])
    {
        $payload = [
            'event' => $event,
            'order_id' => $orderId,
            'data' => $data,
            'timestamp' => time()
        ];

        $this->publisher->publish($event, $payload);
    }

    public function orderCreated($orderId, $orderData)
    {
        $this->emit(self::EVENT_CREATED, $orderId, $orderData);
    }

    public function orderPaid($orderId, $paymentData)
    {
        $this->emit(self::EVENT_PAID, $orderId, $paymentData);
    }

    public function orderShipped($orderId, $shippingData)
    {
        $this->emit(self::EVENT_SHIPPED, $orderId, $shippingData);
    }

    public function orderDelivered($orderId, $deliveryData)
    {
        $this->emit(self::EVENT_DELIVERED, $orderId, $deliveryData);
    }

    public function orderCancelled($orderId, $reason)
    {
        $this->emit(self::EVENT_CANCELLED, $orderId, ['reason' => $reason]);
    }
}

class OrderEventHandler
{
    private $consumer;

    public function __construct(array $events)
    {
        $this->consumer = new RoutingConsumer($events);
    }

    public function start()
    {
        $this->consumer->consume();
    }
}

$inventoryHandler = new OrderEventHandler([
    OrderEventRouter::EVENT_CREATED,
    OrderEventRouter::EVENT_CANCELLED
]);

$notificationHandler = new OrderEventHandler([
    OrderEventRouter::EVENT_PAID,
    OrderEventRouter::EVENT_SHIPPED,
    OrderEventRouter::EVENT_DELIVERED
]);

多路由键绑定

php
<?php

class MultiBindingRouter
{
    private $connection;
    private $channel;
    private $exchangeName = 'multi_routing_exchange';

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            'direct',
            false,
            true,
            false
        );
    }

    public function createConsumerWithBindings($consumerId, array $routingKeys)
    {
        $queueName = "consumer_{$consumerId}_queue";

        $this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false
        );

        foreach ($routingKeys as $key) {
            $this->channel->queue_bind(
                $queueName,
                $this->exchangeName,
                $key
            );

            echo "队列 {$queueName} 已绑定路由键: {$key}\n";
        }

        return $queueName;
    }

    public function publish($routingKey, $message)
    {
        $msg = new AMQPMessage(json_encode($message));

        $this->channel->basic_publish(
            $msg,
            $this->exchangeName,
            $routingKey
        );
    }
}

实际应用场景

1. 用户事件路由

php
<?php

class UserEventRouter
{
    private $publisher;

    const EVENT_REGISTERED = 'user.registered';
    const EVENT_LOGIN = 'user.login';
    const EVENT_LOGOUT = 'user.logout';
    const EVENT_UPDATED = 'user.updated';
    const EVENT_DELETED = 'user.deleted';

    public function __construct()
    {
        $this->publisher = new RoutingPublisher();
    }

    public function userRegistered($userId, $userData)
    {
        $this->publisher->publish(self::EVENT_REGISTERED, [
            'user_id' => $userId,
            'user_data' => $userData,
            'timestamp' => time()
        ]);
    }

    public function userLogin($userId, $loginData)
    {
        $this->publisher->publish(self::EVENT_LOGIN, [
            'user_id' => $userId,
            'ip' => $loginData['ip'] ?? null,
            'user_agent' => $loginData['user_agent'] ?? null,
            'timestamp' => time()
        ]);
    }

    public function userLogout($userId)
    {
        $this->publisher->publish(self::EVENT_LOGOUT, [
            'user_id' => $userId,
            'timestamp' => time()
        ]);
    }
}

2. 支付事件路由

php
<?php

class PaymentEventRouter
{
    private $publisher;

    const EVENT_INITIATED = 'payment.initiated';
    const EVENT_COMPLETED = 'payment.completed';
    const EVENT_FAILED = 'payment.failed';
    const EVENT_REFUNDED = 'payment.refunded';

    public function __construct()
    {
        $this->publisher = new RoutingPublisher();
    }

    public function paymentInitiated($paymentId, $amount, $orderId)
    {
        $this->publisher->publish(self::EVENT_INITIATED, [
            'payment_id' => $paymentId,
            'amount' => $amount,
            'order_id' => $orderId,
            'timestamp' => time()
        ]);
    }

    public function paymentCompleted($paymentId, $transactionId)
    {
        $this->publisher->publish(self::EVENT_COMPLETED, [
            'payment_id' => $paymentId,
            'transaction_id' => $transactionId,
            'timestamp' => time()
        ]);
    }

    public function paymentFailed($paymentId, $reason)
    {
        $this->publisher->publish(self::EVENT_FAILED, [
            'payment_id' => $paymentId,
            'reason' => $reason,
            'timestamp' => time()
        ]);
    }
}

3. 库存事件路由

php
<?php

class InventoryEventRouter
{
    private $publisher;

    const EVENT_RESERVED = 'inventory.reserved';
    const EVENT_RELEASED = 'inventory.released';
    const EVENT_RESTOCKED = 'inventory.restocked';
    const EVENT_LOW_STOCK = 'inventory.low_stock';

    public function __construct()
    {
        $this->publisher = new RoutingPublisher();
    }

    public function reserveStock($productId, $quantity, $orderId)
    {
        $this->publisher->publish(self::EVENT_RESERVED, [
            'product_id' => $productId,
            'quantity' => $quantity,
            'order_id' => $orderId,
            'timestamp' => time()
        ]);
    }

    public function releaseStock($productId, $quantity, $reason)
    {
        $this->publisher->publish(self::EVENT_RELEASED, [
            'product_id' => $productId,
            'quantity' => $quantity,
            'reason' => $reason,
            'timestamp' => time()
        ]);
    }

    public function restock($productId, $quantity)
    {
        $this->publisher->publish(self::EVENT_RESTOCKED, [
            'product_id' => $productId,
            'quantity' => $quantity,
            'timestamp' => time()
        ]);
    }

    public function lowStockAlert($productId, $currentStock, $threshold)
    {
        $this->publisher->publish(self::EVENT_LOW_STOCK, [
            'product_id' => $productId,
            'current_stock' => $currentStock,
            'threshold' => $threshold,
            'timestamp' => time()
        ]);
    }
}

常见问题与解决方案

问题 1:路由键拼写错误

解决方案

php
<?php

class RoutingKeyValidator
{
    private $validKeys = [];

    public function registerKey($key)
    {
        $this->validKeys[] = $key;
    }

    public function validate($key)
    {
        if (!in_array($key, $this->validKeys)) {
            throw new InvalidArgumentException(
                "无效的路由键: {$key}。有效键: " . implode(', ', $this->validKeys)
            );
        }

        return true;
    }

    public function suggest($key)
    {
        $suggestions = [];

        foreach ($this->validKeys as $validKey) {
            similar_text($key, $validKey, $percent);

            if ($percent > 70) {
                $suggestions[] = $validKey;
            }
        }

        return $suggestions;
    }
}

问题 2:消息无法路由

解决方案

php
<?php

class UnroutableMessageHandler
{
    private $connection;
    private $channel;
    private $exchangeName;

    public function setupAlternateExchange()
    {
        $alternateExchange = $this->exchangeName . '.unrouted';

        $this->channel->exchange_declare(
            $alternateExchange,
            'fanout',
            false,
            true,
            false
        );

        $this->channel->queue_declare(
            'unrouted_messages',
            false,
            true,
            false,
            false
        );

        $this->channel->queue_bind(
            'unrouted_messages',
            $alternateExchange
        );

        $this->channel->exchange_declare(
            $this->exchangeName,
            'direct',
            false,
            true,
            false,
            false,
            false,
            new PhpAmqpLib\Wire\AMQPTable([
                'alternate-exchange' => $alternateExchange
            ])
        );
    }

    public function handleUnroutedMessages()
    {
        $callback = function ($message) {
            echo "无法路由的消息: " . $message->body . "\n";

            $this->logUnroutedMessage($message);

            $message->ack();
        };

        $this->channel->basic_consume(
            'unrouted_messages',
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    private function logUnroutedMessage($message)
    {
        error_log("Unrouted message: " . $message->body);
    }
}

问题 3:动态路由配置

解决方案

php
<?php

class DynamicRoutingManager
{
    private $connection;
    private $channel;
    private $bindings = [];

    public function addBinding($queueName, $routingKey)
    {
        $this->channel->queue_bind(
            $queueName,
            $this->exchangeName,
            $routingKey
        );

        $this->bindings[$queueName][] = $routingKey;

        echo "已添加绑定: {$queueName} -> {$routingKey}\n";
    }

    public function removeBinding($queueName, $routingKey)
    {
        $this->channel->queue_unbind(
            $queueName,
            $this->exchangeName,
            $routingKey
        );

        if (isset($this->bindings[$queueName])) {
            $this->bindings[$queueName] = array_diff(
                $this->bindings[$queueName],
                [$routingKey]
            );
        }

        echo "已移除绑定: {$queueName} -> {$routingKey}\n";
    }

    public function getBindings($queueName)
    {
        return $this->bindings[$queueName] ?? [];
    }

    public function saveConfiguration()
    {
        file_put_contents(
            'routing_config.json',
            json_encode($this->bindings, JSON_PRETTY_PRINT)
        );
    }

    public function loadConfiguration()
    {
        if (file_exists('routing_config.json')) {
            $this->bindings = json_decode(
                file_get_contents('routing_config.json'),
                true
            );
        }
    }
}

最佳实践建议

1. 路由键命名规范

php
<?php

class RoutingKeyConvention
{
    public static function build($domain, $entity, $action)
    {
        return "{$domain}.{$entity}.{$action}";
    }

    public static function parse($routingKey)
    {
        $parts = explode('.', $routingKey);

        return [
            'domain' => $parts[0] ?? null,
            'entity' => $parts[1] ?? null,
            'action' => $parts[2] ?? null
        ];
    }

    public static function validate($routingKey)
    {
        if (!preg_match('/^[a-z]+\.[a-z]+\.[a-z]+$/', $routingKey)) {
            throw new InvalidArgumentException(
                "路由键格式无效: {$routingKey}。应为: domain.entity.action"
            );
        }

        return true;
    }
}

2. 路由监控

php
<?php

class RoutingMonitor
{
    private $stats = [];

    public function recordPublish($routingKey)
    {
        if (!isset($this->stats[$routingKey])) {
            $this->stats[$routingKey] = ['published' => 0, 'consumed' => 0];
        }

        $this->stats[$routingKey]['published']++;
    }

    public function recordConsume($routingKey)
    {
        if (!isset($this->stats[$routingKey])) {
            $this->stats[$routingKey] = ['published' => 0, 'consumed' => 0];
        }

        $this->stats[$routingKey]['consumed']++;
    }

    public function getUnconsumedKeys()
    {
        $unconsumed = [];

        foreach ($this->stats as $key => $data) {
            if ($data['published'] > 0 && $data['consumed'] === 0) {
                $unconsumed[$key] = $data;
            }
        }

        return $unconsumed;
    }

    public function getStats()
    {
        return $this->stats;
    }
}

3. 路由文档生成

php
<?php

class RoutingDocumentationGenerator
{
    private $routes = [];

    public function addRoute($routingKey, $description, $payloadSchema = [])
    {
        $this->routes[$routingKey] = [
            'key' => $routingKey,
            'description' => $description,
            'payload_schema' => $payloadSchema
        ];
    }

    public function generateMarkdown()
    {
        $md = "# 路由键文档\n\n";

        foreach ($this->routes as $route) {
            $md .= "## {$route['key']}\n\n";
            $md .= "{$route['description']}\n\n";

            if (!empty($route['payload_schema'])) {
                $md .= "**Payload Schema:**\n\n```json\n";
                $md .= json_encode($route['payload_schema'], JSON_PRETTY_PRINT);
                $md .= "\n```\n\n";
            }
        }

        return $md;
    }
}

相关链接