
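Streaming Response Handling

Streaming responses let users watch AI output appear in real time, which greatly improves the interactive experience.

Overview

Streaming is a technique for returning data incrementally. Instead of waiting for the complete response, users see AI-generated content as it is produced. This tutorial shows how to implement efficient streaming response handling in PHP.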

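Why streaming responses?

Traditional vs. streaming responses:

Traditional response:
User sends request → waiting... → waiting... → waiting... → complete response
(the user must wait until the entire output has been generated)

Streaming response:
User sends request → first token arrives → more arrives → keeps arriving → done
(the user watches the content as it is generated)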

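Advantages of streaming responses

| Advantage | Description |
|---|---|
| Immediate feedback | Users see the response start right away |
| Less waiting anxiety | No long, blank wait before anything appears |
| Better experience | Feels like a conversation with a person |
| Early stop | Generation can be stopped partway through |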

SSE protocol basics

Server-Sent Events format

SSE data format (as sent by OpenAI-style chat APIs):

data: {"choices":[{"delta":{"content":"你"}}]}

data: {"choices":[{"delta":{"content":"好"}}]}

data: {"choices":[{"delta":{"content":"!"}}]}

data: [DONE]

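Rules:
1. Each payload line starts with "data: ". In the SSE specification the space after the colon is optional, and lines starting with ":" are comments.
2. Messages are separated by a blank line.
3. The stream ends with "data: [DONE]". This terminator is a convention of OpenAI-style APIs, not part of the SSE specification.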
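The rules above cover what chat APIs typically send. The SSE specification is slightly broader. An event may carry several data: lines, which are joined with "\n". It may also carry other fields such as event: and id:, and it may contain comment lines.

Below is a minimal sketch of extracting the data from one event block. It assumes the block has already been split off at its terminating blank line. parseSseEvent is a hypothetical name, not a library function.

```php
<?php
/**
 * Extract the data payload from one SSE event block (the text between
 * blank lines). Returns null when the block contains no data: lines.
 */
function parseSseEvent(string $block): ?string
{
    $dataLines = [];

    foreach (preg_split('/\r\n|\r|\n/', $block) as $line) {
        // Skip empty lines and comments (lines starting with ":")
        if ($line === '' || $line[0] === ':') {
            continue;
        }

        // "field: value"; a line without ":" is a field with an empty value
        $colon = strpos($line, ':');
        $field = $colon === false ? $line : substr($line, 0, $colon);
        $value = $colon === false ? '' : substr($line, $colon + 1);

        // The spec strips exactly one leading space from the value
        if (substr($value, 0, 1) === ' ') {
            $value = substr($value, 1);
        }

        if ($field === 'data') {
            $dataLines[] = $value;
        }
        // event:, id: and retry: are ignored in this sketch
    }

    // Multiple data: lines are joined with "\n"
    return $dataLines === [] ? null : implode("\n", $dataLines);
}

echo parseSseEvent("event: message\ndata: first\ndata:second"), "\n";   // prints "first" and "second" on two lines
var_dump(parseSseEvent(": keep-alive"));                                // NULL
```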

Parsing SSE in PHP

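php
<?php
class SSEParser
{
    /**
     * Parse a complete SSE payload into decoded messages.
     * Assumes $data contains whole lines, not arbitrary network chunks.
     */
    public function parse(string $data): array
    {
        $messages = [];
        $lines = explode("\n", $data);

        foreach ($lines as $line) {
            // trim() also removes the "\r" of "\r\n" line endings
            $line = trim($line);

            // Skip blank separator lines and ":" comment lines
            if ($line === '' || $line[0] === ':') {
                continue;
            }

            if (strpos($line, 'data:') === 0) {
                // The space after "data:" is optional in SSE
                $json = ltrim(substr($line, 5));

                if ($json === '[DONE]') {
                    $messages[] = ['done' => true];
                    continue;
                }

                $decoded = json_decode($json, true);
                // Skip payloads that are not valid JSON instead of appending null
                if (is_array($decoded)) {
                    $messages[] = $decoded;
                }
            }
        }

        return $messages;
    }

    public function extractContent(array $message): ?string
    {
        return $message['choices'][0]['delta']['content'] ?? null;
    }
}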
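SSEParser::parse() assumes it is given whole lines. A network read, however, can end anywhere, even in the middle of a data: line or a multi-byte character. A common remedy is to buffer the raw bytes and hand off an event only once its terminating blank line has arrived.

Below is a minimal sketch that assumes "\n" or "\r\n" line endings. IncrementalSseBuffer and feed() are hypothetical names.

```php
<?php
/**
 * Accumulates raw network chunks and returns only complete SSE event blocks.
 */
class IncrementalSseBuffer
{
    private string $buffer = '';

    /** @return string[] complete event blocks, without the blank-line terminator */
    public function feed(string $chunk): array
    {
        // Normalize after appending, so a "\r\n" split across two reads is caught too
        $this->buffer = str_replace("\r\n", "\n", $this->buffer . $chunk);

        $events = [];
        while (($pos = strpos($this->buffer, "\n\n")) !== false) {
            $events[] = substr($this->buffer, 0, $pos);
            $this->buffer = substr($this->buffer, $pos + 2);
        }
        return $events;   // an incomplete event stays buffered for the next read
    }
}

// Simulate a response body arriving in 10-byte reads
$raw = "data: {\"content\":\"Hi\"}\n\ndata: [DONE]\n\n";
$sse = new IncrementalSseBuffer();
foreach (str_split($raw, 10) as $chunk) {
    foreach ($sse->feed($chunk) as $event) {
        echo $event, "\n";
    }
}
// prints:
// data: {"content":"Hi"}
// data: [DONE]
```

Each returned block can then be decoded with SSEParser::parse(). The chatStream() method in the next section uses the same buffering idea inline.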

Basic streaming client

Implementing a streaming request

php
<?php
require 'vendor/autoload.php';

use GuzzleHttp\Client;

class StreamingClient
{
    private Client $client;
    // The trailing slash matters: Guzzle resolves request paths against base_uri,
    // and a leading "/" in the path would drop the "/v1" segment
    private string $baseUrl = 'https://api.openai.com/v1/';

    public function __construct(string $apiKey)
    {
        $this->client = new Client([
            'base_uri' => $this->baseUrl,
            'timeout' => 120,
            'headers' => [
                'Authorization' => 'Bearer ' . $apiKey,
                'Content-Type' => 'application/json',
            ],
        ]);
    }

    /**
     * Yields the text content of each streamed chunk.
     */
    public function chatStream(array $messages, string $model = 'gpt-4o-mini'): Generator
    {
        $response = $this->client->post('chat/completions', [
            'json' => [
                'model' => $model,
                'messages' => $messages,
                'stream' => true,   // ask the API to stream
            ],
            'stream' => true,       // ask Guzzle not to download the whole body first
        ]);

        $body = $response->getBody();
        $buffer = '';

        while (!$body->eof()) {
            $buffer .= $body->read(1024);

            // A complete SSE event ends with a blank line ("\n\n")
            while (($pos = strpos($buffer, "\n\n")) !== false) {
                $block = substr($buffer, 0, $pos);
                $buffer = substr($buffer, $pos + 2);

                foreach (explode("\n", $block) as $line) {
                    $line = trim($line);

                    // Skip blank lines, comments and other fields
                    if (strpos($line, 'data: ') !== 0) {
                        continue;
                    }

                    $json = substr($line, 6);

                    if ($json === '[DONE]') {
                        return;
                    }

                    $data = json_decode($json, true);

                    if (isset($data['choices'][0]['delta']['content'])) {
                        yield $data['choices'][0]['delta']['content'];
                    }
                }
            }
        }
    }
}

// Usage example
$client = new StreamingClient((string) getenv('OPENAI_API_KEY'));

echo "AI reply: ";
foreach ($client->chatStream([['role' => 'user', 'content' => 'Tell me a story']]) as $chunk) {
    echo $chunk;
    flush();
}
echo "\n";
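Because chatStream() is a generator, any object with a method of the same shape can stand in for it. To try the consumer-side examples in this tutorial without an API key or network access, a fake client that yields preset chunks is often enough.

Below is a sketch of such a fake. FakeStreamingClient is a hypothetical test double, not part of any library.

```php
<?php
/**
 * Hypothetical test double: yields preset chunks the way
 * StreamingClient::chatStream() yields API output.
 */
class FakeStreamingClient
{
    private array $chunks;
    private int $delayMicros;

    public function __construct(array $chunks, int $delayMicros = 50000)
    {
        $this->chunks = $chunks;
        $this->delayMicros = $delayMicros;
    }

    public function chatStream(array $messages, string $model = 'fake'): Generator
    {
        foreach ($this->chunks as $chunk) {
            usleep($this->delayMicros);   // imitate network latency between chunks
            yield $chunk;
        }
    }
}

$client = new FakeStreamingClient(['Once ', 'upon ', 'a ', 'time.'], 0);
$story = '';
foreach ($client->chatStream([['role' => 'user', 'content' => 'Tell me a story']]) as $chunk) {
    echo $chunk;
    $story .= $chunk;
}
echo "\n";   // prints "Once upon a time."
```

A delay of 0 makes the fake run instantly, which suits unit tests. A positive delay makes the streaming effect visible in a terminal.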

Advanced streaming

Streaming with callbacks

php
<?php
class CallbackStreamingClient
{
    private $client;   // any object with a chatStream() method

    public function __construct($client)
    {
        $this->client = $client;
    }

    public function chatStreamWithCallback(
        array $messages,
        callable $onChunk,
        ?callable $onComplete = null,
        ?callable $onError = null
    ): string {
        $fullContent = '';

        try {
            foreach ($this->client->chatStream($messages) as $chunk) {
                $fullContent .= $chunk;
                $onChunk($chunk, $fullContent);   // called once per chunk
            }

            if ($onComplete) {
                $onComplete($fullContent);        // called once, after the stream ends
            }
        } catch (Throwable $e) {
            if ($onError) {
                $onError($e);
            }
            throw $e;                             // still propagate to the caller
        }

        return $fullContent;
    }
}

// Usage example ($baseClient is a StreamingClient instance)
$client = new CallbackStreamingClient($baseClient);

$client->chatStreamWithCallback(
    [['role' => 'user', 'content' => 'Write a poem']],
    function ($chunk, $full) {
        echo $chunk;
        flush();
    },
    function ($full) {
        // mb_strlen() counts characters; strlen() would count UTF-8 bytes
        echo "\nDone! Total length: " . mb_strlen($full) . " characters\n";
    },
    function ($e) {
        echo "\nError: " . $e->getMessage() . "\n";
    }
);

Streaming with a progress display

php
<?php
class ProgressStreamingClient
{
    private $client;   // any object with a chatStream() method

    public function __construct($client)
    {
        $this->client = $client;
    }

    // $updateInterval: redraw the progress line every N chunks
    public function chatWithProgress(array $messages, int $updateInterval = 10): string
    {
        $fullContent = '';
        $chunkCount = 0;
        $startTime = microtime(true);

        echo "Generating...\n";

        foreach ($this->client->chatStream($messages) as $chunk) {
            $fullContent .= $chunk;
            $chunkCount++;

            if ($chunkCount % $updateInterval === 0) {
                $elapsed = microtime(true) - $startTime;
                $length = mb_strlen($fullContent);   // characters, not bytes
                echo sprintf(
                    "\rGenerated %d characters, %.1f characters/sec",
                    $length,
                    $elapsed > 0 ? $length / $elapsed : 0
                );
            }
        }

        $elapsed = microtime(true) - $startTime;
        echo sprintf(
            "\nDone! %d characters in %.2f seconds\n",
            mb_strlen($fullContent),
            $elapsed
        );

        return $fullContent;
    }
}
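The progress line reports characters, and for non-ASCII output the choice of length function matters. strlen() counts UTF-8 bytes, while mb_strlen() with the UTF-8 encoding counts characters.

Below is a small sketch of a progress formatter that makes the difference visible. formatProgress is a hypothetical name, and the mbstring extension is assumed to be available.

```php
<?php
/**
 * Build the progress text for $content generated over $elapsed seconds.
 * Uses mb_strlen() so each multi-byte character counts once.
 */
function formatProgress(string $content, float $elapsed): string
{
    $chars = mb_strlen($content, 'UTF-8');
    $speed = $elapsed > 0 ? $chars / $elapsed : 0.0;
    return sprintf('Generated %d characters, %.1f characters/sec', $chars, $speed);
}

$text = '你好世界';   // 4 CJK characters, 12 bytes in UTF-8
echo strlen($text), " bytes vs ", mb_strlen($text, 'UTF-8'), " characters\n";   // 12 bytes vs 4 characters
echo formatProgress($text, 2.0), "\n";   // Generated 4 characters, 2.0 characters/sec
```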

Streaming responses in web applications

Emitting a streaming response from PHP

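php
<?php
class WebStreamingHandler
{
    private $client;   // any object with a chatStream() method

    public function __construct($client)
    {
        $this->client = $client;
    }

    public function handleStreamRequest(array $messages): void
    {
        header('Content-Type: text/event-stream');
        header('Cache-Control: no-cache');
        header('Connection: keep-alive');
        header('X-Accel-Buffering: no');   // disable Nginx proxy buffering

        // Close every active output buffer; a bare ob_end_flush()
        // raises a notice when no buffer is active
        while (ob_get_level() > 0) {
            ob_end_flush();
        }

        foreach ($this->client->chatStream($messages) as $chunk) {
            // json_encode() escapes newlines, so each chunk stays on one data: line
            echo "data: " . json_encode(['content' => $chunk], JSON_UNESCAPED_UNICODE) . "\n\n";
            flush();
        }

        echo "data: [DONE]\n\n";
        flush();
    }
}

// API endpoint example: stream.php
// The frontend sends a JSON body, which PHP does not parse into $_POST
$input = json_decode(file_get_contents('php://input'), true);
$messages = $input['messages'] ?? [];

$client = new StreamingClient((string) getenv('OPENAI_API_KEY'));
$handler = new WebStreamingHandler($client);
$handler->handleStreamRequest($messages);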
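Each message the handler writes follows the SSE framing rules: one or more data: lines, optional event: and id: fields, and a closing blank line. Encoding the payload with json_encode() keeps newlines in the generated text from breaking a frame. A comment line (one starting with ":") can also serve as a heartbeat during long pauses.

Below is a sketch using the hypothetical helpers sseFrame and sseHeartbeat.

```php
<?php
/**
 * Format one SSE message. json_encode() escapes "\n", so the payload
 * always fits on a single data: line.
 */
function sseFrame(array $payload, ?string $event = null, ?string $id = null): string
{
    $frame = '';
    if ($event !== null) {
        $frame .= "event: {$event}\n";
    }
    if ($id !== null) {
        $frame .= "id: {$id}\n";
    }
    $frame .= 'data: ' . json_encode($payload, JSON_UNESCAPED_UNICODE) . "\n";
    return $frame . "\n";   // the blank line terminates the message
}

/** A comment line: SSE clients ignore it, but it keeps bytes flowing. */
function sseHeartbeat(): string
{
    return ": ping\n\n";
}

echo sseFrame(['content' => "line one\nline two"]);
echo sseFrame(['message' => 'quota exceeded'], 'error', '42');
echo sseHeartbeat();
```

The fetch-based frontend in the next section only looks at lines starting with "data: ", so heartbeat comments pass through it harmlessly.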

Receiving the stream on the frontend

javascript
// Frontend JavaScript example
async function streamChat(messages) {
    const response = await fetch('/stream.php', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ messages })
    });
    if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';   // holds an incomplete line between reads
    let result = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // stream: true keeps multi-byte characters split across reads intact
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop();   // the last element may be a partial line

        for (const line of lines) {
            if (!line.startsWith('data: ')) continue;

            const data = line.slice(6);
            if (data === '[DONE]') {
                return result;
            }
            const json = JSON.parse(data);
            result += json.content;
            updateUI(json.content);   // updateUI is defined by your application
        }
    }

    return result;
}

Streaming and tool calls

Handling tool calls in a streaming response

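php
<?php
class ToolCallStreamingHandler
{
    /**
     * Accumulates text and tool calls from decoded streaming chunks.
     *
     * $chunks are the decoded JSON objects from each "data:" line. The
     * StreamingClient::chatStream() shown earlier yields only text, so a
     * tools-enabled request needs a variant that sends 'tools' in the request
     * body and yields each whole decoded chunk instead.
     *
     * A function containing yield always returns a Generator; the final
     * result is read with $generator->getReturn() after iteration.
     */
    public function handleStreamWithTools(iterable $chunks): Generator
    {
        $fullContent = '';
        $toolCalls = [];

        foreach ($chunks as $chunk) {
            $delta = $chunk['choices'][0]['delta'] ?? [];

            // Ordinary text content
            if (isset($delta['content'])) {
                $fullContent .= $delta['content'];
                yield ['type' => 'content', 'content' => $delta['content']];
            }

            // Tool calls arrive in fragments, matched up by their index
            foreach ($delta['tool_calls'] ?? [] as $toolCallDelta) {
                $index = $toolCallDelta['index'];

                if (!isset($toolCalls[$index])) {
                    $toolCalls[$index] = [
                        'id' => '',
                        'type' => 'function',
                        'function' => [
                            'name' => '',
                            'arguments' => '',
                        ],
                    ];
                }

                // The id and name are normally sent only in the first fragment
                if (isset($toolCallDelta['id'])) {
                    $toolCalls[$index]['id'] = $toolCallDelta['id'];
                }

                if (isset($toolCallDelta['function']['name'])) {
                    $toolCalls[$index]['function']['name'] = $toolCallDelta['function']['name'];
                }

                // The arguments are a JSON string split across many fragments
                if (isset($toolCallDelta['function']['arguments'])) {
                    $toolCalls[$index]['function']['arguments'] .= $toolCallDelta['function']['arguments'];
                }

                yield ['type' => 'tool_call', 'tool_call' => $toolCalls[$index]];
            }
        }

        return [
            'content' => $fullContent,
            'tool_calls' => array_values($toolCalls),
        ];
    }
}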
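The arguments of a tool call arrive as fragments of a JSON string. Intermediate values are generally not valid JSON, so they are best parsed only after the stream has finished.

Below is a small sketch with made-up fragments. The function decodeToolArguments and the get_weather tool are hypothetical.

```php
<?php
/**
 * Join streamed argument fragments and decode them once complete.
 * Throws JsonException if the final string is still not valid JSON.
 */
function decodeToolArguments(array $fragments): array
{
    $json = implode('', $fragments);
    return json_decode($json, true, 512, JSON_THROW_ON_ERROR);
}

// Fragments as they might arrive for a hypothetical get_weather tool
$fragments = ['{"ci', 'ty": "Par', 'is", "unit"', ': "celsius"}'];

// Decoding a partial prefix fails...
var_dump(json_decode($fragments[0] . $fragments[1], true));   // NULL

// ...while the complete string decodes cleanly
print_r(decodeToolArguments($fragments));
```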

Optimizing streaming responses

Buffer management

php
<?php
class BufferedStreamingClient
{
    private $client;             // any object with a chatStream() method
    private int $bufferSize;     // flush once this many bytes are buffered
    private int $flushInterval;  // ...or once this many milliseconds have passed

    public function __construct($client, int $bufferSize = 100, int $flushInterval = 50)
    {
        $this->client = $client;
        $this->bufferSize = $bufferSize;
        $this->flushInterval = $flushInterval;
    }

    /**
     * Merges many small chunks into fewer, larger ones (fewer writes and
     * UI updates). The time check only runs when a new chunk arrives.
     */
    public function chatBuffered(array $messages): Generator
    {
        $buffer = '';
        $lastFlush = microtime(true);

        foreach ($this->client->chatStream($messages) as $chunk) {
            $buffer .= $chunk;

            $shouldFlush = strlen($buffer) >= $this->bufferSize ||
                          (microtime(true) - $lastFlush) * 1000 >= $this->flushInterval;

            if ($shouldFlush) {
                yield $buffer;
                $buffer = '';
                $lastFlush = microtime(true);
            }
        }

        // Emit whatever is left when the stream ends ('0' is a valid chunk)
        if ($buffer !== '') {
            yield $buffer;
        }
    }
}
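Because the flush rule in chatBuffered() depends on wall-clock time, it is awkward to test. One option is to pass the clock in as a callable, so that both the size rule and the time rule can be checked with a fake clock.

Below is a sketch of the same rule as a standalone generator. bufferChunks and its $now parameter are hypothetical.

```php
<?php
/**
 * Coalesce small chunks: emit when $maxBytes are buffered or $maxDelayMs
 * has passed since the last emit. $now returns the current time in seconds.
 */
function bufferChunks(iterable $chunks, int $maxBytes, float $maxDelayMs, callable $now): Generator
{
    $buffer = '';
    $lastFlush = $now();

    foreach ($chunks as $chunk) {
        $buffer .= $chunk;
        $elapsedMs = ($now() - $lastFlush) * 1000;

        if (strlen($buffer) >= $maxBytes || $elapsedMs >= $maxDelayMs) {
            yield $buffer;
            $buffer = '';
            $lastFlush = $now();
        }
    }

    if ($buffer !== '') {
        yield $buffer;   // flush the remainder when the stream ends
    }
}

// Fake clock: each call advances time by 10 ms
$t = 0.0;
$clock = function () use (&$t): float {
    $t += 0.010;
    return $t;
};

// Size rule only (the 1000 ms limit is never reached here)
$out = iterator_to_array(bufferChunks(['a', 'b', 'c', 'd', 'e'], 3, 1000, $clock), false);
print_r($out);   // abc, de
```

In production the clock would simply be fn() => microtime(true).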

Timeout handling

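php
<?php
class TimeoutStreamingClient
{
    private $client;              // any object with a chatStream() method
    private float $timeout;       // total time budget, in seconds
    private float $chunkTimeout;  // maximum gap between chunks, in seconds

    public function __construct($client, float $timeout = 60, float $chunkTimeout = 5)
    {
        $this->client = $client;
        $this->timeout = $timeout;
        $this->chunkTimeout = $chunkTimeout;
    }

    /**
     * These checks run only when a chunk arrives, so a long gap is detected
     * once the late chunk shows up, not while the read is blocked. To abort
     * a stalled read itself, also set a transport-level limit such as
     * Guzzle's read_timeout request option.
     */
    public function chatWithTimeout(array $messages): Generator
    {
        $startTime = microtime(true);   // sub-second resolution, unlike time()
        $lastChunkTime = $startTime;

        foreach ($this->client->chatStream($messages) as $chunk) {
            $now = microtime(true);

            if ($now - $startTime > $this->timeout) {
                throw new RuntimeException('Total timeout exceeded');
            }

            if ($now - $lastChunkTime > $this->chunkTimeout) {
                throw new RuntimeException('Chunk timeout exceeded');
            }

            $lastChunkTime = $now;
            yield $chunk;
        }
    }
}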
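The checks in chatWithTimeout() only run after a chunk has arrived. While the underlying read is blocked, nothing is checked. A per-read timeout at the stream level detects a stall while it is happening.

The sketch below shows the mechanism with plain PHP streams. It creates a socket pair whose writing end never sends anything, and calls stream_set_timeout() on the reading end. It illustrates the idea only. With Guzzle you would normally configure the read_timeout request option rather than touching the socket yourself.

```php
<?php
// A connected socket pair; the writing end stays silent, like a stalled API
$domain = PHP_OS_FAMILY === 'Windows' ? STREAM_PF_INET : STREAM_PF_UNIX;
[$reader, $writer] = stream_socket_pair($domain, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

// Give each read at most 0.2 seconds (0 seconds + 200000 microseconds)
stream_set_timeout($reader, 0, 200000);

$start = microtime(true);
$data = fread($reader, 1024);          // blocks until data arrives or the timeout fires
$waited = microtime(true) - $start;
$meta = stream_get_meta_data($reader);

if ($meta['timed_out']) {
    printf("No data after %.1f s: treat the stream as stalled\n", $waited);
}

fclose($reader);
fclose($writer);
```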

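Frequently asked questions (FAQ)

Q1: How do streaming responses differ from regular responses?

Answer

| Aspect | Regular response | Streaming response |
|---|---|---|
| Delivery | All at once | Incrementally |
| Time to first byte | After the full output is generated | Almost immediately |
| User experience | The user has to wait | Real-time feedback |
| Implementation complexity | Simple | More involved |
| Typical use | Batch processing | Real-time interaction |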

Q2: How do I handle errors in a streaming response?

Answer

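php
<?php
use GuzzleHttp\Exception\ConnectException;
use GuzzleHttp\Exception\RequestException;

try {
    foreach ($client->chatStream($messages) as $chunk) {
        echo $chunk;
    }
} catch (ConnectException $e) {
    // The connection could not be established (e.g. DNS failure, connection refused)
    echo "\n[Connection failed]";
} catch (RequestException $e) {
    // HTTP error status such as 401 or 429, raised when the response headers arrive
    echo "\n[Request failed]";
} catch (JsonException $e) {
    // Only thrown when json_decode() is called with JSON_THROW_ON_ERROR
    echo "\n[Data parse error]";
} catch (Exception $e) {
    // Anything else, e.g. a timeout raised by TimeoutStreamingClient
    echo "\n[Error: " . $e->getMessage() . "]";
}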
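A web endpoint has an extra constraint. Once the first chunk has been sent, the HTTP status line and headers are already out, so a failure partway through can no longer become a 500 response. One option is to report the failure in-band as a final data: frame that the frontend checks for.

Below is a sketch under that assumption. streamWithErrorFrame, flakySource and the "error" payload key are hypothetical choices.

```php
<?php
/**
 * Relay chunks as SSE frames; if the source fails mid-stream, send an
 * in-band error frame instead (the HTTP status can no longer change).
 */
function streamWithErrorFrame(iterable $chunks): void
{
    try {
        foreach ($chunks as $chunk) {
            echo 'data: ' . json_encode(['content' => $chunk]) . "\n\n";
            flush();
        }
        echo "data: [DONE]\n\n";
    } catch (Throwable $e) {
        // Log the details server-side; send the browser a generic message
        error_log('Stream failed: ' . $e->getMessage());
        echo 'data: ' . json_encode(['error' => 'The response was interrupted']) . "\n\n";
    }
    flush();
}

// A source that fails after two chunks, standing in for a dropped API connection
function flakySource(): Generator
{
    yield 'Hello';
    yield ', wor';
    throw new RuntimeException('connection reset');
}

streamWithErrorFrame(flakySource());
```

On the frontend, the matching change is to check json.error before appending json.content.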

Q3: How do I use streaming in both CLI and web environments?

Answer

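php
<?php
if (php_sapi_name() === 'cli') {
    // CLI: print each chunk as it arrives
    foreach ($client->chatStream($messages) as $chunk) {
        echo $chunk;
        flush();
    }
} else {
    // Web: send SSE frames
    header('Content-Type: text/event-stream');
    header('Cache-Control: no-cache');
    header('X-Accel-Buffering: no');
    while (ob_get_level() > 0) {
        ob_end_flush();   // flush() does not empty PHP's own output buffers
    }

    foreach ($client->chatStream($messages) as $chunk) {
        echo "data: " . json_encode(['content' => $chunk]) . "\n\n";
        flush();
    }
    echo "data: [DONE]\n\n";
    flush();
}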

Q4: How do I cancel a streaming response?

Answer

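php
<?php
class CancellableStreamingClient
{
    private $client;   // any object with a chatStream() method
    private bool $cancelled = false;

    public function __construct($client)
    {
        $this->client = $client;
    }

    // The consumer and the generator run in one thread, so cancel()
    // takes effect between chunks, e.g. when called from the consuming loop
    public function cancel(): void
    {
        $this->cancelled = true;
    }

    public function isCancelled(): bool
    {
        return $this->cancelled;
    }

    public function chatCancellable(array $messages): Generator
    {
        foreach ($this->client->chatStream($messages) as $chunk) {
            if ($this->cancelled) {
                // End quietly instead of yielding a non-string marker;
                // callers can check isCancelled() afterwards
                return;
            }
            yield $chunk;
        }
    }
}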
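Besides a cancel flag, the consumer can simply break out of its foreach. When an unfinished generator is destroyed, PHP runs any pending finally block inside it. That block is a natural place to release resources such as the HTTP response body.

Below is a minimal sketch with a hypothetical generator, streamWithCleanup, that reports when its cleanup runs.

```php
<?php
/**
 * Hypothetical stream: yields chunks and calls $onClose when it is finished
 * or abandoned (a real client would close the HTTP body there).
 */
function streamWithCleanup(array $chunks, callable $onClose): Generator
{
    try {
        foreach ($chunks as $chunk) {
            yield $chunk;
        }
    } finally {
        // Runs after normal completion, and also when a suspended generator is destroyed
        $onClose();
    }
}

$closed = false;
$stream = streamWithCleanup(['a', 'b', 'c', 'd'], function () use (&$closed) {
    $closed = true;
});

$received = [];
foreach ($stream as $chunk) {
    $received[] = $chunk;
    if (count($received) === 2) {
        break;   // e.g. the user clicked "Stop"
    }
}

$closedAfterBreak = $closed;   // still false: the generator is only suspended
unset($stream);                // destroying it runs the finally block

echo 'Received: ', implode(', ', $received), "\n";
echo 'Closed after break: ', var_export($closedAfterBreak, true), "\n";
echo 'Closed after unset: ', var_export($closed, true), "\n";
```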

Q5: How do I get the complete content of a streamed response?

Answer

php
<?php
$fullContent = '';
foreach ($client->chatStream($messages) as $chunk) {
    $fullContent .= $chunk;
    echo $chunk;
}
echo "\nFull content:\n" . $fullContent;
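If the generator itself builds the full text, callers do not need to accumulate it. A generator can return a value, which becomes available through Generator::getReturn() once iteration has finished.

Below is a sketch with a hypothetical wrapper, streamAndCollect, around any iterable of chunks.

```php
<?php
/**
 * Re-yield each chunk and return the concatenated text at the end.
 */
function streamAndCollect(iterable $chunks): Generator
{
    $full = '';
    foreach ($chunks as $chunk) {
        $full .= $chunk;
        yield $chunk;
    }
    return $full;
}

$stream = streamAndCollect(['Stream', 'ing ', 'works']);
foreach ($stream as $chunk) {
    echo $chunk;            // shown to the user as it arrives
}
echo "\n";

// getReturn() throws if called before the generator has finished
echo 'Full content: ', $stream->getReturn(), "\n";   // Full content: Streaming works
```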

Q6: How do I handle differences between providers' streaming formats?

Answer

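php
<?php
class UniversalStreamingParser
{
    /**
     * $data is the JSON payload of one SSE "data:" line.
     */
    public function parse(string $provider, string $data): ?string
    {
        switch ($provider) {
            case 'openai':
            case 'deepseek':   // OpenAI-compatible format
                return $this->parseOpenAIFormat($data);
            case 'claude':
                return $this->parseClaudeFormat($data);
            case 'gemini':
                return $this->parseGeminiFormat($data);
            default:
                return null;
        }
    }

    private function parseOpenAIFormat(string $data): ?string
    {
        $json = json_decode($data, true);
        return $json['choices'][0]['delta']['content'] ?? null;
    }

    private function parseClaudeFormat(string $data): ?string
    {
        $json = json_decode($data, true);
        // Text arrives in content_block_delta events; other event types carry none
        if (($json['type'] ?? null) === 'content_block_delta') {
            return $json['delta']['text'] ?? null;
        }
        return null;
    }

    private function parseGeminiFormat(string $data): ?string
    {
        // Assumes the SSE output of Gemini's streamGenerateContent endpoint
        $json = json_decode($data, true);
        return $json['candidates'][0]['content']['parts'][0]['text'] ?? null;
    }
}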
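As more providers are added, the switch grows with every case. An alternative is a map from provider name to an extraction closure, so that adding a provider means adding one entry.

Below is a sketch of that design. It uses the OpenAI and Claude payload shapes from the class above, plus an assumed Gemini shape (candidates[0].content.parts[0].text) that is worth checking against Gemini's documentation. The function name extractStreamText is hypothetical, and unlike the class it throws for unknown providers instead of returning null.

```php
<?php
/**
 * Extract the text delta from one SSE data payload, by provider.
 */
function extractStreamText(string $provider, string $payload): ?string
{
    static $extractors = null;

    if ($extractors === null) {
        $openai = fn(array $j): ?string => $j['choices'][0]['delta']['content'] ?? null;
        $extractors = [
            'openai'   => $openai,
            'deepseek' => $openai,   // OpenAI-compatible
            'claude'   => fn(array $j): ?string => ($j['type'] ?? null) === 'content_block_delta'
                ? ($j['delta']['text'] ?? null)
                : null,
            'gemini'   => fn(array $j): ?string => $j['candidates'][0]['content']['parts'][0]['text'] ?? null,
        ];
    }

    if (!isset($extractors[$provider])) {
        throw new InvalidArgumentException("Unknown provider: {$provider}");
    }

    $json = json_decode($payload, true);
    return is_array($json) ? $extractors[$provider]($json) : null;
}

echo extractStreamText('openai', '{"choices":[{"delta":{"content":"Hi"}}]}'), "\n";   // Hi
echo extractStreamText('claude', '{"type":"content_block_delta","delta":{"type":"text_delta","text":"Hi"}}'), "\n";   // Hi
var_dump(extractStreamText('claude', '{"type":"message_start"}'));   // NULL
```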

Hands-on exercises

Basic exercise

Exercise 1: Build a simple streaming chat program.

Reference solution

php
<?php
$client = new StreamingClient((string) getenv('OPENAI_API_KEY'));

echo "AI: ";
foreach ($client->chatStream([['role' => 'user', 'content' => 'Hello']]) as $chunk) {
    echo $chunk;
    flush();
}
echo "\n";

Intermediate exercise

Exercise 2: Build a streaming client with a progress display.

Reference solution

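php
<?php
class ProgressClient
{
    private $client;   // any object with a chatStream() method

    public function __construct($client)
    {
        $this->client = $client;
    }

    public function chatWithProgress(array $messages): string
    {
        $content = '';
        foreach ($this->client->chatStream($messages) as $chunk) {
            $content .= $chunk;
            // "\r" moves back to the start of the line so the counter updates in place
            echo "\rGenerated: " . mb_strlen($content) . " characters";
        }
        echo "\n";
        return $content;
    }
}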

Challenge exercise

Exercise 3: Build a complete web streaming chat API.

Reference solution

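php
<?php
class WebChatAPI
{
    private $client;   // any object with a chatStream() method

    public function __construct($client)
    {
        $this->client = $client;
    }

    public function handle(): void
    {
        header('Content-Type: text/event-stream');
        header('Cache-Control: no-cache');
        header('X-Accel-Buffering: no');
        while (ob_get_level() > 0) {
            ob_end_flush();
        }

        // Read the JSON request body ($_POST is only filled for form posts)
        $input = json_decode(file_get_contents('php://input'), true);
        $messages = $input['messages'] ?? [];

        foreach ($this->client->chatStream($messages) as $chunk) {
            echo "data: " . json_encode(['content' => $chunk]) . "\n\n";
            flush();
        }

        echo "data: [DONE]\n\n";
        flush();
    }
}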

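Summary

Key points

  1. SSE protocol: understand the Server-Sent Events format
  2. Stream parsing: parse streamed data chunks correctly
  3. Callbacks: process data with callback functions
  4. Web integration: implement streaming responses in web applications
  5. Error handling: handle errors that occur during streaming
  6. Performance: buffer management and timeout handling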

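Common pitfalls

| Pitfall | Correct approach |
|---|---|
| Not flushing output | Call flush() after each chunk (and close PHP's output buffers first) |
| Not handling dropped connections | Check eof() and catch exceptions |
| Missing or wrong headers | Send the SSE headers (Content-Type: text/event-stream, Cache-Control: no-cache) |
| Not handling parse errors | Check json_decode() results, or pass JSON_THROW_ON_ERROR and use try/catch |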

Further reading


Learning path

  1. This topic → Streaming response handling
  2. Next: Concurrency and rate limiting
  3. Advanced: Security and authentication

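💡 Remember: streaming greatly improves the user experience. Understanding the SSE protocol and handling the data stream correctly are the keys to implementing it.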