实时响应的秘密:用Spring Boot轻松实现流式AI输出
1、背景
随着AI的快速发展,越来越多的AI应用诞生了,但是AI也有响应慢的问题,一般不能够即时响应,为了优化用户体验,现在大部分AI应用都是实现了打字机的效果,那么这种效果是如何实现的呢?今天我们先看一下后端的实现逻辑。
代码流程是后端发出请求,请求智能体或AI模型暴露的流式接口,然后返回一个流式接口。
为什么不直接前端请求AI接口,因为有的AI接口在前端直接请求,可能会出现跨域问题。因为AI接口返回的响应中没有包含跨域的Access-Control-Allow-Origin。
2、实现步骤
1、引入依赖
org.springframework.boot spring-boot-starter-webflux
2、使用webClient发起对AI接口请求
代码中的URL,请求头、请求体或者请求方法都可以按照对应的AI接口文档进行替换。
WebClient webClient = WebClient.create(); Flux resultFlux = webClient.post() .uri(URL) //请求url .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .header(HttpHeaders.AUTHORIZATION, "Bearer "+ API_KEY) // 添加认证头部 .bodyValue(requestBody)//请求体 .retrieve() .bodyToFlux(String.class); //返回流式结果
3、启动类需要添加@EnableAsync注解
4、如果工程中有过滤器,需要进行配置
我的工程启动后报错
Async support must be enabled on a servlet and for all filters involved in async request processing. This is done in Java code using the Servlet API or by adding "true" to servlet and filter declarations in web.xml.
大概意思是异步支持必须在servlet和所有的过滤器中被标注成是enabled
Servlet和Filter声明:如果您使用的是基于XML的配置,可以在web.xml文件中的servlet和filter声明中添加true元素来启用异步支持
myServlet com.example.MyAsyncServlet true myFilter com.example.MyAsyncFilter true
如果您使用的是基于注解的配置或Java配置类,可以通过实现javax.servlet.Servlet接口并覆盖isAsyncSupported()方法返回true,或者通过@WebServlet和@WebFilter注解的asyncSupported属性来设置。我使用的是这种方式
@WebServlet(urlPatterns = "/async", asyncSupported = true) public class MyAsyncServlet extends HttpServlet { // ... } @WebFilter(urlPatterns = "/*", asyncSupported = true) public class MyAsyncFilter implements Filter { // ... }
5、postman发送请求后结果
controller类接口
@RequestMapping(method = RequestMethod.POST, value = "/getAIResult",produces = MediaType.TEXT_EVENT_STREAM_VALUE) Flux getAIResult(@RequestBody String content){ // 构建请求体 HashMap requestBody = new HashMap(); requestBody.put("model", "6bbdf08d55244bd9be24052ded2a58ef"); requestBody.put("context",0); requestBody.put("stream", true); List messages = new ArrayList(); HashMap message = new HashMap(); message.put("role", "user"); message.put("content", content); messages.add(message); requestBody.put("messages", messages); WebClient webClient = WebClient.create(); Flux resultFlux = webClient.post() .uri(URL) .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .header(HttpHeaders.AUTHORIZATION, "Bearer "+ API_KEY) // 添加认证头部 .bodyValue(requestBody) .retrieve() .bodyToFlux(String.class); // 输出响应结果 // // 订阅响应以触发实际的 HTTP 请求 // resultFlux.subscribe( // response -> System.out.println("Response received: " + response), // error -> System.err.println("Error occurred: " + error.getMessage()) // ); return resultFlux; }
3、使用技术介绍
WebFlux
WebFlux模块是Spring 5引入的一部分,旨在提供一种新的方式来构建响应式的Web应用程序。它允许你以异步和非阻塞的方式处理HTTP请求,这在处理高并发场景时可以显著提高性能。
WebFlux的特点
- 非阻塞I/O:与传统的Servlet API不同,WebFlux使用的是非阻塞I/O模型,这意味着它可以更有效地利用线程资源。
- 反应式编程:WebFlux内置了对反应式编程的支持,主要通过Reactor库实现,使得编写和处理异步代码更加容易。
- 函数式路由:除了注解驱动的控制器,WebFlux还提供了函数式路由API,让你能够以声明性的方式定义路由规则。
以上来自AI内容生成,看完一头雾水,下方给出介绍
非阻塞I/O
含义:非阻塞I/O(Non-blocking I/O)是一种编程模型,它允许应用程序在等待某些操作完成时不会被阻塞。与传统的Servlet API(如Spring MVC)相比,WebFlux采用了基于事件和回调的非阻塞I/O模型。
-
传统Servlet API (阻塞I/O):在传统的Servlet环境中,每个HTTP请求都会分配一个线程来处理。如果这个处理过程包含了一个长时间运行的操作(例如数据库查询或网络调用),那么该线程会被阻塞直到操作完成。这意味着线程不能用来处理其他请求,从而降低了服务器的效率。
-
WebFlux (非阻塞I/O):WebFlux使用了Netty这样的异步网络框架,它们可以在不阻塞线程的情况下执行I/O操作。当一个请求涉及到耗时的任务时,它不会阻塞当前的线程;相反,任务完成后会触发相应的回调函数继续处理。这种方式使得单个线程可以处理多个并发请求,极大地提高了资源利用率和服务的吞吐量。
反应式编程
含义:反应式编程(Reactive Programming)是一种面向数据流和变化传播的编程范式。它强调的是通过声明式的代码来描述数据流的变化,并能够对这些变化做出响应。WebFlux内置了对反应式编程的支持,主要通过Project Reactor库实现,这是Spring 5引入的一个核心特性。
-
Reactor库:Reactor提供了两个核心类型——Mono和Flux,分别表示0到1个元素的异步序列和0到N个元素的异步序列。开发者可以使用这些类型来构建复杂的异步逻辑,而不需要显式地管理线程或同步问题。
-
好处:
- 简化异步编程:通过组合操作符(如map, flatMap, filter等),你可以轻松地创建复杂的异步工作流,同时保持代码的简洁性和可读性。
- 错误处理:Reactor还提供了一套强大的错误处理机制,比如onErrorResume、retry等,使得处理异常情况更加直观。
- 背压支持:对于生产者-消费者模式中的流量控制,Reactor实现了背压(Backpressure),确保系统不会因为过载而崩溃。
函数式路由
含义:函数式路由是WebFlux提供的另一种定义HTTP端点的方式,除了传统的注解驱动控制器之外。它允许你以一种声明性的、函数式的方式来配置路由规则,这在某些情况下可能比注解更灵活、更具表达力。
-
RouterFunction和HandlerFunction:在函数式路由中,RouterFunction用于定义路由匹配逻辑,而HandlerFunction则负责处理实际的请求。两者结合在一起,可以非常清晰地表达出“如果路径匹配,则执行某个处理器”的意图。
-
-