攔截 Step 執行

Job 一樣,在 Step 執行期間有許多事件,使用者可能需要在這些事件中執行某些功能。例如,要寫入需要頁尾的平面檔案,當 Step 完成時需要通知 ItemWriter,以便可以寫入頁尾。這可以透過許多 Step 作用域的監聽器之一來實現。

您可以透過 listeners 元素將實現 StepListener 擴充套件介面之一(但不包括該介面本身,因為它為空)的任何類應用於 step。listeners 元素在 step、tasklet 或 chunk 宣告中有效。我們建議您在其功能適用的級別宣告監聽器,或者如果它具有多項功能(例如 StepExecutionListenerItemReadListener),則在它適用的最細粒度級別宣告它。

  • Java

  • XML

以下示例展示了在 Java 中應用於 chunk 級別的監聽器

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener())
				.build();
}

以下示例展示了在 XML 中應用於 chunk 級別的監聽器

XML 配置
<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

如果使用名稱空間 <step> 元素或其中一個 *StepFactoryBean 工廠,實現 StepListener 介面之一的 ItemReaderItemWriterItemProcessor 會自動註冊到 Step。這僅適用於直接注入到 Step 中的元件。如果監聽器巢狀在另一個元件內部,則需要顯式註冊它(如之前在ItemStream 註冊到 Step 中所述)。

除了 StepListener 介面,還提供了註解來解決相同的問題。普通的 Java 物件可以擁有帶有這些註解的方法,然後這些方法會被轉換為相應的 StepListener 型別。同時,對 chunk 元件的自定義實現(如 ItemReaderItemWriterTasklet)進行註解也很常見。XML 解析器和構建器中的 listener 方法都會分析 <listener/> 元素和這些註解,因此您只需使用 XML 名稱空間或構建器將監聽器註冊到 step 即可。

StepExecutionListener

StepExecutionListener 是用於 Step 執行的最通用的監聽器。它允許在 Step 開始之前以及在它結束之後(無論正常結束還是失敗)進行通知,如下例所示

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

afterStep 的返回型別是 ExitStatus,這使監聽器有機會修改 Step 完成時返回的退出碼。

與此介面對應的註解是

  • @BeforeStep

  • @AfterStep

ChunkListener

“chunk” 定義為在事務範圍內處理的項。在每個提交間隔提交事務即是提交一個 chunk。您可以使用 ChunkListener 在 chunk 開始處理之前或 chunk 成功完成之後執行邏輯,如下面的介面定義所示

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

beforeChunk 方法在事務啟動後、ItemReader 開始讀取之前呼叫。反之,afterChunk 在 chunk 提交後呼叫(如果發生回滾則完全不呼叫)。

與此介面對應的註解是

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

即使沒有 chunk 宣告,您也可以應用 ChunkListenerTaskletStep 負責呼叫 ChunkListener,因此它也適用於非面向項的 tasklet(在 tasklet 之前和之後呼叫)。

ChunkListener 不設計用於丟擲 checked exception。錯誤必須在實現中處理,否則 step 將終止。

ItemReadListener

在之前討論跳過邏輯時,曾提到記錄跳過的記錄可能會很有益處,以便以後處理它們。在讀取錯誤的情況下,這可以透過 ItemReaderListener 來完成,如下面的介面定義所示

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

beforeRead 方法在每次呼叫 ItemReader 的 read 方法之前呼叫。afterRead 方法在每次成功呼叫 read 方法之後呼叫,並傳遞讀取的項。如果在讀取時發生錯誤,則呼叫 onReadError 方法。提供遇到的異常,以便可以記錄下來。

與此介面對應的註解是

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener 類似,可以“監聽”項的處理過程,如下面的介面定義所示

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

beforeProcess 方法在呼叫 ItemProcessorprocess 方法之前呼叫,並接收待處理的項。afterProcess 方法在項成功處理後呼叫。如果在處理時發生錯誤,則呼叫 onProcessError 方法。提供遇到的異常和嘗試處理的項,以便可以記錄下來。

與此介面對應的註解是

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

您可以使用 ItemWriteListener“監聽”項的寫入過程,如下面的介面定義所示

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWrite 方法在呼叫 ItemWriterwrite 方法之前呼叫,並接收要寫入的項列表。afterWrite 方法在項成功寫入後呼叫,但在提交與 chunk 處理關聯的事務之前。如果在寫入時發生錯誤,則呼叫 onWriteError 方法。提供遇到的異常和嘗試寫入的項,以便可以記錄下來。

與此介面對應的註解是

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener 都提供了錯誤通知機制,但它們都不會告知您某條記錄是否已被實際跳過。例如,即使某個項重試成功,onWriteError 也會被呼叫。因此,有一個單獨的介面用於跟蹤跳過的項,如下面的介面定義所示

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

當讀取時跳過某個項時,會呼叫 onSkipInRead。值得注意的是,回滾可能會導致同一個項被多次註冊為跳過。當寫入時跳過某個項時,會呼叫 onSkipInWrite。因為該項已被成功讀取(未跳過),所以也會將該項本身作為引數提供。

與此介面對應的註解是

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

SkipListener 和事務

SkipListener 最常見的用例之一是記錄跳過的項,以便另一個批處理過程甚至人工流程可以用來評估和修復導致跳過的問題。由於在許多情況下原始事務可能會回滾,Spring Batch 做出了兩項保證

  • 相應的跳過方法(取決於錯誤發生的時間)對每個項僅呼叫一次。

  • SkipListener 總是在事務提交之前被呼叫。這是為了確保監聽器呼叫的任何事務性資源不會因 ItemWriter 中的失敗而被回滾。