Appearance
RabbitMQ 应用场景
概述
RabbitMQ 作为一款成熟的消息中间件,在各种业务场景中都有广泛的应用。本文将详细介绍 RabbitMQ 的典型应用场景,包括异步处理、应用解耦、流量削峰、日志处理、分布式事务等,并提供完整的 PHP 代码示例。
核心知识点
1. 异步处理
异步处理是消息队列最常见的应用场景。将耗时操作放入队列异步执行,可以显著提升系统响应速度。
1.1 用户注册场景
传统同步方式:
用户注册请求
│
├── 保存用户数据 (500ms)
├── 发送欢迎邮件 (2000ms)
├── 发送短信验证 (1500ms)
├── 初始化积分 (300ms)
└── 返回响应
总耗时: 4300ms异步处理方式:
用户注册请求
│
├── 保存用户数据 (500ms)
├── 发送异步任务到队列
└── 返回响应
总耗时: 500ms
后台 Worker 异步处理:
├── 发送欢迎邮件
├── 发送短信验证
└── 初始化积分1.2 PHP 代码示例
生产者:用户注册服务
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class UserRegistrationService
{
private $mqConnection;
private $mqChannel;
private $db;
public function __construct()
{
$this->mqConnection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->mqChannel = $this->mqConnection->channel();
// 声明队列
$this->mqChannel->queue_declare('user_tasks', false, true, false, false);
}
/**
* 用户注册
*/
public function register(array $userData): array
{
$startTime = microtime(true);
try {
// 1. 验证数据
$this->validateUserData($userData);
// 2. 保存用户到数据库(必须同步)
$user = $this->saveUserToDatabase($userData);
// 3. 发送异步任务到消息队列
$this->sendAsyncTasks($user);
$duration = round((microtime(true) - $startTime) * 1000, 2);
return [
'success' => true,
'user_id' => $user['id'],
'message' => '注册成功',
'duration_ms' => $duration
];
} catch (Exception $e) {
return [
'success' => false,
'message' => $e->getMessage()
];
}
}
/**
* 保存用户到数据库
*/
private function saveUserToDatabase(array $userData): array
{
// 模拟数据库保存
$user = [
'id' => rand(1000, 9999),
'username' => $userData['username'],
'email' => $userData['email'],
'phone' => $userData['phone'] ?? null,
'created_at' => date('Y-m-d H:i:s')
];
// 模拟数据库操作耗时
usleep(500000); // 500ms
return $user;
}
/**
* 发送异步任务
*/
private function sendAsyncTasks(array $user): void
{
$tasks = [
[
'type' => 'send_welcome_email',
'data' => ['email' => $user['email'], 'username' => $user['username']]
],
[
'type' => 'send_sms_verification',
'data' => ['phone' => $user['phone']]
],
[
'type' => 'init_user_points',
'data' => ['user_id' => $user['id'], 'points' => 100]
]
];
foreach ($tasks as $task) {
$message = new AMQPMessage(
json_encode($task),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$this->mqChannel->basic_publish($message, '', 'user_tasks');
}
}
private function validateUserData(array $data): void
{
if (empty($data['username']) || empty($data['email'])) {
throw new InvalidArgumentException('用户名和邮箱不能为空');
}
}
}
// 使用示例
$service = new UserRegistrationService();
$result = $service->register([
'username' => 'john_doe',
'email' => 'john@example.com',
'phone' => '13800138000'
]);
print_r($result);消费者:任务处理器
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
class UserTaskWorker
{
private $connection;
private $channel;
public function __construct()
{
$this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->channel = $this->connection->channel();
$this->channel->queue_declare('user_tasks', false, true, false, false);
$this->channel->basic_qos(null, 1, false);
}
/**
* 启动消费
*/
public function start(): void
{
echo "Worker 启动,等待任务...\n";
$this->channel->basic_consume('user_tasks', '', false, false, false, false, function ($msg) {
$task = json_decode($msg->body, true);
try {
$this->processTask($task);
$msg->ack();
echo "任务完成: {$task['type']}\n";
} catch (Exception $e) {
$msg->nack(true);
echo "任务失败: {$task['type']} - {$e->getMessage()}\n";
}
});
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
/**
* 处理任务
*/
private function processTask(array $task): void
{
switch ($task['type']) {
case 'send_welcome_email':
$this->sendWelcomeEmail($task['data']);
break;
case 'send_sms_verification':
$this->sendSmsVerification($task['data']);
break;
case 'init_user_points':
$this->initUserPoints($task['data']);
break;
default:
throw new RuntimeException("未知任务类型: {$task['type']}");
}
}
private function sendWelcomeEmail(array $data): void
{
echo "发送欢迎邮件到: {$data['email']}\n";
usleep(2000000); // 模拟耗时 2秒
}
private function sendSmsVerification(array $data): void
{
echo "发送短信到: {$data['phone']}\n";
usleep(1500000); // 模拟耗时 1.5秒
}
private function initUserPoints(array $data): void
{
echo "初始化用户积分: 用户ID {$data['user_id']}, 积分 {$data['points']}\n";
usleep(300000); // 模拟耗时 0.3秒
}
}
// 启动 Worker
$worker = new UserTaskWorker();
$worker->start();2. 应用解耦
消息队列可以实现系统间的解耦,各系统独立开发、部署和扩展。
2.1 订单系统解耦
┌─────────────────────────────────────────────────────────┐
│ 传统架构(耦合) │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 订单服务 │────→│ 库存服务 │────→│ 支付服务 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │ │ │ │
│ └──────────────┴───────────────┘ │
│ 任意服务故障导致整体失败 │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 解耦架构(MQ) │
│ │
│ ┌─────────┐ │
│ │ 订单服务 │────┐ │
│ └─────────┘ │ │
│ ↓ │
│ ┌─────────────┐ │
│ │ RabbitMQ │ │
│ │ 订单消息 │ │
│ └──────┬──────┘ │
│ │ │
│ ┌───────────┼───────────┬───────────┐ │
│ ↓ ↓ ↓ ↓ │
│ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │库存服务│ │支付服务│ │通知服务│ │物流服务│ │
│ └────────┘ └────────┘ └────────┘ └────────┘ │
│ │
│ 各服务独立运行,互不影响 │
└─────────────────────────────────────────────────────────┘2.2 PHP 代码示例
订单服务(生产者)
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class OrderService
{
private $mq;
public function __construct()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->mq = $connection->channel();
// 声明订单交换机(Topic 类型)
$this->mq->exchange_declare('order_events', AMQPExchangeType::TOPIC, false, true, false);
}
/**
* 创建订单
*/
public function createOrder(array $orderData): array
{
// 1. 创建订单记录
$order = $this->createOrderRecord($orderData);
// 2. 发布订单创建事件
$this->publishOrderEvent('order.created', $order);
return $order;
}
/**
* 支付订单
*/
public function payOrder(string $orderId, array $paymentData): array
{
// 1. 处理支付
$payment = $this->processPayment($orderId, $paymentData);
// 2. 发布支付成功事件
$this->publishOrderEvent('order.paid', [
'order_id' => $orderId,
'payment_id' => $payment['id'],
'amount' => $payment['amount']
]);
return $payment;
}
/**
* 取消订单
*/
public function cancelOrder(string $orderId, string $reason): void
{
// 1. 更新订单状态
$this->updateOrderStatus($orderId, 'cancelled');
// 2. 发布订单取消事件
$this->publishOrderEvent('order.cancelled', [
'order_id' => $orderId,
'reason' => $reason,
'cancelled_at' => date('Y-m-d H:i:s')
]);
}
/**
* 发布订单事件
*/
private function publishOrderEvent(string $routingKey, array $data): void
{
$event = [
'event_id' => uniqid('evt_', true),
'event_type' => $routingKey,
'data' => $data,
'timestamp' => time()
];
$message = new AMQPMessage(
json_encode($event),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);
$this->mq->basic_publish($message, 'order_events', $routingKey);
echo "事件已发布: {$routingKey}\n";
}
private function createOrderRecord(array $data): array
{
return [
'id' => 'ORD' . time(),
'user_id' => $data['user_id'],
'items' => $data['items'],
'amount' => $data['amount'],
'status' => 'created',
'created_at' => date('Y-m-d H:i:s')
];
}
private function processPayment(string $orderId, array $data): array
{
return [
'id' => 'PAY' . time(),
'order_id' => $orderId,
'amount' => $data['amount'],
'method' => $data['method'],
'status' => 'success'
];
}
private function updateOrderStatus(string $orderId, string $status): void
{
echo "订单 {$orderId} 状态更新为: {$status}\n";
}
}
// 使用示例
$orderService = new OrderService();
// 创建订单
$order = $orderService->createOrder([
'user_id' => 1001,
'items' => [
['product_id' => 101, 'quantity' => 2, 'price' => 99.00],
['product_id' => 102, 'quantity' => 1, 'price' => 199.00]
],
'amount' => 397.00
]);
echo "订单创建成功: {$order['id']}\n";
// 模拟支付
sleep(1);
$orderService->payOrder($order['id'], [
'amount' => 397.00,
'method' => 'alipay'
]);库存服务(消费者)
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
class InventoryService
{
private $mq;
public function __construct()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->mq = $connection->channel();
// 声明交换机
$this->mq->exchange_declare('order_events', 'topic', false, true, false);
// 声明队列
$this->mq->queue_declare('inventory_queue', false, true, false, false);
// 绑定队列到感兴趣的订单事件
$this->mq->queue_bind('inventory_queue', 'order_events', 'order.created');
$this->mq->queue_bind('inventory_queue', 'order_events', 'order.cancelled');
$this->mq->basic_qos(null, 1, false);
}
public function start(): void
{
echo "库存服务启动...\n";
$this->mq->basic_consume('inventory_queue', '', false, false, false, false, function ($msg) {
$event = json_decode($msg->body, true);
try {
$this->handleEvent($event);
$msg->ack();
} catch (Exception $e) {
echo "处理失败: {$e->getMessage()}\n";
$msg->nack(true);
}
});
while ($this->mq->is_consuming()) {
$this->mq->wait();
}
}
private function handleEvent(array $event): void
{
switch ($event['event_type']) {
case 'order.created':
$this->reserveInventory($event['data']);
break;
case 'order.cancelled':
$this->releaseInventory($event['data']);
break;
}
}
private function reserveInventory(array $orderData): void
{
echo "预留库存 - 订单: {$orderData['id']}\n";
foreach ($orderData['items'] as $item) {
echo " 商品 {$item['product_id']}: 预留 {$item['quantity']} 件\n";
}
}
private function releaseInventory(array $data): void
{
echo "释放库存 - 订单: {$data['order_id']}\n";
}
}
$service = new InventoryService();
$service->start();通知服务(消费者)
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
class NotificationService
{
private $mq;
public function __construct()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->mq = $connection->channel();
$this->mq->exchange_declare('order_events', 'topic', false, true, false);
$this->mq->queue_declare('notification_queue', false, true, false, false);
// 订阅所有订单事件
$this->mq->queue_bind('notification_queue', 'order_events', 'order.#');
$this->mq->basic_qos(null, 1, false);
}
public function start(): void
{
echo "通知服务启动...\n";
$this->mq->basic_consume('notification_queue', '', false, false, false, false, function ($msg) {
$event = json_decode($msg->body, true);
try {
$this->sendNotification($event);
$msg->ack();
} catch (Exception $e) {
echo "通知发送失败: {$e->getMessage()}\n";
$msg->nack(true);
}
});
while ($this->mq->is_consuming()) {
$this->mq->wait();
}
}
private function sendNotification(array $event): void
{
$eventType = $event['event_type'];
$data = $event['data'];
echo "发送通知: {$eventType}\n";
switch ($eventType) {
case 'order.created':
$this->sendSms($data['user_id'], "您的订单已创建成功");
break;
case 'order.paid':
$this->sendSms($data['user_id'], "您的订单已支付成功");
$this->sendEmail($data['user_id'], "订单支付成功通知");
break;
case 'order.cancelled':
$this->sendSms($data['user_id'], "您的订单已取消");
break;
}
}
private function sendSms(int $userId, string $message): void
{
echo " 发送短信给用户 {$userId}: {$message}\n";
}
private function sendEmail(int $userId, string $subject): void
{
echo " 发送邮件给用户 {$userId}: {$subject}\n";
}
}
$service = new NotificationService();
$service->start();3. 流量削峰
在高并发场景下,消息队列可以缓冲请求,保护后端服务不被压垮。
3.1 秒杀场景
┌─────────────────────────────────────────────────────────┐
│ 秒杀流量削峰 │
│ │
│ 高峰流量: 10000 请求/秒 │
│ ↓ │
│ ┌─────────────────┐ │
│ │ RabbitMQ │ 缓冲队列 │
│ │ [请求排队...] │ │
│ └────────┬────────┘ │
│ │ 控制消费速率: 100 请求/秒 │
│ ↓ │
│ ┌─────────────────┐ │
│ │ 后端服务 │ 正常处理 │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────┘3.2 PHP 代码示例
秒杀入口(生产者)
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class SeckillService
{
private $mq;
private $queueName = 'seckill_queue';
public function __construct()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->mq = $connection->channel();
// 声明队列,设置最大长度
$args = new AMQPTable([
'x-max-length' => 10000, // 最大消息数
'x-overflow' => 'reject-publish', // 超出时拒绝新消息
'x-message-ttl' => 300000 // 消息存活时间 5分钟
]);
$this->mq->queue_declare($this->queueName, false, true, false, false, false, $args);
}
/**
* 参与秒杀
*/
public function participate(int $userId, int $productId, int $quantity): array
{
// 1. 检查用户是否已参与
if ($this->hasParticipated($userId, $productId)) {
return [
'success' => false,
'message' => '您已参与过此活动'
];
}
// 2. 检查库存
$stock = $this->getStock($productId);
if ($stock <= 0) {
return [
'success' => false,
'message' => '商品已售罄'
];
}
// 3. 发送秒杀请求到队列
$request = [
'request_id' => uniqid('sk_', true),
'user_id' => $userId,
'product_id' => $productId,
'quantity' => $quantity,
'timestamp' => time()
];
try {
$message = new AMQPMessage(
json_encode($request),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$this->mq->basic_publish($message, '', $this->queueName);
// 标记用户已参与
$this->markParticipated($userId, $productId);
return [
'success' => true,
'message' => '秒杀请求已提交,请等待处理结果',
'request_id' => $request['request_id']
];
} catch (Exception $e) {
// 队列已满
return [
'success' => false,
'message' => '系统繁忙,请稍后重试'
];
}
}
private function hasParticipated(int $userId, int $productId): bool
{
// 使用 Redis 检查
return false;
}
private function getStock(int $productId): int
{
// 使用 Redis 获取库存
return 100;
}
private function markParticipated(int $userId, int $productId): void
{
// 使用 Redis 标记
}
}
// API 入口
header('Content-Type: application/json');
$service = new SeckillService();
$userId = $_POST['user_id'] ?? 0;
$productId = $_POST['product_id'] ?? 0;
$quantity = $_POST['quantity'] ?? 1;
$result = $service->participate($userId, $productId, $quantity);
echo json_encode($result);秒杀处理器(消费者)
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
class SeckillWorker
{
private $mq;
private $queueName = 'seckill_queue';
public function __construct()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->mq = $connection->channel();
$this->mq->queue_declare($this->queueName, false, true, false, false);
// 限制消费速率:每次只处理一条消息
$this->mq->basic_qos(null, 1, false);
}
public function start(): void
{
echo "秒杀处理器启动...\n";
$this->mq->basic_consume($this->queueName, '', false, false, false, false, function ($msg) {
$request = json_decode($msg->body, true);
try {
$result = $this->processSeckill($request);
if ($result['success']) {
echo "秒杀成功: 用户 {$request['user_id']} - 商品 {$request['product_id']}\n";
} else {
echo "秒杀失败: {$result['message']}\n";
}
$msg->ack();
} catch (Exception $e) {
echo "处理异常: {$e->getMessage()}\n";
$msg->nack(false); // 不重新入队
}
});
while ($this->mq->is_consuming()) {
$this->mq->wait();
}
}
private function processSeckill(array $request): array
{
// 1. 再次检查库存(使用锁)
$stock = $this->getStockWithLock($request['product_id']);
if ($stock < $request['quantity']) {
return [
'success' => false,
'message' => '商品已售罄'
];
}
// 2. 扣减库存
$this->decreaseStock($request['product_id'], $request['quantity']);
// 3. 创建订单
$order = $this->createOrder($request);
// 4. 发送成功通知
$this->notifyUser($request['user_id'], [
'type' => 'seckill_success',
'order_id' => $order['id']
]);
return [
'success' => true,
'order_id' => $order['id']
];
}
private function getStockWithLock(int $productId): int
{
// 使用 Redis + Lua 脚本实现原子操作
return 10;
}
private function decreaseStock(int $productId, int $quantity): void
{
// 使用 Redis 原子扣减
}
private function createOrder(array $request): array
{
return [
'id' => 'ORD' . time(),
'user_id' => $request['user_id'],
'product_id' => $request['product_id'],
'quantity' => $request['quantity']
];
}
private function notifyUser(int $userId, array $data): void
{
// 发送通知到消息队列
echo "通知用户 {$userId}: " . json_encode($data) . "\n";
}
}
$worker = new SeckillWorker();
$worker->start();4. 日志收集
RabbitMQ 可以作为日志收集系统的核心组件,收集、聚合和分发日志。
4.1 架构设计
┌─────────────────────────────────────────────────────────┐
│ 日志收集架构 │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Web服务 │ │ API服务 │ │后台任务 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └────────────┼────────────┘ │
│ ↓ │
│ ┌─────────────────┐ │
│ │ RabbitMQ │ │
│ │ logs_exchange │ │
│ └────────┬────────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ ↓ ↓ ↓ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │日志存储 │ │日志分析 │ │告警服务 │ │
│ │(ES) │ │(分析) │ │(监控) │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────┘4.2 PHP 代码示例
日志生产者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class LogProducer
{
private $mq;
private $exchangeName = 'logs_exchange';
public function __construct()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->mq = $connection->channel();
// 声明 Topic 交换机
$this->mq->exchange_declare($this->exchangeName, AMQPExchangeType::TOPIC, false, true, false);
}
/**
* 记录日志
*/
public function log(string $level, string $message, array $context = []): void
{
$logEntry = [
'timestamp' => date('Y-m-d H:i:s.u'),
'level' => $level,
'message' => $message,
'context' => $context,
'server' => gethostname(),
'pid' => getmypid()
];
$routingKey = "log.{$level}." . ($context['service'] ?? 'app');
$msg = new AMQPMessage(
json_encode($logEntry),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);
$this->mq->basic_publish($msg, $this->exchangeName, $routingKey);
}
public function info(string $message, array $context = []): void
{
$this->log('info', $message, $context);
}
public function error(string $message, array $context = []): void
{
$this->log('error', $message, $context);
}
public function warning(string $message, array $context = []): void
{
$this->log('warning', $message, $context);
}
}
// 使用示例
$logger = new LogProducer();
$logger->info('用户登录', ['user_id' => 1001, 'ip' => '192.168.1.1']);
$logger->error('数据库连接失败', ['service' => 'api', 'error' => 'Connection timeout']);
$logger->warning('内存使用过高', ['service' => 'worker', 'usage' => '85%']);日志消费者
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
class LogConsumer
{
private $mq;
private $exchangeName = 'logs_exchange';
public function __construct()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->mq = $connection->channel();
$this->mq->exchange_declare($this->exchangeName, 'topic', false, true, false);
$this->mq->basic_qos(null, 10, false);
}
/**
* 消费错误日志
*/
public function consumeErrors(): void
{
$queueName = 'error_logs_queue';
$this->mq->queue_declare($queueName, false, true, false, false);
$this->mq->queue_bind($queueName, $this->exchangeName, 'log.error.*');
echo "开始消费错误日志...\n";
$this->mq->basic_consume($queueName, '', false, false, false, false, function ($msg) {
$log = json_decode($msg->body, true);
// 写入文件
$this->writeToFile('errors.log', $log);
// 发送告警
$this->sendAlert($log);
$msg->ack();
});
while ($this->mq->is_consuming()) {
$this->mq->wait();
}
}
/**
* 消费所有日志
*/
public function consumeAll(): void
{
$queueName = 'all_logs_queue';
$this->mq->queue_declare($queueName, false, true, false, false);
$this->mq->queue_bind($queueName, $this->exchangeName, 'log.#');
echo "开始消费所有日志...\n";
$this->mq->basic_consume($queueName, '', false, false, false, false, function ($msg) {
$log = json_decode($msg->body, true);
// 存储到 Elasticsearch
$this->storeToElasticsearch($log);
$msg->ack();
});
while ($this->mq->is_consuming()) {
$this->mq->wait();
}
}
private function writeToFile(string $file, array $log): void
{
$line = "[{$log['timestamp']}] {$log['level']}: {$log['message']}";
if (!empty($log['context'])) {
$line .= " " . json_encode($log['context']);
}
$line .= "\n";
file_put_contents($file, $line, FILE_APPEND);
}
private function sendAlert(array $log): void
{
echo "告警: {$log['message']}\n";
}
private function storeToElasticsearch(array $log): void
{
// 存储到 ES
echo "存储日志到 ES: {$log['level']} - {$log['message']}\n";
}
}
// 启动消费者
$consumer = new LogConsumer();
$consumer->consumeErrors();5. 延迟任务
RabbitMQ 可以实现延迟任务处理,如订单超时取消、定时提醒等。
5.1 订单超时取消
php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class OrderTimeoutService
{
private $mq;
public function __construct()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->mq = $connection->channel();
$this->setupDelayedQueue();
}
/**
* 设置延迟队列
*/
private function setupDelayedQueue(): void
{
// 1. 声明死信交换机
$this->mq->exchange_declare('order_timeout_exchange', 'direct', false, true, false);
// 2. 声明死信队列(处理超时订单)
$this->mq->queue_declare('order_timeout_queue', false, true, false, false);
$this->mq->queue_bind('order_timeout_queue', 'order_timeout_exchange', 'order.timeout');
// 3. 声明延迟队列(消息在这里等待)
$args = new AMQPTable([
'x-dead-letter-exchange' => 'order_timeout_exchange',
'x-dead-letter-routing-key' => 'order.timeout',
'x-message-ttl' => 1800000 // 30分钟(毫秒)
]);
$this->mq->queue_declare('order_delay_queue', false, true, false, false, false, $args);
}
/**
* 创建订单时设置超时检查
*/
public function createOrderWithTimeout(array $orderData): void
{
// 创建订单...
$orderId = 'ORD' . time();
// 发送延迟消息
$message = new AMQPMessage(
json_encode(['order_id' => $orderId]),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$this->mq->basic_publish($message, '', 'order_delay_queue');
echo "订单创建成功: {$orderId},30分钟后检查支付状态\n";
}
/**
* 处理超时订单
*/
public function processTimeout(): void
{
$this->mq->basic_consume('order_timeout_queue', '', false, false, false, false, function ($msg) {
$data = json_decode($msg->body, true);
// 检查订单状态
$order = $this->getOrder($data['order_id']);
if ($order['status'] === 'pending') {
// 取消订单
$this->cancelOrder($data['order_id'], '支付超时');
echo "订单超时取消: {$data['order_id']}\n";
} else {
echo "订单已支付,跳过: {$data['order_id']}\n";
}
$msg->ack();
});
while ($this->mq->is_consuming()) {
$this->mq->wait();
}
}
private function getOrder(string $orderId): array
{
return ['id' => $orderId, 'status' => 'pending'];
}
private function cancelOrder(string $orderId, string $reason): void
{
echo "取消订单: {$orderId}, 原因: {$reason}\n";
}
}
// 创建订单
$service = new OrderTimeoutService();
$service->createOrderWithTimeout(['user_id' => 1001, 'amount' => 199.00]);
// 或者启动超时处理器
// $service->processTimeout();实际应用场景
1. 电商系统
- 订单处理异步化
- 库存同步
- 支付回调处理
- 物流状态更新
2. 社交平台
- 消息推送
- 动态分发
- 通知系统
3. 金融系统
- 交易处理
- 风控检查
- 账户同步
4. IoT 平台
- 设备数据上报
- 命令下发
- 状态同步
常见问题与解决方案
Q1: 如何保证消息顺序?
解决方案:使用单队列单消费者,或在消息中添加序列号。
Q2: 如何处理消息重复?
解决方案:实现幂等性,使用唯一 ID 去重。
Q3: 如何监控队列状态?
解决方案:使用 RabbitMQ Management API 或 Prometheus 监控。
最佳实践建议
- 合理设计队列:根据业务特点选择队列类型
- 做好监控告警:监控队列长度、消费速率
- 实现优雅关闭:处理 SIGTERM 信号
- 错误重试机制:设置合理的重试策略
- 死信处理:配置死信队列处理失败消息
- 容量规划:预估消息量,合理配置资源
