Spring Cloud Stream 整合
任務本身可能很有用,但將任務整合到更大的生態系統中,可以使其對更復雜的處理和編排更加有用。本節介紹 Spring Cloud Task 與 Spring Cloud Stream 的整合選項。
從 Spring Cloud Stream 啟動任務
您可以從流中啟動任務。為此,您需要建立一個 sink,它監聽包含 TaskLaunchRequest
作為其有效載荷的訊息。TaskLaunchRequest
包含
-
uri
: 要執行的任務 artifact 的 URI。 -
applicationName
: 與任務關聯的名稱。如果未設定 applicationName,TaskLaunchRequest
會生成一個由以下部分組成的任務名稱:Task-<UUID>
。 -
commandLineArguments
: 一個包含任務命令列引數的列表。 -
environmentProperties
: 一個包含任務要使用的環境變數的 Map。 -
deploymentProperties
: 一個包含部署器用於部署任務的屬性的 Map。
如果有效載荷是不同型別,sink 會丟擲異常。 |
例如,可以建立一個流,其中包含一個處理器,該處理器從 HTTP 源獲取資料,建立一個包含 TaskLaunchRequest
的 GenericMessage
,然後將訊息傳送到其輸出通道。然後,任務 sink 會從其輸入通道接收訊息並啟動任務。
要建立 taskSink,您只需建立一個包含 EnableTaskLauncher
註解的 Spring Boot 應用,如下例所示
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
Spring Cloud Task 專案的示例模組包含一個示例 Sink 和 Processor。要將這些示例安裝到您的本地 Maven 倉庫,請從 spring-cloud-task-samples
目錄執行 Maven 構建,並將 skipInstall
屬性設定為 false
,如下例所示
mvn clean install
必須將 maven.remoteRepositories.springRepo.url 屬性設定為 Spring Boot Uber-jar 所在的遠端倉庫位置。如果未設定,則沒有遠端倉庫,因此僅依賴本地倉庫。 |
Spring Cloud Data Flow
要在 Spring Cloud Data Flow 中建立流,您必須首先註冊我們建立的 Task Sink Application。在以下示例中,我們使用 Spring Cloud Data Flow shell 註冊 Processor 和 Sink 示例應用
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
以下示例展示如何使用 Spring Cloud Data Flow shell 建立流
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
Spring Cloud Task 事件
當任務透過 Spring Cloud Stream 通道執行時,Spring Cloud Task 提供了透過 Spring Cloud Stream 通道發出事件的能力。使用任務監聽器將 TaskExecution
釋出到名為 task-events
的訊息通道上。此功能會自動注入到 classpath 中包含 spring-cloud-stream
、spring-cloud-stream-<binder>
以及定義任務的任何任務中。
要停用事件發出監聽器,請將 spring.cloud.task.events.enabled 屬性設定為 false 。 |
定義適當的 classpath 後,以下任務會將 TaskExecution
作為事件傳送到 task-events
通道(在任務開始和結束時)
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
還需要在 classpath 中包含 Binder 實現。 |
可以在 Spring Cloud Task 專案的示例模組中找到示例任務事件應用,此處。 |
Spring Batch 事件
透過任務執行 Spring Batch 作業時,可以配置 Spring Cloud Task 根據 Spring Batch 中可用的 Spring Batch 監聽器發出資訊性訊息。具體來說,以下 Spring Batch 監聽器會被自動配置到每個批處理作業中,並透過 Spring Cloud Task 執行時在關聯的 Spring Cloud Stream 通道上發出訊息
-
JobExecutionListener
監聽job-execution-events
-
StepExecutionListener
監聽step-execution-events
-
ChunkListener
監聽chunk-events
-
ItemReadListener
監聽item-read-events
-
ItemProcessListener
監聽item-process-events
-
ItemWriteListener
監聽item-write-events
-
SkipListener
監聽skip-events
當上下文中存在適當的 bean(一個 Job
和一個 TaskLifecycleListener
)時,這些監聽器會被自動配置到任何 AbstractJob
中。監聽這些事件的配置方式與繫結到任何其他 Spring Cloud Stream 通道的方式相同。我們的任務(執行批處理作業的任務)充當 Source
,而監聽應用程式充當 Processor
或 Sink
。
例如,可以有一個應用監聽 job-execution-events
通道以獲取作業的開始和停止事件。要配置監聽應用,您可以如下配置輸入為 job-execution-events
spring.cloud.stream.bindings.input.destination=job-execution-events
還需要在 classpath 中包含 Binder 實現。 |
可以在 Spring Cloud Task 專案的示例模組中找到示例批處理事件應用,此處。 |
將批處理事件傳送到不同通道
Spring Cloud Task 為批處理事件提供的一個選項是更改特定監聽器發出訊息的通道的能力。為此,請使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>
。例如,如果 StepExecutionListener
需要將訊息傳送到名為 my-step-execution-events
的另一個通道,而不是預設的 step-execution-events
,您可以新增以下配置
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
停用批處理事件
要停用所有批處理事件的監聽器功能,請使用以下配置
spring.cloud.task.batch.events.enabled=false
要停用特定批處理事件,請使用以下配置
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
以下列表顯示您可以停用的單個監聽器
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
批處理事件的發出順序
預設情況下,批處理事件的排序為 Ordered.LOWEST_PRECEDENCE
。要更改此值(例如,更改為 5),請使用以下配置
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5