常見批處理模式

一些批處理作業可以完全由 Spring Batch 中的現成元件組裝而成。例如,可以配置 ItemReaderItemWriter 實現來涵蓋各種場景。然而,在大多數情況下,必須編寫自定義程式碼。應用程式開發人員的主要 API 入口點是 TaskletItemReaderItemWriter 以及各種監聽器介面。大多數簡單的批處理作業可以使用 Spring Batch ItemReader 的現成輸入,但通常情況下,處理和寫入中存在需要開發人員實現 ItemWriterItemProcessor 的自定義問題。

在本章中,我們提供了一些自定義業務邏輯中常見模式的示例。這些示例主要以監聽器介面為特色。需要注意的是,ItemReaderItemWriter 也可以在適當的情況下實現監聽器介面。

日誌記錄項處理和失敗

一個常見的用例是需要對步驟中的錯誤進行特殊處理,逐項處理,例如記錄到特殊通道或將記錄插入資料庫。面向塊的 Step(由步驟工廠 bean 建立)允許使用者透過簡單的 ItemReadListener 處理讀取錯誤,透過 ItemWriteListener 處理寫入錯誤來實現此用例。以下程式碼片段演示了一個記錄讀取和寫入失敗的監聽器

public class ItemFailureLoggerListener extends ItemListenerSupport {

    private static Log logger = LogFactory.getLog("item.error");

    public void onReadError(Exception ex) {
        logger.error("Encountered error on read", e);
    }

    public void onWriteError(Exception ex, List<? extends Object> items) {
        logger.error("Encountered error on write", ex);
    }
}

實現此監聽器後,必須將其註冊到步驟中。

  • Java

  • XML

以下示例展示瞭如何使用 Java 將監聽器註冊到步驟中

Java 配置
@Bean
public Step simpleStep(JobRepository jobRepository) {
	return new StepBuilder("simpleStep", jobRepository)
				...
				.listener(new ItemFailureLoggerListener())
				.build();
}

以下示例展示瞭如何使用 XML 將監聽器註冊到步驟中

XML 配置
<step id="simpleStep">
...
<listeners>
    <listener>
        <bean class="org.example...ItemFailureLoggerListener"/>
    </listener>
</listeners>
</step>
如果您的監聽器在 onError() 方法中執行任何操作,則它必須在將要回滾的事務內部。如果您需要在 onError() 方法內部使用事務性資源(例如資料庫),請考慮為該方法新增宣告性事務(有關詳細資訊,請參閱 Spring Core 參考指南),並將其傳播屬性設定為 REQUIRES_NEW

出於業務原因手動停止作業

Spring Batch 透過 JobOperator 介面提供了一個 stop() 方法,但這實際上是供操作員使用的,而不是應用程式程式設計師。有時,從業務邏輯內部停止作業執行更方便或更有意義。

最簡單的做法是丟擲一個 RuntimeException(一個既不會無限重試也不會跳過的異常)。例如,可以使用自定義異常型別,如以下示例所示

public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {

    @Override
    public T process(T item) throws Exception {
        if (isPoisonPill(item)) {
            throw new PoisonPillException("Poison pill detected: " + item);
        }
        return item;
    }
}

停止執行步驟的另一種簡單方法是從 ItemReader 返回 null,如以下示例所示

public class EarlyCompletionItemReader implements ItemReader<T> {

    private ItemReader<T> delegate;

    public void setDelegate(ItemReader<T> delegate) { ... }

    public T read() throws Exception {
        T item = delegate.read();
        if (isEndItem(item)) {
            return null; // end the step here
        }
        return item;
    }

}

前面的示例實際上依賴於 CompletionPolicy 策略的預設實現,當要處理的項為 null 時,該策略會發出完成批處理的訊號。可以透過 SimpleStepFactoryBean 實現更復雜的完成策略並將其注入到 Step 中。

  • Java

  • XML

以下示例展示瞭如何在 Java 中將完成策略注入到步驟中

Java 配置
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("simpleStep", jobRepository)
				.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
				.reader(reader())
				.writer(writer())
				.build();
}

以下示例展示瞭如何在 XML 中將完成策略注入到步驟中

XML 配置
<step id="simpleStep">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"
               chunk-completion-policy="completionPolicy"/>
    </tasklet>
</step>

<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>

另一種方法是在 StepExecution 中設定一個標誌,框架中的 Step 實現會在項處理之間檢查該標誌。為了實現此替代方案,我們需要訪問當前的 StepExecution,這可以透過實現 StepListener 並將其註冊到 Step 來實現。以下示例展示了一個設定標誌的監聽器

public class CustomItemWriter extends ItemListenerSupport implements StepListener {

    private StepExecution stepExecution;

    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    public void afterRead(Object item) {
        if (isPoisonPill(item)) {
            stepExecution.setTerminateOnly();
       }
    }

}

當設定了標誌時,預設行為是步驟丟擲 JobInterruptedException。此行為可以透過 StepInterruptionPolicy 控制。然而,唯一的選擇是丟擲或不丟擲異常,因此這總是作業的異常結束。

新增頁尾記錄

通常,當寫入平面檔案時,在所有處理完成後,必須將“頁尾”記錄附加到檔案末尾。這可以透過 Spring Batch 提供的 FlatFileFooterCallback 介面實現。FlatFileFooterCallback(及其對應項 FlatFileHeaderCallback)是 FlatFileItemWriter 的可選屬性,可以新增到項寫入器中。

  • Java

  • XML

以下示例展示瞭如何在 Java 中使用 FlatFileHeaderCallbackFlatFileFooterCallback

Java 配置
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.headerCallback(headerCallback())
			.footerCallback(footerCallback())
			.build();
}

以下示例展示瞭如何在 XML 中使用 FlatFileHeaderCallbackFlatFileFooterCallback

XML 配置
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator" ref="lineAggregator"/>
    <property name="headerCallback" ref="headerCallback" />
    <property name="footerCallback" ref="footerCallback" />
</bean>

頁尾回撥介面只有一個方法,當需要寫入頁尾時呼叫,如以下介面定義所示

public interface FlatFileFooterCallback {

    void writeFooter(Writer writer) throws IOException;

}

寫入摘要頁尾

涉及頁尾記錄的一個常見要求是在輸出過程中聚合資訊,並將此資訊附加到檔案末尾。此頁尾通常用作檔案的摘要或提供校驗和。

例如,如果批處理作業正在將 Trade 記錄寫入平面檔案,並且要求將所有 Trades 的總金額放入頁尾,則可以使用以下 ItemWriter 實現

public class TradeItemWriter implements ItemWriter<Trade>,
                                        FlatFileFooterCallback {

    private ItemWriter<Trade> delegate;

    private BigDecimal totalAmount = BigDecimal.ZERO;

    public void write(Chunk<? extends Trade> items) throws Exception {
        BigDecimal chunkTotal = BigDecimal.ZERO;
        for (Trade trade : items) {
            chunkTotal = chunkTotal.add(trade.getAmount());
        }

        delegate.write(items);

        // After successfully writing all items
        totalAmount = totalAmount.add(chunkTotal);
    }

    public void writeFooter(Writer writer) throws IOException {
        writer.write("Total Amount Processed: " + totalAmount);
    }

    public void setDelegate(ItemWriter delegate) {...}
}

TradeItemWriter 儲存一個 totalAmount 值,該值隨著寫入的每個 Trade 項的 amount 而增加。在處理完最後一個 Trade 後,框架呼叫 writeFooter,將 totalAmount 寫入檔案。請注意,write 方法使用了一個臨時變數 chunkTotal,它儲存了塊中 Trade 金額的總和。這樣做是為了確保,如果 write 方法中發生跳過,totalAmount 保持不變。只有在 write 方法結束時,一旦我們確定沒有丟擲異常,我們才更新 totalAmount

為了呼叫 writeFooter 方法,必須將 TradeItemWriter(它實現了 FlatFileFooterCallback)作為 footerCallback 連線到 FlatFileItemWriter 中。

  • Java

  • XML

以下示例展示瞭如何在 Java 中連線 TradeItemWriter

Java 配置
@Bean
public TradeItemWriter tradeItemWriter() {
	TradeItemWriter itemWriter = new TradeItemWriter();

	itemWriter.setDelegate(flatFileItemWriter(null));

	return itemWriter;
}

@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.footerCallback(tradeItemWriter())
			.build();
}

以下示例展示瞭如何在 XML 中連線 TradeItemWriter

XML 配置
<bean id="tradeItemWriter" class="..TradeItemWriter">
    <property name="delegate" ref="flatFileItemWriter" />
</bean>

<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
   <property name="resource" ref="outputResource" />
   <property name="lineAggregator" ref="lineAggregator"/>
   <property name="footerCallback" ref="tradeItemWriter" />
</bean>

到目前為止,TradeItemWriter 的編寫方式只有在 Step 不可重新啟動時才能正常工作。這是因為該類是有狀態的(因為它儲存了 totalAmount),但 totalAmount 未持久化到資料庫。因此,在重新啟動時無法檢索。為了使此類可重新啟動,應實現 ItemStream 介面以及 openupdate 方法,如以下示例所示

public void open(ExecutionContext executionContext) {
    if (executionContext.containsKey("total.amount") {
        totalAmount = (BigDecimal) executionContext.get("total.amount");
    }
}

public void update(ExecutionContext executionContext) {
    executionContext.put("total.amount", totalAmount);
}

update 方法在 ExecutionContext 物件持久化到資料庫之前,將 totalAmount 的最新版本儲存到 ExecutionContextopen 方法從 ExecutionContext 檢索任何現有的 totalAmount,並將其用作處理的起點,允許 TradeItemWriter 在重新啟動時從上次 Step 執行停止的地方繼續。

驅動基於查詢的 ItemReaders

讀寫器章節中,討論了使用分頁的資料庫輸入。許多資料庫供應商,例如 DB2,具有非常悲觀的鎖定策略,如果正在讀取的表也需要被線上應用程式的其他部分使用,則可能會導致問題。此外,對超大資料集開啟遊標可能會導致某些供應商的資料庫出現問題。因此,許多專案傾向於使用“驅動查詢”方法讀取資料。這種方法透過迭代鍵而不是需要返回的整個物件來工作,如下圖所示

Driving Query Job
圖 1. 驅動查詢作業

如您所見,前面影像中顯示的示例使用了與基於遊標的示例相同的“FOO”表。然而,在 SQL 語句中,只選擇了 ID,而不是選擇整個行。因此,從 read 返回的是一個 Integer,而不是一個 FOO 物件。然後,可以使用此數字查詢“詳細資訊”,即一個完整的 Foo 物件,如下圖所示

Driving Query Example
圖 2. 驅動查詢示例

應使用 ItemProcessor 將從驅動查詢獲得的鍵轉換為完整的 Foo 物件。可以使用現有 DAO 根據鍵查詢完整的物件。

多行記錄

雖然通常情況下,平面檔案中的每條記錄都限制在一行中,但檔案可能包含跨多行且具有多種格式的記錄。以下檔案摘錄顯示了這種安排的一個示例

HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34

從“HEA”開頭的行到“FOT”開頭的行之間的所有內容都被視為一條記錄。為了正確處理這種情況,必須考慮以下幾點

  • ItemReader 必須一次讀取多行記錄的所有行作為一個組,而不是一次讀取一條記錄,以便可以將其完整地傳遞給 ItemWriter

  • 每種行型別可能需要以不同的方式進行標記。

由於一條記錄跨越多行,並且我們可能不知道有多少行,因此 ItemReader 必須小心地始終讀取整個記錄。為此,應將自定義 ItemReader 實現為 FlatFileItemReader 的包裝器。

  • Java

  • XML

以下示例展示瞭如何在 Java 中實現自定義 ItemReader

Java 配置
@Bean
public MultiLineTradeItemReader itemReader() {
	MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();

	itemReader.setDelegate(flatFileItemReader());

	return itemReader;
}

@Bean
public FlatFileItemReader flatFileItemReader() {
	FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
			.name("flatFileItemReader")
			.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
			.lineTokenizer(orderFileTokenizer())
			.fieldSetMapper(orderFieldSetMapper())
			.build();
	return reader;
}

以下示例展示瞭如何在 XML 中實現自定義 ItemReader

XML 配置
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
    <property name="delegate">
        <bean class="org.springframework.batch.infrastructure.item.file.FlatFileItemReader">
            <property name="resource" value="data/iosample/input/multiLine.txt" />
            <property name="lineMapper">
                <bean class="org.spr...DefaultLineMapper">
                    <property name="lineTokenizer" ref="orderFileTokenizer"/>
                    <property name="fieldSetMapper" ref="orderFieldSetMapper"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

為了確保每行都正確分詞(這對於固定長度輸入尤其重要),可以在委託 FlatFileItemReader 上使用 PatternMatchingCompositeLineTokenizer。有關更多詳細資訊,請參閱讀寫器章節中的 FlatFileItemReader。然後,委託讀取器使用 PassThroughFieldSetMapper 為每行向包裝 ItemReader 提供一個 FieldSet

  • Java

  • XML

以下示例展示瞭如何在 Java 中確保每行都被正確分詞

Java 內容
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
	PatternMatchingCompositeLineTokenizer tokenizer =
			new PatternMatchingCompositeLineTokenizer();

	Map<String, LineTokenizer> tokenizers = new HashMap<>(4);

	tokenizers.put("HEA*", headerRecordTokenizer());
	tokenizers.put("FOT*", footerRecordTokenizer());
	tokenizers.put("NCU*", customerLineTokenizer());
	tokenizers.put("BAD*", billingAddressLineTokenizer());

	tokenizer.setTokenizers(tokenizers);

	return tokenizer;
}

以下示例展示瞭如何在 XML 中確保每行都被正確分詞

XML 內容
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
    <property name="tokenizers">
        <map>
            <entry key="HEA*" value-ref="headerRecordTokenizer" />
            <entry key="FOT*" value-ref="footerRecordTokenizer" />
            <entry key="NCU*" value-ref="customerLineTokenizer" />
            <entry key="BAD*" value-ref="billingAddressLineTokenizer" />
        </map>
    </property>
</bean>

此包裝器必須能夠識別記錄的末尾,以便它可以不斷地對其委託物件呼叫 read(),直到達到末尾。對於讀取的每一行,包裝器都應該構建要返回的項。一旦到達頁尾,就可以返回該項以交付給 ItemProcessorItemWriter,如以下示例所示

private FlatFileItemReader<FieldSet> delegate;

public Trade read() throws Exception {
    Trade t = null;

    for (FieldSet line = null; (line = this.delegate.read()) != null;) {
        String prefix = line.readString(0);
        if (prefix.equals("HEA")) {
            t = new Trade(); // Record must start with header
        }
        else if (prefix.equals("NCU")) {
            Assert.notNull(t, "No header was found.");
            t.setLast(line.readString(1));
            t.setFirst(line.readString(2));
            ...
        }
        else if (prefix.equals("BAD")) {
            Assert.notNull(t, "No header was found.");
            t.setCity(line.readString(4));
            t.setState(line.readString(6));
          ...
        }
        else if (prefix.equals("FOT")) {
            return t; // Record must end with footer
        }
    }
    Assert.isNull(t, "No 'END' was found.");
    return null;
}

執行系統命令

許多批處理作業要求從批處理作業內部呼叫外部命令。這樣的過程可以由排程程式單獨啟動,但這樣會丟失有關執行的公共元資料。此外,多步驟作業也需要拆分為多個作業。

由於此需求非常常見,Spring Batch 提供了一個用於呼叫系統命令的 Tasklet 實現。

  • Java

  • XML

以下示例展示瞭如何在 Java 中呼叫外部命令

Java 配置
@Bean
public SystemCommandTasklet tasklet() {
	SystemCommandTasklet tasklet = new SystemCommandTasklet();

	tasklet.setCommand("echo hello");
	tasklet.setTimeout(5000);

	return tasklet;
}

以下示例展示瞭如何在 XML 中呼叫外部命令

XML 配置
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
    <property name="command" value="echo hello" />
    <!-- 5 second timeout for the command to complete -->
    <property name="timeout" value="5000" />
</bean>

處理未找到輸入時的步驟完成

在許多批處理場景中,在資料庫或檔案中找不到要處理的行並非異常。Step 只是被認為沒有找到工作,並以讀取 0 項完成。Spring Batch 中開箱即用的所有 ItemReader 實現都預設採用此方法。如果即使存在輸入也沒有寫入任何內容(通常在檔案命名錯誤或出現類似問題時發生),這可能會導致一些混亂。因此,應該檢查元資料本身以確定框架發現了多少要處理的工作。但是,如果找不到輸入被認為是異常情況怎麼辦?在這種情況下,程式化地檢查元資料以確定沒有處理任何項並導致失敗是最佳解決方案。因為這是一個常見的用例,Spring Batch 提供了一個具有完全此功能的監聽器,如 NoWorkFoundStepExecutionListener 的類定義所示

public class NoWorkFoundStepExecutionListener implements StepExecutionListener {

    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getReadCount() == 0) {
            return ExitStatus.FAILED;
        }
        return null;
    }

}

前面的 StepExecutionListener 在“afterStep”階段檢查 StepExecutionreadCount 屬性,以確定是否沒有讀取任何項。如果是這種情況,則返回退出程式碼 FAILED,表示 Step 應該失敗。否則,返回 null,這不會影響 Step 的狀態。

將資料傳遞給未來的步驟

將資訊從一個步驟傳遞到另一個步驟通常很有用。這可以透過 ExecutionContext 完成。需要注意的是,有兩個 ExecutionContext:一個在 Step 級別,一個在 Job 級別。StepExecutionContext 僅在步驟持續期間存在,而 JobExecutionContext 則在整個 Job 期間存在。另一方面,StepExecutionContext 在每次 Step 提交一個塊時更新,而 JobExecutionContext 僅在每個 Step 結束時更新。

這種分離的結果是,所有資料都必須在 Step 執行期間放置在 Step ExecutionContext 中。這樣做可以確保資料在 Step 執行時正確儲存。如果資料儲存在 Job ExecutionContext 中,則在 Step 執行期間不會持久化。如果 Step 失敗,這些資料將丟失。

public class SavingItemWriter implements ItemWriter<Object> {
    private StepExecution stepExecution;

    public void write(Chunk<? extends Object> items) throws Exception {
        // ...

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("someKey", someObject);
    }

    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

為了使資料可供未來的 Steps 使用,它必須在步驟完成後“提升”到 Job ExecutionContext。Spring Batch 為此提供了 ExecutionContextPromotionListener。監聽器必須配置與 ExecutionContext 中必須提升的資料相關的鍵。它還可以選擇配置一個退出程式碼模式列表,在此模式下應發生提升(COMPLETED 是預設值)。與所有監聽器一樣,它必須在 Step 上註冊。

  • Java

  • XML

以下示例展示瞭如何在 Java 中將步驟提升到 Job ExecutionContext

Java 配置
@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
	return new JobBuilder("job1", jobRepository)
				.start(step1)
				.next(step2)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10).transactionManager(transactionManager)
				.reader(reader())
				.writer(savingWriter())
				.listener(promotionListener())
				.build();
}

@Bean
public ExecutionContextPromotionListener promotionListener() {
	ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();

	listener.setKeys(new String[] {"someKey"});

	return listener;
}

以下示例展示瞭如何在 XML 中將步驟提升到 Job ExecutionContext

XML 配置
<job id="job1">
    <step id="step1">
        <tasklet>
            <chunk reader="reader" writer="savingWriter" commit-interval="10"/>
        </tasklet>
        <listeners>
            <listener ref="promotionListener"/>
        </listeners>
    </step>

    <step id="step2">
       ...
    </step>
</job>

<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
    <beans:property name="keys">
        <list>
            <value>someKey</value>
        </list>
    </beans:property>
</beans:bean>

最後,必須從 Job ExecutionContext 中檢索儲存的值,如以下示例所示

public class RetrievingItemWriter implements ItemWriter<Object> {
    private Object someObject;

    public void write(Chunk<? extends Object> items) throws Exception {
        // ...
    }

    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        this.someObject = jobContext.get("someKey");
    }
}
© . This site is unofficial and not affiliated with VMware.