Appearance
默认交换机
概述
默认交换机(Default Exchange)是 RabbitMQ 中一个特殊的交换机,它是一个预先声明好的 Direct Exchange,其名称为空字符串 ""。每个队列都会自动绑定到默认交换机,绑定的 routing key 就是队列名称。
核心原理
默认交换机具有以下特点:
- 自动绑定: 每个队列自动以队列名作为 routing key 绑定到默认交换机
- 直接发送: 可以直接向队列发送消息,无需显式声明交换机
- Direct 类型: 本质上是 Direct Exchange,需要完全匹配队列名
mermaid
graph LR
P[生产者] -->|routing_key: order-queue| E[默认交换机<br/>name: ""]
E -->|自动绑定| Q1[order-queue]
E -->|自动绑定| Q2[payment-queue]
E -->|自动绑定| Q3[notification-queue]
style E fill:#f9f,stroke:#333工作机制
mermaid
sequenceDiagram
participant P as 生产者
participant DE as 默认交换机
participant Q as 队列
Note over P,Q: 1. 声明队列
P->>Q: queue_declare("order-queue")
Q-->>P: 队列创建成功
Note over P,Q: 2. 自动绑定到默认交换机
Note over DE,Q: routing_key = "order-queue"
Note over P,Q: 3. 直接发送消息到队列
P->>DE: basic_publish("", "order-queue", message)
DE->>Q: 路由到 order-queuePHP 代码示例
基本使用 - 直接发送到队列
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 声明队列
$queueName = 'order-queue';
$channel->queue_declare($queueName, false, true, false, false);
// 直接发送消息到队列(使用空字符串作为交换机名)
$messageBody = json_encode([
'order_id' => 'ORD-2024-001',
'amount' => 299.99,
'timestamp' => time()
]);
$message = new AMQPMessage(
$messageBody,
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);
// 第一个参数是空字符串,表示使用默认交换机
// 第二个参数是 routing key,即队列名
$channel->basic_publish($message, '', $queueName);
echo "消息已直接发送到队列: {$queueName}\n";
$channel->close();
$connection->close();消费者 - 从队列接收消息
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$queueName = 'order-queue';
// 声明队列(确保队列存在)
$channel->queue_declare($queueName, false, true, false, false);
echo "等待消息...\n";
$callback = function (AMQPMessage $msg) {
$data = json_decode($msg->getBody(), true);
echo "收到消息:\n";
echo " 订单ID: {$data['order_id']}\n";
echo " 金额: {$data['amount']}\n";
echo "-------------------\n";
// 处理订单
processOrder($data);
// 确认消息
$msg->ack();
};
function processOrder($data)
{
echo "处理订单: {$data['order_id']}\n";
}
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();封装的队列操作类
php
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class DirectQueue
{
private $channel;
public function __construct($channel)
{
$this->channel = $channel;
}
public function declareQueue($queueName, $durable = true)
{
$this->channel->queue_declare(
$queueName,
false,
$durable,
false,
false
);
}
public function send($queueName, $data, $persistent = true)
{
$properties = ['content_type' => 'application/json'];
if ($persistent) {
$properties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
}
$message = new AMQPMessage(
json_encode($data),
$properties
);
// 使用默认交换机直接发送
$this->channel->basic_publish($message, '', $queueName);
}
public function consume($queueName, callable $callback, $prefetchCount = 1)
{
$this->channel->basic_qos(null, $prefetchCount, null);
$wrapper = function ($msg) use ($callback) {
$data = json_decode($msg->getBody(), true);
$callback($data, $msg);
$msg->ack();
};
$this->channel->basic_consume(
$queueName,
'',
false,
false,
false,
false,
$wrapper
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
public function get($queueName)
{
$message = $this->channel->basic_get($queueName);
if ($message) {
$this->channel->basic_ack($message->getDeliveryTag());
return json_decode($message->getBody(), true);
}
return null;
}
}工作队列模式
php
<?php
class TaskQueue
{
private $queue;
private $queueName = 'tasks';
public function __construct($channel)
{
$this->queue = new DirectQueue($channel);
$this->queue->declareQueue($this->queueName);
}
public function addTask($taskType, $payload)
{
$this->queue->send($this->queueName, [
'type' => $taskType,
'payload' => $payload,
'created_at' => time()
]);
echo "任务已添加: {$taskType}\n";
}
public function processTasks(callable $processor)
{
echo "开始处理任务...\n";
$this->queue->consume($this->queueName, function ($data, $msg) use ($processor) {
echo "处理任务: {$data['type']}\n";
try {
$result = $processor($data['type'], $data['payload']);
echo "任务完成: {$data['type']}\n";
} catch (Exception $e) {
echo "任务失败: {$e->getMessage()}\n";
// 可以选择重新入队或发送到死信队列
}
});
}
}
// 使用示例
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$taskQueue = new TaskQueue($channel);
// 添加任务
$taskQueue->addTask('send_email', ['to' => 'user@example.com', 'subject' => 'Welcome']);
$taskQueue->addTask('generate_report', ['report_id' => 123]);
// 处理任务
$taskQueue->processTasks(function ($type, $payload) {
switch ($type) {
case 'send_email':
// 发送邮件逻辑
break;
case 'generate_report':
// 生成报告逻辑
break;
}
});实际应用场景
1. 简单任务队列
php
<?php
class SimpleJobQueue
{
private $channel;
private $queueName;
public function __construct($channel, $queueName)
{
$this->channel = $channel;
$this->queueName = $queueName;
$this->channel->queue_declare($queueName, false, true, false, false);
}
public function push($job)
{
$message = new AMQPMessage(
json_encode($job),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$this->channel->basic_publish($message, '', $this->queueName);
}
public function pop()
{
$message = $this->channel->basic_get($this->queueName);
if ($message) {
$this->channel->basic_ack($message->getDeliveryTag());
return json_decode($message->getBody(), true);
}
return null;
}
public function size()
{
list(, $messageCount,) = $this->channel->queue_declare(
$this->queueName,
false,
true,
false,
false,
true
);
return $messageCount;
}
}2. RPC 模式
php
<?php
class RpcClient
{
private $channel;
private $callbackQueue;
private $response;
private $corrId;
public function __construct($channel)
{
$this->channel = $channel;
list($this->callbackQueue,) = $this->channel->queue_declare(
'',
false,
false,
true,
false
);
$this->channel->basic_consume(
$this->callbackQueue,
'',
false,
true,
false,
false,
[$this, 'onResponse']
);
}
public function onResponse($msg)
{
if ($msg->get('correlation_id') == $this->corrId) {
$this->response = $msg->getBody();
}
}
public function call($queueName, $data)
{
$this->response = null;
$this->corrId = uniqid();
$message = new AMQPMessage(
json_encode($data),
[
'correlation_id' => $this->corrId,
'reply_to' => $this->callbackQueue
]
);
$this->channel->basic_publish($message, '', $queueName);
while (!$this->response) {
$this->channel->wait();
}
return json_decode($this->response, true);
}
}
class RpcServer
{
private $channel;
private $queueName;
public function __construct($channel, $queueName)
{
$this->channel = $channel;
$this->queueName = $queueName;
$this->channel->queue_declare($queueName, false, false, false, false);
}
public function serve(callable $handler)
{
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume(
$this->queueName,
'',
false,
false,
false,
false,
function ($msg) use ($handler) {
$request = json_decode($msg->getBody(), true);
$response = $handler($request);
$replyMsg = new AMQPMessage(
json_encode($response),
['correlation_id' => $msg->get('correlation_id')]
);
$msg->getChannel()->basic_publish(
$replyMsg,
'',
$msg->get('reply_to')
);
$msg->ack();
}
);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
}常见问题与解决方案
问题 1: 队列不存在时消息丢失
症状: 发送消息到不存在的队列,消息直接丢弃
解决方案: 确保队列已声明
php
<?php
// 发送前确保队列存在
$channel->queue_declare($queueName, false, true, false, false);
// 或者使用 mandatory 标志检测路由失败
$message = new AMQPMessage($body);
$channel->basic_publish($message, '', $queueName, true);
// 设置 return 监听器
$channel->set_return_listener(function ($replyCode, $replyText, $exchange, $routingKey, $msg) {
echo "消息无法路由: {$replyText}\n";
});问题 2: 无法实现复杂路由
症状: 需要根据不同条件路由到不同队列
解决方案: 使用其他类型的交换机
php
<?php
// 默认交换机只能直接发送到队列
// 如果需要复杂路由,应该使用 Direct/Topic/Fanout/Headers Exchange
// 例如使用 Direct Exchange
$channel->exchange_declare('orders', 'direct', false, true, false);
$channel->queue_bind('order-queue', 'orders', 'order.created');
$channel->basic_publish($message, 'orders', 'order.created');最佳实践建议
- 简单场景优先: 默认交换机适合简单的点对点消息传递
- 队列先声明: 发送消息前确保队列已存在
- 考虑扩展性: 如果未来可能需要复杂路由,提前设计交换机架构
- 消息持久化: 重要消息设置
delivery_mode为持久化 - 错误处理: 处理队列不存在等异常情况
默认交换机 vs 其他交换机
| 特性 | 默认交换机 | Direct Exchange |
|---|---|---|
| 名称 | 空字符串 "" | 自定义名称 |
| 绑定 | 自动绑定 | 需要手动绑定 |
| 灵活性 | 低 | 高 |
| 适用场景 | 简单队列 | 复杂路由 |
| 可扩展性 | 差 | 好 |
