Spring Cloud Stream 整合
任務本身可能很有用,但將任務整合到更大的生態系統中可以使其用於更復雜的處理和編排。本節涵蓋 Spring Cloud Task 與 Spring Cloud Stream 的整合選項。
Spring Cloud Task 事件
Spring Cloud Task 提供了在任務透過 Spring Cloud Stream 通道執行時,透過 Spring Cloud Stream 通道發出事件的能力。任務監聽器用於在名為 task-events 的訊息通道上釋出 TaskExecution。此功能會自動裝配到任何具有 spring-cloud-stream、spring-cloud-stream-<binder> 並在其類路徑上定義了任務的任務中。
要停用事件發射監聽器,請將 spring.cloud.task.events.enabled 屬性設定為 false。 |
透過定義適當的類路徑,以下任務會在 task-events 通道(在任務開始和結束時)發出 TaskExecution 作為事件
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
| 還需要在類路徑中包含一個繫結器實現。 |
| Spring Cloud Task 專案的 samples 模組中提供了任務事件示例應用程式,點選此處檢視。 |
Spring Batch 事件
當透過任務執行 Spring Batch 作業時,Spring Cloud Task 可以配置為根據 Spring Batch 中可用的 Spring Batch 監聽器發出資訊性訊息。具體來說,當透過 Spring Cloud Task 執行時,以下 Spring Batch 監聽器會自動配置到每個批處理作業中,並在關聯的 Spring Cloud Stream 通道上發出訊息
-
JobExecutionListener監聽job-execution-events -
StepExecutionListener監聽step-execution-events -
ChunkListener監聽chunk-events -
ItemReadListener監聽item-read-events -
ItemProcessListener監聽item-process-events -
ItemWriteListener監聽item-write-events -
SkipListener監聽skip-events
當上下文中存在適當的 Bean(一個 Job 和一個 TaskLifecycleListener)時,這些監聽器會自動配置到任何 AbstractJob 中。監聽這些事件的配置方式與繫結到任何其他 Spring Cloud Stream 通道的方式相同。我們的任務(執行批處理作業的任務)充當 Source,而監聽應用程式充當 Processor 或 Sink。
一個示例可以是有一個應用程式監聽 job-execution-events 通道以獲取作業的開始和停止。要配置監聽應用程式,您可以將輸入配置為 job-execution-events,如下所示
spring.cloud.stream.bindings.input.destination=job-execution-events
| 還需要在類路徑中包含一個繫結器實現。 |
| Spring Cloud Task 專案的 samples 模組中提供了批處理事件示例應用程式,點選此處檢視。 |
將批處理事件傳送到不同的通道
Spring Cloud Task 為批處理事件提供的選項之一是能夠更改特定監聽器可以發出訊息的通道。為此,請使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>。例如,如果 StepExecutionListener 需要將其訊息發出到名為 my-step-execution-events 的另一個通道,而不是預設的 step-execution-events,您可以新增以下配置
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
停用批處理事件
要停用所有批處理事件的監聽器功能,請使用以下配置
spring.cloud.task.batch.events.enabled=false
要停用特定的批處理事件,請使用以下配置
spring.cloud.task.batch.events.<batch event listener>.enabled=false:
以下列表顯示了您可以停用的各個監聽器
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
批處理事件的發出順序
預設情況下,批處理事件具有 Ordered.LOWEST_PRECEDENCE。要更改此值(例如,更改為 5),請使用以下配置
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5