spring batch简介
Spring Batch架构介绍
[
spring batch是spring提供的一个数据处理框架。企业域中的许多应用程序需要批量处理才能在关键任务环境中执行业务操作。这些业务运营包括:
Spring Batch是一个轻量级、全面的批处理框架,旨在开发对企业系统日常操作至关重要的健壮批处理应用程序。Spring批处理基于人们所期望的Spring框架的特性(生产力、基于POJO的开发方法和总体易用性),同时使开发人员能够在必要时轻松访问和利用更高级的企业服务。Spring Batch不是一个调度框架。在商业和开源领域都有许多优秀的企业调度器(如Quartz、Tivoli、Control-M等)。它旨在与调度器一起工作,而不是替换调度器。
Spring Batch提供了在处理大量记录时必不可少的可重用功能,包括日志/跟踪、事务管理、作业处理统计、作业重启、跳过和资源管理。它还提供了更高级的技术服务和功能,通过优化和分区技术实现了超大容量和高性能的批处理作业。Spring Batch既可以用于简单的用例(如将文件读入数据库或运行存储过程),也可以用于复杂的高容量用例(如在数据库之间移动大量数据、转换数据等)。大容量批处理作业可以以高度可扩展的方式利用该框架来处理大量信息。
Spring Batch的设计考虑到了可扩展性和不同的最终用户群体。下图显示了支持最终用户开发人员的可扩展性和易用性的分层体系结构。
此分层体系结构突出了三个主要的高层组件:应用程序、核心和基础架构。该应用程序包含所有批处理作业和开发人员使用Spring batch编写的自定义代码。批处理核心包含启动和控制批处理作业所需的核心运行时类。它包括JobLauncher、Job和Step的实现。应用程序和核心都构建在公共基础架构之上。此基础结构包含公共读写器和服务(如RetryTemplate),应用程序开发人员(读写器,如ItemReader和ItemWriter)和核心框架本身(retry,它是自己的库)都使用这些服务。
构建批处理解决方案时,应考虑以下关键原则、指导原则和一般注意事项。
为了帮助设计和实现批处理系统,应以示例结构图和代码外壳的形式向设计者和程序员提供基本的批处理应用程序构建块和模式。开始设计批处理作业时,应将业务逻辑分解为一系列步骤,这些步骤可以使用以下标准构建块来实现:
此外,应该为无法使用前面提到的构建块构建的业务逻辑提供基本的应用程序外壳。
除了主要构建块之外,每个应用程序还可以使用一个或多个标准实用程序步骤,例如:
批处理应用程序还可以按其输入源进行分类:
任何批处理系统的基础都是处理策略。影响策略选择的因素包括:估计的批处理系统容量、与在线系统或其他批处理系统的并发性、可用的批处理窗口。(请注意,随着越来越多的企业希望全天候运营,清晰的批处理窗口正在消失)。
批处理的典型处理选项有(按实现复杂性的增加顺序):
商业调度器可能支持部分或全部这些选项。
后续将更详细地讨论这些处理选项。需要注意的是,根据经验,批处理采用的提交和锁定策略取决于执行的处理类型,在线锁定策略也应使用相同的原则。因此,在设计总体体系结构时,批处理体系结构不能只是事后诸葛亮。
锁定策略可以是仅使用普通数据库锁,或者在体系结构中实现额外的自定义锁定服务。锁定服务将跟踪数据库锁定(例如,通过在专用db表中存储必要的信息),并向请求db操作的应用程序授予或拒绝权限。此体系结构还可以实现重试逻辑,以避免在出现锁定情况时中止批处理作业。
在大多数情况下,更稳健的方法更合适。请记住,批处理系统在复杂性和处理的数据量方面都有随时间增长的趋势。如果没有锁定策略,并且系统仍然依赖于单个提交点,那么修改批处理程序可能会很痛苦。因此,即使是最简单的批处理系统,也要考虑重新启动恢复选项的提交逻辑需要,以及本节后面描述的更复杂情况的相关信息。
另一种最小化物理锁定的方法是使用乐观锁定模式或悲观锁定模式实现逻辑行级锁定。
乐观锁定假设记录争用的可能性很低。它通常意味着在批处理和在线处理同时使用的每个数据库表中插入一个时间戳列。当应用程序获取一行进行处理时,它还获取时间戳。当应用程序尝试更新已处理的行时,更新将使用WHERE子句中的原始时间戳。如果时间戳匹配,则更新数据和时间戳。如果时间戳不匹配,则表示另一个应用程序已在提取和更新尝试之间更新了同一行。因此,无法执行更新。
悲观锁定是任何一种锁定策略,它假设记录争用的可能性很高,因此需要在检索时获取物理或逻辑锁。一种悲观逻辑锁定使用数据库表中的专用锁列。当应用程序检索要更新的行时,它会在lock列中设置一个标志。使用该标志后,其他尝试从逻辑上检索同一行的应用程序将失败。设置标志的应用程序更新行时,也会清除标志,使其他应用程序能够检索该行。请注意,在初始提取和标志设置之间,也必须保持数据的完整性,例如使用db锁(如SELECT for UPDATE)。还要注意的是,这种方法与物理锁定具有相同的缺点,不同的是,如果用户在锁定记录的同时去吃午饭,那么构建一种超时机制来释放锁会更容易管理。
这些模式不一定适合批处理,但可以用于并行批处理和在线处理(例如,在数据库不支持行级锁定的情况下)。一般来说,乐观锁定更适合于在线应用程序,而悲观锁定更适合于批处理应用程序。每当使用逻辑锁定时,所有访问受逻辑锁定保护的数据实体的应用程序都必须使用相同的方案。
请注意,这两种解决方案都只解决锁定单个记录的问题。通常,我们可能需要锁定一组逻辑相关的记录。对于物理锁,您必须非常小心地管理这些锁,以避免潜在的死锁。对于逻辑锁,通常最好构建一个逻辑锁管理器,该管理器能够理解要保护的逻辑记录组,并且能够确保锁是一致的和非死锁的。这个逻辑锁管理器通常使用自己的表来管理锁、争用报告、超时机制和其他问题。
如果数据访问没有问题,那么可以通过使用额外的线程进行并行处理来实现并行处理。在大型机环境中,传统上使用并行作业类,以确保所有进程都有足够的CPU时间。无论如何,解决方案必须足够健壮,以确保所有正在运行的进程的时间片。
并行处理中的其他关键问题包括负载平衡和通用系统资源(如文件、数据库缓冲池等)的可用性。还要注意,控制表本身很容易成为关键资源。
此外,分区的进程必须设计为仅处理其分配的数据集。分区体系结构必须与数据库设计和数据库分区策略密切相关。注意,数据库分区并不一定意味着数据库的物理分区,尽管在大多数情况下这是可取的。下图说明了分区方法:
图2: 分区进程
该体系结构应该足够灵活,以允许动态配置分区的数量。应考虑自动和用户控制的配置。自动配置可能基于输入文件大小和输入记录数等参数。
1、记录集的固定和均匀分解
这涉及到将输入记录集分成偶数个部分(例如,10个,其中每个部分正好占整个记录集的十分之一)。然后,批处理/提取应用程序的一个实例处理每个部分。
为了使用这种方法,需要进行预处理来拆分记录集。此拆分的结果将是一个下限和上限放置编号,可以将其用作批处理/提取应用程序的输入,以便将其处理限制为仅处理其部分。
预处理可能会带来很大的开销,因为它必须计算并确定记录集每个部分的边界。
2、按键列拆分
这涉及到按键列(如位置代码)分解输入记录集,并将每个键的数据分配给批处理实例。为了实现这一点,列值可以是:
通过分区表分配给批处理实例(本节稍后将介绍)。
通过部分值(例如0000-0999、1000-1999等)分配给批次实例。
在选项1下,添加新值意味着手动重新配置批处理/提取,以确保将新值添加到特定实例。
在选项2下,这确保通过批处理作业的实例覆盖所有值。但是,一个实例处理的值的数量取决于列值的分布(0000-0999范围内可能有大量位置,1000-1999范围内可能很少)。在此选项下,数据范围的设计应考虑分区。
在这两种选择下,无法实现记录到批实例的最佳均匀分布。使用的批处理实例数没有动态配置。
3、按视图拆分
这种方法基本上由一个键列分解,但在数据库级别。它涉及到将记录集分解为视图。批处理应用程序的每个实例在处理过程中都使用这些视图。分解是通过对数据分组来完成的。
使用此选项,必须将批处理应用程序的每个实例配置为命中特定视图(而不是主表)。此外,随着新数据值的添加,必须将这组新数据包含到视图中。没有动态配置功能,因为实例数量的变化会导致视图的变化。
4、增加处理指标
这涉及到在输入表中添加一个新列,作为指示器。作为预处理步骤,所有指标都标记为未处理。在批处理应用程序的记录获取阶段,在将记录标记为未处理的情况下读取记录,一旦读取(带锁),记录将标记为正在处理。该记录完成后,指示器将更新为“完成”或“错误”。批处理应用程序的许多实例可以在不进行更改的情况下启动,因为附加列确保记录只处理一次。“完成后,指标标记为完成”的一两句话)
使用此选项,表上的I/O将动态增加。在更新批处理应用程序的情况下,这种影响会减少,因为无论如何都必须进行写入。
5、将表格提取到平面文件
这涉及到将表提取到文件中。然后,可以将该文件拆分为多个段,并将其用作批处理实例的输入。
使用此选项,将表提取到文件中并将其拆分的额外开销可能会抵消多重分区的影响。可以通过更改文件拆分脚本来实现动态配置。
6、哈希列的使用
此方案涉及在用于检索驱动程序记录的数据库表中添加哈希列(键/索引)。此哈希列有一个指示器,用于确定批处理应用程序的哪个实例处理此特定行。例如,如果有三个批处理实例要启动,则“A”的指示器标记实例1处理的行,“B”的指示器标记实例2处理的行,“C”的指示器标记实例3处理的行。
然后,用于检索记录的过程将有一个额外的WHERE子句来选择由特定指示符标记的所有行。此表中的插入将涉及添加标记字段,该字段将默认为其中一个实例(如“A”)。
将使用一个简单的批处理应用程序来更新指标,例如在不同实例之间重新分配负载。当添加了足够多的新行时,可以运行该批处理(在批处理窗口中除外的任何时间),以将新行重新分发给其他实例。
批处理应用程序的其他实例只需要按照前面段落中的描述运行批处理应用程序,以重新分配指示器以使用新数量的实例。
支持使用键列方法对分区数据库表运行的多分区应用程序的体系结构应包括一个用于存储分区参数的中央分区存储库。这提供了灵活性并确保了可维护性。存储库通常由一个表组成,称为分区表。
存储在分区表中的信息是静态的,通常应由DBA维护。该表应包含多分区应用程序每个分区的一行信息。该表应包含以下列:程序ID代码、分区号(分区的逻辑ID)、该分区的db key列的低值和该分区的db key列的高值。
在程序启动时,应将程序id和分区号从体系结构传递给应用程序(特别是从控制处理微线程)。如果使用键列方法,则这些变量用于读取分区表,以确定应用程序要处理的数据范围。此外,在整个处理过程中,必须使用分区号来:
当应用程序并行运行或分区时,可能会发生数据库资源争用和死锁。作为数据库设计的一部分,数据库设计团队必须尽可能消除潜在的争用情况,这一点至关重要。
此外,开发人员必须确保在设计数据库索引表时考虑到死锁预防和性能。
死锁或热点通常出现在管理或体系结构表中,如日志表、控制表和锁表。这些问题的影响也应考虑在内。现实的压力测试对于确定体系结构中可能的瓶颈至关重要。
为了最大限度地减少冲突对数据的影响,体系结构应该提供服务,例如连接到数据库或遇到死锁时的等待和重试间隔。这意味着一种内置机制,可以对某些数据库返回代码做出反应,而不是立即发出错误,而是等待预定的时间并重试数据库操作
分区体系结构应该对应用程序开发人员相对透明。体系结构应执行与以分区模式运行应用程序相关的所有任务,包括:
验证应包括检查,以确保:
如果数据库已分区,则可能需要进行一些额外的验证,以确保单个分区不跨数据库分区。
此外,架构还应考虑分区的整合。关键问题包括:
一个典型的批处理应用程序大致如下:
其对应的示意图如下:
Figure 2.1: Batch Stereotypes
上图突出显示了组成Spring Batch领域语言的关键概念。一个作业有一到多个步骤,每个步骤正好有一个ItemReader、一个ItemProcessor和一个ItemWriter。需要启动作业(使用JobLauncher),并且需要存储有关当前正在运行的流程的元数据(在JobRepository中)
下面是一些概念是Spring batch框架中的核心概念。
Job和Step是spring batch执行批处理任务最为核心的两个概念。
其中Job是一个封装整个批处理过程的一个概念。Job在spring batch的体系当中只是一个最顶层的一个抽象概念,体现在代码当中则它只是一个最上层的接口,其代码如下:
/** * Batch domain object representing a job. Job is an explicit abstraction * representing the configuration of a job specified by a developer. It should * be noted that restart policy is applied to the job as a whole and not to a * step. */ public interface Job { String getName(); boolean isRestartable(); void execute(JobExecution execution); JobParametersIncrementer getJobParametersIncrementer(); JobParametersValidator getJobParametersValidator(); }
在Job这个接口当中定义了五个方法,它的实现类主要有两种类型的job,一个是simplejob,另一个是flowjob。在spring batch当中,job是最顶层的抽象,除job之外我们还有JobInstance以及JobExecution这两个更加底层的抽象。
一个job是我们运行的基本单位,它内部由step组成。job本质上可以看成step的一个容器。一个job可以按照指定的逻辑顺序组合step,并提供了我们给所有step设置相同属性的方法,例如一些事件监听,跳过策略。
Spring Batch以SimpleJob类的形式提供了Job接口的默认简单实现,它在Job之上创建了一些标准功能。一个使用java config的例子代码如下:
@Bean public Job footballJob() { return this.jobBuilderFactory.get("footballJob") .start(playerLoad()) .next(gameLoad()) .next(playerSummarization()) .build(); }
这个配置的意思是:首先给这个job起了一个名字叫footballJob,接着指定了这个job的三个step,他们分别由方法,playerLoad,gameLoad, playerSummarization实现。
我们在上文已经提到了JobInstance,他是Job的更加底层的一个抽象,他的定义如下:
public interface JobInstance { /** * Get unique id for this JobInstance. * @return instance id */ public long getInstanceId(); /** * Get job name. * @return value of 'id' attribute from <job> */ public String getJobName(); }
他的方法很简单,一个是返回Job的id,另一个是返回Job的名字。
JobInstance指的是job运行当中,作业执行过程当中的概念。Instance本就是实例的意思。
比如说现在有一个批处理的job,它的功能是在一天结束时执行行一次。我们假定这个批处理job的名字为’EndOfDay’。在这个情况下,那么每天就会有一个逻辑意义上的JobInstance, 而我们必须记录job的每次运行的情况。
在上文当中我们提到了,同一个job每天运行一次的话,那么每天都有一个jobIntsance,但他们的job定义都是一样的,那么我们怎么来区别一个job的不同jobinstance了。不妨先做个猜想,虽然jobinstance的job定义一样,但是他们有的东西就不一样,例如运行时间。
spring batch中提供的用来标识一个jobinstance的东西是:JobParameters。JobParameters对象包含一组用于启动批处理作业的参数,它可以在运行期间用于识别或甚至用作参考数据。我们假设的运行时间,就可以作为一个JobParameters。
例如, 我们前面的’EndOfDay’的job现在已经有了两个实例,一个产生于1月1日,另一个产生于1月2日,那么我们就可以定义两个JobParameter对象:一个的参数是01-01, 另一个的参数是01-02。因此,识别一个JobInstance的方法可以定义为:
JobExecution指的是单次尝试运行一个我们定义好的Job的代码层面的概念。job的一次执行可能以失败也可能成功。只有当执行成功完成时,给定的与执行相对应的JobInstance才也被视为完成。
JobExecution的接口定义如下:
public interface JobExecution { /** * Get unique id for this JobExecution. * @return execution id */ public long getExecutionId(); /** * Get job name. * @return value of 'id' attribute from <job> */ public String getJobName(); /** * Get batch status of this execution. * @return batch status value. */ public BatchStatus getBatchStatus(); /** * Get time execution entered STARTED status. * @return date (time) */ public Date getStartTime(); /** * Get time execution entered end status: COMPLETED, STOPPED, FAILED * @return date (time) */ public Date getEndTime(); /** * Get execution exit status. * @return exit status. */ public String getExitStatus(); /** * Get time execution was created. * @return date (time) */ public Date getCreateTime(); /** * Get time execution was last updated updated. * @return date (time) */ public Date getLastUpdatedTime(); /** * Get job parameters for this execution. * @return job parameters */ public Properties getJobParameters(); }
每一个方法的注释已经解释的很清楚,这里不再多做解释。只提一下BatchStatus,JobExecution当中提供了一个方法getBatchStatus用于获取一个job某一次特地执行的一个状态。BatchStatus是一个代表job状态的枚举类,其定义如下:
public enum BatchStatus {STARTING, STARTED, STOPPING, STOPPED, FAILED, COMPLETED, ABANDONED }
“Job”定义了什么是作业以及如何执行作业,“JobInstance”是一个纯粹的组织对象,用于将执行分组在一起,主要是为了实现正确的重启语义。然而,“JobExecution”是运行期间实际发生情况的主要存储机制,它包含更多必须控制和持久化的属性,如下表所示:
Property | Definition |
---|---|
Status | A BatchStatus object that indicates the status of the execution. While running, it is BatchStatus#STARTED . If it fails, it is BatchStatus#FAILED . If it finishes successfully, it is BatchStatus#COMPLETED |
startTime | A java.util.Date representing the current system time when the execution was started. This field is empty if the job has yet to start. |
endTime | A java.util.Date representing the current system time when the execution finished, regardless of whether or not it was successful. The field is empty if the job has yet to finish. |
exitStatus | The ExitStatus , indicating the result of the run. It is most important, because it contains an exit code that is returned to the caller. See chapter 5 for more details. The field is empty if the job has yet to finish. |
createTime | A java.util.Date representing the current system time when the JobExecution was first persisted. The job may not have been started yet (and thus has no start time), but it always has a createTime, which is required by the framework for managing job level ExecutionContexts . |
lastUpdated | A java.util.Date representing the last time a JobExecution was persisted. This field is empty if the job has yet to start. |
executionContext | The “property bag” containing any user data that needs to be persisted between executions. |
failureExceptions | The list of exceptions encountered during the execution of a Job . These can be useful if more than one exception is encountered during the failure of a Job . |
These properties are important because they are persisted and can be used to completely determine the status of an execution. For example, if the EndOfDay job for 01-01 is executed at 9:00 PM and fails at 9:30, the following entries are made in the batch metadata tables:
JOB_INST_ID | JOB_NAME |
---|---|
1 | EndOfDayJob |
JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | DATE_VAL | IDENTIFYING |
---|---|---|---|---|
1 | DATE | schedule.Date | 2017-01-01 | TRUE |
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
---|---|---|---|---|
1 | 1 | 2017-01-01 21:00 | 2017-01-01 21:30 | FAILED |
Column names may have been abbreviated or removed for the sake of clarity and formatting. | |
---|---|
Now that the job has failed, assume that it took the entire night for the problem to be determined, so that the ‘batch window’ is now closed. Further assuming that the window starts at 9:00 PM, the job is kicked off again for 01-01, starting where it left off and completing successfully at 9:30. Because it is now the next day, the 01-02 job must be run as well, and it is kicked off just afterwards at 9:31 and completes in its normal one hour time at 10:30. There is no requirement that one JobInstance
be kicked off after another, unless there is potential for the two jobs to attempt to access the same data, causing issues with locking at the database level. It is entirely up to the scheduler to determine when a Job
should be run. Since they are separate JobInstances
, Spring Batch makes no attempt to stop them from being run concurrently. (Attempting to run the same JobInstance
while another is already running results in a JobExecutionAlreadyRunningException
being thrown). There should now be an extra entry in both the JobInstance
and JobParameters
tables and two extra entries in the JobExecution
table, as shown in the following tables:
JOB_INST_ID | JOB_NAME |
---|---|
1 | EndOfDayJob |
2 | EndOfDayJob |
JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | DATE_VAL | IDENTIFYING |
---|---|---|---|---|
1 | DATE | schedule.Date | 2017-01-01 00:00:00 | TRUE |
2 | DATE | schedule.Date | 2017-01-01 00:00:00 | TRUE |
3 | DATE | schedule.Date | 2017-01-02 00:00:00 | TRUE |
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
---|---|---|---|---|
1 | 1 | 2017-01-01 21:00 | 2017-01-01 21:30 | FAILED |
2 | 1 | 2017-01-02 21:00 | 2017-01-02 21:30 | COMPLETED |
3 | 2 | 2017-01-02 21:31 | 2017-01-02 22:29 | COMPLETED |
每一个Step对象都封装了批处理作业的一个独立的阶段。事实上,每一个Job本质上都是由一个或多个步骤组成。每一个step包含定义和控制实际批处理所需的所有信息。任何特定的内容都由编写Job的开发人员自行决定。一个step可以非常简单也可以非常复杂。例如,一个step的功能是将文件中的数据加载到数据库中,那么基于现在spring batch的支持则几乎不需要写代码。更复杂的step可能具有复杂的业务逻辑,这些逻辑作为处理的一部分。与Job一样,Step具有与JobExecution类似的StepExecution,如下图所示:
StepExecution表示一次执行Step, 每次运行一个Step时都会创建一个新的StepExecution,类似于JobExecution。但是,某个步骤可能由于其之前的步骤失败而无法执行。且仅当Step实际启动时才会创建StepExecution。
一次step执行的实例由StepExecution类的对象表示。每个StepExecution都包含对其相应步骤的引用以及JobExecution和事务相关的数据,例如提交和回滚计数以及开始和结束时间。此外,每个步骤执行都包含一个ExecutionContext,其中包含开发人员需要在批处理运行中保留的任何数据
StepExecution
:
Property | Definition |
---|---|
Status | A BatchStatus object that indicates the status of the execution. While running, the status is BatchStatus.STARTED . If it fails, the status is BatchStatus.FAILED . If it finishes successfully, the status is BatchStatus.COMPLETED . |
startTime | A java.util.Date representing the current system time when the execution was started. This field is empty if the step has yet to start. |
endTime | A java.util.Date representing the current system time when the execution finished, regardless of whether or not it was successful. This field is empty if the step has yet to exit. |
exitStatus | The ExitStatus indicating the result of the execution. It is most important, because it contains an exit code that is returned to the caller. See chapter 5 for more details. This field is empty if the job has yet to exit. |
executionContext | The “property bag” containing any user data that needs to be persisted between executions. |
readCount | The number of items that have been successfully read. |
writeCount | The number of items that have been successfully written. |
commitCount | The number of transactions that have been committed for this execution. |
rollbackCount | The number of times the business transaction controlled by the Step has been rolled back. |
readSkipCount | The number of times read has failed, resulting in a skipped item. |
processSkipCount | The number of times process has failed, resulting in a skipped item. |
filterCount | The number of items that have been ‘filtered’ by the ItemProcessor . |
writeSkipCount | The number of times write has failed, resulting in a skipped item. |
ExecutionContext即每一个StepExecution
的执行环境。它包含一系列的键值对。我们可以用如下代码获取ExecutionContext
ExecutionContext ecStep = stepExecution.getExecutionContext(); ExecutionContext ecJob = jobExecution.getExecutionContext(); if (executionContext.containsKey(getKey(LINES_READ_COUNT))) { log.debug("Initializing for restart. Restart data is: " + executionContext); long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT)); LineReader reader = getReader(); Object record = ""; while (reader.getPosition() < lineCount && record != null) { record = readLine(); } }
JobRepository是一个用于将上述job,step等概念进行持久化的一个类。它同时给Job和Step以及下文会提到的JobLauncher实现提供CRUD操作。首次启动Job时,将从repository中获取JobExecution,并且在执行批处理的过程中,StepExecution和JobExecution将被存储到repository当中。
@EnableBatchProcessing注解可以为JobRepository提供自动配置。
JobLauncher这个接口的功能非常简单,它是用于启动指定了JobParameters的Job,为什么这里要强调指定了JobParameter,原因其实我们在前面已经提到了,jobparameter和job一起才能组成一次job的执行。下面是代码实例:
public interface JobLauncher { public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException; }
上面run方法实现的功能是根据传入的job以及jobparamaters从JobRepository获取一个JobExecution并执行Job。
ItemReader是一个读数据的抽象,它的功能是为每一个Step提供数据输入。当ItemReader以及读完所有数据时,它会返回null来告诉后续操作数据已经读完。Spring Batch为ItemReader提供了非常多的有用的实现类,比如JdbcPagingItemReader,
JdbcCursorItemReader等等。
ItemReader支持的读入的数据源也是非常丰富的,包括各种类型的数据库,文件,数据流,等等。几乎涵盖了我们的所有场景。
下面是一个JdbcPagingItemReader 的例子代码:
@Bean public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) { Map<String, Object> parameterValues = new HashMap<>(); parameterValues.put("status", "NEW"); return new JdbcPagingItemReaderBuilder<CustomerCredit>() .name("creditReader") .dataSource(dataSource) .queryProvider(queryProvider) .parameterValues(parameterValues) .rowMapper(customerCreditMapper()) .pageSize(1000) .build(); } @Bean public SqlPagingQueryProviderFactoryBean queryProvider() { SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean(); provider.setSelectClause("select id, name, credit"); provider.setFromClause("from customer"); provider.setWhereClause("where status=:status"); provider.setSortKey("id"); return provider; }
下面是一个JdbcCursorItemReader的例子代码:
@Bean public JdbcCursorItemReader<CustomerCredit> itemReader() { return new JdbcCursorItemReaderBuilder<CustomerCredit>() .dataSource(this.dataSource) .name("creditReader") .sql("select ID, NAME, CREDIT from CUSTOMER") .rowMapper(new CustomerCreditRowMapper()) .build(); }
既然ItemReader是读数据的一个抽象,那么ItemWriter自然就是一个写数据的抽象,它是为每一个step提供数据写出的功能。写的单位是可以配置的,我们可以一次写一条数据,也可以一次写一个chunk的数据,关于chunk下文会有专门的介绍。ItemWriter对于读入的数据是不能做任何操作的。
Spring Batch为ItemWriter也提供了非常多的有用的实现类,当然我们也可以去实现自己的writer功能。
ItemProcessor对项目的业务逻辑处理的一个抽象, 当ItemReader读取到一条记录之后,ItemWriter还未写入这条记录之前,I我们可以借助temProcessor提供一个处理业务逻辑的功能,并对数据进行相应操作。如果我们在ItemProcessor发现一条数据不应该被写入,可以通过返回null来表示。ItemProcessor和ItemReader以及ItemWriter可以非常好的结合在一起工作,他们之间的数据传输也非常方便。我们直接使用即可。
spring batch提供了让我们按照chunk处理数据的能力,一个chunk的示意图如下:
它的意思就和图示的一样,由于我们一次batch的任务可能会有很多的数据读写操作,因此一条一条的处理并向数据库提交的话效率不会很高,因此spring batch提供了chunk这个概念,我们可以设定一个chunk size,spring batch 将一条一条处理数据,但不提交到数据库,只有当处理的数据数量达到chunk size设定的值得时候,才一起去commit.
java的实例定义代码如下:
/** * Note the JobRepository is typically autowired in and not needed to be explicitly * configured */ @Bean public Job sampleJob(JobRepository jobRepository, Step sampleStep) { return this.jobBuilderFactory.get("sampleJob") .repository(jobRepository) .start(sampleStep) .build(); } /** * Note the TransactionManager is typically autowired in and not needed to be explicitly * configured */ @Bean public Step sampleStep(PlatformTransactionManager transactionManager) { return this.stepBuilderFactory.get("sampleStep") .transactionManager(transactionManager) .<String, String>chunk(10) .reader(itemReader()) .writer(itemWriter()) .build(); }
在上面这个step里面,chunk size被设为了10,当ItemReader读的数据数量达到10的时候,这一批次的数据就一起被传到itemWriter,同时transaction被提交。
一个batch的job的step,可能会处理非常大数量的数据,难免会遇到出错的情况,出错的情况虽出现的概率较小,但是我们不得不考虑这些情况,因为我们做数据迁移最重要的是要保证数据的最终一致性。spring batch当然也考虑到了这种情况,并且为我们提供了相关的技术支持,请看如下bean的配置:
@Bean public Step step1() { return this.stepBuilderFactory.get("step1") .<String, String>chunk(10) .reader(flatFileItemReader()) .writer(itemWriter()) .faultTolerant() .skipLimit(10) .skip(FlatFileParseException.class) .build(); } @Bean public Step step2() { return this.stepBuilderFactory.get("step2") .<String, String>chunk(10) .reader(flatFileItemReader()) .writer(itemWriter()) .faultTolerant() .skipLimit(10) .skip(Exception.class) .noSkip(FileNotFoundException.class) .build(); }
我们需要留意这三个方法,分别是skipLimit(),skip(),noSkip(),
skipLimit方法的意思是我们可以设定一个我们允许的这个step可以跳过的异常数量,假如我们设定为10,则当这个step运行时,只要出现的异常数目不超过10,整个step都不会fail。注意,若不设定skipLimit,则其默认值是0.
skip方法我们可以指定我们可以跳过的异常,因为有些异常的出现,我们是可以忽略的。
noSkip方法的意思则是指出现这个异常我们不想跳过,也就是从skip的所以exception当中排除这个exception,从上面的例子来说,也就是跳过所有除FileNotFoundException的exception。那么对于这个step来说,FileNotFoundException就是一个fatal的exception,抛出这个exception的时候step就会直接fail