React编程模型:Project Reactor深度解析
文章目录
- 3.3 Project Reactor(Spring Reactor)
- 3.3.1 Mono(0-1个数据流)
- 核心特性
- 创建Mono的方式
- 常用操作符
- 订阅Mono
- 实际应用场景
- 3.3.2 Flux(0-N个数据流)
- 核心特性
- 创建Flux的方式
- 常用操作符
- 订阅Flux
- 实际应用场景
- 3.3.3 Schedulers(调度器)
- 核心概念
- 预定义的Scheduler类型
- 使用Scheduler
- 调度策略选择指南
- 最佳实践
- 实际应用示例
- 总结
3.3 Project Reactor(Spring Reactor)
Project Reactor是Spring生态系统中的响应式编程库,它为构建非阻塞、异步和事件驱动的应用程序提供了强大的工具集。作为Spring WebFlux的默认响应式库,Reactor实现了Reactive Streams规范,使开发者能够以声明式的方式处理异步数据流。
3.3.1 Mono(0-1个数据流)
Mono是Project Reactor中表示最多包含一个元素的异步序列的核心类型。它代表了一种可能在未来某个时间点产生单个值(或空值)的异步计算。
核心特性
- 单值或空序列:Mono要么发出一个元素,要么不发出任何元素(仅发出完成信号)
- 延迟执行:Mono的操作通常是惰性的,只有在订阅时才会执行
- 异步处理:Mono支持非阻塞的异步处理模式
- 错误处理:提供了丰富的错误处理机制
创建Mono的方式
// 1. 从值创建 Mono mono1 = Mono.just("Hello"); Mono mono2 = Mono.justOrEmpty(null); // 允许空值 // 2. 从Supplier创建 Mono mono3 = Mono.fromSupplier(() -> "Hello from Supplier"); // 3. 从Callable创建 Mono mono4 = Mono.fromCallable(() -> "Hello from Callable"); // 4. 从Future创建 CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello from Future"); Mono mono5 = Mono.fromFuture(future); // 5. 空Mono Mono mono6 = Mono.empty(); // 6. 错误Mono Mono mono7 = Mono.error(new RuntimeException("Something went wrong"));
常用操作符
-
转换操作符:
- map:同步转换值
- flatMap:异步转换值(返回另一个Mono)
- flatMapMany:将Mono转换为Flux
Mono original = Mono.just("Hello"); Mono mapped = original.map(String::length); Mono flatMapped = original.flatMap(s -> Mono.just(s + " World")); Flux flatMapMany = original.flatMapMany(s -> Flux.just(s.split("")));
-
过滤操作符:
- filter:基于条件过滤
- defaultIfEmpty:如果为空提供默认值
Mono filtered = Mono.just("Hello") .filter(s -> s.length() > 3); Mono withDefault = Mono.empty() .defaultIfEmpty("Default Value");
-
错误处理操作符:
- onErrorReturn:出错时返回默认值
- onErrorResume:出错时切换到备用的Mono
- onErrorMap:转换错误类型
- retry:重试操作
Mono withErrorHandling = Mono.error(new RuntimeException()) .onErrorReturn("Fallback Value"); Mono withResume = Mono.error(new RuntimeException()) .onErrorResume(e -> Mono.just("Recovered from " + e.getMessage()));
-
组合操作符:
- zipWith:与另一个Mono组合
- then:忽略当前结果,执行另一个操作
Mono first = Mono.just("Hello"); Mono second = Mono.just("World"); Mono zipped = first.zipWith(second, (f, s) -> f + " " + s); Mono thenOperation = Mono.just("Hello").then(Mono.empty());
-
时间相关操作符:
- delayElement:延迟发出元素
- timeout:设置超时时间
Mono delayed = Mono.just("Hello") .delayElement(Duration.ofSeconds(1)); Mono withTimeout = Mono.just("Hello") .delayElement(Duration.ofSeconds(2)) .timeout(Duration.ofSeconds(1));
订阅Mono
Mono是惰性的,只有订阅时才会执行:
// 1. 简单订阅 mono.subscribe(); // 2. 带消费者的订阅 mono.subscribe( value -> System.out.println("Received: " + value), // onNext error -> System.err.println("Error: " + error), // onError () -> System.out.println("Completed") // onComplete ); // 3. 带Subscription控制的订阅 mono.subscribe(new BaseSubscriber() { @Override protected void hookOnSubscribe(Subscription subscription) { System.out.println("Subscribed"); request(1); // 请求第一个元素 } @Override protected void hookOnNext(String value) { System.out.println("Received: " + value); request(1); // 请求下一个元素(对于Mono通常不需要) } });
实际应用场景
- HTTP请求响应:在WebFlux中,控制器方法可以返回Mono来表示异步的单个响应
- 数据库操作:Spring Data Reactive Repositories返回Mono用于单个实体的CRUD操作
- 缓存查询:从缓存中获取单个值
- 异步任务:执行返回单个结果的异步计算
@RestController public class UserController { private final UserRepository userRepository; public UserController(UserRepository userRepository) { this.userRepository = userRepository; } @GetMapping("/users/{id}") public Mono getUser(@PathVariable String id) { return userRepository.findById(id) .switchIfEmpty(Mono.error(new UserNotFoundException(id))); } }
3.3.2 Flux(0-N个数据流)
Flux是Project Reactor中表示0到N个元素的异步序列的核心类型。它代表了一个可能在未来某个时间点产生多个值的异步数据流。
核心特性
- 多值序列:Flux可以发出0到N个元素,然后可选地以一个完成信号或错误信号结束
- 背压支持:允许消费者控制生产者的速度,防止内存溢出
- 延迟执行:与Mono类似,Flux的操作也是惰性的
- 丰富的操作符:提供了大量操作符来处理、转换和组合数据流
创建Flux的方式
// 1. 从多个值创建 Flux flux1 = Flux.just("A", "B", "C"); // 2. 从数组或集合创建 Flux flux2 = Flux.fromArray(new String[]{"A", "B", "C"}); Flux flux3 = Flux.fromIterable(Arrays.asList("A", "B", "C")); // 3. 从范围创建 Flux flux4 = Flux.range(1, 5); // 1, 2, 3, 4, 5 // 4. 从流生成器创建 Flux flux5 = Flux.generate( () -> 0L, // 初始状态 (state, sink) -> { sink.next(state); if (state == 10) sink.complete(); return state + 1; } ); // 5. 从间隔创建(周期性发出值) Flux flux6 = Flux.interval(Duration.ofMillis(100)); // 0, 1, 2... 每100ms // 6. 空Flux Flux flux7 = Flux.empty(); // 7. 错误Flux Flux flux8 = Flux.error(new RuntimeException("Flux error"));
常用操作符
-
转换操作符:
- map:同步转换每个元素
- flatMap:异步转换每个元素(返回Flux)
- concatMap:保持顺序的flatMap
- flatMapSequential:合并结果但保持源顺序
Flux original = Flux.just("apple", "banana", "cherry"); Flux mapped = original.map(String::length); Flux flatMapped = original.flatMap(s -> Flux.fromArray(s.split("")));
-
过滤操作符:
- filter:基于条件过滤元素
- take:取前N个元素
- skip:跳过前N个元素
- distinct:去重
Flux numbers = Flux.range(1, 10); Flux evens = numbers.filter(n -> n % 2 == 0); Flux first5 = numbers.take(5);
-
组合操作符:
- mergeWith:合并多个Flux,不保证顺序
- concatWith:顺序连接Flux
- zipWith:与另一个Flux组合
- combineLatest:当任一Flux发出值时组合最新值
Flux fluxA = Flux.just("A", "B", "C"); Flux fluxB = Flux.just("1", "2", "3"); Flux merged = fluxA.mergeWith(fluxB); // A, 1, B, 2, C, 3 Flux concatenated = fluxA.concatWith(fluxB); // A, B, C, 1, 2, 3 Flux zipped = fluxA.zipWith(fluxB, (a, b) -> a + b); // A1, B2, C3
-
错误处理操作符:
- onErrorReturn:出错时返回默认值
- onErrorResume:出错时切换到备用的Flux
- onErrorContinue:跳过错误元素并继续处理
- retry:重试操作
Flux withErrorHandling = Flux.just(1, 2, 0, 4) .map(i -> 10 / i) .onErrorResume(e -> Flux.just(-1)); Flux withContinue = Flux.just(1, 2, 0, 4) .map(i -> { try { return 10 / i; } catch (Exception e) { throw Exceptions.propagate(e); } }) .onErrorContinue((e, o) -> System.out.println("Error for " + o + ": " + e.getMessage()));
-
背压操作符:
- onBackpressureBuffer:缓冲元素
- onBackpressureDrop:丢弃无法处理的元素
- onBackpressureLatest:只保留最新元素
Flux.range(1, 1000) .onBackpressureBuffer(50) // 缓冲区大小为50 .subscribe(new BaseSubscriber() { @Override protected void hookOnSubscribe(Subscription subscription) { request(10); // 初始请求10个元素 } @Override protected void hookOnNext(Integer value) { System.out.println("Received: " + value); // 根据需要请求更多元素 if (value % 10 == 0) { request(10); } } });
-
时间相关操作符:
- delayElements:延迟发出每个元素
- timeout:设置超时时间
- sample:定期采样
Flux.range(1, 5) .delayElements(Duration.ofMillis(500)) .subscribe(System.out::println);
订阅Flux
与Mono类似,Flux也是惰性的,需要订阅才能执行:
// 1. 简单订阅 flux.subscribe(); // 2. 带消费者的订阅 flux.subscribe( value -> System.out.println("Received: " + value), // onNext error -> System.err.println("Error: " + error), // onError () -> System.out.println("Completed"), // onComplete subscription -> subscription.request(3) // 初始请求3个元素 ); // 3. 带Subscription控制的订阅 flux.subscribe(new BaseSubscriber() { @Override protected void hookOnSubscribe(Subscription subscription) { System.out.println("Subscribed"); request(1); // 请求第一个元素 } @Override protected void hookOnNext(String value) { System.out.println("Received: " + value); // 处理完当前元素后请求下一个 request(1); } });
实际应用场景
- 流式API:提供持续更新的数据流(如股票价格、传感器数据)
- 批量数据处理:处理大量数据时避免内存溢出
- 事件处理:处理来自消息队列或事件总线的事件流
- 文件处理:逐行读取大文件
@RestController public class EventController { private final EventPublisher eventPublisher; public EventController(EventPublisher eventPublisher) { this.eventPublisher = eventPublisher; } @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux getEvents() { return eventPublisher .publish() .map(event -> { // 转换或丰富事件数据 return event; }) .onBackpressureBuffer(100) .log(); } }
3.3.3 Schedulers(调度器)
在响应式编程中,Schedulers负责管理线程和执行上下文。Project Reactor提供了多种预定义的Scheduler实现,用于控制异步操作的执行位置。
核心概念
- 线程模型:响应式编程通常使用少量线程处理大量并发
- 非阻塞I/O:避免线程阻塞,提高资源利用率
- 执行上下文:决定操作在哪个线程或线程池上执行
预定义的Scheduler类型
-
Schedulers.immediate():
- 在当前线程立即执行
- 通常用于测试或不需要异步的场景
-
Schedulers.single():
- 使用单个可重用的线程
- 适用于低延迟的轻量级任务
- 所有调用者共享同一个线程
-
Schedulers.elastic()(已弃用,推荐使用boundedElastic):
- 无限扩展的线程池
- 适合阻塞I/O操作
- 每个新任务可能创建新线程
-
Schedulers.boundedElastic():
- 有界的弹性线程池
- 默认最多创建10 * CPU核心数的线程
- 适合阻塞I/O操作
- 比elastic更可控,避免资源耗尽
-
Schedulers.parallel():
- 固定大小的并行线程池
- 默认大小等于CPU核心数
- 适合计算密集型任务
-
Schedulers.fromExecutorService():
- 从现有的ExecutorService创建
- 允许与现有线程池集成
使用Scheduler
-
发布到Scheduler:
- publishOn:影响后续操作符的执行上下文
- subscribeOn:影响整个链的订阅上下文(通常用在链的开头)
Flux.range(1, 5) .map(i -> { System.out.println("Map on " + Thread.currentThread().getName()); return i * 2; }) .publishOn(Schedulers.boundedElastic()) .filter(i -> { System.out.println("Filter on " + Thread.currentThread().getName()); return i % 3 == 0; }) .subscribeOn(Schedulers.parallel()) .subscribe();
-
指定操作符的Scheduler:
许多操作符接受可选的Scheduler参数
Flux.interval(Duration.ofMillis(100), Schedulers.single()) .subscribe(System.out::println);
调度策略选择指南
-
计算密集型操作:
- 使用parallel()调度器
- 避免线程切换开销
- 线程数通常等于CPU核心数
-
阻塞I/O操作:
- 使用boundedElastic()调度器
- 防止阻塞事件循环线程
- 允许更多的线程处理并发I/O
-
非阻塞异步操作:
- 通常不需要显式调度
- 由底层异步库管理线程
-
UI交互:
- 使用专用的UI线程调度器
- 通过Schedulers.fromExecutor()与UI框架集成
最佳实践
-
避免在响应式链中阻塞:
- 如果必须阻塞,使用publishOn切换到适当的调度器
-
合理选择调度器:
- 计算密集型:parallel
- I/O密集型:boundedElastic
- 事件循环:通常不需要显式调度
-
注意上下文传播:
- Reactor Context可以携带跨线程的信息
- 使用contextWrite和deferContextual管理上下文
-
资源清理:
- 对于自定义调度器或长时间运行的应用程序,注意关闭调度器
Scheduler scheduler = Schedulers.newBoundedElastic(10, 100, "custom"); try { Flux.just(1, 2, 3) .publishOn(scheduler) .subscribe(System.out::println); } finally { scheduler.dispose(); }
- 对于自定义调度器或长时间运行的应用程序,注意关闭调度器
实际应用示例
@RestController public class DataController { private final DataService dataService; public DataController(DataService dataService) { this.dataService = dataService; } @GetMapping("/data/{id}") public Mono getData(@PathVariable String id) { // 使用boundedElastic处理潜在的阻塞操作 return Mono.fromCallable(() -> dataService.blockingGetData(id)) .subscribeOn(Schedulers.boundedElastic()) .timeout(Duration.ofSeconds(2)) .onErrorResume(e -> Mono.just(Data.fallbackData())); } @GetMapping("/stream") public Flux streamData() { // 使用parallel处理计算密集型流 return dataService.dataStream() .publishOn(Schedulers.parallel()) .map(data -> { // 计算密集型转换 return data.withProcessedPayload(processPayload(data.getPayload())); }) .log(); } private String processPayload(String payload) { // 模拟计算密集型处理 return payload.toUpperCase(); } }
总结
Project Reactor为Java响应式编程提供了强大的工具集:
- Mono:处理0-1个结果的异步操作,适合单个值或空结果的场景
- Flux:处理0-N个结果的异步序列,适合流式数据处理
- Schedulers:管理执行上下文,优化线程使用和资源分配
通过合理组合这些组件,开发者可以构建高效、可扩展的响应式应用程序,充分利用现代硬件资源,同时保持代码的清晰和可维护性。响应式编程模型特别适合高并发、低延迟的应用场景,如微服务架构、实时数据处理和事件驱动系统。