Spring Batch是一个轻量级的、完善的批处理框架,作为Spring体系中的一员,它拥有灵活、方便、生产可用的特点。在应对高效处理大量信息、定时处理大量数据等场景十分简便。结合调度框架能更大地发挥Spring Batch的作用
Spring Batch
的分层架构图如下:
可以看到它分为三层,分别是:
Application
应用层:包含了所有任务batch jobs
和开发人员自定义的代码,主要是根据项目需要开发的业务流程等。
Batch Core
核心层:包含启动和管理任务的运行环境类,如JobLauncher
等。
Batch Infrastructure
基础层:上面两层是建立在基础层之上的,包含基础的读入reader
和写出writer
、重试框架等。
理解下图所涉及的概念至关重要,不然很难进行后续开发和问题分析。
专门负责与数据库打交道,对整个批处理的新增、更新、执行进行记录。所以Spring Batch
是需要依赖数据库来管理的。
负责启动任务Job
。
Job
是封装整个批处理过程的单位,跑一个批处理任务,就是跑一个Job
所定义的内容。
11
上图介绍了Job
的一些相关概念:
Job
:封装处理实体,定义过程逻辑。
JobInstance
:Job
的运行实例,不同的实例,参数不同,所以定义好一个Job
后可以通过不同参数运行多次。
JobParameters
:与JobInstance
相关联的参数。
JobExecution
:代表Job
的一次实际执行,可能成功、可能失败。
所以,开发人员要做的事情,就是定义Job
。
Step
是对Job
某个过程的封装,一个Job
可以包含一个或多个Step
,一步步的Step
按特定逻辑执行,才代表Job
执行完成。
通过定义Step
来组装Job
可以更灵活地实现复杂的业务逻辑。
所以,定义一个Job
关键是定义好一个或多个Step
,然后把它们组装好即可。而定义Step
有多种方法,但有一种常用的模型就是输入——处理——输出
,即Item Reader
、Item Processor
和Item Writer
。比如通过Item Reader
从文件输入数据,然后通过Item Processor
进行业务处理和数据转换,最后通过Item Writer
写到数据库中去。
Spring Batch
为我们提供了许多开箱即用的Reader
和Writer
,非常方便。
理解了基本概念后,就直接通过代码来感受一下吧。整个项目的功能是从多个csv
文件中读数据,处理后输出到一个csv
文件。
添加依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency>
需要添加Spring Batch
的依赖,同时使用H2
作为内存数据库比较方便,实际生产肯定是要使用外部的数据库,如Oracle
、PostgreSQL
。
入口主类:
@SpringBootApplication @EnableBatchProcessing public class PkslowBatchJobMain { public static void main(String[] args) { SpringApplication.run(PkslowBatchJobMain.class, args); } }
也很简单,只是在Springboot
的基础上添加注解@EnableBatchProcessing
。
领域实体类Employee
:
package com.pkslow.batch.entity; public class Employee { String id; String firstName; String lastName; }
对应的csv
文件内容如下:
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
因为有多个输入文件,所以定义如下:
@Value("input/inputData*.csv") private Resource[] inputResources; @Bean public MultiResourceItemReader<Employee> multiResourceItemReader() { MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>(); resourceItemReader.setResources(inputResources); resourceItemReader.setDelegate(reader()); return resourceItemReader; } @Bean public FlatFileItemReader<Employee> reader() { FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>(); //跳过csv文件第一行,为表头 reader.setLinesToSkip(1); reader.setLineMapper(new DefaultLineMapper() { { setLineTokenizer(new DelimitedLineTokenizer() { { //字段名 setNames(new String[] { "id", "firstName", "lastName" }); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() { { //转换化后的目标类 setTargetType(Employee.class); } }); } }); return reader; }
这里使用了FlatFileItemReader
,方便我们从文件读取数据。
为了简单演示,处理很简单,就是把最后一列转为大写:
public ItemProcessor<Employee, Employee> itemProcessor() { return employee -> { employee.setLastName(employee.getLastName().toUpperCase()); return employee; }; }
比较简单,代码及注释如下:
private Resource outputResource = new FileSystemResource("output/outputData.csv"); @Bean public FlatFileItemWriter<Employee> writer() { FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>(); writer.setResource(outputResource); //是否为追加模式 writer.setAppendAllowed(true); writer.setLineAggregator(new DelimitedLineAggregator<Employee>() { { //设置分割符 setDelimiter(","); setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() { { //设置字段 setNames(new String[] { "id", "firstName", "lastName" }); } }); } }); return writer; }
有了Reader-Processor-Writer
后,就可以定义Step
了:
@Bean public Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .processor(itemProcessor()) .writer(writer()) .build(); }
这里有一个chunk
的设置,值为5
,意思是5条记录后再提交输出,可以根据自己需求定义。
完成了Step
的编码,定义Job
就容易了:
@Bean public Job pkslowCsvJob() { return jobBuilderFactory .get("pkslowCsvJob") .incrementer(new RunIdIncrementer()) .start(csvStep()) .build(); }
完成以上编码后,执行程序,结果如下:
成功读取数据,并将最后字段转为大写,并输出到outputData.csv
文件。
可以通过Listener
接口对特定事件进行监听,以实现更多业务功能。比如如果处理失败,就记录一条失败日志;处理完成,就通知下游拿数据等。
我们分别对Read
、Process
和Write
事件进行监听,对应分别要实现ItemReadListener
接口、ItemProcessListener
接口和ItemWriteListener
接口。因为代码比较简单,就是打印一下日志,这里只贴出ItemWriteListener
的实现代码:
public class PkslowWriteListener implements ItemWriteListener<Employee> { private static final Log logger = LogFactory.getLog(PkslowWriteListener.class); @Override public void beforeWrite(List<? extends Employee> list) { logger.info("beforeWrite: " + list); } @Override public void afterWrite(List<? extends Employee> list) { logger.info("afterWrite: " + list); } @Override public void onWriteError(Exception e, List<? extends Employee> list) { logger.info("onWriteError: " + list); } }
把实现的监听器listener
整合到Step
中去:
@Bean public Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .listener(new PkslowReadListener()) .processor(itemProcessor()) .listener(new PkslowProcessListener()) .writer(writer()) .listener(new PkslowWriteListener()) .build(); }
执行后看一下日志:
这里就能明显看到之前设置的chunk
的作用了。Writer
每次是处理5条记录,如果一条输出一次,会对IO
造成压力。