Skip to content

消息路由模式

概述

消息路由模式是 RabbitMQ 中实现消息精确分发的核心机制。通过路由键(Routing Key)和交换器(Exchange)的组合,生产者可以将消息精确地发送到一个或多个目标队列,实现灵活的消息分发策略。

核心知识点

路由模式架构

mermaid
graph TB
    subgraph 生产者
        P1[生产者1]
        P2[生产者2]
    end

    subgraph 交换器
        E[Direct Exchange]
    end

    subgraph 队列
        Q1[队列: error]
        Q2[队列: info]
        Q3[队列: warning]
    end

    subgraph 消费者
        C1[错误日志处理器]
        C2[信息日志处理器]
        C3[警告日志处理器]
    end

    P1 -->|routing_key=error| E
    P2 -->|routing_key=info| E

    E -->|error| Q1
    E -->|info| Q2
    E -->|warning| Q3

    Q1 --> C1
    Q2 --> C2
    Q3 --> C3

交换器类型对比

交换器类型路由方式使用场景
Direct精确匹配 routing key点对点消息、RPC
Fanout广播到所有绑定队列发布订阅、广播通知
Topic模式匹配 routing key多条件过滤、日志收集
Headers基于消息头匹配复杂条件路由

Direct 交换器工作原理

mermaid
sequenceDiagram
    participant P as 生产者
    participant E as Direct Exchange
    participant Q1 as 队列A
    participant Q2 as 队列B
    participant C as 消费者

    Note over E: 绑定关系:<br/>队列A -> routing_key=A<br/>队列B -> routing_key=B

    P->>E: 发布消息(routing_key=A)
    E->>Q1: 匹配成功,投递到队列A
    Q1->>C: 消费消息

    P->>E: 发布消息(routing_key=B)
    E->>Q2: 匹配成功,投递到队列B
    Q2->>C: 消费消息

    P->>E: 发布消息(routing_key=C)
    Note over E: 无匹配队列,消息丢弃

PHP 代码示例

基础 Direct 路由示例

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class DirectRoutingPublisher
{
    private $connection;
    private $channel;
    private $exchangeName = 'direct_logs';

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            'direct',
            false,
            true,
            false
        );
    }

    public function publish($routingKey, $message)
    {
        $msg = new AMQPMessage(
            json_encode([
                'message' => $message,
                'routing_key' => $routingKey,
                'timestamp' => time()
            ]),
            ['content_type' => 'application/json']
        );

        $this->channel->basic_publish(
            $msg,
            $this->exchangeName,
            $routingKey
        );

        echo " [x] 发送消息 [{$routingKey}]: {$message}\n";
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

class DirectRoutingConsumer
{
    private $connection;
    private $channel;
    private $exchangeName = 'direct_logs';
    private $queueName;
    private $bindingKeys;

    public function __construct(array $bindingKeys)
    {
        $this->bindingKeys = $bindingKeys;

        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            'direct',
            false,
            true,
            false
        );

        list($this->queueName, ,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            true
        );

        foreach ($bindingKeys as $bindingKey) {
            $this->channel->queue_bind(
                $this->queueName,
                $this->exchangeName,
                $bindingKey
            );
        }
    }

    public function consume()
    {
        $callback = function (AMQPMessage $message) {
            $body = json_decode($message->body, true);
            $routingKey = $message->delivery_info['routing_key'];

            echo sprintf(
                " [x] 接收 [%s]: %s\n",
                $routingKey,
                $body['message']
            );

            $message->ack();
        };

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        $bindingKeysStr = implode(', ', $this->bindingKeys);
        echo " [*] 等待消息,绑定键: {$bindingKeysStr}\n";

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

完整示例:日志系统

php
<?php

class LogSystem
{
    const LEVEL_DEBUG = 'debug';
    const LEVEL_INFO = 'info';
    const LEVEL_WARNING = 'warning';
    const LEVEL_ERROR = 'error';
    const LEVEL_CRITICAL = 'critical';

    private $publisher;

    public function __construct()
    {
        $this->publisher = new DirectRoutingPublisher();
    }

    public function log($level, $message, array $context = [])
    {
        $logData = [
            'level' => $level,
            'message' => $message,
            'context' => $context,
            'timestamp' => date('Y-m-d H:i:s'),
            'hostname' => gethostname()
        ];

        $this->publisher->publish($level, $logData);
    }

    public function debug($message, array $context = [])
    {
        $this->log(self::LEVEL_DEBUG, $message, $context);
    }

    public function info($message, array $context = [])
    {
        $this->log(self::LEVEL_INFO, $message, $context);
    }

    public function warning($message, array $context = [])
    {
        $this->log(self::LEVEL_WARNING, $message, $context);
    }

    public function error($message, array $context = [])
    {
        $this->log(self::LEVEL_ERROR, $message, $context);
    }

    public function critical($message, array $context = [])
    {
        $this->log(self::LEVEL_CRITICAL, $message, $context);
    }

    public function close()
    {
        $this->publisher->close();
    }
}

class LogConsumer
{
    private $consumer;
    private $levels;

    public function __construct(array $levels)
    {
        $this->levels = $levels;
        $this->consumer = new DirectRoutingConsumer($levels);
    }

    public function start()
    {
        $levelsStr = implode(', ', $this->levels);
        echo "日志消费者启动,监听级别: {$levelsStr}\n";
        $this->consumer->consume();
    }

    public function close()
    {
        $this->consumer->close();
    }
}

$logger = new LogSystem();

$logger->debug('调试信息', ['user_id' => 1]);
$logger->info('用户登录', ['user_id' => 1, 'ip' => '192.168.1.1']);
$logger->warning('内存使用率较高', ['usage' => '85%']);
$logger->error('数据库连接失败', ['error' => 'Connection refused']);
$logger->critical('系统崩溃', ['error' => 'Out of memory']);

$logger->close();

多路由键绑定示例

php
<?php

class MultiBindingConsumer
{
    private $connection;
    private $channel;
    private $exchangeName = 'multi_routing_exchange';
    private $queueName;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            'direct',
            false,
            true,
            false
        );

        list($this->queueName, ,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            true
        );
    }

    public function bindMultipleKeys(array $routingKeys)
    {
        foreach ($routingKeys as $key) {
            $this->channel->queue_bind(
                $this->queueName,
                $this->exchangeName,
                $key
            );
            echo "已绑定路由键: {$key}\n";
        }
    }

    public function consume()
    {
        $callback = function (AMQPMessage $message) {
            $routingKey = $message->delivery_info['routing_key'];
            $body = json_decode($message->body, true);

            echo sprintf(
                "[%s] 路由键: %s, 消息: %s\n",
                date('H:i:s'),
                $routingKey,
                json_encode($body, JSON_UNESCAPED_UNICODE)
            );

            $message->ack();
        };

        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

Headers 交换器路由

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class HeadersRoutingPublisher
{
    private $connection;
    private $channel;
    private $exchangeName = 'headers_exchange';

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            'headers',
            false,
            true,
            false
        );
    }

    public function publish($message, array $headers)
    {
        $msg = new AMQPMessage(
            json_encode($message),
            [
                'content_type' => 'application/json',
                'application_headers' => new AMQPTable($headers)
            ]
        );

        $this->channel->basic_publish(
            $msg,
            $this->exchangeName
        );

        echo "消息已发送,头部: " . json_encode($headers) . "\n";
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

class HeadersRoutingConsumer
{
    private $connection;
    private $channel;
    private $exchangeName = 'headers_exchange';
    private $queueName;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->channel = $this->connection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            'headers',
            false,
            true,
            false
        );

        list($this->queueName, ,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            true
        );
    }

    public function bindWithHeaders(array $headers, $xMatch = 'all')
    {
        $bindHeaders = array_merge($headers, ['x-match' => $xMatch]);

        $this->channel->queue_bind(
            $this->queueName,
            $this->exchangeName,
            '',
            false,
            new AMQPTable($bindHeaders)
        );

        echo "已绑定头部条件: " . json_encode($headers) . " (x-match: {$xMatch})\n";
    }

    public function consume()
    {
        $callback = function (AMQPMessage $message) {
            $body = json_decode($message->body, true);
            $headers = $message->get('application_headers');

            echo sprintf(
                "收到消息: %s\n头部: %s\n",
                json_encode($body, JSON_UNESCAPED_UNICODE),
                json_encode($headers ? $headers->getNativeData() : [])
            );

            $message->ack();
        };

        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            $callback
        );

        while ($this->channel->is_open()) {
            $this->channel->wait();
        }
    }

    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

实际应用场景

1. 订单状态路由

php
<?php

class OrderStatusRouter
{
    private $publisher;
    private $exchangeName = 'order_status_exchange';

    const STATUS_CREATED = 'order.created';
    const STATUS_PAID = 'order.paid';
    const STATUS_SHIPPED = 'order.shipped';
    const STATUS_DELIVERED = 'order.delivered';
    const STATUS_CANCELLED = 'order.cancelled';

    public function __construct()
    {
        $this->publisher = new DirectRoutingPublisher();
    }

    public function publishStatusChange($orderId, $status, array $data = [])
    {
        $message = [
            'order_id' => $orderId,
            'status' => $status,
            'data' => $data,
            'timestamp' => time()
        ];

        $this->publisher->publish($status, $message);
    }

    public function orderCreated($orderId, $orderData)
    {
        $this->publishStatusChange($orderId, self::STATUS_CREATED, $orderData);
    }

    public function orderPaid($orderId, $paymentData)
    {
        $this->publishStatusChange($orderId, self::STATUS_PAID, $paymentData);
    }

    public function orderShipped($orderId, $shippingData)
    {
        $this->publishStatusChange($orderId, self::STATUS_SHIPPED, $shippingData);
    }

    public function orderDelivered($orderId, $deliveryData)
    {
        $this->publishStatusChange($orderId, self::STATUS_DELIVERED, $deliveryData);
    }

    public function orderCancelled($orderId, $reason)
    {
        $this->publishStatusChange($orderId, self::STATUS_CANCELLED, ['reason' => $reason]);
    }
}

class NotificationService
{
    public function __construct()
    {
        $this->consumer = new DirectRoutingConsumer([
            OrderStatusRouter::STATUS_CREATED,
            OrderStatusRouter::STATUS_PAID,
            OrderStatusRouter::STATUS_SHIPPED,
            OrderStatusRouter::STATUS_DELIVERED
        ]);
    }

    public function start()
    {
        $this->consumer->consume();
    }
}

class InventoryService
{
    public function __construct()
    {
        $this->consumer = new DirectRoutingConsumer([
            OrderStatusRouter::STATUS_CREATED,
            OrderStatusRouter::STATUS_CANCELLED
        ]);
    }

    public function start()
    {
        $this->consumer->consume();
    }
}

2. 用户事件路由

php
<?php

class UserEventRouter
{
    private $publisher;

    const EVENT_REGISTERED = 'user.registered';
    const EVENT_LOGIN = 'user.login';
    const EVENT_LOGOUT = 'user.logout';
    const EVENT_PROFILE_UPDATED = 'user.profile_updated';
    const EVENT_PASSWORD_CHANGED = 'user.password_changed';
    const EVENT_DELETED = 'user.deleted';

    public function __construct()
    {
        $this->publisher = new DirectRoutingPublisher();
    }

    public function userRegistered($userId, $userData)
    {
        $this->publisher->publish(self::EVENT_REGISTERED, [
            'user_id' => $userId,
            'user_data' => $userData,
            'timestamp' => time()
        ]);
    }

    public function userLogin($userId, $loginData)
    {
        $this->publisher->publish(self::EVENT_LOGIN, [
            'user_id' => $userId,
            'ip' => $loginData['ip'] ?? null,
            'user_agent' => $loginData['user_agent'] ?? null,
            'timestamp' => time()
        ]);
    }

    public function userLogout($userId)
    {
        $this->publisher->publish(self::EVENT_LOGOUT, [
            'user_id' => $userId,
            'timestamp' => time()
        ]);
    }

    public function profileUpdated($userId, $changes)
    {
        $this->publisher->publish(self::EVENT_PROFILE_UPDATED, [
            'user_id' => $userId,
            'changes' => $changes,
            'timestamp' => time()
        ]);
    }
}

3. 系统告警路由

php
<?php

class AlertRouter
{
    private $publisher;

    const ALERT_CPU = 'alert.cpu';
    const ALERT_MEMORY = 'alert.memory';
    const ALERT_DISK = 'alert.disk';
    const ALERT_NETWORK = 'alert.network';
    const ALERT_SERVICE = 'alert.service';

    public function __construct()
    {
        $this->publisher = new DirectRoutingPublisher();
    }

    public function cpuAlert($serverId, $usage, $threshold = 80)
    {
        $this->publisher->publish(self::ALERT_CPU, [
            'server_id' => $serverId,
            'usage' => $usage,
            'threshold' => $threshold,
            'severity' => $this->calculateSeverity($usage, $threshold),
            'timestamp' => time()
        ]);
    }

    public function memoryAlert($serverId, $usage, $threshold = 80)
    {
        $this->publisher->publish(self::ALERT_MEMORY, [
            'server_id' => $serverId,
            'usage' => $usage,
            'threshold' => $threshold,
            'severity' => $this->calculateSeverity($usage, $threshold),
            'timestamp' => time()
        ]);
    }

    public function diskAlert($serverId, $path, $usage, $threshold = 80)
    {
        $this->publisher->publish(self::ALERT_DISK, [
            'server_id' => $serverId,
            'path' => $path,
            'usage' => $usage,
            'threshold' => $threshold,
            'severity' => $this->calculateSeverity($usage, $threshold),
            'timestamp' => time()
        ]);
    }

    public function serviceAlert($serverId, $serviceName, $status, $message)
    {
        $this->publisher->publish(self::ALERT_SERVICE, [
            'server_id' => $serverId,
            'service' => $serviceName,
            'status' => $status,
            'message' => $message,
            'severity' => $status === 'down' ? 'critical' : 'warning',
            'timestamp' => time()
        ]);
    }

    private function calculateSeverity($value, $threshold)
    {
        $ratio = $value / $threshold;

        if ($ratio >= 1.2) {
            return 'critical';
        } elseif ($ratio >= 1.0) {
            return 'warning';
        } else {
            return 'info';
        }
    }
}

常见问题与解决方案

问题 1:路由键命名冲突

原因:不同业务使用相同路由键

解决方案

php
<?php

class RoutingKeyBuilder
{
    private $prefix;

    public function __construct($prefix = '')
    {
        $this->prefix = $prefix;
    }

    public function build($domain, $entity, $action)
    {
        $parts = [];

        if ($this->prefix) {
            $parts[] = $this->prefix;
        }

        $parts[] = $domain;
        $parts[] = $entity;
        $parts[] = $action;

        return implode('.', $parts);
    }

    public function parse($routingKey)
    {
        $parts = explode('.', $routingKey);

        $offset = $this->prefix ? 1 : 0;

        return [
            'domain' => $parts[$offset] ?? null,
            'entity' => $parts[$offset + 1] ?? null,
            'action' => $parts[$offset + 2] ?? null
        ];
    }
}

$builder = new RoutingKeyBuilder('app');

$key = $builder->build('order', 'payment', 'completed');
// 结果: app.order.payment.completed

问题 2:消息路由到多个队列的性能问题

原因:队列绑定过多

解决方案

php
<?php

class OptimizedRouter
{
    private $bindings = [];
    private $maxBindingsPerQueue = 10;

    public function bind($queueName, $routingKey)
    {
        if (!isset($this->bindings[$queueName])) {
            $this->bindings[$queueName] = [];
        }

        if (count($this->bindings[$queueName]) >= $this->maxBindingsPerQueue) {
            throw new RuntimeException("队列 {$queueName} 绑定数量已达上限");
        }

        $this->bindings[$queueName][] = $routingKey;
    }

    public function getOptimalQueue($routingKey)
    {
        foreach ($this->bindings as $queueName => $keys) {
            if (in_array($routingKey, $keys)) {
                return $queueName;
            }
        }

        return null;
    }

    public function suggestConsolidation()
    {
        $suggestions = [];

        foreach ($this->bindings as $queueName => $keys) {
            if (count($keys) > 5) {
                $suggestions[$queueName] = [
                    'binding_count' => count($keys),
                    'suggestion' => '考虑使用 Topic 交换器替代多个 Direct 绑定'
                ];
            }
        }

        return $suggestions;
    }
}

问题 3:路由键变更导致消息丢失

解决方案

php
<?php

class VersionedRouter
{
    private $currentVersion = 'v1';
    private $supportedVersions = ['v1', 'v2'];

    public function publish($routingKey, $message)
    {
        $versionedKey = "{$this->currentVersion}.{$routingKey}";

        $this->publisher->publish($versionedKey, [
            'version' => $this->currentVersion,
            'payload' => $message
        ]);
    }

    public function bindConsumer($queueName, $routingKey, $version = null)
    {
        $version = $version ?? $this->currentVersion;
        $versionedKey = "{$version}.{$routingKey}";

        $this->channel->queue_bind($queueName, $this->exchangeName, $versionedKey);
    }

    public function migrateToVersion($newVersion)
    {
        if (!in_array($newVersion, $this->supportedVersions)) {
            throw new InvalidArgumentException("不支持的版本: {$newVersion}");
        }

        $oldVersion = $this->currentVersion;
        $this->currentVersion = $newVersion;

        echo "路由版本已从 {$oldVersion} 迁移到 {$newVersion}\n";
    }
}

最佳实践建议

1. 路由键命名规范

php
<?php

class RoutingKeyConvention
{
    const PATTERN = '/^([a-z]+\.)+[a-z]+$/';

    public static function validate($routingKey)
    {
        if (!preg_match(self::PATTERN, $routingKey)) {
            throw new InvalidArgumentException(
                "路由键格式无效: {$routingKey},应使用点分隔的小写单词"
            );
        }

        return true;
    }

    public static function buildDomainKey($domain, $entity, $action)
    {
        $key = "{$domain}.{$entity}.{$action}";
        self::validate($key);
        return $key;
    }

    public static function buildTenantKey($tenantId, $domain, $entity, $action)
    {
        $key = "tenant.{$tenantId}.{$domain}.{$entity}.{$action}";
        self::validate($key);
        return $key;
    }
}

2. 路由监控与追踪

php
<?php

class RoutingMonitor
{
    private $stats = [];

    public function recordPublish($routingKey, $success = true)
    {
        $this->initStats($routingKey);

        $this->stats[$routingKey]['published']++;
        $this->stats[$routingKey]['last_publish'] = time();

        if (!$success) {
            $this->stats[$routingKey]['publish_errors']++;
        }
    }

    public function recordConsume($routingKey, $success = true)
    {
        $this->initStats($routingKey);

        $this->stats[$routingKey]['consumed']++;
        $this->stats[$routingKey]['last_consume'] = time();

        if (!$success) {
            $this->stats[$routingKey]['consume_errors']++;
        }
    }

    private function initStats($routingKey)
    {
        if (!isset($this->stats[$routingKey])) {
            $this->stats[$routingKey] = [
                'published' => 0,
                'consumed' => 0,
                'publish_errors' => 0,
                'consume_errors' => 0,
                'last_publish' => null,
                'last_consume' => null
            ];
        }
    }

    public function getStats()
    {
        return $this->stats;
    }

    public function getUnconsumedKeys()
    {
        $unconsumed = [];

        foreach ($this->stats as $key => $data) {
            if ($data['published'] > 0 && $data['consumed'] === 0) {
                $unconsumed[$key] = $data;
            }
        }

        return $unconsumed;
    }
}

3. 动态路由配置

php
<?php

class DynamicRouter
{
    private $config;
    private $routes = [];

    public function __construct(array $config)
    {
        $this->config = $config;
        $this->loadRoutes();
    }

    private function loadRoutes()
    {
        foreach ($this->config as $routeName => $routeConfig) {
            $this->routes[$routeName] = [
                'exchange' => $routeConfig['exchange'],
                'routing_key' => $routeConfig['routing_key'],
                'queue' => $routeConfig['queue'] ?? null
            ];
        }
    }

    public function publish($routeName, $message)
    {
        if (!isset($this->routes[$routeName])) {
            throw new RuntimeException("未定义的路由: {$routeName}");
        }

        $route = $this->routes[$routeName];

        return $this->doPublish(
            $route['exchange'],
            $route['routing_key'],
            $message
        );
    }

    private function doPublish($exchange, $routingKey, $message)
    {
        // 实际发布逻辑
    }

    public function addRoute($name, $exchange, $routingKey, $queue = null)
    {
        $this->routes[$name] = [
            'exchange' => $exchange,
            'routing_key' => $routingKey,
            'queue' => $queue
        ];
    }

    public function removeRoute($name)
    {
        unset($this->routes[$name]);
    }
}

相关链接