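Streaming Response Handling
Streaming responses let users watch the AI's output in real time, which greatly improves the interactive experience.
Overview
Streaming is a technique for returning data incrementally, so users see AI-generated content as it is produced instead of waiting for the complete response. This tutorial shows how to implement efficient streaming response handling in PHP.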
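Why use streaming?
Traditional vs. streaming responses:
```
Traditional response:
User sends request → waiting... → waiting... → waiting... → complete response
(the user waits until all content has been generated)

Streaming response:
User sends request → first characters arrive → more arrive → keeps receiving → done
(the user sees the content as it is generated)
```
Advantages of streaming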
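| Advantage | Description |
|---|---|
| Immediate feedback | The user sees the response start right away |
| Less waiting anxiety | No long, silent wait |
| Better experience | Feels like a conversation with a real person |
| Early interruption | Generation can be stopped midway |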
SSE protocol basics
Server-Sent Events format
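SSE data format:
```
data: {"choices":[{"delta":{"content":"Hel"}}]}

data: {"choices":[{"delta":{"content":"lo"}}]}

data: {"choices":[{"delta":{"content":"!"}}]}

data: [DONE]
```
Rules:
1. Each message line starts with `data: `.
2. Messages are separated by a blank line.
3. OpenAI-compatible APIs end the stream with `data: [DONE]`. This sentinel is a convention of those APIs, not part of the SSE standard.
Handling SSE in PHP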
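The `SSEParser` class below works on a payload that is already complete. Over a network, a read can stop in the middle of a line, or even in the middle of a multi-byte character. A streaming parser therefore has to buffer bytes until it sees the blank line that ends an event. Here is a minimal sketch of such an incremental parser. `IncrementalSseParser` and `feed()` are hypothetical names. It only handles `data:` fields and LF or CRLF line endings.

```php
<?php
// Hypothetical incremental parser: feed raw network chunks, get back completed data payloads
class IncrementalSseParser
{
    private string $buffer = '';

    /** @return string[] data payloads of every event completed by this chunk */
    public function feed(string $chunk): array
    {
        // Normalize CRLF on the whole buffer, so a "\r\n" split across two reads is still caught
        $this->buffer = str_replace("\r\n", "\n", $this->buffer . $chunk);
        $payloads = [];
        // An event is complete only once its terminating blank line has arrived
        while (($pos = strpos($this->buffer, "\n\n")) !== false) {
            $event = substr($this->buffer, 0, $pos);
            $this->buffer = substr($this->buffer, $pos + 2);
            $data = [];
            foreach (explode("\n", $event) as $line) {
                if (strncmp($line, 'data:', 5) === 0) {
                    // The SSE format allows one optional space after the colon
                    $value = substr($line, 5);
                    $data[] = ($value !== '' && $value[0] === ' ') ? substr($value, 1) : $value;
                }
            }
            if ($data !== []) {
                $payloads[] = implode("\n", $data); // several data lines join with "\n"
            }
        }
        return $payloads;
    }
}

$parser = new IncrementalSseParser();
// One event split across three network reads, followed by the [DONE] sentinel
$received = array_merge(
    $parser->feed('data: {"choices":[{"delta":{"con'),
    $parser->feed('tent":"Hel"}}]}' . "\n"),
    $parser->feed("\ndata: [DONE]\n\n")
);
foreach ($received as $payload) {
    echo $payload, "\n";
}
```

The first two calls return nothing because no event is complete yet. The third call releases both events at once.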
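```php
<?php
// Parses an SSE payload that has already been received in full
class SSEParser
{
    public function parse(string $data): array
    {
        $messages = [];
        $lines = explode("\n", $data);
        foreach ($lines as $line) {
            $line = trim($line); // also strips the "\r" of CRLF line endings
            if ($line === '') {
                continue; // blank lines only separate events
            }
            if (strpos($line, 'data: ') === 0) {
                $json = substr($line, 6);
                if ($json === '[DONE]') {
                    $messages[] = ['done' => true];
                } else {
                    $decoded = json_decode($json, true);
                    if (is_array($decoded)) { // skip malformed JSON instead of storing null
                        $messages[] = $decoded;
                    }
                }
            }
        }
        return $messages;
    }

    // Extracts the text fragment from an OpenAI-style chunk
    public function extractContent(array $message): ?string
    {
        return $message['choices'][0]['delta']['content'] ?? null;
    }
}
```
Basic streaming client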
Implementing a streaming request
```php
<?php
require 'vendor/autoload.php';

use GuzzleHttp\Client;

class StreamingClient
{
    private Client $client;
    private string $apiKey;
    // The trailing slash matters: Guzzle resolves paths against base_uri per RFC 3986,
    // so 'https://api.openai.com/v1' + '/chat/completions' would drop the /v1 segment
    private string $baseUrl = 'https://api.openai.com/v1/';

    public function __construct(string $apiKey)
    {
        $this->apiKey = $apiKey;
        $this->client = new Client([
            'base_uri' => $this->baseUrl,
            'timeout' => 120,
            'headers' => [
                'Authorization' => 'Bearer ' . $this->apiKey,
                'Content-Type' => 'application/json',
            ],
        ]);
    }

    // Sends a streaming chat request and yields text fragments as they arrive
    public function chatStream(array $messages, string $model = 'gpt-4o-mini'): Generator
    {
        $response = $this->client->post('chat/completions', [
            'json' => [
                'model' => $model,
                'messages' => $messages,
                'stream' => true, // ask the API to send SSE chunks
            ],
            'stream' => true, // tell Guzzle not to download the whole body up front
        ]);

        $body = $response->getBody();
        $buffer = '';

        while (!$body->eof()) {
            // Append new bytes and normalize CRLF so "\n\n" marks every event boundary
            $buffer = str_replace("\r\n", "\n", $buffer . $body->read(1024));

            // Process complete events only; a partial event stays in the buffer
            while (($pos = strpos($buffer, "\n\n")) !== false) {
                $block = substr($buffer, 0, $pos);
                $buffer = substr($buffer, $pos + 2);

                foreach (explode("\n", $block) as $line) {
                    $line = trim($line);
                    if (strpos($line, 'data: ') !== 0) {
                        continue; // ignore blank lines, comments and other fields
                    }
                    $json = substr($line, 6);
                    if ($json === '[DONE]') {
                        return;
                    }
                    $data = json_decode($json, true);
                    if (isset($data['choices'][0]['delta']['content'])) {
                        yield $data['choices'][0]['delta']['content'];
                    }
                }
            }
        }
    }
}

// Usage example (reads the API key from the OPENAI_API_KEY environment variable)
$client = new StreamingClient((string) getenv('OPENAI_API_KEY'));
echo "AI reply: ";
foreach ($client->chatStream([['role' => 'user', 'content' => 'Tell me a story']]) as $chunk) {
    echo $chunk;
    flush();
}
echo "\n";
```
Advanced streaming
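Each wrapper in this section only needs an object whose `chatStream()` returns a generator of text fragments. To try them without network access or an API key, you can use a stand-in that replays canned chunks. Below is a minimal sketch. `FakeStreamingClient` is a hypothetical name, not part of any library.

```php
<?php
// Hypothetical stand-in for StreamingClient: replays canned fragments instead of calling an API
class FakeStreamingClient
{
    /** @param string[] $chunks fragments to replay, in order */
    public function __construct(private array $chunks, private int $delayMicros = 0)
    {
    }

    public function chatStream(array $messages, string $model = 'gpt-4o-mini'): Generator
    {
        foreach ($this->chunks as $chunk) {
            if ($this->delayMicros > 0) {
                usleep($this->delayMicros); // simulate latency between chunks
            }
            yield $chunk;
        }
    }
}

$fake = new FakeStreamingClient(['Once', ' upon', ' a', ' time']);
$story = '';
foreach ($fake->chatStream([['role' => 'user', 'content' => 'Tell me a story']]) as $chunk) {
    $story .= $chunk;
}
echo $story, "\n"; // Once upon a time
```

Because it exposes the same method signature, the fake can be passed wherever a `StreamingClient` is expected.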
Streaming with callbacks
```php
<?php
class CallbackStreamingClient
{
    // Any object whose chatStream() yields text fragments, e.g. StreamingClient
    private $client;

    public function __construct($client)
    {
        $this->client = $client;
    }

    public function chatStreamWithCallback(
        array $messages,
        callable $onChunk,
        ?callable $onComplete = null,
        ?callable $onError = null
    ): string {
        $fullContent = '';
        try {
            foreach ($this->client->chatStream($messages) as $chunk) {
                $fullContent .= $chunk;
                $onChunk($chunk, $fullContent); // new fragment plus everything so far
            }
            if ($onComplete) {
                $onComplete($fullContent);
            }
        } catch (Throwable $e) {
            if ($onError) {
                $onError($e);
            }
            throw $e; // still let the caller decide how to recover
        }
        return $fullContent;
    }
}

// Usage example ($baseClient is a StreamingClient instance)
$client = new CallbackStreamingClient($baseClient);
$client->chatStreamWithCallback(
    [['role' => 'user', 'content' => 'Write a poem']],
    function ($chunk, $full) {
        echo $chunk;
        flush();
    },
    function ($full) {
        echo "\nDone! Total length: " . mb_strlen($full) . " characters\n";
    },
    function ($e) {
        echo "\nError: " . $e->getMessage() . "\n";
    }
);
```
Streaming with progress display
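Progress counters like the one below report characters, but PHP's `strlen()` counts bytes. UTF-8 encodes each Chinese character in three bytes, so byte counts overstate progress for such text. `mb_strlen()` counts characters instead. A quick check follows. It assumes the file is saved as UTF-8 and the mbstring extension is enabled.

```php
<?php
// Assumes UTF-8 source encoding and the mbstring extension
$text = '你好, world';
$bytes = strlen($text);             // counts bytes
$chars = mb_strlen($text, 'UTF-8'); // counts characters
printf("%d bytes, %d characters\n", $bytes, $chars); // 13 bytes, 9 characters
```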
```php
<?php
class ProgressStreamingClient
{
    private $client;

    public function __construct($client)
    {
        $this->client = $client;
    }

    public function chatWithProgress(array $messages, int $updateInterval = 10): string
    {
        $fullContent = '';
        $chunkCount = 0;
        $startTime = microtime(true);

        echo "Generating...\n";
        foreach ($this->client->chatStream($messages) as $chunk) {
            $fullContent .= $chunk;
            $chunkCount++;

            // Refresh the status line every $updateInterval chunks
            if ($chunkCount % $updateInterval === 0) {
                $elapsed = max(microtime(true) - $startTime, 1e-6); // avoid division by zero
                $length = mb_strlen($fullContent); // characters, not bytes
                echo sprintf(
                    "\rGenerated %d characters, %.1f characters/s",
                    $length,
                    $length / $elapsed
                );
            }
        }

        $elapsed = microtime(true) - $startTime;
        echo sprintf(
            "\nDone! %d characters in %.2f s\n",
            mb_strlen($fullContent),
            $elapsed
        );
        return $fullContent;
    }
}
```
Streaming responses in web applications
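On the server side, every event you send must follow SSE framing. Below is a minimal sketch of a helper that builds one frame. `formatSseEvent` is a hypothetical name, not a PHP built-in. It applies three rules:

- Each line of the payload gets its own `data:` prefix.
- The optional `id:` and `event:` fields come first.
- A blank line ends the event.

```php
<?php
// Hypothetical helper: builds one Server-Sent Events frame
function formatSseEvent(string $data, ?string $event = null, ?string $id = null): string
{
    $frame = '';
    if ($id !== null) {
        $frame .= "id: {$id}\n";
    }
    if ($event !== null) {
        $frame .= "event: {$event}\n";
    }
    // Every line of the payload needs its own "data:" prefix
    foreach (preg_split("/\r\n|\r|\n/", $data) as $line) {
        $frame .= "data: {$line}\n";
    }
    return $frame . "\n"; // the blank line terminates the event
}

echo formatSseEvent(json_encode(['content' => 'Hi']));
echo formatSseEvent("line 1\nline 2", 'note', '42');
```

`json_encode()` escapes newlines as `\n`, so the JSON payloads used in this tutorial always fit on a single `data:` line. The multi-line branch matters for plain-text payloads.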
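Streaming output from PHP
```php
<?php
class WebStreamingHandler
{
    private $client;

    public function __construct($client)
    {
        $this->client = $client;
    }

    public function handleStreamRequest(array $messages): void
    {
        header('Content-Type: text/event-stream');
        header('Cache-Control: no-cache');
        header('Connection: keep-alive');
        header('X-Accel-Buffering: no'); // disable Nginx proxy buffering

        // Close every active output buffer so each echo reaches the client immediately
        while (ob_get_level() > 0) {
            ob_end_flush();
        }

        foreach ($this->client->chatStream($messages) as $chunk) {
            echo "data: " . json_encode(['content' => $chunk], JSON_UNESCAPED_UNICODE) . "\n\n";
            flush();
        }
        echo "data: [DONE]\n\n";
        flush();
    }
}

// API endpoint example: stream.php
// The front end posts a JSON body, so read php://input ($_POST only covers form encodings)
$input = json_decode(file_get_contents('php://input'), true);
$messages = $input['messages'] ?? [];
$client = new StreamingClient((string) getenv('OPENAI_API_KEY')); // StreamingClient from above
$handler = new WebStreamingHandler($client);
$handler->handleStreamRequest($messages);
```
Receiving the stream on the front end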
```javascript
// Front-end JavaScript example
async function streamChat(messages) {
  const response = await fetch('/stream.php', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  let result = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true keeps multi-byte characters that straddle two reads intact
    buffer += decoder.decode(value, { stream: true });

    // A read can end mid-line, so only process complete lines
    const lines = buffer.split('\n');
    buffer = lines.pop(); // keep the trailing partial line for the next read

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = line.slice(6);
      if (data === '[DONE]') {
        return result;
      }
      const json = JSON.parse(data);
      result += json.content;
      updateUI(json.content); // updateUI is your own rendering function
    }
  }
  return result;
}
```
Streaming and tool calls
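When the model calls a tool, it streams the call in pieces keyed by `index`. The id and function name typically arrive in the first fragment. The JSON `arguments` string is split across many fragments, and only the concatenated string is valid JSON. Below is a minimal, self-contained sketch of that accumulation on hand-written deltas. The delta shape follows the OpenAI Chat Completions `tool_calls` format, and `mergeToolCallDeltas` is a hypothetical helper.

```php
<?php
// Hypothetical helper: folds streamed tool_calls deltas into complete tool calls
function mergeToolCallDeltas(array $deltas): array
{
    $calls = [];
    foreach ($deltas as $delta) {
        $i = $delta['index'];
        $calls[$i] ??= ['id' => '', 'type' => 'function', 'function' => ['name' => '', 'arguments' => '']];
        if (isset($delta['id'])) {
            $calls[$i]['id'] = $delta['id'];
        }
        if (isset($delta['function']['name'])) {
            $calls[$i]['function']['name'] = $delta['function']['name'];
        }
        // Argument text arrives in pieces and must be appended, not replaced
        $calls[$i]['function']['arguments'] .= $delta['function']['arguments'] ?? '';
    }
    ksort($calls);
    return array_values($calls);
}

// Hand-written deltas shaped like the fragments streamed for one call
$deltas = [
    ['index' => 0, 'id' => 'call_1', 'function' => ['name' => 'get_weather', 'arguments' => '']],
    ['index' => 0, 'function' => ['arguments' => '{"city":']],
    ['index' => 0, 'function' => ['arguments' => '"Paris"}']],
];
$calls = mergeToolCallDeltas($deltas);
$args = json_decode($calls[0]['function']['arguments'], true, 512, JSON_THROW_ON_ERROR);
echo $calls[0]['function']['name'], ' ', $args['city'], "\n"; // get_weather Paris
```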
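Handling tool calls in a streamed response
```php
<?php
class ToolCallStreamingHandler
{
    // Assumption: the client has a chatStreamRaw($messages, $model, $extraParams) method that
    // yields each decoded JSON chunk (StreamingClient above yields only text, so it needs such a variant)
    private $client;

    public function __construct($client)
    {
        $this->client = $client;
    }

    // A function containing yield is a generator, so its return type must be Generator;
    // the final array is available afterwards through $generator->getReturn()
    public function handleStreamWithTools(array $messages, array $tools): Generator
    {
        $fullContent = '';
        $toolCalls = [];

        foreach ($this->client->chatStreamRaw($messages, 'gpt-4o-mini', ['tools' => $tools]) as $chunk) {
            $delta = $chunk['choices'][0]['delta'] ?? [];

            if (isset($delta['content'])) {
                $fullContent .= $delta['content'];
                yield ['type' => 'content', 'content' => $delta['content']];
            }

            foreach ($delta['tool_calls'] ?? [] as $toolCallDelta) {
                $index = $toolCallDelta['index'];
                // The first delta for an index normally carries the call id
                if (!isset($toolCalls[$index])) {
                    $toolCalls[$index] = [
                        'id' => $toolCallDelta['id'] ?? '',
                        'type' => 'function',
                        'function' => ['name' => '', 'arguments' => ''],
                    ];
                }
                if (isset($toolCallDelta['function']['name'])) {
                    $toolCalls[$index]['function']['name'] = $toolCallDelta['function']['name'];
                }
                // Arguments arrive as JSON fragments; append them
                if (isset($toolCallDelta['function']['arguments'])) {
                    $toolCalls[$index]['function']['arguments'] .= $toolCallDelta['function']['arguments'];
                }
                yield ['type' => 'tool_call', 'tool_call' => $toolCalls[$index]];
            }
        }

        return [
            'content' => $fullContent,
            'tool_calls' => array_values($toolCalls),
        ];
    }
}

// Usage: iterate first, then read the accumulated result
// $stream = $handler->handleStreamWithTools($messages, $tools);
// foreach ($stream as $event) { /* render content or tool-call progress */ }
// $result = $stream->getReturn();
```
Optimizing streaming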
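The buffering and timeout techniques in this section both decide based on elapsed time, which is awkward to test against a real clock. Below is a minimal sketch of the "flush on size or interval" rule as a standalone generator with an injectable clock. `coalesceChunks` and its `$clock` parameter are hypothetical, added only to make the behavior testable.

```php
<?php
// Hypothetical: groups small fragments, flushing on size (bytes) or elapsed time (ms)
function coalesceChunks(iterable $chunks, int $maxBytes, float $maxMillis, callable $clock): Generator
{
    $buffer = '';
    $lastFlush = $clock();
    foreach ($chunks as $chunk) {
        $buffer .= $chunk;
        $now = $clock();
        if (strlen($buffer) >= $maxBytes || ($now - $lastFlush) * 1000 >= $maxMillis) {
            yield $buffer;
            $buffer = '';
            $lastFlush = $now;
        }
    }
    if ($buffer !== '') { // compare with '' rather than empty(): "0" is a valid fragment
        yield $buffer;
    }
}

// A fake clock that advances 10 ms per call, so the grouping is deterministic
$t = 0.0;
$clock = function () use (&$t): float {
    $t += 0.010;
    return $t;
};
$out = iterator_to_array(coalesceChunks(['a', 'b', 'c', 'dddd', 'e'], 4, 25.0, $clock), false);
echo implode(' | ', $out), "\n"; // abc | dddd | e
```

Here `abc` is flushed by the interval (about 30 ms since the last flush), `dddd` by size, and `e` as the final leftover.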
Buffer management
```php
<?php
class BufferedStreamingClient
{
    private $client;
    private int $bufferSize;    // flush once this many bytes are buffered...
    private int $flushInterval; // ...or once this many milliseconds have passed

    public function __construct($client, int $bufferSize = 100, int $flushInterval = 50)
    {
        $this->client = $client;
        $this->bufferSize = $bufferSize;
        $this->flushInterval = $flushInterval;
    }

    // Groups tiny fragments into larger pieces to reduce flush() calls and SSE frames
    public function chatBuffered(array $messages): Generator
    {
        $buffer = '';
        $lastFlush = microtime(true);

        foreach ($this->client->chatStream($messages) as $chunk) {
            $buffer .= $chunk;
            $shouldFlush = strlen($buffer) >= $this->bufferSize ||
                (microtime(true) - $lastFlush) * 1000 >= $this->flushInterval;

            if ($shouldFlush) {
                yield $buffer;
                $buffer = '';
                $lastFlush = microtime(true);
            }
        }

        // Emit whatever is left; compare with '' because empty('0') is true
        if ($buffer !== '') {
            yield $buffer;
        }
    }
}
```
Timeout handling
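```php
<?php
class TimeoutStreamingClient
{
    private $client;
    private int $timeout;      // total time budget, in seconds
    private int $chunkTimeout; // maximum gap between chunks, in seconds

    public function __construct($client, int $timeout = 60, int $chunkTimeout = 5)
    {
        $this->client = $client;
        $this->timeout = $timeout;
        $this->chunkTimeout = $chunkTimeout;
    }

    public function chatWithTimeout(array $messages): Generator
    {
        $startTime = microtime(true); // sub-second precision, unlike time()
        $lastChunkTime = $startTime;

        // These checks only run when a chunk arrives. A stream that stalls completely blocks
        // inside read(), so also set Guzzle's 'read_timeout' option on the HTTP client.
        foreach ($this->client->chatStream($messages) as $chunk) {
            $now = microtime(true);
            if ($now - $startTime > $this->timeout) {
                throw new RuntimeException('Total timeout exceeded');
            }
            if ($now - $lastChunkTime > $this->chunkTimeout) {
                throw new RuntimeException('Chunk timeout exceeded');
            }
            $lastChunkTime = $now;
            yield $chunk;
        }
    }
}
```
Frequently asked questions (FAQ)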
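Q1: How does a streaming response differ from a regular one?
Answer:
| Aspect | Regular response | Streaming response |
|---|---|---|
| Delivery | All at once | Incrementally |
| Time to first byte | After the whole answer is generated | As soon as the first token is generated |
| User experience | Has to wait | Real-time feedback |
| Implementation complexity | Simple | More complex |
| Typical use | Batch processing | Real-time interaction |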
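Q2: How do I handle errors in a streaming response?
Answer:
```php
<?php
use GuzzleHttp\Exception\ConnectException;
use GuzzleHttp\Exception\RequestException;

try {
    foreach ($client->chatStream($messages) as $chunk) {
        echo $chunk;
    }
} catch (ConnectException $e) {
    // Could not connect to the API
    echo "\n[Connection failed]";
} catch (RequestException $e) {
    // The request failed, e.g. the API returned a 4xx/5xx status
    echo "\n[Request failed: " . $e->getMessage() . "]";
} catch (JsonException $e) {
    // Only thrown if the client decodes with JSON_THROW_ON_ERROR
    echo "\n[Data parsing error]";
} catch (Exception $e) {
    // Timeouts raised by wrappers such as TimeoutStreamingClient
    if (stripos($e->getMessage(), 'timeout') !== false) {
        echo "\n[Response timed out]";
    } else {
        throw $e;
    }
}
```
Q3: How do I use streaming in both CLI and web environments?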
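Answer:
```php
<?php
// CLI environment
if (php_sapi_name() === 'cli') {
    foreach ($client->chatStream($messages) as $chunk) {
        echo $chunk;
        flush();
    }
}
// Web environment
else {
    header('Content-Type: text/event-stream');
    header('Cache-Control: no-cache');
    header('X-Accel-Buffering: no');
    while (ob_get_level() > 0) {
        ob_end_flush(); // keep chunks from being held in PHP's output buffers
    }
    foreach ($client->chatStream($messages) as $chunk) {
        echo "data: " . json_encode(['content' => $chunk], JSON_UNESCAPED_UNICODE) . "\n\n";
        flush();
    }
    echo "data: [DONE]\n\n";
    flush();
}
```
Q4: How do I cancel a streaming response?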
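Answer:
```php
<?php
class CancellableStreamingClient
{
    private $client;
    private bool $cancelled = false;

    public function __construct($client)
    {
        $this->client = $client;
    }

    // Breaking out of the foreach has the same effect; cancel() helps when the
    // stop decision is made elsewhere, e.g. inside a callback
    public function cancel(): void
    {
        $this->cancelled = true;
    }

    public function chatCancellable(array $messages): Generator
    {
        foreach ($this->client->chatStream($messages) as $chunk) {
            yield $chunk;
            // Checked right after the consumer resumes us, before reading more from the API
            if ($this->cancelled) {
                return; // ends the generator; no further chunks are read
            }
        }
    }
}

// Usage: stop once enough text has arrived ($baseClient is a StreamingClient)
$client = new CancellableStreamingClient($baseClient);
$text = '';
foreach ($client->chatCancellable($messages) as $chunk) {
    $text .= $chunk;
    if (mb_strlen($text) >= 200) {
        $client->cancel();
    }
}
```
Q5: How do I get the complete content of a streaming response?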
Answer:
```php
<?php
$fullContent = '';
foreach ($client->chatStream($messages) as $chunk) {
    $fullContent .= $chunk; // accumulate while displaying
    echo $chunk;
}
echo "\nFull content:\n" . $fullContent;
```
Q6: How do I handle differences in streaming formats between APIs?
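Answer:
```php
<?php
class UniversalStreamingParser
{
    // $data is the JSON payload of one SSE "data:" line, without the prefix
    public function parse(string $provider, string $data): ?string
    {
        switch ($provider) {
            case 'openai':
            case 'deepseek': // DeepSeek uses the OpenAI-compatible format
                return $this->parseOpenAIFormat($data);
            case 'claude':
                return $this->parseClaudeFormat($data);
            case 'gemini':
                return $this->parseGeminiFormat($data);
            default:
                return null;
        }
    }

    private function parseOpenAIFormat(string $data): ?string
    {
        $json = json_decode($data, true);
        return $json['choices'][0]['delta']['content'] ?? null;
    }

    private function parseClaudeFormat(string $data): ?string
    {
        $json = json_decode($data, true);
        // Text arrives in content_block_delta events; other event types carry no text
        if (($json['type'] ?? null) === 'content_block_delta') {
            return $json['delta']['text'] ?? null;
        }
        return null;
    }

    private function parseGeminiFormat(string $data): ?string
    {
        $json = json_decode($data, true);
        // Gemini streamGenerateContent (with alt=sse) sends candidates[].content.parts[].text
        return $json['candidates'][0]['content']['parts'][0]['text'] ?? null;
    }
}
```
Hands-on exercises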
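Basic exercise
Exercise 1: Build a simple streaming chat program.
Reference code:
```php
<?php
$client = new StreamingClient((string) getenv('OPENAI_API_KEY'));
echo "AI: ";
foreach ($client->chatStream([['role' => 'user', 'content' => 'Hello']]) as $chunk) {
    echo $chunk;
    flush(); // show each fragment immediately
}
echo "\n";
```
Intermediate exercise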
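Exercise 2: Build a streaming client that displays progress.
Reference code:
```php
<?php
class ProgressClient
{
    private $client;

    public function __construct($client)
    {
        $this->client = $client;
    }

    public function chatWithProgress(array $messages): string
    {
        $content = '';
        foreach ($this->client->chatStream($messages) as $chunk) {
            $content .= $chunk;
            // "\r" returns to the start of the line so the counter updates in place
            echo "\rGenerated: " . mb_strlen($content) . " characters";
        }
        echo "\n";
        return $content;
    }
}
```
Challenge exercise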
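Exercise 3: Build a complete streaming chat API for the web.
Reference code:
```php
<?php
class WebChatAPI
{
    private $client;

    public function __construct($client)
    {
        $this->client = $client;
    }

    public function handle(): void
    {
        header('Content-Type: text/event-stream');
        header('Cache-Control: no-cache');
        header('X-Accel-Buffering: no');
        while (ob_get_level() > 0) {
            ob_end_flush(); // send output straight to the client
        }
        $messages = json_decode(file_get_contents('php://input'), true)['messages'] ?? [];
        foreach ($this->client->chatStream($messages) as $chunk) {
            echo "data: " . json_encode(['content' => $chunk], JSON_UNESCAPED_UNICODE) . "\n\n";
            flush();
        }
        echo "data: [DONE]\n\n";
        flush();
    }
}
```
Summary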
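Key points
- SSE protocol: understand the Server-Sent Events format
- Stream parsing: parse streamed data chunks correctly, including events split across reads
- Callbacks: process data with callback functions
- Web integration: implement streaming responses in web applications
- Error handling: handle errors that occur mid-stream
- Performance tuning: buffer management and timeout handling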
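Common pitfalls
| Pitfall | Correct approach |
|---|---|
| Not flushing output | Call flush() after each chunk (and close PHP output buffers) |
| Ignoring dropped connections | Check eof() and catch exceptions |
| Missing the right headers | Send the SSE headers (Content-Type: text/event-stream, Cache-Control: no-cache) |
| Not handling parse errors | Decode with JSON_THROW_ON_ERROR inside try-catch |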
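The last row can be made concrete. With the `JSON_THROW_ON_ERROR` flag (PHP 7.3+), `json_decode()` throws `JsonException` instead of silently returning `null`, so a malformed chunk can be skipped deliberately. Below is a minimal sketch. `decodeChunk` is a hypothetical helper name.

```php
<?php
// Hypothetical helper: decodes one SSE data payload, returning null for malformed JSON
function decodeChunk(string $payload): ?array
{
    try {
        $data = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        error_log('Skipping malformed chunk: ' . $e->getMessage());
        return null;
    }
    return is_array($data) ? $data : null; // scalars like "42" are valid JSON but not chunks
}

var_dump(decodeChunk('{"choices":[]}') !== null); // bool(true)
var_dump(decodeChunk('{"choices":['));            // NULL
```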
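💡 Remember: streaming greatly improves the user experience, and the keys to implementing it are understanding the SSE protocol and handling the data stream correctly.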
