Appearance
AMQP 方法类
一、概述
AMQP 协议定义了一系列方法(Methods),用于客户端与 Broker 之间的交互。方法按功能分组为不同的类(Class),每个类包含多个相关的方法。
1.1 方法类概览
text
┌─────────────────────────────────────────────────────────────────────┐
│ AMQP 方法类概览 │
├──────────────┬──────────────────────────────────────────────────────┤
│ 类 ID │ 类名称 │
├──────────────┼──────────────────────────────────────────────────────┤
│ 10 │ Connection - 连接管理 │
│ 20 │ Channel - 通道管理 │
│ 30 │ Access - 访问控制(已废弃) │
│ 40 │ Exchange - 交换器管理 │
│ 50 │ Queue - 队列管理 │
│ 60 │ Basic - 基本消息操作 │
│ 90 │ TX - 事务 │
└──────────────┴──────────────────────────────────────────────────────┘1.2 方法命名规范
text
方法命名格式:Class.Method
示例:
├── Connection.Start - 连接开始
├── Channel.Open - 打开通道
├── Exchange.Declare - 声明交换器
├── Queue.Bind - 绑定队列
├── Basic.Publish - 发布消息
└── Basic.Consume - 消费消息二、核心知识点
2.1 Connection 类(类 ID: 10)
Connection 类用于管理客户端与 Broker 之间的连接。
2.1.1 Connection 方法列表
text
┌─────────────────────────────────────────────────────────────────────┐
│ Connection 类方法 │
├──────────────┬───────────────┬──────────────────────────────────────┤
│ 方法 ID │ 方法名 │ 说明 │
├──────────────┼───────────────┼──────────────────────────────────────┤
│ 10 │ Start │ 服务端发起,开始连接协商 │
│ 11 │ Start-Ok │ 客户端响应,选择认证机制 │
│ 20 │ Secure │ 服务端发起,SASL 挑战 │
│ 21 │ Secure-Ok │ 客户端响应,SASL 响应 │
│ 30 │ Tune │ 服务端发起,协商参数 │
│ 31 │ Tune-Ok │ 客户端响应,接受参数 │
│ 40 │ Open │ 客户端发起,打开虚拟主机 │
│ 41 │ Open-Ok │ 服务端响应,连接就绪 │
│ 50 │ Close │ 发起关闭连接 │
│ 51 │ Close-Ok │ 确认关闭连接 │
│ 60 │ Blocked │ 服务端通知,连接被阻塞 │
│ 61 │ Unblocked │ 服务端通知,连接解除阻塞 │
└──────────────┴───────────────┴──────────────────────────────────────┘2.1.2 连接建立流程
text
连接建立完整流程:
客户端 服务端
│ │
│ ─────── Protocol Header ─────────────► │
│ "AMQP" + Version │
│ │
│ ◄────── Connection.Start ──────────── │
│ (version, mechanisms, locales) │
│ │
│ ─────── Connection.Start-Ok ─────────► │
│ (mechanism, response, locale) │
│ │
│ ◄────── Connection.Tune ───────────── │
│ (channel-max, frame-max, │
│ heartbeat) │
│ │
│ ─────── Connection.Tune-Ok ──────────► │
│ (channel-max, frame-max, │
│ heartbeat) │
│ │
│ ─────── Connection.Open ─────────────► │
│ (virtual-host) │
│ │
│ ◄────── Connection.Open-Ok ────────── │
│ │
│ 连接建立完成 │
│ │2.1.3 Connection.Start 参数
text
Connection.Start 参数:
┌─────────────────────────────────────────────────────────────────────┐
│ 字段名 │ 类型 │ 说明 │
├───────────────────┼──────────┼────────────────────────────────────┤
│ version-major │ octet │ 协议主版本号 │
│ version-minor │ octet │ 协议次版本号 │
│ server-properties│ table │ 服务端属性 │
│ mechanisms │ longstr │ 支持的认证机制 │
│ locales │ longstr │ 支持的语言区域 │
└─────────────────────────────────────────────────────────────────────┘
server-properties 示例:
{
"product": "RabbitMQ",
"version": "3.12.0",
"platform": "Erlang/OTP 25",
"copyright": "Copyright (c) 2007-2023 VMware, Inc.",
"information": "Licensed under the MPL 2.0."
}2.2 Channel 类(类 ID: 20)
Channel 类用于管理通道,通道是连接内的轻量级连接。
2.2.1 Channel 方法列表
text
┌─────────────────────────────────────────────────────────────────────┐
│ Channel 类方法 │
├──────────────┬───────────────┬──────────────────────────────────────┤
│ 方法 ID │ 方法名 │ 说明 │
├──────────────┼───────────────┼──────────────────────────────────────┤
│ 10 │ Open │ 客户端发起,打开通道 │
│ 11 │ Open-Ok │ 服务端响应,通道就绪 │
│ 20 │ Flow │ 发起流量控制 │
│ 21 │ Flow-Ok │ 确认流量控制 │
│ 40 │ Close │ 发起关闭通道 │
│ 41 │ Close-Ok │ 确认关闭通道 │
└──────────────┴───────────────┴──────────────────────────────────────┘2.2.2 通道生命周期
text
通道生命周期:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 打开通道 │ ──► │ 使用通道 │ ──► │ 关闭通道 │
│ Open │ │ 操作消息 │ │ Close │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
▼ ▼ ▼
Channel.Open Basic.Publish Channel.Close
Channel.Open-Ok Basic.Consume Channel.Close-Ok
Basic.Ack2.3 Exchange 类(类 ID: 40)
Exchange 类用于管理交换器。
2.3.1 Exchange 方法列表
text
┌─────────────────────────────────────────────────────────────────────┐
│ Exchange 类方法 │
├──────────────┬───────────────┬──────────────────────────────────────┤
│ 方法 ID │ 方法名 │ 说明 │
├──────────────┼───────────────┼──────────────────────────────────────┤
│ 10 │ Declare │ 声明交换器 │
│ 11 │ Declare-Ok │ 确认声明成功 │
│ 20 │ Delete │ 删除交换器 │
│ 21 │ Delete-Ok │ 确认删除成功 │
│ 30 │ Bind │ 绑定交换器到交换器 │
│ 31 │ Bind-Ok │ 确认绑定成功 │
│ 40 │ Unbind │ 解绑交换器 │
│ 41 │ Unbind-Ok │ 确认解绑成功 │
└──────────────┴───────────────┴──────────────────────────────────────┘2.3.2 Exchange.Declare 参数
text
Exchange.Declare 参数:
┌─────────────────────────────────────────────────────────────────────┐
│ 字段名 │ 类型 │ 说明 │
├───────────────────┼──────────┼────────────────────────────────────┤
│ reserved-1 │ short │ 保留字段 │
│ exchange │ shortstr │ 交换器名称 │
│ type │ shortstr │ 交换器类型 │
│ passive │ bit │ 被动模式,仅检查是否存在 │
│ durable │ bit │ 是否持久化 │
│ auto-delete │ bit │ 是否自动删除 │
│ internal │ bit │ 是否为内部交换器 │
│ no-wait │ bit │ 是否等待响应 │
│ arguments │ table │ 额外参数 │
└─────────────────────────────────────────────────────────────────────┘2.4 Queue 类(类 ID: 50)
Queue 类用于管理队列。
2.4.1 Queue 方法列表
text
┌─────────────────────────────────────────────────────────────────────┐
│ Queue 类方法 │
├──────────────┬───────────────┬──────────────────────────────────────┤
│ 方法 ID │ 方法名 │ 说明 │
├──────────────┼───────────────┼──────────────────────────────────────┤
│ 10 │ Declare │ 声明队列 │
│ 11 │ Declare-Ok │ 确认声明成功 │
│ 20 │ Bind │ 绑定队列到交换器 │
│ 21 │ Bind-Ok │ 确认绑定成功 │
│ 30 │ Purge │ 清空队列 │
│ 31 │ Purge-Ok │ 确认清空成功 │
│ 40 │ Delete │ 删除队列 │
│ 41 │ Delete-Ok │ 确认删除成功 │
│ 50 │ Unbind │ 解绑队列 │
│ 51 │ Unbind-Ok │ 确认解绑成功 │
└──────────────┴───────────────┴──────────────────────────────────────┘2.4.2 Queue.Declare 参数
text
Queue.Declare 参数:
┌─────────────────────────────────────────────────────────────────────┐
│ 字段名 │ 类型 │ 说明 │
├───────────────────┼──────────┼────────────────────────────────────┤
│ reserved-1 │ short │ 保留字段 │
│ queue │ shortstr │ 队列名称 │
│ passive │ bit │ 被动模式,仅检查是否存在 │
│ durable │ bit │ 是否持久化 │
│ exclusive │ bit │ 是否排他 │
│ auto-delete │ bit │ 是否自动删除 │
│ no-wait │ bit │ 是否等待响应 │
│ arguments │ table │ 额外参数 │
└─────────────────────────────────────────────────────────────────────┘
Queue.Declare-Ok 响应:
├── queue: 实际队列名称
├── message-count: 队列中消息数量
└── consumer-count: 消费者数量2.5 Basic 类(类 ID: 60)
Basic 类是最重要的类,包含消息发布和消费的方法。
2.5.1 Basic 方法列表
text
┌─────────────────────────────────────────────────────────────────────┐
│ Basic 类方法 │
├──────────────┬───────────────┬──────────────────────────────────────┤
│ 方法 ID │ 方法名 │ 说明 │
├──────────────┼───────────────┼──────────────────────────────────────┤
│ 10 │ Qos │ 设置服务质量 │
│ 11 │ Qos-Ok │ 确认 QoS 设置 │
│ 20 │ Consume │ 订阅消费 │
│ 21 │ Consume-Ok │ 确认订阅成功 │
│ 30 │ Cancel │ 取消订阅 │
│ 31 │ Cancel-Ok │ 确认取消成功 │
│ 40 │ Publish │ 发布消息 │
│ 50 │ Return │ 消息退回(路由失败) │
│ 60 │ Deliver │ 消息投递 │
│ 70 │ Get │ 主动获取消息 │
│ 71 │ Get-Ok │ 获取成功 │
│ 72 │ Get-Empty │ 队列为空 │
│ 80 │ Ack │ 确认消息 │
│ 90 │ Reject │ 拒绝消息 │
│ 100 │ Recover │ 恢复消息 │
│ 110 │ Recover-Ok │ 确认恢复 │
│ 120 │ Nack │ 否定确认 │
└──────────────┴───────────────┴──────────────────────────────────────┘2.5.2 Basic.Publish 参数
text
Basic.Publish 参数:
┌─────────────────────────────────────────────────────────────────────┐
│ 字段名 │ 类型 │ 说明 │
├───────────────────┼──────────┼────────────────────────────────────┤
│ reserved-1 │ short │ 保留字段 │
│ exchange │ shortstr │ 交换器名称 │
│ routing-key │ shortstr │ 路由键 │
│ mandatory │ bit │ 必须路由成功 │
│ immediate │ bit │ 立即投递(已废弃) │
└─────────────────────────────────────────────────────────────────────┘2.5.3 Basic.Consume 参数
text
Basic.Consume 参数:
┌─────────────────────────────────────────────────────────────────────┐
│ 字段名 │ 类型 │ 说明 │
├───────────────────┼──────────┼────────────────────────────────────┤
│ reserved-1 │ short │ 保留字段 │
│ queue │ shortstr │ 队列名称 │
│ consumer-tag │ shortstr │ 消费者标签 │
│ no-local │ bit │ 不接收自己发布的消息 │
│ no-ack │ bit │ 不需要确认 │
│ exclusive │ bit │ 独占消费 │
│ no-wait │ bit │ 是否等待响应 │
│ arguments │ table │ 额外参数 │
└─────────────────────────────────────────────────────────────────────┘2.5.4 Basic.Deliver 参数
text
Basic.Deliver 参数(服务端发送给消费者):
┌─────────────────────────────────────────────────────────────────────┐
│ 字段名 │ 类型 │ 说明 │
├───────────────────┼──────────┼────────────────────────────────────┤
│ consumer-tag │ shortstr │ 消费者标签 │
│ delivery-tag │ longlong │ 投递标签 │
│ redelivered │ bit │ 是否为重新投递 │
│ exchange │ shortstr │ 来源交换器 │
│ routing-key │ shortstr │ 路由键 │
└─────────────────────────────────────────────────────────────────────┘2.6 TX 类(类 ID: 90)
TX 类提供事务支持。
2.6.1 TX 方法列表
text
┌─────────────────────────────────────────────────────────────────────┐
│ TX 类方法 │
├──────────────┬───────────────┬──────────────────────────────────────┤
│ 方法 ID │ 方法名 │ 说明 │
├──────────────┼───────────────┼──────────────────────────────────────┤
│ 10 │ Select │ 开启事务模式 │
│ 11 │ Select-Ok │ 确认开启事务 │
│ 20 │ Commit │ 提交事务 │
│ 21 │ Commit-Ok │ 确认提交成功 │
│ 30 │ Rollback │ 回滚事务 │
│ 31 │ Rollback-Ok │ 确认回滚成功 │
└──────────────┴───────────────┴──────────────────────────────────────┘2.6.2 事务流程
text
事务使用流程:
客户端 服务端
│ │
│ ─────── TX.Select ───────────────────► │
│ │
│ ◄────── TX.Select-Ok ──────────────── │
│ │
│ ─────── Basic.Publish ───────────────► │
│ (消息 1) │
│ │
│ ─────── Basic.Publish ───────────────► │
│ (消息 2) │
│ │
│ ─────── TX.Commit ───────────────────► │
│ │
│ ◄────── TX.Commit-Ok ──────────────── │
│ │
│ 事务提交成功 │
│ │三、代码示例
3.1 PHP 完整示例
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
class AMQPMethodsDemo
{
private $connection;
private $channel;
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'guest',
'guest',
'/'
);
$this->channel = $this->connection->channel();
}
public function connectionMethods(): void
{
echo "=== Connection 类方法演示 ===\n";
echo "服务端属性:\n";
$serverProps = $this->connection->getServerProperties();
foreach ($serverProps as $key => $value) {
echo " {$key}: {$value}\n";
}
echo "连接状态: " . ($this->connection->isConnected() ? '已连接' : '未连接') . "\n";
}
public function channelMethods(): void
{
echo "\n=== Channel 类方法演示 ===\n";
$channelId = $this->channel->getChannelId();
echo "当前通道 ID: {$channelId}\n";
$channel2 = $this->connection->channel();
echo "新通道 ID: {$channel2->getChannelId()}\n";
$channel2->close();
echo "通道 2 已关闭\n";
}
public function exchangeMethods(): void
{
echo "\n=== Exchange 类方法演示 ===\n";
$this->channel->exchange_declare(
'demo.direct',
AMQPExchangeType::DIRECT,
false,
true,
false
);
echo "Exchange.Declare: demo.direct 创建成功\n";
$this->channel->exchange_declare(
'demo.topic',
AMQPExchangeType::TOPIC,
false,
true,
false
);
echo "Exchange.Declare: demo.topic 创建成功\n";
$this->channel->exchange_bind('demo.topic', 'demo.direct', 'test.#');
echo "Exchange.Bind: demo.direct -> demo.topic\n";
$this->channel->exchange_unbind('demo.topic', 'demo.direct', 'test.#');
echo "Exchange.Unbind: 解除绑定\n";
$this->channel->exchange_delete('demo.topic');
echo "Exchange.Delete: demo.topic 已删除\n";
}
public function queueMethods(): void
{
echo "\n=== Queue 类方法演示 ===\n";
[$queueName, $messageCount, $consumerCount] = $this->channel->queue_declare(
'demo.queue',
false,
true,
false,
false
);
echo "Queue.Declare: {$queueName} 创建成功\n";
echo " 消息数量: {$messageCount}\n";
echo " 消费者数量: {$consumerCount}\n";
$this->channel->queue_bind('demo.queue', 'demo.direct', 'test.key');
echo "Queue.Bind: demo.queue 绑定到 demo.direct\n";
$this->channel->queue_unbind('demo.queue', 'demo.direct', 'test.key');
echo "Queue.Unbind: 解除绑定\n";
$this->channel->queue_delete('demo.queue');
echo "Queue.Delete: demo.queue 已删除\n";
}
public function basicMethods(): void
{
echo "\n=== Basic 类方法演示 ===\n";
$this->channel->exchange_declare('demo.exchange', AMQPExchangeType::DIRECT, false, true, false);
$this->channel->queue_declare('demo.test', false, true, false, false);
$this->channel->queue_bind('demo.test', 'demo.exchange', 'test');
$this->channel->basic_qos(0, 10, false);
echo "Basic.Qos: 设置预取数量为 10\n";
for ($i = 1; $i <= 3; $i++) {
$message = new AMQPMessage(
json_encode(['id' => $i, 'message' => "测试消息 {$i}"]),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => "msg-{$i}",
'timestamp' => time(),
]
);
$this->channel->basic_publish($message, 'demo.exchange', 'test');
echo "Basic.Publish: 消息 {$i} 已发布\n";
}
$message = $this->channel->basic_get('demo.test', true);
if ($message) {
echo "Basic.Get: 获取到消息 - " . $message->body . "\n";
}
$callback = function ($msg) {
echo "Basic.Deliver: 收到消息 - " . $msg->body . "\n";
$msg->ack();
echo "Basic.Ack: 消息已确认\n";
};
$consumerTag = $this->channel->basic_consume(
'demo.test',
'',
false,
false,
false,
false,
$callback
);
echo "Basic.Consume: 开始消费,消费者标签: {$consumerTag}\n";
$timeout = 5;
while ($this->channel->is_consuming() && $timeout > 0) {
$this->channel->wait(null, false, 1);
$timeout--;
}
$this->channel->basic_cancel($consumerTag);
echo "Basic.Cancel: 取消消费\n";
$this->channel->queue_purge('demo.test');
echo "Queue.Purge: 队列已清空\n";
$this->channel->queue_delete('demo.test');
$this->channel->exchange_delete('demo.exchange');
}
public function txMethods(): void
{
echo "\n=== TX 类方法演示 ===\n";
$txChannel = $this->connection->channel();
$txChannel->tx_select();
echo "TX.Select: 开启事务模式\n";
$txChannel->exchange_declare('tx.exchange', AMQPExchangeType::DIRECT, false, true, false);
$txChannel->queue_declare('tx.queue', false, true, false, false);
$txChannel->queue_bind('tx.queue', 'tx.exchange', 'tx');
$message = new AMQPMessage('事务消息', [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
$txChannel->basic_publish($message, 'tx.exchange', 'tx');
echo "Basic.Publish: 发布事务消息\n";
$txChannel->tx_commit();
echo "TX.Commit: 事务提交成功\n";
$txChannel->tx_select();
$txChannel->basic_publish(new AMQPMessage('回滚消息'), 'tx.exchange', 'tx');
echo "Basic.Publish: 发布将被回滚的消息\n";
$txChannel->tx_rollback();
echo "TX.Rollback: 事务回滚成功\n";
$txChannel->queue_delete('tx.queue');
$txChannel->exchange_delete('tx.exchange');
$txChannel->close();
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
echo "\n连接已关闭\n";
}
}
$demo = new AMQPMethodsDemo();
$demo->connectionMethods();
$demo->channelMethods();
$demo->exchangeMethods();
$demo->queueMethods();
$demo->basicMethods();
$demo->txMethods();
$demo->close();3.2 消息确认示例
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class AckDemo
{
private $connection;
private $channel;
public function __construct()
{
$this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->channel = $this->connection->channel();
}
public function demonstrateAckNackReject(): void
{
$this->channel->queue_declare('ack.demo', false, true, false, false);
$message = new AMQPMessage('测试消息', [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
$this->channel->basic_publish($message, '', 'ack.demo');
$callback = function ($msg) {
$body = $msg->body;
$deliveryTag = $msg->getDeliveryTag();
echo "收到消息: {$body}\n";
echo "Delivery Tag: {$deliveryTag}\n";
try {
if (strpos($body, 'ack') !== false) {
$msg->ack();
echo "Basic.Ack: 消息已确认\n";
} elseif (strpos($body, 'reject') !== false) {
$msg->reject(false);
echo "Basic.Reject: 消息已拒绝(不重新入队)\n";
} elseif (strpos($body, 'requeue') !== false) {
$msg->reject(true);
echo "Basic.Reject: 消息已拒绝(重新入队)\n";
} else {
$msg->nack(true, false);
echo "Basic.Nack: 消息否定确认\n";
}
} catch (Exception $e) {
echo "处理错误: " . $e->getMessage() . "\n";
}
};
$this->channel->basic_consume('ack.demo', '', false, false, false, false, $callback);
$timeout = 3;
while ($this->channel->is_consuming() && $timeout > 0) {
$this->channel->wait(null, false, 1);
$timeout--;
}
$this->channel->queue_delete('ack.demo');
}
public function demonstrateRecover(): void
{
$this->channel->queue_declare('recover.demo', false, true, false, false);
for ($i = 1; $i <= 3; $i++) {
$this->channel->basic_publish(
new AMQPMessage("消息 {$i}"),
'',
'recover.demo'
);
}
$this->channel->basic_recover(true);
echo "Basic.Recover: 请求重新投递未确认的消息\n";
$this->channel->queue_delete('recover.demo');
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
$demo = new AckDemo();
$demo->demonstrateAckNackReject();
$demo->demonstrateRecover();
$demo->close();3.3 获取消息示例
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('get.demo', false, true, false, false);
for ($i = 1; $i <= 5; $i++) {
$channel->basic_publish(
new AMQPMessage("消息 #{$i}"),
'',
'get.demo'
);
}
echo "使用 Basic.Get 主动获取消息:\n";
while (true) {
$message = $channel->basic_get('get.demo', true);
if (!$message) {
echo "队列为空\n";
break;
}
echo "获取到消息: " . $message->body . "\n";
echo " Delivery Tag: " . $message->getDeliveryTag() . "\n";
echo " Exchange: " . $message->getExchange() . "\n";
echo " Routing Key: " . $message->getRoutingKey() . "\n";
echo " Redelivered: " . ($message->isRedelivered() ? '是' : '否') . "\n";
}
$channel->queue_delete('get.demo');
$channel->close();
$connection->close();四、实际应用场景
4.1 RPC 模式
text
使用 AMQP 方法实现 RPC:
┌─────────────┐ ┌─────────────┐
│ Client │ │ Server │
│ (请求方) │ │ (响应方) │
└──────┬──────┘ └──────┬──────┘
│ │
│ 1. Queue.Declare (reply-queue) │
│ ◄─────────────────────────────────────────────────────│
│ │
│ 2. Basic.Publish │
│ - reply_to: reply-queue │
│ - correlation_id: unique-id │
│ ──────────────────────────────────────────────────────►│
│ │
│ 3. Basic.Consume (reply-queue) │
│ ◄─────────────────────────────────────────────────────│
│ │
│ 4. Basic.Consume │
│ ◄─────────────────────│
│ │
│ 5. 处理请求 │
│ ◄─────────────────────│
│ │
│ 6. Basic.Publish │
│ (to reply-queue) │
│ ◄─────────────────────│
│ │
│ 7. Basic.Deliver │
│ ◄─────────────────────────────────────────────────────│
│ - correlation_id 匹配 │
│ │4.2 消息确认模式对比
text
┌─────────────────────────────────────────────────────────────────────┐
│ 消息确认模式对比 │
├───────────────┬─────────────────────────────────────────────────────┤
│ 模式 │ 说明 │
├───────────────┼─────────────────────────────────────────────────────┤
│ auto-ack │ no-ack=true,消息投递后立即确认 │
│ │ 优点:性能高 │
│ │ 缺点:消息可能丢失 │
├───────────────┼─────────────────────────────────────────────────────┤
│ manual-ack │ no-ack=false,消费者手动确认 │
│ │ Basic.Ack: 确认成功处理 │
│ │ Basic.Nack: 批量否定确认 │
│ │ Basic.Reject: 单条否定确认 │
├───────────────┼─────────────────────────────────────────────────────┤
│ transaction │ TX.Select + TX.Commit/Rollback │
│ │ 优点:原子性保证 │
│ │ 缺点:性能较差,不推荐使用 │
├───────────────┼─────────────────────────────────────────────────────┤
│ confirm │ 发布确认模式 │
│ │ 优点:性能与可靠性平衡 │
│ │ 推荐:生产环境首选 │
└───────────────┴─────────────────────────────────────────────────────┘五、常见问题与解决方案
5.1 方法调用失败
问题描述: 方法调用返回错误或异常。
常见错误码:
text
┌──────────────┬─────────────────────────────────────────────────────┐
│ 错误码 │ 说明 │
├──────────────┼─────────────────────────────────────────────────────┤
│ 311 │ CONTENT_TOO_LARGE - 内容过大 │
│ 312 │ NO_ROUTE - 无法路由 │
│ 313 │ NO_CONSUMERS - 没有消费者 │
│ 403 │ ACCESS_REFUSED - 访问被拒绝 │
│ 404 │ NOT_FOUND - 资源不存在 │
│ 405 │ RESOURCE_LOCKED - 资源被锁定 │
│ 406 │ PRECONDITION_FAILED - 前置条件失败 │
│ 501 │ FRAME_ERROR - 帧错误 │
│ 502 │ SYNTAX_ERROR - 语法错误 │
│ 503 │ COMMAND_INVALID - 命令无效 │
│ 504 │ CHANNEL_ERROR - 通道错误 │
│ 505 │ UNEXPECTED_FRAME - 意外的帧 │
│ 506 │ RESOURCE_ERROR - 资源错误 │
│ 530 │ NOT_ALLOWED - 不允许的操作 │
│ 540 │ NOT_IMPLEMENTED - 未实现 │
│ 541 │ INTERNAL_ERROR - 内部错误 │
└──────────────┴─────────────────────────────────────────────────────┘5.2 通道意外关闭
问题描述: 通道在使用过程中意外关闭。
解决方案:
php
<?php
$channel->set_close_handler(function ($replyCode, $replyText) {
echo "通道关闭: [{$replyCode}] {$replyText}\n";
});六、最佳实践建议
6.1 方法使用建议
text
最佳实践:
├── 资源声明:启动时预先声明所需资源
├── 错误处理:捕获并处理所有可能的异常
├── 资源清理:使用完毕后正确关闭和删除资源
├── 幂等操作:声明操作是幂等的,可重复执行
└── 参数验证:确保参数符合规范6.2 性能优化建议
text
性能建议:
├── 批量操作:减少方法调用次数
├── 异步确认:使用确认模式而非事务
├── 合理 QoS:设置合适的预取数量
├── 连接复用:使用 Channel 多路复用
└── 避免轮询:使用 Consume 而非 Get