
The OpenAI API already supports streaming calls, so combined with a web framework's streaming-response support, streaming output is not hard to implement.
LangChain, however, wraps the OpenAI API in several layers, and streaming there has to go through a callback. LangChain's built-in streaming callback class is StreamingStdOutCallbackHandler, which streams tokens to the terminal only; it does not support streaming over an HTTP API.
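For reference, terminal streaming with that handler looks roughly like this (a minimal sketch; the prompt and the placeholder API key are mine, and the import paths match the LangChain versions used later in this article):

from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# Tokens are printed to stdout as they arrive -- fine in a terminal,
# but of no use to an HTTP client.
chat = ChatOpenAI(streaming=True,
                  callbacks=[StreamingStdOutCallbackHandler()],
                  temperature=0,
                  openai_api_key="sk-xxx")
chat([HumanMessage(content="Tell me a joke")])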
To stream LangChain's answers to a web client, plenty of solutions have been posted online. Most of them subclass BaseCallbackHandler and rework it around a queue: the callback pushes each newly generated token into the queue while the answer is generated in a separate thread, and the response handler consumes tokens from the queue as they arrive, which yields streaming output over the API.
One such solution: https://gist.github.com/python273/563177b3ad5b9f74c0f8f3299ec13850 . A minimal sketch of the idea follows.
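For illustration only — this is a simplified sketch of the queue-based pattern, not the gist's actual code, and the names QueueCallbackHandler and stream_answer are made up here:

import queue
import threading

from langchain.callbacks.base import BaseCallbackHandler


class QueueCallbackHandler(BaseCallbackHandler):
    """Push each new token into a queue; a sentinel marks the end."""

    def __init__(self, token_queue: queue.Queue):
        self.token_queue = token_queue

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.token_queue.put(token)

    def on_llm_end(self, response, **kwargs) -> None:
        self.token_queue.put(None)  # sentinel: generation finished


def stream_answer(chat, messages):
    """Run the model in a worker thread and yield tokens from the queue."""
    token_queue = queue.Queue()
    handler = QueueCallbackHandler(token_queue)
    thread = threading.Thread(target=lambda: chat(messages, callbacks=[handler]))
    thread.start()
    while True:
        token = token_queue.get()
        if token is None:
            break
        yield token

The sketch needs the worker thread because these callbacks are synchronous; the approach below removes the thread entirely.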
This article takes a different route: it uses the async web frameworks Sanic[1] and FastAPI[2] together with LangChain's AsyncIteratorCallbackHandler[3] to call LangChain asynchronously and stream the answer over an HTTP API.
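Before the full servers, a note on the mechanism: AsyncIteratorCallbackHandler is built on the same queue idea, but with an asyncio.Queue on the event loop instead of a thread-safe queue plus a worker thread. Simplified, its core behaves something like the sketch below (a paraphrase for intuition, not the library's actual source):

import asyncio
from typing import AsyncIterator


class SimplifiedAsyncIteratorHandler:
    """Rough idea of how AsyncIteratorCallbackHandler bridges callbacks to an async iterator."""

    def __init__(self):
        self.queue: asyncio.Queue = asyncio.Queue()
        self.done = asyncio.Event()

    async def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.queue.put_nowait(token)

    async def on_llm_end(self, response, **kwargs) -> None:
        self.done.set()

    async def aiter(self) -> AsyncIterator[str]:
        # Drain tokens until generation has finished and the queue is empty.
        while not self.done.is_set() or not self.queue.empty():
            try:
                yield await asyncio.wait_for(self.queue.get(), timeout=0.1)
            except asyncio.TimeoutError:
                continue

Everything stays on the event loop, so no extra thread is needed: the web handler simply iterates over aiter(). First, the Sanic version: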
# -*- coding: utf-8 -*-
# @place: Pudong, Shanghai
# @file: sanic_langchain_stream.py
# @time: 2023/9/19 18:18
# sanic==23.6.0
import asyncio

from sanic import Sanic
from sanic.response import text, json, ResponseStream
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler

app = Sanic("benchmark")


@app.route("/")
async def index(request):
    return text("hello")


@app.route("/test", methods=["POST"])
async def answer(request):
    content = request.json["content"]
    return json({"text": content})


# Minimal Sanic streaming example: ResponseStream sends chunks as they are written.
@app.route("/csv")
async def test(request):
    async def sample_streaming_fn(response):
        await response.write("foo,")
        await response.write("bar")

    return ResponseStream(sample_streaming_fn, content_type="text/csv")


@app.route("/answer/async", methods=["POST"])
async def answer_async(request):
    content = request.json["content"]

    async def predict(response):
        # The handler buffers tokens from the LLM as they arrive.
        handler = AsyncIteratorCallbackHandler()
        model_message = [HumanMessage(content=content)]
        chat = ChatOpenAI(streaming=True,
                          callbacks=[handler],
                          temperature=0,
                          openai_api_key="")
        # Run the model call in the background; tokens flow into the handler.
        asyncio.create_task(chat.apredict_messages(model_message))
        # Forward each token to the client in Server-Sent Events format.
        async for token in handler.aiter():
            await response.write(f"data: {token}\n\n")

    return ResponseStream(predict, content_type="text/event-stream")


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=3000, debug=False, access_log=True)
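To verify that tokens really arrive incrementally, a small test client helps. This one is an illustrative sketch (assuming the Sanic server above is running locally on port 3000):

import requests

# stream=True lets requests expose chunks as the server writes them.
with requests.post("http://localhost:3000/answer/async",
                   json={"content": "Tell me a joke"},
                   stream=True) as resp:
    for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end="", flush=True)

The same pattern works in FastAPI, whose StreamingResponse consumes an async generator directly: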
# -*- coding: utf-8 -*-
# @place: Pudong, Shanghai
# @file: fastapi_langchain_stream.py
# @time: 2023/9/20 17:36
# fastapi==0.101.1
import asyncio

import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel
from fastapi.responses import StreamingResponse, JSONResponse
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler

app = FastAPI(description="langchain_streaming")


class Item(BaseModel):
    text: str


class Question(BaseModel):
    text: str


# Minimal FastAPI streaming example: StreamingResponse consumes a generator.
async def fake_video_streamer():
    for i in range(10):
        yield b"some fake video bytes\n"


@app.get("/")
async def main():
    return StreamingResponse(fake_video_streamer())


@app.post("/test")
async def test(item: Item):
    return JSONResponse({"content": item.text})


@app.post("/answer/async")
async def answer_async(q: Question):
    content = q.text

    async def predict():
        # The handler buffers tokens from the LLM as they arrive.
        handler = AsyncIteratorCallbackHandler()
        model_message = [HumanMessage(content=content)]
        chat = ChatOpenAI(streaming=True,
                          callbacks=[handler],
                          temperature=0,
                          openai_api_key="sk-xxx")
        # Run the model call in the background; tokens flow into the handler.
        asyncio.create_task(chat.apredict_messages(model_message))
        # Yield each token in Server-Sent Events format.
        async for token in handler.aiter():
            yield f"data: {token}\n\n"

    # Declare the SSE content type, matching the Sanic version above.
    return StreamingResponse(predict(), media_type="text/event-stream")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
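An asynchronous client works just as well; for example with httpx (my choice here, not something the article prescribes), assuming the FastAPI server runs on port 8000:

import asyncio

import httpx


async def main():
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST",
                                 "http://localhost:8000/answer/async",
                                 json={"text": "Tell me a joke"}) as resp:
            # aiter_text() yields decoded chunks as the server sends them.
            async for chunk in resp.aiter_text():
                print(chunk, end="", flush=True)


asyncio.run(main())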
Finally, a small web page to visualize the result. It is plain HTML and fairly crude, as the author's HTML knowledge is limited (pitiful, even).
HTML file (form.html):
<body>
    <form action="/qa/" method="post">
        <p>question: <br><br><textarea rows="3" cols="50" name="question"></textarea></p>
        <input type="submit">
    </form>
    <p>{{ answer }}</p>
</body>
Python code:
# -*- coding: utf-8 -*-
# @place: Pudong, Shanghai
# @file: qa_with_form.py
# @time: 2023/9/20 21:36
import asyncio

import uvicorn
from starlette.requests import Request
from starlette.templating import Jinja2Templates
from fastapi import FastAPI, Form
from fastapi.responses import StreamingResponse
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
from langchain.callbacks.streaming_aiter import AsyncIteratorCallbackHandler

app = FastAPI()
template = Jinja2Templates(directory='template')


@app.get('/')  # handle GET requests: render the question form
async def get_form(request: Request):
    return template.TemplateResponse('form.html', {'request': request})


@app.post('/qa/')  # handle POST requests: stream the answer back as HTML
async def post_question(request: Request,
                        question: str = Form(...)
                        ):
    async def predict():
        handler = AsyncIteratorCallbackHandler()
        model_message = [HumanMessage(content=question)]
        chat = ChatOpenAI(streaming=True,
                          callbacks=[handler],
                          temperature=0,
                          openai_api_key="sk-xxx")
        # Run the model call in the background; tokens flow into the handler.
        asyncio.create_task(chat.apredict_messages(model_message))
        async for token in handler.aiter():
            # Convert newlines to <br> so the browser renders line breaks.
            answer = token.replace("\n", "<br>")
            yield answer
        # Append a "back" (返回) link once the answer is complete.
        yield '<br><a href="/" color="red">返回</a>'

    return StreamingResponse(predict(), media_type='text/html')


if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=8080)
A video demo of the result accompanies the original post.
Reposted from the WeChat official account @NLP奇幻之旅.