【Springboot知识】Spring Batch批处理框架深入解析

06-01 1280阅读

文章目录

    • Spring Batch概述
      • 一、Spring Batch 核心概念
      • 二、Spring Boot 集成 Spring Batch 步骤
        • 1. 创建项目并添加依赖
        • 2. 配置批处理数据库
        • 3. 定义数据模型
        • 4. 实现批处理组件
        • 5. 定义 Job 和 Step
        • 6. 创建 CSV 文件
        • 7. 运行批处理作业
        • 三、高级特性
        • 四、总结
        • 如何解决任务间依赖问题
          • 一、任务间依赖的典型场景
          • 二、解决方案与实现
            • 1. **使用监听器(JobExecutionListener)触发后续任务**
            • 2. **通过调度框架编排任务**
            • 3. **利用工作流引擎(如 Spring Cloud Data Flow)**
            • 4. **通过数据库或文件传递数据**
            • 三、关键注意事项
              • 1. **避免重复执行同一 Job 实例**
              • 2. **错误处理与事务一致性**
              • 四、适用场景总结

                Spring Batch概述

                Spring Boot 提供了对批处理框架 Spring Batch 的集成支持,使得开发者可以轻松构建高效、可扩展的批处理任务。以下是对 Spring Batch 的详细介绍及详细使用步骤:

                一、Spring Batch 核心概念

                1. Job(作业)

                  批处理任务的最小单元,由多个 Step 组成,代表一个完整的批处理流程(如数据导入、报表生成等)。

                2. Step(步骤)

                  一个 Job 由多个 Step 组成,每个 Step 包含 ItemReader、ItemProcessor、ItemWriter 三个核心组件。

                3. ItemReader

                  负责读取数据(如从文件、数据库、API 读取)。

                4. ItemProcessor

                  对读取的数据进行业务处理(如数据清洗、转换、过滤)。

                5. ItemWriter

                  将处理后的数据写入目标(如数据库、文件、消息队列)。

                6. JobRepository

                  存储批处理作业的元数据(如 Job 执行状态、参数等)。

                二、Spring Boot 集成 Spring Batch 步骤

                1. 创建项目并添加依赖

                使用 Spring Initializr 创建项目,添加以下依赖:

                    org.springframework.boot
                    spring-boot-starter-batch
                
                
                    com.h2database
                    h2
                    runtime
                
                
                2. 配置批处理数据库

                Spring Batch 需要数据库存储元数据。在 application.properties 中配置 H2 内存数据库:

                【Springboot知识】Spring Batch批处理框架深入解析
                (图片来源网络,侵删)
                spring.datasource.url=jdbc:h2:mem:testdb
                spring.datasource.driverClassName=org.h2.Driver
                spring.datasource.username=sa
                spring.datasource.password=
                spring.h2.console.enabled=true
                spring.batch.jdbc.initialize-schema=always
                
                3. 定义数据模型

                假设处理一个 CSV 文件并写入数据库,定义实体类:

                public class Person {
                    private String name;
                    private int age;
                    // 省略 getter/setter 和构造方法
                }
                
                4. 实现批处理组件

                • ItemReader:读取 CSV 文件

                【Springboot知识】Spring Batch批处理框架深入解析
                (图片来源网络,侵删)
                @Bean
                public FlatFileItemReader reader() {
                    return new FlatFileItemReaderBuilder()
                        .name("personItemReader")
                        .resource(new ClassPathResource("persons.csv"))
                        .delimited()
                        .names("name", "age")
                        .fieldSetMapper(new BeanWrapperFieldSetMapper() {{
                            setTargetType(Person.class);
                        }})
                        .build();
                }
                

                • ItemProcessor:处理数据(可选)

                public class PersonProcessor implements ItemProcessor {
                    @Override
                    public Person process(Person person) {
                        // 示例:过滤年龄小于 18 的记录
                        if (person.getAge()  
                

                • ItemWriter:写入数据库

                【Springboot知识】Spring Batch批处理框架深入解析
                (图片来源网络,侵删)
                @Bean
                public JdbcBatchItemWriter writer(DataSource dataSource) {
                    return new JdbcBatchItemWriterBuilder()
                        .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider())
                        .sql("INSERT INTO person (name, age) VALUES (:name, :age)")
                        .dataSource(dataSource)
                        .build();
                }
                
                5. 定义 Job 和 Step
                @Configuration
                @EnableBatchProcessing
                public class BatchConfig {
                    @Autowired
                    private JobBuilderFactory jobBuilderFactory;
                    @Autowired
                    private StepBuilderFactory stepBuilderFactory;
                    @Bean
                    public Job importPersonJob(Step step1) {
                        return jobBuilderFactory.get("importPersonJob")
                                .incrementer(new RunIdIncrementer()) // 允许 Job 多次执行
                                .flow(step1)
                                .end()
                                .build();
                    }
                    @Bean
                    public Step step1(JdbcBatchItemWriter writer) {
                        return stepBuilderFactory.get("step1")
                                .chunk(10) // 每处理 10 条数据提交一次
                                .reader(reader())
                                .processor(new PersonProcessor())
                                .writer(writer)
                                .build();
                    }
                }
                
                6. 创建 CSV 文件

                在 src/main/resources 下创建 persons.csv:

                John,25
                Alice,17
                Bob,30
                
                7. 运行批处理作业

                通过 REST API 或命令行启动 Job:

                @RestController
                public class JobController {
                    @Autowired
                    private JobLauncher jobLauncher;
                    @Autowired
                    private Job importPersonJob;
                    @GetMapping("/runJob")
                    public String runJob() throws Exception {
                        JobParameters params = new JobParametersBuilder()
                                .addLong("startAt", System.currentTimeMillis())
                                .toJobParameters();
                        jobLauncher.run(importPersonJob, params);
                        return "Job started!";
                    }
                }
                

                三、高级特性

                1. 任务调度

                  结合 @Scheduled 定时运行 Job:

                @Scheduled(fixedRate = 24 * 60 * 60 * 1000) // 每天执行一次
                public void scheduleJob() throws Exception {
                    jobLauncher.run(importPersonJob, new JobParameters());
                }
                
                1. 错误处理与重试

                  在 Step 中配置错误跳过和重试策略:

                @Bean
                public Step step1() {
                    return stepBuilderFactory.get("step1")
                            .chunk(10)
                            .reader(reader())
                            .processor(processor())
                            .writer(writer())
                            .faultTolerant()
                            .skipLimit(10) // 最多跳过 10 条错误数据
                            .skip(Exception.class)
                            .retryLimit(3) // 重试次数
                            .retry(DeadlockLoserDataAccessException.class)
                            .build();
                }
                
                1. 监控与管理

                  通过 Spring Boot Actuator 提供 /actuator/batchjobs 端点监控作业状态(需添加 actuator 依赖)。

                四、总结

                适用场景:

                • 大数据量处理(如数据迁移、ETL)

                • 定时任务(如日报生成、对账)

                • 自动化处理(如文件清理、邮件发送)

                优势:

                • 内置事务管理、错误恢复机制

                • 支持分布式处理(分区、远程分块)

                • 与 Spring 生态无缝集成

                通过上述步骤,您可以快速构建一个健壮的批处理任务。更多高级配置可参考 Spring Batch 官方文档:

                Spring Batch Documentation

                如何解决任务间依赖问题

                在批处理任务中,任务间的依赖通常涉及 多个Job的顺序执行、数据传递 或 执行状态依赖。以下是详细的解决方案和实现步骤:

                一、任务间依赖的典型场景

                1. 顺序依赖:Job B 必须在 Job A 成功完成后执行。
                2. 条件依赖:Job B 的执行取决于 Job A 的输出结果。
                3. 数据依赖:Job B 需要读取 Job A 生成的数据。

                二、解决方案与实现

                1. 使用监听器(JobExecutionListener)触发后续任务

                通过监听 Job 的完成事件,触发下一个 Job 的执行。

                实现步骤:

                1. 创建监听器类,注入 JobLauncher 和后续的 Job。
                2. 在 afterJob 方法中判断当前 Job 状态,并启动后续 Job。

                代码示例:

                import org.springframework.batch.core.JobExecution;
                import org.springframework.batch.core.JobParameters;
                import org.springframework.batch.core.JobParametersBuilder;
                import org.springframework.batch.core.listener.JobExecutionListenerSupport;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.stereotype.Component;
                @Component
                public class JobCompletionListener extends JobExecutionListenerSupport {
                    @Autowired
                    private JobLauncher jobLauncher;
                    @Autowired
                    @Qualifier("nextJob") // 后续的 Job Bean
                    private Job nextJob;
                    @Override
                    public void afterJob(JobExecution jobExecution) {
                        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
                            try {
                                // 生成新的 Job 参数,避免重复执行同一实例
                                JobParameters jobParameters = new JobParametersBuilder()
                                        .addLong("startAt", System.currentTimeMillis())
                                        .toJobParameters();
                                jobLauncher.run(nextJob, jobParameters);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
                

                配置到原 Job 中:

                @Bean
                public Job firstJob(Step step1) {
                    return jobBuilderFactory.get("firstJob")
                            .listener(jobCompletionListener) // 绑定监听器
                            .start(step1)
                            .end()
                            .build();
                }
                
                2. 通过调度框架编排任务

                使用 Spring Scheduler 或 Quartz 等工具,通过代码控制 Job 执行顺序。

                示例:Spring Scheduler 编排任务:

                import org.springframework.batch.core.launch.JobLauncher;
                import org.springframework.scheduling.annotation.Scheduled;
                import org.springframework.stereotype.Component;
                @Component
                public class JobScheduler {
                    @Autowired
                    private JobLauncher jobLauncher;
                    @Autowired
                    @Qualifier("firstJob")
                    private Job firstJob;
                    @Autowired
                    @Qualifier("secondJob")
                    private Job secondJob;
                    @Scheduled(cron = "0 0 3 * * ?") // 每天凌晨3点执行
                    public void runJobsSequentially() throws Exception {
                        // 执行第一个 Job
                        JobParameters params1 = new JobParametersBuilder()
                                .addLong("startAt", System.currentTimeMillis())
                                .toJobParameters();
                        jobLauncher.run(firstJob, params1);
                        // 第一个 Job 成功后,执行第二个 Job
                        JobParameters params2 = new JobParametersBuilder()
                                .addLong("startAt", System.currentTimeMillis())
                                .toJobParameters();
                        jobLauncher.run(secondJob, params2);
                    }
                }
                
                3. 利用工作流引擎(如 Spring Cloud Data Flow)

                通过可视化工具编排多个 Job,定义复杂的依赖关系。

                实现步骤:

                1. 将每个 Job 打包为独立的 Spring Boot 应用。
                2. 在 Spring Cloud Data Flow 中定义任务流程:
                  task create --name jobFlow --definition "firstJob && secondJob"
                  
                3. 通过 REST API 或 Dashboard 触发流程执行。
                4. 通过数据库或文件传递数据

                Job A 将结果写入数据库或文件,Job B 读取该数据继续处理。

                示例:通过数据库传递数据:

                1. Job A 写入数据到临时表:
                CREATE TABLE temp_data (id INT, result VARCHAR(100));
                
                1. Job B 读取临时表数据:
                @Bean
                public JdbcCursorItemReader reader(DataSource dataSource) {
                    return new JdbcCursorItemReaderBuilder()
                            .name("dataReader")
                            .dataSource(dataSource)
                            .sql("SELECT id, result FROM temp_data")
                            .rowMapper((rs, rowNum) -> new Data(rs.getInt("id"), rs.getString("result")))
                            .build();
                }
                

                三、关键注意事项

                1. 避免重复执行同一 Job 实例

                • 每次启动 Job 时,使用不同的 JobParameters(如时间戳)。

                • 使用 RunIdIncrementer 自动生成唯一参数:

                @Bean
                public Job firstJob(Step step1) {
                    return jobBuilderFactory.get("firstJob")
                            .incrementer(new RunIdIncrementer()) // 自动生成唯一参数
                            .start(step1)
                            .end()
                            .build();
                }
                
                2. 错误处理与事务一致性

                • 若 Job A 失败,后续 Job 不应执行(通过监听器或调度逻辑控制)。

                • 使用数据库事务确保临时数据的原子性:

                @Bean
                public Step step1(DataSource dataSource) {
                    return stepBuilderFactory.get("step1")
                            .chunk(10)
                            .reader(reader())
                            .writer(writer(dataSource))
                            .transactionManager(new DataSourceTransactionManager(dataSource))
                            .build();
                }
                

                四、适用场景总结

                场景解决方案
                简单顺序依赖JobExecutionListener 或调度器
                复杂条件分支Spring Cloud Data Flow
                跨任务数据传递数据库/文件/消息队列
                需要可视化编排工作流引擎(如 Airflow、SCDF)

                通过上述方法,可灵活解决批处理任务间的依赖问题,确保任务按需执行且数据一致。

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码