Skip to content

RabbitMQ 应用场景

概述

RabbitMQ 作为一款成熟的消息中间件,在各种业务场景中都有广泛的应用。本文将详细介绍 RabbitMQ 的典型应用场景,包括异步处理、应用解耦、流量削峰、日志处理、分布式事务等,并提供完整的 PHP 代码示例。

核心知识点

1. 异步处理

异步处理是消息队列最常见的应用场景。将耗时操作放入队列异步执行,可以显著提升系统响应速度。

1.1 用户注册场景

传统同步方式

用户注册请求

    ├── 保存用户数据 (500ms)
    ├── 发送欢迎邮件 (2000ms)
    ├── 发送短信验证 (1500ms)
    ├── 初始化积分 (300ms)
    └── 返回响应
    总耗时: 4300ms

异步处理方式

用户注册请求

    ├── 保存用户数据 (500ms)
    ├── 发送异步任务到队列
    └── 返回响应
    总耗时: 500ms

后台 Worker 异步处理:
    ├── 发送欢迎邮件
    ├── 发送短信验证
    └── 初始化积分

1.2 PHP 代码示例

生产者:用户注册服务

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class UserRegistrationService
{
    private $mqConnection;
    private $mqChannel;
    private $db;
    
    public function __construct()
    {
        $this->mqConnection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->mqChannel = $this->mqConnection->channel();
        
        // 声明队列
        $this->mqChannel->queue_declare('user_tasks', false, true, false, false);
    }
    
    /**
     * 用户注册
     */
    public function register(array $userData): array
    {
        $startTime = microtime(true);
        
        try {
            // 1. 验证数据
            $this->validateUserData($userData);
            
            // 2. 保存用户到数据库(必须同步)
            $user = $this->saveUserToDatabase($userData);
            
            // 3. 发送异步任务到消息队列
            $this->sendAsyncTasks($user);
            
            $duration = round((microtime(true) - $startTime) * 1000, 2);
            
            return [
                'success' => true,
                'user_id' => $user['id'],
                'message' => '注册成功',
                'duration_ms' => $duration
            ];
            
        } catch (Exception $e) {
            return [
                'success' => false,
                'message' => $e->getMessage()
            ];
        }
    }
    
    /**
     * 保存用户到数据库
     */
    private function saveUserToDatabase(array $userData): array
    {
        // 模拟数据库保存
        $user = [
            'id' => rand(1000, 9999),
            'username' => $userData['username'],
            'email' => $userData['email'],
            'phone' => $userData['phone'] ?? null,
            'created_at' => date('Y-m-d H:i:s')
        ];
        
        // 模拟数据库操作耗时
        usleep(500000); // 500ms
        
        return $user;
    }
    
    /**
     * 发送异步任务
     */
    private function sendAsyncTasks(array $user): void
    {
        $tasks = [
            [
                'type' => 'send_welcome_email',
                'data' => ['email' => $user['email'], 'username' => $user['username']]
            ],
            [
                'type' => 'send_sms_verification',
                'data' => ['phone' => $user['phone']]
            ],
            [
                'type' => 'init_user_points',
                'data' => ['user_id' => $user['id'], 'points' => 100]
            ]
        ];
        
        foreach ($tasks as $task) {
            $message = new AMQPMessage(
                json_encode($task),
                ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
            );
            $this->mqChannel->basic_publish($message, '', 'user_tasks');
        }
    }
    
    private function validateUserData(array $data): void
    {
        if (empty($data['username']) || empty($data['email'])) {
            throw new InvalidArgumentException('用户名和邮箱不能为空');
        }
    }
}

// 使用示例
$service = new UserRegistrationService();

$result = $service->register([
    'username' => 'john_doe',
    'email' => 'john@example.com',
    'phone' => '13800138000'
]);

print_r($result);

消费者:任务处理器

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

class UserTaskWorker
{
    private $connection;
    private $channel;
    
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();
        $this->channel->queue_declare('user_tasks', false, true, false, false);
        $this->channel->basic_qos(null, 1, false);
    }
    
    /**
     * 启动消费
     */
    public function start(): void
    {
        echo "Worker 启动,等待任务...\n";
        
        $this->channel->basic_consume('user_tasks', '', false, false, false, false, function ($msg) {
            $task = json_decode($msg->body, true);
            
            try {
                $this->processTask($task);
                $msg->ack();
                echo "任务完成: {$task['type']}\n";
            } catch (Exception $e) {
                $msg->nack(true);
                echo "任务失败: {$task['type']} - {$e->getMessage()}\n";
            }
        });
        
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }
    
    /**
     * 处理任务
     */
    private function processTask(array $task): void
    {
        switch ($task['type']) {
            case 'send_welcome_email':
                $this->sendWelcomeEmail($task['data']);
                break;
            case 'send_sms_verification':
                $this->sendSmsVerification($task['data']);
                break;
            case 'init_user_points':
                $this->initUserPoints($task['data']);
                break;
            default:
                throw new RuntimeException("未知任务类型: {$task['type']}");
        }
    }
    
    private function sendWelcomeEmail(array $data): void
    {
        echo "发送欢迎邮件到: {$data['email']}\n";
        usleep(2000000); // 模拟耗时 2秒
    }
    
    private function sendSmsVerification(array $data): void
    {
        echo "发送短信到: {$data['phone']}\n";
        usleep(1500000); // 模拟耗时 1.5秒
    }
    
    private function initUserPoints(array $data): void
    {
        echo "初始化用户积分: 用户ID {$data['user_id']}, 积分 {$data['points']}\n";
        usleep(300000); // 模拟耗时 0.3秒
    }
}

// 启动 Worker
$worker = new UserTaskWorker();
$worker->start();

2. 应用解耦

消息队列可以实现系统间的解耦,各系统独立开发、部署和扩展。

2.1 订单系统解耦

┌─────────────────────────────────────────────────────────┐
│                    传统架构(耦合)                       │
│                                                         │
│  ┌─────────┐     ┌─────────┐     ┌─────────┐           │
│  │ 订单服务 │────→│ 库存服务 │────→│ 支付服务 │           │
│  └─────────┘     └─────────┘     └─────────┘           │
│       │              │               │                  │
│       └──────────────┴───────────────┘                  │
│              任意服务故障导致整体失败                      │
└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│                    解耦架构(MQ)                         │
│                                                         │
│  ┌─────────┐                                            │
│  │ 订单服务 │────┐                                       │
│  └─────────┘    │                                       │
│                 ↓                                       │
│          ┌─────────────┐                                │
│          │  RabbitMQ   │                                │
│          │   订单消息   │                                │
│          └──────┬──────┘                                │
│                 │                                       │
│     ┌───────────┼───────────┬───────────┐              │
│     ↓           ↓           ↓           ↓              │
│ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐            │
│ │库存服务│ │支付服务│ │通知服务│ │物流服务│            │
│ └────────┘ └────────┘ └────────┘ └────────┘            │
│                                                         │
│      各服务独立运行,互不影响                             │
└─────────────────────────────────────────────────────────┘

2.2 PHP 代码示例

订单服务(生产者)

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class OrderService
{
    private $mq;
    
    public function __construct()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->mq = $connection->channel();
        
        // 声明订单交换机(Topic 类型)
        $this->mq->exchange_declare('order_events', AMQPExchangeType::TOPIC, false, true, false);
    }
    
    /**
     * 创建订单
     */
    public function createOrder(array $orderData): array
    {
        // 1. 创建订单记录
        $order = $this->createOrderRecord($orderData);
        
        // 2. 发布订单创建事件
        $this->publishOrderEvent('order.created', $order);
        
        return $order;
    }
    
    /**
     * 支付订单
     */
    public function payOrder(string $orderId, array $paymentData): array
    {
        // 1. 处理支付
        $payment = $this->processPayment($orderId, $paymentData);
        
        // 2. 发布支付成功事件
        $this->publishOrderEvent('order.paid', [
            'order_id' => $orderId,
            'payment_id' => $payment['id'],
            'amount' => $payment['amount']
        ]);
        
        return $payment;
    }
    
    /**
     * 取消订单
     */
    public function cancelOrder(string $orderId, string $reason): void
    {
        // 1. 更新订单状态
        $this->updateOrderStatus($orderId, 'cancelled');
        
        // 2. 发布订单取消事件
        $this->publishOrderEvent('order.cancelled', [
            'order_id' => $orderId,
            'reason' => $reason,
            'cancelled_at' => date('Y-m-d H:i:s')
        ]);
    }
    
    /**
     * 发布订单事件
     */
    private function publishOrderEvent(string $routingKey, array $data): void
    {
        $event = [
            'event_id' => uniqid('evt_', true),
            'event_type' => $routingKey,
            'data' => $data,
            'timestamp' => time()
        ];
        
        $message = new AMQPMessage(
            json_encode($event),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]
        );
        
        $this->mq->basic_publish($message, 'order_events', $routingKey);
        
        echo "事件已发布: {$routingKey}\n";
    }
    
    private function createOrderRecord(array $data): array
    {
        return [
            'id' => 'ORD' . time(),
            'user_id' => $data['user_id'],
            'items' => $data['items'],
            'amount' => $data['amount'],
            'status' => 'created',
            'created_at' => date('Y-m-d H:i:s')
        ];
    }
    
    private function processPayment(string $orderId, array $data): array
    {
        return [
            'id' => 'PAY' . time(),
            'order_id' => $orderId,
            'amount' => $data['amount'],
            'method' => $data['method'],
            'status' => 'success'
        ];
    }
    
    private function updateOrderStatus(string $orderId, string $status): void
    {
        echo "订单 {$orderId} 状态更新为: {$status}\n";
    }
}

// 使用示例
$orderService = new OrderService();

// 创建订单
$order = $orderService->createOrder([
    'user_id' => 1001,
    'items' => [
        ['product_id' => 101, 'quantity' => 2, 'price' => 99.00],
        ['product_id' => 102, 'quantity' => 1, 'price' => 199.00]
    ],
    'amount' => 397.00
]);

echo "订单创建成功: {$order['id']}\n";

// 模拟支付
sleep(1);
$orderService->payOrder($order['id'], [
    'amount' => 397.00,
    'method' => 'alipay'
]);

库存服务(消费者)

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

class InventoryService
{
    private $mq;
    
    public function __construct()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->mq = $connection->channel();
        
        // 声明交换机
        $this->mq->exchange_declare('order_events', 'topic', false, true, false);
        
        // 声明队列
        $this->mq->queue_declare('inventory_queue', false, true, false, false);
        
        // 绑定队列到感兴趣的订单事件
        $this->mq->queue_bind('inventory_queue', 'order_events', 'order.created');
        $this->mq->queue_bind('inventory_queue', 'order_events', 'order.cancelled');
        
        $this->mq->basic_qos(null, 1, false);
    }
    
    public function start(): void
    {
        echo "库存服务启动...\n";
        
        $this->mq->basic_consume('inventory_queue', '', false, false, false, false, function ($msg) {
            $event = json_decode($msg->body, true);
            
            try {
                $this->handleEvent($event);
                $msg->ack();
            } catch (Exception $e) {
                echo "处理失败: {$e->getMessage()}\n";
                $msg->nack(true);
            }
        });
        
        while ($this->mq->is_consuming()) {
            $this->mq->wait();
        }
    }
    
    private function handleEvent(array $event): void
    {
        switch ($event['event_type']) {
            case 'order.created':
                $this->reserveInventory($event['data']);
                break;
            case 'order.cancelled':
                $this->releaseInventory($event['data']);
                break;
        }
    }
    
    private function reserveInventory(array $orderData): void
    {
        echo "预留库存 - 订单: {$orderData['id']}\n";
        foreach ($orderData['items'] as $item) {
            echo "  商品 {$item['product_id']}: 预留 {$item['quantity']} 件\n";
        }
    }
    
    private function releaseInventory(array $data): void
    {
        echo "释放库存 - 订单: {$data['order_id']}\n";
    }
}

$service = new InventoryService();
$service->start();

通知服务(消费者)

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

class NotificationService
{
    private $mq;
    
    public function __construct()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->mq = $connection->channel();
        
        $this->mq->exchange_declare('order_events', 'topic', false, true, false);
        $this->mq->queue_declare('notification_queue', false, true, false, false);
        
        // 订阅所有订单事件
        $this->mq->queue_bind('notification_queue', 'order_events', 'order.#');
        
        $this->mq->basic_qos(null, 1, false);
    }
    
    public function start(): void
    {
        echo "通知服务启动...\n";
        
        $this->mq->basic_consume('notification_queue', '', false, false, false, false, function ($msg) {
            $event = json_decode($msg->body, true);
            
            try {
                $this->sendNotification($event);
                $msg->ack();
            } catch (Exception $e) {
                echo "通知发送失败: {$e->getMessage()}\n";
                $msg->nack(true);
            }
        });
        
        while ($this->mq->is_consuming()) {
            $this->mq->wait();
        }
    }
    
    private function sendNotification(array $event): void
    {
        $eventType = $event['event_type'];
        $data = $event['data'];
        
        echo "发送通知: {$eventType}\n";
        
        switch ($eventType) {
            case 'order.created':
                $this->sendSms($data['user_id'], "您的订单已创建成功");
                break;
            case 'order.paid':
                $this->sendSms($data['user_id'], "您的订单已支付成功");
                $this->sendEmail($data['user_id'], "订单支付成功通知");
                break;
            case 'order.cancelled':
                $this->sendSms($data['user_id'], "您的订单已取消");
                break;
        }
    }
    
    private function sendSms(int $userId, string $message): void
    {
        echo "  发送短信给用户 {$userId}: {$message}\n";
    }
    
    private function sendEmail(int $userId, string $subject): void
    {
        echo "  发送邮件给用户 {$userId}: {$subject}\n";
    }
}

$service = new NotificationService();
$service->start();

3. 流量削峰

在高并发场景下,消息队列可以缓冲请求,保护后端服务不被压垮。

3.1 秒杀场景

┌─────────────────────────────────────────────────────────┐
│                    秒杀流量削峰                          │
│                                                         │
│  高峰流量: 10000 请求/秒                                 │
│           ↓                                             │
│  ┌─────────────────┐                                    │
│  │   RabbitMQ      │  缓冲队列                          │
│  │  [请求排队...]   │                                    │
│  └────────┬────────┘                                    │
│           │ 控制消费速率: 100 请求/秒                    │
│           ↓                                             │
│  ┌─────────────────┐                                    │
│  │   后端服务       │  正常处理                          │
│  └─────────────────┘                                    │
└─────────────────────────────────────────────────────────┘

3.2 PHP 代码示例

秒杀入口(生产者)

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class SeckillService
{
    private $mq;
    private $queueName = 'seckill_queue';
    
    public function __construct()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->mq = $connection->channel();
        
        // 声明队列,设置最大长度
        $args = new AMQPTable([
            'x-max-length' => 10000,           // 最大消息数
            'x-overflow' => 'reject-publish',  // 超出时拒绝新消息
            'x-message-ttl' => 300000          // 消息存活时间 5分钟
        ]);
        
        $this->mq->queue_declare($this->queueName, false, true, false, false, false, $args);
    }
    
    /**
     * 参与秒杀
     */
    public function participate(int $userId, int $productId, int $quantity): array
    {
        // 1. 检查用户是否已参与
        if ($this->hasParticipated($userId, $productId)) {
            return [
                'success' => false,
                'message' => '您已参与过此活动'
            ];
        }
        
        // 2. 检查库存
        $stock = $this->getStock($productId);
        if ($stock <= 0) {
            return [
                'success' => false,
                'message' => '商品已售罄'
            ];
        }
        
        // 3. 发送秒杀请求到队列
        $request = [
            'request_id' => uniqid('sk_', true),
            'user_id' => $userId,
            'product_id' => $productId,
            'quantity' => $quantity,
            'timestamp' => time()
        ];
        
        try {
            $message = new AMQPMessage(
                json_encode($request),
                ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
            );
            
            $this->mq->basic_publish($message, '', $this->queueName);
            
            // 标记用户已参与
            $this->markParticipated($userId, $productId);
            
            return [
                'success' => true,
                'message' => '秒杀请求已提交,请等待处理结果',
                'request_id' => $request['request_id']
            ];
            
        } catch (Exception $e) {
            // 队列已满
            return [
                'success' => false,
                'message' => '系统繁忙,请稍后重试'
            ];
        }
    }
    
    private function hasParticipated(int $userId, int $productId): bool
    {
        // 使用 Redis 检查
        return false;
    }
    
    private function getStock(int $productId): int
    {
        // 使用 Redis 获取库存
        return 100;
    }
    
    private function markParticipated(int $userId, int $productId): void
    {
        // 使用 Redis 标记
    }
}

// API 入口
header('Content-Type: application/json');

$service = new SeckillService();

$userId = $_POST['user_id'] ?? 0;
$productId = $_POST['product_id'] ?? 0;
$quantity = $_POST['quantity'] ?? 1;

$result = $service->participate($userId, $productId, $quantity);

echo json_encode($result);

秒杀处理器(消费者)

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

class SeckillWorker
{
    private $mq;
    private $queueName = 'seckill_queue';
    
    public function __construct()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->mq = $connection->channel();
        $this->mq->queue_declare($this->queueName, false, true, false, false);
        
        // 限制消费速率:每次只处理一条消息
        $this->mq->basic_qos(null, 1, false);
    }
    
    public function start(): void
    {
        echo "秒杀处理器启动...\n";
        
        $this->mq->basic_consume($this->queueName, '', false, false, false, false, function ($msg) {
            $request = json_decode($msg->body, true);
            
            try {
                $result = $this->processSeckill($request);
                
                if ($result['success']) {
                    echo "秒杀成功: 用户 {$request['user_id']} - 商品 {$request['product_id']}\n";
                } else {
                    echo "秒杀失败: {$result['message']}\n";
                }
                
                $msg->ack();
                
            } catch (Exception $e) {
                echo "处理异常: {$e->getMessage()}\n";
                $msg->nack(false); // 不重新入队
            }
        });
        
        while ($this->mq->is_consuming()) {
            $this->mq->wait();
        }
    }
    
    private function processSeckill(array $request): array
    {
        // 1. 再次检查库存(使用锁)
        $stock = $this->getStockWithLock($request['product_id']);
        
        if ($stock < $request['quantity']) {
            return [
                'success' => false,
                'message' => '商品已售罄'
            ];
        }
        
        // 2. 扣减库存
        $this->decreaseStock($request['product_id'], $request['quantity']);
        
        // 3. 创建订单
        $order = $this->createOrder($request);
        
        // 4. 发送成功通知
        $this->notifyUser($request['user_id'], [
            'type' => 'seckill_success',
            'order_id' => $order['id']
        ]);
        
        return [
            'success' => true,
            'order_id' => $order['id']
        ];
    }
    
    private function getStockWithLock(int $productId): int
    {
        // 使用 Redis + Lua 脚本实现原子操作
        return 10;
    }
    
    private function decreaseStock(int $productId, int $quantity): void
    {
        // 使用 Redis 原子扣减
    }
    
    private function createOrder(array $request): array
    {
        return [
            'id' => 'ORD' . time(),
            'user_id' => $request['user_id'],
            'product_id' => $request['product_id'],
            'quantity' => $request['quantity']
        ];
    }
    
    private function notifyUser(int $userId, array $data): void
    {
        // 发送通知到消息队列
        echo "通知用户 {$userId}: " . json_encode($data) . "\n";
    }
}

$worker = new SeckillWorker();
$worker->start();

4. 日志收集

RabbitMQ 可以作为日志收集系统的核心组件,收集、聚合和分发日志。

4.1 架构设计

┌─────────────────────────────────────────────────────────┐
│                    日志收集架构                          │
│                                                         │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐                 │
│  │ Web服务  │  │ API服务  │  │后台任务 │                 │
│  └────┬────┘  └────┬────┘  └────┬────┘                 │
│       │            │            │                       │
│       └────────────┼────────────┘                       │
│                    ↓                                    │
│           ┌─────────────────┐                           │
│           │   RabbitMQ      │                           │
│           │  logs_exchange  │                           │
│           └────────┬────────┘                           │
│                    │                                    │
│       ┌────────────┼────────────┐                       │
│       ↓            ↓            ↓                       │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐                   │
│  │日志存储 │ │日志分析 │ │告警服务 │                   │
│  │(ES)     │ │(分析)   │ │(监控)   │                   │
│  └─────────┘ └─────────┘ └─────────┘                   │
└─────────────────────────────────────────────────────────┘

4.2 PHP 代码示例

日志生产者

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class LogProducer
{
    private $mq;
    private $exchangeName = 'logs_exchange';
    
    public function __construct()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->mq = $connection->channel();
        
        // 声明 Topic 交换机
        $this->mq->exchange_declare($this->exchangeName, AMQPExchangeType::TOPIC, false, true, false);
    }
    
    /**
     * 记录日志
     */
    public function log(string $level, string $message, array $context = []): void
    {
        $logEntry = [
            'timestamp' => date('Y-m-d H:i:s.u'),
            'level' => $level,
            'message' => $message,
            'context' => $context,
            'server' => gethostname(),
            'pid' => getmypid()
        ];
        
        $routingKey = "log.{$level}." . ($context['service'] ?? 'app');
        
        $msg = new AMQPMessage(
            json_encode($logEntry),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
            ]
        );
        
        $this->mq->basic_publish($msg, $this->exchangeName, $routingKey);
    }
    
    public function info(string $message, array $context = []): void
    {
        $this->log('info', $message, $context);
    }
    
    public function error(string $message, array $context = []): void
    {
        $this->log('error', $message, $context);
    }
    
    public function warning(string $message, array $context = []): void
    {
        $this->log('warning', $message, $context);
    }
}

// 使用示例
$logger = new LogProducer();

$logger->info('用户登录', ['user_id' => 1001, 'ip' => '192.168.1.1']);
$logger->error('数据库连接失败', ['service' => 'api', 'error' => 'Connection timeout']);
$logger->warning('内存使用过高', ['service' => 'worker', 'usage' => '85%']);

日志消费者

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

class LogConsumer
{
    private $mq;
    private $exchangeName = 'logs_exchange';
    
    public function __construct()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->mq = $connection->channel();
        
        $this->mq->exchange_declare($this->exchangeName, 'topic', false, true, false);
        $this->mq->basic_qos(null, 10, false);
    }
    
    /**
     * 消费错误日志
     */
    public function consumeErrors(): void
    {
        $queueName = 'error_logs_queue';
        $this->mq->queue_declare($queueName, false, true, false, false);
        $this->mq->queue_bind($queueName, $this->exchangeName, 'log.error.*');
        
        echo "开始消费错误日志...\n";
        
        $this->mq->basic_consume($queueName, '', false, false, false, false, function ($msg) {
            $log = json_decode($msg->body, true);
            
            // 写入文件
            $this->writeToFile('errors.log', $log);
            
            // 发送告警
            $this->sendAlert($log);
            
            $msg->ack();
        });
        
        while ($this->mq->is_consuming()) {
            $this->mq->wait();
        }
    }
    
    /**
     * 消费所有日志
     */
    public function consumeAll(): void
    {
        $queueName = 'all_logs_queue';
        $this->mq->queue_declare($queueName, false, true, false, false);
        $this->mq->queue_bind($queueName, $this->exchangeName, 'log.#');
        
        echo "开始消费所有日志...\n";
        
        $this->mq->basic_consume($queueName, '', false, false, false, false, function ($msg) {
            $log = json_decode($msg->body, true);
            
            // 存储到 Elasticsearch
            $this->storeToElasticsearch($log);
            
            $msg->ack();
        });
        
        while ($this->mq->is_consuming()) {
            $this->mq->wait();
        }
    }
    
    private function writeToFile(string $file, array $log): void
    {
        $line = "[{$log['timestamp']}] {$log['level']}: {$log['message']}";
        if (!empty($log['context'])) {
            $line .= " " . json_encode($log['context']);
        }
        $line .= "\n";
        
        file_put_contents($file, $line, FILE_APPEND);
    }
    
    private function sendAlert(array $log): void
    {
        echo "告警: {$log['message']}\n";
    }
    
    private function storeToElasticsearch(array $log): void
    {
        // 存储到 ES
        echo "存储日志到 ES: {$log['level']} - {$log['message']}\n";
    }
}

// 启动消费者
$consumer = new LogConsumer();
$consumer->consumeErrors();

5. 延迟任务

RabbitMQ 可以实现延迟任务处理,如订单超时取消、定时提醒等。

5.1 订单超时取消

php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class OrderTimeoutService
{
    private $mq;
    
    public function __construct()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $this->mq = $connection->channel();
        
        $this->setupDelayedQueue();
    }
    
    /**
     * 设置延迟队列
     */
    private function setupDelayedQueue(): void
    {
        // 1. 声明死信交换机
        $this->mq->exchange_declare('order_timeout_exchange', 'direct', false, true, false);
        
        // 2. 声明死信队列(处理超时订单)
        $this->mq->queue_declare('order_timeout_queue', false, true, false, false);
        $this->mq->queue_bind('order_timeout_queue', 'order_timeout_exchange', 'order.timeout');
        
        // 3. 声明延迟队列(消息在这里等待)
        $args = new AMQPTable([
            'x-dead-letter-exchange' => 'order_timeout_exchange',
            'x-dead-letter-routing-key' => 'order.timeout',
            'x-message-ttl' => 1800000  // 30分钟(毫秒)
        ]);
        $this->mq->queue_declare('order_delay_queue', false, true, false, false, false, $args);
    }
    
    /**
     * 创建订单时设置超时检查
     */
    public function createOrderWithTimeout(array $orderData): void
    {
        // 创建订单...
        $orderId = 'ORD' . time();
        
        // 发送延迟消息
        $message = new AMQPMessage(
            json_encode(['order_id' => $orderId]),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        
        $this->mq->basic_publish($message, '', 'order_delay_queue');
        
        echo "订单创建成功: {$orderId},30分钟后检查支付状态\n";
    }
    
    /**
     * 处理超时订单
     */
    public function processTimeout(): void
    {
        $this->mq->basic_consume('order_timeout_queue', '', false, false, false, false, function ($msg) {
            $data = json_decode($msg->body, true);
            
            // 检查订单状态
            $order = $this->getOrder($data['order_id']);
            
            if ($order['status'] === 'pending') {
                // 取消订单
                $this->cancelOrder($data['order_id'], '支付超时');
                echo "订单超时取消: {$data['order_id']}\n";
            } else {
                echo "订单已支付,跳过: {$data['order_id']}\n";
            }
            
            $msg->ack();
        });
        
        while ($this->mq->is_consuming()) {
            $this->mq->wait();
        }
    }
    
    private function getOrder(string $orderId): array
    {
        return ['id' => $orderId, 'status' => 'pending'];
    }
    
    private function cancelOrder(string $orderId, string $reason): void
    {
        echo "取消订单: {$orderId}, 原因: {$reason}\n";
    }
}

// 创建订单
$service = new OrderTimeoutService();
$service->createOrderWithTimeout(['user_id' => 1001, 'amount' => 199.00]);

// 或者启动超时处理器
// $service->processTimeout();

实际应用场景

1. 电商系统

  • 订单处理异步化
  • 库存同步
  • 支付回调处理
  • 物流状态更新

2. 社交平台

  • 消息推送
  • 动态分发
  • 通知系统

3. 金融系统

  • 交易处理
  • 风控检查
  • 账户同步

4. IoT 平台

  • 设备数据上报
  • 命令下发
  • 状态同步

常见问题与解决方案

Q1: 如何保证消息顺序?

解决方案:使用单队列单消费者,或在消息中添加序列号。

Q2: 如何处理消息重复?

解决方案:实现幂等性,使用唯一 ID 去重。

Q3: 如何监控队列状态?

解决方案:使用 RabbitMQ Management API 或 Prometheus 监控。

最佳实践建议

  1. 合理设计队列:根据业务特点选择队列类型
  2. 做好监控告警:监控队列长度、消费速率
  3. 实现优雅关闭:处理 SIGTERM 信号
  4. 错误重试机制:设置合理的重试策略
  5. 死信处理:配置死信队列处理失败消息
  6. 容量规划:预估消息量,合理配置资源

相关链接