
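Seckill (Flash Sale) System Design

Overview

A flash sale (seckill) is a classic high-concurrency scenario in e-commerce: the system must handle a massive number of requests within a very short window while keeping inventory accurate and orders reliable. In such a system, RabbitMQ takes on the core roles of peak shaving, asynchronous processing, and service decoupling.

Business Background and Requirements

Scenario

Characteristics of a flash-sale campaign on an e-commerce platform: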

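| Metric | Value | Description |
| --- | --- | --- |
| Duration | 10 minutes | Time-limited sale |
| Expected traffic | 1,000,000 QPS | Peak request rate |
| Items | 100 | Limited-quantity flash sale |
| Expected users | 500,000 | Participating users |
| Successful orders | 100 | Actual completed sales |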

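Technical Challenges

The core challenges a seckill system faces:

1. Instantaneous high concurrency
   - Traffic bursts within a few seconds
   - System capacity is limited

2. Inventory accuracy
   - Prevent overselling
   - Keep stock deduction atomic

3. User experience
   - Fast responses
   - Fair access to the sale

4. System stability
   - Prevent cascading failures
   - Degrade gracefully under load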

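Requirement Targets

| Goal | Target value |
| --- | --- |
| Response time | < 100 ms |
| Availability | 99.99% |
| Inventory accuracy | 100% |
| Overselling | 0 oversold units |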

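Architecture Design

Overall Architecture Diagram

mermaid
graph TB
    subgraph "Client"
        A[User request]
    end
    
    subgraph "Access layer"
        B[CDN]
        C[Gateway cluster]
        D[Rate limiter]
    end
    
    subgraph "Service layer"
        E[Seckill API]
        F[Inventory service]
        G[Order service]
    end
    
    subgraph "RabbitMQ"
        H[Seckill exchange<br/>seckill.exchange]
        
        subgraph "Queues"
            I[Request queue<br/>seckill.request]
            J[Order queue<br/>seckill.order]
            K[Notification queue<br/>seckill.notify]
        end
        
        L[Dead-letter queue<br/>seckill.dlq]
    end
    
    subgraph "Storage layer"
        M[Redis cluster]
        N[MySQL primary-replica]
    end
    
    subgraph "Consumers"
        O[Request handler]
        P[Order handler]
        Q[Notification handler]
    end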
    
    A --> B
    B --> C
    C --> D
    D --> E
    
    E --> M
    E --> H
    
    H --> I
    H --> J
    H --> K
    
    I --> O
    J --> P
    K --> Q
    
    O --> F
    O --> M
    P --> G
    P --> N
    Q --> M
    
    F --> M
    G --> N
    
    I -.-> L
    J -.-> L

Seckill Flow Diagram

mermaid
sequenceDiagram
    participant User as User
    participant Gateway as Gateway
    participant API as Seckill API
    participant Redis as Redis
    participant MQ as RabbitMQ
    participant Consumer as Consumer
    participant DB as Database
    
    User->>Gateway: Seckill request
    Gateway->>Gateway: Rate-limit check
    
    alt Rate limited
        Gateway-->>User: Return in-queue response
    else Allowed
        Gateway->>API: Forward request
        API->>Redis: Reserve stock (Lua script)
        
        alt Insufficient stock
            Redis-->>API: Insufficient stock
            API-->>User: Seckill failed
        else Stock available
            Redis-->>API: Reservation succeeded
            API->>MQ: Publish seckill request
            API-->>User: Queued (processed asynchronously)
            
            MQ->>Consumer: Deliver request
            Consumer->>DB: Create order
            Consumer->>Redis: Confirm deduction
            Consumer->>MQ: Publish notification
            Consumer-->>MQ: ACK
            
            MQ->>Consumer: Deliver notification
            Consumer->>User: Push result
        end
    end

Stock Deduction Flow

mermaid
stateDiagram-v2
    [*] --> available: Initialize stock
    available --> reserved: Reserve stock
    reserved --> confirmed: Order created
    reserved --> released: Order creation failed or timed out
    confirmed --> [*]
    released --> available: Return to stock
    
    note right of available: Available stock
    note right of reserved: Reserved
    note right of confirmed: Confirmed
    note right of released: Released

PHP Implementation

Seckill Request Structure

php
<?php

namespace App\Seckill;

class SeckillRequest
{
    public string $requestId;
    public int $userId;
    public int $activityId;
    public int $productId;
    public int $skuId;
    public int $quantity;
    public int $timestamp;
    public string $token;

    public function __construct(
        int $userId,
        int $activityId,
        int $productId,
        int $skuId,
        int $quantity = 1
    ) {
        $this->requestId = $this->generateRequestId();
        $this->userId = $userId;
        $this->activityId = $activityId;
        $this->productId = $productId;
        $this->skuId = $skuId;
        $this->quantity = $quantity;
        $this->timestamp = time();
        $this->token = $this->generateToken();
    }

    private function generateRequestId(): string
    {
        return sprintf('req_%s_%s', date('YmdHis'), bin2hex(random_bytes(8)));
    }

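    private function generateToken(): string
    {
        $data = implode('|', [
            $this->userId,
            $this->activityId,
            $this->productId,
            $this->timestamp,
        ]);
        // The key is hard-coded only for the example; load it from
        // configuration or a secret store in a real deployment.
        return hash_hmac('sha256', $data, 'seckill_secret_key');
    }

    public function validate(): bool
    {
        // Each request may buy exactly one unit.
        if ($this->quantity !== 1) {
            return false;
        }

        // Requests older than five minutes are rejected.
        if (time() - $this->timestamp > 300) {
            return false;
        }

        // Recompute the HMAC so a request rebuilt via fromArray() with a
        // tampered field or token is rejected.
        return hash_equals($this->generateToken(), $this->token);
    }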

    public function toArray(): array
    {
        return [
            'request_id' => $this->requestId,
            'user_id' => $this->userId,
            'activity_id' => $this->activityId,
            'product_id' => $this->productId,
            'sku_id' => $this->skuId,
            'quantity' => $this->quantity,
            'timestamp' => $this->timestamp,
            'token' => $this->token,
        ];
    }

    public static function fromArray(array $data): self
    {
        $request = new self(
            $data['user_id'],
            $data['activity_id'],
            $data['product_id'],
            $data['sku_id'],
            $data['quantity']
        );
        $request->requestId = $data['request_id'];
        $request->timestamp = $data['timestamp'];
        $request->token = $data['token'];
        return $request;
    }
}

Inventory Service Implementation

php
<?php

namespace App\Seckill;

class InventoryService
{
    private $redis;
    private array $config;

    public function __construct($redis, array $config = [])
    {
        $this->redis = $redis;
        $this->config = array_merge([
            'stock_key_prefix' => 'seckill:stock:',
            'reserved_key_prefix' => 'seckill:reserved:',
            'user_limit_prefix' => 'seckill:user_limit:',
            'user_limit' => 1,
        ], $config);
    }

    public function initStock(int $activityId, int $productId, int $stock): bool
    {
        $stockKey = $this->getStockKey($activityId, $productId);
        $reservedKey = $this->getReservedKey($activityId, $productId);

        $this->redis->set($stockKey, $stock);
        $this->redis->del($reservedKey);

        return true;
    }

    public function deduct(int $activityId, int $productId, int $userId, int $quantity = 1): array
    {
        $stockKey = $this->getStockKey($activityId, $productId);
        $reservedKey = $this->getReservedKey($activityId, $productId);
        $userLimitKey = $this->getUserLimitKey($activityId, $userId);

        $luaScript = <<<'LUA'
            local stockKey = KEYS[1]
            local reservedKey = KEYS[2]
            local userLimitKey = KEYS[3]
            local userId = ARGV[1]
            local quantity = tonumber(ARGV[2])
            local userLimit = tonumber(ARGV[3])
            
            local userPurchased = redis.call('GET', userLimitKey)
            if userPurchased and tonumber(userPurchased) >= userLimit then
                return {0, 'USER_LIMIT_EXCEEDED'}
            end
            
            local stock = redis.call('GET', stockKey)
            if not stock then
                return {0, 'STOCK_NOT_FOUND'}
            end
            
            if tonumber(stock) < quantity then
                return {0, 'INSUFFICIENT_STOCK'}
            end
            
            local newStock = redis.call('DECRBY', stockKey, quantity)
            if newStock < 0 then
                redis.call('INCRBY', stockKey, quantity)
                return {0, 'INSUFFICIENT_STOCK'}
            end
            
            redis.call('HSET', reservedKey, userId, quantity)
            redis.call('INCRBY', userLimitKey, quantity)
            redis.call('EXPIRE', userLimitKey, 86400)
            
            return {1, newStock}
        LUA;

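        $result = $this->redis->eval(
            $luaScript,
            [$stockKey, $reservedKey, $userLimitKey, $userId, $quantity, $this->config['user_limit']],
            3
        );

        // eval() returns false when the script could not be executed.
        if (!is_array($result)) {
            return ['success' => false, 'remaining_stock' => 0, 'error' => 'SCRIPT_ERROR'];
        }

        // The second element is the remaining stock on success
        // and an error code on failure, so it must not be used for both fields.
        $success = (int) $result[0] === 1;

        return [
            'success' => $success,
            'remaining_stock' => $success ? (int) $result[1] : 0,
            'error' => $success ? null : (string) $result[1],
        ];
    }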

    public function confirm(int $activityId, int $productId, int $userId): bool
    {
        $reservedKey = $this->getReservedKey($activityId, $productId);
        $this->redis->hDel($reservedKey, $userId);
        return true;
    }

    public function release(int $activityId, int $productId, int $userId, int $quantity = 1): bool
    {
        $stockKey = $this->getStockKey($activityId, $productId);
        $reservedKey = $this->getReservedKey($activityId, $productId);
        $userLimitKey = $this->getUserLimitKey($activityId, $userId);

        $luaScript = <<<'LUA'
            local stockKey = KEYS[1]
            local reservedKey = KEYS[2]
            local userLimitKey = KEYS[3]
            local userId = ARGV[1]
            local quantity = tonumber(ARGV[2])
            
            local reserved = redis.call('HGET', reservedKey, userId)
            if not reserved then
                return 0
            end
            
            redis.call('INCRBY', stockKey, quantity)
            redis.call('HDEL', reservedKey, userId)
            redis.call('DECRBY', userLimitKey, quantity)
            
            return 1
        LUA;

        return $this->redis->eval(
            $luaScript,
            [$stockKey, $reservedKey, $userLimitKey, $userId, $quantity],
            3
        ) === 1;
    }

    public function getStock(int $activityId, int $productId): int
    {
        $stockKey = $this->getStockKey($activityId, $productId);
        return (int) $this->redis->get($stockKey);
    }

    private function getStockKey(int $activityId, int $productId): string
    {
        return $this->config['stock_key_prefix'] . $activityId . ':' . $productId;
    }

    private function getReservedKey(int $activityId, int $productId): string
    {
        return $this->config['reserved_key_prefix'] . $activityId . ':' . $productId;
    }

    private function getUserLimitKey(int $activityId, int $userId): string
    {
        return $this->config['user_limit_prefix'] . $activityId . ':' . $userId;
    }
}

Seckill Service Implementation

php
<?php

namespace App\Seckill;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class SeckillService
{
    private InventoryService $inventoryService;
    private AMQPStreamConnection $connection;
    private $channel;
    private string $exchangeName = 'seckill.exchange';
    private string $requestQueue = 'seckill.request';
    private string $orderQueue = 'seckill.order';
    private string $notifyQueue = 'seckill.notify';

    public function __construct(
        InventoryService $inventoryService,
        AMQPStreamConnection $connection
    ) {
        $this->inventoryService = $inventoryService;
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->setupInfrastructure();
    }

    private function setupInfrastructure(): void
    {
        $this->channel->exchange_declare(
            $this->exchangeName,
            AMQPExchangeType::DIRECT,
            false,
            true,
            false
        );

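        $queueArgs = [
            'x-dead-letter-exchange' => ['S', 'seckill.dlx'],
            // seckill.dlx is a direct exchange and seckill.dlq is bound with 'failed'.
            // Without this override, dead-lettered messages keep their original
            // routing key ('request', 'order', 'notify') and would not reach the DLQ.
            'x-dead-letter-routing-key' => ['S', 'failed'],
            'x-message-ttl' => ['I', 300000],
        ];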

        $this->channel->queue_declare(
            $this->requestQueue,
            false,
            true,
            false,
            false,
            false,
            $queueArgs
        );

        $this->channel->queue_declare(
            $this->orderQueue,
            false,
            true,
            false,
            false,
            false,
            $queueArgs
        );

        $this->channel->queue_declare(
            $this->notifyQueue,
            false,
            true,
            false,
            false,
            false,
            $queueArgs
        );

        $this->channel->queue_bind($this->requestQueue, $this->exchangeName, 'request');
        $this->channel->queue_bind($this->orderQueue, $this->exchangeName, 'order');
        $this->channel->queue_bind($this->notifyQueue, $this->exchangeName, 'notify');

        $this->channel->exchange_declare('seckill.dlx', AMQPExchangeType::DIRECT, false, true, false);
        $this->channel->queue_declare('seckill.dlq', false, true, false, false);
        $this->channel->queue_bind('seckill.dlq', 'seckill.dlx', 'failed');
    }

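    public function submit(SeckillRequest $request): array
    {
        if (!$request->validate()) {
            return [
                'success' => false,
                'code' => 'INVALID_REQUEST',
                'message' => 'Invalid request parameters',
            ];
        }

        $deductResult = $this->inventoryService->deduct(
            $request->activityId,
            $request->productId,
            $request->userId,
            $request->quantity
        );

        if (!$deductResult['success']) {
            return [
                'success' => false,
                'code' => $deductResult['error'],
                'message' => $this->getErrorMessage($deductResult['error']),
            ];
        }

        try {
            $this->publishRequest($request);
        } catch (\Throwable $e) {
            // Stock is already reserved at this point; return it if the
            // message could not be handed to the broker.
            $this->inventoryService->release(
                $request->activityId,
                $request->productId,
                $request->userId,
                $request->quantity
            );

            return [
                'success' => false,
                'code' => 'PUBLISH_FAILED',
                'message' => $this->getErrorMessage('PUBLISH_FAILED'),
            ];
        }

        return [
            'success' => true,
            'code' => 'QUEUED',
            'message' => 'Request submitted and being processed',
            'request_id' => $request->requestId,
        ];
    }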

    private function publishRequest(SeckillRequest $request): void
    {
        $message = new AMQPMessage(
            json_encode($request->toArray()),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                'message_id' => $request->requestId,
                'timestamp' => time(),
            ]
        );

        $this->channel->basic_publish(
            $message,
            $this->exchangeName,
            'request'
        );
    }

    public function publishOrder(array $orderData): void
    {
        $message = new AMQPMessage(
            json_encode($orderData),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );

        $this->channel->basic_publish(
            $message,
            $this->exchangeName,
            'order'
        );
    }

    public function publishNotify(array $notifyData): void
    {
        $message = new AMQPMessage(
            json_encode($notifyData),
            [
                'content_type' => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]
        );

        $this->channel->basic_publish(
            $message,
            $this->exchangeName,
            'notify'
        );
    }

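    private function getErrorMessage(string $code): string
    {
        $messages = [
            'INSUFFICIENT_STOCK' => 'Sold out',
            'USER_LIMIT_EXCEEDED' => 'You have already taken part in this sale',
            'STOCK_NOT_FOUND' => 'Activity not found',
        ];
        // Unknown codes fall back to a generic message.
        return $messages[$code] ?? 'System busy, please try again later';
    }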

    public function close(): void
    {
        if ($this->channel) {
            $this->channel->close();
        }
    }
}

Seckill Consumer Implementation

php
<?php

namespace App\Seckill;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class SeckillConsumer
{
    private AMQPStreamConnection $connection;
    private $channel;
    private InventoryService $inventoryService;
    private OrderService $orderService;
    private SeckillService $seckillService;
    private bool $running = true;

    public function __construct(
        AMQPStreamConnection $connection,
        InventoryService $inventoryService,
        OrderService $orderService,
        SeckillService $seckillService
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->inventoryService = $inventoryService;
        $this->orderService = $orderService;
        $this->seckillService = $seckillService;
    }

    public function consumeRequests(): void
    {
        $this->channel->basic_qos(null, 10, null);

        $this->channel->basic_consume(
            'seckill.request',
            '',
            false,
            false,
            false,
            false,
            [$this, 'processRequest']
        );

        while ($this->running && count($this->channel->callbacks)) {
            $this->channel->wait(null, true);
            if (!$this->running) break;
            usleep(10000);
        }
    }

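    public function processRequest(AMQPMessage $message): void
    {
        try {
            $data = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
            $request = SeckillRequest::fromArray($data);
        } catch (\Throwable $e) {
            // A malformed payload can never succeed: reject it without requeueing
            // so the broker dead-letters it instead of redelivering it forever.
            error_log('[Seckill] Invalid message: ' . $e->getMessage());
            $message->reject(false);
            return;
        }

        try {
            $order = $this->createOrder($request);
        } catch (\Throwable $e) {
            $this->handleFailure($message, $request, $e);
            return;
        }

        // The order exists from here on, so an error in the follow-up steps is
        // logged instead of being treated as an order failure (which would
        // release stock that has actually been sold).
        try {
            $this->inventoryService->confirm(
                $request->activityId,
                $request->productId,
                $request->userId
            );

            $this->seckillService->publishOrder([
                'request_id' => $request->requestId,
                'order_id' => $order['order_id'],
                'user_id' => $request->userId,
                'product_id' => $request->productId,
                'status' => 'created',
            ]);

            $this->seckillService->publishNotify([
                'type' => 'seckill_success',
                'user_id' => $request->userId,
                'order_id' => $order['order_id'],
                'message' => 'Congratulations, your seckill order succeeded!',
            ]);
        } catch (\Throwable $e) {
            error_log(sprintf(
                '[Seckill] Post-order step failed: %s - %s',
                $request->requestId,
                $e->getMessage()
            ));
        }

        $message->ack();
    }

    private function createOrder(SeckillRequest $request): array
    {
        $orderId = $this->generateOrderId();

        $order = [
            'order_id' => $orderId,
            // Carried along so the order store can deduplicate redelivered
            // messages, for example with a unique index on request_id (assumption).
            'request_id' => $request->requestId,
            'user_id' => $request->userId,
            'activity_id' => $request->activityId,
            'product_id' => $request->productId,
            'sku_id' => $request->skuId,
            'quantity' => $request->quantity,
            'status' => 'pending_payment',
            'created_at' => date('Y-m-d H:i:s'),
            // Payment window: 15 minutes.
            'expire_at' => date('Y-m-d H:i:s', time() + 900),
        ];

        $this->orderService->create($order);

        return $order;
    }

    private function handleFailure(AMQPMessage $message, SeckillRequest $request, \Throwable $e): void
    {
        error_log(sprintf(
            '[Seckill] Order creation failed: %s - %s',
            $request->requestId,
            $e->getMessage()
        ));

        // Return the reserved unit to the available stock.
        $this->inventoryService->release(
            $request->activityId,
            $request->productId,
            $request->userId,
            $request->quantity
        );

        try {
            $this->seckillService->publishNotify([
                'type' => 'seckill_failed',
                'user_id' => $request->userId,
                'message' => 'Seckill failed, please try again later',
            ]);
        } catch (\Throwable $notifyError) {
            error_log('[Seckill] Failure notification not sent: ' . $notifyError->getMessage());
        }

        // The failure has been handled, so the message is acknowledged rather than retried.
        $message->ack();
    }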

    private function generateOrderId(): string
    {
        return sprintf('SK%s%s', date('YmdHis'), strtoupper(bin2hex(random_bytes(4))));
    }

    public function stop(): void
    {
        $this->running = false;
    }

    public function close(): void
    {
        $this->stop();
        if ($this->channel) {
            $this->channel->close();
        }
    }
}

Rate Limiter Implementation

php
<?php

namespace App\Seckill;

class RateLimiter
{
    private $redis;
    private array $config;

    public function __construct($redis, array $config = [])
    {
        $this->redis = $redis;
        $this->config = array_merge([
            'global_limit' => 100000,
            'user_limit' => 10,
            'window_size' => 1,
        ], $config);
    }

    public function checkGlobal(): bool
    {
        $key = 'seckill:global:limit';
        $current = $this->redis->incr($key);

        if ($current === 1) {
            $this->redis->expire($key, $this->config['window_size']);
        }

        return $current <= $this->config['global_limit'];
    }

    public function checkUser(int $userId, int $activityId): bool
    {
        $key = sprintf('seckill:user:limit:%d:%d', $activityId, $userId);
        $current = $this->redis->incr($key);

        if ($current === 1) {
            $this->redis->expire($key, 60);
        }

        return $current <= $this->config['user_limit'];
    }

    public function acquireToken(int $activityId): bool
    {
        $key = sprintf('seckill:token:%d', $activityId);
        $token = $this->redis->get($key);

        if (!$token) {
            return false;
        }

        return $this->redis->decr($key) >= 0;
    }

    public function initTokens(int $activityId, int $count): void
    {
        $key = sprintf('seckill:token:%d', $activityId);
        $this->redis->set($key, $count);
    }
}

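Complete Usage Example

php
<?php

require_once 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
// OrderService is not shown in this document; it is assumed to be an
// application class in App\Seckill that exposes create(array $order).
use App\Seckill\{
    SeckillRequest,
    SeckillService,
    SeckillConsumer,
    InventoryService,
    OrderService,
    RateLimiter
};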

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$redis = new Redis();
$redis->connect('localhost', 6379);

$inventoryService = new InventoryService($redis);
$seckillService = new SeckillService($inventoryService, $connection);
$rateLimiter = new RateLimiter($redis, [
    'global_limit' => 100000,
    'user_limit' => 5,
]);

$activityId = 1001;
$productId = 2001;
$stock = 100;

$inventoryService->initStock($activityId, $productId, $stock);
$rateLimiter->initTokens($activityId, $stock);

echo "Seckill activity initialized: activity {$activityId}, product {$productId}, stock {$stock}\n";

function handleSeckillRequest(
    SeckillService $seckillService,
    RateLimiter $rateLimiter,
    int $userId,
    int $activityId,
    int $productId
): array {
    if (!$rateLimiter->checkGlobal()) {
        return ['success' => false, 'message' => 'System busy, please try again later'];
    }

    if (!$rateLimiter->checkUser($userId, $activityId)) {
        return ['success' => false, 'message' => 'Too many requests'];
    }

    // The example reuses the product ID as the SKU ID; pass the real SKU ID in practice.
    $request = new SeckillRequest($userId, $activityId, $productId, $productId);

    return $seckillService->submit($request);
}

// Simulate five users submitting requests.
for ($i = 1; $i <= 5; $i++) {
    $result = handleSeckillRequest($seckillService, $rateLimiter, $i, $activityId, $productId);
    echo "User {$i} seckill result: " . json_encode($result) . "\n";
}

echo "\nStarting consumer to process requests...\n";

$orderService = new OrderService();
$consumer = new SeckillConsumer(
    $connection,
    $inventoryService,
    $orderService,
    $seckillService
);

// Stop the consume loop gracefully on SIGTERM and SIGINT.
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use ($consumer) {
    echo "Received termination signal\n";
    $consumer->stop();
});
pcntl_signal(SIGINT, function () use ($consumer) {
    echo "Received interrupt signal\n";
    $consumer->stop();
});

$consumer->consumeRequests();

$seckillService->close();
$consumer->close();
$connection->close();

Key Technical Points

1. Atomic Deduction in Redis

A Lua script guarantees that the stock check and the deduction execute as one atomic step:

lua
local stock = redis.call('GET', stockKey)
if tonumber(stock) < quantity then
    return {0, 'INSUFFICIENT_STOCK'}
end
local newStock = redis.call('DECRBY', stockKey, quantity)
if newStock < 0 then
    redis.call('INCRBY', stockKey, quantity)
    return {0, 'INSUFFICIENT_STOCK'}
end
return {1, newStock}
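
To illustrate why the check and the decrement have to run as one unit, the sketch below replays a non-atomic interleaving in plain PHP. An in-memory integer stands in for the Redis key, and `nonAtomicInterleaving` and `atomicDeduct` are hypothetical names used only for this illustration.

```php
<?php

declare(strict_types=1);

/**
 * Replays the race that a non-atomic "GET then DECRBY" allows: both clients
 * read the stock before either one writes, so both pass the check.
 */
function nonAtomicInterleaving(int $stock, int $quantity): int
{
    $seenByA = $stock;            // client A: GET
    $seenByB = $stock;            // client B: GET, before A has written

    if ($seenByA >= $quantity) {
        $stock -= $quantity;      // client A: DECRBY
    }
    if ($seenByB >= $quantity) {
        $stock -= $quantity;      // client B: DECRBY, based on a stale read
    }

    return $stock;
}

/**
 * Check and decrement as a single step, which is the guarantee Redis gives
 * for the body of a Lua script.
 */
function atomicDeduct(int &$stock, int $quantity): bool
{
    if ($stock < $quantity) {
        return false;
    }
    $stock -= $quantity;
    return true;
}

// One unit in stock, two buyers, non-atomic path: the unit is sold twice.
$oversold = nonAtomicInterleaving(1, 1);

// Same situation through the atomic path: the second buyer is rejected.
$stock = 1;
$first = atomicDeduct($stock, 1);
$second = atomicDeduct($stock, 1);
```

In the non-atomic replay the stock ends below zero, which is exactly the oversell the Lua script is there to prevent.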

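2. Peak Shaving

RabbitMQ queues smooth out the traffic spike:

  • Bursts of concurrent requests are enqueued first
  • Consumers process them at the rate they can sustain
  • Backend systems are protected from being overwhelmed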
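
The effect can be sketched with a small simulation. It uses an in-memory `SplQueue` as a stand-in for the RabbitMQ queue, and the function name, burst size, and consume rate are illustrative assumptions rather than measured values.

```php
<?php

declare(strict_types=1);

/**
 * Simulates peak shaving: a burst arrives in the first tick and the
 * consumer drains the queue at a fixed rate per tick.
 *
 * @return int[] queue depth observed at the end of each tick
 */
function simulatePeakShaving(int $burstSize, int $consumeRatePerTick, int $ticks): array
{
    $queue = new SplQueue();
    $depths = [];

    for ($tick = 0; $tick < $ticks; $tick++) {
        if ($tick === 0) {
            // The whole burst is accepted at once because enqueueing is cheap.
            for ($i = 0; $i < $burstSize; $i++) {
                $queue->enqueue("req-{$i}");
            }
        }

        // The backend never sees more than $consumeRatePerTick requests per tick.
        for ($n = 0; $n < $consumeRatePerTick && !$queue->isEmpty(); $n++) {
            $queue->dequeue();
        }

        $depths[] = $queue->count();
    }

    return $depths;
}

$depths = simulatePeakShaving(100, 30, 5);
print_r($depths); // queue depth per tick: 70, 40, 10, 0, 0
```

The burst of 100 requests is absorbed immediately, while the backend works through it at 30 per tick and the queue is empty by the fourth tick.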

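3. Stock Reservation Mechanism

1. Reserve: deduct the Redis stock as soon as the user's request arrives
2. Confirm: confirm the deduction once the order has been created
3. Release: return the stock if order creation fails or times out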
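
The three steps can be modelled as a small in-memory ledger. This is a sketch only: `ReservationLedger` is a hypothetical class, and plain properties replace the Redis keys that `InventoryService` uses.

```php
<?php

declare(strict_types=1);

/**
 * In-memory model of the reserve / confirm / release lifecycle.
 */
final class ReservationLedger
{
    private int $available;
    /** @var array<int, int> userId => reserved quantity */
    private array $reserved = [];
    private int $confirmed = 0;

    public function __construct(int $stock)
    {
        $this->available = $stock;
    }

    public function reserve(int $userId, int $quantity = 1): bool
    {
        // One open reservation per user, and never more than what is available.
        if (isset($this->reserved[$userId]) || $this->available < $quantity) {
            return false;
        }
        $this->available -= $quantity;
        $this->reserved[$userId] = $quantity;
        return true;
    }

    public function confirm(int $userId): bool
    {
        if (!isset($this->reserved[$userId])) {
            return false;
        }
        $this->confirmed += $this->reserved[$userId];
        unset($this->reserved[$userId]);
        return true;
    }

    public function release(int $userId): bool
    {
        if (!isset($this->reserved[$userId])) {
            return false; // nothing reserved, so releasing twice is a no-op
        }
        $this->available += $this->reserved[$userId];
        unset($this->reserved[$userId]);
        return true;
    }

    public function available(): int
    {
        return $this->available;
    }

    public function confirmed(): int
    {
        return $this->confirmed;
    }
}

$ledger = new ReservationLedger(2);
$ledger->reserve(1);
$ledger->reserve(2);
$soldOut = !$ledger->reserve(3);      // the third user finds no stock left
$ledger->confirm(1);                  // order created for user 1
$ledger->release(2);                  // order failed or timed out for user 2
$releasedTwice = $ledger->release(2); // a repeated release changes nothing
```

Making `release()` a no-op when no reservation exists mirrors the `HGET` guard in the release Lua script, so a retried failure handler cannot inflate the stock.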

4. Per-User Purchase Limit

The deduction script rejects a user who has already reached the purchase limit:

lua
local userPurchased = redis.call('GET', userLimitKey)
if userPurchased and tonumber(userPurchased) >= userLimit then
    return {0, 'USER_LIMIT_EXCEEDED'}
end

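Performance Optimization Recommendations

| Optimization | Recommendation | Notes |
| --- | --- | --- |
| CDN delivery | Serve static assets from a CDN | Reduces load on the origin |
| Static pages | Render the seckill page as static HTML | Reduces dynamic rendering |
| Button disabling | Throttle submissions in the front end | Reduces useless requests |
| Multi-level caching | CDN → Redis → MySQL | Absorbs load layer by layer |
| Asynchronous processing | Process requests through the message queue | Improves response time |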

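Common Problems and Solutions

1. Overselling

Problem: more units are deducted than actually exist in stock

Solutions:

  • Atomic operations through a Redis Lua script
  • Optimistic locking in the database as a safety net
  • Reserve-then-confirm mechanism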
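
A common form of the database safety net is a conditional UPDATE that only succeeds while enough stock remains. The sketch below assumes a hypothetical `seckill_stock` table and uses in-memory SQLite through PDO so that it runs without a server; the production store in this design is MySQL, where the same statement applies.

```php
<?php

declare(strict_types=1);

// In-memory SQLite keeps the sketch self-contained (requires pdo_sqlite).
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec('CREATE TABLE seckill_stock (product_id INTEGER PRIMARY KEY, stock INTEGER NOT NULL)');
$pdo->exec('INSERT INTO seckill_stock (product_id, stock) VALUES (2001, 1)');

/**
 * Deducts stock only if enough is left. The WHERE clause makes the check and
 * the write a single statement, so concurrent callers cannot both succeed on
 * the last unit.
 */
function deductInDatabase(PDO $pdo, int $productId, int $quantity): bool
{
    $stmt = $pdo->prepare(
        'UPDATE seckill_stock
            SET stock = stock - :qty
          WHERE product_id = :id AND stock >= :min'
    );
    $stmt->bindValue(':qty', $quantity, PDO::PARAM_INT);
    $stmt->bindValue(':id', $productId, PDO::PARAM_INT);
    $stmt->bindValue(':min', $quantity, PDO::PARAM_INT);
    $stmt->execute();

    // Zero affected rows means the guard rejected the deduction.
    return $stmt->rowCount() === 1;
}

$first = deductInDatabase($pdo, 2001, 1);   // takes the last unit
$second = deductInDatabase($pdo, 2001, 1);  // rejected by the stock guard
$remaining = (int) $pdo->query('SELECT stock FROM seckill_stock WHERE product_id = 2001')->fetchColumn();
```

Even if the Redis counter and the database drift apart, this guard keeps the persisted stock from going negative.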

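2. Message Backlog

Problem: requests pile up in the request queue

Solutions:

  • Add more consumers
  • Process messages in batches
  • Scale out dynamically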
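
Batch processing can be sketched as a buffer that hands messages to a handler once a batch is full. `BatchBuffer` is a hypothetical helper, and the handler below only records batch sizes where a real one might issue a single multi-row insert.

```php
<?php

declare(strict_types=1);

/**
 * Collects items and passes them to a handler in fixed-size batches.
 */
final class BatchBuffer
{
    /** @var array<int, array> */
    private array $items = [];
    private int $batchSize;
    /** @var callable */
    private $flushHandler;

    public function __construct(int $batchSize, callable $flushHandler)
    {
        $this->batchSize = $batchSize;
        $this->flushHandler = $flushHandler;
    }

    public function add(array $item): void
    {
        $this->items[] = $item;
        if (count($this->items) >= $this->batchSize) {
            $this->flush();
        }
    }

    public function flush(): void
    {
        if ($this->items === []) {
            return;
        }
        // Reset before calling the handler so a re-entrant add() starts a new batch.
        $batch = $this->items;
        $this->items = [];
        ($this->flushHandler)($batch);
    }
}

$batchSizes = [];
$buffer = new BatchBuffer(3, function (array $batch) use (&$batchSizes): void {
    $batchSizes[] = count($batch);
});

for ($i = 1; $i <= 7; $i++) {
    $buffer->add(['request_id' => "req-{$i}"]);
}
$buffer->flush(); // flush the partial tail, for example on shutdown or on a timer
```

With php-amqplib, a consumer built this way would typically acknowledge only after a successful flush, for instance with `basic_ack($lastDeliveryTag, true)` to acknowledge the whole batch, and the prefetch count set through `basic_qos` would need to be at least the batch size or the buffer never fills.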

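3. Stock Rollback

Problem: stock must be rolled back when an order times out without payment

Solutions:

  • Use a delay queue to check order status
  • Cancel timed-out orders automatically and release their stock
  • Run a scheduled job that scans for abnormal orders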
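
The scheduled scan can be sketched as a pure function over order rows. The field names follow the order array built in `SeckillConsumer::createOrder()`; `findExpiredOrders` and the sample rows are illustrative assumptions.

```php
<?php

declare(strict_types=1);

// Fixed timezone so the date strings below parse identically everywhere.
date_default_timezone_set('UTC');

/**
 * Returns the IDs of unpaid orders whose payment window has passed.
 *
 * @param array<int, array{order_id: string, status: string, expire_at: string}> $orders
 * @return string[]
 */
function findExpiredOrders(array $orders, int $now): array
{
    $expired = [];
    foreach ($orders as $order) {
        $isUnpaid = $order['status'] === 'pending_payment';
        $isPastDeadline = strtotime($order['expire_at']) <= $now;
        if ($isUnpaid && $isPastDeadline) {
            $expired[] = $order['order_id'];
        }
    }
    return $expired;
}

$now = 1700000000;
$orders = [
    ['order_id' => 'SK1', 'status' => 'pending_payment', 'expire_at' => date('Y-m-d H:i:s', $now - 60)],
    ['order_id' => 'SK2', 'status' => 'pending_payment', 'expire_at' => date('Y-m-d H:i:s', $now + 600)],
    ['order_id' => 'SK3', 'status' => 'paid', 'expire_at' => date('Y-m-d H:i:s', $now - 60)],
];

$toCancel = findExpiredOrders($orders, $now);
```

Each returned order would then be cancelled and its stock restored. Note that `InventoryService::release()` as written only restores stock while the reservation entry still exists; once `confirm()` has removed it, a payment-timeout rollback needs its own restore path.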

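4. Hot Keys

Problem: a single hot product puts heavy load on one Redis key

Solutions:

  • Shard the stock across multiple keys
  • Local cache combined with Redis (dual write)
  • Rate limiting as protection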
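
Stock sharding can be sketched as follows. Arrays stand in for the per-shard Redis keys, and `splitStock`, `shardKey`, and `deductFromShards` are hypothetical helpers; the key layout only extends the `seckill:stock:` prefix used earlier with a shard index.

```php
<?php

declare(strict_types=1);

/**
 * Splits the total stock across shards as evenly as possible.
 *
 * @return int[] stock per shard, indexed by shard number
 */
function splitStock(int $total, int $shards): array
{
    $base = intdiv($total, $shards);
    $remainder = $total % $shards;
    $result = [];
    for ($i = 0; $i < $shards; $i++) {
        // The first $remainder shards receive one extra unit.
        $result[$i] = $base + ($i < $remainder ? 1 : 0);
    }
    return $result;
}

/** Builds the Redis key of the shard a user is routed to first. */
function shardKey(int $activityId, int $productId, int $userId, int $shards): string
{
    return sprintf('seckill:stock:%d:%d:%d', $activityId, $productId, $userId % $shards);
}

/**
 * Takes one unit, starting at the user's home shard and falling through to
 * the next shards so stock is not stranded in a shard nobody is routed to.
 *
 * @param int[] $shards
 */
function deductFromShards(array &$shards, int $userId): bool
{
    $count = count($shards);
    $start = $userId % $count;
    for ($offset = 0; $offset < $count; $offset++) {
        $index = ($start + $offset) % $count;
        if ($shards[$index] > 0) {
            $shards[$index]--;
            return true;
        }
    }
    return false;
}

$layout = splitStock(100, 8);
$key = shardKey(1001, 2001, 42, 8);

$shards = splitStock(3, 2);                 // two shards holding 2 and 1 units
$results = [];
foreach ([1, 3, 5, 7] as $userId) {
    $results[] = deductFromShards($shards, $userId);
}
```

In Redis each shard would be deducted with the same Lua script as before, so atomicity holds per shard; the fall-through costs extra round trips only when a user's home shard is already empty.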

Related Links