Skip to content

并发与限流

合理的并发控制和限流策略是构建高性能AI应用的关键

概述

AI API通常有速率限制,如何在限制内最大化吞吐量是构建高性能AI应用的关键。本教程将教你并发处理和限流策略的实现方法。

为什么需要并发与限流?

AI API限制类型:

速率限制(Rate Limit)
├── RPM (Requests Per Minute)
├── RPD (Requests Per Day)
├── TPM (Tokens Per Minute)
└── TPD (Tokens Per Day)

并发限制(Concurrency Limit)
├── 同时请求数
└── 连接数限制

配额限制(Quota Limit)
├── 月度配额
└── 预算限制

常见API限制

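| API | RPM | TPM | 并发 |
|-----|-----|-----|------|
| OpenAI GPT-4 | 500 | 30000 | 取决于套餐 |
| OpenAI GPT-4o-mini | 500 | 200000 | 取决于套餐 |
| DeepSeek | 60 | 60000 | 无明确限制 |
| Claude | 取决于套餐 | 取决于套餐 | 取决于套餐 |

注:以上数值仅供参考,实际限额会随账户等级和时间调整,请以各厂商官方文档为准。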

基础限流器

令牌桶算法

php
<?php
/**
 * Token-bucket rate limiter: holds up to $capacity tokens and refills them
 * continuously at $refillRate tokens per second (sub-second precision).
 */
class TokenBucket
{
    private int $capacity;
    /** Current token count; fractional so partial refills are not lost between calls. */
    private float $tokens;
    private int $refillRate;
    /** microtime(true) of the last refill. */
    private float $lastRefillTime;

    /**
     * @param int $capacity   Maximum number of tokens the bucket can hold (starts full).
     * @param int $refillRate Tokens added per second.
     */
    public function __construct(int $capacity, int $refillRate)
    {
        $this->capacity = $capacity;
        $this->tokens = (float) $capacity;
        $this->refillRate = $refillRate;
        $this->lastRefillTime = microtime(true);
    }

    /**
     * Takes $tokens tokens if they are available.
     *
     * @return bool True when the tokens were taken, false when the bucket is short.
     */
    public function consume(int $tokens = 1): bool
    {
        $this->refill();

        if ($this->tokens >= $tokens) {
            $this->tokens -= $tokens;
            return true;
        }

        return false;
    }

    /**
     * Alias of consume(1), so the bucket offers the same API as SlidingWindowLimiter.
     */
    public function tryAcquire(): bool
    {
        return $this->consume();
    }

    /**
     * Blocks until $tokens tokens could be taken.
     *
     * @throws InvalidArgumentException When $tokens exceeds the capacity (would wait forever).
     * @throws RuntimeException         When the bucket never refills and is short of tokens.
     */
    public function waitForToken(int $tokens = 1): void
    {
        if ($tokens > $this->capacity) {
            throw new InvalidArgumentException(sprintf(
                'Cannot wait for %d tokens: bucket capacity is %d',
                $tokens,
                $this->capacity
            ));
        }

        while (!$this->consume($tokens)) {
            if ($this->refillRate <= 0) {
                throw new RuntimeException('Bucket does not refill (refillRate <= 0); waiting would never finish');
            }
            // Sleep roughly until the deficit is refilled, bounded to [1 ms, 100 ms] so we re-check often.
            $seconds = ($tokens - $this->tokens) / $this->refillRate;
            usleep((int) max(1000, min(100000, $seconds * 1000000)));
        }
    }

    /**
     * Seconds until at least one token is available (0.0 if one is available now,
     * INF if the bucket never refills).
     */
    public function getWaitTime(): float
    {
        $this->refill();

        if ($this->tokens >= 1) {
            return 0.0;
        }
        if ($this->refillRate <= 0) {
            return INF;
        }

        return (1 - $this->tokens) / $this->refillRate;
    }

    /**
     * Credits tokens for the time elapsed since the last refill, capped at capacity.
     */
    private function refill(): void
    {
        $now = microtime(true);
        $elapsed = $now - $this->lastRefillTime;

        // A non-positive delta (clock moved back) must not remove tokens.
        if ($elapsed > 0) {
            $this->tokens = min((float) $this->capacity, $this->tokens + $elapsed * $this->refillRate);
            $this->lastRefillTime = $now;
        }
    }

    /**
     * Whole tokens currently available.
     */
    public function getAvailableTokens(): int
    {
        $this->refill();
        return (int) floor($this->tokens);
    }
}

// Usage example: a bucket holding up to 100 tokens, refilled at 10 tokens per second.
$bucket = new TokenBucket(100, 10);

if (!$bucket->consume()) {
    echo "速率限制,请稍后重试";
} else {
    // Quota available: perform the API request.
    $result = $client->chat($messages);
}

滑动窗口算法

php
<?php
/**
 * Sliding-window rate limiter: admits at most $limit requests within any
 * rolling window of $windowSize seconds.
 */
class SlidingWindowLimiter
{
    private int $limit;
    private int $windowSize;
    /** @var list<float> microtime(true) timestamps of admitted requests inside the window. */
    private array $requests = [];

    /**
     * @param int $limit             Maximum requests per window.
     * @param int $windowSizeSeconds Window length in seconds.
     */
    public function __construct(int $limit, int $windowSizeSeconds)
    {
        $this->limit = $limit;
        $this->windowSize = $windowSizeSeconds;
    }

    /**
     * Records a request and returns true if the window still has room, otherwise false.
     */
    public function tryAcquire(): bool
    {
        $this->cleanup();

        if (count($this->requests) < $this->limit) {
            $this->requests[] = microtime(true);
            return true;
        }

        return false;
    }

    /**
     * Blocks until a request can be admitted.
     *
     * @throws LogicException When the limit is below 1 (no request could ever be admitted).
     */
    public function acquire(): void
    {
        if ($this->limit < 1) {
            throw new LogicException('SlidingWindowLimiter with a limit below 1 can never admit a request');
        }

        while (!$this->tryAcquire()) {
            // usleep() takes an int; the extra 1 ms lets the oldest entry pass the strict cutoff.
            usleep((int) ceil($this->getWaitTime() * 1000000) + 1000);
        }
    }

    /**
     * Drops timestamps that have left the window.
     */
    private function cleanup(): void
    {
        $cutoff = microtime(true) - $this->windowSize;
        $this->requests = array_values(array_filter($this->requests, fn($t) => $t > $cutoff));
    }

    /**
     * Seconds until the next request can be admitted; 0.0 when a slot is free now.
     */
    public function getWaitTime(): float
    {
        $this->cleanup();

        // A free slot means no wait, even if older requests are still in the window.
        if (count($this->requests) < $this->limit || empty($this->requests)) {
            return 0.0;
        }

        $oldestRequest = min($this->requests);
        $waitTime = $oldestRequest + $this->windowSize - microtime(true);

        return max(0.0, $waitTime);
    }

    /**
     * Number of requests that can still be admitted in the current window.
     */
    public function getAvailableRequests(): int
    {
        $this->cleanup();
        return $this->limit - count($this->requests);
    }
}

// Usage example: at most 60 requests within any 60-second window.
$limiter = new SlidingWindowLimiter(60, 60);

if (!$limiter->tryAcquire()) {
    // Over the limit: report how long until a slot frees up.
    $waitTime = $limiter->getWaitTime();
    echo "需要等待 {$waitTime} 秒";
} else {
    $result = $client->chat($messages);
}

并发控制

并发请求管理器

php
<?php
/**
 * Caps how many tasks run at once and queues tasks submitted via executeAsync().
 *
 * Tasks run synchronously on the calling thread, so the slot counter can only
 * reach the limit through re-entrant calls (a task that itself calls
 * execute() or executeAsync()).
 */
class ConcurrentRequestManager
{
    private int $maxConcurrency;
    private int $currentRequests = 0;
    /** @var list<array{task: callable, resolve: callable, reject: callable}> */
    private array $queue = [];

    /**
     * @param int $maxConcurrency Maximum number of tasks allowed to run at once (>= 1).
     * @throws InvalidArgumentException When $maxConcurrency is below 1.
     */
    public function __construct(int $maxConcurrency = 5)
    {
        if ($maxConcurrency < 1) {
            throw new InvalidArgumentException('maxConcurrency must be at least 1');
        }
        $this->maxConcurrency = $maxConcurrency;
    }

    /**
     * Runs $task in a free slot and returns its result.
     *
     * @throws RuntimeException When no slot is free (only reachable via re-entrant calls).
     */
    public function execute(callable $task)
    {
        $this->waitForSlot();

        $this->currentRequests++;

        try {
            return $task();
        } finally {
            $this->currentRequests--;
            // A slot just freed up: drain anything queued by executeAsync().
            $this->processQueue();
        }
    }

    /**
     * Queues $task and drains the queue while slots are free.
     *
     * NOTE(review): assumes a Promise class whose constructor receives a
     * ($resolve, $reject) executor (React/JS style). Guzzle's Promise has a
     * different constructor — confirm which implementation is in use.
     */
    public function executeAsync(callable $task): Promise
    {
        return new Promise(function($resolve, $reject) use ($task) {
            $this->queue[] = [
                'task' => $task,
                'resolve' => $resolve,
                'reject' => $reject,
            ];
            $this->processQueue();
        });
    }

    /**
     * Fails fast when every slot is taken.
     *
     * Nothing else runs while this thread sleeps, so polling here could never
     * observe a slot being released — the original busy-wait could only hang.
     */
    private function waitForSlot(): void
    {
        if ($this->currentRequests >= $this->maxConcurrency) {
            throw new RuntimeException(sprintf(
                'No free concurrency slot (limit %d); a task re-entered execute() beyond the limit',
                $this->maxConcurrency
            ));
        }
    }

    /**
     * Runs queued tasks while slots are free, settling each task's promise.
     */
    private function processQueue(): void
    {
        while (!empty($this->queue) && $this->currentRequests < $this->maxConcurrency) {
            $item = array_shift($this->queue);
            $this->currentRequests++;

            try {
                $result = $item['task']();
                $item['resolve']($result);
            } catch (Throwable $e) {
                // Throwable (not just Exception) so Errors reject the promise instead of escaping.
                $item['reject']($e);
            } finally {
                $this->currentRequests--;
            }
        }
    }

    /**
     * Snapshot of the limit, running count, queue length and free slots.
     */
    public function getStatus(): array
    {
        return [
            'max_concurrency' => $this->maxConcurrency,
            'current_requests' => $this->currentRequests,
            'queue_length' => count($this->queue),
            'available_slots' => $this->maxConcurrency - $this->currentRequests,
        ];
    }
}

// Usage example: send every prompt through a manager allowing 5 concurrent requests.
$manager = new ConcurrentRequestManager(5);

$results = [];
foreach ($prompts as $prompt) {
    // The arrow function captures $client and $prompt by value, like the explicit `use` list would.
    $results[] = $manager->execute(fn() => $client->chat([['role' => 'user', 'content' => $prompt]]));
}

使用Guzzle异步请求

php
<?php
/**
 * Sends many chat-completion requests through a Guzzle client, with at most
 * $concurrency requests in flight and at most $rpm requests started per minute.
 */
class AsyncBatchProcessor
{
    private $client;
    private int $concurrency;
    private SlidingWindowLimiter $limiter;

    /**
     * @param mixed $client      Guzzle client exposing postAsync().
     * @param int   $concurrency Maximum in-flight requests (values below 1 are treated as 1).
     * @param int   $rpm         Maximum requests started per 60-second window.
     */
    public function __construct($client, int $concurrency = 5, int $rpm = 60)
    {
        $this->client = $client;
        $this->concurrency = max(1, $concurrency);
        $this->limiter = new SlidingWindowLimiter($rpm, 60);
    }

    /**
     * @param array $messagesList Message arrays keyed by caller-chosen keys.
     * @return array For each input key, in input order: the reply content (string) on success,
     *               or ['error' => message] on failure.
     */
    public function processBatch(array $messagesList): array
    {
        $results = [];

        // Lazily create requests: Each::ofLimit pulls the next one only when a slot frees up,
        // so both the concurrency cap and the rate limiter gate when a request actually starts.
        // (Creating all promises up front let Guzzle's curl-multi handler send them in one burst.)
        $requests = (function () use ($messagesList) {
            foreach ($messagesList as $key => $messages) {
                $this->limiter->acquire();

                yield $key => $this->client->postAsync('/chat/completions', [
                    'json' => [
                        'model' => 'gpt-4o-mini',
                        'messages' => $messages,
                    ],
                ]);
            }
        })();

        \GuzzleHttp\Promise\Each::ofLimit(
            $requests,
            $this->concurrency,
            function ($response, $key) use (&$results) {
                $body = json_decode((string) $response->getBody(), true);
                $results[$key] = $body['choices'][0]['message']['content']
                    ?? ['error' => 'Unexpected response body'];
            },
            function ($reason, $key) use (&$results) {
                $results[$key] = [
                    'error' => $reason instanceof Throwable ? $reason->getMessage() : (string) $reason,
                ];
            }
        )->wait();

        // Callbacks fire in completion order; return results in input order.
        $ordered = [];
        foreach (array_keys($messagesList) as $key) {
            $ordered[$key] = $results[$key];
        }

        return $ordered;
    }
}

// Usage example: at most 5 requests in flight, at most 60 started per minute.
$processor = new AsyncBatchProcessor($client, 5, 60);

// One single-turn conversation per question.
$messagesList = array_map(
    fn($question) => [['role' => 'user', 'content' => $question]],
    ['问题1', '问题2', '问题3']
);

$results = $processor->processBatch($messagesList);

智能限流

自适应限流器

php
<?php
/**
 * Tracks successes and failures in fixed 60-second windows and nudges a
 * suggested request limit up or down.
 *
 * The limit rises by 10% after a mostly successful window and falls by 20%
 * after many failures. Changes are clamped to [baseLimit * 0.5, baseLimit * 2].
 */
class AdaptiveRateLimiter
{
    private int $baseLimit;
    /** Kept as float so 10% steps accumulate even for small limits; truncated in getCurrentLimit(). */
    private float $currentLimit;
    private int $successCount = 0;
    private int $failureCount = 0;
    private int $windowStart;
    private int $windowSize = 60;

    /**
     * @param int $baseLimit Starting (and reference) request limit.
     */
    public function __construct(int $baseLimit)
    {
        $this->baseLimit = $baseLimit;
        $this->currentLimit = (float) $baseLimit;
        $this->windowStart = time();
    }

    /**
     * Records a success; grows the limit by 10% (up to 2x base) once over 90% of
     * the current limit succeeded with under 10% failures in this window.
     */
    public function recordSuccess(): void
    {
        $this->checkWindow();
        $this->successCount++;

        if ($this->successCount > $this->currentLimit * 0.9 && $this->failureCount < $this->currentLimit * 0.1) {
            $this->currentLimit = min($this->currentLimit * 1.1, $this->baseLimit * 2.0);
        }
    }

    /**
     * Records a failure; shrinks the limit by 20% (down to 0.5x base) once
     * failures exceed 20% of the current limit in this window.
     */
    public function recordFailure(): void
    {
        $this->checkWindow();
        $this->failureCount++;

        if ($this->failureCount > $this->currentLimit * 0.2) {
            $this->currentLimit = max($this->currentLimit * 0.8, $this->baseLimit * 0.5);
        }
    }

    /**
     * The current suggested limit, truncated to an integer.
     */
    public function getCurrentLimit(): int
    {
        return (int) $this->currentLimit;
    }

    /**
     * Resets the counters when the current window has elapsed.
     */
    private function checkWindow(): void
    {
        if (time() - $this->windowStart >= $this->windowSize) {
            $this->successCount = 0;
            $this->failureCount = 0;
            $this->windowStart = time();
        }
    }
}

/**
 * Chat client wrapper that enforces two limits per 60-second window.
 *
 * The hard limit ($baseLimit) is never exceeded. On top of it, the adaptive
 * limit, which shrinks after repeated failures, further throttles requests.
 */
class SmartRateLimitedClient
{
    private $client;
    private AdaptiveRateLimiter $limiter;
    private SlidingWindowLimiter $hardLimit;
    private int $baseLimit;

    /**
     * @param mixed $client    Object exposing chat(array $messages): array.
     * @param int   $baseLimit Hard requests-per-minute ceiling and the adaptive limiter's base.
     */
    public function __construct($client, int $baseLimit = 60)
    {
        $this->client = $client;
        $this->baseLimit = $baseLimit;
        $this->limiter = new AdaptiveRateLimiter($baseLimit);
        $this->hardLimit = new SlidingWindowLimiter($baseLimit, 60);
    }

    /**
     * Sends $messages once both the adaptive and the hard limits allow it.
     *
     * Any exception from the client counts as a failure for the adaptive limit and is rethrown.
     */
    public function chat(array $messages): array
    {
        $this->waitForAdaptiveSlot();
        $this->hardLimit->acquire();

        try {
            $result = $this->client->chat($messages);
            $this->limiter->recordSuccess();
            return $result;
        } catch (Exception $e) {
            $this->limiter->recordFailure();
            throw $e;
        }
    }

    /**
     * Blocks while the requests already made in the current window reach the adaptive limit.
     *
     * Previously the adaptive limit was computed but never applied.
     */
    private function waitForAdaptiveSlot(): void
    {
        // At least 1, so a tiny base limit (e.g. 1 * 0.5 -> 0) cannot block forever.
        $allowed = max(1, $this->limiter->getCurrentLimit());

        while ($this->baseLimit - $this->hardLimit->getAvailableRequests() >= $allowed) {
            usleep(100000);
        }
    }
}

多级限流

php
<?php
/**
 * Combines per-second (token bucket), per-minute and per-hour (sliding window)
 * limits; a request is admitted only when every level has room.
 */
class MultiLevelRateLimiter
{
    /** @var array<string, TokenBucket|SlidingWindowLimiter> */
    private array $limiters;

    public function __construct()
    {
        $this->limiters = [
            'per_second' => new TokenBucket(10, 10),
            'per_minute' => new SlidingWindowLimiter(60, 60),
            'per_hour' => new SlidingWindowLimiter(1000, 3600),
        ];
    }

    /**
     * Admits one request if all levels have room; otherwise consumes nothing and returns false.
     */
    public function tryAcquire(): bool
    {
        // Check every level before taking from any, so a rejection at one level does
        // not silently burn quota at the levels checked before it.
        foreach ($this->limiters as $limiter) {
            if (self::availableIn($limiter) < 1) {
                return false;
            }
        }

        foreach ($this->limiters as $limiter) {
            if ($limiter instanceof TokenBucket) {
                $limiter->consume();
            } else {
                $limiter->tryAcquire();
            }
        }

        return true;
    }

    /**
     * Blocks (polling every 100 ms) until a request is admitted.
     */
    public function acquire(): void
    {
        while (!$this->tryAcquire()) {
            usleep(100000);
        }
    }

    /**
     * Seconds until every level has room; 0.0 when a request could be admitted now.
     */
    public function getWaitTime(): float
    {
        $maxWait = 0.0;
        foreach ($this->limiters as $limiter) {
            $maxWait = max($maxWait, self::waitTimeFor($limiter));
        }
        return $maxWait;
    }

    /**
     * Free capacity of one level, using whichever accessor that limiter type exposes.
     */
    private static function availableIn(object $limiter): int
    {
        return $limiter instanceof TokenBucket
            ? $limiter->getAvailableTokens()
            : $limiter->getAvailableRequests();
    }

    /**
     * Wait time for one level: 0 when it has room, otherwise its own estimate.
     */
    private static function waitTimeFor(object $limiter): float
    {
        if (self::availableIn($limiter) > 0) {
            return 0.0;
        }
        if (method_exists($limiter, 'getWaitTime')) {
            return (float) $limiter->getWaitTime();
        }
        // No estimate available: the per-second bucket above refills at least one token per second.
        return 1.0;
    }
}

批量请求优化

智能批处理器

php
<?php
/**
 * Collects chat requests and sends them concurrently in batches.
 *
 * A batch is sent once $batchSize requests are queued, or when a request is
 * added after the oldest queued one has waited $batchTimeoutMs. PHP has no
 * background timer here, so callers must still call flush() to send a trailing
 * partial batch.
 */
class SmartBatchProcessor
{
    private $client;
    private int $batchSize;
    /** Maximum age of the oldest queued request, in seconds (was an int, truncating 0.1 to 0). */
    private float $batchTimeout;
    /** @var list<array{messages: array, callback: callable}> */
    private array $queue = [];
    /** microtime(true) at which the oldest queued request was added; null while the queue is empty. */
    private ?float $oldestQueuedAt = null;

    /**
     * @param mixed $client         Guzzle client exposing postAsync().
     * @param int   $batchSize      Number of queued requests that triggers a flush.
     * @param int   $batchTimeoutMs Maximum wait, in milliseconds, of the oldest queued request.
     */
    public function __construct($client, int $batchSize = 10, int $batchTimeoutMs = 100)
    {
        $this->client = $client;
        $this->batchSize = $batchSize;
        $this->batchTimeout = $batchTimeoutMs / 1000;
    }

    /**
     * Queues a request; $callback receives ($decodedBody) on success or (null, $reason) on failure.
     */
    public function addRequest(array $messages, callable $callback): void
    {
        if (empty($this->queue)) {
            $this->oldestQueuedAt = microtime(true);
        }

        $this->queue[] = [
            'messages' => $messages,
            'callback' => $callback,
        ];

        $batchFull = count($this->queue) >= $this->batchSize;
        $batchStale = microtime(true) - $this->oldestQueuedAt >= $this->batchTimeout;

        if ($batchFull || $batchStale) {
            $this->flush();
        }
    }

    /**
     * Sends everything currently queued (no-op when the queue is empty).
     */
    public function flush(): void
    {
        if (empty($this->queue)) {
            return;
        }

        $batch = $this->queue;
        $this->queue = [];
        $this->oldestQueuedAt = null;

        $this->processBatch($batch);
    }

    /**
     * Fires all requests of $batch concurrently and invokes each callback once its result settles.
     */
    private function processBatch(array $batch): void
    {
        $promises = [];

        foreach ($batch as $index => $item) {
            $promises[$index] = $this->client->postAsync('/chat/completions', [
                'json' => [
                    'model' => 'gpt-4o-mini',
                    'messages' => $item['messages'],
                ],
            ]);
        }

        $responses = \GuzzleHttp\Promise\Utils::settle($promises)->wait();

        foreach ($responses as $index => $response) {
            if ($response['state'] === 'fulfilled') {
                $body = json_decode((string) $response['value']->getBody(), true);
                $batch[$index]['callback']($body);
            } else {
                $batch[$index]['callback'](null, $response['reason']);
            }
        }
    }
}

常见问题答疑(FAQ)

Q1:如何确定合适的并发数?

回答

php
<?php
/**
 * Recommended concurrency from an RPM limit and average latency (Little's law).
 *
 * Theoretical concurrency = (requests per second) * (seconds per request).
 * A safety margin is then applied and the result is clamped to at least 1,
 * because truncation would otherwise give 0 for slow or low-RPM APIs.
 *
 * @param float $rpm             Allowed requests per minute.
 * @param float $avgResponseTime Average response time in seconds.
 * @param float $safetyFactor    Fraction of the theoretical maximum to use (0.7–0.8 suggested).
 */
function calculateRecommendedConcurrency(float $rpm, float $avgResponseTime, float $safetyFactor = 0.7): int
{
    $theoretical = ($rpm / 60) * $avgResponseTime;
    return max(1, (int) ($theoretical * $safetyFactor));
}

// Derive from the API limits.
$rpm = 60; // requests per minute
$avgResponseTime = 2; // average response time (seconds)

// Theoretical maximum concurrency.
$maxConcurrency = ($rpm / 60) * $avgResponseTime;

// Use about 70-80% of the theoretical value, but never less than 1.
$recommendedConcurrency = calculateRecommendedConcurrency($rpm, $avgResponseTime, 0.7);

Q2:如何处理429错误?

回答

php
<?php
/**
 * Calls $client->chat($messages), retrying on HTTP 429 (rate limited) responses.
 *
 * A failure counts as 429 when the exception exposes a response whose status is 429,
 * or when its message contains "429". The wait honours Retry-After (seconds or
 * HTTP-date) when available and defaults to 60 s. There is no sleep after the final attempt.
 *
 * @param mixed $client     Object exposing chat(array $messages): array.
 * @param mixed $messages   Messages passed through to chat().
 * @param int   $maxRetries Maximum number of attempts.
 * @return array The chat() result.
 * @throws Exception The original exception for non-429 failures; otherwise
 *                   Exception('超过最大重试次数') wrapping the last 429 error.
 */
function handle429Error($client, $messages, $maxRetries = 3): array
{
    $maxRetries = (int) $maxRetries;
    $lastError = null;

    for ($attempt = 1; $attempt <= $maxRetries; $attempt++) {
        try {
            return $client->chat($messages);
        } catch (Exception $e) {
            // Generic exceptions have no getResponse(); calling it unguarded was a fatal error.
            $response = method_exists($e, 'getResponse') ? $e->getResponse() : null;
            $status = (is_object($response) && method_exists($response, 'getStatusCode'))
                ? (int) $response->getStatusCode()
                : null;

            if ($status !== 429 && strpos($e->getMessage(), '429') === false) {
                throw $e;
            }

            $lastError = $e;
            if ($attempt === $maxRetries) {
                break; // No point sleeping when no attempt follows.
            }

            $header = (is_object($response) && method_exists($response, 'getHeader'))
                ? ($response->getHeader('Retry-After')[0] ?? null)
                : null;

            $retryAfter = 60;
            if (is_string($header) && is_numeric(trim($header))) {
                $retryAfter = max(0, (int) trim($header));
            } elseif (is_string($header) && ($retryAt = strtotime($header)) !== false) {
                // Retry-After may also be an HTTP-date.
                $retryAfter = max(0, $retryAt - time());
            }

            sleep($retryAfter);
        }
    }

    throw new Exception('超过最大重试次数', 0, $lastError);
}

Q3:如何监控限流状态?

回答

php
<?php
/**
 * In-memory counters of request outcomes: total, successful, rate-limited and errored.
 */
class RateLimitMonitor
{
    private array $stats = [
        'total_requests' => 0,
        'successful_requests' => 0,
        'rate_limited' => 0,
        'errors' => 0,
    ];

    /**
     * Counts one request. A rate-limited request is counted only as rate-limited,
     * regardless of $success.
     */
    public function recordRequest(bool $success, bool $rateLimited = false): void
    {
        $this->stats['total_requests'] += 1;

        $outcome = $rateLimited
            ? 'rate_limited'
            : ($success ? 'successful_requests' : 'errors');

        $this->stats[$outcome] += 1;
    }

    /**
     * Copy of the current counters.
     */
    public function getStats(): array
    {
        return $this->stats;
    }
}

Q4:如何实现分布式限流?

回答

php
<?php
/**
 * Distributed sliding-window limiter backed by a Redis sorted set.
 *
 * Each admitted request is one member scored by its Unix timestamp.
 * NOTE(review): assumes a phpredis-style client where multi() queues the following
 * commands and exec() returns their replies in order — confirm for other clients.
 */
class RedisRateLimiter
{
    private $redis;
    private string $key;
    private int $limit;
    private int $window;

    /**
     * @param mixed  $redis         Redis client.
     * @param string $key           Sorted-set key shared by all processes enforcing this limit.
     * @param int    $limit         Maximum requests per window.
     * @param int    $windowSeconds Window length in seconds.
     */
    public function __construct($redis, string $key, int $limit, int $windowSeconds)
    {
        $this->redis = $redis;
        $this->key = $key;
        $this->limit = $limit;
        $this->window = $windowSeconds;
    }

    /**
     * Returns true and records the request when the window has room, otherwise false.
     */
    public function tryAcquire(): bool
    {
        $now = time();
        $windowStart = $now - $this->window;
        // Random suffix: uniqid() alone can collide across processes/servers, and a
        // collision would overwrite a member and undercount requests.
        $member = $now . '-' . bin2hex(random_bytes(8));

        $this->redis->multi();
        $this->redis->zRemRangeByScore($this->key, 0, $windowStart);
        $this->redis->zCard($this->key);
        $this->redis->zAdd($this->key, $now, $member);
        $this->redis->expire($this->key, $this->window);
        $results = $this->redis->exec();

        if (!is_array($results) || !isset($results[1])) {
            // Transaction aborted or unexpected reply: fail closed instead of admitting blindly.
            return false;
        }

        if ((int) $results[1] < $this->limit) {
            return true;
        }

        // Rejected: remove our own entry so failed attempts do not consume quota.
        // Between exec() and this call, other clients may briefly see one extra entry
        // (over-rejecting, never over-admitting).
        $this->redis->zRem($this->key, $member);
        return false;
    }
}

Q5:如何优雅降级?

回答

php
<?php
/**
 * Chat wrapper that falls back to the last successful response for the same
 * messages when the API is rate limiting.
 *
 * Successful responses are cached in memory, keyed by the message list. A
 * cached response is served instead of calling the API when the monitor has
 * recorded more than $degradeThreshold rate-limited requests. It is also
 * served when a call fails with a 429 error.
 *
 * NOTE(review): RateLimitMonitor counters are cumulative and never reset, so once
 * the threshold is passed, cached answers are preferred for the object's lifetime.
 */
class GracefulDegradation
{
    private $client;
    private RateLimitMonitor $monitor;
    /** @var array<string, array> Last successful response per distinct message list. */
    private array $cache = [];
    private int $degradeThreshold;

    /**
     * @param mixed            $client           Object exposing chat(array $messages): array.
     * @param RateLimitMonitor $monitor          Receives one record per API call made here.
     * @param int              $degradeThreshold Rate-limited count above which cached answers are preferred.
     */
    public function __construct($client, RateLimitMonitor $monitor, int $degradeThreshold = 10)
    {
        $this->client = $client;
        $this->monitor = $monitor;
        $this->degradeThreshold = $degradeThreshold;
    }

    /**
     * @throws Exception Non-429 failures, or 429 failures with no cached response to fall back on.
     */
    public function chat(array $messages): array
    {
        $cached = $this->getCachedResponse($messages);

        if ($cached !== null && $this->monitor->getStats()['rate_limited'] > $this->degradeThreshold) {
            return $cached;
        }

        try {
            $result = $this->client->chat($messages);
        } catch (Exception $e) {
            $rateLimited = strpos($e->getMessage(), '429') !== false;
            $this->monitor->recordRequest(false, $rateLimited);

            if ($rateLimited && $cached !== null) {
                return $cached;
            }
            throw $e;
        }

        $this->monitor->recordRequest(true);
        $this->cache[$this->cacheKey($messages)] = $result;

        return $result;
    }

    /**
     * Last successful response for $messages, or null if none has been seen.
     */
    private function getCachedResponse(array $messages): ?array
    {
        return $this->cache[$this->cacheKey($messages)] ?? null;
    }

    /**
     * Stable key for a message list (serialize() only, never unserialize()).
     */
    private function cacheKey(array $messages): string
    {
        return md5(serialize($messages));
    }
}

Q6:如何测试限流效果?

回答

php
<?php
/**
 * Calls $limiter->acquire() $requests times and records how long each call blocked.
 *
 * Any object exposing acquire() is accepted. The original RateLimiterInterface
 * type hint referred to an interface that none of the limiters implement.
 *
 * @param object $limiter  Limiter exposing acquire().
 * @param int    $requests Number of acquisitions to perform.
 * @return list<array{request: int, wait_time: float, timestamp: float}>
 *         wait_time = seconds acquire() blocked; timestamp = seconds since the test started.
 */
function testRateLimiter(object $limiter, int $requests): array
{
    $results = [];
    // hrtime() is monotonic, so measurements are unaffected by wall-clock adjustments.
    $startNs = hrtime(true);

    for ($i = 1; $i <= $requests; $i++) {
        $beforeNs = hrtime(true);
        $limiter->acquire();
        $afterNs = hrtime(true);

        $results[] = [
            'request' => $i,
            'wait_time' => ($afterNs - $beforeNs) / 1e9,
            'timestamp' => ($afterNs - $startNs) / 1e9,
        ];
    }

    return $results;
}

实战练习

基础练习

练习1:实现一个简单的令牌桶限流器。

参考代码

php
<?php
/**
 * Minimal token bucket with whole-second refill granularity.
 */
class SimpleTokenBucket
{
    private int $tokens;
    private int $maxTokens;
    private int $refillRate;
    private int $lastRefill;

    /**
     * @param int $maxTokens  Bucket capacity (starts full).
     * @param int $refillRate Tokens added per elapsed second (previously ignored; 1/s was hard-coded).
     */
    public function __construct(int $maxTokens, int $refillRate)
    {
        $this->tokens = $maxTokens;
        $this->maxTokens = $maxTokens;
        $this->refillRate = $refillRate;
        $this->lastRefill = time();
    }

    /**
     * Takes one token if available.
     */
    public function consume(): bool
    {
        $this->refill();
        if ($this->tokens <= 0) {
            return false;
        }
        $this->tokens--;
        return true;
    }

    /**
     * Adds $refillRate tokens per whole second elapsed, capped at capacity.
     */
    private function refill(): void
    {
        $now = time();
        $elapsed = $now - $this->lastRefill;

        // A backwards clock step would otherwise make $elapsed negative and remove tokens.
        if ($elapsed <= 0) {
            return;
        }

        $this->tokens = min($this->maxTokens, $this->tokens + $elapsed * $this->refillRate);
        $this->lastRefill = $now;
    }
}

进阶练习

练习2:实现一个并发请求管理器。

参考代码

php
<?php
/**
 * Minimal concurrency guard: runs a task only if fewer than $maxConcurrent tasks are active.
 */
class SimpleConcurrencyManager
{
    private int $maxConcurrent;
    private int $current = 0;

    /**
     * @param int $maxConcurrent Maximum simultaneously active tasks (>= 1). Previously never
     *                           initialised, so the first execute() failed.
     * @throws InvalidArgumentException When $maxConcurrent is below 1.
     */
    public function __construct(int $maxConcurrent = 5)
    {
        if ($maxConcurrent < 1) {
            throw new InvalidArgumentException('maxConcurrent must be at least 1');
        }
        $this->maxConcurrent = $maxConcurrent;
    }

    /**
     * Runs $task and returns its result.
     *
     * @throws RuntimeException When the limit is reached. Tasks run synchronously, so
     *                          this only happens through nested execute() calls; a busy-wait
     *                          here could never see a slot free up.
     */
    public function execute(callable $task)
    {
        if ($this->current >= $this->maxConcurrent) {
            throw new RuntimeException('No free concurrency slot');
        }

        $this->current++;
        try {
            return $task();
        } finally {
            $this->current--;
        }
    }
}

挑战练习

练习3:实现一个完整的智能限流系统。

参考代码

php
<?php
/**
 * Chat wrapper combining multi-level throttling, adaptive feedback and request monitoring.
 *
 * NOTE(review): the adaptive limiter only receives feedback here; its
 * getCurrentLimit() does not yet gate requests (MultiLevelRateLimiter enforces
 * fixed limits).
 */
class IntelligentRateLimiter
{
    private $client;
    private MultiLevelRateLimiter $limiter;
    private AdaptiveRateLimiter $adaptive;
    private RateLimitMonitor $monitor;

    /**
     * Components default to fresh instances; pass shared ones to reuse state across wrappers.
     * Previously there was no constructor, so every property (and the undeclared $client)
     * was unset on first use.
     *
     * @param mixed $client Object exposing chat(array $messages): array.
     */
    public function __construct(
        $client,
        ?MultiLevelRateLimiter $limiter = null,
        ?AdaptiveRateLimiter $adaptive = null,
        ?RateLimitMonitor $monitor = null
    ) {
        $this->client = $client;
        $this->limiter = $limiter ?? new MultiLevelRateLimiter();
        $this->adaptive = $adaptive ?? new AdaptiveRateLimiter(60);
        $this->monitor = $monitor ?? new RateLimitMonitor();
    }

    /**
     * Waits for all limit levels, sends $messages, and records the outcome.
     *
     * @throws Exception Rethrows any client failure after recording it.
     */
    public function chat(array $messages): array
    {
        $this->limiter->acquire();

        try {
            $result = $this->client->chat($messages);
            $this->adaptive->recordSuccess();
            $this->monitor->recordRequest(true);
            return $result;
        } catch (Exception $e) {
            $this->adaptive->recordFailure();
            $this->monitor->recordRequest(false, strpos($e->getMessage(), '429') !== false);
            throw $e;
        }
    }
}

知识点总结

核心要点

  1. 令牌桶算法:平滑限流,允许突发流量
  2. 滑动窗口算法:精确限流,防止超限
  3. 并发控制:限制同时请求数
  4. 自适应限流:根据响应动态调整
  5. 多级限流:多维度保护
  6. 批量处理:提高吞吐量

易错点回顾

| 易错点 | 正确做法 |
|--------|----------|
| 不处理429错误 | 解析Retry-After并等待 |
| 并发数设置过高 | 根据API限制计算 |
| 不监控限流状态 | 记录请求统计 |
| 不实现降级 | 缓存或备用方案 |

拓展参考资料

官方文档

进阶学习路径

  1. 本知识点 → 并发与限流
  2. 下一步 → 安全与鉴权
  3. 进阶 → 错误处理与重试

💡 记住:合理的限流策略是保护API和提升用户体验的关键,令牌桶+滑动窗口+并发控制是三大核心手段。