透過訊息啟動批處理作業
透過核心 Spring Batch API 啟動批處理作業時,您基本上有兩種選擇
-
從命令列,使用
CommandLineJobOperator -
透過程式設計方式,使用
JobOperator.start()
例如,當透過 shell 指令碼呼叫批處理作業時,您可能希望使用 CommandLineJobOperator。或者,您可以直接使用 JobOperator(例如,當 Spring Batch 作為 Web 應用程式的一部分使用時)。但是,更復雜的用例呢?也許您需要輪詢遠端 (S)FTP 伺服器以檢索批處理作業的資料,或者您的應用程式必須同時支援多個不同的資料來源。例如,您可能不僅從 Web 接收資料檔案,還從 FTP 和其他來源接收資料檔案。在呼叫 Spring Batch 之前,可能還需要對輸入檔案進行額外的轉換。
因此,使用 Spring Integration 及其眾多介面卡執行批處理作業會更加強大。例如,您可以使用檔案入站通道介面卡來監控檔案系統中的目錄,並在輸入檔案到達時立即啟動批處理作業。此外,您可以建立使用多個不同介面卡的 Spring Integration 流,僅透過配置即可輕鬆地同時從多個源獲取批處理作業的資料。使用 Spring Integration 實現所有這些場景非常容易,因為它允許解耦、事件驅動地執行 JobOperator。
Spring Batch Integration 提供了 JobLaunchingMessageHandler 類,您可以使用它來啟動批處理作業。JobLaunchingMessageHandler 的輸入由 Spring Integration 訊息提供,該訊息的有效負載型別為 JobLaunchRequest。此類是對要啟動的 Job 和啟動批處理作業所需的 JobParameters 的包裝。
下圖顯示了啟動批處理作業所需的典型 Spring Integration 訊息流。EIP (Enterprise Integration Patterns) 網站 提供了訊息圖示及其描述的完整概述。
將檔案轉換為 JobLaunchRequest
以下示例將檔案轉換為 JobLaunchRequest
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
JobExecution 響應
當批處理作業正在執行時,會返回一個 JobExecution 例項。您可以使用此例項來確定執行的狀態。如果 JobExecution 能夠成功建立,無論實際執行是否成功,它總是會返回。
返回 JobExecution 例項的確切行為取決於所提供的 TaskExecutor。如果使用 同步(單執行緒)TaskExecutor 實現,則 JobExecution 響應僅在作業完成後返回。當使用 非同步 TaskExecutor 時,JobExecution 例項會立即返回。然後,您可以獲取 JobExecution 例項的 id(使用 JobExecution.getJobInstanceId()),並使用 JobExplorer 查詢 JobRepository 以獲取作業的更新狀態。有關更多資訊,請參閱 查詢 Repository。
Spring Batch Integration 配置
考慮這樣一種情況:需要建立檔案 inbound-channel-adapter 以監聽指定目錄中的 CSV 檔案,將其交給轉換器(FileMessageToJobRequest),透過作業啟動閘道器啟動作業,並使用 logging-channel-adapter 記錄 JobExecution 的輸出。
-
Java
-
XML
以下示例顯示瞭如何在 Java 中配置這種常見情況
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
以下示例顯示瞭如何在 XML 中配置這種常見情況
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
ItemReader 配置示例
現在我們正在輪詢檔案並啟動作業,我們需要配置我們的 Spring Batch ItemReader(例如)以使用作業引數“input.file.name”定義的位置找到的檔案,如以下 bean 配置所示
-
Java
-
XML
以下 Java 示例顯示了必要的 bean 配置
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
以下 XML 示例顯示了必要的 bean 配置
<bean id="itemReader" class="org.springframework.batch.infrastructure.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
前面示例中主要值得注意的地方是將 #{jobParameters['input.file.name']} 的值注入為 Resource 屬性值,並將 ItemReader bean 設定為 step 作用域。將 bean 設定為 step 作用域利用了後期繫結支援,允許訪問 jobParameters 變數。