批處理的領域語言
對於任何經驗豐富的批處理架構師來說,Spring Batch 中使用的批處理總體概念應該會感到熟悉和自在。它有“作業(Jobs)”和“步驟(Steps)”,以及由開發者提供的處理單元,名為 ItemReader 和 ItemWriter。然而,由於 Spring 的模式、操作、模板、回撥和慣用法,存在以下機遇:
-
顯著改進對明確關注點分離的遵循。
-
清晰劃分的架構層和以介面形式提供的服務。
-
簡單且預設的實現,使得開箱即用,快速採用且易於使用。
-
顯著增強的可擴充套件性。
以下圖表是已使用了數十年的批處理參考架構的簡化版本。它概述了構成批處理領域語言的元件。該架構框架是一個藍圖,經過數十年來在幾代平臺(大型機上的 COBOL、Unix 上的 C 以及現在的 Java anywhere)上的實現驗證。JCL 和 COBOL 開發者可能會像 C、C# 和 Java 開發者一樣對這些概念感到熟悉。Spring Batch 提供了對健壯、可維護系統中常見的層、元件和技術服務的物理實現,這些系統用於處理從簡單到複雜的批處理應用程式的建立,並提供了基礎設施和擴充套件以應對非常複雜的處理需求。
上圖突出了構成 Spring Batch 領域語言的關鍵概念。一個 Job 有一個或多個步驟(Step),每個步驟都只有一個 ItemReader、一個可選的 ItemProcessor 和一個 ItemWriter。一個作業由 JobOperator 操作(啟動、停止等),關於當前執行過程的元資料儲存在 JobRepository 中並從中恢復。
作業(Job)
本節描述與批處理作業概念相關的原型。Job 是一個封裝整個批處理過程的實體。與 Spring 的其他專案一樣,Job 透過 XML 配置檔案或基於 Java 的配置進行連線。此配置可以稱為“作業配置”。然而,Job 僅僅是整個層次結構的頂部,如下圖所示:
在 Spring Batch 中,Job 只是 Step 例項的容器。它將多個邏輯上屬於同一流程的步驟組合在一起,並允許配置所有步驟的全域性屬性,例如可重新啟動性。作業配置包含:
-
作業名稱。
-
Step例項的定義和排序。 -
作業是否可重新啟動。
-
Java
-
XML
對於使用 Java 配置的使用者,Spring Batch 以 SimpleJob 類的形式提供了 Job 介面的預設實現,它在 Job 的基礎上建立了一些標準功能。當使用基於 Java 的配置時,一組構建器可用於例項化 Job,如下例所示:
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
對於使用 XML 配置的使用者,Spring Batch 以 SimpleJob 類的形式提供了 Job 介面的預設實現,它在 Job 的基礎上建立了一些標準功能。但是,批處理名稱空間抽象掉了直接例項化它的需要。相反,您可以使用 <job> 元素,如下例所示:
<job id="footballJob">
<step id="playerload" next="gameLoad"/>
<step id="gameLoad" next="playerSummarization"/>
<step id="playerSummarization"/>
</job>
JobInstance
JobInstance 指的是邏輯作業執行的概念。考慮一個批處理作業,它應該在一天結束時執行一次,例如上圖中 EndOfDay Job。只有一個 EndOfDay 作業,但 Job 的每次單獨執行都必須單獨跟蹤。在這種情況下,每天有一個邏輯 JobInstance。例如,有 1 月 1 日的執行,1 月 2 日的執行,依此類推。如果 1 月 1 日的執行第一次失敗,並在第二天再次執行,它仍然是 1 月 1 日的執行。(通常,這也與它正在處理的資料相對應,這意味著 1 月 1 日的執行處理 1 月 1 日的資料)。因此,每個 JobInstance 可以有多次執行(JobExecution 在本章後面將更詳細地討論),並且在給定時間只能執行一個 JobInstance(它對應於特定的 Job 和標識性的 JobParameters)。
JobInstance 的定義與要載入的資料絕對無關。它完全由 ItemReader 的實現來決定如何載入資料。例如,在 EndOfDay 場景中,資料上可能有一個列指示資料所屬的 有效日期 或 計劃日期。因此,1 月 1 日的執行將只加載 1 日的資料,而 1 月 2 日的執行將只使用 2 日的資料。因為這個決定很可能是一個業務決策,所以它留給 ItemReader 來決定。但是,使用相同的 JobInstance 決定是否使用來自先前執行的“狀態”(即 ExecutionContext,這將在本章後面討論)。使用新的 JobInstance 意味著“從頭開始”,而使用現有例項通常意味著“從上次中斷的地方開始”。
JobParameters
討論了 JobInstance 以及它與 Job 的區別之後,自然會問一個問題:“如何區分不同的 JobInstance?”答案是:JobParameters。JobParameters 物件包含一組用於啟動批處理作業的引數。它們可以用於標識,甚至在執行期間用作參考資料,如下圖所示:
在 Job Instance 部分的示例中,有兩個例項,一個用於 1 月 1 日,另一個用於 1 月 2 日,實際上只有一個 Job,但它有兩個 JobParameter 物件:一個以 01-01-2017 的作業引數啟動,另一個以 01-02-2017 的引數啟動。因此,契約可以定義為:JobInstance = Job + 標識性 JobParameters。這允許開發人員有效地控制 JobInstance 的定義方式,因為他們控制傳入的引數。
並非所有作業引數都必須有助於 JobInstance 的識別。預設情況下,它們會這樣做。但是,該框架也允許提交帶有不有助於 JobInstance 身份的引數的 Job。 |
JobExecution
JobExecution 指的是單次嘗試執行作業的技術概念。一次執行可能以失敗或成功告終,但除非執行成功完成,否則與給定執行對應的 JobInstance 不會被視為已完成。以上面描述的 EndOfDay Job 為例,考慮一個 01-01-2017 的 JobInstance,它在第一次執行時失敗了。如果它以與第一次執行相同的標識性作業引數(01-01-2017)再次執行,則會建立一個新的 JobExecution。但是,仍然只有一個 JobInstance。
Job 定義了一個作業是什麼以及如何執行它,而 JobInstance 是一個純粹的組織物件,用於將執行分組在一起,主要用於實現正確的重啟語義。然而,JobExecution 是實際執行期間發生情況的主要儲存機制,幷包含許多必須受控和持久化的屬性,如下表所示:
財產 |
定義 |
|
一個 |
|
一個 |
|
一個 |
|
|
|
一個 |
|
一個 |
|
包含在執行之間需要持久化的任何使用者資料的“屬性包”。 |
|
在 |
這些屬性很重要,因為它們是持久化的,並且可以用來完全確定執行的狀態。例如,如果 01-01 的 EndOfDay 作業在晚上 9:00 執行並在 9:30 失敗,則批處理元資料表中會新增以下條目:
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
| 為了清晰和排版,列名可能已被縮寫或刪除。 |
現在作業已經失敗,假設確定問題花了整晚時間,所以“批處理視窗”現在已經關閉。進一步假設視窗從晚上 9:00 開始,作業再次為 01-01 啟動,從上次中斷的地方開始,並在 9:30 成功完成。因為現在是第二天,所以 01-02 作業也必須執行,它緊接著在 9:31 啟動,並在其正常的一小時時間 10:30 完成。一個 JobInstance 沒有要求必須在另一個之後啟動,除非兩個作業有可能嘗試訪問相同的資料,從而導致資料庫級別的鎖定問題。完全由排程程式決定何時執行 Job。由於它們是獨立的 JobInstances,Spring Batch 不會嘗試阻止它們併發執行。(嘗試在另一個 JobInstance 已經在執行時執行相同的 JobInstance 會導致丟擲 JobExecutionAlreadyRunningException)。現在,JobInstance 和 JobParameters 表中應該會有一個額外的條目,並且 JobExecution 表中會有兩個額外的條目,如下表所示:
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
2 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
2 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
3 |
DATE |
schedule.Date |
2017-01-02 00:00:00 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
2 |
1 |
2017-01-02 21:00 |
2017-01-02 21:30 |
COMPLETED |
3 |
2 |
2017-01-02 21:31 |
2017-01-02 22:29 |
COMPLETED |
| 為了清晰和排版,列名可能已被縮寫或刪除。 |
步驟(Step)
Step 是一個領域物件,它封裝了批處理作業的一個獨立的、順序的階段。因此,每個 Job 完全由一個或多個步驟組成。Step 包含定義和控制實際批處理所需的所有資訊。這是一個必然模糊的描述,因為任何給定 Step 的內容都由編寫 Job 的開發人員自行決定。一個 Step 可以像開發人員期望的那樣簡單或複雜。一個簡單的 Step 可能會將資料從檔案載入到資料庫中,幾乎不需要或根本不需要程式碼(取決於所使用的實現)。一個更復雜的 Step 可能具有在處理過程中應用的複雜業務規則。與 Job 一樣,Step 具有與唯一的 JobExecution 相關聯的單獨 StepExecution,如下圖所示:
StepExecution
StepExecution 表示執行 Step 的單次嘗試。每次執行 Step 時都會建立一個新的 StepExecution,類似於 JobExecution。但是,如果一個步驟因其之前的步驟失敗而未能執行,則不會為其持久化執行。只有當 Step 實際啟動時,才會建立 StepExecution。
Step 執行由 StepExecution 類的物件表示。每次執行都包含對其相應步驟和 JobExecution 的引用以及與事務相關的資料,例如提交和回滾計數以及開始和結束時間。此外,每個步驟執行都包含一個 ExecutionContext,其中包含開發人員需要跨批處理執行持久化的任何資料,例如統計資料或重新啟動所需的狀態資訊。下表列出了 StepExecution 的屬性:
財產 |
定義 |
|
一個 |
|
一個 |
|
一個 |
|
指示執行結果的 |
|
包含在執行之間需要持久化的任何使用者資料的“屬性包”。 |
|
已成功讀取的條目數。 |
|
已成功寫入的條目數。 |
|
此執行已提交的事務數。 |
|
由 |
|
|
|
|
|
已由 |
|
|
ExecutionContext
ExecutionContext 表示由框架持久化和控制的鍵/值對集合,為開發人員提供了一個儲存範圍限定於 StepExecution 物件或 JobExecution 物件的持久化狀態的地方。(對於熟悉 Quartz 的人來說,它與 JobDataMap 非常相似。)最好的使用示例是方便重新啟動。以平面檔案輸入為例,在處理單個行時,框架在提交點定期持久化 ExecutionContext。這樣做可以讓 ItemReader 儲存其狀態,以防在執行期間發生致命錯誤,甚至停電。所有需要做的就是將當前讀取的行數放入上下文中,如下例所示,其餘的由框架完成:
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
以上述作業原型中的 EndOfDay 示例為例,假設有一個步驟 loadData,它將檔案載入到資料庫中。第一次執行失敗後,元資料表將如下圖所示:
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_INST_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
1 |
DATE |
schedule.Date |
2017-01-01 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
JOB_EXEC_ID |
STEP_NAME |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
loadData |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
SHORT_CONTEXT |
1 |
{piece.count=40321} |
在上述情況中,Step 運行了 30 分鐘並處理了 40,321 個“片段”,在此場景中代表檔案中的行數。該值在每次提交之前由框架更新,並且可以包含與 ExecutionContext 中的條目對應的多行。在提交之前收到通知需要使用各種 StepListener 實現(或 ItemStream),這些將在本指南後面更詳細地討論。與前面的示例一樣,假設作業在第二天重新啟動。當它重新啟動時,上次執行的 ExecutionContext 中的值將從資料庫中重構。當 ItemReader 開啟時,它可以檢查上下文中是否有任何儲存狀態,並從中初始化自身,如下例所示:
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
在這種情況下,上述程式碼執行後,當前行數為 40,322,讓 Step 可以從上次中斷的地方重新開始。您還可以使用 ExecutionContext 儲存執行本身的統計資料。例如,如果一個平面檔案包含跨多行存在的要處理的訂單,則可能需要儲存已處理的訂單數量(這與讀取的行數大不相同),以便在 Step 結束時傳送一封包含已處理訂單總數的電子郵件。框架會為開發人員處理此儲存,以將其正確限定在單個 JobInstance 中。要知道是否應該使用現有的 ExecutionContext 可能非常困難。例如,使用上面 EndOfDay 示例,當 01-01 執行第二次啟動時,框架會識別它是同一個 JobInstance,並且在單個 Step 基礎上,從資料庫中提取 ExecutionContext,並將其(作為 StepExecution 的一部分)傳遞給 Step 本身。相反,對於 01-02 執行,框架會識別它是一個不同的例項,因此必須將一個空上下文傳遞給 Step。框架會為開發人員做出許多此類決定,以確保在正確的時間將狀態提供給他們。還需要注意的是,在任何給定時間,每個 StepExecution 都恰好存在一個 ExecutionContext。ExecutionContext 的客戶端應注意,因為這會建立一個共享鍵空間。因此,在放入值時應小心,以確保沒有資料被覆蓋。但是,Step 絕對不會在上下文中儲存任何資料,因此不會對框架產生不利影響。
請注意,每個 JobExecution 至少有一個 ExecutionContext,每個 StepExecution 也有一個。例如,考慮以下程式碼片段:
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
正如註釋中提到的,ecStep 不等於 ecJob。它們是兩個不同的 ExecutionContexts。限定在 Step 範圍內的 ExecutionContext 在 Step 的每個提交點儲存,而限定在 Job 範圍內的 ExecutionContext 在每次 Step 執行之間儲存。
在 ExecutionContext 中,所有非瞬態條目都必須是 Serializable。執行上下文的正確序列化是步驟和作業重新啟動能力的基礎。如果您使用並非原生可序列化的鍵或值,則需要採用定製的序列化方法。未能序列化執行上下文可能會危及狀態持久化過程,導致失敗的作業無法正確恢復。 |
JobRepository
JobRepository 是所有前面提到的原型的持久化機制。它為 JobLauncher、Job 和 Step 實現提供 CRUD 操作。當 Job 首次啟動時,會從倉庫中獲取 JobExecution。此外,在執行過程中,StepExecution 和 JobExecution 實現透過將其傳遞給倉庫進行持久化。
-
Java
-
XML
使用 Java 配置時,@EnableBatchProcessing 註解將 JobRepository 作為自動配置的元件之一提供。
Spring Batch XML 名稱空間支援使用 <job-repository> 標籤配置 JobRepository 例項,如下例所示:
<job-repository id="jobRepository"/>
JobOperator
JobOperator 表示用於啟動、停止和重新啟動作業的簡單介面,如下例所示:
public interface JobOperator {
JobExecution start(Job job, JobParameters jobParameters) throws Exception;
JobExecution startNextInstance(Job job) throws Exception;
boolean stop(JobExecution jobExecution) throws Exception;
JobExecution restart(JobExecution jobExecution) throws Exception;
JobExecution abandon(JobExecution jobExecution) throws Exception;
}
Job 以給定的一組 JobParameters 啟動。實現應從 JobRepository 獲取有效的 JobExecution 並執行 Job。
ItemReader
ItemReader 是一個抽象,表示一次一個地檢索 Step 的輸入。當 ItemReader 用盡了它可以提供的專案時,它透過返回 null 來指示這一點。您可以在 讀取器和寫入器 中找到有關 ItemReader 介面及其各種實現的更多詳細資訊。
ItemWriter
ItemWriter 是一個抽象,表示 Step 的輸出,一次一批或一塊專案。通常,ItemWriter 對它接下來應該接收的輸入一無所知,只知道在其當前呼叫中傳遞的專案。您可以在 讀取器和寫入器 中找到有關 ItemWriter 介面及其各種實現的更多詳細資訊。
ItemProcessor
ItemProcessor 是一個抽象,表示對項的業務處理。ItemReader 讀取一個項,ItemWriter 寫入一個項,而 ItemProcessor 提供了一個訪問點來轉換或應用其他業務處理。如果在處理項時確定該項無效,返回 null 表示該項不應被寫入。您可以在 讀取器和寫入器 中找到有關 ItemProcessor 介面的更多詳細資訊。
批處理名稱空間
上述許多領域概念需要在 Spring ApplicationContext 中進行配置。雖然存在可在標準 bean 定義中使用的上述介面實現,但為了方便配置,提供了名稱空間,如下例所示:
<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd">
<job id="ioSampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</tasklet>
</step>
</job>
</beans:beans>
| 批處理 XML 名稱空間自 Spring Batch 6.0 起已棄用,並將在 7.0 版本中移除。 |