Skip to content

Direct Exchange

概述

Direct Exchange(直连交换机)是 RabbitMQ 中最简单、最常用的交换机类型之一。它根据消息的 routing key(路由键)将消息精确路由到绑定的队列中。

核心原理

Direct Exchange 的工作原理非常简单:生产者发送消息时指定一个 routing key,交换机根据这个 routing key 将消息路由到所有绑定的队列中,前提是队列绑定的 routing key 与消息的 routing key 完全匹配。

mermaid
graph LR
    P[生产者] -->|routing_key: order.create| E[Direct Exchange]
    E -->|匹配| Q1[order-queue]
    E -->|不匹配| Q2[payment-queue]
    E -->|不匹配| Q3[notification-queue]
    
    style E fill:#f9f,stroke:#333

匹配规则

  • 完全匹配:队列绑定的 routing key 必须与消息的 routing key 完全一致
  • 一对一或多对一:一个 routing key 可以绑定多个队列,一个队列也可以绑定多个 routing key

PHP 代码示例

生产者 - 发送消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$exchangeName = 'direct_logs';
$exchangeType = AMQPExchangeType::DIRECT;
$channel->exchange_declare($exchangeName, $exchangeType, false, true, false);

$routingKey = 'order.create';
$messageBody = json_encode([
    'event' => 'order_created',
    'order_id' => 'ORD-2024-001',
    'amount' => 299.99,
    'timestamp' => time()
]);

$message = new AMQPMessage(
    $messageBody,
    [
        'content_type' => 'application/json',
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
    ]
);

$channel->basic_publish($message, $exchangeName, $routingKey);

echo "消息已发送 - Routing Key: {$routingKey}\n";

$channel->close();
$connection->close();

消费者 - 接收消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$exchangeName = 'direct_logs';
$queueName = 'order-queue';
$routingKey = 'order.create';

$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $routingKey);

echo "等待消息中...\n";

$callback = function (AMQPMessage $msg) {
    $data = json_decode($msg->getBody(), true);
    echo "收到消息 - Routing Key: " . $msg->getRoutingKey() . "\n";
    echo "消息内容: " . $msg->getBody() . "\n";
    echo "-------------------\n";
    
    // 处理消息
    processOrder($data);
    
    // 确认消息
    $msg->ack();
};

$channel->basic_consume($queueName, '', false, false, false, false, $callback);

function processOrder($data) {
    // 订单处理逻辑
    echo "处理订单: " . $data['order_id'] . "\n";
}

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

多 routing key 绑定

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$exchangeName = 'orders';
$channel->exchange_declare($exchangeName, AMQPExchangeType::DIRECT, false, true, false);

// 队列绑定多个 routing key
$routingKeys = ['order.created', 'order.updated', 'order.cancelled'];

foreach ($routingKeys as $routingKey) {
    $queueName = 'orders-queue';
    $channel->queue_declare($queueName, false, true, false, false);
    $channel->queue_bind($queueName, $exchangeName, $routingKey);
    
    echo "队列已绑定到: {$routingKey}\n";
}

$channel->close();
$connection->close();

实际应用场景

1. 订单状态通知

php
<?php

class OrderEventPublisher
{
    private $channel;
    private $exchangeName = 'order_events';
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->channel->exchange_declare(
            $this->exchangeName, 
            AMQPExchangeType::DIRECT, 
            false, 
            true, 
            false
        );
    }
    
    public function publishOrderCreated($orderData)
    {
        $this->publish('order.created', $orderData);
    }
    
    public function publishOrderUpdated($orderData)
    {
        $this->publish('order.updated', $orderData);
    }
    
    public function publishOrderCancelled($orderData)
    {
        $this->publish('order.cancelled', $orderData);
    }
    
    private function publish($routingKey, $data)
    {
        $message = new AMQPMessage(
            json_encode($data),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'timestamp' => time()
            ]
        );
        
        $this->channel->basic_publish($message, $this->exchangeName, $routingKey);
    }
}

2. 日志级别过滤

php
<?php

class LogPublisher
{
    private $channel;
    private $exchangeName = 'application_logs';
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );
    }
    
    public function log($level, $message, $context = [])
    {
        $routingKey = 'log.' . $level;
        $data = [
            'level' => $level,
            'message' => $message,
            'context' => $context,
            'timestamp' => date('Y-m-d H:i:s')
        ];
        
        $msg = new AMQPMessage(
            json_encode($data),
            ['content_type' => 'application/json']
        );
        
        $this->channel->basic_publish($msg, $this->exchangeName, $routingKey);
    }
}

常见问题与解决方案

问题 1: 消息无法路由到队列

症状: 消息发送成功,但队列中没有消息

原因:

  • routing key 与队列绑定的 key 不匹配
  • 队列未正确绑定到交换机

解决方案:

php
<?php

// 检查绑定关系
$channel->queue_bind($queueName, $exchangeName, $routingKey);

// 使用 RabbitMQ 管理界面或命令行查看绑定
// rabbitmqctl list_bindings

问题 2: 多个消费者消费同一队列

解决方案: 使用 basic_qos 限制并发

php
<?php

// 每次只预取一条消息
$channel->basic_qos(null, 1, null);

$channel->basic_consume($queueName, '', false, false, false, false, $callback);

最佳实践建议

  1. 清晰的命名规范: 使用有意义的 routing key,如 order.createdpayment.completed
  2. 合理的队列设计: 根据业务功能划分队列,避免队列职责过于混杂
  3. 消息持久化: 对于重要消息,务必设置 delivery_mode 为持久化
  4. 错误处理: 实现重试机制和死信队列处理失败消息
  5. 监控: 监控队列消息数量和消费速率

相关链接