批處理
本節詳細介紹了 Spring Cloud Task 與 Spring Batch 的整合。本節內容包括跟蹤作業執行與任務執行之間的關聯,以及透過 Spring Cloud Deployer 進行遠端分割槽。
將作業執行與執行該作業的任務關聯
Spring Boot 提供了在 Spring Boot Uber-jar 內執行批處理作業的功能。Spring Boot 對此功能的支援允許開發者在一次執行中執行多個批處理作業。Spring Cloud Task 提供了將作業執行與任務執行關聯的能力,以便可以相互追溯。
Spring Cloud Task 透過使用 TaskBatchExecutionListener
實現此功能。預設情況下,在任何同時配置了 Spring Batch Job(透過在上下文中定義型別為 Job
的 bean)並且類路徑上有 spring-cloud-task-batch
jar 的上下文中,該監聽器都會自動配置。該監聽器會被注入到所有符合條件的作業中。
覆蓋 TaskBatchExecutionListener
為了防止監聽器被注入到當前上下文中的任何批處理作業中,您可以使用標準的 Spring Boot 機制停用自動配置。
要僅將監聽器注入到上下文中的特定作業中,請覆蓋 batchTaskExecutionListenerBeanPostProcessor
並提供作業 bean ID 列表,如下例所示
public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
TaskBatchExecutionListenerBeanPostProcessor postProcessor =
new TaskBatchExecutionListenerBeanPostProcessor();
postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));
return postProcessor;
}
您可以在 Spring Cloud Task 專案的 samples 模組中找到一個批處理示例應用,此處。 |
遠端分割槽
Spring Cloud Deployer 提供了在大多數雲基礎設施上啟動基於 Spring Boot 的應用的功能。DeployerPartitionHandler
和 DeployerStepExecutionHandler
將工作步執行的啟動委託給 Spring Cloud Deployer。
要配置 DeployerStepExecutionHandler
,必須提供一個表示要執行的 Spring Boot Uber-jar 的 Resource
、一個 TaskLauncherHandler
和一個 JobExplorer
。您可以配置任何環境變數以及同時執行的最大 worker 數量、輪詢結果的間隔(預設為 10 秒)和超時(預設為 -1 或無超時)。以下示例展示瞭如何配置此 PartitionHandler
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer) throws Exception {
MavenProperties mavenProperties = new MavenProperties();
mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
new MavenProperties.RemoteRepository(repository))));
Resource resource =
MavenResource.parse(String.format("%s:%s:%s",
"io.spring.cloud",
"partitioned-batch-job",
"1.1.0.RELEASE"), mavenProperties);
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler.setCommandLineArgsProvider(
new PassThroughCommandLineArgsProvider(commandLineArgs));
partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
partitionHandler.setMaxWorkers(2);
partitionHandler.setApplicationName("PartitionedBatchJobTask");
return partitionHandler;
}
向分割槽傳遞環境變數時,每個分割槽可能位於具有不同環境設定的不同機器上。因此,您應該只傳遞必需的環境變數。 |
請注意,在上面的示例中,我們將最大 worker 數量設定為 2。設定最大 worker 數量可以確定同時應該執行的最大分割槽數量。
要執行的 Resource
應該是一個 Spring Boot Uber-jar,並且在當前上下文中將 DeployerStepExecutionHandler
配置為 CommandLineRunner
。前面示例中列出的倉庫應該是 Spring Boot Uber-jar 所在的遠端倉庫。管理器和 worker 都應該能夠訪問用作作業倉庫和任務倉庫的同一資料儲存。底層基礎設施啟動 Spring Boot jar 並且 Spring Boot 啟動 DeployerStepExecutionHandler
後,步處理器就會執行請求的 Step
。以下示例展示瞭如何配置 DeployerStepExecutionHandler
@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
DeployerStepExecutionHandler handler =
new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
return handler;
}
您可以在 Spring Cloud Task 專案的 samples 模組中找到一個遠端分割槽示例應用,此處。 |
非同步啟動遠端批處理分割槽
預設情況下,批處理分割槽是按順序啟動的。然而,在某些情況下這可能會影響效能,因為每次啟動都會阻塞,直到資源(例如:在 Kubernetes 中配置 pod)被配置完成。在這些情況下,您可以向 DeployerPartitionHandler
提供一個 ThreadPoolTaskExecutor
。這將根據 ThreadPoolTaskExecutor
的配置啟動遠端批處理分割槽。例如
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setThreadNamePrefix("default_task_executor_thread");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
Resource resource = this.resourceLoader
.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
"workerStep", taskRepository, executor);
...
}
由於使用了 ThreadPoolTaskExecutor 會留下一個活躍執行緒,因此應用程式不會終止,我們需要關閉上下文。要適當關閉應用程式,我們需要將 spring.cloud.task.closecontextEnabled 屬性設定為 true 。 |
開發 Kubernetes 平臺批處理分割槽應用的注意事項
-
在 Kubernetes 平臺上部署分割槽應用時,您必須使用 Spring Cloud Kubernetes Deployer 的以下依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId> </dependency>
-
任務應用及其分割槽的應用名稱需要遵循以下正則表示式模式:
[a-z0-9]([-a-z0-9]*[a-z0-9])
。否則,會丟擲異常。
批處理資訊訊息
Spring Cloud Task 提供了批處理作業發出資訊訊息的能力。“Spring Batch Events” 部分詳細介紹了此功能。
批處理作業退出碼
如前面討論的,Spring Cloud Task 應用支援記錄任務執行的退出碼。然而,在使用預設的 Batch/Boot 行為時,如果您在一個任務中執行 Spring Batch Job,無論批處理作業執行如何完成,任務的結果始終為零。請記住,任務是一個 Boot 應用,從任務返回的退出碼與 Boot 應用相同。要覆蓋此行為,並允許任務在批處理作業返回 FAILED
的 BatchStatus 時返回非零退出碼,請將 spring.cloud.task.batch.fail-on-job-failure
設定為 true
。此時退出碼可以是 1(預設值),或者基於指定的 ExitCodeGenerator
)
此功能使用一個新的 ApplicationRunner
,它取代了 Spring Boot 提供的那個。預設情況下,它的配置順序與 Spring Boot 提供的相同。然而,如果您想自定義 ApplicationRunner
的執行順序,可以透過設定 spring.cloud.task.batch.applicationRunnerOrder
屬性來指定其順序。要讓您的任務根據批處理作業執行的結果返回退出碼,您需要編寫自己的 CommandLineRunner
。