透過訊息啟動批處理 Job
使用核心 Spring Batch API 啟動批處理 Job 時,您基本上有兩種選擇:
-
透過命令列,使用 `CommandLineJobRunner`
-
透過程式設計方式,使用 `JobOperator.start()` 或 `JobLauncher.run()`
例如,當您使用 shell 指令碼呼叫批處理 Job 時,可能需要使用 `CommandLineJobRunner`。另外,您可以直接使用 `JobOperator`(例如,在將 Spring Batch 作為 web 應用程式的一部分使用時)。然而,對於更復雜的用例呢?也許您需要輪詢遠端 (S)FTP 伺服器來檢索批處理 Job 的資料,或者您的應用程式必須同時支援多個不同的資料來源。例如,您可能不僅從 web 接收資料檔案,還從 FTP 和其他來源接收。在呼叫 Spring Batch 之前,可能需要對輸入檔案進行額外的轉換。
因此,使用 Spring Integration 及其眾多介面卡來執行批處理 Job 將更加強大。例如,您可以使用 *File Inbound Channel Adapter* 來監視檔案系統中的目錄,並在輸入檔案到達時立即啟動批處理 Job。此外,您可以建立 Spring Integration 流程,這些流程使用多個不同的介面卡,僅透過配置即可輕鬆地同時從多個源獲取批處理 Job 的資料。使用 Spring Integration 實現所有這些場景都很容易,因為它允許 `JobLauncher` 解耦、事件驅動地執行。
Spring Batch Integration 提供了 `JobLaunchingMessageHandler` 類,可用於啟動批處理 Job。`JobLaunchingMessageHandler` 的輸入由 Spring Integration 訊息提供,其有效載荷型別為 `JobLaunchRequest`。此類是待啟動的 `Job` 和啟動批處理 Job 所必需的 `JobParameters` 的包裝器。
下圖顯示了啟動批處理 Job 所需的典型 Spring Integration 訊息流。EIP(企業整合模式)網站提供了訊息圖示及其描述的完整概述。

將 File 轉換為 JobLaunchRequest
以下示例將檔案轉換為 `JobLaunchRequest`
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
JobExecution 響應
當批處理 Job 執行時,會返回一個 `JobExecution` 例項。您可以使用此例項來確定執行的狀態。如果 `JobExecution` 能夠成功建立,它總是會被返回,無論實際執行是否成功。
`JobExecution` 例項如何返回的確切行為取決於提供的 `TaskExecutor`。如果使用 `同步`(單執行緒)的 `TaskExecutor` 實現,則 `JobExecution` 響應僅在 Job 完成 `後` 返回。使用 `非同步` 的 `TaskExecutor` 時,`JobExecution` 例項會立即返回。然後,您可以獲取 `JobExecution` 例項的 `id`(透過 `JobExecution.getJobId()`),並使用 `JobExplorer` 向 `JobRepository` 查詢 Job 的更新狀態。更多資訊,請參閱查詢 Repository。
Spring Batch Integration 配置
考慮這樣一種情況:有人需要建立一個檔案 `inbound-channel-adapter` 來監聽指定目錄中的 CSV 檔案,將其交給一個 transformer (`FileMessageToJobRequest`) 進行轉換,透過 Job 啟動閘道器啟動 Job,並使用 `logging-channel-adapter` 記錄 `JobExecution` 的輸出。
-
Java
-
XML
以下示例展示瞭如何在 Java 中配置這個常見用例
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
以下示例展示瞭如何在 XML 中配置這個常見用例
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
ItemReader 配置示例
既然我們正在輪詢檔案並啟動 Job,就需要配置我們的 Spring Batch `ItemReader`(例如)來使用由名為 "input.file.name" 的 Job 引數定義的位置找到的檔案,如下面的 bean 配置所示:
-
Java
-
XML
以下 Java 示例展示了所需的 bean 配置
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
以下 XML 示例展示了所需的 bean 配置
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
前面示例中主要的關注點是將 `#{jobParameters['input.file.name']}` 的值注入到 Resource 屬性中,並將 `ItemReader` bean 設定為 step 作用域。將 bean 設定為 step 作用域利用了後期繫結支援,這允許訪問 `jobParameters` 變數。