Skip to content

AMQP 方法类

一、概述

AMQP 协议定义了一系列方法(Methods),用于客户端与 Broker 之间的交互。方法按功能分组为不同的类(Class),每个类包含多个相关的方法。

1.1 方法类概览

text
┌─────────────────────────────────────────────────────────────────────┐
│                      AMQP 方法类概览                                 │
├──────────────┬──────────────────────────────────────────────────────┤
│   类 ID      │                    类名称                            │
├──────────────┼──────────────────────────────────────────────────────┤
│     10       │  Connection - 连接管理                               │
│     20       │  Channel - 通道管理                                  │
│     30       │  Access - 访问控制(已废弃)                         │
│     40       │  Exchange - 交换器管理                               │
│     50       │  Queue - 队列管理                                    │
│     60       │  Basic - 基本消息操作                                │
│     90       │  TX - 事务                                           │
└──────────────┴──────────────────────────────────────────────────────┘

1.2 方法命名规范

text
方法命名格式:Class.Method

示例:
├── Connection.Start - 连接开始
├── Channel.Open - 打开通道
├── Exchange.Declare - 声明交换器
├── Queue.Bind - 绑定队列
├── Basic.Publish - 发布消息
└── Basic.Consume - 消费消息

二、核心知识点

2.1 Connection 类(类 ID: 10)

Connection 类用于管理客户端与 Broker 之间的连接。

2.1.1 Connection 方法列表

text
┌─────────────────────────────────────────────────────────────────────┐
│                    Connection 类方法                                 │
├──────────────┬───────────────┬──────────────────────────────────────┤
│   方法 ID    │    方法名      │                说明                  │
├──────────────┼───────────────┼──────────────────────────────────────┤
│     10       │    Start      │ 服务端发起,开始连接协商             │
│     11       │   Start-Ok    │ 客户端响应,选择认证机制             │
│     20       │    Secure     │ 服务端发起,SASL 挑战               │
│     21       │   Secure-Ok   │ 客户端响应,SASL 响应               │
│     30       │     Tune      │ 服务端发起,协商参数                 │
│     31       │    Tune-Ok    │ 客户端响应,接受参数                 │
│     40       │     Open      │ 客户端发起,打开虚拟主机             │
│     41       │    Open-Ok    │ 服务端响应,连接就绪                 │
│     50       │    Close      │ 发起关闭连接                        │
│     51       │   Close-Ok    │ 确认关闭连接                        │
│     60       │   Blocked     │ 服务端通知,连接被阻塞               │
│     61       │   Unblocked   │ 服务端通知,连接解除阻塞             │
└──────────────┴───────────────┴──────────────────────────────────────┘

2.1.2 连接建立流程

text
连接建立完整流程:

客户端                                    服务端
   │                                        │
   │ ─────── Protocol Header ─────────────► │
   │         "AMQP" + Version               │
   │                                        │
   │ ◄────── Connection.Start ────────────  │
   │         (version, mechanisms, locales) │
   │                                        │
   │ ─────── Connection.Start-Ok ─────────► │
   │         (mechanism, response, locale)  │
   │                                        │
   │ ◄────── Connection.Tune ─────────────  │
   │         (channel-max, frame-max,       │
   │          heartbeat)                    │
   │                                        │
   │ ─────── Connection.Tune-Ok ──────────► │
   │         (channel-max, frame-max,       │
   │          heartbeat)                    │
   │                                        │
   │ ─────── Connection.Open ─────────────► │
   │         (virtual-host)                 │
   │                                        │
   │ ◄────── Connection.Open-Ok ──────────  │
   │                                        │
   │            连接建立完成                 │
   │                                        │

2.1.3 Connection.Start 参数

text
Connection.Start 参数:

┌─────────────────────────────────────────────────────────────────────┐
│  字段名            │ 类型      │ 说明                               │
├───────────────────┼──────────┼────────────────────────────────────┤
│  version-major    │ octet    │ 协议主版本号                       │
│  version-minor    │ octet    │ 协议次版本号                       │
│  server-properties│ table    │ 服务端属性                         │
│  mechanisms       │ longstr  │ 支持的认证机制                     │
│  locales          │ longstr  │ 支持的语言区域                     │
└─────────────────────────────────────────────────────────────────────┘

server-properties 示例:
{
    "product": "RabbitMQ",
    "version": "3.12.0",
    "platform": "Erlang/OTP 25",
    "copyright": "Copyright (c) 2007-2023 VMware, Inc.",
    "information": "Licensed under the MPL 2.0."
}

2.2 Channel 类(类 ID: 20)

Channel 类用于管理通道,通道是连接内的轻量级连接。

2.2.1 Channel 方法列表

text
┌─────────────────────────────────────────────────────────────────────┐
│                      Channel 类方法                                  │
├──────────────┬───────────────┬──────────────────────────────────────┤
│   方法 ID    │    方法名      │                说明                  │
├──────────────┼───────────────┼──────────────────────────────────────┤
│     10       │     Open      │ 客户端发起,打开通道                 │
│     11       │    Open-Ok    │ 服务端响应,通道就绪                 │
│     20       │     Flow      │ 发起流量控制                         │
│     21       │    Flow-Ok    │ 确认流量控制                         │
│     40       │     Close     │ 发起关闭通道                         │
│     41       │    Close-Ok   │ 确认关闭通道                         │
└──────────────┴───────────────┴──────────────────────────────────────┘

2.2.2 通道生命周期

text
通道生命周期:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   打开通道   │ ──► │   使用通道   │ ──► │   关闭通道   │
│  Open       │     │  操作消息    │     │   Close     │
└─────────────┘     └─────────────┘     └─────────────┘
       │                   │                   │
       ▼                   ▼                   ▼
 Channel.Open       Basic.Publish      Channel.Close
 Channel.Open-Ok    Basic.Consume      Channel.Close-Ok
                    Basic.Ack

2.3 Exchange 类(类 ID: 40)

Exchange 类用于管理交换器。

2.3.1 Exchange 方法列表

text
┌─────────────────────────────────────────────────────────────────────┐
│                      Exchange 类方法                                 │
├──────────────┬───────────────┬──────────────────────────────────────┤
│   方法 ID    │    方法名      │                说明                  │
├──────────────┼───────────────┼──────────────────────────────────────┤
│     10       │   Declare     │ 声明交换器                           │
│     11       │  Declare-Ok   │ 确认声明成功                         │
│     20       │   Delete      │ 删除交换器                           │
│     21       │   Delete-Ok   │ 确认删除成功                         │
│     30       │    Bind       │ 绑定交换器到交换器                   │
│     31       │   Bind-Ok     │ 确认绑定成功                         │
│     40       │   Unbind      │ 解绑交换器                           │
│     41       │  Unbind-Ok    │ 确认解绑成功                         │
└──────────────┴───────────────┴──────────────────────────────────────┘

2.3.2 Exchange.Declare 参数

text
Exchange.Declare 参数:

┌─────────────────────────────────────────────────────────────────────┐
│  字段名            │ 类型      │ 说明                               │
├───────────────────┼──────────┼────────────────────────────────────┤
│  reserved-1       │ short    │ 保留字段                           │
│  exchange         │ shortstr │ 交换器名称                         │
│  type             │ shortstr │ 交换器类型                         │
│  passive          │ bit      │ 被动模式,仅检查是否存在           │
│  durable          │ bit      │ 是否持久化                         │
│  auto-delete      │ bit      │ 是否自动删除                       │
│  internal         │ bit      │ 是否为内部交换器                   │
│  no-wait          │ bit      │ 是否等待响应                       │
│  arguments        │ table    │ 额外参数                           │
└─────────────────────────────────────────────────────────────────────┘

2.4 Queue 类(类 ID: 50)

Queue 类用于管理队列。

2.4.1 Queue 方法列表

text
┌─────────────────────────────────────────────────────────────────────┐
│                       Queue 类方法                                   │
├──────────────┬───────────────┬──────────────────────────────────────┤
│   方法 ID    │    方法名      │                说明                  │
├──────────────┼───────────────┼──────────────────────────────────────┤
│     10       │   Declare     │ 声明队列                             │
│     11       │  Declare-Ok   │ 确认声明成功                         │
│     20       │    Bind       │ 绑定队列到交换器                     │
│     21       │   Bind-Ok     │ 确认绑定成功                         │
│     30       │   Purge       │ 清空队列                             │
│     31       │   Purge-Ok    │ 确认清空成功                         │
│     40       │   Delete      │ 删除队列                             │
│     41       │   Delete-Ok   │ 确认删除成功                         │
│     50       │   Unbind      │ 解绑队列                             │
│     51       │  Unbind-Ok    │ 确认解绑成功                         │
└──────────────┴───────────────┴──────────────────────────────────────┘

2.4.2 Queue.Declare 参数

text
Queue.Declare 参数:

┌─────────────────────────────────────────────────────────────────────┐
│  字段名            │ 类型      │ 说明                               │
├───────────────────┼──────────┼────────────────────────────────────┤
│  reserved-1       │ short    │ 保留字段                           │
│  queue            │ shortstr │ 队列名称                           │
│  passive          │ bit      │ 被动模式,仅检查是否存在           │
│  durable          │ bit      │ 是否持久化                         │
│  exclusive        │ bit      │ 是否排他                           │
│  auto-delete      │ bit      │ 是否自动删除                       │
│  no-wait          │ bit      │ 是否等待响应                       │
│  arguments        │ table    │ 额外参数                           │
└─────────────────────────────────────────────────────────────────────┘

Queue.Declare-Ok 响应:
├── queue: 实际队列名称
├── message-count: 队列中消息数量
└── consumer-count: 消费者数量

2.5 Basic 类(类 ID: 60)

Basic 类是最重要的类,包含消息发布和消费的方法。

2.5.1 Basic 方法列表

text
┌─────────────────────────────────────────────────────────────────────┐
│                       Basic 类方法                                   │
├──────────────┬───────────────┬──────────────────────────────────────┤
│   方法 ID    │    方法名      │                说明                  │
├──────────────┼───────────────┼──────────────────────────────────────┤
│     10       │     Qos       │ 设置服务质量                         │
│     11       │    Qos-Ok     │ 确认 QoS 设置                        │
│     20       │   Consume     │ 订阅消费                             │
│     21       │  Consume-Ok   │ 确认订阅成功                         │
│     30       │   Cancel      │ 取消订阅                             │
│     31       │  Cancel-Ok    │ 确认取消成功                         │
│     40       │   Publish     │ 发布消息                             │
│     50       │   Return      │ 消息退回(路由失败)                 │
│     60       │   Deliver     │ 消息投递                             │
│     70       │     Get       │ 主动获取消息                         │
│     71       │   Get-Ok      │ 获取成功                             │
│     72       │   Get-Empty   │ 队列为空                             │
│     80       │     Ack       │ 确认消息                             │
│     90       │   Reject      │ 拒绝消息                             │
│    100       │   Recover     │ 恢复消息                             │
│    110       │  Recover-Ok   │ 确认恢复                             │
│    120       │     Nack      │ 否定确认                             │
└──────────────┴───────────────┴──────────────────────────────────────┘

2.5.2 Basic.Publish 参数

text
Basic.Publish 参数:

┌─────────────────────────────────────────────────────────────────────┐
│  字段名            │ 类型      │ 说明                               │
├───────────────────┼──────────┼────────────────────────────────────┤
│  reserved-1       │ short    │ 保留字段                           │
│  exchange         │ shortstr │ 交换器名称                         │
│  routing-key      │ shortstr │ 路由键                             │
│  mandatory        │ bit      │ 必须路由成功                       │
│  immediate        │ bit      │ 立即投递(已废弃)                 │
└─────────────────────────────────────────────────────────────────────┘

2.5.3 Basic.Consume 参数

text
Basic.Consume 参数:

┌─────────────────────────────────────────────────────────────────────┐
│  字段名            │ 类型      │ 说明                               │
├───────────────────┼──────────┼────────────────────────────────────┤
│  reserved-1       │ short    │ 保留字段                           │
│  queue            │ shortstr │ 队列名称                           │
│  consumer-tag     │ shortstr │ 消费者标签                         │
│  no-local         │ bit      │ 不接收自己发布的消息               │
│  no-ack           │ bit      │ 不需要确认                         │
│  exclusive        │ bit      │ 独占消费                           │
│  no-wait          │ bit      │ 是否等待响应                       │
│  arguments        │ table    │ 额外参数                           │
└─────────────────────────────────────────────────────────────────────┘

2.5.4 Basic.Deliver 参数

text
Basic.Deliver 参数(服务端发送给消费者):

┌─────────────────────────────────────────────────────────────────────┐
│  字段名            │ 类型      │ 说明                               │
├───────────────────┼──────────┼────────────────────────────────────┤
│  consumer-tag     │ shortstr │ 消费者标签                         │
│  delivery-tag     │ longlong │ 投递标签                           │
│  redelivered      │ bit      │ 是否为重新投递                     │
│  exchange         │ shortstr │ 来源交换器                         │
│  routing-key      │ shortstr │ 路由键                             │
└─────────────────────────────────────────────────────────────────────┘

2.6 TX 类(类 ID: 90)

TX 类提供事务支持。

2.6.1 TX 方法列表

text
┌─────────────────────────────────────────────────────────────────────┐
│                        TX 类方法                                     │
├──────────────┬───────────────┬──────────────────────────────────────┤
│   方法 ID    │    方法名      │                说明                  │
├──────────────┼───────────────┼──────────────────────────────────────┤
│     10       │    Select     │ 开启事务模式                         │
│     11       │   Select-Ok   │ 确认开启事务                         │
│     20       │    Commit     │ 提交事务                             │
│     21       │   Commit-Ok   │ 确认提交成功                         │
│     30       │   Rollback    │ 回滚事务                             │
│     31       │  Rollback-Ok  │ 确认回滚成功                         │
└──────────────┴───────────────┴──────────────────────────────────────┘

2.6.2 事务流程

text
事务使用流程:

客户端                                    服务端
   │                                        │
   │ ─────── TX.Select ───────────────────► │
   │                                        │
   │ ◄────── TX.Select-Ok ────────────────  │
   │                                        │
   │ ─────── Basic.Publish ───────────────► │
   │        (消息 1)                         │
   │                                        │
   │ ─────── Basic.Publish ───────────────► │
   │        (消息 2)                         │
   │                                        │
   │ ─────── TX.Commit ───────────────────► │
   │                                        │
   │ ◄────── TX.Commit-Ok ────────────────  │
   │                                        │
   │           事务提交成功                  │
   │                                        │

三、代码示例

3.1 PHP 完整示例

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

class AMQPMethodsDemo
{
    private $connection;
    private $channel;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest',
            '/'
        );
        $this->channel = $this->connection->channel();
    }

    public function connectionMethods(): void
    {
        echo "=== Connection 类方法演示 ===\n";
        
        echo "服务端属性:\n";
        $serverProps = $this->connection->getServerProperties();
        foreach ($serverProps as $key => $value) {
            echo "  {$key}: {$value}\n";
        }
        
        echo "连接状态: " . ($this->connection->isConnected() ? '已连接' : '未连接') . "\n";
    }

    public function channelMethods(): void
    {
        echo "\n=== Channel 类方法演示 ===\n";
        
        $channelId = $this->channel->getChannelId();
        echo "当前通道 ID: {$channelId}\n";
        
        $channel2 = $this->connection->channel();
        echo "新通道 ID: {$channel2->getChannelId()}\n";
        
        $channel2->close();
        echo "通道 2 已关闭\n";
    }

    public function exchangeMethods(): void
    {
        echo "\n=== Exchange 类方法演示 ===\n";
        
        $this->channel->exchange_declare(
            'demo.direct',
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );
        echo "Exchange.Declare: demo.direct 创建成功\n";
        
        $this->channel->exchange_declare(
            'demo.topic',
            AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );
        echo "Exchange.Declare: demo.topic 创建成功\n";
        
        $this->channel->exchange_bind('demo.topic', 'demo.direct', 'test.#');
        echo "Exchange.Bind: demo.direct -> demo.topic\n";
        
        $this->channel->exchange_unbind('demo.topic', 'demo.direct', 'test.#');
        echo "Exchange.Unbind: 解除绑定\n";
        
        $this->channel->exchange_delete('demo.topic');
        echo "Exchange.Delete: demo.topic 已删除\n";
    }

    public function queueMethods(): void
    {
        echo "\n=== Queue 类方法演示 ===\n";
        
        [$queueName, $messageCount, $consumerCount] = $this->channel->queue_declare(
            'demo.queue',
            false,
            true,
            false,
            false
        );
        echo "Queue.Declare: {$queueName} 创建成功\n";
        echo "  消息数量: {$messageCount}\n";
        echo "  消费者数量: {$consumerCount}\n";
        
        $this->channel->queue_bind('demo.queue', 'demo.direct', 'test.key');
        echo "Queue.Bind: demo.queue 绑定到 demo.direct\n";
        
        $this->channel->queue_unbind('demo.queue', 'demo.direct', 'test.key');
        echo "Queue.Unbind: 解除绑定\n";
        
        $this->channel->queue_delete('demo.queue');
        echo "Queue.Delete: demo.queue 已删除\n";
    }

    public function basicMethods(): void
    {
        echo "\n=== Basic 类方法演示 ===\n";
        
        $this->channel->exchange_declare('demo.exchange', AMQPExchangeType::DIRECT, false, true, false);
        $this->channel->queue_declare('demo.test', false, true, false, false);
        $this->channel->queue_bind('demo.test', 'demo.exchange', 'test');
        
        $this->channel->basic_qos(0, 10, false);
        echo "Basic.Qos: 设置预取数量为 10\n";
        
        for ($i = 1; $i <= 3; $i++) {
            $message = new AMQPMessage(
                json_encode(['id' => $i, 'message' => "测试消息 {$i}"]),
                [
                    'content_type' => 'application/json',
                    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                    'message_id' => "msg-{$i}",
                    'timestamp' => time(),
                ]
            );
            $this->channel->basic_publish($message, 'demo.exchange', 'test');
            echo "Basic.Publish: 消息 {$i} 已发布\n";
        }
        
        $message = $this->channel->basic_get('demo.test', true);
        if ($message) {
            echo "Basic.Get: 获取到消息 - " . $message->body . "\n";
        }
        
        $callback = function ($msg) {
            echo "Basic.Deliver: 收到消息 - " . $msg->body . "\n";
            $msg->ack();
            echo "Basic.Ack: 消息已确认\n";
        };
        
        $consumerTag = $this->channel->basic_consume(
            'demo.test',
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        echo "Basic.Consume: 开始消费,消费者标签: {$consumerTag}\n";
        
        $timeout = 5;
        while ($this->channel->is_consuming() && $timeout > 0) {
            $this->channel->wait(null, false, 1);
            $timeout--;
        }
        
        $this->channel->basic_cancel($consumerTag);
        echo "Basic.Cancel: 取消消费\n";
        
        $this->channel->queue_purge('demo.test');
        echo "Queue.Purge: 队列已清空\n";
        
        $this->channel->queue_delete('demo.test');
        $this->channel->exchange_delete('demo.exchange');
    }

    public function txMethods(): void
    {
        echo "\n=== TX 类方法演示 ===\n";
        
        $txChannel = $this->connection->channel();
        
        $txChannel->tx_select();
        echo "TX.Select: 开启事务模式\n";
        
        $txChannel->exchange_declare('tx.exchange', AMQPExchangeType::DIRECT, false, true, false);
        $txChannel->queue_declare('tx.queue', false, true, false, false);
        $txChannel->queue_bind('tx.queue', 'tx.exchange', 'tx');
        
        $message = new AMQPMessage('事务消息', [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        $txChannel->basic_publish($message, 'tx.exchange', 'tx');
        echo "Basic.Publish: 发布事务消息\n";
        
        $txChannel->tx_commit();
        echo "TX.Commit: 事务提交成功\n";
        
        $txChannel->tx_select();
        $txChannel->basic_publish(new AMQPMessage('回滚消息'), 'tx.exchange', 'tx');
        echo "Basic.Publish: 发布将被回滚的消息\n";
        
        $txChannel->tx_rollback();
        echo "TX.Rollback: 事务回滚成功\n";
        
        $txChannel->queue_delete('tx.queue');
        $txChannel->exchange_delete('tx.exchange');
        $txChannel->close();
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
        echo "\n连接已关闭\n";
    }
}

$demo = new AMQPMethodsDemo();
$demo->connectionMethods();
$demo->channelMethods();
$demo->exchangeMethods();
$demo->queueMethods();
$demo->basicMethods();
$demo->txMethods();
$demo->close();

3.2 消息确认示例

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class AckDemo
{
    private $connection;
    private $channel;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();
    }

    public function demonstrateAckNackReject(): void
    {
        $this->channel->queue_declare('ack.demo', false, true, false, false);
        
        $message = new AMQPMessage('测试消息', [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ]);
        $this->channel->basic_publish($message, '', 'ack.demo');
        
        $callback = function ($msg) {
            $body = $msg->body;
            $deliveryTag = $msg->getDeliveryTag();
            
            echo "收到消息: {$body}\n";
            echo "Delivery Tag: {$deliveryTag}\n";
            
            try {
                if (strpos($body, 'ack') !== false) {
                    $msg->ack();
                    echo "Basic.Ack: 消息已确认\n";
                } elseif (strpos($body, 'reject') !== false) {
                    $msg->reject(false);
                    echo "Basic.Reject: 消息已拒绝(不重新入队)\n";
                } elseif (strpos($body, 'requeue') !== false) {
                    $msg->reject(true);
                    echo "Basic.Reject: 消息已拒绝(重新入队)\n";
                } else {
                    $msg->nack(true, false);
                    echo "Basic.Nack: 消息否定确认\n";
                }
            } catch (Exception $e) {
                echo "处理错误: " . $e->getMessage() . "\n";
            }
        };
        
        $this->channel->basic_consume('ack.demo', '', false, false, false, false, $callback);
        
        $timeout = 3;
        while ($this->channel->is_consuming() && $timeout > 0) {
            $this->channel->wait(null, false, 1);
            $timeout--;
        }
        
        $this->channel->queue_delete('ack.demo');
    }

    public function demonstrateRecover(): void
    {
        $this->channel->queue_declare('recover.demo', false, true, false, false);
        
        for ($i = 1; $i <= 3; $i++) {
            $this->channel->basic_publish(
                new AMQPMessage("消息 {$i}"),
                '',
                'recover.demo'
            );
        }
        
        $this->channel->basic_recover(true);
        echo "Basic.Recover: 请求重新投递未确认的消息\n";
        
        $this->channel->queue_delete('recover.demo');
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

$demo = new AckDemo();
$demo->demonstrateAckNackReject();
$demo->demonstrateRecover();
$demo->close();

3.3 获取消息示例

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('get.demo', false, true, false, false);

for ($i = 1; $i <= 5; $i++) {
    $channel->basic_publish(
        new AMQPMessage("消息 #{$i}"),
        '',
        'get.demo'
    );
}

echo "使用 Basic.Get 主动获取消息:\n";

while (true) {
    $message = $channel->basic_get('get.demo', true);
    
    if (!$message) {
        echo "队列为空\n";
        break;
    }
    
    echo "获取到消息: " . $message->body . "\n";
    echo "  Delivery Tag: " . $message->getDeliveryTag() . "\n";
    echo "  Exchange: " . $message->getExchange() . "\n";
    echo "  Routing Key: " . $message->getRoutingKey() . "\n";
    echo "  Redelivered: " . ($message->isRedelivered() ? '是' : '否') . "\n";
}

$channel->queue_delete('get.demo');
$channel->close();
$connection->close();

四、实际应用场景

4.1 RPC 模式

text
使用 AMQP 方法实现 RPC:

┌─────────────┐                                           ┌─────────────┐
│   Client    │                                           │   Server    │
│  (请求方)    │                                           │  (响应方)    │
└──────┬──────┘                                           └──────┬──────┘
       │                                                         │
       │  1. Queue.Declare (reply-queue)                        │
       │ ◄─────────────────────────────────────────────────────│
       │                                                         │
       │  2. Basic.Publish                                       │
       │     - reply_to: reply-queue                             │
       │     - correlation_id: unique-id                         │
       │ ──────────────────────────────────────────────────────►│
       │                                                         │
       │  3. Basic.Consume (reply-queue)                         │
       │ ◄─────────────────────────────────────────────────────│
       │                                                         │
       │                                    4. Basic.Consume     │
       │                                   ◄─────────────────────│
       │                                                         │
       │                                    5. 处理请求           │
       │                                   ◄─────────────────────│
       │                                                         │
       │                                    6. Basic.Publish      │
       │                                      (to reply-queue)    │
       │                                   ◄─────────────────────│
       │                                                         │
       │  7. Basic.Deliver                                       │
       │ ◄─────────────────────────────────────────────────────│
       │     - correlation_id 匹配                               │
       │                                                         │

4.2 消息确认模式对比

text
┌─────────────────────────────────────────────────────────────────────┐
│                     消息确认模式对比                                 │
├───────────────┬─────────────────────────────────────────────────────┤
│     模式      │                    说明                             │
├───────────────┼─────────────────────────────────────────────────────┤
│  auto-ack     │ no-ack=true,消息投递后立即确认                      │
│               │ 优点:性能高                                        │
│               │ 缺点:消息可能丢失                                  │
├───────────────┼─────────────────────────────────────────────────────┤
│  manual-ack   │ no-ack=false,消费者手动确认                        │
│               │ Basic.Ack: 确认成功处理                             │
│               │ Basic.Nack: 批量否定确认                            │
│               │ Basic.Reject: 单条否定确认                          │
├───────────────┼─────────────────────────────────────────────────────┤
│  transaction  │ TX.Select + TX.Commit/Rollback                      │
│               │ 优点:原子性保证                                    │
│               │ 缺点:性能较差,不推荐使用                          │
├───────────────┼─────────────────────────────────────────────────────┤
│  confirm      │ 发布确认模式                                        │
│               │ 优点:性能与可靠性平衡                              │
│               │ 推荐:生产环境首选                                  │
└───────────────┴─────────────────────────────────────────────────────┘

五、常见问题与解决方案

5.1 方法调用失败

问题描述: 方法调用返回错误或异常。

常见错误码

text
┌──────────────┬─────────────────────────────────────────────────────┐
│   错误码     │                    说明                             │
├──────────────┼─────────────────────────────────────────────────────┤
│    311       │ CONTENT_TOO_LARGE - 内容过大                        │
│    312       │ NO_ROUTE - 无法路由                                 │
│    313       │ NO_CONSUMERS - 没有消费者                           │
│    403       │ ACCESS_REFUSED - 访问被拒绝                         │
│    404       │ NOT_FOUND - 资源不存在                              │
│    405       │ RESOURCE_LOCKED - 资源被锁定                        │
│    406       │ PRECONDITION_FAILED - 前置条件失败                  │
│    501       │ FRAME_ERROR - 帧错误                                │
│    502       │ SYNTAX_ERROR - 语法错误                             │
│    503       │ COMMAND_INVALID - 命令无效                          │
│    504       │ CHANNEL_ERROR - 通道错误                            │
│    505       │ UNEXPECTED_FRAME - 意外的帧                         │
│    506       │ RESOURCE_ERROR - 资源错误                           │
│    530       │ NOT_ALLOWED - 不允许的操作                          │
│    540       │ NOT_IMPLEMENTED - 未实现                            │
│    541       │ INTERNAL_ERROR - 内部错误                           │
└──────────────┴─────────────────────────────────────────────────────┘

5.2 通道意外关闭

问题描述: 通道在使用过程中意外关闭。

解决方案

php
<?php
$channel->set_close_handler(function ($replyCode, $replyText) {
    echo "通道关闭: [{$replyCode}] {$replyText}\n";
});

六、最佳实践建议

6.1 方法使用建议

text
最佳实践:
├── 资源声明:启动时预先声明所需资源
├── 错误处理:捕获并处理所有可能的异常
├── 资源清理:使用完毕后正确关闭和删除资源
├── 幂等操作:声明操作是幂等的,可重复执行
└── 参数验证:确保参数符合规范

6.2 性能优化建议

text
性能建议:
├── 批量操作:减少方法调用次数
├── 异步确认:使用确认模式而非事务
├── 合理 QoS:设置合适的预取数量
├── 连接复用:使用 Channel 多路复用
└── 避免轮询:使用 Consume 而非 Get

七、相关链接