Skip to content

默认交换机

概述

默认交换机(Default Exchange)是 RabbitMQ 中一个特殊的交换机,它是一个预先声明好的 Direct Exchange,其名称为空字符串 ""。每个队列都会自动绑定到默认交换机,绑定的 routing key 就是队列名称。

核心原理

默认交换机具有以下特点:

  1. 自动绑定: 每个队列自动以队列名作为 routing key 绑定到默认交换机
  2. 直接发送: 可以直接向队列发送消息,无需显式声明交换机
  3. Direct 类型: 本质上是 Direct Exchange,需要完全匹配队列名
mermaid
graph LR
    P[生产者] -->|routing_key: order-queue| E[默认交换机<br/>name: ""]
    
    E -->|自动绑定| Q1[order-queue]
    E -->|自动绑定| Q2[payment-queue]
    E -->|自动绑定| Q3[notification-queue]
    
    style E fill:#f9f,stroke:#333

工作机制

mermaid
sequenceDiagram
    participant P as 生产者
    participant DE as 默认交换机
    participant Q as 队列
    
    Note over P,Q: 1. 声明队列
    P->>Q: queue_declare("order-queue")
    Q-->>P: 队列创建成功
    
    Note over P,Q: 2. 自动绑定到默认交换机
    Note over DE,Q: routing_key = "order-queue"
    
    Note over P,Q: 3. 直接发送消息到队列
    P->>DE: basic_publish("", "order-queue", message)
    DE->>Q: 路由到 order-queue

PHP 代码示例

基本使用 - 直接发送到队列

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 声明队列
$queueName = 'order-queue';
$channel->queue_declare($queueName, false, true, false, false);

// 直接发送消息到队列(使用空字符串作为交换机名)
$messageBody = json_encode([
    'order_id' => 'ORD-2024-001',
    'amount' => 299.99,
    'timestamp' => time()
]);

$message = new AMQPMessage(
    $messageBody,
    [
        'content_type' => 'application/json',
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
    ]
);

// 第一个参数是空字符串,表示使用默认交换机
// 第二个参数是 routing key,即队列名
$channel->basic_publish($message, '', $queueName);

echo "消息已直接发送到队列: {$queueName}\n";

$channel->close();
$connection->close();

消费者 - 从队列接收消息

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$queueName = 'order-queue';

// 声明队列(确保队列存在)
$channel->queue_declare($queueName, false, true, false, false);

echo "等待消息...\n";

$callback = function (AMQPMessage $msg) {
    $data = json_decode($msg->getBody(), true);
    
    echo "收到消息:\n";
    echo "  订单ID: {$data['order_id']}\n";
    echo "  金额: {$data['amount']}\n";
    echo "-------------------\n";
    
    // 处理订单
    processOrder($data);
    
    // 确认消息
    $msg->ack();
};

function processOrder($data)
{
    echo "处理订单: {$data['order_id']}\n";
}

$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

封装的队列操作类

php
<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class DirectQueue
{
    private $channel;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
    }
    
    public function declareQueue($queueName, $durable = true)
    {
        $this->channel->queue_declare(
            $queueName,
            false,
            $durable,
            false,
            false
        );
    }
    
    public function send($queueName, $data, $persistent = true)
    {
        $properties = ['content_type' => 'application/json'];
        
        if ($persistent) {
            $properties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
        }
        
        $message = new AMQPMessage(
            json_encode($data),
            $properties
        );
        
        // 使用默认交换机直接发送
        $this->channel->basic_publish($message, '', $queueName);
    }
    
    public function consume($queueName, callable $callback, $prefetchCount = 1)
    {
        $this->channel->basic_qos(null, $prefetchCount, null);
        
        $wrapper = function ($msg) use ($callback) {
            $data = json_decode($msg->getBody(), true);
            $callback($data, $msg);
            $msg->ack();
        };
        
        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            false,
            false,
            false,
            $wrapper
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    public function get($queueName)
    {
        $message = $this->channel->basic_get($queueName);
        
        if ($message) {
            $this->channel->basic_ack($message->getDeliveryTag());
            return json_decode($message->getBody(), true);
        }
        
        return null;
    }
}

工作队列模式

php
<?php

class TaskQueue
{
    private $queue;
    private $queueName = 'tasks';
    
    public function __construct($channel)
    {
        $this->queue = new DirectQueue($channel);
        $this->queue->declareQueue($this->queueName);
    }
    
    public function addTask($taskType, $payload)
    {
        $this->queue->send($this->queueName, [
            'type' => $taskType,
            'payload' => $payload,
            'created_at' => time()
        ]);
        
        echo "任务已添加: {$taskType}\n";
    }
    
    public function processTasks(callable $processor)
    {
        echo "开始处理任务...\n";
        
        $this->queue->consume($this->queueName, function ($data, $msg) use ($processor) {
            echo "处理任务: {$data['type']}\n";
            
            try {
                $result = $processor($data['type'], $data['payload']);
                echo "任务完成: {$data['type']}\n";
            } catch (Exception $e) {
                echo "任务失败: {$e->getMessage()}\n";
                // 可以选择重新入队或发送到死信队列
            }
        });
    }
}

// 使用示例
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$taskQueue = new TaskQueue($channel);

// 添加任务
$taskQueue->addTask('send_email', ['to' => 'user@example.com', 'subject' => 'Welcome']);
$taskQueue->addTask('generate_report', ['report_id' => 123]);

// 处理任务
$taskQueue->processTasks(function ($type, $payload) {
    switch ($type) {
        case 'send_email':
            // 发送邮件逻辑
            break;
        case 'generate_report':
            // 生成报告逻辑
            break;
    }
});

实际应用场景

1. 简单任务队列

php
<?php

class SimpleJobQueue
{
    private $channel;
    private $queueName;
    
    public function __construct($channel, $queueName)
    {
        $this->channel = $channel;
        $this->queueName = $queueName;
        
        $this->channel->queue_declare($queueName, false, true, false, false);
    }
    
    public function push($job)
    {
        $message = new AMQPMessage(
            json_encode($job),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        
        $this->channel->basic_publish($message, '', $this->queueName);
    }
    
    public function pop()
    {
        $message = $this->channel->basic_get($this->queueName);
        
        if ($message) {
            $this->channel->basic_ack($message->getDeliveryTag());
            return json_decode($message->getBody(), true);
        }
        
        return null;
    }
    
    public function size()
    {
        list(, $messageCount,) = $this->channel->queue_declare(
            $this->queueName,
            false,
            true,
            false,
            false,
            true
        );
        
        return $messageCount;
    }
}

2. RPC 模式

php
<?php

class RpcClient
{
    private $channel;
    private $callbackQueue;
    private $response;
    private $corrId;
    
    public function __construct($channel)
    {
        $this->channel = $channel;
        
        list($this->callbackQueue,) = $this->channel->queue_declare(
            '',
            false,
            false,
            true,
            false
        );
        
        $this->channel->basic_consume(
            $this->callbackQueue,
            '',
            false,
            true,
            false,
            false,
            [$this, 'onResponse']
        );
    }
    
    public function onResponse($msg)
    {
        if ($msg->get('correlation_id') == $this->corrId) {
            $this->response = $msg->getBody();
        }
    }
    
    public function call($queueName, $data)
    {
        $this->response = null;
        $this->corrId = uniqid();
        
        $message = new AMQPMessage(
            json_encode($data),
            [
                'correlation_id' => $this->corrId,
                'reply_to' => $this->callbackQueue
            ]
        );
        
        $this->channel->basic_publish($message, '', $queueName);
        
        while (!$this->response) {
            $this->channel->wait();
        }
        
        return json_decode($this->response, true);
    }
}

class RpcServer
{
    private $channel;
    private $queueName;
    
    public function __construct($channel, $queueName)
    {
        $this->channel = $channel;
        $this->queueName = $queueName;
        
        $this->channel->queue_declare($queueName, false, false, false, false);
    }
    
    public function serve(callable $handler)
    {
        $this->channel->basic_qos(null, 1, null);
        
        $this->channel->basic_consume(
            $this->queueName,
            '',
            false,
            false,
            false,
            false,
            function ($msg) use ($handler) {
                $request = json_decode($msg->getBody(), true);
                $response = $handler($request);
                
                $replyMsg = new AMQPMessage(
                    json_encode($response),
                    ['correlation_id' => $msg->get('correlation_id')]
                );
                
                $msg->getChannel()->basic_publish(
                    $replyMsg,
                    '',
                    $msg->get('reply_to')
                );
                
                $msg->ack();
            }
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
}

常见问题与解决方案

问题 1: 队列不存在时消息丢失

症状: 发送消息到不存在的队列,消息直接丢弃

解决方案: 确保队列已声明

php
<?php

// 发送前确保队列存在
$channel->queue_declare($queueName, false, true, false, false);

// 或者使用 mandatory 标志检测路由失败
$message = new AMQPMessage($body);
$channel->basic_publish($message, '', $queueName, true);

// 设置 return 监听器
$channel->set_return_listener(function ($replyCode, $replyText, $exchange, $routingKey, $msg) {
    echo "消息无法路由: {$replyText}\n";
});

问题 2: 无法实现复杂路由

症状: 需要根据不同条件路由到不同队列

解决方案: 使用其他类型的交换机

php
<?php

// 默认交换机只能直接发送到队列
// 如果需要复杂路由,应该使用 Direct/Topic/Fanout/Headers Exchange

// 例如使用 Direct Exchange
$channel->exchange_declare('orders', 'direct', false, true, false);
$channel->queue_bind('order-queue', 'orders', 'order.created');
$channel->basic_publish($message, 'orders', 'order.created');

最佳实践建议

  1. 简单场景优先: 默认交换机适合简单的点对点消息传递
  2. 队列先声明: 发送消息前确保队列已存在
  3. 考虑扩展性: 如果未来可能需要复杂路由,提前设计交换机架构
  4. 消息持久化: 重要消息设置 delivery_mode 为持久化
  5. 错误处理: 处理队列不存在等异常情况

默认交换机 vs 其他交换机

特性默认交换机Direct Exchange
名称空字符串 ""自定义名称
绑定自动绑定需要手动绑定
灵活性
适用场景简单队列复杂路由
可扩展性

相关链接