重複
RepeatTemplate
批處理涉及到重複的操作,無論是作為簡單的最佳化還是作為作業的一部分。為了策略化和泛化重複過程,並提供類似於迭代器框架的功能,Spring Batch 提供了 RepeatOperations
介面。RepeatOperations
介面定義如下:
public interface RepeatOperations {
RepeatStatus iterate(RepeatCallback callback) throws RepeatException;
}
回撥是一個介面,定義如下,它允許您插入一些需要重複的業務邏輯:
public interface RepeatCallback {
RepeatStatus doInIteration(RepeatContext context) throws Exception;
}
回撥會重複執行,直到實現判斷迭代應該結束。這些介面的返回值是一個列舉值,可以是 RepeatStatus.CONTINUABLE
或 RepeatStatus.FINISHED
。RepeatStatus
列舉向重複操作的呼叫者傳達是否還有工作需要完成的資訊。一般來說,RepeatOperations
的實現應該檢查 RepeatStatus
並將其作為結束迭代決策的一部分。任何希望向呼叫者表明沒有剩餘工作的回撥都可以返回 RepeatStatus.FINISHED
。
RepeatOperations
最簡單的通用實現是 RepeatTemplate
RepeatTemplate template = new RepeatTemplate();
template.setCompletionPolicy(new SimpleCompletionPolicy(2));
template.iterate(new RepeatCallback() {
public RepeatStatus doInIteration(RepeatContext context) {
// Do stuff in batch...
return RepeatStatus.CONTINUABLE;
}
});
在前面的示例中,我們返回 RepeatStatus.CONTINUABLE
,以表明還有更多工作要做。回撥也可以返回 RepeatStatus.FINISHED
,向呼叫者表明沒有剩餘工作。有些迭代可以根據回撥中正在進行的工作本身的考量而終止。其他迭代(就回調而言)實際上是無限迴圈,完成決策委託給外部策略,如前面示例所示的情況。
完成策略
在 RepeatTemplate
內部,iterate
方法中迴圈的終止由 CompletionPolicy
決定,CompletionPolicy
也是 RepeatContext
的工廠。RepeatTemplate
負責使用當前策略建立 RepeatContext
,並在迭代的每個階段將其傳遞給 RepeatCallback
。在回撥完成其 doInIteration
後,RepeatTemplate
必須呼叫 CompletionPolicy
以要求它更新其狀態(該狀態將儲存在 RepeatContext
中)。然後,它會詢問策略迭代是否已完成。
Spring Batch 提供了一些簡單的通用 CompletionPolicy
實現。SimpleCompletionPolicy
允許執行固定次數(在任何時候 RepeatStatus.FINISHED
都會強制提前完成)。
使用者可能需要實現自己的完成策略來處理更復雜的決策。例如,一個批處理視窗,一旦線上系統投入使用就阻止批處理作業執行,這就需要自定義策略。
異常處理
如果在 RepeatCallback
內部丟擲異常,RepeatTemplate
會諮詢 ExceptionHandler
,後者可以決定是否重新丟擲異常。
以下清單顯示了 ExceptionHandler
介面定義:
public interface ExceptionHandler {
void handleException(RepeatContext context, Throwable throwable)
throws Throwable;
}
一個常見的用例是計算給定型別的異常數量,並在達到限制時失敗。為此,Spring Batch 提供了 SimpleLimitExceptionHandler
和稍靈活一些的 RethrowOnThresholdExceptionHandler
。SimpleLimitExceptionHandler
有一個 limit 屬性和一個應與當前異常進行比較的異常型別。提供的型別的所有子類也會被計數。給定型別的異常在達到限制之前會被忽略,然後重新丟擲。其他型別的異常始終會被重新丟擲。
SimpleLimitExceptionHandler
的一個重要的可選屬性是布林標誌 useParent
。它預設為 false
,因此限制僅在當前 RepeatContext
中計算。當設定為 true
時,限制會在巢狀迭代中的兄弟上下文之間保持(例如,一個 step 中的一組 chunk)。
監聽器
通常,能夠接收額外的回撥來處理多個不同迭代中的橫切關注點是非常有用的。為此,Spring Batch 提供了 RepeatListener
介面。RepeatTemplate
允許使用者註冊 RepeatListener
實現,並在迭代過程中,只要可用,就會向它們提供帶有 RepeatContext
和 RepeatStatus
的回撥。
RepeatListener
介面定義如下:
public interface RepeatListener {
void before(RepeatContext context);
void after(RepeatContext context, RepeatStatus result);
void open(RepeatContext context);
void onError(RepeatContext context, Throwable e);
void close(RepeatContext context);
}
open
和 close
回撥分別發生在整個迭代之前和之後。before
、after
和 onError
適用於單獨的 RepeatCallback
呼叫。
請注意,當存在多個監聽器時,它們位於一個列表中,因此存在順序。在這種情況下,open
和 before
按照相同的順序呼叫,而 after
、onError
和 close
則按照相反的順序呼叫。
並行處理
RepeatOperations
的實現不限於順序執行回撥。重要的是,一些實現能夠並行執行其回撥。為此,Spring Batch 提供了 TaskExecutorRepeatTemplate
,它使用 Spring 的 TaskExecutor
策略來執行 RepeatCallback
。預設是使用 SynchronousTaskExecutor
,這會導致整個迭代在同一個執行緒中執行(與普通的 RepeatTemplate
相同)。
宣告式迭代
有時,有些業務處理您知道每次發生時都希望重複執行。經典的例子是訊息管道的最佳化。如果頻繁到達一批訊息,處理它們比為每條訊息承擔單獨事務的成本更有效率。為此,Spring Batch 提供了一個 AOP 攔截器,它將方法呼叫包裝在 RepeatOperations
物件中。RepeatOperationsInterceptor
執行被攔截的方法,並根據提供的 RepeatTemplate
中的 CompletionPolicy
進行重複。
-
Java
-
XML
以下示例使用 Java 配置重複呼叫名為 processMessage
的服務方法(有關如何配置 AOP 攔截器的更多詳細資訊,請參閱 Spring 使用者指南):
@Bean
public MyService myService() {
ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
factory.setInterfaces(MyService.class);
factory.setTarget(new MyService());
MyService service = (MyService) factory.getProxy();
JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
pointcut.setPatterns(".*processMessage.*");
RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();
((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));
return service;
}
以下示例顯示了使用 Spring AOP 名稱空間的宣告式迭代,用於重複呼叫名為 processMessage
的服務方法(有關如何配置 AOP 攔截器的更多詳細資訊,請參閱 Spring 使用者指南):
<aop:config>
<aop:pointcut id="transactional"
expression="execution(* com..*Service.processMessage(..))" />
<aop:advisor pointcut-ref="transactional"
advice-ref="retryAdvice" order="-1"/>
</aop:config>
<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>
前面的示例在攔截器內部使用了預設的 RepeatTemplate
。要更改策略、監聽器和其他細節,您可以將一個 RepeatTemplate
例項注入到攔截器中。
如果被攔截的方法返回 void
,則攔截器始終返回 RepeatStatus.CONTINUABLE
(因此如果 CompletionPolicy
沒有有限的終點,則存在無限迴圈的危險)。否則,它返回 RepeatStatus.CONTINUABLE
,直到被攔截方法的返回值為 null
。此時,它返回 RepeatStatus.FINISHED
。因此,目標方法中的業務邏輯可以透過返回 null
或丟擲提供的 RepeatTemplate
中的 ExceptionHandler
會重新丟擲的異常來指示沒有更多工作要做。