建立自定義 ItemReaders 和 ItemWriters

到目前為止,本章已經討論了Spring Batch中讀寫的基本契約以及一些常用的實現方式。然而,這些都是相當通用的,並且存在許多開箱即用的實現可能無法覆蓋的潛在場景。本節透過一個簡單的例子,展示如何建立自定義的ItemReaderItemWriter實現並正確實現它們的契約。ItemReader還實現了ItemStream,以說明如何使讀取器或寫入器可重啟。

自定義ItemReader示例

為了本例的目的,我們建立一個簡單的ItemReader實現,它從提供的列表中讀取。我們首先實現ItemReader最基本的契約,即read方法,如以下程式碼所示:

public class CustomItemReader<T> implements ItemReader<T> {

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

前面的類接收一個專案列表,然後一次返回一個,並從列表中移除。當列表為空時,它返回null,從而滿足ItemReader最基本的要求,如以下測試程式碼所示:

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

使ItemReader可重啟

最後的挑戰是使ItemReader可重啟。目前,如果處理中斷並重新開始,ItemReader必須從頭開始。這在許多場景中是有效的,但有時批處理作業最好從上次中斷的地方重新開始。關鍵的區別通常在於讀取器是有狀態還是無狀態的。無狀態讀取器無需擔心可重啟性,但有狀態讀取器必須嘗試在重啟時恢復其上次已知狀態。因此,我們建議您儘可能保持自定義讀取器無狀態,這樣就無需擔心可重啟性。

如果您確實需要儲存狀態,則應使用ItemStream介面:

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else {
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

每次呼叫ItemStreamupdate方法時,ItemReader的當前索引都以“current.index”為鍵儲存在提供的ExecutionContext中。當呼叫ItemStreamopen方法時,會檢查ExecutionContext以檢視是否包含該鍵的條目。如果找到該鍵,則當前索引會移動到該位置。這是一個相當簡單的示例,但它仍然滿足通用契約:

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

大多數ItemReaders都有更復雜的重啟邏輯。例如,JdbcCursorItemReader在遊標中儲存最後處理的行的行ID。

還值得注意的是,ExecutionContext中使用的鍵不應該過於簡單。這是因為同一個ExecutionContext用於Step中的所有ItemStreams。在大多數情況下,只需在鍵前面加上類名就足以保證唯一性。然而,在極少數情況下,如果同一型別的兩個ItemStream在同一個步驟中使用(當輸出需要兩個檔案時可能會發生),則需要一個更唯一的名稱。因此,許多Spring Batch的ItemReaderItemWriter實現都具有setName()屬性,允許覆蓋此鍵名。

自定義ItemWriter示例

實現自定義ItemWriter在許多方面與上面的ItemReader示例相似,但在足夠多的方面有所不同,因此值得單獨作為一個示例。然而,新增可重啟性基本相同,因此本示例不涉及。與ItemReader示例一樣,使用List以使示例儘可能簡單:

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(Chunk<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

使ItemWriter可重啟

要使ItemWriter可重啟,我們將遵循與ItemReader相同的過程,新增並實現ItemStream介面以同步執行上下文。在此示例中,我們可能需要計算已處理的專案數量,並將其新增為頁尾記錄。如果我們需要這樣做,我們可以在ItemWriter中實現ItemStream,以便在重新開啟流時從執行上下文重新構成計數器。

在許多實際情況下,自定義ItemWriter也會委託給另一個本身可重啟的寫入器(例如,寫入檔案時),或者它寫入事務性資源,因此無需可重啟,因為它無狀態。當您擁有有狀態寫入器時,您應該確保同時實現ItemStreamItemWriter。請記住,寫入器的客戶端需要感知ItemStream,因此您可能需要在配置中將其註冊為流。

© . This site is unofficial and not affiliated with VMware.