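RabbitMQ Exchanges
Overview
Exchanges are the core of message routing in RabbitMQ. A producer never publishes directly to a queue; it publishes to an exchange, which routes each message to one or more queues according to its routing rules (or to none, if nothing matches). This design decouples producers from queues and makes routing flexible.
The core role of an exchange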
mermaid
graph TB
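subgraph Producer
P[Producer]
end
subgraph Exchange
E[Exchange]
E -->|routing rule| R1[Binding 1]
E -->|routing rule| R2[Binding 2]
E -->|routing rule| R3[Binding 3]
end
subgraph Queues
Q1[Queue 1]
Q2[Queue 2]
Q3[Queue 3]
end
P -->|publish message| E
R1 --> Q1
R2 --> Q2
R3 --> Q3
Main responsibilities of an exchange:
- Receive messages: accept the messages that producers publish
- Route and distribute: deliver each message to queues according to the routing rules
- Decouple producers: a producer does not need to know the destination queue
- Flexible routing: support several routing strategies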
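Key concepts
1. Exchange types
RabbitMQ provides four built-in exchange types:
mermaid
graph TB
A[Exchange Types] --> B[Direct]
A --> C[Fanout]
A --> D[Topic]
A --> E[Headers]
B --> B1[Exact match]
B --> B2[Point-to-point routing]
C --> C1[Broadcast]
C --> C2[Publish/subscribe]
D --> D1[Wildcard match]
D --> D2[Multi-topic subscription]
E --> E1[Message header match]
E --> E2[Complex conditions]
Direct Exchange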
mermaid
graph LR
subgraph Direct Exchange
E[Exchange]
E -->|routing_key = info| Q1[Queue Info]
E -->|routing_key = error| Q2[Queue Error]
E -->|routing_key = warning| Q3[Queue Warning]
end
P[Producer] -->|info| E
P -->|error| E
Features:
- Matches the routing key exactly
- One of the most commonly used exchange types
- Well suited to point-to-point delivery
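The exact-match behaviour can be illustrated without a broker. The following is a minimal in-memory sketch, not RabbitMQ's implementation; the function name `routeDirect` and the binding list are hypothetical and exist only for illustration.

```php
<?php
/**
 * Return the queues whose binding key equals the routing key exactly.
 * Illustrative model only; each binding is ['queue' => ..., 'key' => ...].
 */
function routeDirect(array $bindings, string $routingKey): array
{
    $queues = [];
    foreach ($bindings as $binding) {
        // Comparison is exact and case-sensitive; no wildcards are involved.
        if ($binding['key'] === $routingKey) {
            $queues[] = $binding['queue'];
        }
    }
    // The same queue could be bound twice; deliver to it only once.
    return array_values(array_unique($queues));
}

// Hypothetical bindings: two queues share the "error" key.
$bindings = [
    ['queue' => 'logs_info',  'key' => 'info'],
    ['queue' => 'logs_error', 'key' => 'error'],
    ['queue' => 'alerts',     'key' => 'error'],
];

print_r(routeDirect($bindings, 'error'));   // logs_error and alerts
print_r(routeDirect($bindings, 'debug'));   // empty: nothing is bound with this key
```

Note that several queues may share one binding key, in which case each of them receives a copy of the message.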
Routing rule:
message routing key = binding key → the message is routed to the bound queue
Fanout Exchange
mermaid
graph LR
subgraph Fanout Exchange
E[Exchange]
E --> Q1[Queue 1]
E --> Q2[Queue 2]
E --> Q3[Queue 3]
end
P[Producer] -->|routing key ignored| E
Features:
- Broadcasts each message to every bound queue
- Ignores the routing key
- Well suited to publish/subscribe
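As a rough illustration of the broadcast behaviour, the sketch below copies one message into every bound queue of an in-memory model. The function `publishFanout` and the queue names are assumptions made for this example, not part of any library.

```php
<?php
/**
 * Copy one message into every bound queue, ignoring the routing key.
 * Illustrative model only; $queues maps queue name => list of message bodies.
 */
function publishFanout(array &$queues, array $boundQueueNames, string $routingKey, string $body): int
{
    $delivered = 0;
    // $routingKey is accepted to mirror a publish call but plays no part in routing.
    foreach (array_unique($boundQueueNames) as $name) {
        $queues[$name][] = $body;   // each queue gets its own copy
        $delivered++;
    }
    return $delivered;
}

// Hypothetical queues, all bound to the same fanout exchange.
$queues = ['notifications_email' => [], 'notifications_sms' => [], 'notifications_push' => []];
$bound  = array_keys($queues);

$count = publishFanout($queues, $bound, 'any.key.at.all', 'New feature released!');
echo "Delivered to {$count} queues\n";
```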
Routing rule:
every bound queue receives the message
Topic Exchange
mermaid
graph LR
subgraph Topic Exchange
E[Exchange]
E -->|*.error| Q1[Error Queue]
E -->|order.#| Q2[Order Queue]
E -->|#.created| Q3[Created Queue]
end
P[Producer] -->|user.error| E
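P -->|order.created| E
Features:
- Supports wildcard matching
- `*` matches exactly one word
- `#` matches zero or more words
- Well suited to routing on multiple criteria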
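The wildcard semantics can be made concrete with a small matcher. This is a sketch written for illustration, assuming words are separated by dots; `topicMatches` and `matchWords` are hypothetical helpers, and the broker's own matching algorithm is implemented differently.

```php
<?php
/**
 * Check whether a topic binding pattern matches a routing key.
 * `*` stands for exactly one word and `#` for zero or more words.
 */
function topicMatches(string $pattern, string $routingKey): bool
{
    return matchWords(explode('.', $pattern), 0, explode('.', $routingKey), 0);
}

function matchWords(array $p, int $i, array $k, int $j): bool
{
    if ($i === count($p)) {
        // Pattern exhausted: match only if the key is exhausted too.
        return $j === count($k);
    }
    if ($p[$i] === '#') {
        // `#` may absorb zero words (skip it) or one more word (stay on it).
        return matchWords($p, $i + 1, $k, $j)
            || ($j < count($k) && matchWords($p, $i, $k, $j + 1));
    }
    if ($j === count($k)) {
        return false;   // key exhausted but the pattern still needs a word
    }
    if ($p[$i] === '*' || $p[$i] === $k[$j]) {
        return matchWords($p, $i + 1, $k, $j + 1);
    }
    return false;
}

var_dump(topicMatches('*.error', 'user.error'));       // bool(true)
var_dump(topicMatches('order.#', 'order.created'));    // bool(true)
var_dump(topicMatches('#.created', 'order.created'));  // bool(true)
var_dump(topicMatches('*.error', 'app.db.error'));     // bool(false): `*` covers one word only
```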
Routing rules:
user.error matches *.error → Q1
order.created matches order.# → Q2
order.created matches #.created → Q3
Headers Exchange
mermaid
graph LR
subgraph Headers Exchange
E[Exchange]
E -->|x-match=all| Q1[Queue 1]
E -->|x-match=any| Q2[Queue 2]
end
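P[Producer] -->|headers| E
Features:
- Routes on message headers rather than on the routing key
- Supports x-match: all / any
- Typically slower than the other types, and less commonly used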
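The two `x-match` modes can be sketched as a plain comparison of two arrays. The helper `headersMatch` and the header names below are hypothetical. The sketch assumes that `all` is the default mode and that binding arguments starting with `x-` are control arguments rather than headers to compare.

```php
<?php
/**
 * Decide whether a message's headers satisfy a headers-exchange binding.
 * Illustrative model only: values are compared strictly, and binding
 * arguments starting with "x-" (such as x-match) are skipped.
 */
function headersMatch(array $bindingArgs, array $messageHeaders): bool
{
    $mode = $bindingArgs['x-match'] ?? 'all';   // assumed default
    $required = 0;
    $matched = 0;
    foreach ($bindingArgs as $name => $expected) {
        if (strncmp((string) $name, 'x-', 2) === 0) {
            continue;
        }
        $required++;
        if (array_key_exists($name, $messageHeaders) && $messageHeaders[$name] === $expected) {
            $matched++;
        }
    }
    // "any" needs at least one matching header; "all" needs every one of them.
    return $mode === 'any' ? $matched > 0 : $matched === $required;
}

// Hypothetical bindings, one per mode.
$highPriority = ['x-match' => 'all', 'priority' => 'high', 'type' => 'order'];
$marketing    = ['x-match' => 'any', 'category' => 'marketing', 'source' => 'campaign'];

var_dump(headersMatch($highPriority, ['priority' => 'high', 'type' => 'order']));  // bool(true)
var_dump(headersMatch($highPriority, ['priority' => 'high']));                     // bool(false)
var_dump(headersMatch($marketing, ['source' => 'campaign']));                      // bool(true)
```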
Routing rules:
x-match: all → every header in the binding must match
x-match: any → at least one header in the binding must match
2. Exchange properties
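| Property | Description |
|---|---|
| Name | Name of the exchange |
| Type | Exchange type |
| Durable | Whether the exchange survives a broker restart |
| Auto-delete | Whether the exchange is deleted automatically once its last binding is removed |
| Internal | Whether the exchange is internal (publishers cannot publish to it directly) |
| Arguments | Optional extra arguments |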
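These properties map onto the positional parameters of php-amqplib's `AMQPChannel::exchange_declare()` (exchange, type, passive, durable, auto_delete, internal, nowait, arguments). The helper below is a hypothetical convenience, not part of the library. It builds that argument list from a named array, which makes the order explicit and lets the defaults be chosen deliberately.

```php
<?php
/**
 * Turn a named property array into the positional argument list for
 * exchange_declare(). Hypothetical helper; the defaults here are this
 * sketch's choice, not the library's.
 */
function exchangeDeclareArgs(array $props): array
{
    foreach (['name', 'type'] as $required) {
        if (!isset($props[$required]) || $props[$required] === '') {
            throw new InvalidArgumentException("Missing exchange property: {$required}");
        }
    }
    return [
        $props['name'],
        $props['type'],
        false,                            // passive: create rather than only check
        $props['durable'] ?? true,
        $props['auto_delete'] ?? false,
        $props['internal'] ?? false,
        false,                            // nowait: wait for the broker's reply
        $props['arguments'] ?? [],
    ];
}

// Illustrative exchange name and type.
$args = exchangeDeclareArgs([
    'name'    => 'orders.events',
    'type'    => 'topic',
    'durable' => true,
]);
print_r($args);
// With a real channel this could be passed as: $channel->exchange_declare(...$args);
```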
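3. The default exchange
RabbitMQ provides a pre-declared default exchange:
mermaid
graph LR
P[Producer] -->|routing_key = queue_name| D[Default Exchange]
D -->|direct routing| Q[Queue]
Features:
- Its name is the empty string ""
- Its type is direct
- Every queue is bound to it automatically
- Each queue's binding key is the queue's name, so a message goes to the queue named by its routing key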
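4. Routing flow
mermaid
flowchart TD
A[Message arrives at exchange] --> B{Look up bindings}
B --> C{Match routing key}
C -->|match| D[Add to queue]
C -->|no match| E{Alternate exchange?}
E -->|yes| F[Send to alternate exchange]
E -->|no| G[Drop or return to publisher]
D --> H[Done]
F --> H
G --> H
Code examples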
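Before the client-library examples, the routing flow from section 4 can be sketched as a small in-memory model. It is a simplification under stated assumptions: only direct and fanout matching are modelled, the exchange and queue names are illustrative, and an unknown exchange is simply treated as unrouted, whereas a real broker raises a channel error.

```php
<?php
/**
 * Queues of one exchange that accept the routing key.
 * Each exchange is ['type' => 'direct'|'fanout', 'bindings' => [[queue, key], ...], 'alternate' => ?string].
 */
function matchingQueues(array $exchange, string $routingKey): array
{
    $queues = [];
    foreach ($exchange['bindings'] as [$queue, $bindingKey]) {
        // Fanout ignores the key; direct requires an exact match.
        if ($exchange['type'] === 'fanout' || $bindingKey === $routingKey) {
            $queues[] = $queue;
        }
    }
    return array_values(array_unique($queues));
}

/** Follow the flow: match bindings, else try the alternate exchange, else drop or return. */
function routeMessage(array $exchanges, string $exchange, string $routingKey, bool $mandatory = false): array
{
    $visited = [];
    $current = $exchange;
    while ($current !== null && isset($exchanges[$current]) && !isset($visited[$current])) {
        $visited[$current] = true;   // guard against alternate-exchange cycles
        $queues = matchingQueues($exchanges[$current], $routingKey);
        if ($queues !== []) {
            return ['outcome' => 'routed', 'queues' => $queues, 'via' => $current];
        }
        $current = $exchanges[$current]['alternate'] ?? null;   // fall back, if configured
    }
    // Nothing matched anywhere: return to the publisher if mandatory, otherwise drop.
    return ['outcome' => $mandatory ? 'returned' : 'dropped', 'queues' => [], 'via' => null];
}

$exchanges = [
    'main_exchange' => [
        'type' => 'direct',
        'bindings' => [['orders_queue', 'order']],
        'alternate' => 'unrouted_exchange',
    ],
    'unrouted_exchange' => [
        'type' => 'fanout',
        'bindings' => [['unrouted_messages', '']],
        'alternate' => null,
    ],
];

print_r(routeMessage($exchanges, 'main_exchange', 'order'));     // routed via main_exchange
print_r(routeMessage($exchanges, 'main_exchange', 'unknown'));   // routed via unrouted_exchange
```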
Direct Exchange example
php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class DirectExchangeExample
{
    private $connection;
    private $channel;

    public function __construct(array $config = [])
    {
        // Defaults target a local broker with the stock guest account
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
    }

    public function setup(): void
    {
        // Arguments: name, type, passive, durable, auto_delete
        $this->channel->exchange_declare(
            'logs_direct',
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );

        // Arguments: name, passive, durable, exclusive, auto_delete
        $this->channel->queue_declare('logs_info', false, true, false, false);
        $this->channel->queue_declare('logs_error', false, true, false, false);
        $this->channel->queue_declare('logs_warning', false, true, false, false);

        // Bind each queue with the log level as its binding key
        $this->channel->queue_bind('logs_info', 'logs_direct', 'info');
        $this->channel->queue_bind('logs_error', 'logs_direct', 'error');
        $this->channel->queue_bind('logs_warning', 'logs_direct', 'warning');

        echo "Direct exchange setup completed\n";
    }

    public function publishLog(string $level, string $message): void
    {
        $msg = new AMQPMessage(json_encode([
            'level' => $level,
            'message' => $message,
            'timestamp' => date('Y-m-d H:i:s')
        ]), [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);

        // The log level doubles as the routing key
        $this->channel->basic_publish($msg, 'logs_direct', $level);
        echo "Log published with level: {$level}\n";
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$example = new DirectExchangeExample();
$example->setup();
$example->publishLog('info', 'System started');
$example->publishLog('warning', 'Memory usage high');
$example->publishLog('error', 'Database connection failed');
$example->close();
Fanout Exchange example
php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class FanoutExchangeExample
{
    private $connection;
    private $channel;

    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
    }

    public function setup(): void
    {
        // Arguments: name, type, passive, durable, auto_delete
        $this->channel->exchange_declare(
            'notifications_fanout',
            AMQPExchangeType::FANOUT,
            false,
            true,
            false
        );

        $this->channel->queue_declare('notifications_email', false, true, false, false);
        $this->channel->queue_declare('notifications_sms', false, true, false, false);
        $this->channel->queue_declare('notifications_push', false, true, false, false);

        // No binding key is needed: a fanout exchange ignores it
        $this->channel->queue_bind('notifications_email', 'notifications_fanout');
        $this->channel->queue_bind('notifications_sms', 'notifications_fanout');
        $this->channel->queue_bind('notifications_push', 'notifications_fanout');

        echo "Fanout exchange setup completed\n";
    }

    public function broadcast(string $message): void
    {
        $msg = new AMQPMessage(json_encode([
            'message' => $message,
            'timestamp' => date('Y-m-d H:i:s')
        ]), [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);

        // The routing key is omitted; every bound queue gets a copy
        $this->channel->basic_publish($msg, 'notifications_fanout');
        echo "Message broadcast to all queues\n";
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$example = new FanoutExchangeExample();
$example->setup();
$example->broadcast('New feature released!');
$example->close();
Topic Exchange example
php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class TopicExchangeExample
{
    private $connection;
    private $channel;

    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
    }

    public function setup(): void
    {
        // Arguments: name, type, passive, durable, auto_delete
        $this->channel->exchange_declare(
            'events_topic',
            AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );

        $this->channel->queue_declare('all_errors', false, true, false, false);
        $this->channel->queue_declare('all_orders', false, true, false, false);
        $this->channel->queue_declare('user_events', false, true, false, false);
        $this->channel->queue_declare('all_events', false, true, false, false);

        $this->channel->queue_bind('all_errors', 'events_topic', '#.error');   // any key ending in "error"
        $this->channel->queue_bind('all_orders', 'events_topic', 'order.#');   // any key starting with "order"
        $this->channel->queue_bind('user_events', 'events_topic', 'user.*');   // "user" plus exactly one word
        $this->channel->queue_bind('all_events', 'events_topic', '#');         // every message

        echo "Topic exchange setup completed\n";
    }

    public function publishEvent(string $routingKey, array $data): void
    {
        $msg = new AMQPMessage(json_encode([
            'event' => $routingKey,
            'data' => $data,
            'timestamp' => date('Y-m-d H:i:s')
        ]), [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);

        $this->channel->basic_publish($msg, 'events_topic', $routingKey);
        echo "Event published: {$routingKey}\n";
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$example = new TopicExchangeExample();
$example->setup();
$example->publishEvent('order.created', ['order_id' => 1001]);
$example->publishEvent('order.paid', ['order_id' => 1001]);
$example->publishEvent('user.login', ['user_id' => 5001]);
$example->publishEvent('user.logout', ['user_id' => 5001]);
$example->publishEvent('payment.error', ['error' => 'timeout']);
$example->close();
Headers Exchange example
php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

class HeadersExchangeExample
{
    private $connection;
    private $channel;

    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
    }

    public function setup(): void
    {
        // Arguments: name, type, passive, durable, auto_delete
        $this->channel->exchange_declare(
            'headers_exchange',
            AMQPExchangeType::HEADERS,
            false,
            true,
            false
        );

        $this->channel->queue_declare('high_priority_queue', false, true, false, false);
        $this->channel->queue_declare('marketing_queue', false, true, false, false);

        // x-match = all: both priority and type must match.
        // queue_bind arguments: queue, exchange, routing key, nowait, arguments
        $this->channel->queue_bind(
            'high_priority_queue',
            'headers_exchange',
            '',
            false,
            new AMQPTable([
                'x-match' => 'all',
                'priority' => 'high',
                'type' => 'order'
            ])
        );

        // x-match = any: either category or source is enough
        $this->channel->queue_bind(
            'marketing_queue',
            'headers_exchange',
            '',
            false,
            new AMQPTable([
                'x-match' => 'any',
                'category' => 'marketing',
                'source' => 'campaign'
            ])
        );

        echo "Headers exchange setup completed\n";
    }

    public function publishWithHeaders(array $data, array $headers): void
    {
        // Headers travel in the application_headers message property
        $msg = new AMQPMessage(json_encode($data), [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'application_headers' => new AMQPTable($headers)
        ]);

        $this->channel->basic_publish($msg, 'headers_exchange');
        echo "Message published with headers: " . json_encode($headers) . "\n";
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$example = new HeadersExchangeExample();
$example->setup();
$example->publishWithHeaders(
    ['order_id' => 1001],
    ['priority' => 'high', 'type' => 'order']
);
$example->publishWithHeaders(
    ['campaign_id' => 'summer2024'],
    ['category' => 'marketing', 'source' => 'campaign']
);
$example->close();
Alternate exchange example
php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

class AlternateExchangeExample
{
    private $connection;
    private $channel;

    public function __construct(array $config = [])
    {
        $this->connection = new AMQPStreamConnection(
            $config['host'] ?? 'localhost',
            $config['port'] ?? 5672,
            $config['user'] ?? 'guest',
            $config['password'] ?? 'guest',
            $config['vhost'] ?? '/'
        );
        $this->channel = $this->connection->channel();
    }

    public function setup(): void
    {
        // Declare the alternate exchange first; a fanout catches every unrouted message
        $this->channel->exchange_declare(
            'unrouted_exchange',
            AMQPExchangeType::FANOUT,
            false,
            true,
            false
        );
        $this->channel->queue_declare('unrouted_messages', false, true, false, false);
        $this->channel->queue_bind('unrouted_messages', 'unrouted_exchange');

        // Arguments: name, type, passive, durable, auto_delete, internal, nowait, arguments
        $this->channel->exchange_declare(
            'main_exchange',
            AMQPExchangeType::DIRECT,
            false,
            true,
            false,
            false,
            false,
            new AMQPTable([
                'alternate-exchange' => 'unrouted_exchange'
            ])
        );
        $this->channel->queue_declare('orders_queue', false, true, false, false);
        $this->channel->queue_bind('orders_queue', 'main_exchange', 'order');

        echo "Alternate exchange setup completed\n";
    }

    public function publish(string $routingKey, string $message): void
    {
        $msg = new AMQPMessage($message, [
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);

        $this->channel->basic_publish($msg, 'main_exchange', $routingKey);
        echo "Message published with routing key: {$routingKey}\n";
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$example = new AlternateExchangeExample();
$example->setup();
$example->publish('order', 'This goes to orders_queue');
$example->publish('unknown', 'This goes to unrouted_messages');
$example->close();
Practical use cases
1. Logging system
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class LogSystemExchange
{
    private $channel;

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
    }

    public function setup(): void
    {
        $this->channel->exchange_declare('logs', 'topic', false, true, false);

        // Queue name => binding pattern; routing keys have the form "<source>.<level>"
        $queues = [
            'logs_all' => '#',
            'logs_error' => '*.error',
            'logs_app' => 'app.#',
            'logs_system' => 'system.#'
        ];

        foreach ($queues as $queue => $pattern) {
            $this->channel->queue_declare($queue, false, true, false, false);
            $this->channel->queue_bind($queue, 'logs', $pattern);
        }
    }

    public function log(string $source, string $level, string $message): void
    {
        $routingKey = "{$source}.{$level}";
        $msg = new AMQPMessage(json_encode([
            'source' => $source,
            'level' => $level,
            'message' => $message,
            'timestamp' => time()
        ]), ['content_type' => 'application/json']);

        $this->channel->basic_publish($msg, 'logs', $routingKey);
    }
}
2. Order event system
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class OrderEventExchange
{
    private $channel;

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
    }

    public function setup(): void
    {
        $this->channel->exchange_declare('order_events', 'topic', false, true, false);

        // Each service queue subscribes only to the events it cares about
        $bindings = [
            'inventory_service' => ['order.created', 'order.cancelled'],
            'payment_service' => ['order.created', 'order.paid'],
            'notification_service' => ['order.#'],
            'analytics_service' => ['order.#']
        ];

        foreach ($bindings as $queue => $patterns) {
            $this->channel->queue_declare($queue, false, true, false, false);
            // A queue can be bound to the same exchange with several patterns
            foreach ($patterns as $pattern) {
                $this->channel->queue_bind($queue, 'order_events', $pattern);
            }
        }
    }

    public function emitOrderEvent(string $event, array $orderData): void
    {
        $msg = new AMQPMessage(json_encode([
            'event' => $event,
            'data' => $orderData,
            'timestamp' => time()
        ]), [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);

        // The event name (for example "order.created") is the routing key
        $this->channel->basic_publish($msg, 'order_events', $event);
    }
}
3. Multi-tenant messaging system
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class MultiTenantExchange
{
    private $channel;

    public function __construct(AMQPStreamConnection $connection)
    {
        $this->channel = $connection->channel();
    }

    public function setupTenant(string $tenantId): void
    {
        // One topic exchange per tenant keeps tenants' traffic separate
        $exchangeName = "tenant_{$tenantId}";
        $this->channel->exchange_declare($exchangeName, 'topic', false, true, false);

        $queues = [
            "{$tenantId}_orders" => 'order.#',
            "{$tenantId}_notifications" => 'notification.#',
            "{$tenantId}_reports" => 'report.#'
        ];

        foreach ($queues as $queue => $pattern) {
            $this->channel->queue_declare($queue, false, true, false, false);
            $this->channel->queue_bind($queue, $exchangeName, $pattern);
        }
    }

    public function publishToTenant(string $tenantId, string $routingKey, string $message): void
    {
        $exchangeName = "tenant_{$tenantId}";
        // $message is expected to be a JSON string already
        $msg = new AMQPMessage($message, [
            'content_type' => 'application/json'
        ]);

        $this->channel->basic_publish($msg, $exchangeName, $routingKey);
    }
}
Common problems and solutions
1. Message routing fails
Causes:
- No binding matches
- The routing key is wrong
- The exchange does not exist
Solution:
php
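<?php
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;

class SafeExchangePublisher
{
    private $channel;

    public function __construct(AMQPChannel $channel)
    {
        $this->channel = $channel;
        // Enable publisher confirms so that wait_for_pending_acks_returns()
        // has pending messages to wait for
        $this->channel->confirm_select();
    }

    public function publishSafe(
        string $exchange,
        string $routingKey,
        string $body,
        bool $mandatory = true
    ): bool {
        $msg = new AMQPMessage($body, [
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);

        $returned = false;
        // Called when a mandatory message cannot be routed to any queue
        $this->channel->set_return_listener(function (
            $replyCode,
            $replyText,
            $exchange,
            $routingKey,
            $message
        ) use (&$returned) {
            $returned = true;
            echo "Message returned: {$replyText}\n";
            echo "Exchange: {$exchange}, Routing Key: {$routingKey}\n";
        });

        $this->channel->basic_publish(
            $msg,
            $exchange,
            $routingKey,
            $mandatory
        );

        // Wait up to 5 seconds for the broker's confirm or return
        $this->channel->wait_for_pending_acks_returns(5);

        // true only if the broker did not return the message as unroutable
        return !$returned;
    }
}
2. Exchange does not exist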
Causes:
- The exchange was never declared
- The exchange name is wrong
- The exchange was deleted
Solution:
php
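<?php
use PhpAmqpLib\Channel\AMQPChannel;

class ExchangeManager
{
    private $channel;
    private $declaredExchanges = [];

    public function __construct(AMQPChannel $channel)
    {
        $this->channel = $channel;
    }

    public function ensureExchangeExists(
        string $name,
        string $type,
        bool $durable = true
    ): void {
        if (isset($this->declaredExchanges[$name])) {
            return;   // already declared through this manager
        }

        // exchange_declare is idempotent: it creates the exchange if it is
        // missing and succeeds unchanged if one with the same properties
        // already exists. A passive check is avoided because a failed passive
        // declare closes the channel, which could then not be reused.
        // Arguments: name, type, passive, durable, auto_delete
        $this->channel->exchange_declare(
            $name,
            $type,
            false,
            $durable,
            false
        );
        $this->declaredExchanges[$name] = true;
    }
}
3. Performance problems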
Causes:
- Too many bindings on one exchange
- Complex routing rules
- Inappropriate use of headers exchanges
Solution:
php
<?php
class OptimizedExchangeConfig
{
    public static function getOptimalExchangeType(string $useCase): string
    {
        $mapping = [
            'point_to_point' => 'direct',
            'broadcast' => 'fanout',
            'multi_topic' => 'topic',
            'complex_routing' => 'headers'
        ];

        // Default to direct when the use case is unknown
        return $mapping[$useCase] ?? 'direct';
    }

    public static function optimizeBindings(array $bindings): array
    {
        $optimized = [];
        foreach ($bindings as $queue => $patterns) {
            // Collapse only queues with many bindings (more than 10 here)
            if (count($patterns) > 10) {
                $optimized[$queue] = [self::mergePatterns($patterns)];
            } else {
                $optimized[$queue] = $patterns;
            }
        }
        return $optimized;
    }

    // Note: the merged pattern is broader than the originals, so the queue may
    // receive messages it did not get before; consumers must tolerate that.
    private static function mergePatterns(array $patterns): string
    {
        $prefixes = [];
        foreach ($patterns as $pattern) {
            // Record the first word of every pattern, including one-word patterns
            $prefixes[explode('.', $pattern)[0]] = true;
        }

        // A single shared literal first word can be kept as a prefix
        $first = array_key_first($prefixes);
        if (count($prefixes) === 1 && !in_array((string) $first, ['*', '#'], true)) {
            return $first . '.#';
        }
        return '#';
    }
}
Best practices
1. Exchange naming conventions
php
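<?php
class ExchangeNamingConvention
{
    // Builds names of the form "<domain>.<purpose>[.<suffix>].exchange"
    public static function generateName(
        string $domain,
        string $purpose,
        ?string $suffix = null
    ): string {
        $parts = [$domain, $purpose];
        if ($suffix !== null && $suffix !== '') {
            $parts[] = $suffix;
        }
        return implode('.', $parts) . '.exchange';
    }

    // Derives the dead-letter exchange name from the original exchange name
    public static function getDLXName(string $originalExchange): string
    {
        return $originalExchange . '.dlx';
    }
}
2. Exchange configuration templates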
php
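<?php
class ExchangeTemplates
{
    public static function standardExchange(string $name, string $type): array
    {
        return [
            'name' => $name,
            'type' => $type,
            'durable' => true,
            'auto_delete' => false,
            'internal' => false,
            'arguments' => []
        ];
    }

    // Template for an exchange whose unroutable messages fall back to another
    // exchange. "alternate-exchange" is an exchange argument; a dead-letter
    // exchange, by contrast, is configured on queues via "x-dead-letter-exchange".
    public static function exchangeWithAlternate(string $name, string $type, string $alternateName): array
    {
        return [
            'name' => $name,
            'type' => $type,
            'durable' => true,
            'auto_delete' => false,
            'internal' => false,
            'arguments' => [
                'alternate-exchange' => $alternateName
            ]
        ];
    }
}
3. Exchange monitoring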
php
<?php
class ExchangeMonitor
{
    private $apiUrl;
    private $credentials;

    public function __construct(string $host, string $user, string $password)
    {
        // 15672 is the default port of the management plugin's HTTP API
        $this->apiUrl = "http://{$host}:15672/api";
        $this->credentials = base64_encode("{$user}:{$password}");
    }

    public function getExchangeStats(string $vhost, string $exchange): array
    {
        // Path segments are percent-encoded; the default vhost "/" becomes %2F
        return $this->request('/exchanges/' . rawurlencode($vhost) . '/' . rawurlencode($exchange));
    }

    public function listExchanges(string $vhost): array
    {
        return $this->request('/exchanges/' . rawurlencode($vhost));
    }

    // Shared GET helper with basic-auth header and error handling
    private function request(string $path): array
    {
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $this->apiUrl . $path);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_setopt($ch, CURLOPT_HTTPHEADER, [
            "Authorization: Basic {$this->credentials}"
        ]);
        $response = curl_exec($ch);
        $error = curl_error($ch);
        curl_close($ch);

        if ($response === false) {
            throw new RuntimeException("Management API request failed: {$error}");
        }
        $data = json_decode($response, true);
        if (!is_array($data)) {
            throw new RuntimeException('Management API returned an unexpected response');
        }
        return $data;
    }
}