Server-Sent Events: 实现高效的实时Web通信
Server-Sent Events 概述
Server-Sent Events(SSE)是一种基于 HTTP 协议的服务器推送技术,它允许服务器以流的方式向客户端实时推送数据。与 WebSocket 等双向通信技术不同,SSE 专注于单向通信(从服务器到客户端),特别适合需要服务器主动推送数据的场景,如:
- 社交媒体的实时通知(评论、点赞、关注提醒)
- 股票价格、体育比分的实时更新
- 日志和事件流的实时监控
- 聊天应用中的消息提醒
SSE 与 WebSocket 的对比
特性 Server-Sent Events WebSocket 通信方向 单向(服务器到客户端) 双向 协议 标准 HTTP WebSocket 协议(基于 HTTP 握手) 实现复杂度 低,使用现有的 HTTP 基础设施 相对较高,需要特殊的服务器支持 重连机制 内置自动重连 需要手动实现 浏览器支持 所有现代浏览器(IE 需要 polyfill) 所有现代浏览器 数据格式 文本(通常是 JSON) 文本或二进制 适用场景 服务器频繁更新客户端 需要频繁双向数据交换 SSE 工作原理
SSE 的实现原理非常简单。客户端通过 JavaScript 的 EventSource 接口发起请求,服务器保持连接打开并定期发送事件。这种连接具有以下特点:
- 服务端使用 Content-Type: text/event-stream 响应头
- 消息以特定格式发送(每条消息由一个或多个字段组成,以换行符分隔)
- 客户端可以监听不同类型的事件
- 浏览器自动处理连接断开和重连
警告:当使用 HTTP/1.x 时,SSE 连接受到浏览器对每个域名最大并发连接数的限制(通常为 6),这在需要打开多个 SSE 连接的应用中可能成为瓶颈。使用 HTTP/2 可以显著提高这个限制(默认为 100),大大改善 SSE 的可扩展性。
实现 SSE 的前后端示例
下面通过一个完整的实例来展示如何在实际项目中实现 SSE。我们将创建一个简单的通知系统,前端使用原生 JavaScript,后端使用 Spring Boot。
前端实现
HTML 和 JavaScript 代码如下:
SSE 通知系统演示 body { font-family: "Arial", sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; } h1 { color: #333; } button { background-color: #4caf50; border: none; color: white; padding: 10px 20px; text-align: center; cursor: pointer; border-radius: 4px; } #list { margin-top: 20px; padding-left: 20px; } #list li { padding: 8px; margin-bottom: 5px; background-color: #f9f9f9; border-left: 3px solid #4caf50; }
SSE 实时通知演示
建立 SSE 连接接收到的通知:
这段代码展示了如何使用原生 EventSource API 创建 SSE 连接。注意以下几个关键点:
- 创建连接:通过 new EventSource(url) 创建连接
- 事件监听:
- onmessage 处理没有指定 event 类型的消息
- addEventListener 处理特定类型的自定义事件
- onopen 和 onerror 处理连接状态变化
- 自动重连:EventSource 会自动尝试重新连接,无需手动实现重连逻辑
后端实现
接下来,我们需要创建一个 Spring Boot 应用作为后端。可以通过 Spring Initializr 快速生成项目骨架:
选择需要的依赖后,点击"Generate"下载项目,解压后在 IDE 中打开,然后创建一个用于处理 SSE 连接的控制器类:
package com.example.SeeDemo; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilder; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @RestController public class NotificationController { // 使用线程安全的ConcurrentHashMap存储每个用户的SSE连接 private final Map emitters = new ConcurrentHashMap(); /** * 建立SSE连接的端点 * @param userId 用户ID,用于标识每个连接 * @return SseEmitter实例 */ @GetMapping(value = "/sse/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter connect(@PathVariable String userId) { // 如果该用户已有连接,先移除旧连接 SseEmitter sseEmitter = emitters.get(userId); if (sseEmitter != null) { emitters.remove(userId); } // 创建新的SSE连接,超时时间设为0表示不超时 SseEmitter emitter = new SseEmitter(0L); emitters.put(userId, emitter); // 注册连接完成、超时和错误的回调处理,确保资源正确释放 emitter.onCompletion(() -> emitters.remove(userId)); emitter.onTimeout(() -> emitters.remove(userId)); emitter.onError((Throwable throwable) -> { emitters.remove(userId); }); return emitter; } /** * 向指定用户发送通知的API端点 * @param userId 目标用户ID * @param message 要发送的消息内容 * @return 操作结果 */ @PostMapping(value = "/notify/{userId}") public ResponseEntity notifyUser(@PathVariable String userId, @RequestBody String message) { SseEmitter emitter = emitters.get(userId); if (emitter != null) { try { // 创建一个名为"notification"的事件,携带消息内容 SseEventBuilder siteNotification = SseEmitter.event() .name("notification") .data("New message: " + message); // 发送事件到客户端 emitter.send(siteNotification); return ResponseEntity.ok("Notification sent"); } catch (IOException e) { // 发送失败时释放连接 emitter.completeWithError(e); emitters.remove(userId); return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) .body("Error sending notification: " + e.getMessage()); } } return ResponseEntity.status(HttpStatus.NOT_FOUND).body("User not connected"); } }
这个控制器提供了两个主要端点:
-
/sse/{userId} - 建立 SSE 连接的端点
- 使用 MediaType.TEXT_EVENT_STREAM_VALUE 指定响应内容类型为 text/event-stream
- 返回 SseEmitter 实例,Spring 会将其保持打开以便后续发送事件
- 注册回调函数处理连接状态变化
-
/notify/{userId} - 向特定用户发送通知的端点
- 从 emitters 集合中获取用户的连接
- 创建事件并发送到客户端
- 处理发送错误和资源清理
项目构建完成后,整体结构如下:
运行和测试
启动应用
完成代码编写后,直接运行 Spring Boot 应用。默认情况下,应用将在 http://localhost:8080 启动:
测试流程
- 在浏览器中打开我们创建的前端页面,点击"建立 SSE 连接"按钮
- 使用 HTTP 客户端工具(如 Postman、curl 或其他 API 测试工具)向 /notify/{userId} 端点发送 POST 请求
- 观察前端页面上实时显示的通知
下面是使用 API 工具发送通知的示例:
通知成功发送后,前端页面立即显示该消息:
通过浏览器开发者工具的网络面板,我们可以观察到服务器推送的具体内容:
SSE 的技术细节和最佳实践
SSE 消息格式
SSE 消息由以下字段组成,每个字段占一行,以换行符结束:
event: 事件名称 id: 事件ID data: 事件数据 retry: 重连时间(毫秒)
每条完整的消息以一个空行结束。示例:
event: notification id: 1 data: {"message": "新消息", "from": "user456"}
实现要点与陷阱
在实际开发中,需要注意以下几点:
-
消息接收方式:如前文所述,没有指定 event 字段的消息会触发 onmessage 事件,而有 event 字段的消息需要通过 addEventListener(eventName, callback) 监听。这是一个常见的陷阱点:
// 接收默认消息(无event字段) source.onmessage = function(event) { ... }; // 接收特定事件(如服务端发送event: notification) source.addEventListener("notification", function(event) { ... });
-
连接管理:在后端需要妥善管理连接,防止内存泄漏:
- 当连接关闭时及时从集合中移除
- 处理异常情况,如 IO 异常
- 考虑使用超时机制
-
跨域支持:如果前后端分离部署,需要配置 CORS 支持:
@CrossOrigin(origins = "*", allowedHeaders = "*") @GetMapping(value = "/sse/{userId}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter connect(@PathVariable String userId) { // ... }
-
心跳机制:为了保持连接活跃,可以定期发送心跳消息:
@Scheduled(fixedRate = 30000) public void sendHeartbeat() { emitters.forEach((userId, emitter) -> { try { emitter.send(SseEmitter.event().name("heartbeat").data("ping")); } catch (IOException e) { emitters.remove(userId); } }); }
-
扩展性考虑:对于大规模应用,可以考虑:
- 使用 Redis 发布/订阅模式实现集群环境下的 SSE
- 配置负载均衡器支持长连接
- 设置适当的超时时间和最大连接数