配置 Step 以便重啟

在“配置和執行 Job”一節中,討論了重啟 Job。重啟對步驟有諸多影響,因此可能需要一些特定的配置。

設定啟動次數限制

在許多場景中,您可能希望控制 Step 可以啟動的次數。例如,您可能需要配置一個特定的 Step 使其只執行一次,因為它會使某些資源失效,必須手動修復後才能再次執行。這可以在步驟級別進行配置,因為不同的步驟可能有不同的要求。一個只能執行一次的 Step 可以與一個可以無限次執行的 Step 存在於同一個 Job 中。

  • Java

  • XML

以下程式碼片段顯示了 Java 中的啟動次數限制配置示例

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.startLimit(1)
				.build();
}

以下程式碼片段顯示了 XML 中的啟動次數限制配置示例

XML 配置
<step id="step1">
    <tasklet start-limit="1">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

前面示例中顯示的步驟只能執行一次。嘗試再次執行它會導致丟擲 StartLimitExceededException 異常。請注意,start-limit 的預設值為 Integer.MAX_VALUE

重啟已完成的 Step

對於一個可重啟的 Job,可能有一個或多個步驟應該始終執行,無論它們第一次是否成功。例如,可以是驗證步驟或在處理前清理資源的 Step。在重啟的 Job 的正常處理過程中,任何狀態為 COMPLETED 的步驟(意味著它已經成功完成)都會被跳過。將 allow-start-if-complete 設定為 true 會覆蓋此行為,以便該步驟始終執行。

  • Java

  • XML

以下程式碼片段顯示瞭如何在 Java 中定義一個可重啟的 Job

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.allowStartIfComplete(true)
				.build();
}

以下程式碼片段顯示瞭如何在 XML 中定義一個可重啟的 Job

XML 配置
<step id="step1">
    <tasklet allow-start-if-complete="true">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

Step 重啟配置示例

  • Java

  • XML

以下 Java 示例顯示瞭如何配置 Job 以使其包含可重啟的步驟

Java 配置
@Bean
public Job footballJob(JobRepository jobRepository, Step playerLoad, Step gameLoad, Step playerSummarization) {
	return new JobBuilder("footballJob", jobRepository)
				.start(playerLoad)
				.next(gameLoad)
				.next(playerSummarization)
				.build();
}

@Bean
public Step playerLoad(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("playerLoad", jobRepository)
			.<String, String>chunk(10, transactionManager)
			.reader(playerFileItemReader())
			.writer(playerWriter())
			.build();
}

@Bean
public Step gameLoad(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("gameLoad", jobRepository)
			.allowStartIfComplete(true)
			.<String, String>chunk(10, transactionManager)
			.reader(gameFileItemReader())
			.writer(gameWriter())
			.build();
}

@Bean
public Step playerSummarization(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("playerSummarization", jobRepository)
			.startLimit(2)
			.<String, String>chunk(10, transactionManager)
			.reader(playerSummarizationSource())
			.writer(summaryWriter())
			.build();
}

以下 XML 示例顯示瞭如何配置 Job 以使其包含可重啟的步驟

XML 配置
<job id="footballJob" restartable="true">
    <step id="playerload" next="gameLoad">
        <tasklet>
            <chunk reader="playerFileItemReader" writer="playerWriter"
                   commit-interval="10" />
        </tasklet>
    </step>
    <step id="gameLoad" next="playerSummarization">
        <tasklet allow-start-if-complete="true">
            <chunk reader="gameFileItemReader" writer="gameWriter"
                   commit-interval="10"/>
        </tasklet>
    </step>
    <step id="playerSummarization">
        <tasklet start-limit="2">
            <chunk reader="playerSummarizationSource" writer="summaryWriter"
                   commit-interval="10"/>
        </tasklet>
    </step>
</job>

前面的示例配置適用於一個 Job,該 Job 載入足球比賽資訊並進行彙總。它包含三個步驟:playerLoadgameLoadplayerSummarizationplayerLoad 步驟從扁平檔案載入球員資訊,而 gameLoad 步驟載入比賽資訊。最後一個步驟 playerSummarization 則根據提供的比賽彙總每個球員的統計資料。假設由 playerLoad 載入的檔案只能載入一次,而 gameLoad 可以載入特定目錄中找到的任何比賽檔案,並在成功載入到資料庫後將其刪除。因此,playerLoad 步驟不包含額外的配置。它可以啟動任意次數,如果完成則跳過。然而,gameLoad 步驟需要每次都執行,以防上次執行後添加了額外檔案。它將 allow-start-if-complete 設定為 true,以確保始終啟動。(假設比賽資料載入到的資料庫錶帶有一個處理指示器,以確保彙總步驟可以正確找到新比賽)。彙總步驟是 Job 中最重要的步驟,配置了啟動次數限制為 2。這很有用,因為如果該步驟持續失敗,將返回新的退出碼給控制 Job 執行的操作員,並且在人工干預之前無法再次啟動。

此 Job 是本文件的一個示例,與 samples 專案中的 footballJob 不同。

本節的其餘部分描述了 footballJob 示例的每次執行會發生什麼。

執行 1

  1. playerLoad 執行併成功完成,向 PLAYERS 表新增 400 名球員。

  2. gameLoad 執行並處理 11 個比賽資料檔案,將其內容載入到 GAMES 表中。

  3. playerSummarization 開始處理並在 5 分鐘後失敗。

執行 2

  1. playerLoad 不執行,因為它已經成功完成,並且 allow-start-if-completefalse(預設值)。

  2. gameLoad 再次執行並處理另外 2 個檔案,也將其內容載入到 GAMES 表中(帶有處理指示器,表明它們尚未被處理)。

  3. playerSummarization 開始處理所有剩餘的比賽資料(使用處理指示器進行過濾),並在 30 分鐘後再次失敗。

執行 3

  1. playerLoad 不執行,因為它已經成功完成,並且 allow-start-if-completefalse(預設值)。

  2. gameLoad 再次執行並處理另外 2 個檔案,也將其內容載入到 GAMES 表中(帶有處理指示器,表明它們尚未被處理)。

  3. playerSummarization 未啟動,作業立即被終止,因為這是 playerSummarization 的第三次執行,而其限制僅為 2。必須提高限制,或者將該 Job 作為新的 JobInstance 執行。