常見批處理模式
一些批處理 Job 可以完全由 Spring Batch 中的現成元件組裝而成。例如,可以配置 ItemReader
和 ItemWriter
的實現來覆蓋廣泛的場景。然而,在大多數情況下,必須編寫自定義程式碼。應用程式開發人員的主要 API 入口點是 Tasklet
、ItemReader
、ItemWriter
和各種監聽器介面。大多數簡單的批處理 Job 可以使用 Spring Batch ItemReader
提供的現成輸入,但通常在處理和寫入過程中存在需要開發人員實現 ItemWriter
或 ItemProcessor
的自定義需求。
在本章中,我們提供了一些自定義業務邏輯中常見模式的示例。這些示例主要涉及監聽器介面。需要注意的是,如果合適,ItemReader
或 ItemWriter
也可以實現監聽器介面。
記錄 Item 處理和失敗
一個常見的用例是需要在 Step 中逐個 Item 對錯誤進行特殊處理,例如記錄到特殊通道或將記錄插入資料庫。面向塊的 Step
(由 Step 工廠 bean 建立)允許使用者透過簡單的 ItemReadListener
來處理 read
錯誤,以及透過 ItemWriteListener
來處理 write
錯誤來實現此用例。以下程式碼片段展示了一個同時記錄讀寫失敗的監聽器
public class ItemFailureLoggerListener extends ItemListenerSupport {
private static Log logger = LogFactory.getLog("item.error");
public void onReadError(Exception ex) {
logger.error("Encountered error on read", e);
}
public void onWriteError(Exception ex, List<? extends Object> items) {
logger.error("Encountered error on write", ex);
}
}
實現此監聽器後,必須將其註冊到 Step。
-
Java
-
XML
以下示例展示瞭如何在 Java 中將監聽器註冊到 Step
@Bean
public Step simpleStep(JobRepository jobRepository) {
return new StepBuilder("simpleStep", jobRepository)
...
.listener(new ItemFailureLoggerListener())
.build();
}
以下示例展示瞭如何在 XML 中將監聽器註冊到 Step
<step id="simpleStep">
...
<listeners>
<listener>
<bean class="org.example...ItemFailureLoggerListener"/>
</listener>
</listeners>
</step>
如果你的監聽器在 onError() 方法中執行任何操作,它必須位於將要回滾的事務內部。如果你需要在 onError() 方法中使用事務資源(例如資料庫),考慮為該方法新增宣告式事務(詳見 Spring Core 參考指南),並將其傳播屬性設定為 REQUIRES_NEW 。 |
出於業務原因手動停止 Job
Spring Batch 透過 JobOperator
介面提供了一個 stop()
方法,但這實際上是供操作員使用,而不是應用程式設計師使用。有時,從業務邏輯內部停止 Job 執行更方便或更有意義。
最簡單的方法是丟擲 RuntimeException
(既不會無限重試也不會被跳過)。例如,可以使用自定義異常型別,如下例所示
public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {
@Override
public T process(T item) throws Exception {
if (isPoisonPill(item)) {
throw new PoisonPillException("Poison pill detected: " + item);
}
return item;
}
}
停止 Step 執行的另一種簡單方法是從 ItemReader
返回 null
,如下例所示
public class EarlyCompletionItemReader implements ItemReader<T> {
private ItemReader<T> delegate;
public void setDelegate(ItemReader<T> delegate) { ... }
public T read() throws Exception {
T item = delegate.read();
if (isEndItem(item)) {
return null; // end the step here
}
return item;
}
}
前面的示例實際上依賴於一個事實,即存在 CompletionPolicy
策略的預設實現,當要處理的 Item 為 null
時,該實現會發出批處理完成的訊號。可以透過 SimpleStepFactoryBean
實現更復雜的完成策略並將其注入到 Step
中。
-
Java
-
XML
以下示例展示瞭如何在 Java 中將完成策略注入到 Step 中
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("simpleStep", jobRepository)
.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
.reader(reader())
.writer(writer())
.build();
}
以下示例展示瞭如何在 XML 中將完成策略注入到 Step 中
<step id="simpleStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"
chunk-completion-policy="completionPolicy"/>
</tasklet>
</step>
<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>
另一種方法是在 StepExecution
中設定一個標誌,框架中的 Step
實現會在 Item 處理之間檢查此標誌。為了實現此替代方法,我們需要訪問當前的 StepExecution
,這可以透過實現 StepListener
並將其註冊到 Step
來實現。以下示例展示了一個設定標誌的監聽器
public class CustomItemWriter extends ItemListenerSupport implements StepListener {
private StepExecution stepExecution;
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
public void afterRead(Object item) {
if (isPoisonPill(item)) {
stepExecution.setTerminateOnly();
}
}
}
設定標誌後,預設行為是 Step 丟擲 JobInterruptedException
。可以透過 StepInterruptionPolicy
控制此行為。但是,唯一的選擇是丟擲異常或不丟擲異常,因此這總是 Job 的異常結束。
新增頁尾記錄
通常,在寫入平面檔案時,需要在檔案末尾附加一個“頁尾”記錄,以便在所有處理完成後新增。這可以使用 Spring Batch 提供的 FlatFileFooterCallback
介面來實現。FlatFileFooterCallback
(及其對應物 FlatFileHeaderCallback
)是 FlatFileItemWriter
的可選屬性,可以新增到 Item Writer 中。
-
Java
-
XML
以下示例展示瞭如何在 Java 中使用 FlatFileHeaderCallback
和 FlatFileFooterCallback
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.headerCallback(headerCallback())
.footerCallback(footerCallback())
.build();
}
以下示例展示瞭如何在 XML 中使用 FlatFileHeaderCallback
和 FlatFileFooterCallback
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="headerCallback" ref="headerCallback" />
<property name="footerCallback" ref="footerCallback" />
</bean>
頁尾回撥介面只有一個方法,該方法在需要寫入頁尾時被呼叫,如以下介面定義所示
public interface FlatFileFooterCallback {
void writeFooter(Writer writer) throws IOException;
}
編寫摘要頁尾
涉及頁尾記錄的一個常見需求是在輸出過程中彙總資訊,並將此資訊附加到檔案末尾。此頁尾通常用作檔案的摘要或提供校驗和。
例如,如果一個批處理 Job 正在將 Trade
記錄寫入平面檔案,並且要求將所有 Trade
的總金額放在頁尾中,則可以使用以下 ItemWriter
實現
public class TradeItemWriter implements ItemWriter<Trade>,
FlatFileFooterCallback {
private ItemWriter<Trade> delegate;
private BigDecimal totalAmount = BigDecimal.ZERO;
public void write(Chunk<? extends Trade> items) throws Exception {
BigDecimal chunkTotal = BigDecimal.ZERO;
for (Trade trade : items) {
chunkTotal = chunkTotal.add(trade.getAmount());
}
delegate.write(items);
// After successfully writing all items
totalAmount = totalAmount.add(chunkTotal);
}
public void writeFooter(Writer writer) throws IOException {
writer.write("Total Amount Processed: " + totalAmount);
}
public void setDelegate(ItemWriter delegate) {...}
}
此 TradeItemWriter
儲存一個 totalAmount
值,該值隨寫入的每個 Trade
項的 amount
增加。處理完最後一個 Trade
後,框架會呼叫 writeFooter
,將 totalAmount
寫入檔案。請注意,write
方法使用了一個臨時變數 chunkTotal
,用於儲存該塊中 Trade
金額的總和。這樣做是為了確保,如果在 write
方法中發生跳過,totalAmount
保持不變。只有在 write
方法結束時,一旦我們確定沒有丟擲異常,我們才會更新 totalAmount
。
為了呼叫 writeFooter
方法,必須將 TradeItemWriter
(它實現了 FlatFileFooterCallback
)作為 footerCallback
注入到 FlatFileItemWriter
中。
-
Java
-
XML
以下示例展示瞭如何在 Java 中注入 TradeItemWriter
@Bean
public TradeItemWriter tradeItemWriter() {
TradeItemWriter itemWriter = new TradeItemWriter();
itemWriter.setDelegate(flatFileItemWriter(null));
return itemWriter;
}
@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.footerCallback(tradeItemWriter())
.build();
}
以下示例展示瞭如何在 XML 中注入 TradeItemWriter
<bean id="tradeItemWriter" class="..TradeItemWriter">
<property name="delegate" ref="flatFileItemWriter" />
</bean>
<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="footerCallback" ref="tradeItemWriter" />
</bean>
目前為止 TradeItemWriter
的編寫方式只有在 Step
不可重啟的情況下才能正確工作。這是因為該類是有狀態的(因為它儲存了 totalAmount
),但 totalAmount
沒有持久化到資料庫。因此,在重啟時無法檢索到它。為了使該類可重啟,應實現 ItemStream
介面以及 open
和 update
方法,如下例所示
public void open(ExecutionContext executionContext) {
if (executionContext.containsKey("total.amount") {
totalAmount = (BigDecimal) executionContext.get("total.amount");
}
}
public void update(ExecutionContext executionContext) {
executionContext.put("total.amount", totalAmount);
}
update
方法在將 ExecutionContext
物件持久化到資料庫之前,將 totalAmount
的最新版本儲存到 ExecutionContext
中。open
方法從 ExecutionContext
中檢索任何現有的 totalAmount
,並將其用作處理的起點,從而允許 TradeItemWriter
在重啟時從上次 Step 執行中斷的地方繼續執行。
基於驅動查詢的 ItemReader
在關於 Reader 和 Writer 的章節中,討論了使用分頁的資料庫輸入。許多資料庫廠商(例如 DB2)具有非常悲觀的鎖定策略,如果正在讀取的表還需要被線上應用程式的其他部分使用,可能會導致問題。此外,在超大資料集上開啟遊標也可能導致某些廠商的資料庫出現問題。因此,許多專案傾向於使用“驅動查詢”方法來讀取資料。這種方法透過迭代鍵而不是需要返回的整個物件來工作,如下圖所示

如你所見,前面影像中顯示的示例使用了與基於遊標的示例相同的 'FOO' 表。但是,SQL 語句中選擇的不是整行,而是僅選擇 ID。因此,從 read
返回的不是 FOO
物件,而是 Integer
。然後可以使用此數字查詢“詳細資訊”,即完整的 Foo
物件,如下圖所示

應該使用 ItemProcessor
將從驅動查詢中獲得的鍵轉換為完整的 Foo
物件。可以使用現有的 DAO 根據鍵查詢完整的物件。
多行記錄
雖然平面檔案通常每條記錄都限制在一行內,但檔案包含跨越多行且具有多種格式的記錄也很常見。以下檔案摘錄顯示了此類安排的示例
HEA;0013100345;2007-02-15 NCU;Smith;Peter;;T;20014539;F BAD;;Oak Street 31/A;;Small Town;00235;IL;US FOT;2;2;267.34
以 'HEA' 開頭的行和以 'FOT' 開頭的行之間的所有內容被視為一條記錄。為了正確處理這種情況,必須考慮以下幾點
-
ItemReader
必須一次讀取多行記錄的所有行作為一個組,而不是一次讀取一條記錄,以便將其完整地傳遞給ItemWriter
。 -
每種行型別可能需要以不同的方式進行標記化。
因為單條記錄跨越多行,並且我們可能不知道有多少行,所以 ItemReader
必須小心,始終讀取整條記錄。為此,應將自定義 ItemReader
實現為 FlatFileItemReader
的包裝器。
-
Java
-
XML
以下示例展示瞭如何在 Java 中實現自定義 ItemReader
@Bean
public MultiLineTradeItemReader itemReader() {
MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();
itemReader.setDelegate(flatFileItemReader());
return itemReader;
}
@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
.name("flatFileItemReader")
.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
.lineTokenizer(orderFileTokenizer())
.fieldSetMapper(orderFieldSetMapper())
.build();
return reader;
}
以下示例展示瞭如何在 XML 中實現自定義 ItemReader
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
<property name="delegate">
<bean class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="data/iosample/input/multiLine.txt" />
<property name="lineMapper">
<bean class="org.spr...DefaultLineMapper">
<property name="lineTokenizer" ref="orderFileTokenizer"/>
<property name="fieldSetMapper" ref="orderFieldSetMapper"/>
</bean>
</property>
</bean>
</property>
</bean>
為確保每行都正確標記化(這對於固定長度輸入尤其重要),可以在委託 FlatFileItemReader
上使用 PatternMatchingCompositeLineTokenizer
。詳見 Reader 和 Writer 章節中的 FlatFileItemReader
。然後,委託 Reader 使用 PassThroughFieldSetMapper
將每行的 FieldSet
返回給包裝器 ItemReader
。
-
Java
-
XML
以下示例展示瞭如何在 Java 中確保每行都被正確標記化
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
PatternMatchingCompositeLineTokenizer tokenizer =
new PatternMatchingCompositeLineTokenizer();
Map<String, LineTokenizer> tokenizers = new HashMap<>(4);
tokenizers.put("HEA*", headerRecordTokenizer());
tokenizers.put("FOT*", footerRecordTokenizer());
tokenizers.put("NCU*", customerLineTokenizer());
tokenizers.put("BAD*", billingAddressLineTokenizer());
tokenizer.setTokenizers(tokenizers);
return tokenizer;
}
以下示例展示瞭如何在 XML 中確保每行都被正確標記化
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
<property name="tokenizers">
<map>
<entry key="HEA*" value-ref="headerRecordTokenizer" />
<entry key="FOT*" value-ref="footerRecordTokenizer" />
<entry key="NCU*" value-ref="customerLineTokenizer" />
<entry key="BAD*" value-ref="billingAddressLineTokenizer" />
</map>
</property>
</bean>
此包裝器必須能夠識別記錄的結尾,以便可以持續對其委託呼叫 read()
直到到達結尾。對於讀取的每一行,包裝器都應構建要返回的 Item。一旦到達頁尾,就可以返回該 Item,將其交付給 ItemProcessor
和 ItemWriter
,如下例所示
private FlatFileItemReader<FieldSet> delegate;
public Trade read() throws Exception {
Trade t = null;
for (FieldSet line = null; (line = this.delegate.read()) != null;) {
String prefix = line.readString(0);
if (prefix.equals("HEA")) {
t = new Trade(); // Record must start with header
}
else if (prefix.equals("NCU")) {
Assert.notNull(t, "No header was found.");
t.setLast(line.readString(1));
t.setFirst(line.readString(2));
...
}
else if (prefix.equals("BAD")) {
Assert.notNull(t, "No header was found.");
t.setCity(line.readString(4));
t.setState(line.readString(6));
...
}
else if (prefix.equals("FOT")) {
return t; // Record must end with footer
}
}
Assert.isNull(t, "No 'END' was found.");
return null;
}
執行系統命令
許多批處理 Job 需要從批處理 Job 內部呼叫外部命令。這樣的過程可以由排程程式單獨啟動,但這會失去關於執行的通用元資料的優勢。此外,多步驟的 Job 也需要拆分成多個 Job。
由於這種需求非常普遍,Spring Batch 提供了一個用於呼叫系統命令的 Tasklet
實現。
-
Java
-
XML
以下示例展示瞭如何在 Java 中呼叫外部命令
@Bean
public SystemCommandTasklet tasklet() {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
tasklet.setCommand("echo hello");
tasklet.setTimeout(5000);
return tasklet;
}
以下示例展示瞭如何在 XML 中呼叫外部命令
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
<property name="command" value="echo hello" />
<!-- 5 second timeout for the command to complete -->
<property name="timeout" value="5000" />
</bean>
處理未找到輸入時的 Step 完成
在許多批處理場景中,在資料庫或檔案中找不到要處理的行並非異常情況。Step
被簡單地認為沒有找到工作,並以讀取 0 個 Item 完成。Spring Batch 中開箱即用提供的所有 ItemReader
實現都預設為此方法。如果在存在輸入的情況下(通常是檔案命名錯誤或發生類似問題時)沒有輸出任何內容,可能會導致一些困惑。因此,應檢查元資料本身以確定框架發現了多少工作需要處理。但是,如果找不到輸入被視為異常情況怎麼辦?在這種情況下,透過程式設計方式檢查元資料以瞭解沒有處理 Item 並導致失敗是最佳解決方案。由於這是一個常見的用例,Spring Batch 提供了一個具有此功能的監聽器,如 NoWorkFoundStepExecutionListener
的類定義所示
public class NoWorkFoundStepExecutionListener implements StepExecutionListener {
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getReadCount() == 0) {
return ExitStatus.FAILED;
}
return null;
}
}
前面的 StepExecutionListener
在 'afterStep' 階段檢查 StepExecution
的 readCount
屬性,以確定是否未讀取任何 Item。如果是這種情況,則返回退出碼 FAILED
,表明 Step
應該失敗。否則,返回 null
,這不會影響 Step
的狀態。
將資料傳遞給後續 Step
通常將資訊從一個 Step 傳遞到另一個 Step 是很有用的。這可以透過 ExecutionContext
來完成。需要注意的是,有兩個 ExecutionContext
:一個在 Step
級別,一個在 Job
級別。Step
的 ExecutionContext
僅在 Step 執行時保留,而 Job
的 ExecutionContext
貫穿整個 Job
。另一方面,Step
的 ExecutionContext
在 Step
每次提交一個塊時更新,而 Job
的 ExecutionContext
僅在每個 Step
結束時更新。
這種分離的結果是,在 Step
執行期間,所有資料都必須放在 Step
的 ExecutionContext
中。這樣做可以確保資料在 Step
執行時正確儲存。如果資料儲存到 Job
的 ExecutionContext
中,則在 Step
執行期間不會持久化。如果 Step
失敗,該資料將丟失。
public class SavingItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
為了使資料對後續 Step
可用,必須在 Step 完成後將其“提升”到 Job
的 ExecutionContext
。Spring Batch 為此目的提供了 ExecutionContextPromotionListener
。必須使用 ExecutionContext
中需要提升的資料相關的鍵來配置監聽器。它還可以選擇配置一個退出碼模式列表,在此模式下進行提升(預設值為 COMPLETED
)。與所有監聽器一樣,它必須在 Step
上註冊。
-
Java
-
XML
以下示例展示瞭如何在 Java 中將 Step 提升到 Job
的 ExecutionContext
@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
return new JobBuilder("job1", jobRepository)
.start(step1)
.next(step2)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(reader())
.writer(savingWriter())
.listener(promotionListener())
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"someKey"});
return listener;
}
以下示例展示瞭如何在 XML 中將 Step 提升到 Job
的 ExecutionContext
<job id="job1">
<step id="step1">
<tasklet>
<chunk reader="reader" writer="savingWriter" commit-interval="10"/>
</tasklet>
<listeners>
<listener ref="promotionListener"/>
</listeners>
</step>
<step id="step2">
...
</step>
</job>
<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
<beans:property name="keys">
<list>
<value>someKey</value>
</list>
</beans:property>
</beans:bean>
最後,必須從 Job
的 ExecutionContext
中檢索儲存的值,如下例所示
public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}