重複

RepeatTemplate

批處理涉及到重複的操作,無論是作為簡單的最佳化還是作為作業的一部分。為了策略化和泛化重複過程,並提供類似於迭代器框架的功能,Spring Batch 提供了 RepeatOperations 介面。RepeatOperations 介面定義如下:

public interface RepeatOperations {

    RepeatStatus iterate(RepeatCallback callback) throws RepeatException;

}

回撥是一個介面,定義如下,它允許您插入一些需要重複的業務邏輯:

public interface RepeatCallback {

    RepeatStatus doInIteration(RepeatContext context) throws Exception;

}

回撥會重複執行,直到實現判斷迭代應該結束。這些介面的返回值是一個列舉值,可以是 RepeatStatus.CONTINUABLERepeatStatus.FINISHEDRepeatStatus 列舉向重複操作的呼叫者傳達是否還有工作需要完成的資訊。一般來說,RepeatOperations 的實現應該檢查 RepeatStatus 並將其作為結束迭代決策的一部分。任何希望向呼叫者表明沒有剩餘工作的回撥都可以返回 RepeatStatus.FINISHED

RepeatOperations 最簡單的通用實現是 RepeatTemplate

RepeatTemplate template = new RepeatTemplate();

template.setCompletionPolicy(new SimpleCompletionPolicy(2));

template.iterate(new RepeatCallback() {

    public RepeatStatus doInIteration(RepeatContext context) {
        // Do stuff in batch...
        return RepeatStatus.CONTINUABLE;
    }

});

在前面的示例中,我們返回 RepeatStatus.CONTINUABLE,以表明還有更多工作要做。回撥也可以返回 RepeatStatus.FINISHED,向呼叫者表明沒有剩餘工作。有些迭代可以根據回撥中正在進行的工作本身的考量而終止。其他迭代(就回調而言)實際上是無限迴圈,完成決策委託給外部策略,如前面示例所示的情況。

RepeatContext

RepeatCallback 的方法引數是一個 RepeatContext。許多回調會忽略該上下文。但是,如有必要,您可以將其用作屬性包,用於在迭代期間儲存瞬時資料。在 iterate 方法返回後,上下文就不再存在了。

如果存在巢狀迭代,RepeatContext 會有一個父上下文。父上下文有時用於儲存需要在 iterate 呼叫之間共享的資料。例如,如果您想計算迭代中某個事件發生的次數並在後續呼叫中記住它,就可以使用父上下文。

RepeatStatus

RepeatStatus 是 Spring Batch 使用的一個列舉,用於指示處理是否已完成。它有兩個可能的 RepeatStatus 值:

表 1. RepeatStatus 屬性

描述

CONTINUABLE

還有更多工作要做。

FINISHED

不應再進行重複。

您可以使用 RepeatStatus 中的 and() 方法將 RepeatStatus 值與邏輯 AND 運算結合起來。其效果是對可繼續 (continuable) 標誌進行邏輯 AND 運算。換句話說,如果任一狀態是 FINISHED,則結果是 FINISHED

完成策略

RepeatTemplate 內部,iterate 方法中迴圈的終止由 CompletionPolicy 決定,CompletionPolicy 也是 RepeatContext 的工廠。RepeatTemplate 負責使用當前策略建立 RepeatContext,並在迭代的每個階段將其傳遞給 RepeatCallback。在回撥完成其 doInIteration 後,RepeatTemplate 必須呼叫 CompletionPolicy 以要求它更新其狀態(該狀態將儲存在 RepeatContext 中)。然後,它會詢問策略迭代是否已完成。

Spring Batch 提供了一些簡單的通用 CompletionPolicy 實現。SimpleCompletionPolicy 允許執行固定次數(在任何時候 RepeatStatus.FINISHED 都會強制提前完成)。

使用者可能需要實現自己的完成策略來處理更復雜的決策。例如,一個批處理視窗,一旦線上系統投入使用就阻止批處理作業執行,這就需要自定義策略。

異常處理

如果在 RepeatCallback 內部丟擲異常,RepeatTemplate 會諮詢 ExceptionHandler,後者可以決定是否重新丟擲異常。

以下清單顯示了 ExceptionHandler 介面定義:

public interface ExceptionHandler {

    void handleException(RepeatContext context, Throwable throwable)
        throws Throwable;

}

一個常見的用例是計算給定型別的異常數量,並在達到限制時失敗。為此,Spring Batch 提供了 SimpleLimitExceptionHandler 和稍靈活一些的 RethrowOnThresholdExceptionHandlerSimpleLimitExceptionHandler 有一個 limit 屬性和一個應與當前異常進行比較的異常型別。提供的型別的所有子類也會被計數。給定型別的異常在達到限制之前會被忽略,然後重新丟擲。其他型別的異常始終會被重新丟擲。

SimpleLimitExceptionHandler 的一個重要的可選屬性是布林標誌 useParent。它預設為 false,因此限制僅在當前 RepeatContext 中計算。當設定為 true 時,限制會在巢狀迭代中的兄弟上下文之間保持(例如,一個 step 中的一組 chunk)。

監聽器

通常,能夠接收額外的回撥來處理多個不同迭代中的橫切關注點是非常有用的。為此,Spring Batch 提供了 RepeatListener 介面。RepeatTemplate 允許使用者註冊 RepeatListener 實現,並在迭代過程中,只要可用,就會向它們提供帶有 RepeatContextRepeatStatus 的回撥。

RepeatListener 介面定義如下:

public interface RepeatListener {
    void before(RepeatContext context);
    void after(RepeatContext context, RepeatStatus result);
    void open(RepeatContext context);
    void onError(RepeatContext context, Throwable e);
    void close(RepeatContext context);
}

openclose 回撥分別發生在整個迭代之前和之後。beforeafteronError 適用於單獨的 RepeatCallback 呼叫。

請注意,當存在多個監聽器時,它們位於一個列表中,因此存在順序。在這種情況下,openbefore 按照相同的順序呼叫,而 afteronErrorclose 則按照相反的順序呼叫。

並行處理

RepeatOperations 的實現不限於順序執行回撥。重要的是,一些實現能夠並行執行其回撥。為此,Spring Batch 提供了 TaskExecutorRepeatTemplate,它使用 Spring 的 TaskExecutor 策略來執行 RepeatCallback。預設是使用 SynchronousTaskExecutor,這會導致整個迭代在同一個執行緒中執行(與普通的 RepeatTemplate 相同)。

宣告式迭代

有時,有些業務處理您知道每次發生時都希望重複執行。經典的例子是訊息管道的最佳化。如果頻繁到達一批訊息,處理它們比為每條訊息承擔單獨事務的成本更有效率。為此,Spring Batch 提供了一個 AOP 攔截器,它將方法呼叫包裝在 RepeatOperations 物件中。RepeatOperationsInterceptor 執行被攔截的方法,並根據提供的 RepeatTemplate 中的 CompletionPolicy 進行重複。

  • Java

  • XML

以下示例使用 Java 配置重複呼叫名為 processMessage 的服務方法(有關如何配置 AOP 攔截器的更多詳細資訊,請參閱 Spring 使用者指南):

@Bean
public MyService myService() {
	ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
	factory.setInterfaces(MyService.class);
	factory.setTarget(new MyService());

	MyService service = (MyService) factory.getProxy();
	JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
	pointcut.setPatterns(".*processMessage.*");

	RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();

	((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));

	return service;
}

以下示例顯示了使用 Spring AOP 名稱空間的宣告式迭代,用於重複呼叫名為 processMessage 的服務方法(有關如何配置 AOP 攔截器的更多詳細資訊,請參閱 Spring 使用者指南):

<aop:config>
    <aop:pointcut id="transactional"
        expression="execution(* com..*Service.processMessage(..))" />
    <aop:advisor pointcut-ref="transactional"
        advice-ref="retryAdvice" order="-1"/>
</aop:config>

<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>

前面的示例在攔截器內部使用了預設的 RepeatTemplate。要更改策略、監聽器和其他細節,您可以將一個 RepeatTemplate 例項注入到攔截器中。

如果被攔截的方法返回 void,則攔截器始終返回 RepeatStatus.CONTINUABLE(因此如果 CompletionPolicy 沒有有限的終點,則存在無限迴圈的危險)。否則,它返回 RepeatStatus.CONTINUABLE,直到被攔截方法的返回值為 null。此時,它返回 RepeatStatus.FINISHED。因此,目標方法中的業務邏輯可以透過返回 null 或丟擲提供的 RepeatTemplate 中的 ExceptionHandler 會重新丟擲的異常來指示沒有更多工作要做。