Appearance
路由模式
概述
路由模式(Routing Pattern)通过 Direct 交换器和路由键(Routing Key)实现消息的精确路由。生产者在发送消息时指定路由键,交换器根据路由键将消息路由到绑定了相应路由键的队列。这种模式适用于需要根据特定条件分发消息的场景。
核心知识点
架构图
mermaid
graph TB
subgraph 生产者
P[Producer]
end
subgraph RabbitMQ
E[Direct Exchange]
Q1[Queue: error]
Q2[Queue: info]
Q3[Queue: all]
end
subgraph 消费者
C1[Error Handler]
C2[Info Handler]
C3[All Handler]
end
P -->|routing_key=error| E
P -->|routing_key=info| E
E -->|error| Q1
E -->|info| Q2
E -->|error,info| Q3
Q1 --> C1
Q2 --> C2
Q3 --> C3
style E fill:#fff9c4工作流程
mermaid
sequenceDiagram
participant P as 生产者
participant E as Direct Exchange
participant Q1 as 队列error
participant Q2 as 队列info
participant C as 消费者
Note over E: 绑定关系:<br/>Q1 -> error<br/>Q2 -> info
P->>E: 消息(routing_key=error)
E->>Q1: 匹配成功
Q1->>C: 消费消息
P->>E: 消息(routing_key=info)
E->>Q2: 匹配成功
Q2->>C: 消费消息
P->>E: 消息(routing_key=debug)
Note over E: 无匹配队列,消息丢弃路由键匹配规则
| 路由键 | 绑定键 | 是否匹配 |
|---|---|---|
| error | error | ✓ |
| error | info | ✗ |
| error | * | ✗ (Direct 不支持通配符) |
| order.created | order.created | ✓ |
| order.created | order.* | ✗ |
PHP 代码示例
路由发布者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RoutingPublisher
{
private $connection;
private $channel;
private $exchangeName = 'routing_exchange';
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->exchange_declare(
$this->exchangeName,
'direct',
false,
true,
false
);
}
public function publish($routingKey, $message)
{
$msg = new AMQPMessage(
json_encode([
'content' => $message,
'routing_key' => $routingKey,
'timestamp' => time()
]),
['content_type' => 'application/json']
);
$this->channel->basic_publish(
$msg,
$this->exchangeName,
$routingKey
);
echo " [x] 发送消息 [{$routingKey}]: {$message}\n";
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}路由消费者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
class RoutingConsumer
{
private $connection;
private $channel;
private $exchangeName = 'routing_exchange';
private $queueName;
private $bindingKeys;
public function __construct(array $bindingKeys)
{
$this->bindingKeys = $bindingKeys;
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->exchange_declare(
$this->exchangeName,
'direct',
false,
true,
false
);
list($this->queueName, ,) = $this->channel->queue_declare(
'',
false,
false,
true,
true
);
foreach ($bindingKeys as $bindingKey) {
$this->channel->queue_bind(
$this->queueName,
$this->exchangeName,
$bindingKey
);
}
$keys = implode(', ', $bindingKeys);
echo " [x] 消费者已启动,绑定键: {$keys}\n";
}
public function consume()
{
$callback = function ($message) {
$body = json_decode($message->body, true);
$routingKey = $message->delivery_info['routing_key'];
echo sprintf(
" [x] 接收 [%s]: %s\n",
$routingKey,
$body['content']
);
$message->ack();
};
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}完整示例:日志系统
php
<?php
class LogRouter
{
private $publisher;
const LEVEL_DEBUG = 'log.debug';
const LEVEL_INFO = 'log.info';
const LEVEL_WARNING = 'log.warning';
const LEVEL_ERROR = 'log.error';
const LEVEL_CRITICAL = 'log.critical';
public function __construct()
{
$this->publisher = new RoutingPublisher();
}
public function log($level, $message, array $context = [])
{
$logEntry = [
'level' => $level,
'message' => $message,
'context' => $context,
'timestamp' => time(),
'hostname' => gethostname()
];
$this->publisher->publish($level, $logEntry);
}
public function debug($message, array $context = [])
{
$this->log(self::LEVEL_DEBUG, $message, $context);
}
public function info($message, array $context = [])
{
$this->log(self::LEVEL_INFO, $message, $context);
}
public function warning($message, array $context = [])
{
$this->log(self::LEVEL_WARNING, $message, $context);
}
public function error($message, array $context = [])
{
$this->log(self::LEVEL_ERROR, $message, $context);
}
public function critical($message, array $context = [])
{
$this->log(self::LEVEL_CRITICAL, $message, $context);
}
}
class LogConsumer
{
private $consumer;
public function __construct(array $levels)
{
$this->consumer = new RoutingConsumer($levels);
}
public function start()
{
$this->consumer->consume();
}
}
$errorConsumer = new LogConsumer([
LogRouter::LEVEL_ERROR,
LogRouter::LEVEL_CRITICAL
]);
$allConsumer = new LogConsumer([
LogRouter::LEVEL_DEBUG,
LogRouter::LEVEL_INFO,
LogRouter::LEVEL_WARNING,
LogRouter::LEVEL_ERROR,
LogRouter::LEVEL_CRITICAL
]);订单事件路由
php
<?php
class OrderEventRouter
{
private $publisher;
const EVENT_CREATED = 'order.created';
const EVENT_PAID = 'order.paid';
const EVENT_SHIPPED = 'order.shipped';
const EVENT_DELIVERED = 'order.delivered';
const EVENT_CANCELLED = 'order.cancelled';
public function __construct()
{
$this->publisher = new RoutingPublisher();
}
public function emit($event, $orderId, array $data = [])
{
$payload = [
'event' => $event,
'order_id' => $orderId,
'data' => $data,
'timestamp' => time()
];
$this->publisher->publish($event, $payload);
}
public function orderCreated($orderId, $orderData)
{
$this->emit(self::EVENT_CREATED, $orderId, $orderData);
}
public function orderPaid($orderId, $paymentData)
{
$this->emit(self::EVENT_PAID, $orderId, $paymentData);
}
public function orderShipped($orderId, $shippingData)
{
$this->emit(self::EVENT_SHIPPED, $orderId, $shippingData);
}
public function orderDelivered($orderId, $deliveryData)
{
$this->emit(self::EVENT_DELIVERED, $orderId, $deliveryData);
}
public function orderCancelled($orderId, $reason)
{
$this->emit(self::EVENT_CANCELLED, $orderId, ['reason' => $reason]);
}
}
class OrderEventHandler
{
private $consumer;
public function __construct(array $events)
{
$this->consumer = new RoutingConsumer($events);
}
public function start()
{
$this->consumer->consume();
}
}
$inventoryHandler = new OrderEventHandler([
OrderEventRouter::EVENT_CREATED,
OrderEventRouter::EVENT_CANCELLED
]);
$notificationHandler = new OrderEventHandler([
OrderEventRouter::EVENT_PAID,
OrderEventRouter::EVENT_SHIPPED,
OrderEventRouter::EVENT_DELIVERED
]);多路由键绑定
php
<?php
class MultiBindingRouter
{
private $connection;
private $channel;
private $exchangeName = 'multi_routing_exchange';
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->exchange_declare(
$this->exchangeName,
'direct',
false,
true,
false
);
}
public function createConsumerWithBindings($consumerId, array $routingKeys)
{
$queueName = "consumer_{$consumerId}_queue";
$this->channel->queue_declare(
$queueName,
false,
true,
false,
false
);
foreach ($routingKeys as $key) {
$this->channel->queue_bind(
$queueName,
$this->exchangeName,
$key
);
echo "队列 {$queueName} 已绑定路由键: {$key}\n";
}
return $queueName;
}
public function publish($routingKey, $message)
{
$msg = new AMQPMessage(json_encode($message));
$this->channel->basic_publish(
$msg,
$this->exchangeName,
$routingKey
);
}
}实际应用场景
1. 用户事件路由
php
<?php
class UserEventRouter
{
private $publisher;
const EVENT_REGISTERED = 'user.registered';
const EVENT_LOGIN = 'user.login';
const EVENT_LOGOUT = 'user.logout';
const EVENT_UPDATED = 'user.updated';
const EVENT_DELETED = 'user.deleted';
public function __construct()
{
$this->publisher = new RoutingPublisher();
}
public function userRegistered($userId, $userData)
{
$this->publisher->publish(self::EVENT_REGISTERED, [
'user_id' => $userId,
'user_data' => $userData,
'timestamp' => time()
]);
}
public function userLogin($userId, $loginData)
{
$this->publisher->publish(self::EVENT_LOGIN, [
'user_id' => $userId,
'ip' => $loginData['ip'] ?? null,
'user_agent' => $loginData['user_agent'] ?? null,
'timestamp' => time()
]);
}
public function userLogout($userId)
{
$this->publisher->publish(self::EVENT_LOGOUT, [
'user_id' => $userId,
'timestamp' => time()
]);
}
}2. 支付事件路由
php
<?php
class PaymentEventRouter
{
private $publisher;
const EVENT_INITIATED = 'payment.initiated';
const EVENT_COMPLETED = 'payment.completed';
const EVENT_FAILED = 'payment.failed';
const EVENT_REFUNDED = 'payment.refunded';
public function __construct()
{
$this->publisher = new RoutingPublisher();
}
public function paymentInitiated($paymentId, $amount, $orderId)
{
$this->publisher->publish(self::EVENT_INITIATED, [
'payment_id' => $paymentId,
'amount' => $amount,
'order_id' => $orderId,
'timestamp' => time()
]);
}
public function paymentCompleted($paymentId, $transactionId)
{
$this->publisher->publish(self::EVENT_COMPLETED, [
'payment_id' => $paymentId,
'transaction_id' => $transactionId,
'timestamp' => time()
]);
}
public function paymentFailed($paymentId, $reason)
{
$this->publisher->publish(self::EVENT_FAILED, [
'payment_id' => $paymentId,
'reason' => $reason,
'timestamp' => time()
]);
}
}3. 库存事件路由
php
<?php
class InventoryEventRouter
{
private $publisher;
const EVENT_RESERVED = 'inventory.reserved';
const EVENT_RELEASED = 'inventory.released';
const EVENT_RESTOCKED = 'inventory.restocked';
const EVENT_LOW_STOCK = 'inventory.low_stock';
public function __construct()
{
$this->publisher = new RoutingPublisher();
}
public function reserveStock($productId, $quantity, $orderId)
{
$this->publisher->publish(self::EVENT_RESERVED, [
'product_id' => $productId,
'quantity' => $quantity,
'order_id' => $orderId,
'timestamp' => time()
]);
}
public function releaseStock($productId, $quantity, $reason)
{
$this->publisher->publish(self::EVENT_RELEASED, [
'product_id' => $productId,
'quantity' => $quantity,
'reason' => $reason,
'timestamp' => time()
]);
}
public function restock($productId, $quantity)
{
$this->publisher->publish(self::EVENT_RESTOCKED, [
'product_id' => $productId,
'quantity' => $quantity,
'timestamp' => time()
]);
}
public function lowStockAlert($productId, $currentStock, $threshold)
{
$this->publisher->publish(self::EVENT_LOW_STOCK, [
'product_id' => $productId,
'current_stock' => $currentStock,
'threshold' => $threshold,
'timestamp' => time()
]);
}
}常见问题与解决方案
问题 1:路由键拼写错误
解决方案:
php
<?php
class RoutingKeyValidator
{
private $validKeys = [];
public function registerKey($key)
{
$this->validKeys[] = $key;
}
public function validate($key)
{
if (!in_array($key, $this->validKeys)) {
throw new InvalidArgumentException(
"无效的路由键: {$key}。有效键: " . implode(', ', $this->validKeys)
);
}
return true;
}
public function suggest($key)
{
$suggestions = [];
foreach ($this->validKeys as $validKey) {
similar_text($key, $validKey, $percent);
if ($percent > 70) {
$suggestions[] = $validKey;
}
}
return $suggestions;
}
}问题 2:消息无法路由
解决方案:
php
<?php
class UnroutableMessageHandler
{
private $connection;
private $channel;
private $exchangeName;
public function setupAlternateExchange()
{
$alternateExchange = $this->exchangeName . '.unrouted';
$this->channel->exchange_declare(
$alternateExchange,
'fanout',
false,
true,
false
);
$this->channel->queue_declare(
'unrouted_messages',
false,
true,
false,
false
);
$this->channel->queue_bind(
'unrouted_messages',
$alternateExchange
);
$this->channel->exchange_declare(
$this->exchangeName,
'direct',
false,
true,
false,
false,
false,
new PhpAmqpLib\Wire\AMQPTable([
'alternate-exchange' => $alternateExchange
])
);
}
public function handleUnroutedMessages()
{
$callback = function ($message) {
echo "无法路由的消息: " . $message->body . "\n";
$this->logUnroutedMessage($message);
$message->ack();
};
$this->channel->basic_consume(
'unrouted_messages',
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
private function logUnroutedMessage($message)
{
error_log("Unrouted message: " . $message->body);
}
}问题 3:动态路由配置
解决方案:
php
<?php
class DynamicRoutingManager
{
private $connection;
private $channel;
private $bindings = [];
public function addBinding($queueName, $routingKey)
{
$this->channel->queue_bind(
$queueName,
$this->exchangeName,
$routingKey
);
$this->bindings[$queueName][] = $routingKey;
echo "已添加绑定: {$queueName} -> {$routingKey}\n";
}
public function removeBinding($queueName, $routingKey)
{
$this->channel->queue_unbind(
$queueName,
$this->exchangeName,
$routingKey
);
if (isset($this->bindings[$queueName])) {
$this->bindings[$queueName] = array_diff(
$this->bindings[$queueName],
[$routingKey]
);
}
echo "已移除绑定: {$queueName} -> {$routingKey}\n";
}
public function getBindings($queueName)
{
return $this->bindings[$queueName] ?? [];
}
public function saveConfiguration()
{
file_put_contents(
'routing_config.json',
json_encode($this->bindings, JSON_PRETTY_PRINT)
);
}
public function loadConfiguration()
{
if (file_exists('routing_config.json')) {
$this->bindings = json_decode(
file_get_contents('routing_config.json'),
true
);
}
}
}最佳实践建议
1. 路由键命名规范
php
<?php
class RoutingKeyConvention
{
public static function build($domain, $entity, $action)
{
return "{$domain}.{$entity}.{$action}";
}
public static function parse($routingKey)
{
$parts = explode('.', $routingKey);
return [
'domain' => $parts[0] ?? null,
'entity' => $parts[1] ?? null,
'action' => $parts[2] ?? null
];
}
public static function validate($routingKey)
{
if (!preg_match('/^[a-z]+\.[a-z]+\.[a-z]+$/', $routingKey)) {
throw new InvalidArgumentException(
"路由键格式无效: {$routingKey}。应为: domain.entity.action"
);
}
return true;
}
}2. 路由监控
php
<?php
class RoutingMonitor
{
private $stats = [];
public function recordPublish($routingKey)
{
if (!isset($this->stats[$routingKey])) {
$this->stats[$routingKey] = ['published' => 0, 'consumed' => 0];
}
$this->stats[$routingKey]['published']++;
}
public function recordConsume($routingKey)
{
if (!isset($this->stats[$routingKey])) {
$this->stats[$routingKey] = ['published' => 0, 'consumed' => 0];
}
$this->stats[$routingKey]['consumed']++;
}
public function getUnconsumedKeys()
{
$unconsumed = [];
foreach ($this->stats as $key => $data) {
if ($data['published'] > 0 && $data['consumed'] === 0) {
$unconsumed[$key] = $data;
}
}
return $unconsumed;
}
public function getStats()
{
return $this->stats;
}
}3. 路由文档生成
php
<?php
class RoutingDocumentationGenerator
{
private $routes = [];
public function addRoute($routingKey, $description, $payloadSchema = [])
{
$this->routes[$routingKey] = [
'key' => $routingKey,
'description' => $description,
'payload_schema' => $payloadSchema
];
}
public function generateMarkdown()
{
$md = "# 路由键文档\n\n";
foreach ($this->routes as $route) {
$md .= "## {$route['key']}\n\n";
$md .= "{$route['description']}\n\n";
if (!empty($route['payload_schema'])) {
$md .= "**Payload Schema:**\n\n```json\n";
$md .= json_encode($route['payload_schema'], JSON_PRETTY_PRINT);
$md .= "\n```\n\n";
}
}
return $md;
}
}