React编程模型:Project Reactor深度解析

06-01 1293阅读

React编程模型:Project Reactor深度解析

文章目录

    • 3.3 Project Reactor(Spring Reactor)
      • 3.3.1 Mono(0-1个数据流)
        • 核心特性
        • 创建Mono的方式
        • 常用操作符
        • 订阅Mono
        • 实际应用场景
        • 3.3.2 Flux(0-N个数据流)
          • 核心特性
          • 创建Flux的方式
          • 常用操作符
          • 订阅Flux
          • 实际应用场景
          • 3.3.3 Schedulers(调度器)
            • 核心概念
            • 预定义的Scheduler类型
            • 使用Scheduler
            • 调度策略选择指南
            • 最佳实践
            • 实际应用示例
            • 总结

              React编程模型:Project Reactor深度解析

              3.3 Project Reactor(Spring Reactor)

              React编程模型:Project Reactor深度解析

              Project Reactor是Spring生态系统中的响应式编程库,它为构建非阻塞、异步和事件驱动的应用程序提供了强大的工具集。作为Spring WebFlux的默认响应式库,Reactor实现了Reactive Streams规范,使开发者能够以声明式的方式处理异步数据流。

              3.3.1 Mono(0-1个数据流)

              Mono是Project Reactor中表示最多包含一个元素的异步序列的核心类型。它代表了一种可能在未来某个时间点产生单个值(或空值)的异步计算。

              核心特性
              1. 单值或空序列:Mono要么发出一个元素,要么不发出任何元素(仅发出完成信号)
              2. 延迟执行:Mono的操作通常是惰性的,只有在订阅时才会执行
              3. 异步处理:Mono支持非阻塞的异步处理模式
              4. 错误处理:提供了丰富的错误处理机制
              创建Mono的方式

              React编程模型:Project Reactor深度解析

              // 1. 从值创建
              Mono mono1 = Mono.just("Hello");
              Mono mono2 = Mono.justOrEmpty(null); // 允许空值
              // 2. 从Supplier创建
              Mono mono3 = Mono.fromSupplier(() -> "Hello from Supplier");
              // 3. 从Callable创建
              Mono mono4 = Mono.fromCallable(() -> "Hello from Callable");
              // 4. 从Future创建
              CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello from Future");
              Mono mono5 = Mono.fromFuture(future);
              // 5. 空Mono
              Mono mono6 = Mono.empty();
              // 6. 错误Mono
              Mono mono7 = Mono.error(new RuntimeException("Something went wrong"));
              
              常用操作符

              React编程模型:Project Reactor深度解析

              1. 转换操作符:

                • map:同步转换值
                • flatMap:异步转换值(返回另一个Mono)
                • flatMapMany:将Mono转换为Flux
                  Mono original = Mono.just("Hello");
                  Mono mapped = original.map(String::length);
                  Mono flatMapped = original.flatMap(s -> Mono.just(s + " World"));
                  Flux flatMapMany = original.flatMapMany(s -> Flux.just(s.split("")));
                  
                • 过滤操作符:

                  • filter:基于条件过滤
                  • defaultIfEmpty:如果为空提供默认值
                    Mono filtered = Mono.just("Hello")
                        .filter(s -> s.length() > 3);
                    Mono withDefault = Mono.empty()
                        .defaultIfEmpty("Default Value");
                    
                  • 错误处理操作符:

                    • onErrorReturn:出错时返回默认值
                    • onErrorResume:出错时切换到备用的Mono
                    • onErrorMap:转换错误类型
                    • retry:重试操作
                      Mono withErrorHandling = Mono.error(new RuntimeException())
                          .onErrorReturn("Fallback Value");
                      Mono withResume = Mono.error(new RuntimeException())
                          .onErrorResume(e -> Mono.just("Recovered from " + e.getMessage()));
                      
                    • 组合操作符:

                      • zipWith:与另一个Mono组合
                      • then:忽略当前结果,执行另一个操作
                        Mono first = Mono.just("Hello");
                        Mono second = Mono.just("World");
                        Mono zipped = first.zipWith(second, (f, s) -> f + " " + s);
                        Mono thenOperation = Mono.just("Hello").then(Mono.empty());
                        
                      • 时间相关操作符:

                        • delayElement:延迟发出元素
                        • timeout:设置超时时间
                          Mono delayed = Mono.just("Hello")
                              .delayElement(Duration.ofSeconds(1));
                          Mono withTimeout = Mono.just("Hello")
                              .delayElement(Duration.ofSeconds(2))
                              .timeout(Duration.ofSeconds(1));
                          
              订阅Mono

              React编程模型:Project Reactor深度解析

              Mono是惰性的,只有订阅时才会执行:

              // 1. 简单订阅
              mono.subscribe();
              // 2. 带消费者的订阅
              mono.subscribe(
                  value -> System.out.println("Received: " + value), // onNext
                  error -> System.err.println("Error: " + error),   // onError
                  () -> System.out.println("Completed")             // onComplete
              );
              // 3. 带Subscription控制的订阅
              mono.subscribe(new BaseSubscriber() {
                  @Override
                  protected void hookOnSubscribe(Subscription subscription) {
                      System.out.println("Subscribed");
                      request(1); // 请求第一个元素
                  }
                  
                  @Override
                  protected void hookOnNext(String value) {
                      System.out.println("Received: " + value);
                      request(1); // 请求下一个元素(对于Mono通常不需要)
                  }
              });
              
              实际应用场景

              React编程模型:Project Reactor深度解析

              1. HTTP请求响应:在WebFlux中,控制器方法可以返回Mono来表示异步的单个响应
              2. 数据库操作:Spring Data Reactive Repositories返回Mono用于单个实体的CRUD操作
              3. 缓存查询:从缓存中获取单个值
              4. 异步任务:执行返回单个结果的异步计算
              @RestController
              public class UserController {
                  private final UserRepository userRepository;
                  
                  public UserController(UserRepository userRepository) {
                      this.userRepository = userRepository;
                  }
                  
                  @GetMapping("/users/{id}")
                  public Mono getUser(@PathVariable String id) {
                      return userRepository.findById(id)
                          .switchIfEmpty(Mono.error(new UserNotFoundException(id)));
                  }
              }
              

              3.3.2 Flux(0-N个数据流)

              React编程模型:Project Reactor深度解析

              Flux是Project Reactor中表示0到N个元素的异步序列的核心类型。它代表了一个可能在未来某个时间点产生多个值的异步数据流。

              核心特性
              1. 多值序列:Flux可以发出0到N个元素,然后可选地以一个完成信号或错误信号结束
              2. 背压支持:允许消费者控制生产者的速度,防止内存溢出
              3. 延迟执行:与Mono类似,Flux的操作也是惰性的
              4. 丰富的操作符:提供了大量操作符来处理、转换和组合数据流
              创建Flux的方式
              // 1. 从多个值创建
              Flux flux1 = Flux.just("A", "B", "C");
              // 2. 从数组或集合创建
              Flux flux2 = Flux.fromArray(new String[]{"A", "B", "C"});
              Flux flux3 = Flux.fromIterable(Arrays.asList("A", "B", "C"));
              // 3. 从范围创建
              Flux flux4 = Flux.range(1, 5); // 1, 2, 3, 4, 5
              // 4. 从流生成器创建
              Flux flux5 = Flux.generate(
                  () -> 0L, // 初始状态
                  (state, sink) -> {
                      sink.next(state);
                      if (state == 10) sink.complete();
                      return state + 1;
                  }
              );
              // 5. 从间隔创建(周期性发出值)
              Flux flux6 = Flux.interval(Duration.ofMillis(100)); // 0, 1, 2... 每100ms
              // 6. 空Flux
              Flux flux7 = Flux.empty();
              // 7. 错误Flux
              Flux flux8 = Flux.error(new RuntimeException("Flux error"));
              
              常用操作符

              React编程模型:Project Reactor深度解析

              1. 转换操作符:

                • map:同步转换每个元素
                • flatMap:异步转换每个元素(返回Flux)
                • concatMap:保持顺序的flatMap
                • flatMapSequential:合并结果但保持源顺序
                  Flux original = Flux.just("apple", "banana", "cherry");
                  Flux mapped = original.map(String::length);
                  Flux flatMapped = original.flatMap(s -> Flux.fromArray(s.split("")));
                  
                • 过滤操作符:

                  • filter:基于条件过滤元素
                  • take:取前N个元素
                  • skip:跳过前N个元素
                  • distinct:去重
                    Flux numbers = Flux.range(1, 10);
                    Flux evens = numbers.filter(n -> n % 2 == 0);
                    Flux first5 = numbers.take(5);
                    
                  • 组合操作符:

                    • mergeWith:合并多个Flux,不保证顺序
                    • concatWith:顺序连接Flux
                    • zipWith:与另一个Flux组合
                    • combineLatest:当任一Flux发出值时组合最新值
                      Flux fluxA = Flux.just("A", "B", "C");
                      Flux fluxB = Flux.just("1", "2", "3");
                      Flux merged = fluxA.mergeWith(fluxB); // A, 1, B, 2, C, 3
                      Flux concatenated = fluxA.concatWith(fluxB); // A, B, C, 1, 2, 3
                      Flux zipped = fluxA.zipWith(fluxB, (a, b) -> a + b); // A1, B2, C3
                      
                    • 错误处理操作符:

                      • onErrorReturn:出错时返回默认值
                      • onErrorResume:出错时切换到备用的Flux
                      • onErrorContinue:跳过错误元素并继续处理
                      • retry:重试操作
                        Flux withErrorHandling = Flux.just(1, 2, 0, 4)
                            .map(i -> 10 / i)
                            .onErrorResume(e -> Flux.just(-1));
                        Flux withContinue = Flux.just(1, 2, 0, 4)
                            .map(i -> {
                                try {
                                    return 10 / i;
                                } catch (Exception e) {
                                    throw Exceptions.propagate(e);
                                }
                            })
                            .onErrorContinue((e, o) -> System.out.println("Error for " + o + ": " + e.getMessage()));
                        
                      • 背压操作符:

                        • onBackpressureBuffer:缓冲元素
                        • onBackpressureDrop:丢弃无法处理的元素
                        • onBackpressureLatest:只保留最新元素
                          Flux.range(1, 1000)
                              .onBackpressureBuffer(50) // 缓冲区大小为50
                              .subscribe(new BaseSubscriber() {
                                  @Override
                                  protected void hookOnSubscribe(Subscription subscription) {
                                      request(10); // 初始请求10个元素
                                  }
                                  
                                  @Override
                                  protected void hookOnNext(Integer value) {
                                      System.out.println("Received: " + value);
                                      // 根据需要请求更多元素
                                      if (value % 10 == 0) {
                                          request(10);
                                      }
                                  }
                              });
                          
                        • 时间相关操作符:

                          • delayElements:延迟发出每个元素
                          • timeout:设置超时时间
                          • sample:定期采样
                            Flux.range(1, 5)
                                .delayElements(Duration.ofMillis(500))
                                .subscribe(System.out::println);
                            
              订阅Flux

              React编程模型:Project Reactor深度解析

              与Mono类似,Flux也是惰性的,需要订阅才能执行:

              // 1. 简单订阅
              flux.subscribe();
              // 2. 带消费者的订阅
              flux.subscribe(
                  value -> System.out.println("Received: " + value), // onNext
                  error -> System.err.println("Error: " + error),   // onError
                  () -> System.out.println("Completed"),           // onComplete
                  subscription -> subscription.request(3)          // 初始请求3个元素
              );
              // 3. 带Subscription控制的订阅
              flux.subscribe(new BaseSubscriber() {
                  @Override
                  protected void hookOnSubscribe(Subscription subscription) {
                      System.out.println("Subscribed");
                      request(1); // 请求第一个元素
                  }
                  
                  @Override
                  protected void hookOnNext(String value) {
                      System.out.println("Received: " + value);
                      // 处理完当前元素后请求下一个
                      request(1);
                  }
              });
              
              实际应用场景

              React编程模型:Project Reactor深度解析

              1. 流式API:提供持续更新的数据流(如股票价格、传感器数据)
              2. 批量数据处理:处理大量数据时避免内存溢出
              3. 事件处理:处理来自消息队列或事件总线的事件流
              4. 文件处理:逐行读取大文件
              @RestController
              public class EventController {
                  private final EventPublisher eventPublisher;
                  
                  public EventController(EventPublisher eventPublisher) {
                      this.eventPublisher = eventPublisher;
                  }
                  
                  @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
                  public Flux getEvents() {
                      return eventPublisher
                          .publish()
                          .map(event -> {
                              // 转换或丰富事件数据
                              return event;
                          })
                          .onBackpressureBuffer(100)
                          .log();
                  }
              }
              

              3.3.3 Schedulers(调度器)

              在响应式编程中,Schedulers负责管理线程和执行上下文。Project Reactor提供了多种预定义的Scheduler实现,用于控制异步操作的执行位置。

              核心概念

              React编程模型:Project Reactor深度解析

              1. 线程模型:响应式编程通常使用少量线程处理大量并发
              2. 非阻塞I/O:避免线程阻塞,提高资源利用率
              3. 执行上下文:决定操作在哪个线程或线程池上执行
              预定义的Scheduler类型
              1. Schedulers.immediate():

                • 在当前线程立即执行
                • 通常用于测试或不需要异步的场景
                • Schedulers.single():

                  • 使用单个可重用的线程
                  • 适用于低延迟的轻量级任务
                  • 所有调用者共享同一个线程
                  • Schedulers.elastic()(已弃用,推荐使用boundedElastic):

                    • 无限扩展的线程池
                    • 适合阻塞I/O操作
                    • 每个新任务可能创建新线程
                    • Schedulers.boundedElastic():

                      • 有界的弹性线程池
                      • 默认最多创建10 * CPU核心数的线程
                      • 适合阻塞I/O操作
                      • 比elastic更可控,避免资源耗尽
                      • Schedulers.parallel():

                        • 固定大小的并行线程池
                        • 默认大小等于CPU核心数
                        • 适合计算密集型任务
                        • Schedulers.fromExecutorService():

                          • 从现有的ExecutorService创建
                          • 允许与现有线程池集成
              使用Scheduler
              1. 发布到Scheduler:

                • publishOn:影响后续操作符的执行上下文
                • subscribeOn:影响整个链的订阅上下文(通常用在链的开头)
                  Flux.range(1, 5)
                      .map(i -> {
                          System.out.println("Map on " + Thread.currentThread().getName());
                          return i * 2;
                      })
                      .publishOn(Schedulers.boundedElastic())
                      .filter(i -> {
                          System.out.println("Filter on " + Thread.currentThread().getName());
                          return i % 3 == 0;
                      })
                      .subscribeOn(Schedulers.parallel())
                      .subscribe();
                  
                • 指定操作符的Scheduler:

                  许多操作符接受可选的Scheduler参数

                  Flux.interval(Duration.ofMillis(100), Schedulers.single())
                      .subscribe(System.out::println);
                  
              调度策略选择指南
              1. 计算密集型操作:

                • 使用parallel()调度器
                • 避免线程切换开销
                • 线程数通常等于CPU核心数
                • 阻塞I/O操作:

                  • 使用boundedElastic()调度器
                  • 防止阻塞事件循环线程
                  • 允许更多的线程处理并发I/O
                  • 非阻塞异步操作:

                    • 通常不需要显式调度
                    • 由底层异步库管理线程
                    • UI交互:

                      • 使用专用的UI线程调度器
                      • 通过Schedulers.fromExecutor()与UI框架集成
              最佳实践

              React编程模型:Project Reactor深度解析

              1. 避免在响应式链中阻塞:

                • 如果必须阻塞,使用publishOn切换到适当的调度器
                • 合理选择调度器:

                  • 计算密集型:parallel
                  • I/O密集型:boundedElastic
                  • 事件循环:通常不需要显式调度
                  • 注意上下文传播:

                    • Reactor Context可以携带跨线程的信息
                    • 使用contextWrite和deferContextual管理上下文
                    • 资源清理:

                      • 对于自定义调度器或长时间运行的应用程序,注意关闭调度器
                        Scheduler scheduler = Schedulers.newBoundedElastic(10, 100, "custom");
                        try {
                            Flux.just(1, 2, 3)
                                .publishOn(scheduler)
                                .subscribe(System.out::println);
                        } finally {
                            scheduler.dispose();
                        }
                        
              实际应用示例
              @RestController
              public class DataController {
                  private final DataService dataService;
                  
                  public DataController(DataService dataService) {
                      this.dataService = dataService;
                  }
                  
                  @GetMapping("/data/{id}")
                  public Mono getData(@PathVariable String id) {
                      // 使用boundedElastic处理潜在的阻塞操作
                      return Mono.fromCallable(() -> dataService.blockingGetData(id))
                          .subscribeOn(Schedulers.boundedElastic())
                          .timeout(Duration.ofSeconds(2))
                          .onErrorResume(e -> Mono.just(Data.fallbackData()));
                  }
                  
                  @GetMapping("/stream")
                  public Flux streamData() {
                      // 使用parallel处理计算密集型流
                      return dataService.dataStream()
                          .publishOn(Schedulers.parallel())
                          .map(data -> {
                              // 计算密集型转换
                              return data.withProcessedPayload(processPayload(data.getPayload()));
                          })
                          .log();
                  }
                  
                  private String processPayload(String payload) {
                      // 模拟计算密集型处理
                      return payload.toUpperCase();
                  }
              }
              

              总结

              React编程模型:Project Reactor深度解析

              Project Reactor为Java响应式编程提供了强大的工具集:

              1. Mono:处理0-1个结果的异步操作,适合单个值或空结果的场景
              2. Flux:处理0-N个结果的异步序列,适合流式数据处理
              3. Schedulers:管理执行上下文,优化线程使用和资源分配

              通过合理组合这些组件,开发者可以构建高效、可扩展的响应式应用程序,充分利用现代硬件资源,同时保持代码的清晰和可维护性。响应式编程模型特别适合高并发、低延迟的应用场景,如微服务架构、实时数据处理和事件驱动系统。

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

相关阅读

目录[+]

取消
微信二维码
微信二维码
支付宝二维码