电商系统应用

概述

电商系统是 RabbitMQ 应用的典型场景,涉及订单处理、库存管理、支付通知、物流跟踪等多个业务环节。通过消息队列实现各环节的解耦和异步处理,提升系统的性能和可靠性。

业务背景与需求

场景描述

某电商平台业务模块:

| 模块 | 功能 | 消息场景 |
| --- | --- | --- |
| 用户中心 | 注册、登录、信息管理 | 用户事件、通知 |
| 商品中心 | 商品管理、库存管理 | 商品事件、库存预警 |
| 订单中心 | 下单、支付、发货、售后 | 订单事件、状态流转 |
| 营销中心 | 优惠券、活动、秒杀 | 营销事件、定时任务 |
| 支付中心 | 支付、退款、对账 | 支付事件、回调通知 |
| 物流中心 | 发货、跟踪、签收 | 物流事件、状态同步 |

技术挑战

电商系统挑战:
1. 高并发:大促期间流量激增
2. 高可用:核心业务不能中断
3. 数据一致:订单、库存、支付状态一致
4. 实时性:库存、订单状态实时更新

架构设计

整体架构图

mermaid
graph TB
    subgraph "用户端"
        A[Web端]
        B[App端]
        C[小程序]
    end
    
    subgraph "接入层"
        D[API网关]
        E[负载均衡]
    end
    
    subgraph "业务服务"
        F[用户服务]
        G[商品服务]
        H[订单服务]
        I[支付服务]
        J[物流服务]
        K[营销服务]
    end
    
    subgraph "RabbitMQ"
        L[事件总线]
        
        subgraph "业务队列"
            M[订单队列]
            N[支付队列]
            O[物流队列]
            P[通知队列]
        end
    end
    
    subgraph "数据层"
        Q[MySQL]
        R[Redis]
        S[Elasticsearch]
    end
    
    A --> D
    B --> D
    C --> D
    D --> E
    
    E --> F
    E --> G
    E --> H
    E --> I
    E --> J
    E --> K
    
    H --> L
    I --> L
    J --> L
    
    L --> M
    L --> N
    L --> O
    L --> P
    
    F --> Q
    G --> Q
    H --> Q
    I --> Q
    
    G --> R
    H --> R
    
    G --> S

订单流程

mermaid
sequenceDiagram
    participant User as 用户
    participant Order as 订单服务
    participant MQ as RabbitMQ
    participant Inventory as 库存服务
    participant Payment as 支付服务
    participant Logistics as 物流服务
    
    User->>Order: 提交订单
    Order->>MQ: 发布订单创建事件
    Order-->>User: 返回订单号
    
    MQ->>Inventory: 扣减库存
    Inventory-->>MQ: ACK
    
    User->>Payment: 发起支付
    Payment->>MQ: 发布支付事件
    Payment-->>User: 支付成功
    
    MQ->>Order: 更新订单状态
    MQ->>Logistics: 创建物流单
    
    Logistics->>MQ: 发布物流事件
    MQ->>User: 推送物流信息

PHP 代码实现

电商消息基类

php
<?php

namespace App\Ecommerce;

/**
 * Base envelope for messages published by the e-commerce services.
 *
 * Subclasses provide the message type and the payload; this class adds the
 * envelope fields (id, source, timestamp, metadata) and the array form that
 * publishers serialize.
 */
abstract class EcommerceMessage
{
    /** @var string Unique id in the form "msg_<YmdHis>_<16 hex chars>". */
    public string $messageId;

    /** @var string Copy of getMessageType(), captured once in the constructor. */
    public string $messageType;

    /** @var string Name of the service that produced the message. */
    public string $source;

    /** @var int Unix timestamp (seconds) taken when the message object was built. */
    public int $timestamp;

    /** @var array Free-form metadata; starts empty. */
    public array $metadata;

    /**
     * @param string $source Name of the producing service.
     */
    public function __construct(string $source)
    {
        $this->messageId = $this->generateMessageId();
        // This typed property used to be left unassigned, so any read of
        // $message->messageType threw "must not be accessed before
        // initialization". Subclasses must therefore be able to answer
        // getMessageType() while the base constructor is running.
        $this->messageType = $this->getMessageType();
        $this->source = $source;
        $this->timestamp = time();
        $this->metadata = [];
    }

    /**
     * Builds an id from the current date-time plus 8 random bytes in hex.
     */
    private function generateMessageId(): string
    {
        return sprintf('msg_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }

    /**
     * Message type string; also used as the "message_type" envelope field.
     */
    abstract public function getMessageType(): string;

    /**
     * Message-specific data placed under the "payload" envelope key.
     */
    abstract public function getPayload(): array;

    /**
     * Returns the full envelope as an associative array.
     *
     * @return array Keys: message_id, message_type, source, timestamp, metadata, payload.
     */
    public function toArray(): array
    {
        return [
            'message_id' => $this->messageId,
            'message_type' => $this->getMessageType(),
            'source' => $this->source,
            'timestamp' => $this->timestamp,
            'metadata' => $this->metadata,
            'payload' => $this->getPayload(),
        ];
    }
}

订单消息处理

php
<?php

namespace App\Ecommerce\Order;

use App\Ecommerce\EcommerceMessage;

/**
 * Message of type "order.created", tagged with source "order-service".
 *
 * The payload is the order array exactly as it was passed to the constructor.
 */
class OrderCreatedMessage extends EcommerceMessage
{
    private const MESSAGE_TYPE = 'order.created';
    private const SOURCE = 'order-service';

    /** @var array Order data returned unchanged by getPayload(). */
    private array $order;

    /**
     * @param array $order Order data to carry as the payload.
     */
    public function __construct(array $order)
    {
        $this->order = $order;
        parent::__construct(self::SOURCE);
    }

    public function getPayload(): array
    {
        return $this->order;
    }

    public function getMessageType(): string
    {
        return self::MESSAGE_TYPE;
    }
}

/**
 * Message of type "order.paid", tagged with source "payment-service".
 *
 * Payload keys: order_id, payment_id, amount.
 */
class OrderPaidMessage extends EcommerceMessage
{
    private const MESSAGE_TYPE = 'order.paid';
    private const SOURCE = 'payment-service';

    private string $orderId;
    private string $paymentId;
    // NOTE(review): the amount is carried as a float; confirm consumers
    // tolerate floating-point rounding for monetary values.
    private float $amount;

    /**
     * @param string $orderId   Id of the paid order.
     * @param string $paymentId Id of the payment.
     * @param float  $amount    Amount reported for the payment.
     */
    public function __construct(string $orderId, string $paymentId, float $amount)
    {
        parent::__construct(self::SOURCE);

        [$this->orderId, $this->paymentId, $this->amount] = [$orderId, $paymentId, $amount];
    }

    public function getPayload(): array
    {
        $payload = [];
        $payload['order_id'] = $this->orderId;
        $payload['payment_id'] = $this->paymentId;
        $payload['amount'] = $this->amount;

        return $payload;
    }

    public function getMessageType(): string
    {
        return self::MESSAGE_TYPE;
    }
}

/**
 * Message of type "order.shipped", tagged with source "logistics-service".
 *
 * Payload keys: order_id, tracking_number, carrier.
 */
class OrderShippedMessage extends EcommerceMessage
{
    private const MESSAGE_TYPE = 'order.shipped';
    private const SOURCE = 'logistics-service';

    private string $orderId;
    private string $trackingNumber;
    private string $carrier;

    /**
     * @param string $orderId        Id of the shipped order.
     * @param string $trackingNumber Tracking number of the shipment.
     * @param string $carrier        Carrier name.
     */
    public function __construct(string $orderId, string $trackingNumber, string $carrier)
    {
        parent::__construct(self::SOURCE);

        [$this->orderId, $this->trackingNumber, $this->carrier] = [$orderId, $trackingNumber, $carrier];
    }

    public function getPayload(): array
    {
        $payload = [];
        $payload['order_id'] = $this->orderId;
        $payload['tracking_number'] = $this->trackingNumber;
        $payload['carrier'] = $this->carrier;

        return $payload;
    }

    public function getMessageType(): string
    {
        return self::MESSAGE_TYPE;
    }
}

订单服务实现

php
<?php

namespace App\Ecommerce\Order;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

/**
 * Applies order state changes through the repository and announces each one
 * on the "ecommerce.exchange" topic exchange.
 *
 * Each public operation writes to the repository first and publishes the
 * matching event second. The two steps are not atomic: if publishing throws,
 * the repository change has already been made.
 */
class OrderService
{
    /** Keys that createOrder() requires in its input array. */
    private const REQUIRED_ORDER_FIELDS = ['user_id', 'items', 'total_amount', 'shipping_address'];

    private AMQPStreamConnection $connection;

    /** @var \PhpAmqpLib\Channel\AMQPChannel Channel opened on the injected connection. */
    private $channel;

    private OrderRepository $repository;

    private string $exchangeName = 'ecommerce.exchange';

    /**
     * Opens a channel on the given connection and declares the exchange.
     */
    public function __construct(
        AMQPStreamConnection $connection,
        OrderRepository $repository
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->repository = $repository;
        $this->setupExchange();
    }

    /**
     * Declares the topic exchange (not passive, durable, not auto-deleted).
     */
    private function setupExchange(): void
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::TOPIC,
            false,
            true,
            false
        );
    }

    /**
     * Stores a new order with status "pending" and publishes "order.created".
     *
     * @param array $orderData Must contain user_id, items (non-empty array),
     *                         total_amount and shipping_address.
     *
     * @return array ['order_id' => string, 'status' => 'pending']
     *
     * @throws \InvalidArgumentException When a required field is missing or items is empty.
     * @throws \JsonException            When the event cannot be JSON-encoded.
     */
    public function createOrder(array $orderData): array
    {
        // Reject incomplete input before anything is persisted or published;
        // previously a missing key silently became null in the stored order.
        foreach (self::REQUIRED_ORDER_FIELDS as $field) {
            if (!isset($orderData[$field])) {
                throw new \InvalidArgumentException(
                    sprintf('Missing required order field "%s".', $field)
                );
            }
        }
        if (!is_array($orderData['items']) || $orderData['items'] === []) {
            throw new \InvalidArgumentException('An order must contain at least one item.');
        }

        $orderId = $this->generateOrderId();

        $order = [
            'order_id' => $orderId,
            'user_id' => $orderData['user_id'],
            'items' => $orderData['items'],
            'total_amount' => $orderData['total_amount'],
            'status' => 'pending',
            'shipping_address' => $orderData['shipping_address'],
            'created_at' => date('Y-m-d H:i:s'),
        ];

        $this->repository->create($order);

        $this->publishMessage(new OrderCreatedMessage($order));

        return ['order_id' => $orderId, 'status' => 'pending'];
    }

    /**
     * Marks the order as paid and publishes "order.paid".
     *
     * @throws \JsonException When the event cannot be JSON-encoded.
     */
    public function confirmPayment(string $orderId, string $paymentId, float $amount): void
    {
        $this->repository->update($orderId, [
            'status' => 'paid',
            'payment_id' => $paymentId,
            'paid_at' => date('Y-m-d H:i:s'),
        ]);

        $this->publishMessage(new OrderPaidMessage($orderId, $paymentId, $amount));
    }

    /**
     * Marks the order as shipped and publishes "order.shipped".
     *
     * @throws \JsonException When the event cannot be JSON-encoded.
     */
    public function shipOrder(string $orderId, string $trackingNumber, string $carrier): void
    {
        $this->repository->update($orderId, [
            'status' => 'shipped',
            'tracking_number' => $trackingNumber,
            'carrier' => $carrier,
            'shipped_at' => date('Y-m-d H:i:s'),
        ]);

        $this->publishMessage(new OrderShippedMessage($orderId, $trackingNumber, $carrier));
    }

    /**
     * Serializes the message envelope to JSON and publishes it as a persistent
     * message, using the message type as the routing key.
     *
     * The parameter type is fully qualified on purpose: this file lives in
     * App\Ecommerce\Order, so an unqualified "EcommerceMessage" would resolve
     * to a class in this namespace and reject every real message.
     *
     * @throws \JsonException When the envelope cannot be JSON-encoded.
     */
    private function publishMessage(\App\Ecommerce\EcommerceMessage $message): void
    {
        // JSON_THROW_ON_ERROR: fail loudly rather than publish `false` as the body.
        $body = json_encode($message->toArray(), JSON_THROW_ON_ERROR);

        $amqpMessage = new AMQPMessage(
            $body,
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $message->messageId,
            ]
        );

        $routingKey = $message->getMessageType();
        $this->channel->basic_publish($amqpMessage, $this->exchangeName, $routingKey);
    }

    /**
     * Builds an id of the form "ORD<YmdHis><8 uppercase hex chars>".
     */
    private function generateOrderId(): string
    {
        return sprintf('ORD%s%s', date('YmdHis'), strtoupper(bin2hex(random_bytes(4))));
    }
}

库存服务消费者

php
<?php

namespace App\Ecommerce\Inventory;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Consumes order events from the "inventory.order" queue and applies the
 * matching stock change through the inventory repository.
 *
 * "order.created" deducts stock for every item; "order.cancelled" restores it.
 */
class InventoryConsumer
{
    /** Longest time (seconds) one blocking wait may last before stop() is honoured. */
    private const WAIT_TIMEOUT_SECONDS = 1.0;

    private AMQPStreamConnection $connection;

    /** @var \PhpAmqpLib\Channel\AMQPChannel Channel opened on the injected connection. */
    private $channel;

    private InventoryRepository $repository;

    /** @var bool Cleared by stop() to end the consume loop. */
    private bool $running = true;

    /**
     * Opens a channel on the given connection and declares the queue topology.
     */
    public function __construct(
        AMQPStreamConnection $connection,
        InventoryRepository $repository
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->repository = $repository;
        $this->setupQueue();
    }

    /**
     * Declares the exchange and the durable queue, and binds the queue to the
     * "order.created" and "order.cancelled" routing keys.
     */
    private function setupQueue(): void
    {
        $this->channel->exchange_declare('ecommerce.exchange', 'topic', false, true, false);

        $this->channel->queue_declare('inventory.order', false, true, false, false);
        $this->channel->queue_bind('inventory.order', 'ecommerce.exchange', 'order.created');
        $this->channel->queue_bind('inventory.order', 'ecommerce.exchange', 'order.cancelled');
    }

    /**
     * Runs the consume loop until stop() is called or no consumer remains.
     *
     * Uses a blocking wait with a timeout. The earlier non-blocking wait
     * followed by a fixed 100 ms sleep slept after every delivery, which
     * limited the consumer to roughly ten messages per second.
     */
    public function consume(): void
    {
        $this->channel->basic_qos(null, 10, null);
        $this->channel->basic_consume('inventory.order', '', false, false, false, false, [$this, 'handleMessage']);

        while ($this->running && count($this->channel->callbacks)) {
            try {
                $this->channel->wait(null, false, self::WAIT_TIMEOUT_SECONDS);
            } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
                // Nothing arrived within the timeout; loop to re-check $running.
            }
        }
    }

    /**
     * Delivery callback: decodes the envelope, dispatches on message type and
     * settles the delivery.
     *
     * Settlement rules:
     *  - malformed envelope or payload: rejected without requeue;
     *  - handler failure: requeued once, then rejected without requeue on the
     *    redelivery, so a message that can never succeed does not loop forever;
     *  - success, or a message type this consumer does not handle: acked.
     *
     * reject() is used instead of nack() because the original
     * `nack(false, true)` passed its arguments in basic_nack order
     * (multiple, requeue), whereas AMQPMessage::nack() takes
     * ($requeue, $multiple) — failed messages were not requeued at all.
     */
    public function handleMessage(AMQPMessage $message): void
    {
        $data = json_decode($message->body, true);

        $wellFormed = is_array($data)
            && isset($data['message_type'], $data['payload'])
            && is_string($data['message_type'])
            && is_array($data['payload']);

        if (!$wellFormed) {
            error_log("Inventory error: malformed message rejected");
            $message->reject(false);
            return;
        }

        try {
            switch ($data['message_type']) {
                case 'order.created':
                    $this->handleOrderCreated($data['payload']);
                    break;
                case 'order.cancelled':
                    $this->handleOrderCancelled($data['payload']);
                    break;
            }
        } catch (\Throwable $e) {
            // \Throwable rather than \Exception: a TypeError raised by bad item
            // data would otherwise escape and leave the delivery unsettled.
            error_log("Inventory error: " . $e->getMessage());

            // Retrying bad input cannot help; other failures get one more try.
            $retryable = !($e instanceof \InvalidArgumentException);
            $message->reject($retryable && !$message->isRedelivered());
            return;
        }

        $message->ack();
    }

    /**
     * Deducts stock for every item of a created order, all-or-nothing.
     *
     * If any deduction fails, the items already deducted for this message are
     * restored before the error is rethrown, so a retry starts from a clean
     * state instead of deducting the first items twice.
     *
     * @throws \InvalidArgumentException When the payload has no item list.
     * @throws \RuntimeException         When the repository reports a failed deduction.
     */
    private function handleOrderCreated(array $payload): void
    {
        $deducted = [];

        try {
            foreach ($this->itemsOf($payload) as $item) {
                $result = $this->repository->deduct(
                    $item['product_id'],
                    $item['sku_id'],
                    $item['quantity']
                );

                // Strict comparison keeps this safe if deduct() returns nothing.
                if ($result === false) {
                    throw new \RuntimeException(sprintf(
                        'Stock deduction failed for product %s, sku %s',
                        $item['product_id'],
                        $item['sku_id']
                    ));
                }

                $deducted[] = $item;
            }
        } catch (\Throwable $e) {
            $this->restoreItems($deducted);
            throw $e;
        }
    }

    /**
     * Restores stock for every item of a cancelled order.
     *
     * NOTE(review): restoring is not guarded against repeats here; if this
     * message is redelivered after a partial failure, items restored on the
     * first attempt are restored again — confirm the repository handles that.
     *
     * @throws \InvalidArgumentException When the payload has no item list.
     */
    private function handleOrderCancelled(array $payload): void
    {
        foreach ($this->itemsOf($payload) as $item) {
            $this->repository->restore(
                $item['product_id'],
                $item['sku_id'],
                $item['quantity']
            );
        }
    }

    /**
     * Returns the payload's item list, or throws if it is absent or not an array.
     *
     * @throws \InvalidArgumentException
     */
    private function itemsOf(array $payload): array
    {
        if (!isset($payload['items']) || !is_array($payload['items'])) {
            throw new \InvalidArgumentException('payload has no "items" array');
        }

        return $payload['items'];
    }

    /**
     * Best-effort compensation: restores each given item, logging (not
     * throwing) when a restore fails so the original error is what propagates.
     */
    private function restoreItems(array $items): void
    {
        foreach ($items as $item) {
            try {
                $this->repository->restore(
                    $item['product_id'],
                    $item['sku_id'],
                    $item['quantity']
                );
            } catch (\Throwable $e) {
                error_log("Inventory error: rollback failed: " . $e->getMessage());
            }
        }
    }

    /**
     * Asks the consume loop to exit; takes effect within one wait timeout.
     */
    public function stop(): void
    {
        $this->running = false;
    }
}

完整使用示例

php
<?php

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Ecommerce\Order\OrderService;
use App\Ecommerce\Inventory\InventoryConsumer;

// Walks one order through create -> pay -> ship against a local broker.
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

try {
    // Fully qualified: this script is in the global namespace, and OrderService
    // type-hints the OrderRepository of its own namespace (App\Ecommerce\Order).
    // A bare `new OrderRepository()` would look for \OrderRepository instead.
    $orderService = new OrderService($connection, new \App\Ecommerce\Order\OrderRepository());

    echo "=== 电商系统示例 ===\n\n";

    echo "1. 创建订单\n";
    $result = $orderService->createOrder([
        'user_id' => 'user_001',
        'items' => [
            ['product_id' => 'P001', 'sku_id' => 'SKU001', 'quantity' => 2, 'price' => 99.00],
        ],
        'total_amount' => 198.00,
        'shipping_address' => '北京市朝阳区xxx',
    ]);
    echo "订单创建成功: {$result['order_id']}\n";

    echo "\n2. 支付确认\n";
    $orderService->confirmPayment($result['order_id'], 'PAY001', 198.00);
    echo "支付已确认\n";

    echo "\n3. 发货处理\n";
    $orderService->shipOrder($result['order_id'], 'SF123456789', '顺丰快递');
    echo "订单已发货\n";
} finally {
    // Close the connection even when one of the steps above throws.
    $connection->close();
}

echo "\n=== 示例完成 ===\n";

关键技术点解析

1. 订单状态机

php
// Allowed order status transitions: current status => statuses it may move to.
// Terminal statuses are listed with an empty array so that looking up any
// status that appears in the map never hits an undefined key.
$stateMachine = [
    'pending' => ['paid', 'cancelled'],
    'paid' => ['shipped', 'refunded'],
    'shipped' => ['delivered', 'returned'],
    'delivered' => ['returned'],
    'cancelled' => [],
    'refunded' => [],
    'returned' => [],
];

2. 库存原子扣减

php
/**
 * Subtracts $quantity from one SKU's stock in a single guarded UPDATE.
 *
 * The `stock >= ?` condition makes the check and the decrement one statement,
 * so stock cannot go negative. Success is decided by the number of rows the
 * UPDATE changed: execute() alone returns true even when the guard matched no
 * row, which previously reported insufficient stock as a successful deduction.
 *
 * @param string $productId Product identifier.
 * @param string $skuId     SKU identifier.
 * @param int    $quantity  Units to deduct; must be positive.
 *
 * @return bool True when a row was updated; false when the SKU does not exist
 *              or its stock is lower than $quantity.
 *
 * @throws \InvalidArgumentException When $quantity is zero or negative
 *                                   (a negative value would add stock).
 */
public function deduct(string $productId, string $skuId, int $quantity): bool
{
    if ($quantity <= 0) {
        throw new \InvalidArgumentException('Quantity to deduct must be a positive integer.');
    }

    $sql = "UPDATE inventory SET stock = stock - ? 
            WHERE product_id = ? AND sku_id = ? AND stock >= ?";

    // The statement was never prepared before ($stmt was undefined).
    // NOTE(review): assumes the enclosing class keeps its PDO handle in
    // $this->pdo — confirm the property name.
    $stmt = $this->pdo->prepare($sql);

    if (!$stmt->execute([$quantity, $productId, $skuId, $quantity])) {
        return false;
    }

    return $stmt->rowCount() > 0;
}

3. 消息幂等性

php
// Idempotency guard: leave early when isProcessed() reports this message id.
if ($this->isProcessed($messageId)) {
    return;
}
// The work runs before the id is recorded. If execution stops between these
// two calls the id is never marked, so a redelivery runs doProcess() again.
// NOTE(review): the check above and the mark below are separate calls; whether
// two concurrent consumers can both pass the check depends on how
// isProcessed()/markProcessed() are implemented — confirm.
$this->doProcess($payload);
$this->markProcessed($messageId);

性能优化建议

| 优化项 | 建议 | 说明 |
| --- | --- | --- |
| 库存预热 | Redis 预加载库存 | 减少数据库压力 |
| 订单分表 | 按时间分表 | 提高查询效率 |
| 异步处理 | 非核心流程异步 | 提升响应速度 |
| 缓存策略 | 多级缓存 | 减少数据库访问 |

常见问题与解决方案

1. 超卖问题

解决方案: Redis 原子扣减 + 数据库乐观锁

2. 订单重复

解决方案: 幂等性处理 + 唯一订单号

3. 消息丢失

解决方案: 消息持久化(交换机、队列、消息均声明为持久化)+ 确认机制(生产者开启发布确认 Publisher Confirms,消费者使用手动 ACK)

相关链接