建立自定義 ItemReader 和 ItemWriter

到目前為止,本章討論了 Spring Batch 中讀寫操作的基本契約以及一些常見的實現方式。然而,這些實現都比較通用,許多潛在的場景可能無法透過開箱即用的實現來滿足。本節將透過一個簡單的示例,展示如何建立自定義的 ItemReaderItemWriter 實現,並正確地實現它們的契約。ItemReader 還實現了 ItemStream 介面,以說明如何使 reader 或 writer 具有重啟能力 (restartable)。

自定義 ItemReader 示例

出於示例目的,我們建立了一個簡單的 ItemReader 實現,它從提供的列表中讀取資料。我們首先實現 ItemReader 最基本的契約,即 read 方法,如下面的程式碼所示:

public class CustomItemReader<T> implements ItemReader<T> {

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

前面的類接收一個 Item 列表,並逐個返回它們,同時將每個 Item 從列表中移除。當列表為空時,它返回 null,從而滿足了 ItemReader 最基本的要求,如下面的測試程式碼所示:

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

使 ItemReader 具有重啟能力 (Restartable)

最後的挑戰是使 ItemReader 具有重啟能力 (restartable)。目前,如果處理中斷並再次開始,ItemReader 必須從頭開始。這在許多場景下是有效的,但有時更希望批處理作業 (job) 能從中斷的地方繼續。主要的區別通常在於 reader 是有狀態 (stateful) 還是無狀態 (stateless) 的。無狀態 reader 不需要擔心重啟能力,而有狀態 reader 則必須在重啟時嘗試恢復其最後已知狀態。因此,我們建議您儘可能保持自定義 reader 為無狀態的,這樣您就不需要擔心重啟能力了。

如果您確實需要儲存狀態,則應使用 ItemStream 介面

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else {
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

每次呼叫 ItemStreamupdate 方法時,ItemReader 的當前索引都會儲存在提供的 ExecutionContext 中,鍵為 'current.index'。當呼叫 ItemStreamopen 方法時,會檢查 ExecutionContext 是否包含該鍵的條目。如果找到該鍵,則將當前索引移動到該位置。這是一個相當簡單的示例,但它仍然滿足了通用契約。

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

大多數 ItemReader 都擁有更為複雜的重啟邏輯。例如,JdbcCursorItemReader 會儲存遊標中最後處理行的行 ID。

另外值得注意的是,在 ExecutionContext 中使用的鍵不應該是微不足道的。這是因為同一個 ExecutionContext 用於 Step 中的所有 ItemStream。在大多數情況下,只需在鍵前面加上類名就足以保證唯一性。然而,在極少數情況下,如果同一個 Step 中使用了兩個相同型別的 ItemStream(例如,需要輸出到兩個檔案時),則需要一個更唯一的名稱。因此,許多 Spring Batch 的 ItemReaderItemWriter 實現都提供了 setName() 屬性,允許覆蓋此鍵名。

自定義 ItemWriter 示例

實現自定義 ItemWriter 在很多方面類似於上面的 ItemReader 示例,但在足以需要單獨示例的方面有所不同。然而,新增重啟能力基本上是相同的,因此本示例不再重複介紹。與 ItemReader 示例一樣,這裡使用了 List 以使示例儘可能簡單。

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(Chunk<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

使 ItemWriter 具有重啟能力 (Restartable)

要使 ItemWriter 具有重啟能力 (restartable),我們將遵循與 ItemReader 相同的過程,新增並實現 ItemStream 介面來同步執行上下文。在示例中,我們可能需要計算已處理的 Item 數量,並將其作為頁尾記錄新增。如果需要這樣做,我們可以在 ItemWriter 中實現 ItemStream,以便在重新開啟流時,計數器可以從執行上下文中恢復。

在許多實際場景中,自定義 ItemWriter 也委託給另一個本身具有重啟能力的 writer(例如,寫入檔案時),或者寫入事務性資源,因此不需要具備重啟能力,因為它是無狀態的。當您有一個有狀態的 writer 時,您很可能需要確保同時實現 ItemStreamItemWriter。請記住,writer 的客戶端需要知道 ItemStream 的存在,因此您可能需要在配置中將其註冊為一個 stream。