Appearance
RabbitMQ 整体架构
概述
RabbitMQ 是一个开源的消息代理软件,实现了高级消息队列协议(AMQP)。它起源于金融系统,用于在分布式系统中存储和转发消息。RabbitMQ 的核心设计理念是解耦:让消息的生产者和消费者不需要直接交互,通过消息队列实现异步通信。
为什么需要消息队列?
在传统的同步通信模式中,服务之间直接调用存在以下问题:
- 耦合度高:服务之间紧密依赖,一个服务故障会影响整个链路
- 响应延迟:同步调用需要等待下游服务响应
- 流量冲击:突发流量直接冲击后端服务,容易造成系统崩溃
- 扩展困难:难以灵活调整处理能力
消息队列通过引入中间层,解决了上述问题,实现了:
- 系统解耦
- 异步处理
- 流量削峰
- 可靠传输
核心架构图
mermaid
graph TB
subgraph 生产者端
P1[Producer 1]
P2[Producer 2]
P3[Producer 3]
end
subgraph RabbitMQ Broker
subgraph Virtual Host
E1[Exchange]
E2[Exchange]
Q1[Queue 1]
Q2[Queue 2]
Q3[Queue 3]
B1[Binding]
B2[Binding]
end
end
subgraph 消费者端
C1[Consumer 1]
C2[Consumer 2]
C3[Consumer 3]
end
P1 -->|发布消息| E1
P2 -->|发布消息| E1
P3 -->|发布消息| E2
E1 -->|路由| B1
E1 -->|路由| B2
E2 -->|路由| Q3
B1 --> Q1
B2 --> Q2
Q1 -->|消费| C1
Q2 -->|消费| C2
Q3 -->|消费| C3核心组件详解
1. Producer(生产者)
生产者是消息的创建者和发送者。它负责:
- 创建消息内容
- 设置消息属性(如持久化、优先级、过期时间等)
- 将消息发送到 Exchange
2. Consumer(消费者)
消费者是消息的接收者和处理者。它负责:
- 订阅 Queue
- 接收消息
- 处理业务逻辑
- 发送确认(ACK)
3. Exchange(交换机)
Exchange 是消息的路由中心,负责接收生产者发送的消息,并根据路由规则将消息分发到一个或多个 Queue。
Exchange 类型:
| 类型 | 说明 |
|---|---|
| Direct | 精确匹配路由键 |
| Fanout | 广播到所有绑定队列 |
| Topic | 通配符匹配路由键 |
| Headers | 基于消息头匹配 |
4. Queue(队列)
Queue 是消息的存储容器,具有以下特性:
- 先进先出(FIFO)
- 可持久化
- 支持消息确认机制
- 可设置 TTL 和最大长度
5. Binding(绑定)
Binding 定义了 Exchange 和 Queue 之间的关系,包含:
- 源 Exchange
- 目标 Queue
- Routing Key(可选)
- 绑定参数
6. Virtual Host(虚拟主机)
Virtual Host 是 RabbitMQ 的逻辑隔离单元:
- 每个 vhost 拥有独立的 Exchange、Queue、Binding
- 不同 vhost 之间完全隔离
- 用户权限基于 vhost 授予
7. Connection 和 Channel
mermaid
graph LR
subgraph 应用程序
A[Application]
end
subgraph 连接层
C1[Channel 1]
C2[Channel 2]
C3[Channel 3]
CONN[TCP Connection]
end
subgraph RabbitMQ
R[Broker]
end
A --> C1
A --> C2
A --> C3
C1 --> CONN
C2 --> CONN
C3 --> CONN
CONN --> R- Connection:TCP 连接,是客户端与 RabbitMQ 之间的物理连接
- Channel:轻量级连接,复用 TCP 连接,每个 Channel 相互独立
消息流转过程
mermaid
sequenceDiagram
participant P as Producer
participant E as Exchange
participant Q as Queue
participant C as Consumer
P->>E: 1. 发布消息(带 Routing Key)
E->>E: 2. 查找匹配的 Binding
E->>Q: 3. 路由消息到队列
Q->>Q: 4. 存储消息
C->>Q: 5. 订阅/拉取消息
Q->>C: 6. 投递消息
C->>C: 7. 处理消息
C->>Q: 8. 发送 ACK
Q->>Q: 9. 删除已确认消息代码示例
安装 php-amqplib
bash
composer require php-amqplib/php-amqplib完整的生产者示例
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class RabbitMQProducer
{
private $connection;
private $channel;
public function __construct(
string $host = 'localhost',
int $port = 5672,
string $user = 'guest',
string $password = 'guest',
string $vhost = '/'
) {
$this->connection = new AMQPStreamConnection(
$host,
$port,
$user,
$password,
$vhost
);
$this->channel = $this->connection->channel();
}
public function publish(
string $exchange,
string $routingKey,
string $messageBody,
array $properties = []
): void {
$defaultProperties = [
'content_type' => 'text/plain',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];
$properties = array_merge($defaultProperties, $properties);
$message = new AMQPMessage($messageBody, $properties);
$this->channel->basic_publish(
$message,
$exchange,
$routingKey
);
echo " [x] Sent message to exchange '{$exchange}' with key '{$routingKey}'\n";
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
try {
$producer = new RabbitMQProducer();
$producer->publish(
'orders_exchange',
'order.created',
json_encode([
'order_id' => 1001,
'user_id' => 5001,
'amount' => 299.99,
'created_at' => date('Y-m-d H:i:s')
])
);
$producer->close();
} catch (Exception $e) {
echo "Error: " . $e->getMessage() . "\n";
}完整的消费者示例
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
class RabbitMQConsumer
{
private $connection;
private $channel;
public function __construct(
string $host = 'localhost',
int $port = 5672,
string $user = 'guest',
string $password = 'guest',
string $vhost = '/'
) {
$this->connection = new AMQPStreamConnection(
$host,
$port,
$user,
$password,
$vhost
);
$this->channel = $this->connection->channel();
}
public function consume(string $queue, callable $callback): void
{
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume(
$queue,
'',
false,
false,
false,
false,
function ($msg) use ($callback) {
try {
$callback($msg->body);
$msg->ack();
echo " [x] Message processed and acknowledged\n";
} catch (Exception $e) {
$msg->nack(true);
echo " [!] Error processing message: " . $e->getMessage() . "\n";
}
}
);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
public function close(): void
{
$this->channel->close();
$this->connection->close();
}
}
try {
$consumer = new RabbitMQConsumer();
$consumer->consume('orders_queue', function ($body) {
$data = json_decode($body, true);
echo " [x] Processing order: {$data['order_id']}\n";
echo " [x] User: {$data['user_id']}, Amount: {$data['amount']}\n";
});
$consumer->close();
} catch (Exception $e) {
echo "Error: " . $e->getMessage() . "\n";
}实际应用场景
1. 异步任务处理
php
<?php
class AsyncTaskProducer
{
private $producer;
public function __construct(RabbitMQProducer $producer)
{
$this->producer = $producer;
}
public function sendEmailTask(string $to, string $subject, string $body): void
{
$this->producer->publish(
'tasks_exchange',
'task.email',
json_encode([
'type' => 'email',
'to' => $to,
'subject' => $subject,
'body' => $body,
'created_at' => time()
])
);
}
public function generateReportTask(int $reportId, string $format): void
{
$this->producer->publish(
'tasks_exchange',
'task.report',
json_encode([
'type' => 'report',
'report_id' => $reportId,
'format' => $format,
'created_at' => time()
])
);
}
}2. 订单处理流程
mermaid
graph LR
A[订单创建] --> B[订单队列]
B --> C[库存扣减]
B --> D[支付处理]
B --> E[通知发送]
C --> F[库存队列]
D --> G[支付队列]
E --> H[通知队列]3. 日志收集系统
php
<?php
class LogCollector
{
private $producer;
public function __construct(RabbitMQProducer $producer)
{
$this->producer = $producer;
}
public function log(string $level, string $message, array $context = []): void
{
$routingKey = "log.{$level}";
$this->producer->publish(
'logs_exchange',
$routingKey,
json_encode([
'level' => $level,
'message' => $message,
'context' => $context,
'timestamp' => date('Y-m-d H:i:s'),
'hostname' => gethostname()
])
);
}
}常见问题与解决方案
1. 消息丢失问题
原因:
- 消息未持久化
- 未启用消息确认机制
- Exchange 或 Queue 未持久化
解决方案:
php
<?php
$channel->exchange_declare(
'durable_exchange',
'direct',
false,
true,
false
);
$channel->queue_declare(
'durable_queue',
false,
true,
false,
false
);
$message = new AMQPMessage(
$body,
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$channel->basic_consume(
'durable_queue',
'',
false,
false,
false,
false,
function ($msg) {
try {
processMessage($msg);
$msg->ack();
} catch (Exception $e) {
$msg->nack(true);
}
}
);2. 消息重复消费问题
原因:
- 消费者处理完成后未发送 ACK
- 网络问题导致 ACK 丢失
- 消费者崩溃后消息重新入队
解决方案:
php
<?php
class IdempotentConsumer
{
private $redis;
public function processMessage($message): void
{
$data = json_decode($message->body, true);
$messageId = $data['message_id'] ?? md5($message->body);
if ($this->redis->exists("processed:{$messageId}")) {
echo "Message already processed, skipping\n";
return;
}
$this->doProcess($data);
$this->redis->setex("processed:{$messageId}", 86400, '1');
}
private function doProcess(array $data): void
{
// 实际业务处理
}
}3. 消息堆积问题
原因:
- 消费者处理速度慢
- 消费者数量不足
- 消费者异常
解决方案:
php
<?php
$channel->basic_qos(null, 10, null);
$channel->queue_declare(
'orders_queue',
false,
true,
false,
false,
false,
new \PhpAmqpLib\Wire\AMQPTable([
'x-max-length' => 10000,
'x-overflow' => 'reject-publish'
])
);最佳实践建议
1. 连接管理
php
<?php
class ConnectionManager
{
private static $instance = null;
private $connection = null;
private $channels = [];
public static function getInstance(): self
{
if (self::$instance === null) {
self::$instance = new self();
}
return self::$instance;
}
public function getConnection(): AMQPStreamConnection
{
if ($this->connection === null || !$this->connection->isConnected()) {
$this->connection = new AMQPStreamConnection(
getenv('RABBITMQ_HOST'),
getenv('RABBITMQ_PORT'),
getenv('RABBITMQ_USER'),
getenv('RABBITMQ_PASSWORD'),
getenv('RABBITMQ_VHOST')
);
}
return $this->connection;
}
public function getChannel(int $id = null): AMQPChannel
{
$connection = $this->getConnection();
if ($id !== null && isset($this->channels[$id])) {
return $this->channels[$id];
}
$channel = $connection->channel();
if ($id !== null) {
$this->channels[$id] = $channel;
}
return $channel;
}
}2. 错误处理与重试
php
<?php
class RobustConsumer
{
private $maxRetries = 3;
private $retryDelay = 1000;
public function consumeWithRetry($message): void
{
$data = json_decode($message->body, true);
$retryCount = $data['retry_count'] ?? 0;
try {
$this->process($data);
$message->ack();
} catch (Exception $e) {
if ($retryCount < $this->maxRetries) {
$data['retry_count'] = $retryCount + 1;
$data['last_error'] = $e->getMessage();
$newMessage = new AMQPMessage(
json_encode($data),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$message->getChannel()->basic_publish(
$newMessage,
'',
'retry_queue'
);
$message->ack();
} else {
$message->nack(false);
$this->sendToDeadLetterQueue($message);
}
}
}
private function sendToDeadLetterQueue($message): void
{
// 发送到死信队列
}
private function process(array $data): void
{
// 业务处理
}
}3. 监控与告警
php
<?php
class QueueMonitor
{
private $connection;
private $channel;
public function getQueueStats(string $queueName): array
{
try {
[$queue, $messageCount, $consumerCount] = $this->channel->queue_declare(
$queueName,
true
);
return [
'name' => $queueName,
'message_count' => $messageCount,
'consumer_count' => $consumerCount,
'status' => $this->determineStatus($messageCount, $consumerCount)
];
} catch (Exception $e) {
return [
'name' => $queueName,
'error' => $e->getMessage(),
'status' => 'error'
];
}
}
private function determineStatus(int $messageCount, int $consumerCount): string
{
if ($consumerCount === 0) {
return 'no_consumers';
}
if ($messageCount > 10000) {
return 'backlog_warning';
}
if ($messageCount > 50000) {
return 'backlog_critical';
}
return 'healthy';
}
}