Appearance
电商系统应用
概述
电商系统是 RabbitMQ 应用的典型场景,涉及订单处理、库存管理、支付通知、物流跟踪等多个业务环节。通过消息队列实现各环节的解耦和异步处理,提升系统的性能和可靠性。
业务背景与需求
场景描述
某电商平台业务模块:
| 模块 | 功能 | 消息场景 |
|---|---|---|
| 用户中心 | 注册、登录、信息管理 | 用户事件、通知 |
| 商品中心 | 商品管理、库存管理 | 商品事件、库存预警 |
| 订单中心 | 下单、支付、发货、售后 | 订单事件、状态流转 |
| 营销中心 | 优惠券、活动、秒杀 | 营销事件、定时任务 |
| 支付中心 | 支付、退款、对账 | 支付事件、回调通知 |
| 物流中心 | 发货、跟踪、签收 | 物流事件、状态同步 |
技术挑战
电商系统挑战:
1. 高并发:大促期间流量激增
2. 高可用:核心业务不能中断
3. 数据一致:订单、库存、支付状态一致
4. 实时性:库存、订单状态实时更新架构设计
整体架构图
mermaid
graph TB
subgraph "用户端"
A[Web端]
B[App端]
C[小程序]
end
subgraph "接入层"
D[API网关]
E[负载均衡]
end
subgraph "业务服务"
F[用户服务]
G[商品服务]
H[订单服务]
I[支付服务]
J[物流服务]
K[营销服务]
end
subgraph "RabbitMQ"
L[事件总线]
subgraph "业务队列"
M[订单队列]
N[支付队列]
O[物流队列]
P[通知队列]
end
end
subgraph "数据层"
Q[MySQL]
R[Redis]
S[Elasticsearch]
end
A --> D
B --> D
C --> D
D --> E
E --> F
E --> G
E --> H
E --> I
E --> J
E --> K
H --> L
I --> L
J --> L
L --> M
L --> N
L --> O
L --> P
F --> Q
G --> Q
H --> Q
I --> Q
G --> R
H --> R
G --> S订单流程
mermaid
sequenceDiagram
participant User as 用户
participant Order as 订单服务
participant MQ as RabbitMQ
participant Inventory as 库存服务
participant Payment as 支付服务
participant Logistics as 物流服务
User->>Order: 提交订单
Order->>MQ: 发布订单创建事件
Order-->>User: 返回订单号
MQ->>Inventory: 扣减库存
Inventory-->>MQ: ACK
User->>Payment: 发起支付
Payment->>MQ: 发布支付事件
Payment-->>User: 支付成功
MQ->>Order: 更新订单状态
MQ->>Logistics: 创建物流单
Logistics->>MQ: 发布物流事件
MQ->>User: 推送物流信息PHP 代码实现
电商消息基类
php
<?php
namespace App\Ecommerce;
abstract class EcommerceMessage
{
public string $messageId;
public string $messageType;
public string $source;
public int $timestamp;
public array $metadata;
public function __construct(string $source)
{
$this->messageId = $this->generateMessageId();
$this->source = $source;
$this->timestamp = time();
$this->metadata = [];
}
private function generateMessageId(): string
{
return sprintf('msg_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
}
abstract public function getMessageType(): string;
abstract public function getPayload(): array;
public function toArray(): array
{
return [
'message_id' => $this->messageId,
'message_type' => $this->getMessageType(),
'source' => $this->source,
'timestamp' => $this->timestamp,
'metadata' => $this->metadata,
'payload' => $this->getPayload(),
];
}
}订单消息处理
php
<?php
namespace App\Ecommerce\Order;
use App\Ecommerce\EcommerceMessage;
class OrderCreatedMessage extends EcommerceMessage
{
private array $order;
public function __construct(array $order)
{
parent::__construct('order-service');
$this->order = $order;
}
public function getMessageType(): string
{
return 'order.created';
}
public function getPayload(): array
{
return $this->order;
}
}
class OrderPaidMessage extends EcommerceMessage
{
private string $orderId;
private string $paymentId;
private float $amount;
public function __construct(string $orderId, string $paymentId, float $amount)
{
parent::__construct('payment-service');
$this->orderId = $orderId;
$this->paymentId = $paymentId;
$this->amount = $amount;
}
public function getMessageType(): string
{
return 'order.paid';
}
public function getPayload(): array
{
return [
'order_id' => $this->orderId,
'payment_id' => $this->paymentId,
'amount' => $this->amount,
];
}
}
class OrderShippedMessage extends EcommerceMessage
{
private string $orderId;
private string $trackingNumber;
private string $carrier;
public function __construct(string $orderId, string $trackingNumber, string $carrier)
{
parent::__construct('logistics-service');
$this->orderId = $orderId;
$this->trackingNumber = $trackingNumber;
$this->carrier = $carrier;
}
public function getMessageType(): string
{
return 'order.shipped';
}
public function getPayload(): array
{
return [
'order_id' => $this->orderId,
'tracking_number' => $this->trackingNumber,
'carrier' => $this->carrier,
];
}
}订单服务实现
php
<?php
namespace App\Ecommerce\Order;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class OrderService
{
private AMQPStreamConnection $connection;
private $channel;
private OrderRepository $repository;
private string $exchangeName = 'ecommerce.exchange';
public function __construct(
AMQPStreamConnection $connection,
OrderRepository $repository
) {
$this->connection = $connection;
$this->channel = $connection->channel();
$this->repository = $repository;
$this->setupExchange();
}
private function setupExchange(): void
{
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::TOPIC,
false,
true,
false
);
}
public function createOrder(array $orderData): array
{
$orderId = $this->generateOrderId();
$order = [
'order_id' => $orderId,
'user_id' => $orderData['user_id'],
'items' => $orderData['items'],
'total_amount' => $orderData['total_amount'],
'status' => 'pending',
'shipping_address' => $orderData['shipping_address'],
'created_at' => date('Y-m-d H:i:s'),
];
$this->repository->create($order);
$this->publishMessage(new OrderCreatedMessage($order));
return ['order_id' => $orderId, 'status' => 'pending'];
}
public function confirmPayment(string $orderId, string $paymentId, float $amount): void
{
$this->repository->update($orderId, [
'status' => 'paid',
'payment_id' => $paymentId,
'paid_at' => date('Y-m-d H:i:s'),
]);
$this->publishMessage(new OrderPaidMessage($orderId, $paymentId, $amount));
}
public function shipOrder(string $orderId, string $trackingNumber, string $carrier): void
{
$this->repository->update($orderId, [
'status' => 'shipped',
'tracking_number' => $trackingNumber,
'carrier' => $carrier,
'shipped_at' => date('Y-m-d H:i:s'),
]);
$this->publishMessage(new OrderShippedMessage($orderId, $trackingNumber, $carrier));
}
private function publishMessage(EcommerceMessage $message): void
{
$amqpMessage = new AMQPMessage(
json_encode($message->toArray()),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => $message->messageId,
]
);
$routingKey = $message->getMessageType();
$this->channel->basic_publish($amqpMessage, $this->exchangeName, $routingKey);
}
private function generateOrderId(): string
{
return sprintf('ORD%s%s', date('YmdHis'), strtoupper(bin2hex(random_bytes(4))));
}
}库存服务消费者
php
<?php
namespace App\Ecommerce\Inventory;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class InventoryConsumer
{
private AMQPStreamConnection $connection;
private $channel;
private InventoryRepository $repository;
private bool $running = true;
public function __construct(
AMQPStreamConnection $connection,
InventoryRepository $repository
) {
$this->connection = $connection;
$this->channel = $connection->channel();
$this->repository = $repository;
$this->setupQueue();
}
private function setupQueue(): void
{
$this->channel->exchange_declare('ecommerce.exchange', 'topic', false, true, false);
$this->channel->queue_declare('inventory.order', false, true, false, false);
$this->channel->queue_bind('inventory.order', 'ecommerce.exchange', 'order.created');
$this->channel->queue_bind('inventory.order', 'ecommerce.exchange', 'order.cancelled');
}
public function consume(): void
{
$this->channel->basic_qos(null, 10, null);
$this->channel->basic_consume('inventory.order', '', false, false, false, false, [$this, 'handleMessage']);
while ($this->running && count($this->channel->callbacks)) {
$this->channel->wait(null, true);
if (!$this->running) break;
usleep(100000);
}
}
public function handleMessage(AMQPMessage $message): void
{
$data = json_decode($message->body, true);
$messageType = $data['message_type'];
$payload = $data['payload'];
try {
switch ($messageType) {
case 'order.created':
$this->handleOrderCreated($payload);
break;
case 'order.cancelled':
$this->handleOrderCancelled($payload);
break;
}
$message->ack();
} catch (\Exception $e) {
error_log("Inventory error: " . $e->getMessage());
$message->nack(false, true);
}
}
private function handleOrderCreated(array $payload): void
{
foreach ($payload['items'] as $item) {
$this->repository->deduct(
$item['product_id'],
$item['sku_id'],
$item['quantity']
);
}
}
private function handleOrderCancelled(array $payload): void
{
foreach ($payload['items'] as $item) {
$this->repository->restore(
$item['product_id'],
$item['sku_id'],
$item['quantity']
);
}
}
public function stop(): void
{
$this->running = false;
}
}完整使用示例
php
<?php
require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use App\Ecommerce\Order\OrderService;
use App\Ecommerce\Inventory\InventoryConsumer;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$orderService = new OrderService($connection, new OrderRepository());
echo "=== 电商系统示例 ===\n\n";
echo "1. 创建订单\n";
$result = $orderService->createOrder([
'user_id' => 'user_001',
'items' => [
['product_id' => 'P001', 'sku_id' => 'SKU001', 'quantity' => 2, 'price' => 99.00],
],
'total_amount' => 198.00,
'shipping_address' => '北京市朝阳区xxx',
]);
echo "订单创建成功: {$result['order_id']}\n";
echo "\n2. 支付确认\n";
$orderService->confirmPayment($result['order_id'], 'PAY001', 198.00);
echo "支付已确认\n";
echo "\n3. 发货处理\n";
$orderService->shipOrder($result['order_id'], 'SF123456789', '顺丰快递');
echo "订单已发货\n";
$connection->close();
echo "\n=== 示例完成 ===\n";关键技术点解析
1. 订单状态机
php
$stateMachine = [
'pending' => ['paid', 'cancelled'],
'paid' => ['shipped', 'refunded'],
'shipped' => ['delivered', 'returned'],
'delivered' => ['returned'],
];2. 库存原子扣减
php
public function deduct(string $productId, string $skuId, int $quantity): bool
{
$sql = "UPDATE inventory SET stock = stock - ?
WHERE product_id = ? AND sku_id = ? AND stock >= ?";
return $stmt->execute([$quantity, $productId, $skuId, $quantity]);
}3. 消息幂等性
php
if ($this->isProcessed($messageId)) {
return;
}
$this->doProcess($payload);
$this->markProcessed($messageId);性能优化建议
| 优化项 | 建议 | 说明 |
|---|---|---|
| 库存预热 | Redis 预加载库存 | 减少数据库压力 |
| 订单分表 | 按时间分表 | 提高查询效率 |
| 异步处理 | 非核心流程异步 | 提升响应速度 |
| 缓存策略 | 多级缓存 | 减少数据库访问 |
常见问题与解决方案
1. 超卖问题
解决方案: Redis 原子扣减 + 数据库乐观锁
2. 订单重复
解决方案: 幂等性处理 + 唯一订单号
3. 消息丢失
解决方案: 消息持久化 + 确认机制
