項處理
ItemReader 和 ItemWriter 介面對於其特定任務都非常有用,但是如果你想在寫入之前插入業務邏輯怎麼辦?對於讀寫,一個選項是使用組合模式:建立一個包含另一個 ItemWriter 的 ItemWriter 或一個包含另一個 ItemReader 的 ItemReader。以下程式碼顯示了一個示例
public class CompositeItemWriter<T> implements ItemWriter<T> {
ItemWriter<T> itemWriter;
public CompositeItemWriter(ItemWriter<T> itemWriter) {
this.itemWriter = itemWriter;
}
public void write(Chunk<? extends T> items) throws Exception {
//Add business logic here
itemWriter.write(items);
}
public void setDelegate(ItemWriter<T> itemWriter){
this.itemWriter = itemWriter;
}
}
前面的類包含另一個 ItemWriter,在提供了某些業務邏輯之後,它會將任務委託給該 ItemWriter。這種模式也可以很容易地用於 ItemReader,也許是為了根據主 ItemReader 提供的輸入獲取更多的參考資料。如果你需要自己控制對 write 的呼叫,它也很有用。但是,如果你只想在實際寫入之前“轉換”傳入以進行寫入的專案,則無需自己 write。你只需修改專案。對於這種情況,Spring Batch 提供了 ItemProcessor 介面,如下面的介面定義所示
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
ItemProcessor 很簡單。給定一個物件,對其進行轉換並返回另一個物件。提供的物件可能與原始物件型別相同,也可能不同。重點是業務邏輯可以在處理過程中應用,並且完全由開發人員建立該邏輯。ItemProcessor 可以直接連線到步驟中。例如,假設 ItemReader 提供了一個型別為 Foo 的類,並且在寫入之前需要將其轉換為型別 Bar。以下示例顯示了一個執行轉換的 ItemProcessor
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class FooProcessor implements ItemProcessor<Foo, Bar> {
public Bar process(Foo foo) throws Exception {
//Perform simple transformation, convert a Foo to a Bar
return new Bar(foo);
}
}
public class BarWriter implements ItemWriter<Bar> {
public void write(Chunk<? extends Bar> bars) throws Exception {
//write bars
}
}
在前面的示例中,有一個名為 Foo 的類,一個名為 Bar 的類,以及一個名為 FooProcessor 的類,它遵循 ItemProcessor 介面。轉換很簡單,但可以在此處進行任何型別的轉換。BarWriter 寫入 Bar 物件,如果提供了任何其他型別,則丟擲異常。同樣,如果提供了除 Foo 之外的任何內容,FooProcessor 也會丟擲異常。然後可以將 FooProcessor 注入到 Step 中,如下面的示例所示
-
Java
-
XML
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
return new JobBuilder("ioSampleJob", jobRepository)
.start(step1)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Foo, Bar>chunk(2).transactionManager(transactionManager)
.reader(fooReader())
.processor(fooProcessor())
.writer(barWriter())
.build();
}
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
commit-interval="2"/>
</tasklet>
</step>
</job>
ItemProcessor 與 ItemReader 或 ItemWriter 的區別在於,ItemProcessor 對於 Step 來說是可選的。
鏈式 ItemProcessor
在許多場景中,執行單一轉換很有用,但如果你想將多個 ItemProcessor 實現“連結”在一起怎麼辦?你可以透過使用前面提到的組合模式來實現。為了更新前面的單一轉換示例,Foo 被轉換為 Bar,然後被轉換為 Foobar 並寫入,如下面的示例所示
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class Foobar {
public Foobar(Bar bar) {}
}
public class FooProcessor implements ItemProcessor<Foo, Bar> {
public Bar process(Foo foo) throws Exception {
//Perform simple transformation, convert a Foo to a Bar
return new Bar(foo);
}
}
public class BarProcessor implements ItemProcessor<Bar, Foobar> {
public Foobar process(Bar bar) throws Exception {
return new Foobar(bar);
}
}
public class FoobarWriter implements ItemWriter<Foobar>{
public void write(Chunk<? extends Foobar> items) throws Exception {
//write items
}
}
一個 FooProcessor 和一個 BarProcessor 可以“連結”在一起,以得到結果 Foobar,如下面的示例所示
CompositeItemProcessor<Foo,Foobar> compositeProcessor =
new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);
與前面的示例一樣,你可以將組合處理器配置到 Step 中
-
Java
-
XML
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
return new JobBuilder("ioSampleJob", jobRepository)
.start(step1)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<Foo, Foobar>chunk(2).transactionManager(transactionManager)
.reader(fooReader())
.processor(compositeProcessor())
.writer(foobarWriter())
.build();
}
@Bean
public CompositeItemProcessor compositeProcessor() {
List<ItemProcessor> delegates = new ArrayList<>(2);
delegates.add(new FooProcessor());
delegates.add(new BarProcessor());
CompositeItemProcessor processor = new CompositeItemProcessor();
processor.setDelegates(delegates);
return processor;
}
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter"
commit-interval="2"/>
</tasklet>
</step>
</job>
<bean id="compositeItemProcessor"
class="org.springframework.batch.infrastructure.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<bean class="..FooProcessor" />
<bean class="..BarProcessor" />
</list>
</property>
</bean>
過濾記錄
專案處理器的一個典型用途是在記錄傳遞給 ItemWriter 之前將其過濾掉。過濾是一種與跳過不同的操作。跳過表示記錄無效,而過濾表示不應寫入記錄。
例如,考慮一個批處理作業,它讀取一個包含三種不同型別記錄的檔案:要插入的記錄、要更新的記錄和要刪除的記錄。如果系統不支援記錄刪除,我們就不希望將任何可刪除的記錄傳送到 ItemWriter。但是,由於這些記錄實際上並不是錯誤的記錄,我們希望將其過濾掉而不是跳過。因此,ItemWriter 將只接收可插入和可更新的記錄。
要過濾記錄,你可以從 ItemProcessor 返回 null。框架會檢測到結果是 null,並避免將該項新增到傳遞給 ItemWriter 的記錄列表中。從 ItemProcessor 丟擲的異常會導致跳過。
驗證輸入
ItemReaders 和 ItemWriters 一章討論了多種解析輸入的方法。如果輸入“格式不正確”,每個主要實現都會丟擲異常。如果缺少資料範圍,FixedLengthTokenizer 會丟擲異常。同樣,嘗試訪問 RowMapper 或 FieldSetMapper 中不存在或格式與預期不同的索引會導致丟擲異常。所有這些型別的異常都在 read 返回之前丟擲。但是,它們不解決返回的專案是否有效的問題。例如,如果其中一個欄位是年齡,它不能是負數。它可能解析正確,因為它存在並且是一個數字,但它不會導致異常。由於已經有大量的驗證框架,Spring Batch 不會嘗試提供另一個。相反,它提供了一個簡單的介面,稱為 Validator,你可以透過許多框架來實現,如下面的介面定義所示
public interface Validator<T> {
void validate(T value) throws ValidationException;
}
約定是,如果物件無效,validate 方法會丟擲異常;如果物件有效,則正常返回。Spring Batch 提供了一個 ValidatingItemProcessor,如下面的 bean 定義所示
-
Java
-
XML
@Bean
public ValidatingItemProcessor itemProcessor() {
ValidatingItemProcessor processor = new ValidatingItemProcessor();
processor.setValidator(validator());
return processor;
}
@Bean
public SpringValidator validator() {
SpringValidator validator = new SpringValidator();
validator.setValidator(new TradeValidator());
return validator;
}
<bean class="org.springframework.batch.infrastructure.item.validator.ValidatingItemProcessor">
<property name="validator" ref="validator" />
</bean>
<bean id="validator" class="org.springframework.batch.infrastructure.item.validator.SpringValidator">
<property name="validator">
<bean class="org.springframework.batch.samples.domain.trade.internal.validator.TradeValidator"/>
</property>
</bean>
你還可以使用 BeanValidatingItemProcessor 來驗證使用 Bean Validation API (JSR-303) 註解標註的專案。例如,考慮以下型別 Person
class Person {
@NotEmpty
private String name;
public Person(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
你可以透過在應用程式上下文中宣告一個 BeanValidatingItemProcessor bean,並將其註冊為面向塊的步驟中的處理器來驗證專案
@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(true);
return beanValidatingItemProcessor;
}