Skip to content

RabbitMQ 整体架构

概述

RabbitMQ 是一个开源的消息代理软件,实现了高级消息队列协议(AMQP)。它起源于金融系统,用于在分布式系统中存储和转发消息。RabbitMQ 的核心设计理念是解耦:让消息的生产者和消费者不需要直接交互,通过消息队列实现异步通信。

为什么需要消息队列?

在传统的同步通信模式中,服务之间直接调用存在以下问题:

  • 耦合度高:服务之间紧密依赖,一个服务故障会影响整个链路
  • 响应延迟:同步调用需要等待下游服务响应
  • 流量冲击:突发流量直接冲击后端服务,容易造成系统崩溃
  • 扩展困难:难以灵活调整处理能力

消息队列通过引入中间层,解决了上述问题,实现了:

  • 系统解耦
  • 异步处理
  • 流量削峰
  • 可靠传输

核心架构图

mermaid
graph TB
    subgraph 生产者端
        P1[Producer 1]
        P2[Producer 2]
        P3[Producer 3]
    end
    
    subgraph RabbitMQ Broker
        subgraph Virtual Host
            E1[Exchange]
            E2[Exchange]
            Q1[Queue 1]
            Q2[Queue 2]
            Q3[Queue 3]
            B1[Binding]
            B2[Binding]
        end
    end
    
    subgraph 消费者端
        C1[Consumer 1]
        C2[Consumer 2]
        C3[Consumer 3]
    end
    
    P1 -->|发布消息| E1
    P2 -->|发布消息| E1
    P3 -->|发布消息| E2
    E1 -->|路由| B1
    E1 -->|路由| B2
    E2 -->|路由| Q3
    B1 --> Q1
    B2 --> Q2
    Q1 -->|消费| C1
    Q2 -->|消费| C2
    Q3 -->|消费| C3

核心组件详解

1. Producer(生产者)

生产者是消息的创建者和发送者。它负责:

  • 创建消息内容
  • 设置消息属性(如持久化、优先级、过期时间等)
  • 将消息发送到 Exchange

2. Consumer(消费者)

消费者是消息的接收者和处理者。它负责:

  • 订阅 Queue
  • 接收消息
  • 处理业务逻辑
  • 发送确认(ACK)

3. Exchange(交换机)

Exchange 是消息的路由中心,负责接收生产者发送的消息,并根据路由规则将消息分发到一个或多个 Queue。

Exchange 类型:

类型说明
Direct精确匹配路由键
Fanout广播到所有绑定队列
Topic通配符匹配路由键
Headers基于消息头匹配

4. Queue(队列)

Queue 是消息的存储容器,具有以下特性:

  • 先进先出(FIFO)
  • 可持久化
  • 支持消息确认机制
  • 可设置 TTL 和最大长度

5. Binding(绑定)

Binding 定义了 Exchange 和 Queue 之间的关系,包含:

  • 源 Exchange
  • 目标 Queue
  • Routing Key(可选)
  • 绑定参数

6. Virtual Host(虚拟主机)

Virtual Host 是 RabbitMQ 的逻辑隔离单元:

  • 每个 vhost 拥有独立的 Exchange、Queue、Binding
  • 不同 vhost 之间完全隔离
  • 用户权限基于 vhost 授予

7. Connection 和 Channel

mermaid
graph LR
    subgraph 应用程序
        A[Application]
    end
    
    subgraph 连接层
        C1[Channel 1]
        C2[Channel 2]
        C3[Channel 3]
        CONN[TCP Connection]
    end
    
    subgraph RabbitMQ
        R[Broker]
    end
    
    A --> C1
    A --> C2
    A --> C3
    C1 --> CONN
    C2 --> CONN
    C3 --> CONN
    CONN --> R
  • Connection:TCP 连接,是客户端与 RabbitMQ 之间的物理连接
  • Channel:轻量级连接,复用 TCP 连接,每个 Channel 相互独立

消息流转过程

mermaid
sequenceDiagram
    participant P as Producer
    participant E as Exchange
    participant Q as Queue
    participant C as Consumer
    
    P->>E: 1. 发布消息(带 Routing Key)
    E->>E: 2. 查找匹配的 Binding
    E->>Q: 3. 路由消息到队列
    Q->>Q: 4. 存储消息
    C->>Q: 5. 订阅/拉取消息
    Q->>C: 6. 投递消息
    C->>C: 7. 处理消息
    C->>Q: 8. 发送 ACK
    Q->>Q: 9. 删除已确认消息

代码示例

安装 php-amqplib

bash
composer require php-amqplib/php-amqplib

完整的生产者示例

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class RabbitMQProducer
{
    private $connection;
    private $channel;
    
    public function __construct(
        string $host = 'localhost',
        int $port = 5672,
        string $user = 'guest',
        string $password = 'guest',
        string $vhost = '/'
    ) {
        $this->connection = new AMQPStreamConnection(
            $host,
            $port,
            $user,
            $password,
            $vhost
        );
        $this->channel = $this->connection->channel();
    }
    
    public function publish(
        string $exchange,
        string $routingKey,
        string $messageBody,
        array $properties = []
    ): void {
        $defaultProperties = [
            'content_type' => 'text/plain',
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ];
        
        $properties = array_merge($defaultProperties, $properties);
        
        $message = new AMQPMessage($messageBody, $properties);
        
        $this->channel->basic_publish(
            $message,
            $exchange,
            $routingKey
        );
        
        echo " [x] Sent message to exchange '{$exchange}' with key '{$routingKey}'\n";
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

try {
    $producer = new RabbitMQProducer();
    
    $producer->publish(
        'orders_exchange',
        'order.created',
        json_encode([
            'order_id' => 1001,
            'user_id' => 5001,
            'amount' => 299.99,
            'created_at' => date('Y-m-d H:i:s')
        ])
    );
    
    $producer->close();
} catch (Exception $e) {
    echo "Error: " . $e->getMessage() . "\n";
}

完整的消费者示例

php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

class RabbitMQConsumer
{
    private $connection;
    private $channel;
    
    public function __construct(
        string $host = 'localhost',
        int $port = 5672,
        string $user = 'guest',
        string $password = 'guest',
        string $vhost = '/'
    ) {
        $this->connection = new AMQPStreamConnection(
            $host,
            $port,
            $user,
            $password,
            $vhost
        );
        $this->channel = $this->connection->channel();
    }
    
    public function consume(string $queue, callable $callback): void
    {
        $this->channel->basic_qos(null, 1, null);
        
        $this->channel->basic_consume(
            $queue,
            '',
            false,
            false,
            false,
            false,
            function ($msg) use ($callback) {
                try {
                    $callback($msg->body);
                    $msg->ack();
                    echo " [x] Message processed and acknowledged\n";
                } catch (Exception $e) {
                    $msg->nack(true);
                    echo " [!] Error processing message: " . $e->getMessage() . "\n";
                }
            }
        );
        
        echo " [*] Waiting for messages. To exit press CTRL+C\n";
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    public function close(): void
    {
        $this->channel->close();
        $this->connection->close();
    }
}

try {
    $consumer = new RabbitMQConsumer();
    
    $consumer->consume('orders_queue', function ($body) {
        $data = json_decode($body, true);
        echo " [x] Processing order: {$data['order_id']}\n";
        echo " [x] User: {$data['user_id']}, Amount: {$data['amount']}\n";
    });
    
    $consumer->close();
} catch (Exception $e) {
    echo "Error: " . $e->getMessage() . "\n";
}

实际应用场景

1. 异步任务处理

php
<?php

class AsyncTaskProducer
{
    private $producer;
    
    public function __construct(RabbitMQProducer $producer)
    {
        $this->producer = $producer;
    }
    
    public function sendEmailTask(string $to, string $subject, string $body): void
    {
        $this->producer->publish(
            'tasks_exchange',
            'task.email',
            json_encode([
                'type' => 'email',
                'to' => $to,
                'subject' => $subject,
                'body' => $body,
                'created_at' => time()
            ])
        );
    }
    
    public function generateReportTask(int $reportId, string $format): void
    {
        $this->producer->publish(
            'tasks_exchange',
            'task.report',
            json_encode([
                'type' => 'report',
                'report_id' => $reportId,
                'format' => $format,
                'created_at' => time()
            ])
        );
    }
}

2. 订单处理流程

mermaid
graph LR
    A[订单创建] --> B[订单队列]
    B --> C[库存扣减]
    B --> D[支付处理]
    B --> E[通知发送]
    C --> F[库存队列]
    D --> G[支付队列]
    E --> H[通知队列]

3. 日志收集系统

php
<?php

class LogCollector
{
    private $producer;
    
    public function __construct(RabbitMQProducer $producer)
    {
        $this->producer = $producer;
    }
    
    public function log(string $level, string $message, array $context = []): void
    {
        $routingKey = "log.{$level}";
        
        $this->producer->publish(
            'logs_exchange',
            $routingKey,
            json_encode([
                'level' => $level,
                'message' => $message,
                'context' => $context,
                'timestamp' => date('Y-m-d H:i:s'),
                'hostname' => gethostname()
            ])
        );
    }
}

常见问题与解决方案

1. 消息丢失问题

原因

  • 消息未持久化
  • 未启用消息确认机制
  • Exchange 或 Queue 未持久化

解决方案

php
<?php

$channel->exchange_declare(
    'durable_exchange',
    'direct',
    false,
    true,
    false
);

$channel->queue_declare(
    'durable_queue',
    false,
    true,
    false,
    false
);

$message = new AMQPMessage(
    $body,
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);

$channel->basic_consume(
    'durable_queue',
    '',
    false,
    false,
    false,
    false,
    function ($msg) {
        try {
            processMessage($msg);
            $msg->ack();
        } catch (Exception $e) {
            $msg->nack(true);
        }
    }
);

2. 消息重复消费问题

原因

  • 消费者处理完成后未发送 ACK
  • 网络问题导致 ACK 丢失
  • 消费者崩溃后消息重新入队

解决方案

php
<?php

class IdempotentConsumer
{
    private $redis;
    
    public function processMessage($message): void
    {
        $data = json_decode($message->body, true);
        $messageId = $data['message_id'] ?? md5($message->body);
        
        if ($this->redis->exists("processed:{$messageId}")) {
            echo "Message already processed, skipping\n";
            return;
        }
        
        $this->doProcess($data);
        
        $this->redis->setex("processed:{$messageId}", 86400, '1');
    }
    
    private function doProcess(array $data): void
    {
        // 实际业务处理
    }
}

3. 消息堆积问题

原因

  • 消费者处理速度慢
  • 消费者数量不足
  • 消费者异常

解决方案

php
<?php

$channel->basic_qos(null, 10, null);

$channel->queue_declare(
    'orders_queue',
    false,
    true,
    false,
    false,
    false,
    new \PhpAmqpLib\Wire\AMQPTable([
        'x-max-length' => 10000,
        'x-overflow' => 'reject-publish'
    ])
);

最佳实践建议

1. 连接管理

php
<?php

class ConnectionManager
{
    private static $instance = null;
    private $connection = null;
    private $channels = [];
    
    public static function getInstance(): self
    {
        if (self::$instance === null) {
            self::$instance = new self();
        }
        return self::$instance;
    }
    
    public function getConnection(): AMQPStreamConnection
    {
        if ($this->connection === null || !$this->connection->isConnected()) {
            $this->connection = new AMQPStreamConnection(
                getenv('RABBITMQ_HOST'),
                getenv('RABBITMQ_PORT'),
                getenv('RABBITMQ_USER'),
                getenv('RABBITMQ_PASSWORD'),
                getenv('RABBITMQ_VHOST')
            );
        }
        return $this->connection;
    }
    
    public function getChannel(int $id = null): AMQPChannel
    {
        $connection = $this->getConnection();
        
        if ($id !== null && isset($this->channels[$id])) {
            return $this->channels[$id];
        }
        
        $channel = $connection->channel();
        
        if ($id !== null) {
            $this->channels[$id] = $channel;
        }
        
        return $channel;
    }
}

2. 错误处理与重试

php
<?php

class RobustConsumer
{
    private $maxRetries = 3;
    private $retryDelay = 1000;
    
    public function consumeWithRetry($message): void
    {
        $data = json_decode($message->body, true);
        $retryCount = $data['retry_count'] ?? 0;
        
        try {
            $this->process($data);
            $message->ack();
        } catch (Exception $e) {
            if ($retryCount < $this->maxRetries) {
                $data['retry_count'] = $retryCount + 1;
                $data['last_error'] = $e->getMessage();
                
                $newMessage = new AMQPMessage(
                    json_encode($data),
                    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
                );
                
                $message->getChannel()->basic_publish(
                    $newMessage,
                    '',
                    'retry_queue'
                );
                
                $message->ack();
            } else {
                $message->nack(false);
                
                $this->sendToDeadLetterQueue($message);
            }
        }
    }
    
    private function sendToDeadLetterQueue($message): void
    {
        // 发送到死信队列
    }
    
    private function process(array $data): void
    {
        // 业务处理
    }
}

3. 监控与告警

php
<?php

class QueueMonitor
{
    private $connection;
    private $channel;
    
    public function getQueueStats(string $queueName): array
    {
        try {
            [$queue, $messageCount, $consumerCount] = $this->channel->queue_declare(
                $queueName,
                true
            );
            
            return [
                'name' => $queueName,
                'message_count' => $messageCount,
                'consumer_count' => $consumerCount,
                'status' => $this->determineStatus($messageCount, $consumerCount)
            ];
        } catch (Exception $e) {
            return [
                'name' => $queueName,
                'error' => $e->getMessage(),
                'status' => 'error'
            ];
        }
    }
    
    private function determineStatus(int $messageCount, int $consumerCount): string
    {
        if ($consumerCount === 0) {
            return 'no_consumers';
        }
        
        if ($messageCount > 10000) {
            return 'backlog_warning';
        }
        
        if ($messageCount > 50000) {
            return 'backlog_critical';
        }
        
        return 'healthy';
    }
}

相关链接