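Flash Sale System Design
Overview
A flash sale (秒杀, "seckill") is a classic high-concurrency scenario in e-commerce: the system must absorb a massive number of requests within a very short window while keeping inventory accurate and orders reliable. In such a system, RabbitMQ takes on the core roles of smoothing traffic peaks, processing work asynchronously, and decoupling services.
Business Background and Requirements
Scenario
Characteristics of a flash sale on a typical e-commerce platform: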
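| Metric | Value | Notes |
|---|---|---|
| Duration | 10 minutes | Time-limited sale |
| Expected traffic | 1,000,000 QPS | Peak request rate |
| Items available | 100 | Limited stock |
| Expected users | 500,000 | Participating users |
| Successful orders | 100 | Orders actually completed |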
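Technical Challenges
The core challenges a flash sale system faces:
1. Instantaneous high concurrency
   - Traffic surges within a few seconds
   - System capacity is limited
2. Inventory accuracy
   - Prevent overselling
   - Guarantee that stock deduction is atomic
3. User experience
   - Fast responses
   - Fair purchasing
4. System stability
   - Prevent cascading failures
   - Degrade gracefully under load
Requirement Targets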
| Target | Metric |
|---|---|
| Response time | < 100 ms |
| System availability | 99.99% |
| Inventory accuracy | 100% |
| Overselling | 0 oversold items |
Architecture Design
Overall Architecture Diagram
mermaid
graph TB
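subgraph "Client"
A[User request]
end
subgraph "Access layer"
B[CDN]
C[Gateway cluster]
D[Rate limiter]
end
subgraph "Service layer"
E[Flash sale API]
F[Inventory service]
G[Order service]
end
subgraph "RabbitMQ"
H[Flash sale exchange<br/>seckill.exchange]
subgraph "Queues"
I[Request queue<br/>seckill.request]
J[Order queue<br/>seckill.order]
K[Notification queue<br/>seckill.notify]
end
L[Dead-letter queue<br/>seckill.dlq]
end
subgraph "Storage layer"
M[Redis cluster]
N["MySQL primary/replica"]
end
subgraph "Consumer cluster"
O[Request processor]
P[Order processor]
Q[Notification processor]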
end
A --> B
B --> C
C --> D
D --> E
E --> M
E --> H
H --> I
H --> J
H --> K
I --> O
J --> P
K --> Q
O --> F
O --> M
P --> G
P --> N
Q --> M
F --> M
G --> N
I -.-> L
J -.-> L
Flash Sale Flow Diagram
mermaid
sequenceDiagram
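participant User as User
participant Gateway as Gateway
participant API as Flash sale API
participant Redis as Redis
participant MQ as RabbitMQ
participant Consumer as Consumer
participant DB as Database
User->>Gateway: Flash sale request
Gateway->>Gateway: Rate limit check
alt Rate limited
Gateway-->>User: Return queued status
else Within limit
Gateway->>API: Forward request
API->>Redis: Reserve stock (Lua script)
alt Insufficient stock
Redis-->>API: Insufficient stock
API-->>User: Flash sale failed
else Stock available
Redis-->>API: Reservation succeeded
API->>MQ: Publish flash sale request
API-->>User: Queued (processed asynchronously)
MQ->>Consumer: Deliver request
Consumer->>DB: Create order
Consumer->>Redis: Confirm deduction
Consumer->>MQ: Publish notification
Consumer-->>MQ: ACK
MQ->>Consumer: Deliver notification
Consumer->>User: Push result
end
end
Inventory Deduction Flow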
mermaid
stateDiagram-v2
[*] --> available: Initialize stock
available --> reserved: Reserve stock
reserved --> confirmed: Order created
reserved --> released: Order creation failed or timed out
confirmed --> [*]
released --> available: Return to stock
note right of available: Available stock
note right of reserved: Reserved
note right of confirmed: Confirmed
note right of released: Released
PHP Implementation
Flash Sale Request Structure
php
<?php
namespace App\Seckill;
class SeckillRequest
{
public string $requestId;
public int $userId;
public int $activityId;
public int $productId;
public int $skuId;
public int $quantity;
public int $timestamp;
public string $token;
public function __construct(
int $userId,
int $activityId,
int $productId,
int $skuId,
int $quantity = 1
) {
$this->requestId = $this->generateRequestId();
$this->userId = $userId;
$this->activityId = $activityId;
$this->productId = $productId;
$this->skuId = $skuId;
$this->quantity = $quantity;
$this->timestamp = time();
$this->token = $this->generateToken();
}
private function generateRequestId(): string
{
return sprintf('req_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
}
private function generateToken(): string
{
$data = implode('|', [
$this->userId,
$this->activityId,
$this->productId,
$this->timestamp,
]);
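// Demo key only: in a real deployment, load the signing key from configuration instead of hardcoding it.
return hash_hmac('sha256', $data, 'seckill_secret_key');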
}
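public function validate(): bool
{
// Each flash sale request may buy exactly one item.
if ($this->quantity !== 1) {
return false;
}
// Reject requests older than 5 minutes.
if (time() - $this->timestamp > 300) {
return false;
}
// Reject payloads whose signed fields were changed after the token was issued.
return hash_equals($this->generateToken(), $this->token);
}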
public function toArray(): array
{
return [
'request_id' => $this->requestId,
'user_id' => $this->userId,
'activity_id' => $this->activityId,
'product_id' => $this->productId,
'sku_id' => $this->skuId,
'quantity' => $this->quantity,
'timestamp' => $this->timestamp,
'token' => $this->token,
];
}
public static function fromArray(array $data): self
{
$request = new self(
$data['user_id'],
$data['activity_id'],
$data['product_id'],
$data['sku_id'],
$data['quantity']
);
$request->requestId = $data['request_id'];
$request->timestamp = $data['timestamp'];
$request->token = $data['token'];
return $request;
}
}
Inventory Service Implementation
php
<?php
namespace App\Seckill;
class InventoryService
{
private $redis;
private array $config;
public function __construct($redis, array $config = [])
{
$this->redis = $redis;
$this->config = array_merge([
'stock_key_prefix' => 'seckill:stock:',
'reserved_key_prefix' => 'seckill:reserved:',
'user_limit_prefix' => 'seckill:user_limit:',
'user_limit' => 1,
], $config);
}
public function initStock(int $activityId, int $productId, int $stock): bool
{
$stockKey = $this->getStockKey($activityId, $productId);
$reservedKey = $this->getReservedKey($activityId, $productId);
$this->redis->set($stockKey, $stock);
$this->redis->del($reservedKey);
return true;
}
public function deduct(int $activityId, int $productId, int $userId, int $quantity = 1): array
{
$stockKey = $this->getStockKey($activityId, $productId);
$reservedKey = $this->getReservedKey($activityId, $productId);
$userLimitKey = $this->getUserLimitKey($activityId, $userId);
$luaScript = <<<'LUA'
local stockKey = KEYS[1]
local reservedKey = KEYS[2]
local userLimitKey = KEYS[3]
local userId = ARGV[1]
local quantity = tonumber(ARGV[2])
local userLimit = tonumber(ARGV[3])
local userPurchased = redis.call('GET', userLimitKey)
if userPurchased and tonumber(userPurchased) >= userLimit then
return {0, 'USER_LIMIT_EXCEEDED'}
end
local stock = redis.call('GET', stockKey)
if not stock then
return {0, 'STOCK_NOT_FOUND'}
end
if tonumber(stock) < quantity then
return {0, 'INSUFFICIENT_STOCK'}
end
local newStock = redis.call('DECRBY', stockKey, quantity)
if newStock < 0 then
redis.call('INCRBY', stockKey, quantity)
return {0, 'INSUFFICIENT_STOCK'}
end
redis.call('HSET', reservedKey, userId, quantity)
redis.call('INCRBY', userLimitKey, quantity)
redis.call('EXPIRE', userLimitKey, 86400)
return {1, newStock}
LUA;
$result = $this->redis->eval(
$luaScript,
[$stockKey, $reservedKey, $userLimitKey, $userId, $quantity, $this->config['user_limit']],
3
);
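// On success the script returns {1, remainingStock}; on failure it returns {0, errorCode}.
$success = is_array($result) && (int) $result[0] === 1;
return [
'success' => $success,
'remaining_stock' => $success ? (int) $result[1] : null,
'error' => $success ? null : (string) ($result[1] ?? 'UNKNOWN_ERROR'),
];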
}
public function confirm(int $activityId, int $productId, int $userId): bool
{
$reservedKey = $this->getReservedKey($activityId, $productId);
$this->redis->hDel($reservedKey, $userId);
return true;
}
public function release(int $activityId, int $productId, int $userId, int $quantity = 1): bool
{
$stockKey = $this->getStockKey($activityId, $productId);
$reservedKey = $this->getReservedKey($activityId, $productId);
$userLimitKey = $this->getUserLimitKey($activityId, $userId);
$luaScript = <<<'LUA'
local stockKey = KEYS[1]
local reservedKey = KEYS[2]
local userLimitKey = KEYS[3]
local userId = ARGV[1]
local quantity = tonumber(ARGV[2])
local reserved = redis.call('HGET', reservedKey, userId)
if not reserved then
return 0
end
redis.call('INCRBY', stockKey, quantity)
redis.call('HDEL', reservedKey, userId)
redis.call('DECRBY', userLimitKey, quantity)
return 1
LUA;
return $this->redis->eval(
$luaScript,
[$stockKey, $reservedKey, $userLimitKey, $userId, $quantity],
3
) === 1;
}
public function getStock(int $activityId, int $productId): int
{
$stockKey = $this->getStockKey($activityId, $productId);
return (int) $this->redis->get($stockKey);
}
private function getStockKey(int $activityId, int $productId): string
{
return $this->config['stock_key_prefix'] . $activityId . ':' . $productId;
}
private function getReservedKey(int $activityId, int $productId): string
{
return $this->config['reserved_key_prefix'] . $activityId . ':' . $productId;
}
private function getUserLimitKey(int $activityId, int $userId): string
{
return $this->config['user_limit_prefix'] . $activityId . ':' . $userId;
}
}
Flash Sale Service Implementation
php
<?php
namespace App\Seckill;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
class SeckillService
{
private InventoryService $inventoryService;
private AMQPStreamConnection $connection;
private $channel;
private string $exchangeName = 'seckill.exchange';
private string $requestQueue = 'seckill.request';
private string $orderQueue = 'seckill.order';
private string $notifyQueue = 'seckill.notify';
public function __construct(
InventoryService $inventoryService,
AMQPStreamConnection $connection
) {
$this->inventoryService = $inventoryService;
$this->connection = $connection;
$this->channel = $connection->channel();
$this->setupInfrastructure();
}
private function setupInfrastructure(): void
{
$this->channel->exchange_declare(
$this->exchangeName,
AMQPExchangeType::DIRECT,
false,
true,
false
);
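$queueArgs = [
'x-dead-letter-exchange' => ['S', 'seckill.dlx'],
// seckill.dlq is bound to the direct exchange seckill.dlx with the key 'failed',
// so dead-lettered messages must be re-keyed or they would be dropped as unroutable.
'x-dead-letter-routing-key' => ['S', 'failed'],
'x-message-ttl' => ['I', 300000], // 5 minutes, in milliseconds
];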
$this->channel->queue_declare(
$this->requestQueue,
false,
true,
false,
false,
false,
$queueArgs
);
$this->channel->queue_declare(
$this->orderQueue,
false,
true,
false,
false,
false,
$queueArgs
);
$this->channel->queue_declare(
$this->notifyQueue,
false,
true,
false,
false,
false,
$queueArgs
);
$this->channel->queue_bind($this->requestQueue, $this->exchangeName, 'request');
$this->channel->queue_bind($this->orderQueue, $this->exchangeName, 'order');
$this->channel->queue_bind($this->notifyQueue, $this->exchangeName, 'notify');
$this->channel->exchange_declare('seckill.dlx', AMQPExchangeType::DIRECT, false, true, false);
$this->channel->queue_declare('seckill.dlq', false, true, false, false);
$this->channel->queue_bind('seckill.dlq', 'seckill.dlx', 'failed');
}
public function submit(SeckillRequest $request): array
{
if (!$request->validate()) {
return [
'success' => false,
'code' => 'INVALID_REQUEST',
'message' => 'Invalid request parameters',
];
}
$deductResult = $this->inventoryService->deduct(
$request->activityId,
$request->productId,
$request->userId,
$request->quantity
);
if (!$deductResult['success']) {
return [
'success' => false,
'code' => $deductResult['error'],
'message' => $this->getErrorMessage($deductResult['error']),
];
}
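try {
$this->publishRequest($request);
} catch (\Throwable $e) {
// Publishing failed: give the reserved stock back so it is not stranded in Redis.
$this->inventoryService->release(
$request->activityId,
$request->productId,
$request->userId,
$request->quantity
);
return [
'success' => false,
'code' => 'PUBLISH_FAILED', // illustrative code, not one returned by the Lua script
'message' => 'The system is busy, please try again later',
];
}
return [
'success' => true,
'code' => 'QUEUED',
'message' => 'Request submitted and being processed',
'request_id' => $request->requestId,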
];
}
private function publishRequest(SeckillRequest $request): void
{
$message = new AMQPMessage(
json_encode($request->toArray()),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'message_id' => $request->requestId,
'timestamp' => time(),
]
);
$this->channel->basic_publish(
$message,
$this->exchangeName,
'request'
);
}
public function publishOrder(array $orderData): void
{
$message = new AMQPMessage(
json_encode($orderData),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]
);
$this->channel->basic_publish(
$message,
$this->exchangeName,
'order'
);
}
public function publishNotify(array $notifyData): void
{
$message = new AMQPMessage(
json_encode($notifyData),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]
);
$this->channel->basic_publish(
$message,
$this->exchangeName,
'notify'
);
}
private function getErrorMessage(string $code): string
{
$messages = [
'INSUFFICIENT_STOCK' => 'Sold out',
'USER_LIMIT_EXCEEDED' => 'You have already taken part in this sale',
'STOCK_NOT_FOUND' => 'Sale not found',
];
return $messages[$code] ?? 'The system is busy, please try again later';
}
public function close(): void
{
if ($this->channel) {
$this->channel->close();
}
}
}
Flash Sale Consumer Implementation
php
<?php
namespace App\Seckill;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class SeckillConsumer
{
private AMQPStreamConnection $connection;
private $channel;
private InventoryService $inventoryService;
private OrderService $orderService;
private SeckillService $seckillService;
private bool $running = true;
public function __construct(
AMQPStreamConnection $connection,
InventoryService $inventoryService,
OrderService $orderService,
SeckillService $seckillService
) {
$this->connection = $connection;
$this->channel = $connection->channel();
$this->inventoryService = $inventoryService;
$this->orderService = $orderService;
$this->seckillService = $seckillService;
}
public function consumeRequests(): void
{
$this->channel->basic_qos(null, 10, null);
$this->channel->basic_consume(
'seckill.request',
'',
false,
false,
false,
false,
[$this, 'processRequest']
);
while ($this->running && count($this->channel->callbacks)) {
$this->channel->wait(null, true);
if (!$this->running) break;
usleep(10000);
}
}
public function processRequest(AMQPMessage $message): void
{
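$data = json_decode($message->body, true);
if (!is_array($data)) {
// Malformed payload: reject without requeueing so it is dead-lettered rather than retried forever.
$message->reject(false);
return;
}
$request = SeckillRequest::fromArray($data);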
try {
$order = $this->createOrder($request);
$this->inventoryService->confirm(
$request->activityId,
$request->productId,
$request->userId
);
$this->seckillService->publishOrder([
'request_id' => $request->requestId,
'order_id' => $order['order_id'],
'user_id' => $request->userId,
'product_id' => $request->productId,
'status' => 'created',
]);
$this->seckillService->publishNotify([
'type' => 'seckill_success',
'user_id' => $request->userId,
'order_id' => $order['order_id'],
'message' => 'Congratulations, your flash sale purchase succeeded!',
]);
$message->ack();
} catch (\Exception $e) {
$this->handleFailure($message, $request, $e);
}
}
private function createOrder(SeckillRequest $request): array
{
$orderId = $this->generateOrderId();
$order = [
'order_id' => $orderId,
'user_id' => $request->userId,
'activity_id' => $request->activityId,
'product_id' => $request->productId,
'sku_id' => $request->skuId,
'quantity' => $request->quantity,
'status' => 'pending_payment',
'created_at' => date('Y-m-d H:i:s'),
'expire_at' => date('Y-m-d H:i:s', time() + 900),
];
$this->orderService->create($order);
return $order;
}
private function handleFailure(AMQPMessage $message, SeckillRequest $request, \Exception $e): void
{
error_log(sprintf(
'[Seckill] Order creation failed: %s - %s',
$request->requestId,
$e->getMessage()
));
$this->inventoryService->release(
$request->activityId,
$request->productId,
$request->userId,
$request->quantity
);
$this->seckillService->publishNotify([
'type' => 'seckill_failed',
'user_id' => $request->userId,
'message' => 'Flash sale failed, please try again later',
]);
$message->ack();
}
private function generateOrderId(): string
{
return sprintf('SK%s%s', date('YmdHis'), strtoupper(bin2hex(random_bytes(4))));
}
public function stop(): void
{
$this->running = false;
}
public function close(): void
{
$this->stop();
if ($this->channel) {
$this->channel->close();
}
}
}
Rate Limiter Implementation
php
<?php
namespace App\Seckill;
class RateLimiter
{
private $redis;
private array $config;
public function __construct($redis, array $config = [])
{
$this->redis = $redis;
$this->config = array_merge([
'global_limit' => 100000,
'user_limit' => 10,
'window_size' => 1,
], $config);
}
public function checkGlobal(): bool
{
$key = 'seckill:global:limit';
$current = $this->redis->incr($key);
if ($current === 1) {
$this->redis->expire($key, $this->config['window_size']);
}
return $current <= $this->config['global_limit'];
}
public function checkUser(int $userId, int $activityId): bool
{
$key = sprintf('seckill:user:limit:%d:%d', $activityId, $userId);
$current = $this->redis->incr($key);
if ($current === 1) {
$this->redis->expire($key, 60);
}
return $current <= $this->config['user_limit'];
}
public function acquireToken(int $activityId): bool
{
$key = sprintf('seckill:token:%d', $activityId);
$token = $this->redis->get($key);
if (!$token) {
return false;
}
return $this->redis->decr($key) >= 0;
}
public function initTokens(int $activityId, int $count): void
{
$key = sprintf('seckill:token:%d', $activityId);
$this->redis->set($key, $count);
}
}
Complete Usage Example
php
<?php
require_once 'vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
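use App\Seckill\{
SeckillRequest,
SeckillService,
SeckillConsumer,
InventoryService,
RateLimiter,
// OrderService is not defined in this article; it is assumed to live in
// App\Seckill and to expose a create(array $order) method.
OrderService
};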
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$redis = new Redis();
$redis->connect('localhost', 6379);
$inventoryService = new InventoryService($redis);
$seckillService = new SeckillService($inventoryService, $connection);
$rateLimiter = new RateLimiter($redis, [
'global_limit' => 100000,
'user_limit' => 5,
]);
$activityId = 1001;
$productId = 2001;
$stock = 100;
$inventoryService->initStock($activityId, $productId, $stock);
$rateLimiter->initTokens($activityId, $stock);
echo "Flash sale initialized: activity {$activityId}, product {$productId}, stock {$stock}\n";
function handleSeckillRequest(
SeckillService $seckillService,
RateLimiter $rateLimiter,
int $userId,
int $activityId,
int $productId
): array {
if (!$rateLimiter->checkGlobal()) {
return ['success' => false, 'message' => 'The system is busy, please try again later'];
}
if (!$rateLimiter->checkUser($userId, $activityId)) {
return ['success' => false, 'message' => 'Too many requests'];
}
// The demo reuses $productId as the SKU ID.
$request = new SeckillRequest($userId, $activityId, $productId, $productId);
return $seckillService->submit($request);
}
for ($i = 1; $i <= 5; $i++) {
$result = handleSeckillRequest($seckillService, $rateLimiter, $i, $activityId, $productId);
echo "User {$i} flash sale result: " . json_encode($result) . "\n";
}
echo "\nStarting the consumer to process requests...\n";
$orderService = new OrderService();
$consumer = new SeckillConsumer(
$connection,
$inventoryService,
$orderService,
$seckillService
);
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use ($consumer) {
echo "Received SIGTERM\n";
$consumer->stop();
});
pcntl_signal(SIGINT, function () use ($consumer) {
echo "Received SIGINT\n";
$consumer->stop();
});
$consumer->consumeRequests();
$seckillService->close();
$consumer->close();
$connection->close();
Key Technical Points
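1. Atomic Deduction in Redis
A Lua script makes stock deduction atomic. Redis runs each script to completion without interleaving other commands, so a concurrent request cannot slip in between the stock check and the decrement: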
lua
local stock = redis.call('GET', stockKey)
if tonumber(stock) < quantity then
return {0, 'INSUFFICIENT_STOCK'}
end
local newStock = redis.call('DECRBY', stockKey, quantity)
if newStock < 0 then
redis.call('INCRBY', stockKey, quantity)
return {0, 'INSUFFICIENT_STOCK'}
end
return {1, newStock}
2. Traffic Peak Shaving
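RabbitMQ queues smooth out traffic peaks:
- Bursts of concurrent requests are enqueued first
- Consumers process them at the rate they can sustain
- Backend systems are protected from being overwhelmed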
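The effect can be illustrated without a broker. The following is only a sketch: `simulatePeakShaving` is a hypothetical helper, and the arrival counts and per-tick capacity are made-up numbers rather than measurements from the system above.

```php
<?php
/**
 * Simulate a queue absorbing a burst: requests arrive per tick, consumers
 * drain at most $capacity per tick, and the remainder waits in the queue.
 *
 * @param int[] $arrivals requests arriving in each tick (illustrative numbers)
 * @param int   $capacity maximum requests the consumers handle per tick
 * @return array{processed: int[], backlog: int[]}
 */
function simulatePeakShaving(array $arrivals, int $capacity): array
{
    if ($capacity < 1) {
        throw new InvalidArgumentException('capacity must be at least 1');
    }
    $backlog = 0;
    $processedPerTick = [];
    $backlogPerTick = [];
    $tick = 0;
    // Keep ticking until every arrival is in and the queue is drained.
    while ($tick < count($arrivals) || $backlog > 0) {
        $backlog += $arrivals[$tick] ?? 0;
        $processed = min($backlog, $capacity);
        $backlog -= $processed;
        $processedPerTick[] = $processed;
        $backlogPerTick[] = $backlog;
        $tick++;
    }
    return ['processed' => $processedPerTick, 'backlog' => $backlogPerTick];
}

// A burst of 1000 requests followed by 200 more, with consumers handling 300 per tick.
$result = simulatePeakShaving([1000, 200, 0], 300);
print_r($result);
```

The backend never sees more than 300 requests per tick; the burst is converted into queueing time instead of load.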
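3. Stock Reservation Mechanism
1. Reserve: deduct the Redis stock as soon as the user's request arrives
2. Confirm: confirm the deduction once the order has been created
3. Release: return the stock if order creation fails or times out
4. Per-User Purchase Limit
lua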
local userPurchased = redis.call('GET', userLimitKey)
if userPurchased and tonumber(userPurchased) >= userLimit then
return {0, 'USER_LIMIT_EXCEEDED'}
end
Performance Optimization Recommendations
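| Optimization | Recommendation | Notes |
|---|---|---|
| CDN offloading | Serve static assets from a CDN | Reduces load on the origin |
| Static pages | Render the flash sale page statically | Less dynamic rendering |
| Button disabling | Throttle submissions on the front end | Fewer invalid requests |
| Multi-level caching | CDN → Redis → MySQL | Absorbs load layer by layer |
| Asynchronous processing | Process through a message queue | Faster responses |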
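Common Problems and Solutions
1. Overselling
Problem: concurrent requests deduct the same stock more than once, so more units are sold than exist
Solutions:
- Atomic operations through a Redis Lua script
- Database optimistic locking as a safety net
- Reserve-then-confirm mechanism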
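The optimistic-locking safety net can be sketched in memory. This is an assumption-laden illustration, not the article's database code: `VersionedStockRow` is a hypothetical stand-in for a stock row with a version column, and the SQL in the comment uses assumed table and column names.

```php
<?php
/**
 * In-memory stand-in for a stock row guarded by a version column.
 * A database-backed version would issue something like:
 *   UPDATE seckill_stock SET stock = stock - :qty, version = version + 1
 *   WHERE product_id = :id AND version = :version AND stock >= :qty
 * and treat "0 rows affected" as a lost race (names here are assumptions).
 */
final class VersionedStockRow
{
    private int $stock;
    private int $version = 0;

    public function __construct(int $stock)
    {
        $this->stock = $stock;
    }

    /** @return array{stock: int, version: int} snapshot a writer reads first */
    public function read(): array
    {
        return ['stock' => $this->stock, 'version' => $this->version];
    }

    /** Deduct only if the row is unchanged since $expectedVersion was read. */
    public function deductIfUnchanged(int $expectedVersion, int $quantity): bool
    {
        if ($this->version !== $expectedVersion || $this->stock < $quantity) {
            return false; // equivalent to "0 rows affected"
        }
        $this->stock -= $quantity;
        $this->version++;
        return true;
    }
}

// Two writers read the same snapshot of the last unit; only one may win.
$row = new VersionedStockRow(1);
$a = $row->read();
$b = $row->read();
$firstWins = $row->deductIfUnchanged($a['version'], 1);
$secondWins = $row->deductIfUnchanged($b['version'], 1);
var_dump($firstWins, $secondWins);
```

The losing writer sees a stale version and must re-read or give up, so the last unit cannot be sold twice even if the Redis layer misbehaves.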
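2. Message Backlog
Problem: messages pile up in the request queue
Solutions:
- Add more consumers
- Process messages in batches
- Scale out dynamically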
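When deciding how far to scale out, a rough capacity estimate helps. The helper below is a hypothetical back-of-the-envelope sketch; the function name, parameters, and sample numbers are assumptions, and real throughput should be measured per consumer.

```php
<?php
/**
 * Estimate how many consumers are needed to drain a backlog within a deadline
 * while still keeping up with new arrivals.
 *
 * @param int   $backlog              messages currently waiting in the queue
 * @param float $incomingPerSecond    rate at which new messages keep arriving
 * @param float $perConsumerPerSecond measured throughput of one consumer
 * @param int   $drainSeconds         time budget for clearing the backlog
 */
function consumersNeeded(
    int $backlog,
    float $incomingPerSecond,
    float $perConsumerPerSecond,
    int $drainSeconds
): int {
    if ($perConsumerPerSecond <= 0 || $drainSeconds <= 0) {
        throw new InvalidArgumentException('throughput and drain window must be positive');
    }
    // Required throughput = new arrivals + the backlog spread over the drain window.
    $required = $incomingPerSecond + $backlog / $drainSeconds;
    return max(1, (int) ceil($required / $perConsumerPerSecond));
}

// 6000 queued messages, 100 msg/s still arriving, 50 msg/s per consumer, 60 s budget.
$needed = consumersNeeded(6000, 100.0, 50.0, 60);
echo "Consumers needed: {$needed}\n";
```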
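3. Stock Rollback
Problem: stock must be rolled back when an order times out without payment
Solutions:
- Track order status with a delay queue
- Cancel the order automatically on timeout and release the stock
- Scan for abnormal orders with a scheduled job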
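The scheduled-scan variant can be sketched as a pure function over in-memory data. Everything here is illustrative: `sweepExpiredOrders` is a hypothetical helper, `expire_at` is assumed to be a Unix timestamp (the consumer above stores a formatted date string), and the `cancelled` and `paid` statuses are assumed names.

```php
<?php
/**
 * Cancel unpaid orders whose payment window has passed and return their stock.
 *
 * @param array<string, array{status: string, expire_at: int, quantity: int}> $orders
 * @param int $stock current available stock
 * @param int $now   current Unix timestamp
 * @return array{orders: array, stock: int, cancelled: string[]}
 */
function sweepExpiredOrders(array $orders, int $stock, int $now): array
{
    $cancelled = [];
    foreach ($orders as $orderId => $order) {
        // Only unpaid orders past their deadline are rolled back.
        if ($order['status'] === 'pending_payment' && $order['expire_at'] <= $now) {
            $orders[$orderId]['status'] = 'cancelled';
            $stock += $order['quantity'];
            $cancelled[] = $orderId;
        }
    }
    return ['orders' => $orders, 'stock' => $stock, 'cancelled' => $cancelled];
}

$orders = [
    'SK1' => ['status' => 'pending_payment', 'expire_at' => 900, 'quantity' => 1],
    'SK2' => ['status' => 'pending_payment', 'expire_at' => 1100, 'quantity' => 1],
    'SK3' => ['status' => 'paid', 'expire_at' => 900, 'quantity' => 1],
];
$swept = sweepExpiredOrders($orders, 0, 1000);
print_r($swept['cancelled']);
```

In a real deployment the status change and the stock release would need to happen atomically per order, for example inside one database transaction followed by a call such as `InventoryService::release`.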
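4. Hot Keys
Problem: a single hot product puts heavy pressure on Redis
Solutions:
- Shard the stock across multiple keys
- Local cache plus Redis dual writes
- Rate limiting as protection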
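Stock sharding can be sketched as two small helpers. This is a minimal illustration under stated assumptions: `splitStock` and `shardKey` are hypothetical names, the key layout simply appends a shard index to the `seckill:stock:` prefix used earlier, and hashing the user ID with `crc32` is one possible spreading strategy among several.

```php
<?php
/**
 * Split the total stock across $shards keys as evenly as possible.
 *
 * @return int[] stock per shard, indexed by shard number
 */
function splitStock(int $total, int $shards): array
{
    if ($shards < 1) {
        throw new InvalidArgumentException('shards must be at least 1');
    }
    $base = intdiv($total, $shards);
    $remainder = $total % $shards;
    $result = [];
    for ($i = 0; $i < $shards; $i++) {
        // The first $remainder shards take one extra unit each.
        $result[$i] = $base + ($i < $remainder ? 1 : 0);
    }
    return $result;
}

/** Pick the shard key a user starts from; the same user always maps to the same shard. */
function shardKey(string $prefix, int $activityId, int $productId, int $userId, int $shards): string
{
    $index = abs(crc32((string) $userId)) % $shards;
    return sprintf('%s%d:%d:%d', $prefix, $activityId, $productId, $index);
}

$perShard = splitStock(100, 8);
$key = shardKey('seckill:stock:', 1001, 2001, 42, 8);
print_r($perShard);
echo $key, "\n";
```

One trade-off to keep in mind: a shard can run dry while others still hold stock, so the caller would need to try the remaining shards before reporting that the item is sold out.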
