批處理的領域語言

對於任何經驗豐富的批處理架構師來說,Spring Batch 中使用的批處理總體概念應該是熟悉且舒適的。有 “Jobs” 和 “Steps”,以及開發者提供的處理單元 ItemReaderItemWriter。然而,由於 Spring 的模式、操作、模板、回撥和慣用法,為以下方面提供了機會

  • 顯著改進了清晰的關注點分離原則的應用。

  • 清晰劃分的架構層以及作為介面提供的服務。

  • 提供簡單和預設的實現,以便快速採用和開箱即用。

  • 顯著增強了可擴充套件性。

下圖是批處理參考架構的簡化版本,該架構已使用了數十年。它概述了構成批處理領域語言的元件。這個架構框架是一個藍圖,已在過去幾代平臺(大型機上的 COBOL、Unix 上的 C 以及現在的任何地方的 Java)上透過數十年的實現得到驗證。JCL 和 COBOL 開發者可能與 C、C# 和 Java 開發者一樣熟悉這些概念。Spring Batch 提供了健壯、可維護系統中常見的分層、元件和技術服務的物理實現,這些系統用於構建從簡單到複雜的批處理應用,並提供基礎設施和擴充套件來滿足非常複雜的處理需求。

Figure 2.1: Batch Stereotypes
圖 1. 批處理原型

前圖強調了構成 Spring Batch 領域語言的關鍵概念。一個 Job 包含一個或多個 Step,每個 Step 恰好有一個 ItemReader、一個 ItemProcessor 和一個 ItemWriter。Job 需要透過 JobLauncher 啟動,並且當前執行過程的元資料需要儲存在 JobRepository 中。

Job

本節描述與批處理 Job 概念相關的原型。Job 是一個封裝整個批處理過程的實體。與其他 Spring 專案一樣,Job 可以透過 XML 配置檔案或基於 Java 的配置進行連線。這種配置可能被稱為“job 配置”。然而,Job 只是整個層次結構的頂部,如下圖所示

Job Hierarchy
圖 2. Job 層次結構

在 Spring Batch 中,Job 僅僅是 Step 例項的容器。它將邏輯上屬於同一流程的多個 Step 組合在一起,並允許配置所有 Step 的全域性屬性,例如可重啟性。job 配置包含

  • job 的名稱。

  • Step 例項的定義和排序。

  • job 是否可重啟。

  • Java

  • XML

對於使用 Java 配置的使用者,Spring Batch 以 SimpleJob 類形式提供了 Job 介面的預設實現,它在 Job 的基礎上建立了一些標準功能。在使用基於 Java 的配置時,提供了一系列構建器用於例項化 Job,如下例所示

@Bean
public Job footballJob(JobRepository jobRepository) {
    return new JobBuilder("footballJob", jobRepository)
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .build();
}

對於使用 XML 配置的使用者,Spring Batch 以 SimpleJob 類形式提供了 Job 介面的預設實現,它在 Job 的基礎上建立了一些標準功能。然而,批處理名稱空間抽象了直接例項化它的需要。取而代之的是,您可以使用 <job> 元素,如下例所示

<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/>
</job>

JobInstance

JobInstance 指的是邏輯 job 執行的概念。考慮一個應在一天結束時執行的批處理 job,例如前圖中的 EndOfDay Job。只有一個 EndOfDay job,但 Job 的每一次單獨執行都必須單獨跟蹤。對於此 job,每天有一個邏輯 JobInstance。例如,有 1 月 1 日的執行、1 月 2 日的執行,依此類推。如果 1 月 1 日的執行第一次失敗,並在第二天再次執行,它仍然是 1 月 1 日的執行。(通常,這與它處理的資料也相對應,意味著 1 月 1 日的執行處理 1 月 1 日的資料)。因此,每個 JobInstance 可以有多次執行(JobExecution 在本章後面會更詳細地討論),並且在給定時間只能執行一個 JobInstance(它對應於特定的 Job 和標識性 JobParameters)。

JobInstance 的定義與要載入的資料沒有任何關係。完全取決於 ItemReader 的實現來決定如何載入資料。例如,在 EndOfDay 場景中,資料中可能有一個列指示資料所屬的 effective dateschedule date。因此,1 月 1 日的執行僅載入 1 日的資料,而 1 月 2 日的執行僅使用 2 日的資料。由於這種決定很可能是一個業務決策,因此留給 ItemReader 來決定。然而,使用相同的 JobInstance 決定了是否使用之前執行的“狀態”(即本章後面討論的 ExecutionContext)。使用新的 JobInstance 意味著“從頭開始”,而使用現有例項通常意味著“從上次停止的地方繼續”。

JobParameters

討論了 JobInstance 以及它與 Job 的區別後,自然會問的問題是:“如何區分不同的 JobInstance?”答案是:JobParameters。一個 JobParameters 物件包含用於啟動批處理 job 的一組引數。這些引數可用於標識,甚至在執行期間用作參考資料,如下圖所示

Job Parameters
圖 3. Job Parameters

在前例中,有兩個例項,一個對應 1 月 1 日,另一個對應 1 月 2 日,實際上只有一個 Job,但它有兩個 JobParameter 物件:一個是用 job 引數 01-01-2017 啟動的,另一個是用引數 01-02-2017 啟動的。因此,契約可以定義為:JobInstance = Job + 標識性 JobParameters。這允許開發者有效控制 JobInstance 的定義方式,因為他們控制傳入的引數。

並非所有 job 引數都必須用於標識 JobInstance。預設情況下是這樣的。然而,框架也允許提交帶有不用於標識 JobInstance 的引數的 Job

JobExecution

JobExecution 指的是執行 Job 的單次嘗試的技術概念。一次執行可能以失敗或成功告終,但除非執行成功完成,否則對應的 JobInstance 不被認為是完成的。以前面描述的 EndOfDay Job 為例,考慮 01-01-2017 的 JobInstance,它在第一次執行時失敗了。如果使用與第一次執行相同的標識性 job 引數 (01-01-2017) 再次執行,則會建立一個新的 JobExecution。然而,仍然只有一個 JobInstance

Job 定義了 job 是什麼以及如何執行,而 JobInstance 純粹是一個組織物件,用於將執行組合在一起,主要是為了實現正確的重啟語義。然而,JobExecution 是實際執行期間發生情況的主要儲存機制,包含許多必須控制和持久化的屬性,如下表所示

表 1. JobExecution 屬性

屬性

定義

狀態

一個 BatchStatus 物件,指示執行的狀態。執行時為 BatchStatus#STARTED。如果失敗,則為 BatchStatus#FAILED。如果成功完成,則為 BatchStatus#COMPLETED

startTime

一個 java.time.LocalDateTime 物件,表示執行開始時的當前系統時間。如果 job 尚未開始,此欄位為空。

endTime

一個 java.time.LocalDateTime 物件,表示執行完成時的當前系統時間,無論是否成功。如果 job 尚未完成,此欄位為空。

exitStatus

ExitStatus,表示執行結果。它非常重要,因為它包含返回給呼叫者的退出程式碼。更多詳情請參閱第 5 章。如果 job 尚未完成,此欄位為空。

createTime

一個 java.time.LocalDateTime 物件,表示 JobExecution 首次持久化時的當前系統時間。Job 可能尚未開始(因此沒有開始時間),但它始終有一個 createTime,這是框架管理 job 級別 ExecutionContexts 所必需的。

lastUpdated

一個 java.time.LocalDateTime 物件,表示 JobExecution 最後一次持久化時的當前系統時間。如果 job 尚未開始,此欄位為空。

executionContext

包含需要在執行之間持久化的任何使用者資料的“屬性包”。

failureExceptions

執行 Job 期間遇到的異常列表。如果在 Job 失敗期間遇到不止一個異常,這些異常會很有用。

這些屬性很重要,因為它們會被持久化,並可用於完全確定執行的狀態。例如,如果 01-01 的 EndOfDay job 在晚上 9:00 執行,並在 9:30 失敗,則會在批處理元資料表中記錄以下條目

表 2. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

表 3. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01

TRUE

表 4. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

為了清晰和格式化,列名可能已被縮寫或刪除。

現在 job 已失敗,假設確定問題花費了整個晚上,因此“批處理視窗”現已關閉。進一步假設視窗在晚上 9:00 開始,01-01 的 job 再次啟動,從上次停止的地方繼續,並在 9:30 成功完成。由於現在是第二天,01-02 的 job 也必須執行,它緊隨其後在 9:31 啟動,並在正常的一小時內於 10:30 完成。沒有要求一個 JobInstance 必須在另一個之後啟動,除非這兩個 job 有可能嘗試訪問相同的資料,從而導致資料庫級別的鎖定問題。何時執行 Job 完全取決於排程器決定。由於它們是獨立的 JobInstance,Spring Batch 不會嘗試阻止它們併發執行。(嘗試在另一個 JobInstance 正在執行時運行同一個 JobInstance 會丟擲 JobExecutionAlreadyRunningException 異常)。現在,在 JobInstanceJobParameters 表中都應該有一個額外的條目,在 JobExecution 表中應該有兩個額外的條目,如下表所示

表 5. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

2

EndOfDayJob

表 6. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

2

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

3

DATE

schedule.Date

2017-01-02 00:00:00

TRUE

表 7. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

2

1

2017-01-02 21:00

2017-01-02 21:30

COMPLETED

3

2

2017-01-02 21:31

2017-01-02 22:29

COMPLETED

為了清晰和格式化,列名可能已被縮寫或刪除。

Step

Step 是一個領域物件,封裝了批處理 job 的一個獨立的、順序的階段。因此,每個 Job 完全由一個或多個 Step 組成。Step 包含了定義和控制實際批處理所需的所有資訊。這是一個必然含糊的描述,因為任何給定 Step 的內容取決於編寫 Job 的開發人員的決定。Step 可以像開發人員希望的那樣簡單或複雜。一個簡單的 Step 可能將資料從檔案載入到資料庫,幾乎不需要編寫程式碼(取決於使用的實現)。更復雜的 Step 可能包含作為處理一部分應用的複雜業務規則。與 Job 一樣,Step 有一個單獨的 StepExecution,它與一個唯一的 JobExecution 相關聯,如下圖所示

Figure 2.1: Job Hierarchy With Steps
圖 4. 包含 Step 的 Job 層次結構

StepExecution

StepExecution 表示執行 Step 的單次嘗試。每次執行 Step 時都會建立一個新的 StepExecution,這與 JobExecution 類似。但是,如果一個 step 由於其之前的 step 失敗而未能執行,則不會為其持久化執行記錄。StepExecution 僅在其 Step 實際啟動時建立。

Step 執行由 StepExecution 類的物件表示。每次執行都包含對其對應 step 和 JobExecution 的引用以及事務相關資料,例如提交和回滾計數以及開始和結束時間。此外,每次 step 執行都包含一個 ExecutionContext,其中包含開發人員需要在批處理執行之間持久化的任何資料,例如統計資訊或重啟所需的狀態資訊。下表列出了 StepExecution 的屬性

表 8. StepExecution 屬性

屬性

定義

狀態

一個 BatchStatus 物件,指示執行的狀態。執行時狀態為 BatchStatus.STARTED。如果失敗,狀態為 BatchStatus.FAILED。如果成功完成,狀態為 BatchStatus.COMPLETED

startTime

一個 java.time.LocalDateTime 物件,表示執行開始時的當前系統時間。如果 step 尚未開始,此欄位為空。

endTime

一個 java.time.LocalDateTime 物件,表示執行完成時的當前系統時間,無論是否成功。如果 step 尚未退出,此欄位為空。

exitStatus

ExitStatus,指示執行結果。它非常重要,因為它包含返回給呼叫者的退出程式碼。更多詳情請參閱第 5 章。如果 job 尚未退出,此欄位為空。

executionContext

包含需要在執行之間持久化的任何使用者資料的“屬性包”。

readCount

成功讀取的專案數。

writeCount

成功寫入的專案數。

commitCount

此執行已提交的事務數。

rollbackCount

Step 控制的業務事務回滾的次數。

readSkipCount

read 失敗導致跳過專案的次數。

processSkipCount

process 失敗導致跳過專案的次數。

filterCount

ItemProcessor“過濾”掉的專案數。

writeSkipCount

write 失敗導致跳過專案的次數。

ExecutionContext

ExecutionContext 表示鍵/值對的集合,由框架持久化和控制,為開發人員提供一個儲存與 StepExecution 物件或 JobExecution 物件作用域關聯的持久狀態的地方。(對於熟悉 Quartz 的人來說,它與 JobDataMap 非常相似。)最佳用法示例是輔助重啟。以平面檔案輸入為例,在處理單行資料時,框架會在提交點定期持久化 ExecutionContext。這樣做可以讓 ItemReader 在執行期間發生致命錯誤或甚至斷電時儲存其狀態。所需做的就是將當前讀取的行數放入上下文中,如下例所示,其餘工作由框架完成

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

使用 Job 原型部分中的 EndOfDay 示例,假設有一個 step,名為 loadData,它將檔案載入到資料庫中。第一次失敗執行後,元資料表將如下例所示

表 9. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

表 10. BATCH_JOB_EXECUTION_PARAMS

JOB_INST_ID

TYPE_CD

KEY_NAME

DATE_VAL

1

DATE

schedule.Date

2017-01-01

表 11. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

表 12. BATCH_STEP_EXECUTION

STEP_EXEC_ID

JOB_EXEC_ID

STEP_NAME

START_TIME

END_TIME

STATUS

1

1

loadData

2017-01-01 21:00

2017-01-01 21:30

FAILED

表 13. BATCH_STEP_EXECUTION_CONTEXT

STEP_EXEC_ID

SHORT_CONTEXT

1

{piece.count=40321}

在前述情況下,Step 運行了 30 分鐘,處理了 40,321 個“片段”,在此場景中表示檔案中的行。該值由框架在每次提交之前更新,並可包含與 ExecutionContext 中的條目對應的多行。要在提交前收到通知,需要實現各種 StepListener 之一(或實現 ItemStream),這將在本指南後面更詳細地討論。與前面的示例一樣,假設 Job 在第二天重啟。重啟時,上次執行的 ExecutionContext 中的值會從資料庫中重建。當 ItemReader 開啟時,它可以檢查上下文中是否有儲存的狀態,並從中初始化自身,如下例所示

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

在這種情況下,在前面的程式碼執行後,當前行是 40,322,允許 Step 從上次停止的地方再次開始。您還可以使用 ExecutionContext 來持久化關於執行本身的統計資訊。例如,如果一個平面檔案包含跨多行存在的待處理訂單,則可能需要儲存已處理的訂單數(這與讀取的行數大不相同),以便在 Step 結束時傳送一封包含已處理訂單總數的電子郵件。框架會為開發人員處理此資訊的儲存,以便將其正確地與單個 JobInstance 關聯起來。很難知道是否應該使用現有的 ExecutionContext。例如,使用上面的 EndOfDay 示例,當 01-01 的執行第二次再次開始時,框架會識別出它是同一個 JobInstance,並在單個 Step 的基礎上,從資料庫中取出 ExecutionContext,並將其(作為 StepExecution 的一部分)傳遞給 Step 本身。相反,對於 01-02 的執行,框架會識別出它是一個不同的例項,因此必須將一個空的 context 傳遞給 Step。框架為開發人員做了許多此類判斷,以確保在正確的時間將狀態提供給他們。同樣重要的是要注意,在任何給定時間,每個 StepExecution 恰好存在一個 ExecutionContextExecutionContext 的客戶端應謹慎,因為這會建立一個共享的鍵空間。因此,在存入值時應小心,確保資料不會被覆蓋。然而,Step 在 context 中完全不儲存資料,因此無法對框架產生不利影響。

注意,每個 JobExecution 至少有一個 ExecutionContext,每個 StepExecution 也有一個。例如,考慮以下程式碼片段

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

如註釋中所述,ecStep 不等於 ecJob。它們是兩個不同的 ExecutionContexts。作用域為 Step 的 Context 在 Step 的每個提交點儲存,而作用域為 Job 的 Context 在每次 Step 執行之間儲存。

ExecutionContext 中,所有非 transient 的條目必須是 Serializable。執行上下文的正確序列化是 steps 和 jobs 重啟能力的基礎。如果您使用的鍵或值不是 natively serializable,則需要採用定製的序列化方法。未能序列化執行上下文可能會危及狀態持久化過程,導致失敗的 jobs 無法正確恢復。

JobRepository

JobRepository 是前面提到的所有原型的持久化機制。它為 JobLauncherJobStep 實現提供了 CRUD 操作。首次啟動 Job 時,會從 repository 獲取一個 JobExecution。此外,在執行過程中,透過將 StepExecutionJobExecution 實現傳遞給 repository 來進行持久化。

  • Java

  • XML

使用 Java 配置時,@EnableBatchProcessing 註解提供了一個 JobRepository 作為自動配置的元件之一。

Spring Batch XML 名稱空間支援使用 <job-repository> 標籤配置 JobRepository 例項,如下例所示

<job-repository id="jobRepository"/>

JobLauncher

JobLauncher 表示一個簡單的介面,用於使用給定的一組 JobParameters 啟動 Job,如下例所示

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

預期實現會從 JobRepository 獲取有效的 JobExecution 並執行 Job

ItemReader

ItemReader 是一個抽象,表示為 Step 逐項檢索輸入。當 ItemReader 提供完所有專案時,它透過返回 null 來指示。關於 ItemReader 介面及其各種實現的更多詳細資訊,請參見Reader 和 Writer

ItemWriter

ItemWriter 是一個抽象,表示 Step 的輸出,每次一個批次或一個塊的專案。通常,ItemWriter 不知道接下來應該接收什麼輸入,只知道當前呼叫中傳遞的專案。關於 ItemWriter 介面及其各種實現的更多詳細資訊,請參見Reader 和 Writer

ItemProcessor

ItemProcessor 是一個抽象,表示專案的業務處理。ItemReader 讀入一個專案,ItemWriter 寫出一個專案,而 ItemProcessor 提供了一個訪問點來轉換或應用其他業務處理。如果在處理專案時確定該專案無效,返回 null 表示該專案不應被寫出。關於 ItemProcessor 介面的更多詳細資訊,請參見Reader 和 Writer

批處理名稱空間

前面列出的許多領域概念需要在 Spring ApplicationContext 中進行配置。雖然可以使用上述介面的實現進行標準 bean 定義,但為了便於配置,提供了一個名稱空間,如下例所示

<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/batch
   https://www.springframework.org/schema/batch/spring-batch.xsd">

<job id="ioSampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        </tasklet>
    </step>
</job>

</beans:beans>

只要聲明瞭批處理名稱空間,就可以使用其任何元素。關於配置 Job 的更多資訊,請參見配置和執行 Job。關於配置 Step 的更多資訊,請參見配置 Step