SpringBoot中使用Flux实现流式返回的技术总结

06-01 1614阅读

背景

近期在使用deepseek/openai等网页和APP时,发现大模型在思考和回复时,内容是一点点的显示出来的,于是好奇他们的实现方式。经调研和使用开发者工具抓取请求,每次聊天会向后台发送一个http请求,而这个接口跟普通接口一次性返回不一样,而是以流式的返回。

流式返回的核心概念与优势

在传统的 Web 开发中,接口通常以「一次性返回完整响应体」的形式工作。而 ** 流式返回(Streaming Response)** 指的是服务器在处理请求时,将响应结果分段逐步返回给客户端,而非等待所有数据生成完成后再一次性返回。这种模式具有以下核心优势:

1. 提升用户体验

  • 对于大数据量响应(如文件下载、长文本流)或实时交互场景(如聊天机器人对话),客户端可边接收数据边处理,减少「空白等待时间」,提升实时性感知。

    2. 降低内存消耗

    • 服务器无需在内存中缓存完整响应数据,尤其适合处理高并发、大流量场景,降低 OOM(内存溢出)风险。

      3. 支持长连接与实时通信

      • 天然适配实时数据推送场景(如日志监控、股票行情更新),可与 SSE(Server-Sent Events)、WebSocket 等技术结合使用。

        大模型的接口,尤其是那些带推理的模型接口返回,数据就是一点点的返回的,因此如果要提升用户体验,最好的方式就是采用流式接口返回。

        在SpringBoot中基于Flux的流式接口实现

        1. 依赖配置

        在 pom.xml 中引入 WebFlux 依赖:

            org.springframework.boot
            spring-boot-starter-webflux
        

        2. 流式接口实现(以模拟大模型对话为例)

        import org.springframework.web.bind.annotation.*;
        import reactor.core.publisher.Flux;
        @RestController
        @RequestMapping("/api/chat")
        public class ChatController {
            @PostMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
            public Flux streamChat(@RequestBody ChatRequest request) {
                // 调用大模型 API 并返回 Flux 流
                return callLargeModelApi(request.message())
                    .doOnNext(chunk -> log.info("发送响应片段: {}", chunk))
                    .doOnError(error -> log.error("流式处理出错", error));
            }
            // 模拟调用大模型 API,返回 Flux 流
            private Flux callLargeModelApi(String prompt) {
                // 实际项目中需替换为真实的大模型调用逻辑
                return Flux.just(
                    "您好!", 
                    "我是您的AI助手。", 
                    "您的问题是:" + prompt, 
                    "我将为您提供详细解答..."
                )
                .delayElements(Duration.ofMillis(300)); // 模拟实时响应延迟
            }
        }

        3. 关键配置说明

        • 响应格式:设置 produces = MediaType.TEXT_EVENT_STREAM_VALUE,符合 SSE 协议。
        • 异步处理:Flux 流中的元素会被自动转换为 SSE 格式(data: \n\n)并推送至客户端。
        • 背压控制:通过 onBackpressureBuffer() 或 onBackpressureDrop() 处理客户端消费速率问题。

          浏览器端 JS 调用方案

          1. 使用 EventSource(简化版)

          function connectWithEventSource() {
              const source = new EventSource("/api/chat");
              const chatWindow = document.getElementById("chat-window");
              source.onmessage = (event) => {
                  chatWindow.innerHTML += `${event.data}`;
                  chatWindow.scrollTop = chatWindow.scrollHeight;
              };
              source.onerror = (error) => {
                  console.error("EventSource failed:", error);
                  source.close();
              };
          }

          2. 使用 Fetch API(支持 POST 请求)

          async function connectWithFetch() {
              const response = await fetch("/api/chat", {
                  method: "POST",
                  headers: { "Content-Type": "application/json" },
                  body: JSON.stringify({ message: "你好" })
              });
              const reader = response.body.getReader();
              const decoder = new TextDecoder();
              const chatWindow = document.getElementById("chat-window");
              while (true) {
                  const { done, value } = await reader.read();
                  if (done) break;
                  
                  // 解码并处理数据块
                  const chunk = decoder.decode(value, { stream: true });
                  const messages = chunk.split('\n\n')
                      .filter(line => line.trim().startsWith('data:'))
                      .map(line => line.replace('data:', '').trim());
                  
                  messages.forEach(msg => {
                      chatWindow.innerHTML += `${msg}`;
                      chatWindow.scrollTop = chatWindow.scrollHeight;
                  });
              }
          }

          调用Deepseek模型实战

          写一个接口,通过Spring AI Alibaba ,调用阿里云百炼的deepseek模型,返回Flux流数据

          基础使用详见:快速开始-阿里云Spring AI Alibaba官网官网

          这里只给出转Flux的示例,即通过client/model的stream方法来转,并通过map方法将每个流转成前端需要的数据(我这里是区分了thinking思考和content的数据,便于前端显示):                                

              public Flux processRealMessage(ChatMessageRequest request) throws ChatBaseException {
                  // 获取会话的历史消息
                  List messages = new ArrayList();
                  List chatMessages = this.chatMessageService.getConversationMessage(request.getSessionId(), 1, 20);
                  for (ChatMessage chatMessage : chatMessages) {
                      if (Constants.MESSAGE_ROLE_USER.equals(chatMessage.getRole())) {
                          messages.add(new UserMessage(chatMessage.getContent()));
                      } else {
                          messages.add(new AssistantMessage(chatMessage.getContent()));
                      }
                  }
                  // 记录用户的输入
                  ChatMessage message = new ChatMessage();
                  message.setContent(request.getContent());
                  message.setType("text");
                  message.setRole(Constants.MESSAGE_ROLE_USER);
                  chatMessageService.insertMessage(request.getSessionId(), message);
                  StringBuilder sb = new StringBuilder();
                  // 模拟流式响应
                  return this.chatClient.prompt().messages(messages).user(request.getContent()).stream().chatResponse().doOnNext(response -> {
                              String content = response.getResult().getOutput().getText();
                              if (StringUtils.isNotBlank(content)) {
                                  // 记录完整的响应对象
                                  sb.append(content);
                              }
                          })
                          // 在流结束时记录完整的会话内容
                          .doOnComplete(() -> {
                              // 这里记录消息到数据库
                              String content = sb.toString();
                              LOGGER.info("收到模型原始响应结束: " + content);
                              ChatMessage assistantMessage = new ChatMessage();
                              assistantMessage.setContent(content);
                              assistantMessage.setType("text");
                              assistantMessage.setRole(Constants.MESSAGE_ROLE_ASSISTENT);
                              try {
                                  chatMessageService.insertMessage(request.getSessionId(), assistantMessage);
                              } catch (ChatBaseException e) {
                                  LOGGER.error("processMessage2 doOnComplete insertMessage error");
                              }
                          }).map(response -> {
                              String content = response.getResult().getOutput().getText();
                              String thinking = response.getResults().get(0).getOutput().getMetadata().get("reasoningContent").toString();
                              if (StringUtils.isNotEmpty(content)) {
                                  LOGGER.info("content" + content);
                                  return new ChatMessageResponse("content", content);
                              } else if (StringUtils.isNotEmpty(thinking)) {
                                  LOGGER.info("thinking" + thinking);
                                  return new ChatMessageResponse("thinking", thinking);
                              } else {
                                  LOGGER.info("done~~~~");
                                  return new ChatMessageResponse("done", "");
                              }
                          });
              }

          完整代码:MaDiXin/madichat 

          SpringBoot中使用Flux实现流式返回的技术总结
          (图片来源网络,侵删)
          SpringBoot中使用Flux实现流式返回的技术总结
          (图片来源网络,侵删)
          SpringBoot中使用Flux实现流式返回的技术总结
          (图片来源网络,侵删)
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码