批處理

本節詳細介紹了 Spring Cloud Task 與 Spring Batch 的整合。本節內容包括跟蹤作業執行與任務執行之間的關聯,以及透過 Spring Cloud Deployer 進行遠端分割槽。

將作業執行與執行該作業的任務關聯

Spring Boot 提供了在 Spring Boot Uber-jar 內執行批處理作業的功能。Spring Boot 對此功能的支援允許開發者在一次執行中執行多個批處理作業。Spring Cloud Task 提供了將作業執行與任務執行關聯的能力,以便可以相互追溯。

Spring Cloud Task 透過使用 TaskBatchExecutionListener 實現此功能。預設情況下,在任何同時配置了 Spring Batch Job(透過在上下文中定義型別為 Job 的 bean)並且類路徑上有 spring-cloud-task-batch jar 的上下文中,該監聽器都會自動配置。該監聽器會被注入到所有符合條件的作業中。

覆蓋 TaskBatchExecutionListener

為了防止監聽器被注入到當前上下文中的任何批處理作業中,您可以使用標準的 Spring Boot 機制停用自動配置。

要僅將監聽器注入到上下文中的特定作業中,請覆蓋 batchTaskExecutionListenerBeanPostProcessor 並提供作業 bean ID 列表,如下例所示

public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
	TaskBatchExecutionListenerBeanPostProcessor postProcessor =
		new TaskBatchExecutionListenerBeanPostProcessor();

	postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));

	return postProcessor;
}
您可以在 Spring Cloud Task 專案的 samples 模組中找到一個批處理示例應用,此處

遠端分割槽

Spring Cloud Deployer 提供了在大多數雲基礎設施上啟動基於 Spring Boot 的應用的功能。DeployerPartitionHandlerDeployerStepExecutionHandler 將工作步執行的啟動委託給 Spring Cloud Deployer。

要配置 DeployerStepExecutionHandler,必須提供一個表示要執行的 Spring Boot Uber-jar 的 Resource、一個 TaskLauncherHandler 和一個 JobExplorer。您可以配置任何環境變數以及同時執行的最大 worker 數量、輪詢結果的間隔(預設為 10 秒)和超時(預設為 -1 或無超時)。以下示例展示瞭如何配置此 PartitionHandler

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
		JobExplorer jobExplorer) throws Exception {

	MavenProperties mavenProperties = new MavenProperties();
	mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
		new MavenProperties.RemoteRepository(repository))));

 	Resource resource =
		MavenResource.parse(String.format("%s:%s:%s",
				"io.spring.cloud",
				"partitioned-batch-job",
				"1.1.0.RELEASE"), mavenProperties);

	DeployerPartitionHandler partitionHandler =
		new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");

	List<String> commandLineArgs = new ArrayList<>(3);
	commandLineArgs.add("--spring.profiles.active=worker");
	commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
	commandLineArgs.add("--spring.batch.initializer.enabled=false");

	partitionHandler.setCommandLineArgsProvider(
		new PassThroughCommandLineArgsProvider(commandLineArgs));
	partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
	partitionHandler.setMaxWorkers(2);
	partitionHandler.setApplicationName("PartitionedBatchJobTask");

	return partitionHandler;
}
向分割槽傳遞環境變數時,每個分割槽可能位於具有不同環境設定的不同機器上。因此,您應該只傳遞必需的環境變數。

請注意,在上面的示例中,我們將最大 worker 數量設定為 2。設定最大 worker 數量可以確定同時應該執行的最大分割槽數量。

要執行的 Resource 應該是一個 Spring Boot Uber-jar,並且在當前上下文中將 DeployerStepExecutionHandler 配置為 CommandLineRunner。前面示例中列出的倉庫應該是 Spring Boot Uber-jar 所在的遠端倉庫。管理器和 worker 都應該能夠訪問用作作業倉庫和任務倉庫的同一資料儲存。底層基礎設施啟動 Spring Boot jar 並且 Spring Boot 啟動 DeployerStepExecutionHandler 後,步處理器就會執行請求的 Step。以下示例展示瞭如何配置 DeployerStepExecutionHandler

@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
	DeployerStepExecutionHandler handler =
		new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);

	return handler;
}
您可以在 Spring Cloud Task 專案的 samples 模組中找到一個遠端分割槽示例應用,此處

非同步啟動遠端批處理分割槽

預設情況下,批處理分割槽是按順序啟動的。然而,在某些情況下這可能會影響效能,因為每次啟動都會阻塞,直到資源(例如:在 Kubernetes 中配置 pod)被配置完成。在這些情況下,您可以向 DeployerPartitionHandler 提供一個 ThreadPoolTaskExecutor。這將根據 ThreadPoolTaskExecutor 的配置啟動遠端批處理分割槽。例如

	@Bean
	public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(4);
		executor.setThreadNamePrefix("default_task_executor_thread");
		executor.setWaitForTasksToCompleteOnShutdown(true);
		executor.initialize();
		return executor;
	}

	@Bean
	public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
		TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
		Resource resource = this.resourceLoader
			.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");

		DeployerPartitionHandler partitionHandler =
			new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
				"workerStep", taskRepository, executor);
	...
	}
由於使用了 ThreadPoolTaskExecutor 會留下一個活躍執行緒,因此應用程式不會終止,我們需要關閉上下文。要適當關閉應用程式,我們需要將 spring.cloud.task.closecontextEnabled 屬性設定為 true

開發 Kubernetes 平臺批處理分割槽應用的注意事項

  • 在 Kubernetes 平臺上部署分割槽應用時,您必須使用 Spring Cloud Kubernetes Deployer 的以下依賴

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId>
    </dependency>
  • 任務應用及其分割槽的應用名稱需要遵循以下正則表示式模式:[a-z0-9]([-a-z0-9]*[a-z0-9])。否則,會丟擲異常。

批處理資訊訊息

Spring Cloud Task 提供了批處理作業發出資訊訊息的能力。“Spring Batch Events” 部分詳細介紹了此功能。

批處理作業退出碼

前面討論的,Spring Cloud Task 應用支援記錄任務執行的退出碼。然而,在使用預設的 Batch/Boot 行為時,如果您在一個任務中執行 Spring Batch Job,無論批處理作業執行如何完成,任務的結果始終為零。請記住,任務是一個 Boot 應用,從任務返回的退出碼與 Boot 應用相同。要覆蓋此行為,並允許任務在批處理作業返回 FAILEDBatchStatus 時返回非零退出碼,請將 spring.cloud.task.batch.fail-on-job-failure 設定為 true。此時退出碼可以是 1(預設值),或者基於指定的 ExitCodeGenerator

此功能使用一個新的 ApplicationRunner,它取代了 Spring Boot 提供的那個。預設情況下,它的配置順序與 Spring Boot 提供的相同。然而,如果您想自定義 ApplicationRunner 的執行順序,可以透過設定 spring.cloud.task.batch.applicationRunnerOrder 屬性來指定其順序。要讓您的任務根據批處理作業執行的結果返回退出碼,您需要編寫自己的 CommandLineRunner