Appearance
并发与限流
合理的并发控制和限流策略是构建高性能AI应用的关键
概述
AI API通常有速率限制,如何在限制内最大化吞吐量是构建高性能AI应用的关键。本教程将教你并发处理和限流策略的实现方法。
为什么需要并发与限流?
AI API限制类型:
速率限制(Rate Limit)
├── RPM (Requests Per Minute)
├── RPD (Requests Per Day)
├── TPM (Tokens Per Minute)
└── TPD (Tokens Per Day)
并发限制(Concurrency Limit)
├── 同时请求数
└── 连接数限制
配额限制(Quota Limit)
├── 月度配额
└── 预算限制

常见API限制
| API | RPM | TPM | 并发 |
|---|---|---|---|
| OpenAI GPT-4 | 500 | 30000 | 取决于套餐 |
| OpenAI GPT-4o-mini | 500 | 200000 | 取决于套餐 |
| DeepSeek | 60 | 60000 | 无明确限制 |
| Claude | 取决于套餐 | 取决于套餐 | 取决于套餐 |
基础限流器
令牌桶算法
php
<?php
/**
 * Token-bucket rate limiter.
 *
 * The bucket starts full, is refilled at `$refillRate` tokens per second up
 * to `$capacity`, and every admitted operation removes tokens from it.
 * Refill is computed from microsecond timestamps, so tokens accrue smoothly
 * instead of arriving in whole-second bursts.
 */
class TokenBucket
{
    private int $capacity;

    /** Current balance; fractional so that sub-second refills accumulate. */
    private float $tokens;

    private int $refillRate;

    /** Unix timestamp (with microseconds) of the most recent refill. */
    private float $lastRefillTime;

    /**
     * @param int $capacity   Maximum number of tokens the bucket can hold.
     * @param int $refillRate Tokens added per second.
     */
    public function __construct(int $capacity, int $refillRate)
    {
        $this->capacity = $capacity;
        $this->tokens = (float) $capacity;
        $this->refillRate = $refillRate;
        $this->lastRefillTime = microtime(true);
    }

    /**
     * Try to take `$tokens` tokens without blocking.
     *
     * @return bool True when the tokens were removed, false when the balance was too low.
     */
    public function consume(int $tokens = 1): bool
    {
        $this->refill();

        if ($this->tokens >= $tokens) {
            $this->tokens -= $tokens;

            return true;
        }

        return false;
    }

    /**
     * Take a single token without blocking.
     *
     * Mirrors SlidingWindowLimiter::tryAcquire() so both limiter types can be
     * driven through the same method names.
     */
    public function tryAcquire(): bool
    {
        return $this->consume(1);
    }

    /**
     * Block until `$tokens` tokens have been taken.
     *
     * @throws InvalidArgumentException When more tokens are requested than the bucket can ever hold.
     * @throws RuntimeException         When the bucket never refills and the balance is too low.
     */
    public function waitForToken(int $tokens = 1): void
    {
        if ($tokens > $this->capacity) {
            throw new InvalidArgumentException(
                sprintf('Cannot wait for %d tokens: bucket capacity is %d', $tokens, $this->capacity)
            );
        }

        while (!$this->consume($tokens)) {
            $wait = $this->getWaitTime($tokens);
            if (is_infinite($wait)) {
                throw new RuntimeException('Bucket has a zero refill rate; the requested tokens will never be available');
            }

            // Sleep for the computed deficit rather than polling; at least 1 ms
            // so a rounding-induced zero cannot turn into a busy loop.
            usleep(max(1000, (int) ceil($wait * 1000000)));
        }
    }

    /**
     * Seconds until `$tokens` tokens will be available.
     *
     * @return float 0.0 when they are available now, INF when they never will be.
     */
    public function getWaitTime(int $tokens = 1): float
    {
        $this->refill();

        $missing = $tokens - $this->tokens;
        if ($missing <= 0) {
            return 0.0;
        }

        if ($tokens > $this->capacity || $this->refillRate <= 0) {
            return INF;
        }

        return $missing / $this->refillRate;
    }

    /**
     * Credit the tokens earned since the last refill, capped at the capacity.
     */
    private function refill(): void
    {
        $now = microtime(true);
        $elapsed = $now - $this->lastRefillTime;

        if ($elapsed > 0) {
            $this->tokens = min((float) $this->capacity, $this->tokens + $elapsed * $this->refillRate);
        }

        // Always move the reference point, so a backwards clock step cannot
        // leave it stuck in the future.
        $this->lastRefillTime = $now;
    }

    /**
     * Number of whole tokens currently available.
     */
    public function getAvailableTokens(): int
    {
        $this->refill();

        return (int) floor($this->tokens);
    }
}
// Usage example
$bucket = new TokenBucket(100, 10); // capacity 100, refilled with 10 tokens per second

if (!$bucket->consume()) {
    // No token available right now: report the rejection.
    echo "速率限制,请稍后重试";
} else {
    // A token was taken: perform the API request.
    $result = $client->chat($messages);
}

滑动窗口算法
php
<?php
/**
 * Sliding-window rate limiter.
 *
 * Remembers the timestamp of every admitted request and admits a new one
 * only while fewer than `$limit` timestamps fall inside the trailing
 * `$windowSize` seconds.
 */
class SlidingWindowLimiter
{
    private int $limit;

    private int $windowSize;

    /** @var float[] Timestamps of admitted requests, oldest first. */
    private array $requests = [];

    /**
     * @param int $limit             Maximum number of requests per window.
     * @param int $windowSizeSeconds Window length in seconds.
     */
    public function __construct(int $limit, int $windowSizeSeconds)
    {
        $this->limit = $limit;
        $this->windowSize = $windowSizeSeconds;
    }

    /**
     * Record a request if the window still has room.
     *
     * @return bool True when the request was admitted and recorded.
     */
    public function tryAcquire(): bool
    {
        $this->cleanup();

        if (count($this->requests) < $this->limit) {
            $this->requests[] = microtime(true);

            return true;
        }

        return false;
    }

    /**
     * Block until a request has been admitted.
     *
     * @throws LogicException When the limit is below 1, because no request could ever be admitted.
     */
    public function acquire(): void
    {
        if ($this->limit < 1) {
            throw new LogicException('Cannot acquire from a limiter whose limit is below 1');
        }

        while (!$this->tryAcquire()) {
            // usleep() takes an int: round up explicitly, and sleep at least
            // 1 µs so a zero wait cannot become a busy loop.
            usleep(max(1, (int) ceil($this->getWaitTime() * 1000000)));
        }
    }

    /**
     * Drop timestamps that have left the window and re-index the list.
     */
    private function cleanup(): void
    {
        $cutoff = microtime(true) - $this->windowSize;
        $this->requests = array_values(
            array_filter($this->requests, static fn ($t) => $t > $cutoff)
        );
    }

    /**
     * Seconds until the next request can be admitted.
     *
     * @return float 0.0 while the window has free capacity; otherwise the time
     *               until the oldest recorded request leaves the window.
     */
    public function getWaitTime(): float
    {
        $this->cleanup();

        if ($this->requests === [] || count($this->requests) < $this->limit) {
            return 0.0;
        }

        // Timestamps are appended in chronological order, so index 0 is the oldest.
        $oldestRequest = $this->requests[0];

        return max(0.0, $oldestRequest + $this->windowSize - microtime(true));
    }

    /**
     * Number of requests that can still be admitted in the current window.
     */
    public function getAvailableRequests(): int
    {
        $this->cleanup();

        return $this->limit - count($this->requests);
    }
}
// Usage example
$limiter = new SlidingWindowLimiter(60, 60); // at most 60 requests per 60-second window

if (!$limiter->tryAcquire()) {
    // Window is full: tell the caller how long to wait.
    $waitTime = $limiter->getWaitTime();
    echo "需要等待 {$waitTime} 秒";
} else {
    $result = $client->chat($messages);
}

并发控制
并发请求管理器
php
<?php
/**
 * Caps the number of tasks that run at the same time.
 *
 * `execute()` runs a task synchronously once a slot is free; `executeAsync()`
 * queues a task and settles a promise with its outcome.
 *
 * NOTE(review): `Promise` is neither defined nor imported in the visible code.
 * The executor-style constructor (resolve/reject callbacks) resembles
 * ReactPHP's promise — confirm which library is intended.
 */
class ConcurrentRequestManager
{
    private int $maxConcurrency;

    private int $currentRequests = 0;

    /** @var array<int, array{task: callable, resolve: callable, reject: callable}> */
    private array $queue = [];

    /**
     * @param int $maxConcurrency Maximum number of tasks allowed in flight.
     */
    public function __construct(int $maxConcurrency = 5)
    {
        $this->maxConcurrency = $maxConcurrency;
    }

    /**
     * Run `$task` once a slot is free and return its result.
     *
     * The slot is released even when the task throws; queued asynchronous
     * tasks are drained afterwards.
     *
     * @return mixed Whatever `$task` returns.
     */
    public function execute(callable $task)
    {
        $this->waitForSlot();
        $this->currentRequests++;

        try {
            return $task();
        } finally {
            $this->currentRequests--;
            $this->processQueue();
        }
    }

    /**
     * Queue `$task` and return a promise for its result.
     */
    public function executeAsync(callable $task): Promise
    {
        return new Promise(function ($resolve, $reject) use ($task) {
            $this->queue[] = [
                'task' => $task,
                'resolve' => $resolve,
                'reject' => $reject,
            ];
            $this->processQueue();
        });
    }

    /**
     * Poll every 10 ms until the in-flight count drops below the maximum.
     */
    private function waitForSlot(): void
    {
        while ($this->currentRequests >= $this->maxConcurrency) {
            usleep(10000);
        }
    }

    /**
     * Run queued tasks while slots are free, settling each task's promise.
     */
    private function processQueue(): void
    {
        while (!empty($this->queue) && $this->currentRequests < $this->maxConcurrency) {
            $item = array_shift($this->queue);
            $this->currentRequests++;

            try {
                $item['resolve']($item['task']());
            } catch (\Throwable $e) {
                // Catch Error as well as Exception: otherwise a TypeError in a
                // task would leave its promise unsettled and escape through
                // execute()'s finally block.
                $item['reject']($e);
            } finally {
                $this->currentRequests--;
            }
        }
    }

    /**
     * Snapshot of the manager's counters.
     *
     * @return array{max_concurrency: int, current_requests: int, queue_length: int, available_slots: int}
     */
    public function getStatus(): array
    {
        return [
            'max_concurrency' => $this->maxConcurrency,
            'current_requests' => $this->currentRequests,
            'queue_length' => count($this->queue),
            'available_slots' => $this->maxConcurrency - $this->currentRequests,
        ];
    }
}
// Usage example: send each prompt through the manager and collect the replies.
$manager = new ConcurrentRequestManager(5);
$results = [];

foreach ($prompts as $prompt) {
    $results[] = $manager->execute(
        fn () => $client->chat([['role' => 'user', 'content' => $prompt]])
    );
}

使用Guzzle异步请求
php
<?php
/**
 * Sends a batch of chat requests asynchronously, bounded by both a
 * requests-per-minute limiter and a maximum number of in-flight requests.
 */
class AsyncBatchProcessor
{
    /** HTTP client exposing postAsync(); presumably a Guzzle client — confirm against the caller. */
    private $client;

    private int $concurrency;

    private SlidingWindowLimiter $limiter;

    /**
     * @param mixed $client      HTTP client used to send the requests.
     * @param int   $concurrency Maximum number of requests in flight at once.
     * @param int   $rpm         Maximum number of requests started per 60 seconds.
     */
    public function __construct($client, int $concurrency = 5, int $rpm = 60)
    {
        $this->client = $client;
        $this->concurrency = $concurrency;
        $this->limiter = new SlidingWindowLimiter($rpm, 60);
    }

    /**
     * Send one chat request per entry and collect the outcomes.
     *
     * @param array $messagesList Message lists keyed by an arbitrary identifier.
     *
     * @return array Keyed like the input: the reply text on success, or
     *               `['error' => message]` on failure or a malformed response.
     */
    public function processBatch(array $messagesList): array
    {
        $results = [];

        // A generator creates each request lazily, so EachPromise can keep
        // only `$concurrency` of them in flight instead of starting them all.
        $requests = (function () use ($messagesList) {
            foreach ($messagesList as $key => $messages) {
                $this->limiter->acquire();

                yield $key => $this->client->postAsync('/chat/completions', [
                    'json' => [
                        'model' => 'gpt-4o-mini',
                        'messages' => $messages,
                    ],
                ]);
            }
        })();

        $pool = new \GuzzleHttp\Promise\EachPromise($requests, [
            'concurrency' => $this->concurrency,
            'fulfilled' => function ($response, $key) use (&$results): void {
                $results[$key] = $this->extractContent($response);
            },
            'rejected' => function ($reason, $key) use (&$results): void {
                $results[$key] = [
                    'error' => $reason instanceof \Throwable ? $reason->getMessage() : (string) $reason,
                ];
            },
        ]);
        $pool->promise()->wait();

        // Requests finish in arbitrary order; sort by key for a stable result
        // ordering, as Utils::settle() produced before.
        ksort($results);

        return $results;
    }

    /**
     * Pull the assistant reply out of a chat-completions response.
     *
     * @return mixed The reply content, or `['error' => ...]` when the body
     *               lacks the expected structure.
     */
    private function extractContent($response)
    {
        $body = json_decode((string) $response->getBody(), true);

        if (!isset($body['choices'][0]['message']['content'])) {
            return ['error' => 'Unexpected response format'];
        }

        return $body['choices'][0]['message']['content'];
    }
}
// Usage example: wrap each question in a single-message conversation.
$messagesList = array_map(
    static fn ($question) => [['role' => 'user', 'content' => $question]],
    ['问题1', '问题2', '问题3']
);

$processor = new AsyncBatchProcessor($client, 5, 60);
$results = $processor->processBatch($messagesList);

智能限流
自适应限流器
php
<?php
/**
 * Rate limit that adapts to observed outcomes.
 *
 * Within each 60-second window, successes and failures are counted. Sustained
 * success raises the limit by roughly 10% (up to twice the base limit);
 * a high failure count lowers it by 20% (down to half the base limit).
 * All adjustments use integer arithmetic so the limit is always a whole number.
 */
class AdaptiveRateLimiter
{
    private int $baseLimit;

    private int $currentLimit;

    private int $successCount = 0;

    private int $failureCount = 0;

    /** Unix timestamp at which the current counting window began. */
    private int $windowStart;

    /** Counting window length in seconds. */
    private int $windowSize = 60;

    /**
     * @param int $baseLimit Starting limit; the adaptive limit stays between
     *                       half and twice this value.
     */
    public function __construct(int $baseLimit)
    {
        $this->baseLimit = $baseLimit;
        $this->currentLimit = $baseLimit;
        $this->windowStart = time();
    }

    /**
     * Count a success and raise the limit when the window looks healthy.
     */
    public function recordSuccess(): void
    {
        $this->checkWindow();
        $this->successCount++;

        $mostlySucceeding = $this->successCount > $this->currentLimit * 0.9;
        $fewFailures = $this->failureCount < $this->currentLimit * 0.1;

        if ($mostlySucceeding && $fewFailures) {
            // Grow by 10%, but by at least 1 so small limits can grow too.
            $step = max(1, intdiv($this->currentLimit, 10));
            $this->currentLimit = min($this->currentLimit + $step, $this->baseLimit * 2);
        }
    }

    /**
     * Count a failure and lower the limit when failures pile up.
     */
    public function recordFailure(): void
    {
        $this->checkWindow();
        $this->failureCount++;

        if ($this->failureCount > $this->currentLimit * 0.2) {
            // floor(current * 0.8), bounded below by ceil(base / 2) and by 1.
            $reduced = intdiv($this->currentLimit * 4, 5);
            $floor = max(1, intdiv($this->baseLimit + 1, 2));
            $this->currentLimit = max($reduced, $floor);
        }
    }

    /**
     * The limit currently in effect.
     */
    public function getCurrentLimit(): int
    {
        return $this->currentLimit;
    }

    /**
     * Reset both counters once the counting window has elapsed.
     */
    private function checkWindow(): void
    {
        $now = time();

        if ($now - $this->windowStart >= $this->windowSize) {
            $this->successCount = 0;
            $this->failureCount = 0;
            $this->windowStart = $now;
        }
    }
}
/**
 * Chat client wrapper that throttles calls with a sliding window and reports
 * every outcome to an adaptive limiter.
 *
 * NOTE(review): the adaptive limiter only receives success/failure reports
 * here; its current limit is not read back when throttling — confirm whether
 * that is intended.
 */
class SmartRateLimitedClient
{
    /** Wrapped client; must expose chat(array $messages). */
    private $client;

    private AdaptiveRateLimiter $limiter;

    private SlidingWindowLimiter $hardLimit;

    /**
     * @param mixed $client    Client whose chat() calls are throttled.
     * @param int   $baseLimit Requests allowed per 60 seconds.
     */
    public function __construct($client, int $baseLimit = 60)
    {
        $this->hardLimit = new SlidingWindowLimiter($baseLimit, 60);
        $this->limiter = new AdaptiveRateLimiter($baseLimit);
        $this->client = $client;
    }

    /**
     * Wait for a free slot, forward the call, and record how it went.
     *
     * @throws Exception Re-thrown from the wrapped client after the failure is recorded.
     */
    public function chat(array $messages): array
    {
        $this->hardLimit->acquire();

        try {
            $reply = $this->client->chat($messages);
        } catch (Exception $failure) {
            $this->limiter->recordFailure();

            throw $failure;
        }

        $this->limiter->recordSuccess();

        return $reply;
    }
}

多级限流
php
<?php
/**
 * Combines per-second, per-minute and per-hour limits; a request is admitted
 * only when every tier has room.
 *
 * All tiers are SlidingWindowLimiter instances so they share one interface
 * (tryAcquire / getWaitTime / getAvailableRequests). TokenBucket is not used
 * for the per-second tier because it exposes consume() instead.
 */
class MultiLevelRateLimiter
{
    /** @var array<string, SlidingWindowLimiter> */
    private array $limiters;

    public function __construct()
    {
        $this->limiters = [
            'per_second' => new SlidingWindowLimiter(10, 1),
            'per_minute' => new SlidingWindowLimiter(60, 60),
            'per_hour' => new SlidingWindowLimiter(1000, 3600),
        ];
    }

    /**
     * Admit a request only if all tiers can take it.
     *
     * Capacity is checked on every tier before anything is recorded, so a
     * rejection by a later tier does not burn quota in an earlier one.
     */
    public function tryAcquire(): bool
    {
        foreach ($this->limiters as $limiter) {
            if ($limiter->getAvailableRequests() < 1) {
                return false;
            }
        }

        foreach ($this->limiters as $limiter) {
            $limiter->tryAcquire();
        }

        return true;
    }

    /**
     * Block until a request has been admitted by every tier.
     */
    public function acquire(): void
    {
        while (!$this->tryAcquire()) {
            // Sleep until the most constrained tier frees a slot; at least
            // 1 ms to avoid spinning on a rounding-induced zero.
            usleep(max(1000, (int) ceil($this->getWaitTime() * 1000000)));
        }
    }

    /**
     * Seconds until every tier can admit a request (the longest wait among
     * the tiers that are currently full).
     */
    public function getWaitTime(): float
    {
        $maxWait = 0.0;

        foreach ($this->limiters as $limiter) {
            // Tiers with free capacity impose no wait.
            if ($limiter->getAvailableRequests() > 0) {
                continue;
            }

            $maxWait = max($maxWait, $limiter->getWaitTime());
        }

        return $maxWait;
    }
}

批量请求优化
智能批处理器
php
<?php
/**
 * Collects chat requests and sends them together as a batch.
 *
 * A batch is flushed when it reaches `$batchSize` entries, or when the oldest
 * queued entry has waited at least the batch timeout. The timeout is checked
 * on each addRequest() call (there is no background timer), so call flush()
 * after the last request to send any remainder.
 */
class SmartBatchProcessor
{
    /** HTTP client exposing postAsync(); presumably a Guzzle client — confirm against the caller. */
    private $client;

    private int $batchSize;

    /** Batch timeout in seconds; a float so sub-second values such as 0.1 survive. */
    private float $batchTimeout;

    /** @var array<int, array{messages: array, callback: callable}> */
    private array $queue = [];

    /** Timestamp at which the oldest queued entry was added, or null when the queue is empty. */
    private ?float $queuedSince = null;

    /**
     * @param mixed $client         HTTP client used to send the requests.
     * @param int   $batchSize      Number of queued requests that triggers a flush.
     * @param int   $batchTimeoutMs Maximum time, in milliseconds, an entry may wait in the queue.
     */
    public function __construct($client, int $batchSize = 10, int $batchTimeoutMs = 100)
    {
        $this->client = $client;
        $this->batchSize = $batchSize;
        $this->batchTimeout = $batchTimeoutMs / 1000;
    }

    /**
     * Queue a request; flush when the batch is full or has waited too long.
     *
     * @param array    $messages Chat messages for this request.
     * @param callable $callback Called with the decoded body on success, or
     *                           with (null, $reason) on failure.
     */
    public function addRequest(array $messages, callable $callback): void
    {
        if ($this->queue === []) {
            $this->queuedSince = microtime(true);
        }

        $this->queue[] = [
            'messages' => $messages,
            'callback' => $callback,
        ];

        $isFull = count($this->queue) >= $this->batchSize;
        $isStale = microtime(true) - $this->queuedSince >= $this->batchTimeout;

        if ($isFull || $isStale) {
            $this->flush();
        }
    }

    /**
     * Send everything currently queued; does nothing when the queue is empty.
     */
    public function flush(): void
    {
        if (empty($this->queue)) {
            return;
        }

        $batch = $this->queue;
        $this->queue = [];
        $this->queuedSince = null;

        $this->processBatch($batch);
    }

    /**
     * Send all requests of a batch concurrently and invoke each callback.
     */
    private function processBatch(array $batch): void
    {
        $promises = [];

        foreach ($batch as $index => $item) {
            $promises[$index] = $this->client->postAsync('/chat/completions', [
                'json' => [
                    'model' => 'gpt-4o-mini',
                    'messages' => $item['messages'],
                ],
            ]);
        }

        $responses = \GuzzleHttp\Promise\Utils::settle($promises)->wait();

        foreach ($responses as $index => $response) {
            if ($response['state'] === 'fulfilled') {
                $body = json_decode($response['value']->getBody(), true);
                $batch[$index]['callback']($body);
            } else {
                $batch[$index]['callback'](null, $response['reason']);
            }
        }
    }
}

常见问题答疑(FAQ)
Q1:如何确定合适的并发数?
回答:
php
<?php
// Derive a concurrency level from the API limits
$rpm = 60;            // requests allowed per minute
$avgResponseTime = 2; // average response time in seconds

// Theoretical ceiling: requests per second × seconds each request stays in flight
$maxConcurrency = ($rpm / 60) * $avgResponseTime;

// Use about 70-80% of the theoretical value, but never fewer than one worker
// (low limits would otherwise truncate to 0).
$recommendedConcurrency = max(1, (int) floor($maxConcurrency * 0.7));

Q2:如何处理429错误?
回答:
php
<?php
/**
 * Call `$client->chat()` and retry when the API answers with HTTP 429.
 *
 * A failure counts as rate limiting when the exception code is 429 or its
 * message contains "429". Between attempts the function sleeps for the
 * duration given by the response's Retry-After header (seconds or HTTP date),
 * or 60 seconds when no usable header is available. Any other exception is
 * re-thrown immediately.
 *
 * @param mixed $client     Client exposing chat($messages).
 * @param mixed $messages   Payload forwarded unchanged to chat().
 * @param int   $maxRetries Maximum number of attempts.
 *
 * @throws Exception When every attempt was rate limited, or on a non-429 failure.
 */
function handle429Error($client, $messages, $maxRetries = 3): array
{
    $lastError = null;

    for ($attempt = 1; $attempt <= $maxRetries; $attempt++) {
        try {
            return $client->chat($messages);
        } catch (Exception $e) {
            $rateLimited = $e->getCode() === 429 || strpos($e->getMessage(), '429') !== false;
            if (!$rateLimited) {
                throw $e;
            }

            $lastError = $e;

            // Nothing left to retry: skip the pointless final sleep.
            if ($attempt >= $maxRetries) {
                break;
            }

            // Only some exception types (e.g. HTTP client exceptions) carry a
            // response; a plain Exception has no getResponse().
            $response = method_exists($e, 'getResponse') ? $e->getResponse() : null;
            $header = $response !== null ? ($response->getHeader('Retry-After')[0] ?? null) : null;

            $delay = 60;
            if (is_numeric($header)) {
                $delay = (int) $header;
            } elseif (is_string($header) && ($timestamp = strtotime($header)) !== false) {
                // Retry-After may also be an HTTP date.
                $delay = $timestamp - time();
            }

            sleep(max(0, $delay));
        }
    }

    throw new Exception('超过最大重试次数', 0, $lastError);
}

Q3:如何监控限流状态?
回答:
php
<?php
/**
 * Keeps running counters of request outcomes: total, successful,
 * rate limited, and other errors.
 */
class RateLimitMonitor
{
    /** @var array<string, int> */
    private array $stats = [
        'total_requests' => 0,
        'successful_requests' => 0,
        'rate_limited' => 0,
        'errors' => 0,
    ];

    /**
     * Count one request under exactly one outcome.
     *
     * A rate-limited request is counted as such regardless of `$success`.
     *
     * @param bool $success     Whether the request succeeded.
     * @param bool $rateLimited Whether the request was rejected by rate limiting.
     */
    public function recordRequest(bool $success, bool $rateLimited = false): void
    {
        if ($rateLimited) {
            $outcome = 'rate_limited';
        } else {
            $outcome = $success ? 'successful_requests' : 'errors';
        }

        $this->stats['total_requests'] += 1;
        $this->stats[$outcome] += 1;
    }

    /**
     * Current counter values.
     *
     * @return array<string, int>
     */
    public function getStats(): array
    {
        return $this->stats;
    }
}

Q4:如何实现分布式限流?
回答:
php
<?php
/**
 * Sliding-window rate limiter backed by a Redis sorted set, so several
 * processes sharing the same key share one limit.
 *
 * Each admitted request is stored as a set member scored with its Unix
 * timestamp; members older than the window are removed on every attempt.
 */
class RedisRateLimiter
{
    /** Redis client; the method names used match the phpredis extension — confirm against the caller. */
    private $redis;

    private string $key;

    private int $limit;

    private int $window;

    /**
     * @param mixed  $redis         Redis client.
     * @param string $key           Sorted-set key that holds the request log.
     * @param int    $limit         Maximum number of requests per window.
     * @param int    $windowSeconds Window length in seconds.
     */
    public function __construct($redis, string $key, int $limit, int $windowSeconds)
    {
        $this->redis = $redis;
        $this->key = $key;
        $this->limit = $limit;
        $this->window = $windowSeconds;
    }

    /**
     * Try to record one request in the shared window.
     *
     * @return bool True when the request is within the limit.
     *
     * @throws RuntimeException When the Redis transaction does not return the expected results.
     */
    public function tryAcquire(): bool
    {
        $now = time();
        $windowStart = $now - $this->window;
        $member = bin2hex(random_bytes(8));

        $this->redis->multi();
        $this->redis->zRemRangeByScore($this->key, 0, $windowStart);
        $this->redis->zCard($this->key);
        $this->redis->zAdd($this->key, $now, $member);
        $this->redis->expire($this->key, $this->window);
        $results = $this->redis->exec();

        if (!is_array($results) || !isset($results[1])) {
            throw new RuntimeException('Redis transaction failed; rate limit state is unknown');
        }

        // $results[1] is the ZCARD reply: the count before this request was added.
        $allowed = $results[1] < $this->limit;

        if (!$allowed) {
            // Remove the entry added for this rejected attempt, so repeated
            // rejections do not keep the window full.
            $this->redis->zRem($this->key, $member);
        }

        return $allowed;
    }
}

Q5:如何优雅降级?
回答:
php
<?php
/**
 * Chat client wrapper that falls back to a cached reply when the API is
 * rate limiting.
 *
 * Successful replies are remembered in memory, keyed by the message list.
 * The wrapper degrades when the monitor reports more than 10 rate-limited
 * requests, or when the current call fails with a 429.
 */
class GracefulDegradation
{
    /** Wrapped client; must expose chat(array $messages). */
    private $client;

    private RateLimitMonitor $monitor;

    /** @var array<string, array> Last successful reply per message list. */
    private array $cache = [];

    /**
     * @param mixed            $client  Client whose chat() calls are protected.
     * @param RateLimitMonitor $monitor Source of rate-limit statistics; this
     *                                  class only reads it.
     */
    public function __construct($client, RateLimitMonitor $monitor)
    {
        $this->client = $client;
        $this->monitor = $monitor;
    }

    /**
     * Return a live reply when possible, otherwise a cached or placeholder one.
     *
     * @throws Exception Re-thrown when the failure is not a 429.
     */
    public function chat(array $messages): array
    {
        if ($this->monitor->getStats()['rate_limited'] > 10) {
            return $this->getCachedResponse($messages);
        }

        try {
            $response = $this->client->chat($messages);
        } catch (Exception $e) {
            if (strpos($e->getMessage(), '429') !== false) {
                return $this->getCachedResponse($messages);
            }

            throw $e;
        }

        $this->cache[$this->cacheKey($messages)] = $response;

        return $response;
    }

    /**
     * Last successful reply for these messages, or a placeholder when none exists.
     *
     * NOTE(review): the placeholder's shape is an assumption; adapt it to
     * whatever callers expect from a degraded reply.
     */
    private function getCachedResponse(array $messages): array
    {
        return $this->cache[$this->cacheKey($messages)]
            ?? ['degraded' => true, 'content' => null];
    }

    /**
     * Stable cache key for a message list.
     */
    private function cacheKey(array $messages): string
    {
        return md5(serialize($messages));
    }
}

Q6:如何测试限流效果?
回答:
php
<?php
/**
 * Measure how long a limiter makes each of `$requests` acquisitions wait.
 *
 * Accepts any object with an acquire() method. None of the limiter classes
 * shown alongside this function implement a shared interface, so a nominal
 * interface type hint would reject all of them.
 *
 * @param object $limiter  Limiter exposing a blocking acquire() method.
 * @param int    $requests Number of acquisitions to perform.
 *
 * @return array<int, array{request: int, wait_time: float, timestamp: float}>
 *               One entry per acquisition: 1-based index, seconds spent
 *               inside acquire(), and seconds since the test began.
 *
 * @throws InvalidArgumentException When `$limiter` has no acquire() method.
 */
function testRateLimiter(object $limiter, int $requests): array
{
    if (!method_exists($limiter, 'acquire')) {
        throw new InvalidArgumentException('Limiter must provide an acquire() method');
    }

    $results = [];
    $startTime = microtime(true);

    for ($i = 0; $i < $requests; $i++) {
        $requestStart = microtime(true);
        $limiter->acquire();
        $finished = microtime(true);

        $results[] = [
            'request' => $i + 1,
            'wait_time' => $finished - $requestStart,
            'timestamp' => $finished - $startTime,
        ];
    }

    return $results;
}

实战练习
基础练习
练习1:实现一个简单的令牌桶限流器。
参考代码:
php
<?php
/**
 * Minimal token bucket: starts full, gains `$refillRate` tokens per elapsed
 * second (capped at `$maxTokens`), and spends one token per consume().
 */
class SimpleTokenBucket
{
    private int $tokens;

    private int $maxTokens;

    /** Tokens added per elapsed second. */
    private int $refillRate;

    /** Unix timestamp of the last refill. */
    private int $lastRefill;

    /**
     * @param int $maxTokens  Bucket capacity.
     * @param int $refillRate Tokens added per second.
     */
    public function __construct(int $maxTokens, int $refillRate)
    {
        $this->tokens = $maxTokens;
        $this->maxTokens = $maxTokens;
        $this->refillRate = $refillRate;
        $this->lastRefill = time();
    }

    /**
     * Take one token if available.
     *
     * @return bool True when a token was taken.
     */
    public function consume(): bool
    {
        $this->refill();

        if ($this->tokens > 0) {
            $this->tokens--;

            return true;
        }

        return false;
    }

    /**
     * Add the tokens earned since the last refill, capped at the capacity.
     */
    private function refill(): void
    {
        $now = time();
        $elapsed = $now - $this->lastRefill;

        if ($elapsed > 0) {
            $this->tokens = min($this->maxTokens, $this->tokens + $elapsed * $this->refillRate);
        }

        $this->lastRefill = $now;
    }
}

进阶练习
练习2:实现一个并发请求管理器。
参考代码:
php
<?php
/**
 * Minimal concurrency gate: runs a task once fewer than `$maxConcurrent`
 * tasks are in flight, and always releases the slot afterwards.
 */
class SimpleConcurrencyManager
{
    private int $maxConcurrent;

    private int $current = 0;

    /**
     * @param int $maxConcurrent Maximum number of tasks in flight.
     *
     * @throws InvalidArgumentException When the maximum is below 1, because
     *                                  execute() could then never start a task.
     */
    public function __construct(int $maxConcurrent = 5)
    {
        if ($maxConcurrent < 1) {
            throw new InvalidArgumentException('maxConcurrent must be at least 1');
        }

        $this->maxConcurrent = $maxConcurrent;
    }

    /**
     * Run `$task` when a slot is free and return its result.
     *
     * @return mixed Whatever `$task` returns.
     */
    public function execute(callable $task)
    {
        // Poll every 10 ms until a slot is free.
        while ($this->current >= $this->maxConcurrent) {
            usleep(10000);
        }

        $this->current++;

        try {
            return $task();
        } finally {
            // Release the slot even when the task throws.
            $this->current--;
        }
    }
}

挑战练习
练习3:实现一个完整的智能限流系统。
参考代码:
php
<?php
/**
 * Combines multi-level throttling, adaptive limit tracking and request
 * monitoring around a chat client.
 */
class IntelligentRateLimiter
{
    /** Wrapped client; must expose chat(array $messages). */
    private $client;

    private MultiLevelRateLimiter $limiter;

    private AdaptiveRateLimiter $adaptive;

    private RateLimitMonitor $monitor;

    /**
     * @param mixed                      $client   Client whose chat() calls are throttled.
     * @param MultiLevelRateLimiter|null $limiter  Throttle; a default instance is created when omitted.
     * @param AdaptiveRateLimiter|null   $adaptive Adaptive tracker; defaults to a base limit of 60.
     * @param RateLimitMonitor|null      $monitor  Statistics collector; a fresh one is created when omitted.
     */
    public function __construct(
        $client,
        ?MultiLevelRateLimiter $limiter = null,
        ?AdaptiveRateLimiter $adaptive = null,
        ?RateLimitMonitor $monitor = null
    ) {
        $this->client = $client;
        $this->limiter = $limiter ?? new MultiLevelRateLimiter();
        $this->adaptive = $adaptive ?? new AdaptiveRateLimiter(60);
        $this->monitor = $monitor ?? new RateLimitMonitor();
    }

    /**
     * Wait for a slot, forward the call, and record the outcome.
     *
     * @throws Exception Re-thrown from the wrapped client after the failure is recorded.
     */
    public function chat(array $messages): array
    {
        $this->limiter->acquire();

        try {
            $result = $this->client->chat($messages);
            $this->adaptive->recordSuccess();
            $this->monitor->recordRequest(true);

            return $result;
        } catch (Exception $e) {
            $this->adaptive->recordFailure();
            // A message containing "429" is counted as rate limiting.
            $this->monitor->recordRequest(false, strpos($e->getMessage(), '429') !== false);

            throw $e;
        }
    }
}

知识点总结
核心要点
- 令牌桶算法:平滑限流,允许突发流量
- 滑动窗口算法:精确限流,防止超限
- 并发控制:限制同时请求数
- 自适应限流:根据响应动态调整
- 多级限流:多维度保护
- 批量处理:提高吞吐量
易错点回顾
| 易错点 | 正确做法 |
|---|---|
| 不处理429错误 | 解析Retry-After并等待 |
| 并发数设置过高 | 根据API限制计算 |
| 不监控限流状态 | 记录请求统计 |
| 不实现降级 | 缓存或备用方案 |
拓展参考资料
官方文档
进阶学习路径
💡 记住:合理的限流策略是保护API和提升用户体验的关键,令牌桶+滑动窗口+并发控制是三大核心手段。
