Langchain 流式输出到前端(真正解决方法,附最佳实践的完整代码)

06-01 549阅读

Langchain 流式输出

当我们深入使用Langchain时,我们都会考虑如何进行流式输出。尽管官方网站提供了一些流式输出的示例,但这些示例只能在控制台中输出,并不能获取我们所需的生成器。而网上的许多教程也只是伪流式输出,即先完全生成结束,再进行流式输出。

以下是我为大家提供的真正的流式输出示例代码:

这里是2023/12/08 最新补充的方法,简单快捷,好理解,强烈推荐。

(已封装为 fastapi 接口,可供前端调用,前端如何使用,则需要前端工程师了解一下 流式输出 SSE的知识,有相关的前端库)

大语言模型的部署(glm3、qwen等)可以使用我的开源项目:https://github.com/shell-nlp/gpt_server

最新的流式输出方式(LCEL语法的特性) 推荐指数 : ※ ※ ※ ※ ※ ※※※※※:

import json
from dotenv import load_dotenv
from langchain_community.chat_models.openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
load_dotenv()
llm = ChatOpenAI(model="chatglm3", streaming=True, max_tokens=2048)
prompt = ChatPromptTemplate.from_messages(
    [("system", "你是一个专业的AI助手。"), ("human", "{query}")]
)
llm_chain = prompt | llm
@app.post("/chat_stream")
def chat_stream(query: str = "你是谁"):
    ret = llm_chain.stream({"query": query})
    def predict():
        text = ""
        for _token in ret:
            token = _token.content
            js_data = {"code": "200", "msg": "ok", "data": token}
            yield f"data: {json.dumps(js_data,ensure_ascii=False)}\n\n"
            text += token
        print(text)
    generate = predict()
    return StreamingResponse(generate, media_type="text/event-stream")
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app=app, host="0.0.0.0", port=10088)

方法一 推荐指数 : ※ ※ ※ ※ ※ :

本方法是开辟新的线程的方法(当然也可以是新的进程的方式) ,然后结合 langchain的callback方法

为什么推荐使用 线程的方法,而不是 方法二中的异步操作,因为在实际使用过程中,很多外部的方法是不支持异步操作的,要想让程序run起来,必须把一些方法(比如 langchain中的 某些检索器,官方代码只帮你写了 同步的方法,而没有实现异步的方法) 重写为异步方法,而在 重写的过程中会遇到很多 的 问题,令人头疼,而所有的代码都是支持同步的,所以开辟新线程的方式是好的方法。

注: 此方法就是我在 使用方法二的异步方式的过程,遇到了难以解决的问题(本人小白)才找到了此种方式。

缺点: 开辟线程要比 协程 更加耗费计算机资源,因此未来的实现 尽可能还是使用 方法二,但是对于不会异步编程的人来说方法一更加简单。

但是如果对异步操作非常熟悉,那么方法二 是最能提升性能的方式。

代码实现不解释了,手疼,需要的人自然可以看得懂。

至于 ChatOpenAI 所需要的 key,需要自己想办法。

import os
from dotenv import load_dotenv
from langchain.chat_models import ChatOpenAI
from langchain.callbacks import StreamingStdOutCallbackHandler
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
from typing import Generator
import threading
import uvicorn
os.system('clear')
app = FastAPI()
load_dotenv()
class My_StreamingStdOutCallbackHandler(StreamingStdOutCallbackHandler):
    # def __init__(self):
    tokens = []
    # 记得结束后这里置true
    finish = False
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.tokens.append(token)
    def on_llm_end(self, response, **kwargs) -> None:
        self.finish = True
    def on_llm_error(self, error: Exception, **kwargs) -> None:
        self.tokens.append(str(error))
    def generate_tokens(self) -> Generator:
        while not self.finish:  # or self.tokens:
            if self.tokens:
                token = self.tokens.pop(0)
                yield {'data': token}
            else:
                pass
                # time.sleep(0.02)  # wait for a new token
# 用于在另一个 线程中运行的方法
def f(llm, query):
    llm.predict(query)
@app.post('/qa')
def test(query='你好'):
    callback = My_StreamingStdOutCallbackHandler()
    llm = ChatOpenAI(model='chatglm3',
                     streaming=True,
                     callbacks=[callback],
                     max_tokens=1024)
    thread = threading.Thread(target=f, args=(llm, query))
    thread.start()
    return EventSourceResponse(callback.generate_tokens(), media_type="text/event-stream")
if __name__ == '__main__':
    uvicorn.run(app=app, host='0.0.0.0')

方法二: 基于 python 异步机制实现 推荐指数 : ※ ※ ※ ※

为了方便展示,我直接使用gradio写一个小webUI,因为流式输出的场景就是用于Web的展示。直接进行python输出也是可以的。

值得注意的是 方法 一定要加上 async ,这里对于小白可能看不懂, 因为这里涉及到,异步、协程等概念。

import gradio as gr
import asyncio
from langchain.chat_models import ChatOpenAI
#使用 异步的 Callback   AsyncIteratorCallbackHandler
from langchain.callbacks import AsyncIteratorCallbackHandler
async def f():
   callback = AsyncIteratorCallbackHandler()
   llm = ChatOpenAI(engine='GPT-35',streaming=True,callbacks=[callback])
   coro = llm.apredict("写一个1000字的修仙小说")  # 这里如果是 LLMChain的话 可以 换成  chain.acall()
   asyncio.create_task(coro)
   text = ""
   async for token in callback.aiter():
       text = text+token
       yield gr.TextArea.update(value=text)
with gr.Blocks() as demo:
    with gr.Column():
         摘要汇总 = gr.TextArea(value="",label="摘要总结",)
         bn = gr.Button("触发", variant="primary")
    bn.click(f,[],[摘要汇总])
demo.queue().launch(share=False, inbrowser=False, server_name="0.0.0.0", server_port=8001)

方法来之不易,如果您有所收获,请点赞,收藏,关注。

Langchain 流式输出到前端(真正解决方法,附最佳实践的完整代码)
(图片来源网络,侵删)
Langchain 流式输出到前端(真正解决方法,附最佳实践的完整代码)
(图片来源网络,侵删)
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

相关阅读

目录[+]

取消
微信二维码
微信二维码
支付宝二维码