專案處理

ItemReader 和 ItemWriter 介面對於它們各自的任務都非常有用,但如果您想在寫入之前插入業務邏輯怎麼辦?對於讀和寫,一個選項是使用複合(composite)模式:建立一個包含另一個 ItemWriterItemWriter,或建立一個包含另一個 ItemReaderItemReader。以下程式碼顯示了一個示例

public class CompositeItemWriter<T> implements ItemWriter<T> {

    ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(Chunk<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

前面的類包含另一個 ItemWriter,在提供了某些業務邏輯後,它將委託給後者。這種模式也很容易用於 ItemReader,或許可以根據主 ItemReader 提供的輸入獲取更多參考資料。如果您需要自己控制對 write 的呼叫,它也很有用。但是,如果您只想在實際寫入專案之前“轉換”傳遞給寫入的專案,則無需自己進行 write。您只需修改該專案即可。對於這種情況,Spring Batch 提供了 ItemProcessor 介面,如下面的介面定義所示

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

ItemProcessor 很簡單。給定一個物件,轉換它並返回另一個物件。提供的物件型別可能相同,也可能不同。重點是業務邏輯可以應用於該過程,並且完全由開發人員建立該邏輯。ItemProcessor 可以直接裝配到 step 中。例如,假設 ItemReader 提供了一個型別為 Foo 的類,並且在寫入之前需要將其轉換為型別為 Bar 的類。以下示例顯示了一個執行轉換的 ItemProcessor

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar> {
    public void write(Chunk<? extends Bar> bars) throws Exception {
        //write bars
    }
}

在前面的示例中,有一個名為 Foo 的類,一個名為 Bar 的類,以及一個符合 ItemProcessor 介面的名為 FooProcessor 的類。轉換很簡單,但此處可以進行任何型別的轉換。BarWriter 寫入 Bar 物件,如果提供了任何其他型別,則會丟擲異常。同樣,如果提供的不是 Foo,則 FooProcessor 會丟擲異常。然後可以將 FooProcessor 注入到 Step 中,如下面的示例所示

  • Java

  • XML

Java 配置
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
	return new JobBuilder("ioSampleJob", jobRepository)
				.start(step1)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<Foo, Bar>chunk(2, transactionManager)
				.reader(fooReader())
				.processor(fooProcessor())
				.writer(barWriter())
				.build();
}
XML 配置
<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

ItemProcessorItemReaderItemWriter 的區別在於 ItemProcessor 對於 Step 是可選的。

連結 ItemProcessor

在許多場景中,執行單個轉換非常有用,但如果您想將多個 ItemProcessor 實現“連結”在一起怎麼辦?您可以透過使用前面提到的複合模式來實現。為了更新前面的單個轉換示例,Foo 被轉換為 Bar,然後再轉換為 Foobar 並寫入,如下面的示例所示

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar {
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar, Foobar> {
    public Foobar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<Foobar>{
    public void write(Chunk<? extends Foobar> items) throws Exception {
        //write items
    }
}

可以將 FooProcessorBarProcessor “連結”在一起,以得到結果 Foobar,如下例所示

CompositeItemProcessor<Foo,Foobar> compositeProcessor =
                                      new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);

就像前面的示例一樣,您可以將複合處理器配置到 Step

  • Java

  • XML

Java 配置
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
	return new JobBuilder("ioSampleJob", jobRepository)
				.start(step1)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<Foo, Foobar>chunk(2, transactionManager)
				.reader(fooReader())
				.processor(compositeProcessor())
				.writer(foobarWriter())
				.build();
}

@Bean
public CompositeItemProcessor compositeProcessor() {
	List<ItemProcessor> delegates = new ArrayList<>(2);
	delegates.add(new FooProcessor());
	delegates.add(new BarProcessor());

	CompositeItemProcessor processor = new CompositeItemProcessor();

	processor.setDelegates(delegates);

	return processor;
}
XML 配置
<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

<bean id="compositeItemProcessor"
      class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <bean class="..FooProcessor" />
            <bean class="..BarProcessor" />
        </list>
    </property>
</bean>

過濾記錄

專案處理器的典型用途之一是在記錄傳遞給 ItemWriter 之前將其過濾掉。過濾是與跳過不同的動作。跳過表示記錄無效,而過濾表示記錄不應寫入。

例如,考慮一個批處理 Job,它讀取包含三種不同型別記錄的檔案:要插入的記錄、要更新的記錄和要刪除的記錄。如果系統不支援記錄刪除,我們就不希望將任何可刪除的記錄傳送到 ItemWriter。但是,由於這些記錄實際上並非壞記錄,因此我們希望將其過濾掉,而不是跳過它們。結果是,ItemWriter 只會收到可插入和可更新的記錄。

要過濾記錄,可以從 ItemProcessor 返回 null。框架會檢測到結果為 null,並避免將該專案新增到傳遞給 ItemWriter 的記錄列表中。從 ItemProcessor 丟擲異常會導致跳過。

驗證輸入

ItemReaders 和 ItemWriters 章節討論了多種解析輸入的方法。每個主要實現都會在格式不“良好”時丟擲異常。如果缺少資料範圍,FixedLengthTokenizer 會丟擲異常。類似地,嘗試訪問 RowMapperFieldSetMapper 中不存在或格式與預期不同的索引會導致丟擲異常。所有這些型別的異常都在 read 返回之前丟擲。但是,它們不解決返回的專案是否有效的問題。例如,如果其中一個欄位是年齡,它不能為負。它可以正確解析,因為它存在並且是一個數字,但不會導致異常。由於已經存在大量的驗證框架,Spring Batch 不會嘗試再提供一個。相反,它提供了一個簡單的介面,稱為 Validator,您可以使用任意數量的框架來實現它,如下面的介面定義所示

public interface Validator<T> {

    void validate(T value) throws ValidationException;

}

契約是如果物件無效,則 validate 方法會丟擲異常;如果物件有效,則正常返回。Spring Batch 提供了一個 ValidatingItemProcessor,如下面的 Bean 定義所示

  • Java

  • XML

Java 配置
@Bean
public ValidatingItemProcessor itemProcessor() {
	ValidatingItemProcessor processor = new ValidatingItemProcessor();

	processor.setValidator(validator());

	return processor;
}

@Bean
public SpringValidator validator() {
	SpringValidator validator = new SpringValidator();

	validator.setValidator(new TradeValidator());

	return validator;
}
XML 配置
<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
    <property name="validator" ref="validator" />
</bean>

<bean id="validator" class="org.springframework.batch.item.validator.SpringValidator">
	<property name="validator">
		<bean class="org.springframework.batch.samples.domain.trade.internal.validator.TradeValidator"/>
	</property>
</bean>

您還可以使用 BeanValidatingItemProcessor 來驗證使用 Bean Validation API (JSR-303) 註解的專案。例如,考慮以下型別 Person

class Person {

    @NotEmpty
    private String name;

    public Person(String name) {
     this.name = name;
    }

    public String getName() {
     return name;
    }

    public void setName(String name) {
     this.name = name;
    }

}

您可以透過在應用程式上下文中宣告一個 BeanValidatingItemProcessor bean,並將其註冊為面向 chunk 的 step 中的處理器來驗證專案

@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
    BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
    beanValidatingItemProcessor.setFilter(true);

    return beanValidatingItemProcessor;
}

容錯

當一個 chunk 回滾時,在讀取期間快取的專案可能會被重新處理。如果一個 step 配置為容錯(通常透過使用跳過或重試處理),則使用的任何 ItemProcessor 都應以冪等的方式實現。通常,這將包括不對 ItemProcessor 的輸入專案執行任何更改,並且僅更新作為結果的例項。