Appearance
Direct Exchange
概述
Direct Exchange(直连交换机)是 RabbitMQ 中最简单、最常用的交换机类型之一。它根据消息的 routing key(路由键)将消息精确路由到绑定的队列中。
核心原理
Direct Exchange 的工作原理非常简单:生产者发送消息时指定一个 routing key,交换机根据这个 routing key 将消息路由到所有绑定的队列中,前提是队列绑定的 routing key 与消息的 routing key 完全匹配。
mermaid
graph LR
P[生产者] -->|routing_key: order.create| E[Direct Exchange]
E -->|匹配| Q1[order-queue]
E -->|不匹配| Q2[payment-queue]
E -->|不匹配| Q3[notification-queue]
style E fill:#f9f,stroke:#333匹配规则
- 完全匹配:队列绑定的 routing key 必须与消息的 routing key 完全一致
- 一对一或多对一:一个 routing key 可以绑定多个队列,一个队列也可以绑定多个 routing key
PHP 代码示例
生产者 - 发送消息
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'direct_logs';
$exchangeType = AMQPExchangeType::DIRECT;
$channel->exchange_declare($exchangeName, $exchangeType, false, true, false);
$routingKey = 'order.create';
$messageBody = json_encode([
'event' => 'order_created',
'order_id' => 'ORD-2024-001',
'amount' => 299.99,
'timestamp' => time()
]);
$message = new AMQPMessage(
$messageBody,
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);
$channel->basic_publish($message, $exchangeName, $routingKey);
echo "消息已发送 - Routing Key: {$routingKey}\n";
$channel->close();
$connection->close();消费者 - 接收消息
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'direct_logs';
$queueName = 'order-queue';
$routingKey = 'order.create';
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $routingKey);
echo "等待消息中...\n";
$callback = function (AMQPMessage $msg) {
$data = json_decode($msg->getBody(), true);
echo "收到消息 - Routing Key: " . $msg->getRoutingKey() . "\n";
echo "消息内容: " . $msg->getBody() . "\n";
echo "-------------------\n";
// 处理消息
processOrder($data);
// 确认消息
$msg->ack();
};
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
function processOrder($data) {
// 订单处理逻辑
echo "处理订单: " . $data['order_id'] . "\n";
}
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();多 routing key 绑定
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'orders';
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, false);
// 队列绑定多个 routing key
$routingKeys = ['order.created', 'order.updated', 'order.cancelled'];
foreach ($routingKeys as $routingKey) {
$queueName = 'orders-queue';
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $routingKey);
echo "队列已绑定到: {$routingKey}\n";
}
$channel->close();
$connection->close();实际应用场景
1. 订单状态通知
php
<?php
class OrderEventPublisher
{
private $channel;
private $exchangeName = 'order_events';
public function __construct($channel)
{
$this->channel = $channel;
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::DIRECT,
false,
true,
false
);
}
public function publishOrderCreated($orderData)
{
$this->publish('order.created', $orderData);
}
public function publishOrderUpdated($orderData)
{
$this->publish('order.updated', $orderData);
}
public function publishOrderCancelled($orderData)
{
$this->publish('order.cancelled', $orderData);
}
private function publish($routingKey, $data)
{
$message = new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'timestamp' => time()
]
);
$this->channel->basic_publish($message, $this->exchangeName, $routingKey);
}
}2. 日志级别过滤
php
<?php
class LogPublisher
{
private $channel;
private $exchangeName = 'application_logs';
public function __construct($channel)
{
$this->channel = $channel;
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::DIRECT,
false,
true,
false
);
}
public function log($level, $message, $context = [])
{
$routingKey = 'log.' . $level;
$data = [
'level' => $level,
'message' => $message,
'context' => $context,
'timestamp' => date('Y-m-d H:i:s')
];
$msg = new AMQPMessage(
json_encode($data),
['content_type' => 'application/json']
);
$this->channel->basic_publish($msg, $this->exchangeName, $routingKey);
}
}常见问题与解决方案
问题 1: 消息无法路由到队列
症状: 消息发送成功,但队列中没有消息
原因:
- routing key 与队列绑定的 key 不匹配
- 队列未正确绑定到交换机
解决方案:
php
<?php
// 检查绑定关系
$channel->queue_bind($queueName, $exchangeName, $routingKey);
// 使用 RabbitMQ 管理界面或命令行查看绑定
// rabbitmqctl list_bindings问题 2: 多个消费者消费同一队列
解决方案: 使用 basic_qos 限制并发
php
<?php
// 每次只预取一条消息
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);最佳实践建议
- 清晰的命名规范: 使用有意义的 routing key,如
order.created、payment.completed - 合理的队列设计: 根据业务功能划分队列,避免队列职责过于混杂
- 消息持久化: 对于重要消息,务必设置
delivery_mode为持久化 - 错误处理: 实现重试机制和死信队列处理失败消息
- 监控: 监控队列消息数量和消费速率
