批處理的領域語言
對於任何經驗豐富的批處理架構師來說,Spring Batch 中使用的批處理總體概念應該是熟悉且舒適的。有 “Jobs” 和 “Steps”,以及開發者提供的處理單元 ItemReader
和 ItemWriter
。然而,由於 Spring 的模式、操作、模板、回撥和慣用法,為以下方面提供了機會
-
顯著改進了清晰的關注點分離原則的應用。
-
清晰劃分的架構層以及作為介面提供的服務。
-
提供簡單和預設的實現,以便快速採用和開箱即用。
-
顯著增強了可擴充套件性。
下圖是批處理參考架構的簡化版本,該架構已使用了數十年。它概述了構成批處理領域語言的元件。這個架構框架是一個藍圖,已在過去幾代平臺(大型機上的 COBOL、Unix 上的 C 以及現在的任何地方的 Java)上透過數十年的實現得到驗證。JCL 和 COBOL 開發者可能與 C、C# 和 Java 開發者一樣熟悉這些概念。Spring Batch 提供了健壯、可維護系統中常見的分層、元件和技術服務的物理實現,這些系統用於構建從簡單到複雜的批處理應用,並提供基礎設施和擴充套件來滿足非常複雜的處理需求。

前圖強調了構成 Spring Batch 領域語言的關鍵概念。一個 Job
包含一個或多個 Step,每個 Step 恰好有一個 ItemReader
、一個 ItemProcessor
和一個 ItemWriter
。Job 需要透過 JobLauncher
啟動,並且當前執行過程的元資料需要儲存在 JobRepository
中。
Job
本節描述與批處理 Job 概念相關的原型。Job
是一個封裝整個批處理過程的實體。與其他 Spring 專案一樣,Job
可以透過 XML 配置檔案或基於 Java 的配置進行連線。這種配置可能被稱為“job 配置”。然而,Job
只是整個層次結構的頂部,如下圖所示

在 Spring Batch 中,Job
僅僅是 Step
例項的容器。它將邏輯上屬於同一流程的多個 Step 組合在一起,並允許配置所有 Step 的全域性屬性,例如可重啟性。job 配置包含
-
job 的名稱。
-
Step
例項的定義和排序。 -
job 是否可重啟。
-
Java
-
XML
對於使用 Java 配置的使用者,Spring Batch 以 SimpleJob
類形式提供了 Job
介面的預設實現,它在 Job
的基礎上建立了一些標準功能。在使用基於 Java 的配置時,提供了一系列構建器用於例項化 Job
,如下例所示
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
對於使用 XML 配置的使用者,Spring Batch 以 SimpleJob
類形式提供了 Job
介面的預設實現,它在 Job
的基礎上建立了一些標準功能。然而,批處理名稱空間抽象了直接例項化它的需要。取而代之的是,您可以使用 <job>
元素,如下例所示
<job id="footballJob">
<step id="playerload" next="gameLoad"/>
<step id="gameLoad" next="playerSummarization"/>
<step id="playerSummarization"/>
</job>
JobInstance
JobInstance
指的是邏輯 job 執行的概念。考慮一個應在一天結束時執行的批處理 job,例如前圖中的 EndOfDay
Job
。只有一個 EndOfDay
job,但 Job
的每一次單獨執行都必須單獨跟蹤。對於此 job,每天有一個邏輯 JobInstance
。例如,有 1 月 1 日的執行、1 月 2 日的執行,依此類推。如果 1 月 1 日的執行第一次失敗,並在第二天再次執行,它仍然是 1 月 1 日的執行。(通常,這與它處理的資料也相對應,意味著 1 月 1 日的執行處理 1 月 1 日的資料)。因此,每個 JobInstance
可以有多次執行(JobExecution
在本章後面會更詳細地討論),並且在給定時間只能執行一個 JobInstance
(它對應於特定的 Job
和標識性 JobParameters
)。
JobInstance
的定義與要載入的資料沒有任何關係。完全取決於 ItemReader
的實現來決定如何載入資料。例如,在 EndOfDay
場景中,資料中可能有一個列指示資料所屬的 effective date
或 schedule date
。因此,1 月 1 日的執行僅載入 1 日的資料,而 1 月 2 日的執行僅使用 2 日的資料。由於這種決定很可能是一個業務決策,因此留給 ItemReader
來決定。然而,使用相同的 JobInstance
決定了是否使用之前執行的“狀態”(即本章後面討論的 ExecutionContext
)。使用新的 JobInstance
意味著“從頭開始”,而使用現有例項通常意味著“從上次停止的地方繼續”。
JobParameters
討論了 JobInstance
以及它與 Job
的區別後,自然會問的問題是:“如何區分不同的 JobInstance
?”答案是:JobParameters
。一個 JobParameters
物件包含用於啟動批處理 job 的一組引數。這些引數可用於標識,甚至在執行期間用作參考資料,如下圖所示

在前例中,有兩個例項,一個對應 1 月 1 日,另一個對應 1 月 2 日,實際上只有一個 Job
,但它有兩個 JobParameter
物件:一個是用 job 引數 01-01-2017 啟動的,另一個是用引數 01-02-2017 啟動的。因此,契約可以定義為:JobInstance
= Job
+ 標識性 JobParameters
。這允許開發者有效控制 JobInstance
的定義方式,因為他們控制傳入的引數。
並非所有 job 引數都必須用於標識 JobInstance 。預設情況下是這樣的。然而,框架也允許提交帶有不用於標識 JobInstance 的引數的 Job 。 |
JobExecution
JobExecution
指的是執行 Job 的單次嘗試的技術概念。一次執行可能以失敗或成功告終,但除非執行成功完成,否則對應的 JobInstance
不被認為是完成的。以前面描述的 EndOfDay
Job
為例,考慮 01-01-2017 的 JobInstance
,它在第一次執行時失敗了。如果使用與第一次執行相同的標識性 job 引數 (01-01-2017) 再次執行,則會建立一個新的 JobExecution
。然而,仍然只有一個 JobInstance
。
Job
定義了 job 是什麼以及如何執行,而 JobInstance
純粹是一個組織物件,用於將執行組合在一起,主要是為了實現正確的重啟語義。然而,JobExecution
是實際執行期間發生情況的主要儲存機制,包含許多必須控制和持久化的屬性,如下表所示
屬性 |
定義 |
|
一個 |
|
一個 |
|
一個 |
|
|
|
一個 |
|
一個 |
|
包含需要在執行之間持久化的任何使用者資料的“屬性包”。 |
|
執行 |
這些屬性很重要,因為它們會被持久化,並可用於完全確定執行的狀態。例如,如果 01-01 的 EndOfDay
job 在晚上 9:00 執行,並在 9:30 失敗,則會在批處理元資料表中記錄以下條目
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
為了清晰和格式化,列名可能已被縮寫或刪除。 |
現在 job 已失敗,假設確定問題花費了整個晚上,因此“批處理視窗”現已關閉。進一步假設視窗在晚上 9:00 開始,01-01 的 job 再次啟動,從上次停止的地方繼續,並在 9:30 成功完成。由於現在是第二天,01-02 的 job 也必須執行,它緊隨其後在 9:31 啟動,並在正常的一小時內於 10:30 完成。沒有要求一個 JobInstance
必須在另一個之後啟動,除非這兩個 job 有可能嘗試訪問相同的資料,從而導致資料庫級別的鎖定問題。何時執行 Job
完全取決於排程器決定。由於它們是獨立的 JobInstance
,Spring Batch 不會嘗試阻止它們併發執行。(嘗試在另一個 JobInstance
正在執行時運行同一個 JobInstance
會丟擲 JobExecutionAlreadyRunningException
異常)。現在,在 JobInstance
和 JobParameters
表中都應該有一個額外的條目,在 JobExecution
表中應該有兩個額外的條目,如下表所示
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
2 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
2 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
3 |
DATE |
schedule.Date |
2017-01-02 00:00:00 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
2 |
1 |
2017-01-02 21:00 |
2017-01-02 21:30 |
COMPLETED |
3 |
2 |
2017-01-02 21:31 |
2017-01-02 22:29 |
COMPLETED |
為了清晰和格式化,列名可能已被縮寫或刪除。 |
Step
Step
是一個領域物件,封裝了批處理 job 的一個獨立的、順序的階段。因此,每個 Job
完全由一個或多個 Step 組成。Step
包含了定義和控制實際批處理所需的所有資訊。這是一個必然含糊的描述,因為任何給定 Step
的內容取決於編寫 Job
的開發人員的決定。Step
可以像開發人員希望的那樣簡單或複雜。一個簡單的 Step
可能將資料從檔案載入到資料庫,幾乎不需要編寫程式碼(取決於使用的實現)。更復雜的 Step
可能包含作為處理一部分應用的複雜業務規則。與 Job
一樣,Step
有一個單獨的 StepExecution
,它與一個唯一的 JobExecution
相關聯,如下圖所示

StepExecution
StepExecution
表示執行 Step
的單次嘗試。每次執行 Step
時都會建立一個新的 StepExecution
,這與 JobExecution
類似。但是,如果一個 step 由於其之前的 step 失敗而未能執行,則不會為其持久化執行記錄。StepExecution
僅在其 Step
實際啟動時建立。
Step
執行由 StepExecution
類的物件表示。每次執行都包含對其對應 step 和 JobExecution
的引用以及事務相關資料,例如提交和回滾計數以及開始和結束時間。此外,每次 step 執行都包含一個 ExecutionContext
,其中包含開發人員需要在批處理執行之間持久化的任何資料,例如統計資訊或重啟所需的狀態資訊。下表列出了 StepExecution
的屬性
屬性 |
定義 |
|
一個 |
|
一個 |
|
一個 |
|
|
|
包含需要在執行之間持久化的任何使用者資料的“屬性包”。 |
|
成功讀取的專案數。 |
|
成功寫入的專案數。 |
|
此執行已提交的事務數。 |
|
由 |
|
|
|
|
|
被 |
|
|
ExecutionContext
ExecutionContext
表示鍵/值對的集合,由框架持久化和控制,為開發人員提供一個儲存與 StepExecution
物件或 JobExecution
物件作用域關聯的持久狀態的地方。(對於熟悉 Quartz 的人來說,它與 JobDataMap
非常相似。)最佳用法示例是輔助重啟。以平面檔案輸入為例,在處理單行資料時,框架會在提交點定期持久化 ExecutionContext
。這樣做可以讓 ItemReader
在執行期間發生致命錯誤或甚至斷電時儲存其狀態。所需做的就是將當前讀取的行數放入上下文中,如下例所示,其餘工作由框架完成
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
使用 Job
原型部分中的 EndOfDay
示例,假設有一個 step,名為 loadData
,它將檔案載入到資料庫中。第一次失敗執行後,元資料表將如下例所示
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_INST_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
1 |
DATE |
schedule.Date |
2017-01-01 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
JOB_EXEC_ID |
STEP_NAME |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
loadData |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
SHORT_CONTEXT |
1 |
{piece.count=40321} |
在前述情況下,Step
運行了 30 分鐘,處理了 40,321 個“片段”,在此場景中表示檔案中的行。該值由框架在每次提交之前更新,並可包含與 ExecutionContext
中的條目對應的多行。要在提交前收到通知,需要實現各種 StepListener
之一(或實現 ItemStream
),這將在本指南後面更詳細地討論。與前面的示例一樣,假設 Job
在第二天重啟。重啟時,上次執行的 ExecutionContext
中的值會從資料庫中重建。當 ItemReader
開啟時,它可以檢查上下文中是否有儲存的狀態,並從中初始化自身,如下例所示
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
在這種情況下,在前面的程式碼執行後,當前行是 40,322,允許 Step
從上次停止的地方再次開始。您還可以使用 ExecutionContext
來持久化關於執行本身的統計資訊。例如,如果一個平面檔案包含跨多行存在的待處理訂單,則可能需要儲存已處理的訂單數(這與讀取的行數大不相同),以便在 Step
結束時傳送一封包含已處理訂單總數的電子郵件。框架會為開發人員處理此資訊的儲存,以便將其正確地與單個 JobInstance
關聯起來。很難知道是否應該使用現有的 ExecutionContext
。例如,使用上面的 EndOfDay
示例,當 01-01 的執行第二次再次開始時,框架會識別出它是同一個 JobInstance
,並在單個 Step
的基礎上,從資料庫中取出 ExecutionContext
,並將其(作為 StepExecution
的一部分)傳遞給 Step
本身。相反,對於 01-02 的執行,框架會識別出它是一個不同的例項,因此必須將一個空的 context 傳遞給 Step
。框架為開發人員做了許多此類判斷,以確保在正確的時間將狀態提供給他們。同樣重要的是要注意,在任何給定時間,每個 StepExecution
恰好存在一個 ExecutionContext
。ExecutionContext
的客戶端應謹慎,因為這會建立一個共享的鍵空間。因此,在存入值時應小心,確保資料不會被覆蓋。然而,Step
在 context 中完全不儲存資料,因此無法對框架產生不利影響。
注意,每個 JobExecution
至少有一個 ExecutionContext
,每個 StepExecution
也有一個。例如,考慮以下程式碼片段
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
如註釋中所述,ecStep
不等於 ecJob
。它們是兩個不同的 ExecutionContexts
。作用域為 Step
的 Context 在 Step
的每個提交點儲存,而作用域為 Job 的 Context 在每次 Step
執行之間儲存。
在 ExecutionContext 中,所有非 transient 的條目必須是 Serializable 。執行上下文的正確序列化是 steps 和 jobs 重啟能力的基礎。如果您使用的鍵或值不是 natively serializable,則需要採用定製的序列化方法。未能序列化執行上下文可能會危及狀態持久化過程,導致失敗的 jobs 無法正確恢復。 |
JobRepository
JobRepository
是前面提到的所有原型的持久化機制。它為 JobLauncher
、Job
和 Step
實現提供了 CRUD 操作。首次啟動 Job
時,會從 repository 獲取一個 JobExecution
。此外,在執行過程中,透過將 StepExecution
和 JobExecution
實現傳遞給 repository 來進行持久化。
-
Java
-
XML
使用 Java 配置時,@EnableBatchProcessing
註解提供了一個 JobRepository
作為自動配置的元件之一。
Spring Batch XML 名稱空間支援使用 <job-repository>
標籤配置 JobRepository
例項,如下例所示
<job-repository id="jobRepository"/>
JobLauncher
JobLauncher
表示一個簡單的介面,用於使用給定的一組 JobParameters
啟動 Job
,如下例所示
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
預期實現會從 JobRepository
獲取有效的 JobExecution
並執行 Job
。
ItemReader
ItemReader
是一個抽象,表示為 Step
逐項檢索輸入。當 ItemReader
提供完所有專案時,它透過返回 null
來指示。關於 ItemReader
介面及其各種實現的更多詳細資訊,請參見Reader 和 Writer。
ItemWriter
ItemWriter
是一個抽象,表示 Step
的輸出,每次一個批次或一個塊的專案。通常,ItemWriter
不知道接下來應該接收什麼輸入,只知道當前呼叫中傳遞的專案。關於 ItemWriter
介面及其各種實現的更多詳細資訊,請參見Reader 和 Writer。
ItemProcessor
ItemProcessor
是一個抽象,表示專案的業務處理。ItemReader
讀入一個專案,ItemWriter
寫出一個專案,而 ItemProcessor
提供了一個訪問點來轉換或應用其他業務處理。如果在處理專案時確定該專案無效,返回 null
表示該專案不應被寫出。關於 ItemProcessor
介面的更多詳細資訊,請參見Reader 和 Writer。
批處理名稱空間
前面列出的許多領域概念需要在 Spring ApplicationContext
中進行配置。雖然可以使用上述介面的實現進行標準 bean 定義,但為了便於配置,提供了一個名稱空間,如下例所示
<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd">
<job id="ioSampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</tasklet>
</step>
</job>
</beans:beans>