Skip to content

RabbitMQ 简介

概述

RabbitMQ 是一个开源的消息代理软件,最初由 Rabbit Technologies 开发,后被 VMware 收购。它实现了高级消息队列协议(AMQP),是最流行的开源消息中间件之一。RabbitMQ 以其可靠性、灵活性和易用性著称,被广泛应用于企业级应用架构中。

核心知识点

1. RabbitMQ 的核心概念

1.1 生产者(Producer)

生产者是创建并发送消息的应用程序。它不直接将消息发送到队列,而是将消息发送到交换机。

┌─────────────────┐
│    生产者        │
│                 │
│  创建消息       │
│  发送到交换机   │
└────────┬────────┘

1.2 消费者(Consumer)

消费者是接收并处理消息的应用程序。它从队列中获取消息并进行业务处理。


┌────────┴────────┐
│    消费者        │
│                 │
│  获取消息       │
│  处理业务       │
└─────────────────┘

1.3 队列(Queue)

队列是 RabbitMQ 中存储消息的缓冲区,它遵循 FIFO(先进先出)原则。队列存储消息直到消费者准备好处理它们。

php
// 声明队列
$channel->queue_declare(
    'task_queue',    // 队列名称
    false,           // passive
    true,            // durable - 持久化
    false,           // exclusive - 是否独占
    false            // auto_delete - 自动删除
);

队列参数详解

  • name:队列名称
  • durable:是否持久化(服务器重启后队列仍存在)
  • exclusive:是否独占(只能被当前连接使用)
  • auto_delete:是否自动删除(最后一个消费者断开后自动删除)
  • arguments:额外参数(如消息过期时间、最大长度等)

1.4 交换机(Exchange)

交换机是 RabbitMQ 的核心组件,负责接收生产者发送的消息,并根据路由规则将消息路由到一个或多个队列。

                    ┌──────────┐
                    │  队列 A   │
              ┌────→└──────────┘
┌─────────┐   │
│ 交换机   │───┼────→┌──────────┐
│         │   │     │  队列 B   │
└─────────┘   │     └──────────┘

              └────→┌──────────┐
                    │  队列 C   │
                    └──────────┘

交换机类型

  1. Direct Exchange(直连交换机)
    • 根据消息的路由键(routing key)精确匹配队列绑定键
    • 一对一消息传递
消息 routing_key = "order.create"


┌─────────────────┐
│  Direct 交换机   │
└────────┬────────┘
         │ 精确匹配

绑定键 "order.create" → 队列
  1. Fanout Exchange(扇出交换机)
    • 将消息广播到所有绑定的队列
    • 忽略路由键
消息


┌─────────────────┐
│  Fanout 交换机   │
└────────┬────────┘
         │ 广播到所有队列
    ┌────┼────┐
    ↓    ↓    ↓
 队列A 队列B 队列C
  1. Topic Exchange(主题交换机)
    • 根据路由键模式匹配
    • 支持通配符:*(匹配一个单词)、#(匹配零或多个单词)
消息 routing_key = "order.created.success"


┌─────────────────┐
│  Topic 交换机    │
└────────┬────────┘
         │ 模式匹配
    ┌────┼────┐
    ↓    ↓    ↓
"order.*"  "order.#"  "*.success"
  1. Headers Exchange(头交换机)
    • 根据消息头属性进行匹配
    • 不使用路由键

1.5 绑定(Binding)

绑定是交换机和队列之间的关系,它定义了消息如何从交换机路由到队列。

php
// 将队列绑定到交换机
$channel->queue_bind(
    'queue_name',      // 队列名称
    'exchange_name',   // 交换机名称
    'routing_key'      // 路由键
);

1.6 路由键(Routing Key)

路由键是交换机用来决定如何路由消息的键。它的作用取决于交换机类型。

1.7 虚拟主机(Virtual Host)

虚拟主机是 RabbitMQ 中的逻辑分组,每个虚拟主机都有独立的队列、交换机、绑定和权限。类似于数据库中的数据库概念。

php
// 连接到特定虚拟主机
$connection = new AMQPStreamConnection(
    'localhost',
    5672,
    'guest',
    'guest',
    '/vhost_name'    // 虚拟主机名称
);

2. 消息模型

2.1 简单队列模型

最简单的消息模型,一个生产者、一个队列、一个消费者。

┌─────────┐     ┌─────────┐     ┌─────────┐
│ 生产者   │ ──→ │  队列   │ ──→ │ 消费者   │
└─────────┘     └─────────┘     └─────────┘
php
// 生产者
$channel->queue_declare('hello');
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

// 消费者
$channel->queue_declare('hello');
$channel->basic_consume('hello', '', false, true, false, false, function($msg) {
    echo $msg->body . PHP_EOL;
});

2.2 工作队列模型

一个生产者、一个队列、多个消费者。消息会在消费者之间轮询分发。

                    ┌─────────┐
              ┌───→ │ 消费者1  │
┌─────────┐   │     └─────────┘
│ 生产者   │ ──→ ┌─────────┐
└─────────┘     │  队列   │ ───→ ┌─────────┐
                └─────────┘     │ 消费者2  │
                    │           └─────────┘
                    └───→ ┌─────────┐
                          │ 消费者3  │
                          └─────────┘
php
// 公平分发:每个消费者一次只处理一条消息
$channel->basic_qos(null, 1, false);

// 消费者处理
$channel->basic_consume('task_queue', '', false, false, false, false, function($msg) {
    // 处理任务
    sleep(2);
    echo "完成: " . $msg->body . PHP_EOL;
    $msg->ack();
});

2.3 发布/订阅模型

生产者将消息发送到交换机,交换机将消息广播到所有绑定的队列。

┌─────────┐     ┌─────────┐     ┌─────────┐
│ 生产者   │ ──→ │ Fanout  │ ───→ │ 队列1   │ ──→ 消费者1
└─────────┘     │ 交换机   │ ───→ │ 队列2   │ ──→ 消费者2
                └─────────┘ ───→ │ 队列3   │ ──→ 消费者3
                              └─────────┘
php
// 生产者
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

// 消费者
list($queue_name, ,) = $channel->queue_declare('', false, false, true, false);
$channel->queue_bind($queue_name, 'logs');

2.4 路由模型

使用 Direct 交换机,根据路由键精确匹配。

php
// 声明 direct 交换机
$channel->exchange_declare('direct_logs', 'direct', false, false, false);

// 发送消息时指定路由键
$channel->basic_publish($msg, 'direct_logs', 'error');

// 消费者绑定特定路由键
$channel->queue_bind($queue_name, 'direct_logs', 'error');
$channel->queue_bind($queue_name, 'direct_logs', 'warning');

2.5 主题模型

使用 Topic 交换机,支持模式匹配。

php
// 声明 topic 交换机
$channel->exchange_declare('topic_logs', 'topic', false, false, false);

// 发送消息
$channel->basic_publish($msg, 'topic_logs', 'order.created.success');

// 消费者订阅模式
$channel->queue_bind($queue_name, 'topic_logs', 'order.*');     // 匹配 order.created
$channel->queue_bind($queue_name, 'topic_logs', 'order.#');     // 匹配 order.created.success
$channel->queue_bind($queue_name, 'topic_logs', '*.created.*'); // 匹配 order.created.success

3. RabbitMQ 的特性

3.1 可靠性

  • 消息持久化:消息可以持久化到磁盘
  • 消息确认:支持消息确认机制
  • 高可用:支持集群和镜像队列
php
// 持久化队列
$channel->queue_declare('durable_queue', false, true, false, false);

// 持久化消息
$msg = new AMQPMessage(
    'persistent message',
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);

3.2 灵活的路由

通过不同类型的交换机实现灵活的消息路由策略。

3.3 集群支持

RabbitMQ 支持集群部署,可以在多台服务器之间共享队列和消息。

┌─────────────────────────────────────────┐
│            RabbitMQ 集群                 │
│                                         │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐│
│  │  节点 A  │←→│  节点 B  │←→│  节点 C  ││
│  │ (磁盘)  │   │ (内存)  │   │ (内存)  ││
│  └─────────┘   └─────────┘   └─────────┘│
└─────────────────────────────────────────┘

3.4 管理界面

RabbitMQ 提供了强大的 Web 管理界面,可以监控队列、交换机、连接等状态。

bash
# 启用管理插件
rabbitmq-plugins enable rabbitmq_management

# 访问管理界面
# http://localhost:15672
# 默认账号: guest / guest

3.5 多协议支持

  • AMQP 0-9-1(主要协议)
  • AMQP 1.0
  • MQTT
  • STOMP

3.6 多语言客户端

支持多种编程语言客户端:

  • Java
  • .NET
  • PHP
  • Python
  • Ruby
  • JavaScript
  • Go
  • Rust

代码示例

完整的 RabbitMQ 连接封装

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class RabbitMQClient
{
    private $connection;
    private $channel;
    private $config;
    
    public function __construct(array $config = [])
    {
        $this->config = array_merge([
            'host' => 'localhost',
            'port' => 5672,
            'user' => 'guest',
            'password' => 'guest',
            'vhost' => '/'
        ], $config);
        
        $this->connect();
    }
    
    private function connect(): void
    {
        try {
            $this->connection = new AMQPStreamConnection(
                $this->config['host'],
                $this->config['port'],
                $this->config['user'],
                $this->config['password'],
                $this->config['vhost']
            );
            
            $this->channel = $this->connection->channel();
            
        } catch (Exception $e) {
            throw new RuntimeException("RabbitMQ 连接失败: " . $e->getMessage());
        }
    }
    
    /**
     * 声明队列
     */
    public function declareQueue(
        string $name,
        bool $durable = true,
        bool $exclusive = false,
        bool $autoDelete = false,
        array $arguments = []
    ): void {
        $this->channel->queue_declare(
            $name,
            false,
            $durable,
            $exclusive,
            $autoDelete,
            false,
            $arguments ? new AMQPTable($arguments) : null
        );
    }
    
    /**
     * 声明交换机
     */
    public function declareExchange(
        string $name,
        string $type = AMQPExchangeType::DIRECT,
        bool $durable = true,
        bool $autoDelete = false
    ): void {
        $this->channel->exchange_declare(
            $name,
            $type,
            false,
            $durable,
            $autoDelete
        );
    }
    
    /**
     * 绑定队列到交换机
     */
    public function bindQueue(
        string $queueName,
        string $exchangeName,
        string $routingKey = ''
    ): void {
        $this->channel->queue_bind($queueName, $exchangeName, $routingKey);
    }
    
    /**
     * 发送消息
     */
    public function publish(
        string $exchange,
        string $routingKey,
        $message,
        array $properties = []
    ): void {
        $properties = array_merge([
            'content_type' => 'application/json',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ], $properties);
        
        if (is_array($message)) {
            $message = json_encode($message);
        }
        
        $amqpMessage = new AMQPMessage($message, $properties);
        
        $this->channel->basic_publish($amqpMessage, $exchange, $routingKey);
    }
    
    /**
     * 消费消息
     */
    public function consume(
        string $queueName,
        callable $callback,
        bool $autoAck = false,
        int $prefetchCount = 1
    ): void {
        $this->channel->basic_qos(null, $prefetchCount, false);
        
        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            $autoAck,
            false,
            false,
            function ($message) use ($callback, $autoAck) {
                $data = json_decode($message->body, true);
                
                try {
                    $result = $callback($data, $message);
                    
                    if (!$autoAck) {
                        if ($result === true) {
                            $message->ack();
                        } else {
                            $message->nack(true);
                        }
                    }
                } catch (Exception $e) {
                    if (!$autoAck) {
                        $message->nack(true);
                    }
                    throw $e;
                }
            }
        );
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    /**
     * 关闭连接
     */
    public function close(): void
    {
        if ($this->channel) {
            $this->channel->close();
        }
        if ($this->connection) {
            $this->connection->close();
        }
    }
    
    public function __destruct()
    {
        $this->close();
    }
}

使用示例

php
<?php
// 初始化客户端
$mq = new RabbitMQClient([
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest'
]);

// 声明交换机和队列
$mq->declareExchange('order_exchange', 'direct');
$mq->declareQueue('order_queue');
$mq->bindQueue('order_queue', 'order_exchange', 'order.created');

// 生产者:发送订单消息
$orderData = [
    'order_id' => 'ORD' . time(),
    'user_id' => 1001,
    'amount' => 599.00,
    'status' => 'created'
];

$mq->publish('order_exchange', 'order.created', $orderData);
echo "订单消息已发送\n";

// 消费者:处理订单消息
$mq->consume('order_queue', function($data) {
    echo "处理订单: " . $data['order_id'] . "\n";
    echo "订单金额: " . $data['amount'] . "\n";
    
    return true;
});

实际应用场景

1. 订单系统

用户下单 → 订单服务 → RabbitMQ → 库存服务
                              → 支付服务
                              → 通知服务
                              → 物流服务

2. 日志收集

应用程序 → RabbitMQ → 日志处理服务 → Elasticsearch
                                  → 文件存储

3. 异步任务处理

用户请求 → API服务 → RabbitMQ → Worker处理
                              → 图片处理
                              → 报表生成

常见问题与解决方案

Q1: 如何保证消息不丢失?

解决方案

  1. 开启队列持久化
  2. 开启消息持久化
  3. 使用消息确认机制
php
// 1. 持久化队列
$channel->queue_declare('queue', false, true, false, false);

// 2. 持久化消息
$msg = new AMQPMessage($data, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);

// 3. 手动确认
$channel->basic_consume('queue', '', false, false, false, false, function($msg) {
    process($msg);
    $msg->ack();
});

Q2: 如何处理消息积压?

解决方案

  1. 增加消费者数量
  2. 优化消费者处理逻辑
  3. 设置队列最大长度
php
// 设置队列最大长度
$channel->queue_declare('queue', false, true, false, false, false,
    new AMQPTable(['x-max-length' => 10000])
);

Q3: 如何实现延迟消息?

解决方案:使用死信队列(DLX)或延迟插件

php
// 使用死信队列实现延迟
$args = new AMQPTable([
    'x-dead-letter-exchange' => 'delay_exchange',
    'x-message-ttl' => 60000  // 60秒后转发
]);
$channel->queue_declare('delay_queue', false, true, false, false, false, $args);

最佳实践建议

  1. 合理命名:队列和交换机使用有意义的命名规范
  2. 监控告警:监控队列长度、消费速率等关键指标
  3. 优雅关闭:消费者要处理 SIGTERM 信号,优雅关闭
  4. 错误重试:实现合理的重试机制,避免无限重试
  5. 死信处理:配置死信队列处理失败消息
  6. 连接池:生产环境使用连接池管理连接

相关链接