Java调用SSE流式接口,并流式返回给前端实现打字输出效果

06-01 575阅读
目录
    • 1.SSE概述
      • 1.1 什么是是SSE
      • 2.2 与长链接(Long Polling)的区别
        • 长链接(Long Polling)
        • Server-Sent Events (SSE)
        • 比较
        • 总结
        • 2.通过okhttp调用SSE流式接口并流式返回给前端
            • 环境要求
            • 使用okhttp相关依赖
            • 示例
            • 3. 如果Spring Framework 低于5.0,可使用Servlet 3.0进行流式返回
            • 4. 前端调用SSE接口
              • 方式1 使用JavaScript的 EventSource API
              • 方式2 使用 fetchEventSource 插件
              • 5. 使用原生的http调用SSE流式接口

                1.SSE概述

                1.1 什么是是SSE

                Server-Sent Events (SSE)

                SSE是一种简单的事件推送技术,它允许服务器异步地向客户端发送更新,而无需客户端显式请求这些更新。这对于实时应用程序非常有用,例如股票价格更新、消息通知等。SSE基于HTTP协议,使用一个持久的HTTP连接来维持客户端和服务端之间的通信。

                2.2 与长链接(Long Polling)的区别

                Server-Sent Events (SSE) 和长链接(Long Polling)都是实现服务器向客户端推送数据的技术,但它们之间存在一些关键区别。下面我将详细解释这两种技术的不同之处:

                长链接(Long Polling)

                长链接是一种实现服务器推送数据到客户端的技术,它基于HTTP请求/响应模型。在这种模式下,客户端发起一个HTTP请求,服务器在没有数据可发送的情况下会保持连接打开,直到有数据可发送或者超时。一旦服务器有数据要发送,它就会响应客户端的请求,并关闭连接。客户端接收到数据后立即重新发起一个新的请求,从而保持与服务器的“长链接”。

                特点:

                • 客户端主动发起请求:客户端需要不断地向服务器发起请求以获取数据。
                • 服务器被动响应:服务器只在客户端请求时才发送数据。
                • 连接短暂:虽然每个连接可能会持续一段时间,但每次请求结束后连接会被关闭。
                • 实现简单:易于用现有HTTP技术实现。
                • 兼容性好:几乎所有浏览器都支持HTTP请求/响应模型。
                  Server-Sent Events (SSE)

                  Server-Sent Events 是一种更为现代的技术,用于实现服务器向客户端的单向数据推送。SSE基于HTTP协议,但使用了一个持久的HTTP连接来维持客户端和服务端之间的通信。服务器可以主动向客户端发送数据,而不需要等待客户端的请求。

                  特点:

                  • 服务器主动推送:服务器可以主动向客户端发送数据,而不需要客户端发起请求。
                  • 持久连接:客户端和服务端之间建立了一个持久的连接,直到客户端或服务器关闭该连接。
                  • 格式特定:SSE使用特定的格式来发送数据,包括data:字段和空行作为分隔符。
                  • 资源效率高:由于连接是持久的,因此减少了建立连接的开销。
                  • 实现复杂度适中:虽然比长链接稍微复杂,但现代浏览器和服务器框架提供了良好的支持。
                    比较
                    • 实时性:SSE提供更好的实时性,因为它不需要客户端不断发起请求。
                    • 性能:SSE在性能上通常优于长链接,因为它避免了重复建立连接的开销。
                    • 实现复杂度:SSE需要客户端和服务端双方的支持,而长链接可以更容易地在现有的HTTP基础设施上实现。
                    • 兼容性:SSE在现代浏览器中得到了广泛支持,但对于一些旧版浏览器可能不适用;长链接则具有更好的向后兼容性。
                      总结

                      选择哪种技术取决于你的具体需求。如果你的应用需要较低延迟的数据推送,并且可以依赖现代浏览器和服务器环境,那么SSE是一个不错的选择。如果你需要更广泛的浏览器兼容性,并且对实时性要求不是特别高,那么长链接可能更适合你。

                      2.通过okhttp调用SSE流式接口并流式返回给前端

                      环境要求
                      • Spring Framework 5.0
                      • Jdk1.8
                        使用okhttp相关依赖
                            com.squareup.okhttp3
                            okhttp
                            4.2.0
                        
                        
                            com.squareup.okhttp3
                            okhttp-sse
                            4.2.0
                        
                        
                        示例
                        @GetMapping(value = "/test1", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
                            public SseEmitter SseTest1() {
                                SseEmitter sseEmitter = new SseEmitter();
                                String prompt = "";
                                String url = "";
                                FormBody formBody = new FormBody.Builder().add("prompt", prompt).build();
                                Request request = new Request.Builder().url(url).post(formBody).build();
                                // 使用EventSourceListener处理来自服务器的SSE事件
                                EventSourceListener listener = new EventSourceListener() {
                                    @Override
                                    public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) {
                                        log.info("Connection opened.");
                                    }
                                    @Override
                                    public void onClosed(@NotNull EventSource eventSource) {
                                        log.info("Connection closed.");
                                        sseEmitter.complete();
                                    }
                                    @Override
                                    public void onEvent(@NotNull EventSource eventSource, @Nullable String id, @Nullable String type, @NotNull String data) {
                                        try {
                                            JSONObject jsonObject = JSONUtil.parseObj(data);
                                            String event = jsonObject.getStr("event");
                                            if ("message".equals(event)) {
                                                sseEmitter.send(jsonObject.getStr("answer"));
                                            }
                                        } catch (Exception e) {
                                            log.error("推送数据失败", e);
                                        }
                                    }
                                    @Override
                                    public void onFailure(@NotNull EventSource eventSource, @Nullable Throwable t, @Nullable Response response) {
                                        log.error("Connection failed.", t);
                                        sseEmitter.completeWithError(t);
                                    }
                                };
                                OkHttpClient client = new OkHttpClient.Builder().connectTimeout(10, TimeUnit.SECONDS).writeTimeout(50, TimeUnit.SECONDS).readTimeout(10, TimeUnit.MINUTES).build();
                                EventSource.Factory factory = EventSources.createFactory(client);
                                factory.newEventSource(request, listener);
                                return sseEmitter;
                            }
                        

                        注意

                        • 该接口需为_Get_请求,ContentType为 text/event-stream
                        • SseEmitter 是Spring Framework 5.0引入的一个新特性,用于简化Server-Sent Events (SSE) 的实现。它提供了一种简单的方式来发送事件数据到客户端,特别适用于构建实时数据推送的应用程序。

                          3. 如果Spring Framework 低于5.0,可使用Servlet 3.0进行流式返回

                          使用_AsyncContext_:Servlet 3.0 引入了异步支持,允许Servlet在不同的线程中处理请求。你可以使用AsyncContext来启动一个异步线程,在该线程中发送SSE事件。

                          配置_async-supported_

                          使用_AsyncContext_前需配置_async-supported_

                          async-supported元素用于指定Servlet是否支持异步处理。这个配置通常是在部署描述符 web.xml 文件中进行设置的。

                          配置示例

                              
                                  MyServlet
                                  com.example.MyServlet
                                  true
                              
                              
                                  MyServlet
                                  /myServlet
                              
                          
                          

                          后端代码示例

                          Java调用SSE流式接口,并流式返回给前端实现打字输出效果
                          (图片来源网络,侵删)
                          	@GetMapping("/test3")
                              public void sseTest3(HttpServletRequest req, HttpServletResponse resp) {
                              	// text/event-stream 是一个特殊的MIME类型,用于定义Server-Sent Events (SSE)。它告诉浏览器这个响应是SSE流,浏览器应该以这种方式处理接收到的数据。
                                  resp.setContentType("text/event-stream");
                                  resp.setCharacterEncoding("UTF-8");
                                  // 这行代码设置了Cache-Control HTTP头部字段,值为no-cache。这意味着浏览器不应该缓存此响应。对于SSE来说,这是很重要的,因为我们希望实时更新数据,而不希望浏览器缓存旧的数据。
                                  resp.setHeader("Cache-Control", "no-cache");
                                  // 这行代码设置了Connection HTTP头部字段,值为keep-alive。这意味着客户端和服务器之间的TCP连接在响应完成后保持打开状态,以便后续的SSE事件可以通过同一个连接发送。这对于持续的数据流非常重要,因为它减少了建立新连接的开销。
                                  resp.setHeader("Connection", "keep-alive");
                                  try {
                                      // 创建一个AsyncContext对象并开启异步处理流程
                                      AsyncContext asyncContext = req.startAsync(req, resp);
                                      asyncContext.setTimeout(10 * 60 * 1000);
                                      PrintWriter writer = asyncContext.getResponse().getWriter();
                                      String prompt = "";
                                      String url = "";
                                      FormBody formBody = new FormBody.Builder().add("prompt", prompt).build();
                                      Request request = new Request.Builder().url(url).post(formBody).build();
                                      // 使用EventSourceListener处理来自服务器的SSE事件
                                      EventSourceListener listener = new EventSourceListener() {
                                          @Override
                                          public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) {
                                              log.info("Connection opened.");
                                          }
                                          @Override
                                          public void onClosed(@NotNull EventSource eventSource) {
                                              log.info("Connection closed.");
                                              writer.write("data: __stop__
                          ");
                                              writer.flush();
                                              asyncContext.complete();
                                          }
                                          @Override
                                          public void onEvent(@NotNull EventSource eventSource, @Nullable String id, @Nullable String type, @NotNull String data) {
                                              try {
                                                  JSONObject jsonObject = JSONUtil.parseObj(data);
                                                  String event = jsonObject.getStr("event");
                                                  if ("message".equals(event)) {
                                                      String answer = jsonObject.getStr("answer");
                                                      log.info("message: {}", answer);
                                                      writer.write("data: " + answer + "
                          ");
                                                      writer.flush();
                                                  }
                                              } catch (Exception e) {
                                                  log.error("推送数据失败", e);
                                              }
                                          }
                                          @Override
                                          public void onFailure(@NotNull EventSource eventSource, @Nullable Throwable t, @Nullable Response response) {
                                              log.error("Connection failed.", t);
                                              asyncContext.complete();
                                          }
                                      };
                                      OkHttpClient client = new OkHttpClient.Builder().connectTimeout(10, TimeUnit.SECONDS).writeTimeout(50, TimeUnit.SECONDS).readTimeout(10, TimeUnit.MINUTES).build();
                                      EventSource.Factory factory = EventSources.createFactory(client);
                                      factory.newEventSource(request, listener);
                                  } catch (IOException e) {
                                      e.printStackTrace();
                                  }
                              }
                          

                          注意

                          返回数据格式:

                          Java调用SSE流式接口,并流式返回给前端实现打字输出效果
                          (图片来源网络,侵删)
                          // 以data: 开头  /n/n结束
                          "data: xxxxx /n/n"
                          

                          4. 前端调用SSE接口

                          方式1 使用JavaScript的 EventSource API
                          
                          
                              
                              SSE Example
                          
                          
                              
                              
                                  const source = new EventSource('/sse');
                                  source.onmessage = function(event) {
                                      const data = JSON.parse(event.data);
                                      // 约定一个结束标识
                                      if(data == '__stop__') {
                                      	source.close()
                                      	return
                                      }
                                      document.getElementById('events').innerHTML += `

                          ${data.message}

                          `; }; source.onerror = function(error) { console.error('Error occurred:', error); source.close(); };

                          注意

                          Java调用SSE流式接口,并流式返回给前端实现打字输出效果
                          (图片来源网络,侵删)
                          1. 后端返回需返回完整消息的对象(包括换行符),例:{“data”: “哈哈哈/n/n”},如果后端将data取出,则会导致换行符丢失!
                          2. EventSource 只支持Get请求,如果请求参数过长会导致调用失败!
                          方式2 使用 fetchEventSource 插件

                          安装插件

                          npm install --save @microsoft/fetch-event-source
                          

                          简单示例

                          // 导入依赖
                          import { fetchEventSource } from '@microsoft/fetch-event-source';
                           
                          send() {
                          	const params = {
                          		"prompt" : ""
                          	}
                          	const vm = this;
                          	const ctrlAbout = new window.AbortController();
                          	const { signal } = ctrlAbout;
                          	fetchEventSource(Url, {
                          	  method: 'POST',
                          	  headers: {
                          	    "Content-Type": 'application/json',
                          	    "Accept": 'text/event-stream',
                          	    "X-Requested-With": 'XMLHttpRequest'
                          	  },
                          	  body: JSON.stringify(params),
                          	  signal: ctrl.signal, // AbortSignal
                          	  openWhenHidden: true, // 取消visibilityChange事件
                          	  onmessage(event) {
                          	     console.info(event.data);
                          	     // 在这里操作流式数据
                          	     const message = JSON.parse(event.data)
                          	     vm.content += message.data
                          	     // 保证在打字输出时滚动条在最下方
                          	     vm.$nextTick(() => {
                          	     	const contentEl = vm.$refs.content.$el
                          	     	contentEl.scrollTop = contentEl.scrollHeight - contentEL.clientHeight
                          	     })
                          	  },
                          	  onclose(e) {
                          	     // 关闭流
                          	     // 中断流式返回
                          	     ctrl.abort()
                          	  }
                          	  onerror(error) {
                          	    // 返回流报错
                          		console.info(error);
                          		// 中断流式返回
                          		ctrl.abort()
                          		throw err // 直接抛出错误,避免反复调用
                          	  }
                          	})
                          }
                          

                          注意

                          1. 传参时需注意参数类型为json字符串

                          5. 使用原生的http调用SSE流式接口

                          示例

                          @GetMapping("/test2")
                              public void SseTest2() {
                                  String urlAddr = "";
                                  BufferedReader reader = null;
                                  try {
                                      URL url = new URL(urlAddr);
                                      // 建立链接
                                      HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                                      connection.setRequestMethod("POST");
                                      connection.setRequestProperty("Accept", "text/event-stream");
                                      connection.setRequestProperty("Content-type", "application/json; charset=UTF-8");
                                      connection.setRequestProperty("Cache-Control", "no-cache");
                                      connection.setRequestProperty("Connection", "keep-alive");
                                      // 允许输入和输出
                                      connection.setDoInput(true);
                                      connection.setDoOutput(true);
                                      // 设置超时为0,表示无限制
                                      connection.setConnectTimeout(0);
                                      connection.setReadTimeout(0);
                                      // 传参
                                      String params = "prompt=哈哈哈哈";
                                      // 写入POST数据
                                      DataOutputStream out = new DataOutputStream(connection.getOutputStream());
                                      out.write(params.getBytes(StandardCharsets.UTF_8));
                                      out.flush();
                                      out.close();
                                      // 读取SSE事件
                                      reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8));
                                      StringBuilder eventBuilder = new StringBuilder();
                                      String line;
                                      while ((line = reader.readLine()) != null) {
                                          System.out.println(line);
                                      }
                                      reader.close();
                                      // 断开链接
                                      connection.disconnect();
                                  } catch (Exception e) {
                                      e.printStackTrace();
                                  } finally {
                                      IoUtil.close(reader);
                                  }
                              }
                          
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

相关阅读

目录[+]

取消
微信二维码
微信二维码
支付宝二维码