Skip to content

AMQP 模型

一、概述

AMQP 模型定义了消息中间件的核心概念和交互方式,包括消息的生产、路由、存储和消费的完整流程。理解 AMQP 模型是掌握 RabbitMQ 的基础。

1.1 模型架构图

┌─────────────────────────────────────────────────────────────────────────┐
│                           AMQP 模型架构                                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   ┌─────────┐                                           ┌─────────┐    │
│   │ 生产者   │                                           │ 消费者   │    │
│   │Producer │                                           │Consumer │    │
│   └────┬────┘                                           └────▲────┘    │
│        │                                                     │         │
│        │ 发布消息                                    消费消息 │         │
│        ▼                                                     │         │
│   ┌─────────────────────────────────────────────────────────┐         │
│   │                    AMQP Broker                          │         │
│   │  ┌─────────────────────────────────────────────────┐   │         │
│   │  │              Virtual Host (vhost)                │   │         │
│   │  │                                                  │   │         │
│   │  │   ┌─────────────┐                               │   │         │
│   │  │   │   Exchange  │◄─── 绑定(Binding) ───┐       │   │         │
│   │  │   │   交换器     │                      │       │   │         │
│   │  │   └──────┬──────┘                      │       │   │         │
│   │  │          │ 路由                        │       │   │         │
│   │  │          ▼                             │       │   │         │
│   │  │   ┌─────────────┐    ┌─────────────┐  │       │   │         │
│   │  │   │   Queue 1   │    │   Queue 2   │──┘       │   │         │
│   │  │   │   队列 1    │    │   队列 2    │          │   │         │
│   │  │   └─────────────┘    └─────────────┘          │   │         │
│   │  │                                                  │   │         │
│   │  └─────────────────────────────────────────────────┘   │         │
│   └─────────────────────────────────────────────────────────┘         │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

1.2 核心概念

概念说明类比
Broker消息代理服务器邮局
Virtual Host虚拟主机,资源隔离单元邮局的不同部门
Exchange交换器,负责消息路由邮局分拣中心
Queue队列,存储消息邮箱
Binding绑定,交换器与队列的关系投递规则
Routing Key路由键,消息路由依据收件地址
Message消息,传递的数据信件

二、核心知识点

2.1 消息模型

2.1.1 消息结构

text
┌─────────────────────────────────────────────────────────────┐
│                        Message 结构                         │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   Properties (属性)                  │   │
│  │  ┌──────────────┬──────────────────────────────────┐│   │
│  │  │ content_type │ 消息内容类型 (如 application/json)││   │
│  │  ├──────────────┼──────────────────────────────────┤│   │
│  │  │ content_enco │ 消息编码 (如 utf-8)              ││   │
│  │  ├──────────────┼──────────────────────────────────┤│   │
│  │  │ headers      │ 自定义消息头                     ││   │
│  │  ├──────────────┼──────────────────────────────────┤│   │
│  │  │ delivery_mode│ 投递模式 (1=非持久, 2=持久)      ││   │
│  │  ├──────────────┼──────────────────────────────────┤│   │
│  │  │ priority     │ 消息优先级 (0-9)                 ││   │
│  │  ├──────────────┼──────────────────────────────────┤│   │
│  │  │ correlation_ │ 关联ID (用于RPC)                 ││   │
│  │  ├──────────────┼──────────────────────────────────┤│   │
│  │  │ reply_to     │ 回复队列 (用于RPC)               ││   │
│  │  ├──────────────┼──────────────────────────────────┤│   │
│  │  │ expiration   │ 消息过期时间                     ││   │
│  │  ├──────────────┼──────────────────────────────────┤│   │
│  │  │ message_id   │ 消息ID                           ││   │
│  │  ├──────────────┼──────────────────────────────────┤│   │
│  │  │ timestamp    │ 消息时间戳                       ││   │
│  │  ├──────────────┼──────────────────────────────────┤│   │
│  │  │ type         │ 消息类型                         ││   │
│  │  ├──────────────┼──────────────────────────────────┤│   │
│  │  │ user_id      │ 用户ID                           ││   │
│  │  ├──────────────┼──────────────────────────────────┤│   │
│  │  │ app_id       │ 应用ID                           ││   │
│  │  └──────────────┴──────────────────────────────────┘│   │
│  └─────────────────────────────────────────────────────┘   │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                    Body (消息体)                     │   │
│  │                                                      │   │
│  │              实际传输的数据内容                       │   │
│  │              (二进制格式,无大小限制)                  │   │
│  └─────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

2.1.2 消息生命周期

text
消息生命周期:
                                                                
  ┌─────────┐      ┌──────────┐      ┌─────────┐      ┌─────────┐
  │  创建    │ ───► │  发布    │ ───► │  路由   │ ───► │  存储   │
  │ Producer │      │ Publish  │      │  Route  │      │  Queue  │
  └─────────┘      └──────────┘      └─────────┘      └────┬────┘

       ┌────────────────────────────────────────────────────────┘


  ┌─────────┐      ┌──────────┐      ┌─────────┐      ┌─────────┐
  │  投递    │ ───► │  消费    │ ───► │  处理   │ ───► │  确认   │
  │ Deliver │      │ Consume  │      │ Process │      │   Ack   │
  └─────────┘      └──────────┘      └─────────┘      └─────────┘

2.2 交换器模型

2.2.1 交换器类型

text
┌─────────────────────────────────────────────────────────────────────┐
│                        交换器类型对比                                │
├──────────────┬──────────────────────────────────────────────────────┤
│     类型     │                     路由规则                         │
├──────────────┼──────────────────────────────────────────────────────┤
│    direct    │ 精确匹配:routing key 完全相等                       │
│              │ 示例:routing key = "order.create"                   │
│              │       只路由到绑定了 "order.create" 的队列           │
├──────────────┼──────────────────────────────────────────────────────┤
│    fanout    │ 广播:忽略 routing key,发送到所有绑定队列           │
│              │ 示例:发布消息后,所有绑定的队列都会收到              │
├──────────────┼──────────────────────────────────────────────────────┤
│    topic     │ 通配符匹配:支持 * 和 # 通配符                       │
│              │ * 匹配一个单词,# 匹配零个或多个单词                  │
│              │ 示例:order.* 匹配 order.create, order.cancel        │
│              │       order.# 匹配 order.create.success 等           │
├──────────────┼──────────────────────────────────────────────────────┤
│   headers    │ 头部匹配:根据消息 headers 属性匹配                  │
│              │ x-match: all(全部匹配) 或 any(任一匹配)              │
└──────────────┴──────────────────────────────────────────────────────┘

2.2.2 交换器属性

text
交换器属性:
├── name: 交换器名称
├── type: 交换器类型 (direct/fanout/topic/headers)
├── durable: 是否持久化
├── auto-delete: 最后一个绑定删除后是否自动删除
├── internal: 是否为内部交换器(用于交换器到交换器路由)
└── arguments: 额外参数
    ├── alternate-exchange: 备用交换器
    └── 其他插件特定参数

2.3 队列模型

2.3.1 队列类型

text
┌─────────────────────────────────────────────────────────────────────┐
│                         队列类型                                     │
├──────────────┬──────────────────────────────────────────────────────┤
│     类型     │                     特性说明                         │
├──────────────┼──────────────────────────────────────────────────────┤
│   classic    │ 经典队列,默认类型                                    │
│              │ - 支持持久化                                          │
│              │ - 支持镜像(HA)                                      │
│              │ - 内存或磁盘存储                                      │
├──────────────┼──────────────────────────────────────────────────────┤
│   quorum     │ 仲裁队列,RabbitMQ 3.8+                               │
│              │ - 基于 Raft 共识算法                                  │
│              │ - 高可用、数据安全                                    │
│              │ - 替代镜像队列                                        │
├──────────────┼──────────────────────────────────────────────────────┤
│   stream     │ 流队列,RabbitMQ 3.9+                                 │
│              │ - 类似 Kafka 的日志队列                               │
│              │ - 支持消息回溯消费                                    │
│              │ - 高吞吐量                                            │
└──────────────┴──────────────────────────────────────────────────────┘

2.3.2 队列属性

text
队列属性:
├── name: 队列名称(空则自动生成)
├── durable: 是否持久化
├── exclusive: 是否排他(仅对创建连接可见,连接关闭自动删除)
├── auto-delete: 最后一个消费者取消后是否自动删除
└── arguments: 额外参数
    ├── x-message-ttl: 消息存活时间(毫秒)
    ├── x-expires: 队列空闲多久后删除(毫秒)
    ├── x-max-length: 队列最大消息数
    ├── x-max-length-bytes: 队列最大字节数
    ├── x-dead-letter-exchange: 死信交换器
    ├── x-dead-letter-routing-key: 死信路由键
    ├── x-max-priority: 最大优先级
    ├── x-queue-type: 队列类型
    └── x-queue-mode: 队列模式(default/lazy)

2.4 绑定模型

2.4.1 绑定关系

text
绑定(Binding)定义了 Exchange 和 Queue 之间的关系:

┌─────────────┐                    ┌─────────────┐
│   Exchange  │                    │    Queue    │
│             │                    │             │
│   (direct)  │─── Binding ───────►│   orders    │
│             │   routing-key:     │             │
│             │   "order.create"   │             │
└─────────────┘                    └─────────────┘

绑定属性:
├── source: 源交换器
├── destination: 目标队列(或交换器)
├── routing-key: 路由键(可选)
└── arguments: 额外参数(用于 headers 交换器)

2.4.2 多重绑定

text
一个队列可以绑定多个交换器,一个交换器可以绑定多个队列:

                    ┌─────────────┐
                    │   orders    │
                    │  (queue)    │
                    └──────▲──────┘

              ┌────────────┼────────────┐
              │            │            │
     ┌────────┴─────┐     │    ┌───────┴──────┐
     │   exchange1  │     │    │   exchange2  │
     │   (direct)   │     │    │   (topic)    │
     └──────────────┘     │    └──────────────┘

                   ┌──────┴──────┐
                   │   exchange3  │
                   │   (fanout)  │
                   └─────────────┘

2.5 虚拟主机模型

text
虚拟主机(Virtual Host)是 AMQP 的资源隔离单元:

┌─────────────────────────────────────────────────────────────────────┐
│                         RabbitMQ Broker                             │
├─────────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐     │
│  │   vhost: /      │  │  vhost: /dev    │  │  vhost: /prod   │     │
│  │                 │  │                 │  │                 │     │
│  │  ┌───────────┐  │  │  ┌───────────┐  │  │  ┌───────────┐  │     │
│  │  │ exchanges │  │  │  │ exchanges │  │  │  │ exchanges │  │     │
│  │  └───────────┘  │  │  └───────────┘  │  │  └───────────┘  │     │
│  │  ┌───────────┐  │  │  ┌───────────┐  │  │  ┌───────────┐  │     │
│  │  │  queues   │  │  │  │  queues   │  │  │  │  queues   │  │     │
│  │  └───────────┘  │  │  └───────────┘  │  │  └───────────┘  │     │
│  │  ┌───────────┐  │  │  ┌───────────┐  │  │  ┌───────────┐  │     │
│  │  │ bindings  │  │  │  │ bindings  │  │  │  │ bindings  │  │     │
│  │  └───────────┘  │  │  └───────────┘  │  │  └───────────┘  │     │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘     │
│                                                                     │
│  用户权限:                                                         │
│  ├── user1: / (configure, write, read)                             │
│  ├── user2: /dev (read)                                            │
│  └── user3: /prod (write, read)                                    │
└─────────────────────────────────────────────────────────────────────┘

三、代码示例

3.1 PHP 完整示例

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class AMQPModelDemo
{
    private $connection;
    private $channel;

    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest',
            '/',
            false,
            'AMQPLAIN',
            null,
            'en_US',
            3.0,
            3.0
        );
        $this->channel = $this->connection->channel();
    }

    public function declareExchange(string $name, string $type, bool $durable = true): void
    {
        $this->channel->exchange_declare(
            $name,
            $type,
            false,
            $durable,
            false
        );
        echo "交换器 '{$name}' (类型: {$type}) 声明成功\n";
    }

    public function declareQueue(
        string $name,
        bool $durable = true,
        bool $exclusive = false,
        bool $autoDelete = false,
        array $arguments = []
    ): void {
        $this->channel->queue_declare(
            $name,
            false,
            $durable,
            $exclusive,
            $autoDelete,
            false,
            $arguments
        );
        echo "队列 '{$name}' 声明成功\n";
    }

    public function bindQueue(string $queue, string $exchange, string $routingKey = ''): void
    {
        $this->channel->queue_bind($queue, $exchange, $routingKey);
        echo "绑定成功: {$exchange} -> {$queue} (routing key: {$routingKey})\n";
    }

    public function publishMessage(
        string $exchange,
        string $routingKey,
        string $body,
        array $properties = []
    ): void {
        $defaultProperties = [
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ];
        $properties = array_merge($defaultProperties, $properties);
        
        $message = new AMQPMessage($body, $properties);
        $this->channel->basic_publish($message, $exchange, $routingKey);
        
        echo "消息已发布到交换器 '{$exchange}',路由键: '{$routingKey}'\n";
    }

    public function consumeMessages(string $queue, callable $callback): void
    {
        $this->channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            $callback
        );
        
        echo "开始消费队列 '{$queue}'...\n";
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
        echo "连接已关闭\n";
    }
}

$demo = new AMQPModelDemo();

$demo->declareExchange('orders.direct', AMQPExchangeType::DIRECT);
$demo->declareExchange('logs.fanout', AMQPExchangeType::FANOUT);
$demo->declareExchange('events.topic', AMQPExchangeType::TOPIC);

$demo->declareQueue('orders.create');
$demo->declareQueue('orders.cancel');
$demo->declareQueue('logs.all');
$demo->declareQueue('events.orders');

$demo->bindQueue('orders.create', 'orders.direct', 'order.create');
$demo->bindQueue('orders.cancel', 'orders.direct', 'order.cancel');
$demo->bindQueue('logs.all', 'logs.fanout', '');
$demo->bindQueue('events.orders', 'events.topic', 'order.#');

$demo->publishMessage('orders.direct', 'order.create', json_encode([
    'order_id' => 'ORD-001',
    'user_id' => 'USER-001',
    'amount' => 99.99,
    'created_at' => date('Y-m-d H:i:s')
]));

$demo->publishMessage('logs.fanout', '', '[INFO] 系统日志消息');

$demo->publishMessage('events.topic', 'order.create.success', json_encode([
    'event' => 'order.created',
    'data' => ['order_id' => 'ORD-001']
]));

$demo->close();

3.2 创建带属性的队列

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$arguments = new AMQPTable([
    'x-message-ttl' => 3600000,
    'x-expires' => 7200000,
    'x-max-length' => 10000,
    'x-max-length-bytes' => 10485760,
    'x-dead-letter-exchange' => 'dlx.exchange',
    'x-dead-letter-routing-key' => 'dead.letter',
    'x-max-priority' => 10,
]);

$channel->queue_declare(
    'advanced.queue',
    false,
    true,
    false,
    false,
    false,
    $arguments
);

echo "高级队列创建成功\n";

$channel->close();
$connection->close();

3.3 Headers 交换器示例

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('headers.exchange', 'headers', false, true, false);

$channel->queue_declare('headers.queue', false, true, false, false);

$bindArguments = new AMQPTable([
    'x-match' => 'all',
    'format' => 'pdf',
    'type' => 'report'
]);
$channel->queue_bind('headers.queue', 'headers.exchange', '', false, $bindArguments);

$message = new AMQPMessage('PDF Report Content', [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'application_headers' => new AMQPTable([
        'format' => 'pdf',
        'type' => 'report'
    ])
]);

$channel->basic_publish($message, 'headers.exchange');

echo "Headers 交换器消息发送成功\n";

$channel->close();
$connection->close();

四、实际应用场景

4.1 订单系统消息模型

text
订单系统 AMQP 模型设计:

┌─────────────────────────────────────────────────────────────────────┐
│                        订单消息模型                                  │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌─────────────┐                                                    │
│  │ 订单服务     │                                                    │
│  │ (Producer)  │                                                    │
│  └──────┬──────┘                                                    │
│         │                                                           │
│         ▼                                                           │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │              order.exchange (topic)                          │  │
│  └──────────────────────────┬───────────────────────────────────┘  │
│                             │                                       │
│         ┌───────────────────┼───────────────────┐                  │
│         │                   │                   │                  │
│         ▼                   ▼                   ▼                  │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐            │
│  │order.create │    │order.cancel │    │ order.paid  │            │
│  │   (queue)   │    │   (queue)   │    │   (queue)   │            │
│  └──────┬──────┘    └──────┬──────┘    └──────┬──────┘            │
│         │                  │                  │                    │
│         ▼                  ▼                  ▼                    │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐            │
│  │ 库存服务    │    │ 库存服务    │    │ 支付服务    │            │
│  │ (Consumer)  │    │ (Consumer)  │    │ (Consumer)  │            │
│  └─────────────┘    └─────────────┘    └─────────────┘            │
│                                                                     │
│  路由键设计:                                                       │
│  ├── order.create  -> 库存服务扣减库存                              │
│  ├── order.cancel  -> 库存服务恢复库存                              │
│  └── order.paid    -> 支付服务处理支付                              │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

4.2 日志收集系统

text
日志收集 AMQP 模型:

┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│   应用 A    │  │   应用 B    │  │   应用 C    │
└──────┬──────┘  └──────┬──────┘  └──────┬──────┘
       │                │                │
       └────────────────┼────────────────┘


              ┌─────────────────┐
              │  logs.exchange  │
              │    (topic)      │
              └────────┬────────┘

        ┌──────────────┼──────────────┐
        │              │              │
        ▼              ▼              ▼
  ┌───────────┐  ┌───────────┐  ┌───────────┐
  │logs.error │  │ logs.info │  │ logs.all  │
  │  (queue)  │  │  (queue)  │  │  (queue)  │
  └─────┬─────┘  └─────┬─────┘  └─────┬─────┘
        │              │              │
        ▼              ▼              ▼
  ┌───────────┐  ┌───────────┐  ┌───────────┐
  │告警服务   │  │ 日志分析  │  │ 日志存储  │
  └───────────┘  └───────────┘  └───────────┘

路由键格式:app.level (如: appA.error, appB.info)
绑定规则:
- logs.error: 绑定 *.error
- logs.info:  绑定 *.info
- logs.all:   绑定 #

五、常见问题与解决方案

5.1 消息路由失败

问题描述: 消息发布后没有到达预期的队列。

排查步骤

text
1. 检查交换器是否存在
2. 检查绑定关系是否正确
3. 检查路由键是否匹配
4. 检查是否有备用交换器
5. 查看服务端日志

解决方案

php
<?php
$channel->set_return_listener(function (
    $replyCode,
    $replyText,
    $exchange,
    $routingKey,
    $message
) {
    echo "消息路由失败: {$replyText}\n";
    echo "交换器: {$exchange}, 路由键: {$routingKey}\n";
});

$channel->basic_publish($message, $exchange, $routingKey, true);

5.2 队列消息堆积

问题描述: 队列中消息越来越多,消费速度跟不上生产速度。

解决方案

php
<?php
$arguments = new AMQPTable([
    'x-max-length' => 100000,
    'x-max-length-bytes' => 1073741824,
    'x-overflow' => 'reject-publish-dlx',
]);

$channel->queue_declare('limited.queue', false, true, false, false, false, $arguments);

5.3 消息丢失

问题描述: 消息在传输过程中丢失。

解决方案

php
<?php
$message = new AMQPMessage($body, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'content_type' => 'application/json',
]);

$channel->basic_publish($message, $exchange, $routingKey);

$channel->wait_for_pending_acks();

六、最佳实践建议

6.1 命名规范

text
命名建议:
├── 交换器:{业务}.{类型} (如: order.exchange, log.exchange)
├── 队列:{业务}.{功能} (如: order.create, log.error)
├── 路由键:{业务}.{事件} (如: order.created, user.registered)
└── 统一使用小写和点分隔

6.2 资源管理

text
资源管理建议:
├── 预先声明:启动时声明所需资源
├── 幂等操作:重复声明不会出错
├── 合理持久化:按需选择持久化策略
├── 设置过期:避免资源无限增长
└── 监控告警:监控队列深度和消费速率

6.3 性能优化

text
性能优化建议:
├── 合理选择交换器类型:direct 性能最好
├── 减少绑定数量:降低路由计算开销
├── 批量操作:减少网络往返
├── 预取数量:合理设置 QoS
└── 消息大小:控制单条消息大小

七、相关链接