Spring Cloud Stream 整合

任務本身可能很有用,但將任務整合到更大的生態系統中,可以使其對更復雜的處理和編排更加有用。本節介紹 Spring Cloud Task 與 Spring Cloud Stream 的整合選項。

從 Spring Cloud Stream 啟動任務

您可以從流中啟動任務。為此,您需要建立一個 sink,它監聽包含 TaskLaunchRequest 作為其有效載荷的訊息。TaskLaunchRequest 包含

  • uri: 要執行的任務 artifact 的 URI。

  • applicationName: 與任務關聯的名稱。如果未設定 applicationName,TaskLaunchRequest 會生成一個由以下部分組成的任務名稱:Task-<UUID>

  • commandLineArguments: 一個包含任務命令列引數的列表。

  • environmentProperties: 一個包含任務要使用的環境變數的 Map。

  • deploymentProperties: 一個包含部署器用於部署任務的屬性的 Map。

如果有效載荷是不同型別,sink 會丟擲異常。

例如,可以建立一個流,其中包含一個處理器,該處理器從 HTTP 源獲取資料,建立一個包含 TaskLaunchRequestGenericMessage,然後將訊息傳送到其輸出通道。然後,任務 sink 會從其輸入通道接收訊息並啟動任務。

要建立 taskSink,您只需建立一個包含 EnableTaskLauncher 註解的 Spring Boot 應用,如下例所示

@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
	public static void main(String[] args) {
		SpringApplication.run(TaskSinkApplication.class, args);
	}
}

Spring Cloud Task 專案的示例模組包含一個示例 Sink 和 Processor。要將這些示例安裝到您的本地 Maven 倉庫,請從 spring-cloud-task-samples 目錄執行 Maven 構建,並將 skipInstall 屬性設定為 false,如下例所示

mvn clean install

必須將 maven.remoteRepositories.springRepo.url 屬性設定為 Spring Boot Uber-jar 所在的遠端倉庫位置。如果未設定,則沒有遠端倉庫,因此僅依賴本地倉庫。

Spring Cloud Data Flow

要在 Spring Cloud Data Flow 中建立流,您必須首先註冊我們建立的 Task Sink Application。在以下示例中,我們使用 Spring Cloud Data Flow shell 註冊 Processor 和 Sink 示例應用

app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>

以下示例展示如何使用 Spring Cloud Data Flow shell 建立流

stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy

Spring Cloud Task 事件

當任務透過 Spring Cloud Stream 通道執行時,Spring Cloud Task 提供了透過 Spring Cloud Stream 通道發出事件的能力。使用任務監聽器將 TaskExecution 釋出到名為 task-events 的訊息通道上。此功能會自動注入到 classpath 中包含 spring-cloud-streamspring-cloud-stream-<binder> 以及定義任務的任何任務中。

要停用事件發出監聽器,請將 spring.cloud.task.events.enabled 屬性設定為 false

定義適當的 classpath 後,以下任務會將 TaskExecution 作為事件傳送到 task-events 通道(在任務開始和結束時)

@SpringBootApplication
public class TaskEventsApplication {

	public static void main(String[] args) {
		SpringApplication.run(TaskEventsApplication.class, args);
	}

	@Configuration
	public static class TaskConfiguration {

		@Bean
		public ApplicationRunner applicationRunner() {
			return new ApplicationRunner() {
				@Override
				public void run(ApplicationArguments args) {
					System.out.println("The ApplicationRunner was executed");
				}
			};
		}
	}
}
還需要在 classpath 中包含 Binder 實現。
可以在 Spring Cloud Task 專案的示例模組中找到示例任務事件應用,此處

停用特定任務事件

要停用任務事件,可以將 spring.cloud.task.events.enabled 屬性設定為 false

Spring Batch 事件

透過任務執行 Spring Batch 作業時,可以配置 Spring Cloud Task 根據 Spring Batch 中可用的 Spring Batch 監聽器發出資訊性訊息。具體來說,以下 Spring Batch 監聽器會被自動配置到每個批處理作業中,並透過 Spring Cloud Task 執行時在關聯的 Spring Cloud Stream 通道上發出訊息

  • JobExecutionListener 監聽 job-execution-events

  • StepExecutionListener 監聽 step-execution-events

  • ChunkListener 監聽 chunk-events

  • ItemReadListener 監聽 item-read-events

  • ItemProcessListener 監聽 item-process-events

  • ItemWriteListener 監聽 item-write-events

  • SkipListener 監聽 skip-events

當上下文中存在適當的 bean(一個 Job 和一個 TaskLifecycleListener)時,這些監聽器會被自動配置到任何 AbstractJob 中。監聽這些事件的配置方式與繫結到任何其他 Spring Cloud Stream 通道的方式相同。我們的任務(執行批處理作業的任務)充當 Source,而監聽應用程式充當 ProcessorSink

例如,可以有一個應用監聽 job-execution-events 通道以獲取作業的開始和停止事件。要配置監聽應用,您可以如下配置輸入為 job-execution-events

spring.cloud.stream.bindings.input.destination=job-execution-events

還需要在 classpath 中包含 Binder 實現。
可以在 Spring Cloud Task 專案的示例模組中找到示例批處理事件應用,此處

將批處理事件傳送到不同通道

Spring Cloud Task 為批處理事件提供的一個選項是更改特定監聽器發出訊息的通道的能力。為此,請使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>。例如,如果 StepExecutionListener 需要將訊息傳送到名為 my-step-execution-events 的另一個通道,而不是預設的 step-execution-events,您可以新增以下配置

spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events

停用批處理事件

要停用所有批處理事件的監聽器功能,請使用以下配置

spring.cloud.task.batch.events.enabled=false

要停用特定批處理事件,請使用以下配置

spring.cloud.task.batch.events.<batch event listener>.enabled=false:

以下列表顯示您可以停用的單個監聽器

spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false

批處理事件的發出順序

預設情況下,批處理事件的排序為 Ordered.LOWEST_PRECEDENCE。要更改此值(例如,更改為 5),請使用以下配置

spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5