
云原生 API 网关 APISIX 入门教程
最近几年GPT在全球大火,相信大家在日常生活、工作中都在使用。细心的老铁们可能已经注意到,市面上的GPT在回复我们的问题的时候基本上都是采用类似对话的方式。数据在生成后立即被发送给用户,而不是等待所有数据都生成完毕后再一次性发送。
在本文中,我们将探讨主流的大型语言模型(LLM)提供商如何实现其流式输出的 HTTP API。我们将深入研究流式输出的工作原理,探讨其优势,并提供示例代码以帮助您理解如何在实际应用中使用流式输出。
流式输出(Streaming Output)是一种使后端将数据分块、逐步发送到前端的技术。通过这种方法,前端应用能够即时接收和渲染数据,不必等到整个响应体生成完毕后再处理。
传统的API 通常会一次性返回所有数据,然后客户端一次性接收。
流式输出则允许服务器在生成数据的同时将其发送给客户端,从而实现实时更新。
流式输出通常用于以下几种场景:
流式 API 提供了即时响应的体验,允许用户在内容生成过程中即时查看部分结果。相比等待整个响应完成,流式输出极大提高了用户体验。适用于多种场景,例如:
在具体实现流式输出时,常用的技术包括:
本文主要讲解SSE的实现。
Server-Sent Events(SSE)返回的数据格式是由一系列文本流组成,每行包含一个键值对,表示一个数据事件。每条事件消息由事件名称、数据内容等字段组成,并且这些字段具有特定的格式和规则。
event: custom-event
id: 1
retry: 5000
data: {"message": "Hello, World!"}
data: {"message": "Part 1 of the message"}
data: {"message": "Part 2 of the message"}
data: {"message": "Part 3 of the message"}
在客户端收到时,这几行会被拼接成一条数据。
[HttpPost, HttpGet]
[ActionTitle(Name = "聊天")]
[Route("chat")]
public async Task Completions([FromBody] ChatDto chatDto)
{
Response.ContentType = "text/event-stream";
await foreach( var message in GetStreamingResponseAsync(chatDto.Input) ) {
var data = $"data: {message}\n\n";
Console.Write(data);
var bytes = Encoding.UTF8.GetBytes(data);
await Response.Body.WriteAsync(bytes);
await Response.Body.FlushAsync();
await Task.Delay(100);
}
}
public static async IAsyncEnumerable<string> GetStreamingResponseAsync(string userInput)
{
// 随机获取一个配置
GptConfig gptConfig = new GptConfig() {
ApiKey = "your-api-key",
Version = "2023-03-15-preview"
};
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, $"URL_ADDRESS");
request.Headers.Add("api-key", gptConfig.ApiKey);
var requestBody = new {
messages = new[]
{
new { role = "user", content = userInput }
},
stream = true
};
var jsonRequestBody = JsonSerializer.Serialize(requestBody);
request.Content = new StringContent(jsonRequestBody, Encoding.UTF8, "application/json");
using HttpClient httpClient = new HttpClient();
using( var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead) ) {
response.EnsureSuccessStatusCode();
var responseStream = await response.Content.ReadAsStreamAsync();
using( var reader = new StreamReader(responseStream) ) {
while( !reader.EndOfStream ) {
var line = await reader.ReadLineAsync();
if( !string.IsNullOrWhiteSpace(line) && line.StartsWith("data:") ) {
var jsonData = line.Substring(5).Trim();
if( jsonData == "[DONE]" )
break;
var data = JsonSerializer.Deserialize<JsonElement>(jsonData);
// 检查是否包含 content 字段,避免报错
if( data.TryGetProperty("choices", out var choices) &&
choices[0].TryGetProperty("delta", out var delta) &&
delta.TryGetProperty("content", out var content) ) {
yield return content.GetString();
}
}
}
}
}
}
前端实现
在前端,我们可以使用 vue3来实现。以下是一个简单的示例:
chat() {
fetch(`/v20/openai/chat`, {
method: 'POST',
body: JSON.stringify({ input: this.input }),
headers: {
'Content-Type': 'application/json'
}
}).then((res) => {
const reader = res.body.getReader();
this.handleReadStream(reader)
}).finally(() => {
this.input = ''
})
},
// 流式对话
handleReadStream(stream) {
stream.read().then(({ done, value }) => {
if (done) {
return
}
const data = new TextDecoder().decode(value)
if (!data) {
return
}
this.message += data.replaceAll('data: ', '')
// 强制 Vue 渲染更新
this.$nextTick(() => {
console.log("Stream updated");
});
// 递归处理流
this.handleReadStream(stream)
})
},
实现效果
需要注意的是,vue3项目在本地开发代理api接口的时候似乎默认启用了gzip压缩,导致前端无法正常解析SSE的数据格式。可以在vue.config.js中配置关闭gzip压缩。
devServer: {
port: 9588,
compress: false,
allowedHosts: "all",
proxy: {
'v20': { target: 'http://localhost:2222', changeOrigin: true },
}
}
流式输出是一种强大的工具,能够显著改善数据传输体验,特别适用于实时和大数据场景。合理选择适合的流式输出技术并处理好前后端的数据解析和错误恢复,可以显著提升应用的交互性和性能。
文章转自微信公众号@ITProHub