Appearance
消息路由模式
概述
消息路由模式是 RabbitMQ 中实现消息精确分发的核心机制。通过路由键(Routing Key)和交换器(Exchange)的组合,生产者可以将消息精确地发送到一个或多个目标队列,实现灵活的消息分发策略。
核心知识点
路由模式架构
mermaid
graph TB
subgraph 生产者
P1[生产者1]
P2[生产者2]
end
subgraph 交换器
E[Direct Exchange]
end
subgraph 队列
Q1[队列: error]
Q2[队列: info]
Q3[队列: warning]
end
subgraph 消费者
C1[错误日志处理器]
C2[信息日志处理器]
C3[警告日志处理器]
end
P1 -->|routing_key=error| E
P2 -->|routing_key=info| E
E -->|error| Q1
E -->|info| Q2
E -->|warning| Q3
Q1 --> C1
Q2 --> C2
Q3 --> C3交换器类型对比
| 交换器类型 | 路由方式 | 使用场景 |
|---|---|---|
| Direct | 精确匹配 routing key | 点对点消息、RPC |
| Fanout | 广播到所有绑定队列 | 发布订阅、广播通知 |
| Topic | 模式匹配 routing key | 多条件过滤、日志收集 |
| Headers | 基于消息头匹配 | 复杂条件路由 |
Direct 交换器工作原理
mermaid
sequenceDiagram
participant P as 生产者
participant E as Direct Exchange
participant Q1 as 队列A
participant Q2 as 队列B
participant C as 消费者
Note over E: 绑定关系:<br/>队列A -> routing_key=A<br/>队列B -> routing_key=B
P->>E: 发布消息(routing_key=A)
E->>Q1: 匹配成功,投递到队列A
Q1->>C: 消费消息
P->>E: 发布消息(routing_key=B)
E->>Q2: 匹配成功,投递到队列B
Q2->>C: 消费消息
P->>E: 发布消息(routing_key=C)
Note over E: 无匹配队列,消息丢弃PHP 代码示例
基础 Direct 路由示例
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class DirectRoutingPublisher
{
private $connection;
private $channel;
private $exchangeName = 'direct_logs';
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->exchange_declare(
$this->exchangeName,
'direct',
false,
true,
false
);
}
public function publish($routingKey, $message)
{
$msg = new AMQPMessage(
json_encode([
'message' => $message,
'routing_key' => $routingKey,
'timestamp' => time()
]),
['content_type' => 'application/json']
);
$this->channel->basic_publish(
$msg,
$this->exchangeName,
$routingKey
);
echo " [x] 发送消息 [{$routingKey}]: {$message}\n";
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}
class DirectRoutingConsumer
{
private $connection;
private $channel;
private $exchangeName = 'direct_logs';
private $queueName;
private $bindingKeys;
public function __construct(array $bindingKeys)
{
$this->bindingKeys = $bindingKeys;
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->exchange_declare(
$this->exchangeName,
'direct',
false,
true,
false
);
list($this->queueName, ,) = $this->channel->queue_declare(
'',
false,
false,
true,
true
);
foreach ($bindingKeys as $bindingKey) {
$this->channel->queue_bind(
$this->queueName,
$this->exchangeName,
$bindingKey
);
}
}
public function consume()
{
$callback = function (AMQPMessage $message) {
$body = json_decode($message->body, true);
$routingKey = $message->delivery_info['routing_key'];
echo sprintf(
" [x] 接收 [%s]: %s\n",
$routingKey,
$body['message']
);
$message->ack();
};
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
$bindingKeysStr = implode(', ', $this->bindingKeys);
echo " [*] 等待消息,绑定键: {$bindingKeysStr}\n";
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}完整示例:日志系统
php
<?php
class LogSystem
{
const LEVEL_DEBUG = 'debug';
const LEVEL_INFO = 'info';
const LEVEL_WARNING = 'warning';
const LEVEL_ERROR = 'error';
const LEVEL_CRITICAL = 'critical';
private $publisher;
public function __construct()
{
$this->publisher = new DirectRoutingPublisher();
}
public function log($level, $message, array $context = [])
{
$logData = [
'level' => $level,
'message' => $message,
'context' => $context,
'timestamp' => date('Y-m-d H:i:s'),
'hostname' => gethostname()
];
$this->publisher->publish($level, $logData);
}
public function debug($message, array $context = [])
{
$this->log(self::LEVEL_DEBUG, $message, $context);
}
public function info($message, array $context = [])
{
$this->log(self::LEVEL_INFO, $message, $context);
}
public function warning($message, array $context = [])
{
$this->log(self::LEVEL_WARNING, $message, $context);
}
public function error($message, array $context = [])
{
$this->log(self::LEVEL_ERROR, $message, $context);
}
public function critical($message, array $context = [])
{
$this->log(self::LEVEL_CRITICAL, $message, $context);
}
public function close()
{
$this->publisher->close();
}
}
class LogConsumer
{
private $consumer;
private $levels;
public function __construct(array $levels)
{
$this->levels = $levels;
$this->consumer = new DirectRoutingConsumer($levels);
}
public function start()
{
$levelsStr = implode(', ', $this->levels);
echo "日志消费者启动,监听级别: {$levelsStr}\n";
$this->consumer->consume();
}
public function close()
{
$this->consumer->close();
}
}
$logger = new LogSystem();
$logger->debug('调试信息', ['user_id' => 1]);
$logger->info('用户登录', ['user_id' => 1, 'ip' => '192.168.1.1']);
$logger->warning('内存使用率较高', ['usage' => '85%']);
$logger->error('数据库连接失败', ['error' => 'Connection refused']);
$logger->critical('系统崩溃', ['error' => 'Out of memory']);
$logger->close();多路由键绑定示例
php
<?php
class MultiBindingConsumer
{
private $connection;
private $channel;
private $exchangeName = 'multi_routing_exchange';
private $queueName;
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->exchange_declare(
$this->exchangeName,
'direct',
false,
true,
false
);
list($this->queueName, ,) = $this->channel->queue_declare(
'',
false,
false,
true,
true
);
}
public function bindMultipleKeys(array $routingKeys)
{
foreach ($routingKeys as $key) {
$this->channel->queue_bind(
$this->queueName,
$this->exchangeName,
$key
);
echo "已绑定路由键: {$key}\n";
}
}
public function consume()
{
$callback = function (AMQPMessage $message) {
$routingKey = $message->delivery_info['routing_key'];
$body = json_decode($message->body, true);
echo sprintf(
"[%s] 路由键: %s, 消息: %s\n",
date('H:i:s'),
$routingKey,
json_encode($body, JSON_UNESCAPED_UNICODE)
);
$message->ack();
};
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}Headers 交换器路由
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class HeadersRoutingPublisher
{
private $connection;
private $channel;
private $exchangeName = 'headers_exchange';
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->exchange_declare(
$this->exchangeName,
'headers',
false,
true,
false
);
}
public function publish($message, array $headers)
{
$msg = new AMQPMessage(
json_encode($message),
[
'content_type' => 'application/json',
'application_headers' => new AMQPTable($headers)
]
);
$this->channel->basic_publish(
$msg,
$this->exchangeName
);
echo "消息已发送,头部: " . json_encode($headers) . "\n";
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}
class HeadersRoutingConsumer
{
private $connection;
private $channel;
private $exchangeName = 'headers_exchange';
private $queueName;
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest'
);
$this->channel = $this->connection->channel();
$this->channel->exchange_declare(
$this->exchangeName,
'headers',
false,
true,
false
);
list($this->queueName, ,) = $this->channel->queue_declare(
'',
false,
false,
true,
true
);
}
public function bindWithHeaders(array $headers, $xMatch = 'all')
{
$bindHeaders = array_merge($headers, ['x-match' => $xMatch]);
$this->channel->queue_bind(
$this->queueName,
$this->exchangeName,
'',
false,
new AMQPTable($bindHeaders)
);
echo "已绑定头部条件: " . json_encode($headers) . " (x-match: {$xMatch})\n";
}
public function consume()
{
$callback = function (AMQPMessage $message) {
$body = json_decode($message->body, true);
$headers = $message->get('application_headers');
echo sprintf(
"收到消息: %s\n头部: %s\n",
json_encode($body, JSON_UNESCAPED_UNICODE),
json_encode($headers ? $headers->getNativeData() : [])
);
$message->ack();
};
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
$callback
);
while ($this->channel->is_open()) {
$this->channel->wait();
}
}
public function close()
{
$this->channel->close();
$this->connection->close();
}
}实际应用场景
1. 订单状态路由
php
<?php
class OrderStatusRouter
{
private $publisher;
private $exchangeName = 'order_status_exchange';
const STATUS_CREATED = 'order.created';
const STATUS_PAID = 'order.paid';
const STATUS_SHIPPED = 'order.shipped';
const STATUS_DELIVERED = 'order.delivered';
const STATUS_CANCELLED = 'order.cancelled';
public function __construct()
{
$this->publisher = new DirectRoutingPublisher();
}
public function publishStatusChange($orderId, $status, array $data = [])
{
$message = [
'order_id' => $orderId,
'status' => $status,
'data' => $data,
'timestamp' => time()
];
$this->publisher->publish($status, $message);
}
public function orderCreated($orderId, $orderData)
{
$this->publishStatusChange($orderId, self::STATUS_CREATED, $orderData);
}
public function orderPaid($orderId, $paymentData)
{
$this->publishStatusChange($orderId, self::STATUS_PAID, $paymentData);
}
public function orderShipped($orderId, $shippingData)
{
$this->publishStatusChange($orderId, self::STATUS_SHIPPED, $shippingData);
}
public function orderDelivered($orderId, $deliveryData)
{
$this->publishStatusChange($orderId, self::STATUS_DELIVERED, $deliveryData);
}
public function orderCancelled($orderId, $reason)
{
$this->publishStatusChange($orderId, self::STATUS_CANCELLED, ['reason' => $reason]);
}
}
class NotificationService
{
public function __construct()
{
$this->consumer = new DirectRoutingConsumer([
OrderStatusRouter::STATUS_CREATED,
OrderStatusRouter::STATUS_PAID,
OrderStatusRouter::STATUS_SHIPPED,
OrderStatusRouter::STATUS_DELIVERED
]);
}
public function start()
{
$this->consumer->consume();
}
}
class InventoryService
{
public function __construct()
{
$this->consumer = new DirectRoutingConsumer([
OrderStatusRouter::STATUS_CREATED,
OrderStatusRouter::STATUS_CANCELLED
]);
}
public function start()
{
$this->consumer->consume();
}
}2. 用户事件路由
php
<?php
class UserEventRouter
{
private $publisher;
const EVENT_REGISTERED = 'user.registered';
const EVENT_LOGIN = 'user.login';
const EVENT_LOGOUT = 'user.logout';
const EVENT_PROFILE_UPDATED = 'user.profile_updated';
const EVENT_PASSWORD_CHANGED = 'user.password_changed';
const EVENT_DELETED = 'user.deleted';
public function __construct()
{
$this->publisher = new DirectRoutingPublisher();
}
public function userRegistered($userId, $userData)
{
$this->publisher->publish(self::EVENT_REGISTERED, [
'user_id' => $userId,
'user_data' => $userData,
'timestamp' => time()
]);
}
public function userLogin($userId, $loginData)
{
$this->publisher->publish(self::EVENT_LOGIN, [
'user_id' => $userId,
'ip' => $loginData['ip'] ?? null,
'user_agent' => $loginData['user_agent'] ?? null,
'timestamp' => time()
]);
}
public function userLogout($userId)
{
$this->publisher->publish(self::EVENT_LOGOUT, [
'user_id' => $userId,
'timestamp' => time()
]);
}
public function profileUpdated($userId, $changes)
{
$this->publisher->publish(self::EVENT_PROFILE_UPDATED, [
'user_id' => $userId,
'changes' => $changes,
'timestamp' => time()
]);
}
}3. 系统告警路由
php
<?php
class AlertRouter
{
private $publisher;
const ALERT_CPU = 'alert.cpu';
const ALERT_MEMORY = 'alert.memory';
const ALERT_DISK = 'alert.disk';
const ALERT_NETWORK = 'alert.network';
const ALERT_SERVICE = 'alert.service';
public function __construct()
{
$this->publisher = new DirectRoutingPublisher();
}
public function cpuAlert($serverId, $usage, $threshold = 80)
{
$this->publisher->publish(self::ALERT_CPU, [
'server_id' => $serverId,
'usage' => $usage,
'threshold' => $threshold,
'severity' => $this->calculateSeverity($usage, $threshold),
'timestamp' => time()
]);
}
public function memoryAlert($serverId, $usage, $threshold = 80)
{
$this->publisher->publish(self::ALERT_MEMORY, [
'server_id' => $serverId,
'usage' => $usage,
'threshold' => $threshold,
'severity' => $this->calculateSeverity($usage, $threshold),
'timestamp' => time()
]);
}
public function diskAlert($serverId, $path, $usage, $threshold = 80)
{
$this->publisher->publish(self::ALERT_DISK, [
'server_id' => $serverId,
'path' => $path,
'usage' => $usage,
'threshold' => $threshold,
'severity' => $this->calculateSeverity($usage, $threshold),
'timestamp' => time()
]);
}
public function serviceAlert($serverId, $serviceName, $status, $message)
{
$this->publisher->publish(self::ALERT_SERVICE, [
'server_id' => $serverId,
'service' => $serviceName,
'status' => $status,
'message' => $message,
'severity' => $status === 'down' ? 'critical' : 'warning',
'timestamp' => time()
]);
}
private function calculateSeverity($value, $threshold)
{
$ratio = $value / $threshold;
if ($ratio >= 1.2) {
return 'critical';
} elseif ($ratio >= 1.0) {
return 'warning';
} else {
return 'info';
}
}
}常见问题与解决方案
问题 1:路由键命名冲突
原因:不同业务使用相同路由键
解决方案:
php
<?php
class RoutingKeyBuilder
{
private $prefix;
public function __construct($prefix = '')
{
$this->prefix = $prefix;
}
public function build($domain, $entity, $action)
{
$parts = [];
if ($this->prefix) {
$parts[] = $this->prefix;
}
$parts[] = $domain;
$parts[] = $entity;
$parts[] = $action;
return implode('.', $parts);
}
public function parse($routingKey)
{
$parts = explode('.', $routingKey);
$offset = $this->prefix ? 1 : 0;
return [
'domain' => $parts[$offset] ?? null,
'entity' => $parts[$offset + 1] ?? null,
'action' => $parts[$offset + 2] ?? null
];
}
}
$builder = new RoutingKeyBuilder('app');
$key = $builder->build('order', 'payment', 'completed');
// 结果: app.order.payment.completed问题 2:消息路由到多个队列的性能问题
原因:队列绑定过多
解决方案:
php
<?php
class OptimizedRouter
{
private $bindings = [];
private $maxBindingsPerQueue = 10;
public function bind($queueName, $routingKey)
{
if (!isset($this->bindings[$queueName])) {
$this->bindings[$queueName] = [];
}
if (count($this->bindings[$queueName]) >= $this->maxBindingsPerQueue) {
throw new RuntimeException("队列 {$queueName} 绑定数量已达上限");
}
$this->bindings[$queueName][] = $routingKey;
}
public function getOptimalQueue($routingKey)
{
foreach ($this->bindings as $queueName => $keys) {
if (in_array($routingKey, $keys)) {
return $queueName;
}
}
return null;
}
public function suggestConsolidation()
{
$suggestions = [];
foreach ($this->bindings as $queueName => $keys) {
if (count($keys) > 5) {
$suggestions[$queueName] = [
'binding_count' => count($keys),
'suggestion' => '考虑使用 Topic 交换器替代多个 Direct 绑定'
];
}
}
return $suggestions;
}
}问题 3:路由键变更导致消息丢失
解决方案:
php
<?php
class VersionedRouter
{
private $currentVersion = 'v1';
private $supportedVersions = ['v1', 'v2'];
public function publish($routingKey, $message)
{
$versionedKey = "{$this->currentVersion}.{$routingKey}";
$this->publisher->publish($versionedKey, [
'version' => $this->currentVersion,
'payload' => $message
]);
}
public function bindConsumer($queueName, $routingKey, $version = null)
{
$version = $version ?? $this->currentVersion;
$versionedKey = "{$version}.{$routingKey}";
$this->channel->queue_bind($queueName, $this->exchangeName, $versionedKey);
}
public function migrateToVersion($newVersion)
{
if (!in_array($newVersion, $this->supportedVersions)) {
throw new InvalidArgumentException("不支持的版本: {$newVersion}");
}
$oldVersion = $this->currentVersion;
$this->currentVersion = $newVersion;
echo "路由版本已从 {$oldVersion} 迁移到 {$newVersion}\n";
}
}最佳实践建议
1. 路由键命名规范
php
<?php
class RoutingKeyConvention
{
const PATTERN = '/^([a-z]+\.)+[a-z]+$/';
public static function validate($routingKey)
{
if (!preg_match(self::PATTERN, $routingKey)) {
throw new InvalidArgumentException(
"路由键格式无效: {$routingKey},应使用点分隔的小写单词"
);
}
return true;
}
public static function buildDomainKey($domain, $entity, $action)
{
$key = "{$domain}.{$entity}.{$action}";
self::validate($key);
return $key;
}
public static function buildTenantKey($tenantId, $domain, $entity, $action)
{
$key = "tenant.{$tenantId}.{$domain}.{$entity}.{$action}";
self::validate($key);
return $key;
}
}2. 路由监控与追踪
php
<?php
class RoutingMonitor
{
private $stats = [];
public function recordPublish($routingKey, $success = true)
{
$this->initStats($routingKey);
$this->stats[$routingKey]['published']++;
$this->stats[$routingKey]['last_publish'] = time();
if (!$success) {
$this->stats[$routingKey]['publish_errors']++;
}
}
public function recordConsume($routingKey, $success = true)
{
$this->initStats($routingKey);
$this->stats[$routingKey]['consumed']++;
$this->stats[$routingKey]['last_consume'] = time();
if (!$success) {
$this->stats[$routingKey]['consume_errors']++;
}
}
private function initStats($routingKey)
{
if (!isset($this->stats[$routingKey])) {
$this->stats[$routingKey] = [
'published' => 0,
'consumed' => 0,
'publish_errors' => 0,
'consume_errors' => 0,
'last_publish' => null,
'last_consume' => null
];
}
}
public function getStats()
{
return $this->stats;
}
public function getUnconsumedKeys()
{
$unconsumed = [];
foreach ($this->stats as $key => $data) {
if ($data['published'] > 0 && $data['consumed'] === 0) {
$unconsumed[$key] = $data;
}
}
return $unconsumed;
}
}3. 动态路由配置
php
<?php
class DynamicRouter
{
private $config;
private $routes = [];
public function __construct(array $config)
{
$this->config = $config;
$this->loadRoutes();
}
private function loadRoutes()
{
foreach ($this->config as $routeName => $routeConfig) {
$this->routes[$routeName] = [
'exchange' => $routeConfig['exchange'],
'routing_key' => $routeConfig['routing_key'],
'queue' => $routeConfig['queue'] ?? null
];
}
}
public function publish($routeName, $message)
{
if (!isset($this->routes[$routeName])) {
throw new RuntimeException("未定义的路由: {$routeName}");
}
$route = $this->routes[$routeName];
return $this->doPublish(
$route['exchange'],
$route['routing_key'],
$message
);
}
private function doPublish($exchange, $routingKey, $message)
{
// 实际发布逻辑
}
public function addRoute($name, $exchange, $routingKey, $queue = null)
{
$this->routes[$name] = [
'exchange' => $exchange,
'routing_key' => $routingKey,
'queue' => $queue
];
}
public function removeRoute($name)
{
unset($this->routes[$name]);
}
}