Appearance
RabbitMQ 路由键(Routing Key)
概述
路由键(Routing Key)是 RabbitMQ 消息路由的核心概念。它是消息从 Exchange 路由到 Queue 的关键依据。生产者在发送消息时指定 Routing Key,Exchange 根据 Routing Key 和绑定规则决定将消息投递到哪些队列。理解 Routing Key 的工作机制对于设计高效的消息路由系统至关重要。
路由键的核心作用
mermaid
graph TB
subgraph 生产者
P[Producer]
end
subgraph Exchange
E[Exchange]
E -->|routing_key: order.created| B1[Binding 1]
E -->|routing_key: order.updated| B2[Binding 2]
E -->|routing_key: order.deleted| B3[Binding 3]
end
subgraph Queues
Q1[Order Created Queue]
Q2[Order Updated Queue]
Q3[Order Deleted Queue]
end
P -->|order.created| E
B1 --> Q1
B2 --> Q2
B3 --> Q3路由键的主要作用:
- 消息路由:决定消息的目标队列
- 分类标识:对消息进行分类
- 过滤机制:实现消息的精确或模糊匹配
- 解耦设计:生产者无需知道具体队列
核心知识点
1. 路由键格式
Routing Key 是一个字符串,通常使用点号分隔的单词组成:
mermaid
graph LR
A[Routing Key] --> B[domain]
B --> C[.]
C --> D[entity]
D --> E[.]
E --> F[action]
style C fill:#f9f,stroke:#333
style E fill:#f9f,stroke:#333命名规范:
<domain>.<entity>.<action>
<service>.<event>.<status>
<level>.<source>.<type>示例:
order.created.success
order.payment.failed
user.login.attempt
log.error.database
notification.email.sent2. 路由键与交换机类型
不同类型的交换机对 Routing Key 的处理方式不同:
mermaid
graph TB
subgraph Direct Exchange
D1[Routing Key] --> D2[精确匹配]
D2 --> D3[Binding Key]
end
subgraph Fanout Exchange
F1[Routing Key] --> F2[忽略]
F2 --> F3[广播到所有队列]
end
subgraph Topic Exchange
T1[Routing Key] --> T2[模式匹配]
T2 --> T3[Binding Pattern]
end
subgraph Headers Exchange
H1[Routing Key] --> H2[忽略]
H2 --> H3[使用消息头匹配]
end3. Topic Exchange 通配符
Topic Exchange 支持两种通配符:
mermaid
graph TB
A[通配符] --> B["* (星号)"]
A --> C["# (井号)"]
B --> B1[匹配一个单词]
B --> B2[不能跨越点号]
C --> C1[匹配零个或多个单词]
C --> C2[可以跨越点号]| 通配符 | 含义 | 示例 |
|---|---|---|
* | 匹配一个单词 | order.* 匹配 order.created 但不匹配 order.item.created |
# | 匹配零个或多个单词 | order.# 匹配 order.created 和 order.item.created |
匹配示例:
| Binding Pattern | Routing Key | 匹配结果 |
|---|---|---|
order.* | order.created | ✅ 匹配 |
order.* | order.updated | ✅ 匹配 |
order.* | order.item.created | ❌ 不匹配 |
order.# | order.created | ✅ 匹配 |
order.# | order.item.created | ✅ 匹配 |
*.error | system.error | ✅ 匹配 |
*.error | app.error | ✅ 匹配 |
#.error | error | ✅ 匹配 |
#.error | system.db.error | ✅ 匹配 |
# | 任意 | ✅ 匹配所有 |
4. 路由键命名最佳实践
mermaid
graph TB
A[命名最佳实践] --> B[使用点号分隔]
A --> C[保持一致性]
A --> D[语义清晰]
A --> E[避免过长]
B --> B1[order.created.success]
C --> C1[统一格式: domain.entity.action]
D --> D1[见名知意]
E --> E1[建议不超过255字节]代码示例
基础路由键使用
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class RoutingKeyExample
{
private $connection;
private $channel;
public function __construct(array $config = [])
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
}
public function setup(): void
{
$this->channel->exchange_declare(
'routing_example',
AMQPExchangeType::DIRECT,
false,
true,
false
);
$queues = [
'orders_created' => 'order.created',
'orders_updated' => 'order.updated',
'orders_deleted' => 'order.deleted'
];
foreach ($queues as $queue => $routingKey) {
$this->channel->queue_declare($queue, false, true, false, false);
$this->channel->queue_bind($queue, 'routing_example', $routingKey);
}
echo "Setup completed\n";
}
public function publish(string $routingKey, array $data): void
{
$message = new AMQPMessage(json_encode($data), [
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
$this->channel->basic_publish($message, 'routing_example', $routingKey);
echo "Message published with routing key: {$routingKey}\n";
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
$example = new RoutingKeyExample();
$example->setup();
$example->publish('order.created', ['order_id' => 1001, 'status' => 'pending']);
$example->publish('order.updated', ['order_id' => 1001, 'status' => 'paid']);
$example->publish('order.deleted', ['order_id' => 1001, 'reason' => 'cancelled']);
$example->close();Topic Exchange 路由键模式
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class TopicRoutingKeyExample
{
private $connection;
private $channel;
public function __construct(array $config = [])
{
$this->connection = new AMQPStreamConnection(
$config['host'] ?? 'localhost',
$config['port'] ?? 5672,
$config['user'] ?? 'guest',
$config['password'] ?? 'guest',
$config['vhost'] ?? '/'
);
$this->channel = $this->connection->channel();
}
public function setup(): void
{
$this->channel->exchange_declare(
'topic_events',
AMQPExchangeType::TOPIC,
false,
true,
false
);
$bindings = [
'all_orders' => 'order.#',
'order_created' => 'order.created',
'order_status' => 'order.*.status',
'all_errors' => '#.error',
'system_errors' => 'system.#.error',
'all_events' => '#'
];
foreach ($bindings as $queue => $pattern) {
$this->channel->queue_declare($queue, false, true, false, false);
$this->channel->queue_bind($queue, 'topic_events', $pattern);
echo "Queue {$queue} bound with pattern: {$pattern}\n";
}
}
public function publish(string $routingKey, array $data): void
{
$message = new AMQPMessage(json_encode($data), [
'content_type' => 'application/json'
]);
$this->channel->basic_publish($message, 'topic_events', $routingKey);
echo "Published with routing key: {$routingKey}\n";
}
public function testRouting(): void
{
$testCases = [
'order.created' => ['all_orders', 'order_created', 'all_events'],
'order.paid.status' => ['all_orders', 'order_status', 'all_events'],
'system.database.error' => ['all_errors', 'system_errors', 'all_events'],
'app.error' => ['all_errors', 'all_events'],
'user.login' => ['all_events']
];
foreach ($testCases as $routingKey => $expectedQueues) {
echo "\nRouting key: {$routingKey}\n";
echo "Expected to route to: " . implode(', ', $expectedQueues) . "\n";
}
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
$example = new TopicRoutingKeyExample();
$example->setup();
$example->publish('order.created', ['order_id' => 1001]);
$example->publish('order.paid.status', ['order_id' => 1001]);
$example->publish('system.database.error', ['error' => 'Connection failed']);
$example->testRouting();
$example->close();路由键生成器
php
<?php
class RoutingKeyGenerator
{
private string $domain;
private string $separator = '.';
public function __construct(string $domain = '')
{
$this->domain = $domain;
}
public function generate(string ...$parts): string
{
$allParts = [];
if ($this->domain) {
$allParts[] = $this->domain;
}
foreach ($parts as $part) {
$allParts[] = $this->this->normalize($part);
}
return implode($this->separator, $allParts);
}
public function forEntity(string $entity, string $action, ?string $status = null): string
{
$parts = [$entity, $action];
if ($status) {
$parts[] = $status;
}
return $this->generate(...$parts);
}
public function forEvent(string $event, string $status = 'success'): string
{
return $this->generate($event, $status);
}
public function forLog(string $level, string $source, ?string $type = null): string
{
$parts = [$level, $source];
if ($type) {
$parts[] = $type;
}
return $this->generate(...$parts);
}
private function normalize(string $part): string
{
return strtolower(preg_replace('/[^a-zA-Z0-9_]/', '_', $part));
}
public function parse(string $routingKey): array
{
return explode($this->separator, $routingKey);
}
public function setSeparator(string $separator): self
{
$this->separator = $separator;
return $this;
}
}
$generator = new RoutingKeyGenerator('myapp');
echo $generator->forEntity('order', 'created') . "\n";
echo $generator->forEntity('order', 'payment', 'completed') . "\n";
echo $generator->forEvent('user.login', 'success') . "\n";
echo $generator->forLog('error', 'database', 'connection') . "\n";路由键匹配器
php
<?php
class RoutingKeyMatcher
{
public function match(string $pattern, string $routingKey): bool
{
$patternParts = explode('.', $pattern);
$keyParts = explode('.', $routingKey);
return $this->matchParts($patternParts, $keyParts);
}
private function matchParts(array $pattern, array $key): bool
{
if (empty($pattern) && empty($key)) {
return true;
}
if (empty($pattern)) {
return false;
}
$currentPattern = array_shift($pattern);
if ($currentPattern === '#') {
if (empty($pattern)) {
return true;
}
$nextPattern = $pattern[0];
for ($i = 0; $i <= count($key); $i++) {
$remainingKey = array_slice($key, $i);
if ($this->matchParts($pattern, $remainingKey)) {
return true;
}
}
return false;
}
if (empty($key)) {
return false;
}
$currentKey = array_shift($key);
if ($currentPattern === '*' || $currentPattern === $currentKey) {
return $this->matchParts($pattern, $key);
}
return false;
}
public function findMatchingQueues(string $routingKey, array $bindings): array
{
$matched = [];
foreach ($bindings as $queue => $pattern) {
if ($this->match($pattern, $routingKey)) {
$matched[] = $queue;
}
}
return $matched;
}
public function validatePattern(string $pattern): bool
{
if (empty($pattern)) {
return true;
}
$parts = explode('.', $pattern);
foreach ($parts as $part) {
if ($part === '#' && count($parts) > 1) {
$hashIndex = array_search('#', $parts);
if ($hashIndex !== count($parts) - 1) {
return false;
}
}
}
return true;
}
}
$matcher = new RoutingKeyMatcher();
$testCases = [
['order.*', 'order.created', true],
['order.*', 'order.item.created', false],
['order.#', 'order.created', true],
['order.#', 'order.item.created', true],
['*.error', 'system.error', true],
['#.error', 'system.db.error', true],
['#', 'anything.here', true],
];
foreach ($testCases as $test) {
$result = $matcher->match($test[0], $test[1]);
$status = $result === $test[2] ? '✓' : '✗';
echo "{$status} Pattern: {$test[0]}, Key: {$test[1]}, Expected: " . ($test[2] ? 'match' : 'no match') . "\n";
}路由键验证器
php
<?php
class RoutingKeyValidator
{
private int $maxLength = 255;
private array $allowedCharacters = '/^[a-zA-Z0-9_.*#-]+$/';
public function validate(string $routingKey): array
{
$errors = [];
if (strlen($routingKey) > $this->maxLength) {
$errors[] = "Routing key exceeds maximum length of {$this->maxLength} characters";
}
if (!preg_match($this->allowedCharacters, $routingKey)) {
$errors[] = "Routing key contains invalid characters";
}
$parts = explode('.', $routingKey);
foreach ($parts as $index => $part) {
if (empty($part) && count($parts) > 1) {
$errors[] = "Empty part found at position {$index}";
}
if (strpos($part, '*') !== false && strlen($part) > 1) {
$errors[] = "Wildcard * must be alone in part at position {$index}";
}
if (strpos($part, '#') !== false && strlen($part) > 1) {
$errors[] = "Wildcard # must be alone in part at position {$index}";
}
}
return [
'valid' => empty($errors),
'errors' => $errors
];
}
public function validatePattern(string $pattern): array
{
$errors = [];
$parts = explode('.', $pattern);
$hashCount = 0;
foreach ($parts as $index => $part) {
if ($part === '#') {
$hashCount++;
if ($index !== count($parts) - 1) {
$errors[] = "Wildcard # can only appear at the end of pattern";
}
}
}
if ($hashCount > 1) {
$errors[] = "Wildcard # can only appear once in pattern";
}
return [
'valid' => empty($errors),
'errors' => $errors
];
}
public function sanitize(string $routingKey): string
{
$sanitized = preg_replace('/[^a-zA-Z0-9._*-]/', '_', $routingKey);
$sanitized = preg_replace('/\.+/', '.', $sanitized);
$sanitized = trim($sanitized, '.');
return $sanitized;
}
}
$validator = new RoutingKeyValidator();
$testKeys = [
'order.created',
'order..created',
'order.$created',
str_repeat('a', 300),
];
foreach ($testKeys as $key) {
$result = $validator->validate($key);
echo "Key: {$key}\n";
echo "Valid: " . ($result['valid'] ? 'Yes' : 'No') . "\n";
if (!$result['valid']) {
echo "Errors: " . implode(', ', $result['errors']) . "\n";
}
echo "\n";
}实际应用场景
1. 事件驱动架构
php
<?php
class EventRoutingKeySystem
{
private $channel;
private $generator;
public function __construct(AMQPStreamConnection $connection)
{
$this->channel = $connection->channel();
$this->generator = new RoutingKeyGenerator('events');
}
public function setup(): void
{
$this->channel->exchange_declare('domain_events', 'topic', false, true, false);
$subscriptions = [
'inventory_service' => ['order.#', 'product.#'],
'payment_service' => ['order.created', 'order.paid', 'payment.#'],
'notification_service' => ['user.#', 'order.#'],
'analytics_service' => ['#']
];
foreach ($subscriptions as $service => $patterns) {
$queue = "{$service}_events";
$this->channel->queue_declare($queue, false, true, false, false);
foreach ($patterns as $pattern) {
$this->channel->queue_bind($queue, 'domain_events', $pattern);
}
}
}
public function emitOrderCreated(int $orderId, array $orderData): void
{
$routingKey = $this->generator->forEntity('order', 'created');
$this->publish($routingKey, $orderData);
}
public function emitOrderPaid(int $orderId, float $amount): void
{
$routingKey = $this->generator->forEntity('order', 'paid');
$this->publish($routingKey, ['order_id' => $orderId, 'amount' => $amount]);
}
public function emitUserLoggedIn(int $userId): void
{
$routingKey = $this->generator->forEntity('user', 'login', 'success');
$this->publish($routingKey, ['user_id' => $userId]);
}
private function publish(string $routingKey, array $data): void
{
$message = new AMQPMessage(json_encode([
'event' => $routingKey,
'data' => $data,
'timestamp' => time()
]), ['content_type' => 'application/json']);
$this->channel->basic_publish($message, 'domain_events', $routingKey);
}
}2. 日志级别路由
php
<?php
class LogRoutingKeySystem
{
private $channel;
public function __construct(AMQPStreamConnection $connection)
{
$this->channel = $connection->channel();
}
public function setup(): void
{
$this->channel->exchange_declare('logs', 'topic', false, true, false);
$bindings = [
'all_logs' => '#',
'error_logs' => '#.error',
'critical_logs' => '#.critical',
'app_logs' => 'app.#',
'system_logs' => 'system.#',
'database_logs' => '#.database.#'
];
foreach ($bindings as $queue => $pattern) {
$this->channel->queue_declare($queue, false, true, false, false);
$this->channel->queue_bind($queue, 'logs', $pattern);
}
}
public function log(string $level, string $source, string $message, array $context = []): void
{
$routingKey = "{$source}.{$level}";
$logMessage = new AMQPMessage(json_encode([
'level' => $level,
'source' => $source,
'message' => $message,
'context' => $context,
'timestamp' => date('Y-m-d H:i:s')
]), ['content_type' => 'application/json']);
$this->channel->basic_publish($logMessage, 'logs', $routingKey);
}
public function emergency(string $source, string $message, array $context = []): void
{
$this->log('emergency', $source, $message, $context);
}
public function error(string $source, string $message, array $context = []): void
{
$this->log('error', $source, $message, $context);
}
public function warning(string $source, string $message, array $context = []): void
{
$this->log('warning', $source, $message, $context);
}
public function info(string $source, string $message, array $context = []): void
{
$this->log('info', $source, $message, $context);
}
}3. 多租户路由
php
<?php
class MultiTenantRoutingKey
{
private $channel;
public function __construct(AMQPStreamConnection $connection)
{
$this->channel = $connection->channel();
}
public function setupTenant(string $tenantId): void
{
$exchange = "tenant_{$tenantId}";
$this->channel->exchange_declare($exchange, 'topic', false, true, false);
$queues = [
"{$tenantId}_orders" => 'order.#',
"{$tenantId}_users" => 'user.#',
"{$tenantId}_notifications" => 'notification.#'
];
foreach ($queues as $queue => $pattern) {
$this->channel->queue_declare($queue, false, true, false, false);
$this->channel->queue_bind($queue, $exchange, $pattern);
}
}
public function publish(string $tenantId, string $entity, string $action, array $data): void
{
$exchange = "tenant_{$tenantId}";
$routingKey = "{$entity}.{$action}";
$message = new AMQPMessage(json_encode([
'tenant_id' => $tenantId,
'entity' => $entity,
'action' => $action,
'data' => $data,
'timestamp' => time()
]), ['content_type' => 'application/json']);
$this->channel->basic_publish($message, $exchange, $routingKey);
}
public function generateTenantRoutingKey(string $tenantId, string $entity, string $action): string
{
return "{$tenantId}.{$entity}.{$action}";
}
}常见问题与解决方案
1. 路由键格式错误
问题原因:
- 包含非法字符
- 格式不一致
- 过长
解决方案:
php
<?php
class SafeRoutingKeyGenerator
{
public static function create(string ...$parts): string
{
$sanitizedParts = [];
foreach ($parts as $part) {
$sanitized = strtolower(trim($part));
$sanitized = preg_replace('/[^a-z0-9_]/', '_', $sanitized);
$sanitized = preg_replace('/_+/', '_', $sanitized);
$sanitized = trim($sanitized, '_');
if (!empty($sanitized)) {
$sanitizedParts[] = $sanitized;
}
}
return implode('.', $sanitizedParts);
}
public static function fromArray(array $data): string
{
return self::create(...$data);
}
}2. 路由键匹配失败
问题原因:
- 模式配置错误
- 大小写敏感
- 通配符使用不当
解决方案:
php
<?php
class RoutingKeyDebugger
{
public function debug(string $routingKey, array $bindings): array
{
$matcher = new RoutingKeyMatcher();
$results = [
'routing_key' => $routingKey,
'parts' => explode('.', $routingKey),
'matched_queues' => [],
'unmatched_bindings' => []
];
foreach ($bindings as $queue => $pattern) {
if ($matcher->match($pattern, $routingKey)) {
$results['matched_queues'][] = [
'queue' => $queue,
'pattern' => $pattern
];
} else {
$results['unmatched_bindings'][] = [
'queue' => $queue,
'pattern' => $pattern,
'reason' => $this->explainMismatch($pattern, $routingKey)
];
}
}
return $results;
}
private function explainMismatch(string $pattern, string $routingKey): string
{
$patternParts = explode('.', $pattern);
$keyParts = explode('.', $routingKey);
if (count($patternParts) !== count($keyParts) &&
!in_array('#', $patternParts) &&
!in_array('*', $patternParts)) {
return "Length mismatch: pattern has " . count($patternParts) .
" parts, key has " . count($keyParts) . " parts";
}
foreach ($patternParts as $index => $part) {
if ($part !== '*' && $part !== '#' &&
isset($keyParts[$index]) && $part !== $keyParts[$index]) {
return "Part {$index} mismatch: pattern='{$part}', key='{$keyParts[$index]}'";
}
}
return "Unknown reason";
}
}3. 路由键冲突
问题原因:
- 多个模式匹配同一个键
- 路由规则重叠
解决方案:
php
<?php
class RoutingKeyConflictResolver
{
public function findConflicts(array $bindings): array
{
$conflicts = [];
$patterns = array_keys($bindings);
for ($i = 0; $i < count($patterns); $i++) {
for ($j = $i + 1; $j < count($patterns); $j++) {
$pattern1 = $patterns[$i];
$pattern2 = $patterns[$j];
$overlap = $this->findPatternOverlap($pattern1, $pattern2);
if (!empty($overlap)) {
$conflicts[] = [
'patterns' => [$pattern1, $pattern2],
'queues' => [$bindings[$pattern1], $bindings[$pattern2]],
'overlapping_examples' => $overlap
];
}
}
}
return $conflicts;
}
private function findPatternOverlap(string $pattern1, string $pattern2): array
{
$examples = [];
$testKeys = $this->generateTestKeys($pattern1);
$matcher = new RoutingKeyMatcher();
foreach ($testKeys as $key) {
if ($matcher->match($pattern1, $key) && $matcher->match($pattern2, $key)) {
$examples[] = $key;
if (count($examples) >= 3) {
break;
}
}
}
return $examples;
}
private function generateTestKeys(string $pattern): array
{
$keys = [];
$parts = explode('.', $pattern);
$key = [];
foreach ($parts as $part) {
if ($part === '*' || $part === '#') {
$key[] = 'test';
} else {
$key[] = $part;
}
}
$keys[] = implode('.', $key);
return $keys;
}
}最佳实践建议
1. 路由键命名规范
php
<?php
class RoutingKeyStandards
{
const DOMAIN = 'myapp';
const SEPARATOR = '.';
public static function orderEvent(string $action, ?string $status = null): string
{
return self::build('order', $action, $status);
}
public static function userEvent(string $action, ?string $status = null): string
{
return self::build('user', $action, $status);
}
public static function paymentEvent(string $action, ?string $status = null): string
{
return self::build('payment', $action, $status);
}
public static function logEvent(string $level, string $source): string
{
return self::build($source, $level);
}
private static function build(string ...$parts): string
{
$nonEmpty = array_filter($parts, fn($p) => !empty($p));
return implode(self::SEPARATOR, $nonEmpty);
}
}2. 路由键文档化
php
<?php
class RoutingKeyDocumentation
{
public static function generate(array $routingKeys): string
{
$doc = "# Routing Keys Documentation\n\n";
foreach ($routingKeys as $key => $description) {
$doc .= "## `{$key}`\n";
$doc .= "{$description['description']}\n\n";
$doc .= "- **Pattern**: `{$description['pattern']}`\n";
$doc .= "- **Example**: `{$description['example']}`\n";
$doc .= "- **Target Queues**: " . implode(', ', $description['queues']) . "\n\n";
}
return $doc;
}
}3. 路由键测试
php
<?php
class RoutingKeyTester
{
private $matcher;
public function __construct()
{
$this->matcher = new RoutingKeyMatcher();
}
public function runTests(array $testCases): array
{
$results = [
'passed' => 0,
'failed' => 0,
'details' => []
];
foreach ($testCases as $test) {
$actual = $this->matcher->findMatchingQueues(
$test['routing_key'],
$test['bindings']
);
$passed = $this->arraysEqual($actual, $test['expected_queues']);
if ($passed) {
$results['passed']++;
} else {
$results['failed']++;
}
$results['details'][] = [
'routing_key' => $test['routing_key'],
'expected' => $test['expected_queues'],
'actual' => $actual,
'passed' => $passed
];
}
return $results;
}
private function arraysEqual(array $a, array $b): bool
{
sort($a);
sort($b);
return $a === $b;
}
}