© 2009-2020 VMware, Inc. 保留所有權利。
本文件可供您自行使用和分發給他人,但前提是您不得對這些副本收取任何費用,並且無論是以印刷版還是電子版形式分發,每個副本都必須包含本版權宣告。
1. 前言
本節簡要概述了 Spring Cloud Task 參考文件。可以將其視為本文件其餘部分的地圖。您可以按順序閱讀本參考指南,如果某些部分不感興趣,也可以跳過。
1.1. 關於文件
Spring Cloud Task 參考指南提供 html 和 pdf、epub 版本。最新副本可從 docs.spring.io/spring-cloud-task/docs/current-SNAPSHOT/reference/html/ 獲取。
本文件可供您自行使用和分發給他人,但前提是您不得對這些副本收取任何費用,並且無論是以印刷版還是電子版形式分發,每個副本都必須包含本版權宣告。
1.2. 獲取幫助
使用 Spring Cloud Task 遇到問題?我們樂意提供幫助!
-
提問。我們會在 stackoverflow.com 監控標記為
spring-cloud-task
的問題。 -
在 github.com/spring-cloud/spring-cloud-task/issues 報告 Spring Cloud Task 的錯誤。
Spring Cloud Task 的所有內容都是開源的,包括文件。如果您發現文件有問題或只想改進它們,請 參與進來。 |
1.3. 第一步
如果您剛開始使用 Spring Cloud Task 或廣義上的 'Spring',我們建議閱讀入門一章。
要從頭開始,請閱讀以下章節:
要 mengikuti 教程,請閱讀 開發您的第一個 Spring Cloud Task 應用
要執行您的示例,請閱讀 執行示例
2. 入門
如果您剛開始使用 Spring Cloud Task,應閱讀本節。在此,我們將解答基本的“是什麼?”、“如何做?”和“為什麼?”等問題。我們首先溫和地介紹 Spring Cloud Task。然後,我們將構建一個 Spring Cloud Task 應用,並在過程中討論一些核心原則。
2.1. Spring Cloud Task 介紹
Spring Cloud Task 使得建立短生命週期的微服務變得容易。它提供了能夠在生產環境中按需執行短生命週期的 JVM 程序的功能。
2.2. 系統要求
您需要安裝 Java(Java 17 或更高版本)。要進行構建,還需要安裝 Maven。
2.2.1. 資料庫要求
Spring Cloud Task 使用關係型資料庫來儲存已執行任務的結果。雖然您可以在沒有資料庫的情況下開始開發任務(任務狀態會作為任務倉庫更新的一部分被記錄到日誌中),但對於生產環境,您需要使用一個受支援的資料庫。Spring Cloud Task 當前支援以下資料庫:
-
DB2
-
H2
-
HSQLDB
-
MySql
-
Oracle
-
Postgres
-
SqlServer
2.3. 開發您的第一個 Spring Cloud Task 應用
一個好的起點是簡單的“Hello, World!”應用,因此我們建立相應的 Spring Cloud Task 應用來突出框架的特性。大多數 IDE 對 Apache Maven 都有良好的支援,所以我們將其用作本專案的構建工具。
spring.io 網站包含許多使用 Spring Boot 的 “入門 ”指南。如果您需要解決特定問題,請先在那裡查詢。您可以透過訪問 Spring Initializr 建立新專案來簡化以下步驟。這樣做會自動生成新的專案結構,以便您可以立即開始編碼。我們建議您嘗試使用 Spring Initializr 以熟悉它。 |
2.3.1. 使用 Spring Initializr 建立 Spring Task 專案
現在我們可以建立一個並測試一個向控制檯列印 Hello, World!
的應用。
為此:
-
訪問 Spring Initialzr 網站。
-
建立一個新的 Maven 專案,其中 Group 名稱為
io.spring.demo
, Artifact 名稱為helloworld
。 -
在 Dependencies(依賴項)文字框中,輸入
task
,然後選擇Cloud Task
依賴項。 -
在 Dependencies(依賴項)文字框中,輸入
jdbc
,然後選擇JDBC
依賴項。 -
在 Dependencies(依賴項)文字框中,輸入
h2
,然後選擇H2
。(或您喜歡的資料庫) -
點選 Generate Project(生成專案)按鈕
-
-
解壓 helloworld.zip 檔案,並將專案匯入您喜歡的 IDE 中。
2.3.2. 編寫程式碼
為了完成我們的應用,我們需要使用以下內容更新生成的 HelloworldApplication
,以便它能夠啟動一個任務。
package io.spring.Helloworld;
@SpringBootApplication
@EnableTask
public class HelloworldApplication {
@Bean
public ApplicationRunner applicationRunner() {
return new HelloWorldApplicationRunner();
}
public static void main(String[] args) {
SpringApplication.run(HelloworldApplication.class, args);
}
public static class HelloWorldApplicationRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("Hello, World!");
}
}
}
雖然程式碼量可能看起來不大,但其背後發生了很多事情。有關 Spring Boot 的更多細節,請參閱 Spring Boot 參考文件。
現在我們可以開啟位於 src/main/resources
目錄下的 application.properties
檔案。我們需要在 application.properties
中配置兩個屬性:
-
application.name
:設定應用名稱(它會被轉換為任務名稱) -
logging.level
:將 Spring Cloud Task 的日誌級別設定為DEBUG
,以便檢視其執行情況。
以下示例展示瞭如何同時進行這兩項配置:
logging.level.org.springframework.cloud.task=DEBUG
spring.application.name=helloWorld
任務自動配置
包含 Spring Cloud Task Starter 依賴項時,Task 會自動配置所有 bean 以引導其功能。此配置的一部分會註冊 TaskRepository
及其使用所需的基礎設施。
在我們的演示中,TaskRepository
使用嵌入式 H2 資料庫來記錄任務的結果。這種嵌入式 H2 資料庫對於生產環境來說並不是一個實用的解決方案,因為任務結束後 H2 資料庫就會消失。然而,為了快速入門體驗,我們可以在示例中使用它,同時將倉庫中更新的內容回顯到日誌中。在本文件後面介紹的配置部分,我們將介紹如何自定義 Spring Cloud Task 提供的各個元件的配置。
當我們的示例應用執行時,Spring Boot 會啟動我們的 HelloWorldCommandLineRunner
並將我們的“Hello, World!”訊息輸出到標準輸出。TaskLifecycleListener
會在倉庫中記錄任務的開始和結束。
main 方法
main 方法作為任何 Java 應用的入口點。我們的 main 方法委託給 Spring Boot 的 SpringApplication 類。
ApplicationRunner
Spring 包含多種引導應用邏輯的方式。Spring Boot 透過其 *Runner
介面(CommandLineRunner
或 ApplicationRunner
)提供了一種便捷且組織化的方式。一個設計良好的任務可以透過使用這兩種 runner 之一來引導任何邏輯。
任務的生命週期被認為是開始於 *Runner#run
方法執行之前,結束於所有方法執行完畢之後。Spring Boot 允許應用使用多個 *Runner
實現,Spring Cloud Task 也是如此。
任何透過 CommandLineRunner 或 ApplicationRunner 以外的機制(例如使用 InitializingBean#afterPropertiesSet )引導的處理都不會被 Spring Cloud Task 記錄。 |
2.3.3. 執行示例
至此,我們的應用應該可以工作了。由於這個應用基於 Spring Boot,我們可以從應用根目錄使用命令列 $ mvn spring-boot:run
來執行它,如下例所示(包括其輸出):
$ mvn clean spring-boot:run
....... . . .
....... . . . (Maven log output here)
....... . . .
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.0.3.RELEASE)
2018-07-23 17:44:34.426 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : Starting HelloworldApplication on Glenns-MBP-2.attlocal.net with PID 1978 (/Users/glennrenfro/project/helloworld/target/classes started by glennrenfro in /Users/glennrenfro/project/helloworld)
2018-07-23 17:44:34.430 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : No active profile set, falling back to default profiles: default
2018-07-23 17:44:34.472 INFO 1978 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1d24f32d: startup date [Mon Jul 23 17:44:34 EDT 2018]; root of context hierarchy
2018-07-23 17:44:35.280 INFO 1978 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2018-07-23 17:44:35.410 INFO 1978 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2018-07-23 17:44:35.419 DEBUG 1978 --- [ main] o.s.c.t.c.SimpleTaskConfiguration : Using org.springframework.cloud.task.configuration.DefaultTaskConfigurer TaskConfigurer
2018-07-23 17:44:35.420 DEBUG 1978 --- [ main] o.s.c.t.c.DefaultTaskConfigurer : No EntityManager was found, using DataSourceTransactionManager
2018-07-23 17:44:35.522 DEBUG 1978 --- [ main] o.s.c.t.r.s.TaskRepositoryInitializer : Initializing task schema for h2 database
2018-07-23 17:44:35.525 INFO 1978 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executing SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql]
2018-07-23 17:44:35.558 INFO 1978 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executed SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql] in 33 ms.
2018-07-23 17:44:35.728 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2018-07-23 17:44:35.730 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Bean with name 'dataSource' has been autodetected for JMX exposure
2018-07-23 17:44:35.733 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2018-07-23 17:44:35.738 INFO 1978 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2018-07-23 17:44:35.762 DEBUG 1978 --- [ main] o.s.c.t.r.support.SimpleTaskRepository : Creating: TaskExecution{executionId=0, parentExecutionId=null, exitCode=null, taskName='application', startTime=Mon Jul 23 17:44:35 EDT 2018, endTime=null, exitMessage='null', externalExecutionId='null', errorMessage='null', arguments=[]}
2018-07-23 17:44:35.772 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : Started HelloworldApplication in 1.625 seconds (JVM running for 4.764)
Hello, World!
2018-07-23 17:44:35.782 DEBUG 1978 --- [ main] o.s.c.t.r.support.SimpleTaskRepository : Updating: TaskExecution with executionId=1 with the following {exitCode=0, endTime=Mon Jul 23 17:44:35 EDT 2018, exitMessage='null', errorMessage='null'}
上述輸出有三行是我們需要關注的:
-
SimpleTaskRepository
記錄了在TaskRepository
中建立條目。 -
我們的
CommandLineRunner
的執行,透過“Hello, World!”輸出展示。 -
SimpleTaskRepository
在TaskRepository
中記錄了任務的完成。
Spring Cloud Task 專案的 samples 模組中可以找到一個簡單的任務應用,點此檢視。 |
3. 特性
本節將更詳細地介紹 Spring Cloud Task,包括如何使用、如何配置以及適當的擴充套件點。
3.1. Spring Cloud Task 的生命週期
在大多數情況下,現代雲環境的設計圍繞著不期望結束的程序的執行。如果它們確實結束了,通常會被重新啟動。雖然大多數平臺都有某種方式可以執行結束時不會重新啟動的程序,但其執行結果通常不會以一種可消費的方式維護。Spring Cloud Task 提供了在環境中執行短生命週期程序並記錄結果的能力。這樣做可以透過訊息整合任務,從而構建圍繞短生命週期程序以及長期執行服務的微服務架構。
雖然此功能在雲環境中很有用,但同樣的問題也可能出現在傳統的部署模型中。當使用 cron 等排程器執行 Spring Boot 應用時,能夠在應用完成後監控其結果可能會很有用。
Spring Cloud Task 採取的方法是,Spring Boot 應用可以有開始和結束,並且仍然是成功的。批次應用就是期望結束(並且通常是短生命週期)的程序如何有用的一個例子。
Spring Cloud Task 記錄給定任務的生命週期事件。大多數長期執行的程序,以大多數 Web 應用為典型代表,不儲存其生命週期事件。而 Spring Cloud Task 核心的任務會儲存。
生命週期由一個任務執行組成。這是配置為任務的 Spring Boot 應用的一次物理執行(即,它具有 Spring Cloud Task 依賴項)。
在任務開始時,在任何 CommandLineRunner
或 ApplicationRunner
實現執行之前,會在 TaskRepository
中建立一個條目來記錄開始事件。此事件透過 Spring Framework 觸發 SmartLifecycle#start
來觸發。這向系統表明所有 bean 都已準備就緒,並在執行 Spring Boot 提供的任何 CommandLineRunner
或 ApplicationRunner
實現之前發生。
任務的記錄僅在 ApplicationContext 成功引導時發生。如果上下文完全無法引導,則任務的執行不會被記錄。 |
在 Spring Boot 的所有 *Runner#run
呼叫完成後,或者 ApplicationContext
失敗(由 ApplicationFailedEvent
指示)時,任務執行結果會在倉庫中更新。
如果應用需要在任務完成後(所有 *Runner#run 方法已呼叫且任務倉庫已更新)關閉 ApplicationContext ,請將屬性 spring.cloud.task.closecontextEnabled 設定為 true。 |
3.1.1. TaskExecution
儲存在 TaskRepository
中的資訊建模在 TaskExecution
類中,包含以下資訊:
欄位 | 描述 |
---|---|
|
任務執行的唯一 ID。 |
|
由 |
|
任務的名稱,由配置的 |
|
任務開始的時間,由 |
|
任務完成的時間,由 |
|
退出時可用的任何資訊。這可以透過 |
|
如果異常是任務結束的原因(如 |
|
傳遞給可執行 boot 應用的字串命令列引數的 |
3.1.2. 對映退出碼
任務完成後,它會嘗試向作業系統返回一個退出碼。如果我們檢視 原始示例,可以看到我們並未控制應用的這方面。因此,如果丟擲異常,JVM 返回的程式碼可能對您的除錯有用,也可能沒用。
因此,Spring Boot 提供了一個介面 ExitCodeExceptionMapper
,允許您將未捕獲的異常對映到退出碼。這樣做可以讓您在退出碼層面指示出錯原因。此外,透過這種方式對映退出碼,Spring Cloud Task 會記錄返回的退出碼。
如果任務以 SIG-INT 或 SIG-TERM 終止,除非在程式碼中另有指定,否則退出碼為零。
任務執行時,退出碼在倉庫中儲存為 null。任務完成後,會根據本節前面所述的指南儲存適當的退出碼。 |
3.2. 配置
Spring Cloud Task 提供了一套開箱即用的配置,這些配置定義在 DefaultTaskConfigurer
和 SimpleTaskConfiguration
類中。本節將介紹預設配置以及如何根據您的需求自定義 Spring Cloud Task。
3.2.1. DataSource
Spring Cloud Task 使用一個數據源來儲存任務執行的結果。預設情況下,我們提供了一個記憶體中的 H2 例項,以提供一種簡單的開發引導方法。然而,在生產環境中,您可能希望配置自己的 DataSource
。
如果您的應用僅使用一個 DataSource
,並且該資料來源同時用於您的業務 schema 和任務倉庫,那麼您只需提供任何 DataSource
即可(最簡單的方法是透過 Spring Boot 的配置約定來提供)。Spring Cloud Task 會自動使用此 DataSource
作為任務倉庫。
如果您的應用使用多個 DataSource
,您需要使用適當的 DataSource
來配置任務倉庫。這種自定義可以透過實現 TaskConfigurer
來完成。
3.2.2. 表字首
TaskRepository
的一個可修改屬性是任務表的表字首。預設情況下,所有任務表都以 TASK_
作為字首。TASK_EXECUTION
和 TASK_EXECUTION_PARAMS
是兩個例子。然而,可能有一些原因需要修改此字首。如果需要將 schema 名稱作為表名的字首,或者在同一 schema 中需要多組任務表,則必須更改表字首。您可以透過將 spring.cloud.task.tablePrefix
設定為您需要的字首來實現,如下所示:
spring.cloud.task.tablePrefix=yourPrefix
透過使用 spring.cloud.task.tablePrefix
,使用者承擔建立滿足任務表 schema 要求且根據使用者業務需求進行修改的任務表的責任。您可以使用 Spring Cloud Task Schema DDL 作為建立您自己的任務 DDL 的參考,點此檢視。
3.2.3. 啟用/停用表初始化
在您自行建立任務表,並且不希望 Spring Cloud Task 在任務啟動時建立它們的情況下,將屬性 spring.cloud.task.initialize-enabled
設定為 false
,如下所示:
spring.cloud.task.initialize-enabled=false
預設值為 true
。
屬性 spring.cloud.task.initialize.enable 已被棄用。 |
3.2.4. 外部生成的任務 ID
在某些情況下,您可能希望考慮到任務被請求與基礎設施實際啟動它之間的時間差。Spring Cloud Task 允許您在任務被請求時建立一個 TaskExecution
。然後將生成的 TaskExecution
的執行 ID 傳遞給任務,以便任務可以在其生命週期中更新 TaskExecution
。
透過在引用儲存 TaskExecution
物件的 datastore 的 TaskRepository
實現上呼叫 createTaskExecution
方法,可以建立一個 TaskExecution
。
為了配置您的任務使用生成的 TaskExecutionId
,新增以下屬性:
spring.cloud.task.executionid=yourtaskId
3.2.5. 外部任務 ID
Spring Cloud Task 允許您為每個 TaskExecution
儲存一個外部任務 ID。為了配置您的任務使用生成的 TaskExecutionId
,新增以下屬性:
spring.cloud.task.external-execution-id=<externalTaskId>
3.2.6. 父任務 ID
Spring Cloud Task 允許您為每個 TaskExecution
儲存一個父任務 ID。一個例子是,一個任務執行另一個或多個任務,並且您想記錄是哪個任務啟動了每個子任務。為了配置您的子任務設定父 TaskExecutionId
,請在子任務中新增以下屬性:
spring.cloud.task.parent-execution-id=<parentExecutionTaskId>
3.2.7. TaskConfigurer
TaskConfigurer
是一個策略介面,允許您自定義 Spring Cloud Task 各元件的配置方式。預設情況下,我們提供 DefaultTaskConfigurer
,它提供邏輯預設值:基於 Map
的記憶體元件(在未提供 DataSource
時對開發有用)和基於 JDBC 的元件(在有 DataSource
可用時有用)。
TaskConfigurer
允許您配置三個主要元件:
元件 | 描述 | 預設值(由 DefaultTaskConfigurer 提供) |
---|---|---|
|
將使用的 |
|
|
將使用的 |
|
|
用於任務更新時的事務管理器。 |
如果使用了 |
您可以透過建立 TaskConfigurer
介面的自定義實現來定製前面表格中描述的任何元件。通常,擴充套件 DefaultTaskConfigurer
(如果在找不到 TaskConfigurer
時提供)並重寫所需的 getter 就足夠了。但是,可能需要從頭開始實現您自己的版本。
使用者不應直接使用 TaskConfigurer 中的 getter 方法,除非他們是使用它來提供要暴露為 Spring Bean 的實現。 |
3.2.8. 任務執行監聽器
TaskExecutionListener
允許您註冊任務生命週期中發生的特定事件的監聽器。為此,建立一個實現 TaskExecutionListener
介面的類。實現 TaskExecutionListener
介面的類會收到以下事件的通知:
-
onTaskStartup
:在將TaskExecution
儲存到TaskRepository
之前。 -
onTaskEnd
:在更新TaskExecution
在TaskRepository
中的條目並標記任務最終狀態之前。 -
onTaskFailed
:當任務丟擲未處理的異常時,在呼叫onTaskEnd
方法之前。
Spring Cloud Task 還允許您使用以下方法註解將 TaskExecution
監聽器新增到 bean 中的方法上:
-
@BeforeTask
:在將TaskExecution
儲存到TaskRepository
之前。 -
@AfterTask
:在更新TaskExecution
在TaskRepository
中的條目並標記任務最終狀態之前。 -
@FailedTask
:當任務丟擲未處理的異常時,在呼叫@AfterTask
方法之前。
以下示例展示了這三個註解的使用:
public class MyBean {
@BeforeTask
public void methodA(TaskExecution taskExecution) {
}
@AfterTask
public void methodB(TaskExecution taskExecution) {
}
@FailedTask
public void methodC(TaskExecution taskExecution, Throwable throwable) {
}
}
在鏈中插入一個早於 TaskLifecycleListener 的 ApplicationListener 可能會導致意外的效果。 |
任務執行監聽器丟擲的異常
如果 TaskExecutionListener
事件處理器丟擲異常,該事件處理器的所有監聽器處理都會停止。例如,如果三個 onTaskStartup
監聽器已啟動,並且第一個 onTaskStartup
事件處理器丟擲異常,則不會呼叫另外兩個 onTaskStartup
方法。但是,TaskExecutionListeners
的其他事件處理器(onTaskEnd
和 onTaskFailed
)會被呼叫。
當 TaskExecutionListener
事件處理器丟擲異常時返回的退出碼是 ExitCodeEvent 報告的退出碼。如果沒有發出 ExitCodeEvent
,則會評估丟擲的異常,看其是否屬於 ExitCodeGenerator 型別。如果是,則返回 ExitCodeGenerator
的退出碼。否則,返回 1
。
如果在 onTaskStartup
方法中丟擲異常,應用的退出碼將為 1
。如果在 onTaskEnd
或 onTaskFailed
方法中丟擲異常,應用的退出碼將是根據上述規則確定的退出碼。
如果在 onTaskStartup 、onTaskEnd 或 onTaskFailed 中丟擲異常,您不能使用 ExitCodeExceptionMapper 來覆蓋應用的退出碼。 |
退出訊息
您可以使用 TaskExecutionListener
以程式設計方式設定任務的退出訊息。這是透過設定 TaskExecution
的 exitMessage
來完成的,然後該訊息會傳遞給 TaskExecutionListener
。以下示例顯示了一個使用 @AfterTask
ExecutionListener
註解的方法:
@AfterTask
public void afterMe(TaskExecution taskExecution) {
taskExecution.setExitMessage("AFTER EXIT MESSAGE");
}
可以在任何監聽器事件(onTaskStartup
、onTaskFailed
和 onTaskEnd
)中設定 ExitMessage
。三個監聽器的優先順序如下:
-
onTaskEnd
-
onTaskFailed
-
onTaskStartup
例如,如果您為 onTaskStartup
和 onTaskFailed
監聽器設定了 exitMessage
,並且任務在沒有失敗的情況下結束,則儲存在倉庫中的是來自 onTaskStartup
的 exitMessage
。否則,如果發生失敗,則儲存來自 onTaskFailed
的 exitMessage
。此外,如果您使用 onTaskEnd
監聽器設定了 exitMessage
,則來自 onTaskEnd
的 exitMessage
會覆蓋來自 onTaskStartup
和 onTaskFailed
的退出訊息。
3.2.9. 限制 Spring Cloud Task 例項
Spring Cloud Task 允許您指定具有給定任務名稱的任務一次只能執行一個例項。為此,您需要指定任務名稱,併為每個任務執行設定 spring.cloud.task.single-instance-enabled=true
。當第一個任務執行正在執行時,如果您嘗試使用相同的任務名稱和 spring.cloud.task.single-instance-enabled=true
再次執行任務,任務將失敗並顯示以下錯誤訊息:Task with name "application" is already running.
spring.cloud.task.single-instance-enabled
的預設值為 false
。以下示例展示瞭如何將 spring.cloud.task.single-instance-enabled
設定為 true
:
spring.cloud.task.single-instance-enabled=true or false
要使用此功能,您必須嚮應用新增以下 Spring Integration 依賴項:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
如果任務因啟用此功能且有另一個同名任務正在執行而失敗,則應用的退出碼將為 1。 |
Spring AOT 和原生編譯中的單例項用法
在建立原生編譯應用時使用 Spring Cloud Task 的單例項功能,您需要在構建時啟用該功能。為此,新增 process-aot 執行,並將 spring.cloud.task.single-step-instance-enabled=true
設定為 JVM 引數,如下所示:
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
<configuration>
<jvmArguments>
-Dspring.cloud.task.single-instance-enabled=true
</jvmArguments>
</configuration>
</execution>
</executions>
</plugin>
3.2.10. 為 ApplicationRunner 和 CommandLineRunner 啟用觀測
要為 ApplicationRunner
或 CommandLineRunner
啟用任務觀測,請將 spring.cloud.task.observation.enabled
設定為 true。
一個啟用了觀測並使用 SimpleMeterRegistry
的任務應用示例可在 此處找到。
3.2.11. 停用 Spring Cloud Task 自動配置
在某些情況下,不應為某個實現自動配置 Spring Cloud Task,您可以停用 Task 的自動配置。這可以透過向您的任務應用新增以下註解來實現:
@EnableAutoConfiguration(exclude={SimpleTaskAutoConfiguration.class})
您也可以透過將屬性 spring.cloud.task.autoconfiguration.enabled
設定為 false
來停用 Task 的自動配置。
3.2.12. 關閉上下文
如果應用需要在任務完成後(所有 *Runner#run
方法已呼叫且任務倉庫已更新)關閉 ApplicationContext
,請將屬性 spring.cloud.task.closecontextEnabled
設定為 true
。
另一種需要關閉上下文的情況是當任務執行完成但應用沒有終止時。在這些情況下,上下文保持開啟狀態是因為分配了執行緒(例如:如果您正在使用 TaskExecutor)。在這種情況下,當啟動任務時,將 spring.cloud.task.closecontextEnabled
屬性設定為 true
。這會在任務完成後關閉應用的上下文。從而允許應用終止。
3.2.13. 啟用任務指標
Spring Cloud Task 集成了 Micrometer 併為其執行的任務建立觀測。要啟用任務可觀測性整合,您必須向您的任務應用新增 spring-boot-starter-actuator
、您首選的登錄檔實現(如果您想釋出指標)以及 micrometer-tracing(如果您想釋出跟蹤資料)。使用 Influx 啟用任務可觀測性和指標的 Maven 依賴示例是:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-influx</artifactId>
<scope>runtime</scope>
</dependency>
3.2.14. Spring Task 和 Spring Cloud Task 屬性
術語 task
是業內一個常用詞彙。一個例子是 Spring Boot 提供了 spring.task
,而 Spring Cloud Task 提供了 spring.cloud.task
屬性。這在過去導致了一些混淆,認為這兩組屬性直接相關。然而,它們代表了 Spring 生態系統中提供的兩組不同的功能。
-
spring.task
指的是配置ThreadPoolTaskScheduler
的屬性。 -
spring.cloud.task
指的是配置 Spring Cloud Task 功能的屬性。
4. 批處理
本節詳細介紹了 Spring Cloud Task 與 Spring Batch 的整合。本節涵蓋了如何跟蹤作業執行與其所在任務之間的關聯,以及透過 Spring Cloud Deployer 進行遠端分割槽。
4.1. 將作業執行與其所在任務關聯
Spring Boot 提供了在 über-jar 中執行批處理作業的功能。Spring Boot 對此功能的支援允許開發人員在該執行中執行多個批處理作業。Spring Cloud Task 提供了將作業執行(即作業執行)與任務執行關聯的能力,以便可以相互追溯。
Spring Cloud Task 透過使用 TaskBatchExecutionListener
實現此功能。預設情況下,在同時配置了 Spring Batch 作業(即在上下文中定義了型別為 Job
的 bean)並且類路徑中包含 spring-cloud-task-batch
jar 的任何上下文中,此監聽器都會自動配置。滿足這些條件的所有作業都會注入該監聽器。
4.1.1. 覆蓋 TaskBatchExecutionListener
要阻止監聽器被注入到當前上下文中的任何批處理作業中,可以使用標準的 Spring Boot 機制停用自動配置。
要僅將監聽器注入到上下文中的特定作業中,請覆蓋 batchTaskExecutionListenerBeanPostProcessor
並提供作業 bean ID 列表,如下例所示
public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
TaskBatchExecutionListenerBeanPostProcessor postProcessor =
new TaskBatchExecutionListenerBeanPostProcessor();
postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));
return postProcessor;
}
您可以在 Spring Cloud Task 專案的 samples 模組中找到批處理應用程式示例,點此檢視。 |
4.2. 遠端分割槽
Spring Cloud Deployer 提供了在大多數雲基礎設施上啟動基於 Spring Boot 的應用程式的功能。DeployerPartitionHandler
和 DeployerStepExecutionHandler
將工作步驟執行的啟動委託給 Spring Cloud Deployer。
要配置 DeployerStepExecutionHandler
,必須提供表示要執行的 Spring Boot über-jar 的 Resource
、TaskLauncherHandler
和 JobExplorer
。您可以配置任何環境屬性,以及同時執行的最大工作執行緒數、輪詢結果的間隔(預設為 10 秒)和超時(預設為 -1 或無超時)。以下示例展示瞭如何配置此 PartitionHandler
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer) throws Exception {
MavenProperties mavenProperties = new MavenProperties();
mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
new MavenProperties.RemoteRepository(repository))));
Resource resource =
MavenResource.parse(String.format("%s:%s:%s",
"io.spring.cloud",
"partitioned-batch-job",
"1.1.0.RELEASE"), mavenProperties);
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler.setCommandLineArgsProvider(
new PassThroughCommandLineArgsProvider(commandLineArgs));
partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
partitionHandler.setMaxWorkers(2);
partitionHandler.setApplicationName("PartitionedBatchJobTask");
return partitionHandler;
}
將環境變數傳遞給分割槽時,每個分割槽可能位於具有不同環境設定的不同機器上。因此,您只應傳遞那些必需的環境變數。 |
請注意上例中,我們將最大工作執行緒數設定為 2。設定最大工作執行緒數確定了應同時執行的最大分割槽數。
要執行的 Resource
預期是配置了 DeployerStepExecutionHandler
作為當前上下文中的 CommandLineRunner
的 Spring Boot über-jar。前面示例中列舉的倉庫應是 über-jar 所在的遠端倉庫。管理者和工作執行緒都應能訪問用作作業倉庫和任務倉庫的同一資料儲存。一旦底層基礎設施引導了 Spring Boot jar 並且 Spring Boot 啟動了 DeployerStepExecutionHandler
,該步驟處理器就會執行請求的 Step
。以下示例展示瞭如何配置 DeployerStepExecutionHandler
@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
DeployerStepExecutionHandler handler =
new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
return handler;
}
您可以在 Spring Cloud Task 專案的 samples 模組中找到遠端分割槽應用程式示例,點此檢視。 |
4.2.1. 非同步啟動遠端批處理分割槽
預設情況下,批處理分割槽是順序啟動的。然而,在某些情況下,這可能會影響效能,因為每次啟動都會阻塞,直到資源(例如:在 Kubernetes 中預置 Pod)被預置完成。在這種情況下,您可以向 DeployerPartitionHandler
提供一個 ThreadPoolTaskExecutor
。這將根據 ThreadPoolTaskExecutor
的配置啟動遠端批處理分割槽。例如
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setThreadNamePrefix("default_task_executor_thread");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
Resource resource = this.resourceLoader
.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
"workerStep", taskRepository, executor);
...
}
我們需要關閉上下文,因為使用 ThreadPoolTaskExecutor 會留下一個活動的執行緒,從而導致應用程式不會終止。要適當地關閉應用程式,我們需要將 spring.cloud.task.closecontextEnabled 屬性設定為 true 。 |
4.2.2. 關於為 Kubernetes 平臺開發批處理分割槽應用程式的注意事項
-
在 Kubernetes 平臺上部署分割槽應用程式時,必須使用 Spring Cloud Kubernetes Deployer 的以下依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId> </dependency>
-
任務應用程式及其分割槽的應用程式名稱需要遵循以下正則表示式模式:
[a-z0-9]([-a-z0-9]*[a-z0-9])
。否則,將丟擲異常。
4.3. 批處理資訊訊息
Spring Cloud Task 提供了批處理作業發出資訊訊息的能力。“Spring Batch 事件” 部分詳細介紹了此功能。
4.4. 批處理作業退出碼
如前所述,Spring Cloud Task 應用程式支援記錄任務執行的退出碼。但是,在任務中執行 Spring Batch 作業的情況下,無論批處理作業執行如何完成,使用預設的 Batch/Boot 行為時,任務的結果始終為零。請記住,任務是引導應用程式,從任務返回的退出碼與引導應用程式相同。要覆蓋此行為並在批處理作業返回 BatchStatus 為 FAILED
時允許任務返回非零退出碼,請將 spring.cloud.task.batch.fail-on-job-failure
設定為 true
。然後退出碼可以是 1(預設值)或基於指定的 ExitCodeGenerator
)
此功能使用一個新的 ApplicationRunner
,它替換了 Spring Boot 提供的那一個。預設情況下,它的配置順序與 Spring Boot 提供的相同。但是,如果您想自定義 ApplicationRunner
的執行順序,可以透過設定 spring.cloud.task.batch.applicationRunnerOrder
屬性來設定其順序。要讓您的任務根據批處理作業執行結果返回退出碼,您需要編寫自己的 CommandLineRunner
。
5. 單步批處理作業啟動器
本節介紹瞭如何使用 Spring Cloud Task 中包含的啟動器開發具有單個 Step
的 Spring Batch Job
。該啟動器允許您使用配置來定義 ItemReader
、ItemWriter
或完整的單步 Spring Batch Job
。有關 Spring Batch 及其功能的更多資訊,請參閱Spring Batch 文件。
要獲取 Maven 啟動器,請將以下內容新增到您的構建中
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-single-step-batch-job</artifactId>
<version>2.3.0</version>
</dependency>
要獲取 Gradle 啟動器,請將以下內容新增到您的構建中
compile "org.springframework.cloud:spring-cloud-starter-single-step-batch-job:2.3.0"
5.1. 定義作業
您可以使用此啟動器定義一個簡單的 ItemReader
或 ItemWriter
,也可以定義一個完整的 Job
。在本節中,我們將定義配置 Job
所需的屬性。
5.1.1. 屬性
首先,此啟動器提供了一組屬性,允許您配置具有一個 Step 的 Job 的基本資訊
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
作業的名稱。 |
|
|
|
步驟的名稱。 |
|
|
|
每個事務要處理的條目數。 |
配置了上述屬性後,您就擁有了一個具有單個基於 chunk 的步驟的作業。這個基於 chunk 的步驟以 Map<String, Object>
例項作為條目進行讀取、處理和寫入。但是,這個步驟還沒有執行任何操作。您需要配置一個 ItemReader
、一個可選的 ItemProcessor
和一個 ItemWriter
來讓它做一些事情。要配置其中之一,您可以使用屬性並配置提供了自動配置的選項之一,或者使用標準的 Spring 配置機制配置您自己的。
如果您配置自己的實現,輸入和輸出型別必須與步驟中的其他實現匹配。此啟動器中的 ItemReader 實現和 ItemWriter 實現都使用 Map<String, Object> 作為輸入和輸出條目。 |
5.2. ItemReader 實現的自動配置
此啟動器為四種不同的 ItemReader
實現提供了自動配置:AmqpItemReader
、FlatFileItemReader
、JdbcCursorItemReader
和 KafkaItemReader
。在本節中,我們將概述如何使用提供的自動配置來配置它們中的每一個。
5.2.1. AmqpItemReader
您可以使用 AmqpItemReader
透過 AMQP 從佇列或主題讀取資料。此 ItemReader
實現的自動配置取決於兩組配置。第一組是 AmqpTemplate
的配置。您可以自己配置,也可以使用 Spring Boot 提供的自動配置。請參閱Spring Boot AMQP 文件。配置好 AmqpTemplate
後,您可以透過設定以下屬性啟用批處理能力來支援它
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
如果為 |
|
|
|
指示是否應該註冊 |
更多資訊,請參閱AmqpItemReader
文件。
5.2.2. FlatFileItemReader
FlatFileItemReader
允許您從平面檔案(如 CSV 和其他檔案格式)讀取資料。要從檔案讀取,您可以透過正常的 Spring 配置自行提供一些元件(LineTokenizer
, RecordSeparatorPolicy
, FieldSetMapper
, LineMapper
, 或 SkippedLinesCallback
)。您也可以使用以下屬性來配置讀取器
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
確定是否應為重新啟動儲存狀態。 |
|
|
|
用於在 |
|
|
|
從檔案讀取的最大條目數。 |
|
|
0 |
已讀取的條目數。用於重新啟動。 |
|
|
空列表 |
一個字串列表,指示檔案中的註釋行(要忽略的行)。 |
|
|
|
要讀取的資源。 |
|
|
|
如果設定為 |
|
|
|
讀取檔案時使用的編碼。 |
|
|
0 |
指示檔案開頭要跳過的行數。 |
|
|
|
指示檔案是否為分隔檔案(CSV 及其他格式)。此屬性或 |
|
|
|
如果讀取分隔檔案,指示用於解析的分隔符。 |
|
|
|
用於確定用於引用值的字元。 |
|
|
空列表 |
一個索引列表,用於確定記錄中哪些欄位包含在條目中。 |
|
|
|
指示檔案的記錄是否按列號解析。此屬性或 |
|
|
空列表 |
用於解析定寬記錄的列範圍列表。請參閱Range 文件。 |
|
|
|
從記錄解析出的每個欄位的名稱列表。這些名稱是此 |
|
|
|
如果設定為 |
5.2.3. JdbcCursorItemReader
JdbcCursorItemReader
對關係資料庫執行查詢,並遍歷結果遊標(ResultSet
)以提供結果條目。此自動配置允許您提供一個 PreparedStatementSetter
、一個 RowMapper
或兩者。您還可以使用以下屬性來配置 JdbcCursorItemReader
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
確定是否應為重新啟動儲存狀態。 |
|
|
|
用於在 |
|
|
|
從檔案讀取的最大條目數。 |
|
|
0 |
已讀取的條目數。用於重新啟動。 |
|
|
給驅動程式的一個提示,指示每次呼叫資料庫系統時檢索多少條記錄。為了獲得最佳效能,通常希望將其設定為與 chunk 大小匹配。 |
|
|
|
從資料庫讀取的最大條目數。 |
|
|
|
查詢超時的毫秒數。 |
|
|
|
|
確定讀取器在處理時是否應忽略 SQL 警告。 |
|
|
|
指示每次讀取後是否應驗證遊標位置,以驗證 |
|
|
|
指示驅動程式是否支援遊標的絕對定位。 |
|
|
|
指示連線是否與其他處理共享(因此是事務的一部分)。 |
|
|
|
從中讀取的 SQL 查詢。 |
您還可以使用以下屬性專門為讀取器指定 JDBC DataSource:.JdbcCursorItemReader
屬性
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
確定是否應啟用 |
|
|
|
資料庫的 JDBC URL。 |
|
|
|
資料庫的登入使用者名稱。 |
|
|
|
資料庫的登入密碼。 |
|
|
|
JDBC 驅動程式的完全限定名。 |
如果未指定 jdbccursoritemreader_datasource ,JDBCCursorItemReader 將使用預設的 DataSource 。 |
5.2.4. KafkaItemReader
從 Kafka 主題攝取分割槽資料非常有用,這正是 KafkaItemReader
可以做到的。要配置 KafkaItemReader
,需要兩部分配置。首先,需要使用 Spring Boot 的 Kafka 自動配置來配置 Kafka(請參閱Spring Boot Kafka 文件)。一旦您配置了 Spring Boot 的 Kafka 屬性,就可以透過設定以下屬性來配置 KafkaItemReader
本身
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
用於在 |
|
|
|
要從中讀取的主題名稱。 |
|
|
空列表 |
要從中讀取的分割槽索引列表。 |
|
|
30 |
|
|
|
|
確定是否應為重新啟動儲存狀態。 |
5.2.5. 本地編譯
單步批處理的優點是,在使用 JVM 時,您可以在執行時動態選擇要使用的讀取器和寫入器 bean。然而,在使用本地編譯時,您必須在構建時而不是執行時確定讀取器和寫入器。以下示例展示瞭如何做到這一點
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
<configuration>
<jvmArguments>
-Dspring.batch.job.flatfileitemreader.name=fooReader
-Dspring.batch.job.flatfileitemwriter.name=fooWriter
</jvmArguments>
</configuration>
</execution>
</executions>
</plugin>
5.3. ItemProcessor 配置
如果 ApplicationContext
中有一個 ItemProcessor
可用,單步批處理作業自動配置將接受它。如果找到型別正確(ItemProcessor<Map<String, Object>, Map<String, Object>>
)的處理器,它將自動注入到步驟中。
5.4. ItemWriter 實現的自動配置
此啟動器為與支援的 ItemReader
實現匹配的 ItemWriter
實現提供自動配置:AmqpItemWriter
、FlatFileItemWriter
、JdbcItemWriter
和 KafkaItemWriter
。本節介紹瞭如何使用自動配置來配置支援的 ItemWriter
。
5.4.1. AmqpItemWriter
要寫入 RabbitMQ 佇列,您需要兩組配置。首先,您需要一個 AmqpTemplate
。最簡單的方法是使用 Spring Boot 的 RabbitMQ 自動配置。請參閱Spring Boot AMQP 文件。
配置好 AmqpTemplate
後,您可以透過設定以下屬性來配置 AmqpItemWriter
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
如果為 |
|
|
|
指示是否應註冊 |
5.4.2. FlatFileItemWriter
要將檔案作為步驟的輸出寫入,您可以配置 FlatFileItemWriter
。自動配置接受已明確配置的元件(如 LineAggregator
、FieldExtractor
、FlatFileHeaderCallback
或 FlatFileFooterCallback
)以及透過設定以下指定屬性配置的元件
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
要讀取的資源。 |
|
|
|
指示輸出檔案是否為分隔檔案。如果為 |
|
|
|
指示輸出檔案是否為格式化檔案。如果為 |
|
|
|
用於為格式化檔案生成輸出的格式。格式化透過使用 |
|
|
|
生成檔案時使用的 |
|
|
0 |
記錄的最大長度。如果為 0,則大小無限制。 |
|
|
0 |
記錄的最小長度。 |
|
|
|
用於分隔分隔檔案中欄位的 |
|
|
|
寫入檔案時使用的編碼。 |
|
|
|
指示檔案在重新整理時是否應強制同步到磁碟。 |
|
|
|
從記錄解析出的每個欄位的名稱列表。這些名稱是此 |
|
|
|
指示如果找到輸出檔案是否應追加到檔案中。 |
|
|
|
用於分隔輸出檔案中行的 |
|
|
|
用於在 |
|
|
|
確定是否應為重新啟動儲存狀態。 |
|
|
|
如果設定為 |
|
|
|
如果設定為 |
|
|
|
指示讀取器是否為事務性佇列(表示讀取的條目在失敗時返回佇列)。 |
5.4.3. JdbcBatchItemWriter
要將步驟的輸出寫入關係資料庫,此啟動器提供了自動配置 JdbcBatchItemWriter
的能力。透過設定以下屬性,自動配置允許您提供自己的 ItemPreparedStatementSetter
或 ItemSqlParameterSourceProvider
以及配置選項
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
用於在 |
|
|
|
用於插入每個條目的 SQL。 |
|
|
|
是否驗證每次插入至少更新了一條記錄。 |
您還可以使用以下屬性專門為寫入器指定 JDBC DataSource:.JdbcBatchItemWriter
屬性
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
確定是否應啟用 |
|
|
|
資料庫的 JDBC URL。 |
|
|
|
資料庫的登入使用者名稱。 |
|
|
|
資料庫的登入密碼。 |
|
|
|
JDBC 驅動程式的完全限定名。 |
如果未指定 jdbcbatchitemwriter_datasource ,JdbcBatchItemWriter 將使用預設的 DataSource 。 |
5.4.4. KafkaItemWriter
要將步驟輸出寫入 Kafka 主題,您需要 KafkaItemWriter
。此啟動器透過使用兩方面的功能為 KafkaItemWriter
提供自動配置。首先是 Spring Boot 的 Kafka 自動配置。(請參閱Spring Boot Kafka 文件。)其次,此啟動器允許您配置寫入器上的兩個屬性。
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
要寫入的 Kafka 主題。 |
|
|
|
傳遞給寫入器的條目是否都作為刪除事件傳送到主題。 |
有關 KafkaItemWriter
配置選項的更多資訊,請參閱KafkaItemWiter
文件。
5.4.5. Spring AOT
使用 Spring AOT 和單步批處理啟動器時,必須在編譯時設定讀取器和寫入器的名稱屬性(除非為讀取器和/或寫入器建立 bean)。為此,您必須在 boot maven 外掛或 gradle 外掛中將您希望使用的讀取器和寫入器名稱作為引數或環境變數包含進來。例如,如果您希望在 Maven 中啟用 FlatFileItemReader
和 FlatFileItemWriter
,它看起來像這樣
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
</execution>
</executions>
<configuration>
<arguments>
<argument>--spring.batch.job.flatfileitemreader.name=foobar</argument>
<argument>--spring.batch.job.flatfileitemwriter.name=fooWriter</argument>
</arguments>
</configuration>
</plugin>
6. Spring Cloud Stream 整合
任務本身可能有用,但將任務整合到更大的生態系統中可以使其在更復雜的處理和編排中發揮作用。本節涵蓋了 Spring Cloud Task 與 Spring Cloud Stream 的整合選項。
6.1. 從 Spring Cloud Stream 啟動任務
您可以從流中啟動任務。為此,請建立一個偵聽包含 TaskLaunchRequest
作為其有效載荷的訊息的 sink。TaskLaunchRequest
包含
-
uri
:指向要執行的任務 artifact。 -
applicationName
:與任務關聯的名稱。如果未設定 applicationName,TaskLaunchRequest
將生成一個由以下內容組成的任務名稱:Task-<UUID>
。 -
commandLineArguments
:一個包含任務命令列引數的列表。 -
environmentProperties
:一個包含任務要使用的環境變數的 Map。 -
deploymentProperties
:一個包含部署器用於部署任務的屬性的 Map。
如果有效載荷是不同型別,則 sink 將丟擲異常。 |
例如,可以建立一個流,該流具有一個處理器,該處理器從 HTTP 源接收資料並建立包含 TaskLaunchRequest
的 GenericMessage
,然後將訊息傳送到其輸出通道。任務 sink 將從其輸入通道接收訊息,然後啟動任務。
要建立 taskSink,只需建立一個包含 EnableTaskLauncher
註解的 Spring Boot 應用程式,如下例所示
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
Spring Cloud Task 專案的 samples 模組包含一個 Sink 和 Processor 示例。要將這些示例安裝到您的本地 Maven 倉庫,請在 spring-cloud-task-samples
目錄中執行 Maven 構建,並將 skipInstall
屬性設定為 false
,如下例所示
mvn clean install
必須將 maven.remoteRepositories.springRepo.url 屬性設定為 über-jar 所在的遠端倉庫位置。如果未設定,則沒有遠端倉庫,因此僅依賴本地倉庫。 |
6.1.1. Spring Cloud Data Flow
要在 Spring Cloud Data Flow 中建立流,必須先註冊我們建立的 Task Sink 應用程式。在下面的示例中,我們使用 Spring Cloud Data Flow shell 註冊 Processor 和 Sink 示例應用程式。
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
以下示例展示瞭如何從 Spring Cloud Data Flow shell 建立流。
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
6.2. Spring Cloud Task 事件
Spring Cloud Task 提供了當任務透過 Spring Cloud Stream 通道執行時,可以透過 Spring Cloud Stream 通道發出事件的能力。任務監聽器用於將 TaskExecution
釋出到名為 task-events
的訊息通道上。此功能會自動裝配到任何 classpath 中包含 spring-cloud-stream
、spring-cloud-stream-<binder>
和已定義任務的應用程式中。
要停用事件傳送監聽器,請將 spring.cloud.task.events.enabled 屬性設定為 false 。 |
定義了適當的 classpath 後,以下任務會在 task-events
通道上將 TaskExecution
作為事件發出(在任務開始和結束時都會發出)。
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
classpath 中還需要包含繫結器實現。 |
Spring Cloud Task 專案的 samples 模組中提供了一個任務事件示例應用程式,請參見此處。 |
6.2.1. 停用特定任務事件
要停用任務事件,可以將 spring.cloud.task.events.enabled
屬性設定為 false
。
6.3. Spring Batch 事件
當透過任務執行 Spring Batch 作業時,Spring Cloud Task 可以配置為根據 Spring Batch 中可用的 Spring Batch 監聽器發出資訊性訊息。具體來說,當透過 Spring Cloud Task 執行時,以下 Spring Batch 監聽器會自動配置到每個批處理作業中,並在相關的 Spring Cloud Stream 通道上發出訊息。
-
JobExecutionListener
監聽job-execution-events
-
StepExecutionListener
監聽step-execution-events
-
ChunkListener
監聽chunk-events
-
ItemReadListener
監聽item-read-events
-
ItemProcessListener
監聽item-process-events
-
ItemWriteListener
監聽item-write-events
-
SkipListener
監聽skip-events
當上下文中存在適當的 Bean(Job
和 TaskLifecycleListener
)時,這些監聽器會自動配置到任何 AbstractJob
中。監聽這些事件的配置方式與繫結任何其他 Spring Cloud Stream 通道相同。我們的任務(執行批處理作業的任務)充當 Source
,而監聽應用程式充當 Processor
或 Sink
。
一個例子是有一個應用程式監聽 job-execution-events
通道,以獲取作業的開始和停止事件。要配置監聽應用程式,您可以如下配置輸入為 job-execution-events
。
spring.cloud.stream.bindings.input.destination=job-execution-events
classpath 中還需要包含繫結器實現。 |
Spring Cloud Task 專案的 samples 模組中提供了一個批處理事件示例應用程式,請參見此處。 |
6.3.1. 將批處理事件傳送到不同的通道
Spring Cloud Task 為批處理事件提供的一個選項是更改特定監聽器發出訊息的通道的能力。為此,請使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>
。例如,如果 StepExecutionListener
需要將訊息傳送到名為 my-step-execution-events
的另一個通道,而不是預設的 step-execution-events
,您可以新增以下配置。
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
6.3.2. 停用批處理事件
要停用所有批處理事件的監聽器功能,請使用以下配置。
spring.cloud.task.batch.events.enabled=false
要停用特定的批處理事件,請使用以下配置。
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
以下列表顯示了您可以停用的單個監聽器。
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
6.3.3. 批處理事件的傳送順序
預設情況下,批處理事件具有 Ordered.LOWEST_PRECEDENCE
。要更改此值(例如,改為 5),請使用以下配置。
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5
7. 附錄
7.1. 任務倉庫 Schema
本附錄提供了任務倉庫中使用的資料庫 schema 的 ERD(實體關係圖)。

7.1.1. 表資訊
儲存任務執行資訊。
列名 | 必需 | 型別 | 欄位長度 | 備註 |
---|---|---|---|---|
TASK_EXECUTION_ID |
TRUE |
BIGINT |
X |
Spring Cloud Task Framework 在應用程式啟動時從 |
START_TIME |
FALSE |
DATETIME(6) |
X |
Spring Cloud Task Framework 在應用程式啟動時建立該值。 |
END_TIME |
FALSE |
DATETIME(6) |
X |
Spring Cloud Task Framework 在應用程式退出時建立該值。 |
TASK_NAME |
FALSE |
VARCHAR |
100 |
Spring Cloud Task Framework 在應用程式啟動時會將其設定為 "Application",除非使用者使用 |
EXIT_CODE |
FALSE |
INTEGER |
X |
遵循 Spring Boot 預設值,除非使用者如此處所述進行覆蓋。 |
EXIT_MESSAGE |
FALSE |
VARCHAR |
2500 |
使用者定義,如此處所述。 |
ERROR_MESSAGE |
FALSE |
VARCHAR |
2500 |
Spring Cloud Task Framework 在應用程式退出時建立該值。 |
LAST_UPDATED |
TRUE |
TIMESTAMP |
X |
Spring Cloud Task Framework 在應用程式啟動時建立該值。或者如果在任務之外建立記錄,則必須在記錄建立時填充此值。 |
EXTERNAL_EXECUTION_ID |
FALSE |
VARCHAR |
250 |
如果設定了 |
PARENT_TASK_EXECUTION_ID |
FALSE |
BIGINT |
X |
如果設定了 |
儲存任務執行使用的引數。
列名 | 必需 | 型別 | 欄位長度 |
---|---|---|---|
TASK_EXECUTION_ID |
TRUE |
BIGINT |
X |
TASK_PARAM |
FALSE |
VARCHAR |
2500 |
用於將任務執行連結到批處理執行。
列名 | 必需 | 型別 | 欄位長度 |
---|---|---|---|
TASK_EXECUTION_ID |
TRUE |
BIGINT |
X |
JOB_EXECUTION_ID |
TRUE |
BIGINT |
X |
用於此處討論的 single-instance-enabled
特性。
列名 | 必需 | 型別 | 欄位長度 | 備註 |
---|---|---|---|---|
LOCK_KEY |
TRUE |
CHAR |
36 |
此鎖的 UUID |
REGION |
TRUE |
VARCHAR |
100 |
使用者可以使用此欄位建立一組鎖。 |
CLIENT_ID |
TRUE |
CHAR |
36 |
包含要鎖定應用程式名稱的任務執行 ID。 |
CREATED_DATE |
TRUE |
DATETIME |
X |
建立條目的日期 |
每種資料庫型別的表設定 DDL 可以在此處找到。 |
7.1.2. SQL Server
預設情況下,Spring Cloud Task 使用序列表來確定 TASK_EXECUTION
表的 TASK_EXECUTION_ID
。但是,當在使用 SQL Server 時同時啟動多個任務,這可能會導致 TASK_SEQ
表發生死鎖。解決方法是刪除 TASK_EXECUTION_SEQ
表並建立一個同名的 sequence。例如:
DROP TABLE TASK_SEQ;
CREATE SEQUENCE [DBO].[TASK_SEQ] AS BIGINT
START WITH 1
INCREMENT BY 1;
將 START WITH 設定為您當前執行 ID 的更高值。 |
7.2. 構建本文件
本專案使用 Maven 生成本文件。要自己生成它,請執行以下命令:$ mvn clean install -DskipTests -P docs
。
7.3. 可觀測性元資料
7.3.1. 可觀測性 - 指標
下面您可以找到此專案宣告的所有指標列表。
活躍任務
圍繞任務執行建立的指標。
指標名稱 spring.cloud.task
(由約定類 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定義)。型別 timer
。
指標名稱 spring.cloud.task.active
(由約定類 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定義)。型別 long task timer
。
在開始觀測後新增的 KeyValue 可能不會出現在 *.active 指標中。 |
Micrometer 在內部使用 nanoseconds 作為基本單位。然而,每個後端決定實際的基本單位。(例如 Prometheus 使用秒) |
包含類 org.springframework.cloud.task.listener.TaskExecutionObservation
的完全限定名稱。
所有標籤必須以 spring.cloud.task 字首開頭! |
名稱 |
描述 |
|
CF 雲的應用程式 ID。 |
|
CF 雲的應用程式名稱。 |
|
CF 雲的應用程式版本。 |
|
CF 雲的例項索引。 |
|
CF 雲的組織名稱。 |
|
CF 雲的空間 ID。 |
|
CF 雲的空間名稱。 |
|
任務執行 ID。 |
|
任務退出碼。 |
|
任務的外部執行 ID。 |
|
任務名稱度量。 |
|
任務父執行 ID。 |
|
任務狀態。可以是 success 或 failure。 |
任務執行器觀測
任務執行器執行時建立的觀測。
指標名稱 spring.cloud.task.runner
(由約定類 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定義)。型別 timer
。
指標名稱 spring.cloud.task.runner.active
(由約定類 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定義)。型別 long task timer
。
在開始觀測後新增的 KeyValue 可能不會出現在 *.active 指標中。 |
Micrometer 在內部使用 nanoseconds 作為基本單位。然而,每個後端決定實際的基本單位。(例如 Prometheus 使用秒) |
包含類 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation
的完全限定名稱。
所有標籤必須以 spring.cloud.task 字首開頭! |
名稱 |
描述 |
|
由 Spring Cloud Task 執行的 Bean 的名稱。 |
7.3.2. 可觀測性 - Span
下面您可以找到此專案宣告的所有 span 列表。
活躍任務 Span
圍繞任務執行建立的指標。
Span 名稱 spring.cloud.task
(由約定類 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定義)。
包含類 org.springframework.cloud.task.listener.TaskExecutionObservation
的完全限定名稱。
所有標籤必須以 spring.cloud.task 字首開頭! |
名稱 |
描述 |
|
CF 雲的應用程式 ID。 |
|
CF 雲的應用程式名稱。 |
|
CF 雲的應用程式版本。 |
|
CF 雲的例項索引。 |
|
CF 雲的組織名稱。 |
|
CF 雲的空間 ID。 |
|
CF 雲的空間名稱。 |
|
任務執行 ID。 |
|
任務退出碼。 |
|
任務的外部執行 ID。 |
|
任務名稱度量。 |
|
任務父執行 ID。 |
|
任務狀態。可以是 success 或 failure。 |
任務執行器觀測 Span
任務執行器執行時建立的觀測。
Span 名稱 spring.cloud.task.runner
(由約定類 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定義)。
包含類 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation
的完全限定名稱。
所有標籤必須以 spring.cloud.task 字首開頭! |
名稱 |
描述 |
|
由 Spring Cloud Task 執行的 Bean 的名稱。 |