版本 3.0.4
© 2009-2022 VMware, Inc. 保留所有權利。
本文件可供您自己使用和分發給他人,前提是您不收取任何費用,並且無論以印刷品還是電子形式分發,每個副本都必須包含此版權宣告。
序言
1. 關於文件
Spring Cloud Task 參考指南提供 html、pdf 和 epub 格式。最新版本位於 docs.spring.io/spring-cloud-task/docs/current-SNAPSHOT/reference/html/。
本文件可供您自己使用和分發給他人,前提是您不收取任何費用,並且無論以印刷品還是電子形式分發,每個副本都必須包含此版權宣告。
2. 獲取幫助
使用 Spring Cloud Task 遇到問題了嗎?我們樂於提供幫助!
-
提問。我們監控 stackoverflow.com 上標記為
spring-cloud-task
的問題。 -
在 github.com/spring-cloud/spring-cloud-task/issues 報告 Spring Cloud Task 的錯誤。
Spring Cloud Task 的所有內容,包括文件,都是開源的。如果您發現文件有問題或只是想改進它們,請參與進來。 |
3. 入門
如果您剛開始接觸 Spring Cloud Task 或一般的 'Spring',我們建議閱讀入門章節。
從零開始入門,請閱讀以下章節
要按照教程進行操作,請閱讀開發您的第一個 Spring Cloud Task 應用
要執行您的示例,請閱讀執行示例
入門
如果您剛剛開始使用 Spring Cloud Task,應該閱讀本節。在這裡,我們回答基本的“是什麼?”、“如何做?”和“為什麼?”等問題。我們從 Spring Cloud Task 的溫和介紹開始。然後構建一個 Spring Cloud Task 應用,在此過程中討論一些核心原則。
4. Spring Cloud Task 簡介
Spring Cloud Task 使得建立短生命週期微服務變得容易。它提供了按需在生產環境中執行短生命週期 JVM 程序的功能。
5. 系統要求
您需要安裝 Java(Java 17 或更高版本)。要構建,您還需要安裝 Maven。
5.1. 資料庫要求
Spring Cloud Task 使用關係型資料庫儲存已執行任務的結果。雖然您可以在沒有資料庫的情況下開始開發任務(任務狀態會作為任務倉庫更新的一部分被記錄到日誌中),但在生產環境中,您會希望使用受支援的資料庫。Spring Cloud Task 目前支援以下資料庫
-
DB2
-
H2
-
HSQLDB
-
MySql
-
Oracle
-
Postgres
-
SqlServer
6. 開發您的第一個 Spring Cloud Task 應用
一個好的開始是使用一個簡單的“Hello, World!”應用,因此我們建立一個等效的 Spring Cloud Task 應用來突出框架的特性。大多數 IDE 對 Apache Maven 有良好的支援,因此我們將其用作本專案構建工具。
spring.io 網站包含許多使用 Spring Boot 的“入門 ”指南。如果您需要解決特定問題,請先在那裡查詢。您可以透過訪問Spring Initializr並建立新專案來簡化以下步驟。這樣做會自動生成一個新的專案結構,以便您可以立即開始編碼。我們建議嘗試使用 Spring Initializr 來熟悉它。 |
6.1. 使用 Spring Initializr 建立 Spring Task 專案
現在我們可以建立一個並測試一個將 Hello, World!
列印到控制檯的應用。
為此
-
訪問Spring Initialzr 網站。
-
建立一個新的 Maven 專案,Group 名稱為
io.spring.demo
,Artifact 名稱為helloworld
。 -
在 Dependencies 文字框中,輸入
task
,然後選擇Cloud Task
依賴項。 -
在 Dependencies 文字框中,輸入
jdbc
,然後選擇JDBC
依賴項。 -
在 Dependencies 文字框中,輸入
h2
,然後選擇H2
。(或您喜歡的資料庫) -
點選 Generate Project 按鈕
-
-
解壓 helloworld.zip 檔案,並將專案匯入您喜歡的 IDE。
6.2. 編寫程式碼
為了完成我們的應用,我們需要用以下內容更新生成的 HelloworldApplication
,以便它能啟動一個任務。
package io.spring.Helloworld;
@SpringBootApplication
@EnableTask
public class HelloworldApplication {
@Bean
public ApplicationRunner applicationRunner() {
return new HelloWorldApplicationRunner();
}
public static void main(String[] args) {
SpringApplication.run(HelloworldApplication.class, args);
}
public static class HelloWorldApplicationRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("Hello, World!");
}
}
}
雖然看起來很小,但其中涉及不少內容。有關 Spring Boot 特定內容的更多資訊,請參閱Spring Boot 參考文件。
現在我們可以開啟 src/main/resources
目錄下的 application.properties
檔案。我們需要在 application.properties
中配置兩個屬性
-
application.name
: 設定應用名稱(它會被轉換成任務名稱) -
logging.level
: 將 Spring Cloud Task 的日誌級別設定為DEBUG
,以便檢視正在發生的情況。
以下示例展示瞭如何同時進行這兩項配置
logging.level.org.springframework.cloud.task=DEBUG
spring.application.name=helloWorld
6.2.1. 任務自動配置
當包含 Spring Cloud Task Starter 依賴項時,Task 會自動配置所有 bean 來啟動其功能。此配置的一部分是註冊 TaskRepository
及其使用所需的基礎設施。
在我們的示例中,TaskRepository
使用嵌入式 H2 資料庫記錄任務結果。對於生產環境來說,嵌入式 H2 資料庫不是一個實際的解決方案,因為任務結束後 H2 DB 就會消失。然而,對於快速入門體驗,我們可以在示例中使用它,並將倉庫中的更新資訊回顯到日誌中。在本文件後面的配置章節中,我們將介紹如何定製 Spring Cloud Task 提供的各個部分的配置。
當我們的示例應用執行時,Spring Boot 會啟動我們的 HelloWorldCommandLineRunner
並將“Hello, World!”訊息輸出到標準輸出。TaskLifecycleListener
在倉庫中記錄任務的開始和結束。
6.2.2. main 方法
main 方法是任何 Java 應用的入口點。我們的 main 方法委託給 Spring Boot 的SpringApplication 類。
6.2.3. ApplicationRunner
Spring 包含多種啟動應用邏輯的方式。Spring Boot 透過其 *Runner
介面(CommandLineRunner
或 ApplicationRunner
)提供了一種方便且有條理的方式來做到這一點。一個表現良好的任務可以使用這兩個 runner 中的一個來啟動任何邏輯。
任務的生命週期被認為是從 *Runner#run
方法執行之前開始,直到它們全部完成後結束。Spring Boot 允許一個應用使用多個 *Runner
實現,Spring Cloud Task 也是如此。
任何不是透過 CommandLineRunner 或 ApplicationRunner 啟動的機制(例如使用 InitializingBean#afterPropertiesSet )啟動的處理都不會被 Spring Cloud Task 記錄。 |
6.3. 執行示例
此時,我們的應用應該可以工作了。由於此應用基於 Spring Boot,我們可以從應用根目錄使用命令列命令 $ mvn spring-boot:run
來執行它,如下例所示(及其輸出)
$ mvn clean spring-boot:run
....... . . .
....... . . . (Maven log output here)
....... . . .
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.0.3.RELEASE)
2018-07-23 17:44:34.426 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : Starting HelloworldApplication on Glenns-MBP-2.attlocal.net with PID 1978 (/Users/glennrenfro/project/helloworld/target/classes started by glennrenfro in /Users/glennrenfro/project/helloworld)
2018-07-23 17:44:34.430 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : No active profile set, falling back to default profiles: default
2018-07-23 17:44:34.472 INFO 1978 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1d24f32d: startup date [Mon Jul 23 17:44:34 EDT 2018]; root of context hierarchy
2018-07-23 17:44:35.280 INFO 1978 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2018-07-23 17:44:35.410 INFO 1978 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2018-07-23 17:44:35.419 DEBUG 1978 --- [ main] o.s.c.t.c.SimpleTaskConfiguration : Using org.springframework.cloud.task.configuration.DefaultTaskConfigurer TaskConfigurer
2018-07-23 17:44:35.420 DEBUG 1978 --- [ main] o.s.c.t.c.DefaultTaskConfigurer : No EntityManager was found, using DataSourceTransactionManager
2018-07-23 17:44:35.522 DEBUG 1978 --- [ main] o.s.c.t.r.s.TaskRepositoryInitializer : Initializing task schema for h2 database
2018-07-23 17:44:35.525 INFO 1978 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executing SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql]
2018-07-23 17:44:35.558 INFO 1978 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executed SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql] in 33 ms.
2018-07-23 17:44:35.728 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2018-07-23 17:44:35.730 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Bean with name 'dataSource' has been autodetected for JMX exposure
2018-07-23 17:44:35.733 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2018-07-23 17:44:35.738 INFO 1978 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2018-07-23 17:44:35.762 DEBUG 1978 --- [ main] o.s.c.t.r.support.SimpleTaskRepository : Creating: TaskExecution{executionId=0, parentExecutionId=null, exitCode=null, taskName='application', startTime=Mon Jul 23 17:44:35 EDT 2018, endTime=null, exitMessage='null', externalExecutionId='null', errorMessage='null', arguments=[]}
2018-07-23 17:44:35.772 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : Started HelloworldApplication in 1.625 seconds (JVM running for 4.764)
Hello, World!
2018-07-23 17:44:35.782 DEBUG 1978 --- [ main] o.s.c.t.r.support.SimpleTaskRepository : Updating: TaskExecution with executionId=1 with the following {exitCode=0, endTime=Mon Jul 23 17:44:35 EDT 2018, exitMessage='null', errorMessage='null'}
前面的輸出中有三行我們感興趣的內容
-
SimpleTaskRepository
記錄了在TaskRepository
中條目的建立。 -
我們的
CommandLineRunner
的執行,由“Hello, World!”輸出展示。 -
SimpleTaskRepository
記錄了任務在TaskRepository
中的完成。
Spring Cloud Task 專案的 samples 模組中可以找到一個簡單的任務應用,這裡。 |
特性
本節更詳細地介紹了 Spring Cloud Task,包括如何使用它、如何配置它以及適當的擴充套件點。
7. Spring Cloud Task 的生命週期
在大多數情況下,現代雲環境圍繞著預期不會結束的程序執行進行設計。如果它們結束了,通常會被重新啟動。雖然大多數平臺確實有某種方式來執行結束後不會重新啟動的程序,但該執行的結果通常不會以可消費的方式維護。Spring Cloud Task 提供了在一個環境中執行短生命週期程序並記錄結果的能力。這樣做允許構建圍繞短生命週期程序以及透過訊息整合任務來實現的長生命週期服務的微服務架構。
雖然此功能在雲環境中很有用,但在傳統部署模型中也可能出現同樣的問題。當使用 cron 等排程器執行 Spring Boot 應用時,能夠在應用完成後監控其結果會很有用。
Spring Cloud Task 採取了一種方法,即 Spring Boot 應用可以有開始和結束,並且仍然是成功的。批處理應用就是預期會結束(且通常是短生命週期)的程序如何發揮作用的一個例子。
Spring Cloud Task 記錄給定任務的生命週期事件。大多數長生命週期程序,典型的如大多數 Web 應用,不儲存其生命週期事件。Spring Cloud Task 核心的任務則會儲存。
生命週期由一次任務執行組成。這是一次配置為任務(即,包含 Spring Cloud Task 依賴項)的 Spring Boot 應用的物理執行。
在任務開始時,在任何 CommandLineRunner
或 ApplicationRunner
實現執行之前,會在 TaskRepository
中建立一個記錄開始事件的條目。此事件是透過 Spring Framework 觸發的 SmartLifecycle#start
來觸發的。這表示系統中的所有 bean 已準備就緒,並且發生在執行 Spring Boot 提供的任何 CommandLineRunner
或 ApplicationRunner
實現之前。
任務的記錄僅在 ApplicationContext 成功啟動後發生。如果上下文根本無法啟動,任務的執行將不會被記錄。 |
在 Spring Boot 的所有 *Runner#run
呼叫完成後,或者 ApplicationContext
失敗時(由 ApplicationFailedEvent
指示),任務執行結果會在倉庫中更新。
如果應用要求在任務完成後(所有 *Runner#run 方法都被呼叫並且任務倉庫已更新)關閉 ApplicationContext ,請將屬性 spring.cloud.task.closecontextEnabled 設定為 true。 |
7.1. TaskExecution
儲存在 TaskRepository
中的資訊在 TaskExecution
類中建模,包含以下資訊
欄位 | 描述 |
---|---|
|
任務執行的唯一 ID。 |
|
由 |
|
任務名稱,由配置的 |
|
任務啟動的時間,由 |
|
任務完成的時間,由 |
|
退出時可用的任何資訊。這可以透過 |
|
如果異常是任務結束的原因(由 |
|
字串命令列引數的 |
7.2. 對映退出碼
任務完成時,它會嘗試向作業系統返回一個退出碼。如果我們檢視我們的原始示例,我們可以看到我們沒有控制應用的這一方面。因此,如果丟擲異常,JVM 返回的程式碼可能對您的除錯有用,也可能沒用。
因此,Spring Boot 提供了一個介面 ExitCodeExceptionMapper
,允許您將未捕獲的異常對映到退出碼。這樣做可以讓您在退出碼級別指示出了什麼問題。此外,透過這種方式對映退出碼,Spring Cloud Task 會記錄返回的退出碼。
如果任務因 SIG-INT 或 SIG-TERM 終止,除非程式碼中另有指定,否則退出碼為零。
任務執行時,退出碼在倉庫中儲存為 null。任務完成後,根據本節前面描述的指南儲存適當的退出碼。 |
8. 配置
Spring Cloud Task 提供了開箱即用的配置,在 DefaultTaskConfigurer
和 SimpleTaskConfiguration
類中定義。本節將介紹預設配置以及如何根據您的需求定製 Spring Cloud Task。
8.1. DataSource
Spring Cloud Task 使用一個數據源來儲存任務執行的結果。預設情況下,我們提供一個記憶體中的 H2 例項,以提供一種簡單的開發引導方法。然而,在生產環境中,您可能希望配置自己的 DataSource
。
如果您的應用只使用一個 DataSource
,並且它同時作為您的業務 schema 和任務倉庫,您只需要提供任何 DataSource
(最簡單的方法是透過 Spring Boot 的配置約定)。Spring Cloud Task 會自動將此 DataSource
用於倉庫。
如果您的應用使用多個 DataSource
,您需要使用適當的 DataSource
配置任務倉庫。此定製可以透過實現 TaskConfigurer
來完成。
8.2. 表字首
TaskRepository
的一個可修改屬性是任務表的表字首。預設情況下,它們都以 TASK_
為字首。TASK_EXECUTION
和 TASK_EXECUTION_PARAMS
是兩個示例。然而,修改此字首可能有一些原因。如果需要在表名前新增 schema 名稱,或者在同一 schema 內需要多組任務表,您必須更改表字首。您可以透過將 spring.cloud.task.tablePrefix
設定為您需要的字首來做到這一點,如下所示
spring.cloud.task.tablePrefix=yourPrefix
透過使用 spring.cloud.task.tablePrefix
,使用者承擔建立滿足任務表 schema 標準但根據使用者業務需求進行修改的任務表的責任。在建立自己的任務 DDL 時,可以將 Spring Cloud Task Schema DDL 作為參考,如這裡所示。
8.3. 啟用/停用表初始化
如果您自己建立了任務表,並且不希望 Spring Cloud Task 在任務啟動時建立它們,請將 spring.cloud.task.initialize-enabled
屬性設定為 false
,如下所示
spring.cloud.task.initialize-enabled=false
它預設為 true
。
屬性 spring.cloud.task.initialize.enable 已棄用。 |
8.4. 外部生成的任務 ID
在某些情況下,您可能希望考慮任務被請求與基礎設施實際啟動它之間的時間差。Spring Cloud Task 允許您在任務被請求時建立一個 TaskExecution
。然後將生成的 TaskExecution
的執行 ID 傳遞給任務,以便它可以在任務的生命週期中更新 TaskExecution
。
可以透過在引用儲存 TaskExecution
物件的 datastore 的 TaskRepository
實現上呼叫 createTaskExecution
方法來建立一個 TaskExecution
。
為了配置您的任務以使用生成的 TaskExecutionId
,請新增以下屬性
spring.cloud.task.executionid=yourtaskId
8.5. 外部任務 Id
Spring Cloud Task 允許您為每個 TaskExecution
儲存一個外部任務 ID。為了配置您的任務以使用生成的 TaskExecutionId
,請新增以下屬性
spring.cloud.task.external-execution-id=<externalTaskId>
8.6. 父任務 Id
Spring Cloud Task 允許您為每個 TaskExecution
儲存一個父任務 ID。例如,一個任務執行另一個或多個任務,並且您想記錄哪個任務啟動了每個子任務。為了配置您的任務以設定父 TaskExecutionId
,請在子任務上新增以下屬性
spring.cloud.task.parent-execution-id=<parentExecutionTaskId>
8.7. TaskConfigurer
TaskConfigurer
是一個策略介面,允許您定製 Spring Cloud Task 元件的配置方式。預設情況下,我們提供 DefaultTaskConfigurer
,它提供邏輯預設值:基於 Map
的記憶體元件(如果未提供 DataSource
,則對開發有用)和基於 JDBC 的元件(如果提供了 DataSource
,則有用)。
TaskConfigurer
允許您配置三個主要元件
元件 | 描述 | 預設值(由 DefaultTaskConfigurer 提供) |
---|---|---|
|
要使用的 |
|
|
要使用的 |
|
|
用於執行任務更新時的事務管理器。 |
如果使用了 |
您可以透過建立 TaskConfigurer
介面的自定義實現來定製前面表格中描述的任何元件。通常,繼承 DefaultTaskConfigurer
(如果在找不到 TaskConfigurer
時提供)並覆蓋所需的 getter 就足夠了。然而,可能需要從頭開始實現自己的。
使用者不應直接使用 TaskConfigurer 的 getter 方法,除非他們使用它來提供作為 Spring Bean 暴露的實現。 |
8.8. 任務執行監聽器
TaskExecutionListener
允許您為任務生命週期期間發生的特定事件註冊監聽器。為此,建立一個實現 TaskExecutionListener
介面的類。實現 TaskExecutionListener
介面的類會收到以下事件的通知
-
onTaskStartup
: 在將TaskExecution
儲存到TaskRepository
之前。 -
onTaskEnd
: 在更新TaskRepository
中的TaskExecution
條目並標記任務最終狀態之前。 -
onTaskFailed
: 在任務丟擲未處理異常時呼叫onTaskEnd
方法之前。
Spring Cloud Task 還允許您使用以下方法註解將 TaskExecution
監聽器新增到 bean 中的方法
-
@BeforeTask
: 在將TaskExecution
儲存到TaskRepository
之前 -
@AfterTask
: 在更新TaskRepository
中的TaskExecution
條目並標記任務最終狀態之前。 -
@FailedTask
: 在任務丟擲未處理異常時呼叫@AfterTask
方法之前。
以下示例展示了這三個註解的使用
public class MyBean {
@BeforeTask
public void methodA(TaskExecution taskExecution) {
}
@AfterTask
public void methodB(TaskExecution taskExecution) {
}
@FailedTask
public void methodC(TaskExecution taskExecution, Throwable throwable) {
}
}
在 TaskLifecycleListener 之前插入 ApplicationListener 可能會導致意想不到的效果。 |
8.8.1. 任務執行監聽器丟擲的異常
如果 TaskExecutionListener
事件處理器丟擲異常,則該事件處理器的所有監聽處理都會停止。例如,如果三個 onTaskStartup
監聽器已啟動,並且第一個 onTaskStartup
事件處理器丟擲異常,則不會呼叫其他兩個 onTaskStartup
方法。然而,TaskExecutionListeners
的其他事件處理器(onTaskEnd
和 onTaskFailed
)仍會被呼叫。
當 TaskExecutionListener
事件處理器丟擲異常時返回的退出碼是 ExitCodeEvent 報告的退出碼。如果沒有發出 ExitCodeEvent
,則評估丟擲的異常,檢視其是否屬於 ExitCodeGenerator 型別。如果是,則返回 ExitCodeGenerator
的退出碼。否則,返回 1
。
如果在 onTaskStartup
方法中丟擲異常,應用的退出碼將是 1
。如果在 onTaskEnd
或 onTaskFailed
方法中丟擲異常,應用的退出碼將是使用上述規則確定的那個。
如果在 onTaskStartup 、onTaskEnd 或 onTaskFailed 中丟擲異常,您不能使用 ExitCodeExceptionMapper 覆蓋應用的退出碼。 |
8.8.2. 退出訊息
您可以使用 TaskExecutionListener
以程式設計方式設定任務的退出訊息。這透過設定 TaskExecution
的 exitMessage
來完成,該 exitMessage
隨後傳遞給 TaskExecutionListener
。以下示例顯示了一個使用 @AfterTask
ExecutionListener
註解的方法
@AfterTask
public void afterMe(TaskExecution taskExecution) {
taskExecution.setExitMessage("AFTER EXIT MESSAGE");
}
可以在任何監聽器事件(onTaskStartup
、onTaskFailed
和 onTaskEnd
)中設定 ExitMessage
。三個監聽器的優先順序如下
-
onTaskEnd
-
onTaskFailed
-
onTaskStartup
例如,如果您為 onTaskStartup
和 onTaskFailed
監聽器設定了 exitMessage
,並且任務在沒有失敗的情況下結束,則 onTaskStartup
中的 exitMessage
將儲存在倉庫中。否則,如果發生失敗,則儲存 onTaskFailed
中的 exitMessage
。此外,如果您使用 onTaskEnd
監聽器設定了 exitMessage
,則 onTaskEnd
中的 exitMessage
會覆蓋 onTaskStartup
和 onTaskFailed
中的 exit messages。
8.9. 限制 Spring Cloud Task 例項
Spring Cloud Task 允許您設定具有給定任務名稱的任務一次只能執行一個例項。為此,您需要設定 任務名稱 併為每個任務執行設定 spring.cloud.task.single-instance-enabled=true
。當第一個任務執行正在執行時,任何其他嘗試執行具有相同 任務名稱 和 spring.cloud.task.single-instance-enabled=true
的任務都會失敗,並顯示以下錯誤訊息:Task with name "application" is already running.
spring.cloud.task.single-instance-enabled
的預設值為 false
。以下示例展示瞭如何將 spring.cloud.task.single-instance-enabled
設定為 true
。
spring.cloud.task.single-instance-enabled=true or false
要使用此功能,您必須將以下 Spring Integration 依賴項新增到您的應用程式中
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
如果任務由於此功能被啟用且另一個同名任務正在執行而失敗,應用程式的退出碼將為 1。 |
8.9.1. Spring AOT 和原生編譯的單例項用法
在建立原生編譯應用程式時使用 Spring Cloud Task 的單例項功能,您需要在構建時啟用此功能。為此,請新增 process-aot execution 並將 spring.cloud.task.single-step-instance-enabled=true
作為 JVM 引數設定,如下所示
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
<configuration>
<jvmArguments>
-Dspring.cloud.task.single-instance-enabled=true
</jvmArguments>
</configuration>
</execution>
</executions>
</plugin>
8.10. 為 ApplicationRunner 和 CommandLineRunner 啟用觀測
要為 ApplicationRunner
或 CommandLineRunner
啟用任務觀測,請將 spring.cloud.task.observation.enabled
設定為 true。
一個使用 SimpleMeterRegistry
啟用觀測功能的示例任務應用程式可以在這裡找到。
8.11. 停用 Spring Cloud Task 自動配置
在某些實現中,如果不需要 Spring Cloud Task 自動配置,您可以停用任務的自動配置。可以透過將以下註解新增到您的任務應用程式來實現這一點
@EnableAutoConfiguration(exclude={SimpleTaskAutoConfiguration.class})
您也可以透過將 spring.cloud.task.autoconfiguration.enabled
屬性設定為 false
來停用任務自動配置。
8.12. 關閉上下文
如果應用程式需要在任務完成時關閉 ApplicationContext
(所有 *Runner#run
方法已被呼叫且任務倉庫已更新),請將屬性 spring.cloud.task.closecontextEnabled
設定為 true
。
另一種需要關閉上下文的情況是,任務執行完成但應用程式並未終止。在這些情況下,上下文之所以保持開啟是因為分配了一個執行緒(例如:如果您正在使用一個 TaskExecutor
)。在這些情況下,當啟動任務時,將 spring.cloud.task.closecontextEnabled
屬性設定為 true
。這將在任務完成後關閉應用程式的上下文。從而允許應用程式終止。
8.13. 啟用任務指標
Spring Cloud Task 與 Micrometer 整合,併為其執行的任務建立觀測。要啟用任務可觀測性整合,您必須將 spring-boot-starter-actuator
、您首選的登錄檔實現(如果您想釋出指標)以及 micrometer-tracing(如果您想釋出跟蹤資料)新增到您的任務應用程式中。一個使用 Influx 啟用任務可觀測性和指標的 Maven 依賴項示例是
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-influx</artifactId>
<scope>runtime</scope>
</dependency>
8.14. Spring Task 和 Spring Cloud Task 屬性
術語 task
在行業中是一個常用詞。例如,Spring Boot 提供了 spring.task
,而 Spring Cloud Task 提供了 spring.cloud.task
屬性。這在過去引起了一些混淆,認為這兩組屬性直接相關。然而,它們代表了 Spring 生態系統中提供的兩組不同的功能。
-
spring.task
指的是配置ThreadPoolTaskScheduler
的屬性。 -
spring.cloud.task
指的是配置 Spring Cloud Task 功能的屬性。
批處理
本節將更詳細地介紹 Spring Cloud Task 與 Spring Batch 的整合。本節涵蓋了如何跟蹤作業執行與其執行的任務之間的關聯,以及如何透過 Spring Cloud Deployer 進行遠端分割槽。
9. 將作業執行關聯到執行它的任務
Spring Boot 提供了在 über-jar 中執行批處理作業的功能。Spring Boot 對此功能的支援允許開發人員在該執行中執行多個批處理作業。Spring Cloud Task 提供了將作業的執行(即作業執行)與任務的執行關聯起來的能力,以便可以相互追溯。
Spring Cloud Task 透過使用 TaskBatchExecutionListener
實現此功能。預設情況下,此監聽器在任何同時配置了 Spring Batch Job(透過在上下文中定義型別為 Job
的 bean)並且類路徑中有 spring-cloud-task-batch
jar 的上下文中自動配置。該監聽器會被注入到所有符合這些條件的作業中。
9.1. 覆蓋 TaskBatchExecutionListener
為了防止該監聽器被注入到當前上下文中的任何批處理作業中,您可以使用標準的 Spring Boot 機制停用自動配置。
要只將該監聽器注入到上下文中的特定作業中,請覆蓋 batchTaskExecutionListenerBeanPostProcessor
並提供作業 bean ID 列表,如下例所示
public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
TaskBatchExecutionListenerBeanPostProcessor postProcessor =
new TaskBatchExecutionListenerBeanPostProcessor();
postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));
return postProcessor;
}
您可以在 Spring Cloud Task 專案的 samples 模組中找到一個示例批處理應用程式,連結在這裡。 |
10. 遠端分割槽
Spring Cloud Deployer 提供了在大多數雲基礎設施上啟動基於 Spring Boot 的應用程式的功能。DeployerPartitionHandler
和 DeployerStepExecutionHandler
將工作步驟執行的啟動委託給 Spring Cloud Deployer。
要配置 DeployerStepExecutionHandler
,您必須提供一個代表要執行的 Spring Boot über-jar 的 Resource
,一個 TaskLauncherHandler
和一個 JobExplorer
。您可以配置任何環境變數屬性,以及同時執行的最大工作程序數、輪詢結果的間隔(預設為 10 秒)以及超時時間(預設為 -1 或無超時)。以下示例展示瞭如何配置此 PartitionHandler
。
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer) throws Exception {
MavenProperties mavenProperties = new MavenProperties();
mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
new MavenProperties.RemoteRepository(repository))));
Resource resource =
MavenResource.parse(String.format("%s:%s:%s",
"io.spring.cloud",
"partitioned-batch-job",
"1.1.0.RELEASE"), mavenProperties);
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler.setCommandLineArgsProvider(
new PassThroughCommandLineArgsProvider(commandLineArgs));
partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
partitionHandler.setMaxWorkers(2);
partitionHandler.setApplicationName("PartitionedBatchJobTask");
return partitionHandler;
}
在向分割槽傳遞環境變數時,每個分割槽可能位於不同的機器上,具有不同的環境設定。因此,您應該只傳遞必需的環境變數。 |
請注意,在上面的示例中,我們將最大工作程序數設定為 2。設定最大工作程序數確定了應同時執行的最大分割槽數。
要執行的 Resource
預期是一個 Spring Boot über-jar,其中在當前上下文中將 DeployerStepExecutionHandler
配置為 CommandLineRunner
。前面示例中列出的倉庫應該是 uber-jar 所在的遠端倉庫。管理器和工作程序都應該能夠訪問用作作業倉庫和任務倉庫的相同資料儲存。一旦底層基礎設施引導了 Spring Boot jar 並且 Spring Boot 啟動了 DeployerStepExecutionHandler
,步驟處理器就會執行請求的 Step
。以下示例展示瞭如何配置 DeployerStepExecutionHandler
。
@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
DeployerStepExecutionHandler handler =
new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
return handler;
}
您可以在 Spring Cloud Task 專案的 samples 模組中找到一個示例遠端分割槽應用程式,連結在這裡。 |
10.1. 非同步啟動遠端批處理分割槽
預設情況下,批處理分割槽是按順序啟動的。然而,在某些情況下,這可能會影響效能,因為每次啟動都會阻塞直到資源(例如:在 Kubernetes 中 provision 一個 pod)被 provision 完成。在這些情況下,您可以向 DeployerPartitionHandler
提供一個 ThreadPoolTaskExecutor
。這將根據 ThreadPoolTaskExecutor
的配置啟動遠端批處理分割槽。例如
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setThreadNamePrefix("default_task_executor_thread");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
Resource resource = this.resourceLoader
.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
"workerStep", taskRepository, executor);
...
}
由於使用 ThreadPoolTaskExecutor 會留下一個活動執行緒,應用程式將不會終止,因此我們需要關閉上下文。為了正確關閉應用程式,我們需要將 spring.cloud.task.closecontextEnabled 屬性設定為 true 。 |
10.2. 關於為 Kubernetes 平臺開發批處理分割槽應用程式的注意事項
-
在 Kubernetes 平臺上部署分割槽應用程式時,您必須使用 Spring Cloud Kubernetes Deployer 的以下依賴項
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId> </dependency>
-
任務應用程式及其分割槽的應用程式名稱需要遵循以下正則表示式模式:
[a-z0-9]([-a-z0-9]*[a-z0-9])
。否則,會丟擲異常。
11. 批處理資訊訊息
Spring Cloud Task 提供了批處理作業發出資訊訊息的能力。“Spring Batch 事件”部分詳細介紹了此功能。
12. 批處理作業退出碼
正如之前討論的,Spring Cloud Task 應用程式支援記錄任務執行的退出碼。然而,在任務中執行 Spring Batch Job 的情況下,無論 Batch Job Execution 如何完成,使用預設的 Batch/Boot 行為時,任務的結果總是零。請記住,任務是一個 boot 應用程式,從任務返回的退出碼與 boot 應用程式的退出碼相同。要覆蓋此行為並允許任務在批處理作業返回 FAILED
的 BatchStatus 時返回非零退出碼,請將 spring.cloud.task.batch.fail-on-job-failure
設定為 true
。此時,退出碼可以是 1(預設值),也可以基於指定的 ExitCodeGenerator
。
此功能使用一個新的 ApplicationRunner
,它替換了 Spring Boot 提供的那個。預設情況下,它配置了相同的順序。但是,如果您想自定義 ApplicationRunner
執行的順序,可以透過設定 spring.cloud.task.batch.applicationRunnerOrder
屬性來設定其順序。要讓您的任務根據批處理作業執行的結果返回退出碼,您需要編寫自己的 CommandLineRunner
。
單步批處理作業 Starter
本節介紹如何使用 Spring Cloud Task 中包含的 starter 開發具有單個 Step
的 Spring Batch Job
。此 starter 允許您使用配置來定義 ItemReader
、ItemWriter
或一個完整的單步 Spring Batch Job
。有關 Spring Batch 及其功能的更多資訊,請參閱Spring Batch 文件。
要獲取 Maven 的 starter,請將以下內容新增到您的構建中
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-single-step-batch-job</artifactId>
<version>2.3.0</version>
</dependency>
要獲取 Gradle 的 starter,請將以下內容新增到您的構建中
compile "org.springframework.cloud:spring-cloud-starter-single-step-batch-job:2.3.0"
13. 定義作業
您可以使用此 starter 定義最少的(例如 ItemReader
或 ItemWriter
)或最多的(例如完整的 Job
)。在本節中,我們定義了配置 Job
所需的屬性。
13.1. 屬性
首先,此 starter 提供了一組屬性,允許您配置包含一個 Step 的 Job 的基本資訊
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
作業的名稱。 |
|
|
|
步驟的名稱。 |
|
|
|
每事務處理的項數。 |
配置了上述屬性後,您將擁有一個包含單個基於 chunk 的步驟的作業。此基於 chunk 的步驟將 Map<String, Object>
例項作為項讀取、處理和寫入。然而,此步驟尚無任何實際操作。您需要配置一個 ItemReader
、一個可選的 ItemProcessor
和一個 ItemWriter
來使其執行某些操作。要配置其中之一,您可以使用屬性並配置已提供自動配置的選項之一,或者使用標準的 Spring 配置機制配置您自己的。
如果您自行配置,輸入和輸出型別必須與步驟中的其他型別匹配。此 starter 中的 ItemReader 實現和 ItemWriter 實現都使用 Map<String, Object> 作為輸入和輸出項。 |
14. ItemReader 實現的自動配置
此 starter 為四種不同的 ItemReader
實現提供了自動配置:AmqpItemReader
、FlatFileItemReader
、JdbcCursorItemReader
和 KafkaItemReader
。在本節中,我們將概述如何使用提供的自動配置來配置這些實現。
14.1. AmqpItemReader
您可以使用 AmqpItemReader
透過 AMQP 從佇列或主題讀取資料。此 ItemReader
實現的自動配置依賴於兩組配置。第一組是 AmqpTemplate
的配置。您可以自行配置,也可以使用 Spring Boot 提供的自動配置。請參閱Spring Boot AMQP 文件。配置好 AmqpTemplate
後,您可以透過設定以下屬性來啟用支援其的批處理功能。
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
如果為 |
|
|
|
指示是否應註冊 |
有關更多資訊,請參閱AmqpItemReader
文件。
14.2. FlatFileItemReader
FlatFileItemReader
允許您從平面檔案(例如 CSV 和其他檔案格式)讀取資料。要從檔案讀取,您可以透過普通的 Spring 配置(LineTokenizer
、RecordSeparatorPolicy
、FieldSetMapper
、LineMapper
或 SkippedLinesCallback
)自行提供一些元件。您也可以使用以下屬性來配置讀取器
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
確定是否應為重啟儲存狀態。 |
|
|
|
在 |
|
|
|
從檔案讀取的最大項數。 |
|
|
0 |
已讀取的項數。用於重啟。 |
|
|
|
檔案中指示註釋行(要忽略的行)的字串列表。 |
|
|
|
要讀取的資源。 |
|
|
|
如果設定為 |
|
|
|
讀取檔案時使用的編碼。 |
|
|
0 |
指示在檔案開頭要跳過的行數。 |
|
|
|
指示檔案是否為分隔檔案(CSV 和其他格式)。此屬性或 |
|
|
|
如果讀取分隔檔案,指示用於解析的分隔符。 |
|
|
|
用於確定引用值的字元。 |
|
|
|
用於確定記錄中哪些欄位應包含在項中的索引列表。 |
|
|
|
指示檔案的記錄是否按列號解析。此屬性或 |
|
|
|
用於解析固定寬度記錄的列範圍列表。請參閱Range 文件。 |
|
|
|
從記錄解析出的每個欄位的名稱列表。這些名稱是此 |
|
|
|
如果設定為 |
14.3. JdbcCursorItemReader
JdbcCursorItemReader
對關係資料庫執行查詢,並遍歷結果遊標(ResultSet
)以提供結果項。此自動配置允許您提供 PreparedStatementSetter
、RowMapper
或兩者。您也可以使用以下屬性來配置 JdbcCursorItemReader
。
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
確定是否應為重啟儲存狀態。 |
|
|
|
在 |
|
|
|
從檔案讀取的最大項數。 |
|
|
0 |
已讀取的項數。用於重啟。 |
|
|
對驅動程式的提示,指示每次呼叫資料庫系統時檢索多少條記錄。為了獲得最佳效能,通常希望將其設定為與 chunk size 匹配。 |
|
|
|
從資料庫讀取的最大項數。 |
|
|
|
查詢超時前的毫秒數。 |
|
|
|
|
確定讀取器在處理時是否應忽略 SQL 警告。 |
|
|
|
指示在每次讀取後是否應驗證遊標位置,以確認 |
|
|
|
指示驅動程式是否支援遊標的絕對定位。 |
|
|
|
指示連線是否與其他處理共享(因此是事務的一部分)。 |
|
|
|
從中讀取的 SQL 查詢。 |
您還可以使用以下屬性專門為讀取器指定 JDBC DataSource:.JdbcCursorItemReader
屬性
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
確定是否應啟用 |
|
|
|
資料庫的 |
|
|
|
資料庫的登入使用者名稱。 |
|
|
|
資料庫的登入密碼。 |
|
|
|
|
如果未指定 jdbccursoritemreader_datasource ,JDBCCursorItemReader 將使用預設的 DataSource 。 |
14.4. KafkaItemReader
從 Kafka topic 攝取資料分割槽非常有用,這正是 KafkaItemReader
可以做到的。要配置 KafkaItemReader
,需要兩個配置項。首先,需要使用 Spring Boot 的 Kafka 自動配置來配置 Kafka(請參閱Spring Boot Kafka 文件)。配置好 Spring Boot 中的 Kafka 屬性後,您可以透過設定以下屬性來配置 KafkaItemReader
本身。
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
在 |
|
|
|
要從中讀取的 topic 的名稱。 |
|
|
|
要從中讀取的分割槽索引列表。 |
|
|
30 |
|
|
|
|
確定是否應為重啟儲存狀態。 |
14.5. 原生編譯
單步批處理的優點是,在使用 JVM 時,它允許您在執行時動態選擇要使用的 reader 和 writer beans。但是,在使用原生編譯時,您必須在構建時而不是執行時確定 reader 和 writer。以下示例展示瞭如何做到這一點
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
<configuration>
<jvmArguments>
-Dspring.batch.job.flatfileitemreader.name=fooReader
-Dspring.batch.job.flatfileitemwriter.name=fooWriter
</jvmArguments>
</configuration>
</execution>
</executions>
</plugin>
15. ItemProcessor 配置
如果 ApplicationContext
中存在 ItemProcessor
,單步批處理作業自動配置會接受它。如果找到型別正確(ItemProcessor<Map<String, Object>, Map<String, Object>>
)的 ItemProcessor,它將被自動注入到步驟中。
16. ItemWriter 實現的自動配置
此 starter 為與支援的 ItemReader
實現匹配的 ItemWriter
實現提供了自動配置:AmqpItemWriter
、FlatFileItemWriter
、JdbcItemWriter
和 KafkaItemWriter
。本節介紹如何使用自動配置來配置受支援的 ItemWriter
。
16.1. AmqpItemWriter
要寫入 RabbitMQ 佇列,您需要兩組配置。首先,您需要一個 AmqpTemplate
。獲取此物件的最簡單方法是使用 Spring Boot 的 RabbitMQ 自動配置。請參閱Spring Boot AMQP 文件。
配置好 AmqpTemplate
後,您可以透過設定以下屬性來配置 AmqpItemWriter
。
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
如果為 |
|
|
|
指示是否應註冊 |
16.2. FlatFileItemWriter
要將檔案作為步驟的輸出寫入,可以配置 FlatFileItemWriter
。自動配置接受已明確配置的元件(例如 LineAggregator
、FieldExtractor
、FlatFileHeaderCallback
或 FlatFileFooterCallback
)以及透過設定以下指定屬性配置的元件。
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
要讀取的資源。 |
|
|
|
指示輸出檔案是否為分隔檔案。如果為 |
|
|
|
指示輸出檔案是否為格式化檔案。如果為 |
|
|
|
用於為格式化檔案生成輸出的格式。格式化是使用 |
|
|
|
生成檔案時要使用的 |
|
|
0 |
記錄的最大長度。如果為 0,則大小無限制。 |
|
|
0 |
記錄的最小長度。 |
|
|
|
用於分隔分隔檔案中欄位的 |
|
|
|
寫入檔案時使用的編碼。 |
|
|
|
指示檔案在重新整理時是否應強制同步到磁碟。 |
|
|
|
從記錄解析出的每個欄位的名稱列表。這些名稱是此 |
|
|
|
指示如果找到輸出檔案,是否應向其追加內容。 |
|
|
|
用於分隔輸出檔案中行的 |
|
|
|
在 |
|
|
|
確定是否應為重啟儲存狀態。 |
|
|
|
如果設定為 |
|
|
|
如果設定為 |
|
|
|
指示讀取器是否是事務性佇列(表示讀取的項在失敗時會返回到佇列)。 |
16.3. JdbcBatchItemWriter
要將步驟的輸出寫入關係資料庫,此 starter 提供了自動配置 JdbcBatchItemWriter
的能力。自動配置允許您透過設定以下屬性來提供自己的 ItemPreparedStatementSetter
或 ItemSqlParameterSourceProvider
以及配置選項。
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
在 |
|
|
|
用於插入每個項的 |
|
|
|
是否驗證每次插入至少更新一條記錄。 |
您還可以使用以下屬性專門為寫入器指定 JDBC DataSource
:.JdbcBatchItemWriter
屬性
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
確定是否應啟用 |
|
|
|
資料庫的 |
|
|
|
資料庫的登入使用者名稱。 |
|
|
|
資料庫的登入密碼。 |
|
|
|
|
如果未指定 jdbcbatchitemwriter_datasource ,JdbcBatchItemWriter 將使用預設的 DataSource 。 |
16.4. KafkaItemWriter
要將步驟輸出寫入 Kafka topic,您需要 KafkaItemWriter
。此 starter 利用兩個地方的功能為 KafkaItemWriter
提供自動配置。第一個是 Spring Boot 的 Kafka 自動配置。(請參閱Spring Boot Kafka 文件。)其次,此 starter 允許您配置寫入器的兩個屬性。
屬性 | 型別 | 預設值 | 描述 |
---|---|---|---|
|
|
|
要寫入的 Kafka topic。 |
|
|
|
傳遞給寫入器的項是否全部作為刪除事件傳送到 topic。 |
有關 KafkaItemWriter
配置選項的更多資訊,請參閱KafkaItemWiter
文件。
16.5. Spring AOT
將 Spring AOT 與 Single Step Batch Starter 結合使用時,您必須在編譯時設定 reader 和 writer 名稱屬性(除非您為 reader 和/或 writer 建立 bean(s))。為此,您必須將您希望使用的 reader 和 writer 的名稱作為引數或環境變數包含在 boot maven plugin 或 gradle plugin 中。例如,如果您希望在 Maven 中啟用 FlatFileItemReader
和 FlatFileItemWriter
,它會是這樣
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
</execution>
</executions>
<configuration>
<arguments>
<argument>--spring.batch.job.flatfileitemreader.name=foobar</argument>
<argument>--spring.batch.job.flatfileitemwriter.name=fooWriter</argument>
</arguments>
</configuration>
</plugin>
Spring Cloud Stream 整合
任務本身可能很有用,但將任務整合到更大的生態系統中,可以使其在更復雜的處理和編排中發揮作用。本節介紹了 Spring Cloud Task 與 Spring Cloud Stream 的整合選項。
17. 從 Spring Cloud Stream 啟動任務
您可以從 stream 啟動任務。為此,建立一個 sink,監聽包含 TaskLaunchRequest
作為 payload 的訊息。TaskLaunchRequest
包含
-
uri
:要執行的任務 artifact 的 URI。 -
applicationName
:與任務關聯的名稱。如果未設定 applicationName,TaskLaunchRequest
將生成一個包含以下內容的任務名稱:Task-<UUID>
。 -
commandLineArguments
:包含任務命令列引數的列表。 -
environmentProperties
:包含任務要使用的環境變數的 map。 -
deploymentProperties
:包含 deployer 用於部署任務的屬性的 map。
如果 payload 型別不同,sink 會丟擲異常。 |
例如,可以建立一個 stream,其中包含一個 processor,該 processor 接收來自 HTTP 源的資料並建立一個包含 TaskLaunchRequest
的 GenericMessage
,然後將訊息傳送到其輸出通道。任務 sink 將從其輸入通道接收該訊息,然後啟動任務。
要建立一個 taskSink,您只需要建立一個包含 EnableTaskLauncher
註解的 Spring Boot 應用程式,如下例所示
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
Spring Cloud Task 專案的 samples 模組 包含了一個示例 Sink 和 Processor。要將這些示例安裝到你的本地 maven 倉庫,請在 spring-cloud-task-samples
目錄中執行 maven 構建,並將 skipInstall
屬性設定為 false
,如以下示例所示
mvn clean install
必須將 maven.remoteRepositories.springRepo.url 屬性設定為 über-jar 所在的遠端倉庫位置。如果未設定,則沒有遠端倉庫,因此它將僅依賴本地倉庫。 |
17.1. Spring Cloud Data Flow
要在 Spring Cloud Data Flow 中建立流,必須首先註冊我們建立的 Task Sink 應用程式。在以下示例中,我們使用 Spring Cloud Data Flow shell 註冊 Processor 和 Sink 示例應用程式。
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
以下示例展示瞭如何使用 Spring Cloud Data Flow shell 建立流。
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
18. Spring Cloud Task 事件
Spring Cloud Task 提供了在透過 Spring Cloud Stream 通道執行任務時,透過 Spring Cloud Stream 通道發出事件的能力。任務監聽器用於將 TaskExecution
釋出到名為 task-events
的訊息通道上。此功能會自動注入到任何類路徑中包含 spring-cloud-stream
、spring-cloud-stream-<binder>
和已定義任務的任務中。
要停用事件發出監聽器,請將 spring.cloud.task.events.enabled 屬性設定為 false 。 |
定義了適當的類路徑後,以下任務將在 task-events
通道上將 TaskExecution
作為事件發出(在任務開始和結束時)。
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
類路徑中還需要包含一個繫結器實現。 |
可以在 Spring Cloud Task 專案的 samples 模組中找到一個示例任務事件應用程式,點選這裡。 |
18.1. 停用特定任務事件
要停用任務事件,可以將 spring.cloud.task.events.enabled
屬性設定為 false
。
19. Spring Batch 事件
透過任務執行 Spring Batch 作業時,Spring Cloud Task 可以配置為根據 Spring Batch 中可用的 Spring Batch 監聽器發出資訊性訊息。具體來說,當透過 Spring Cloud Task 執行時,以下 Spring Batch 監聽器會自動配置到每個批處理作業中,並在相關的 Spring Cloud Stream 通道上發出訊息。
-
JobExecutionListener
監聽job-execution-events
-
StepExecutionListener
監聽step-execution-events
-
ChunkListener
監聽chunk-events
-
ItemReadListener
監聽item-read-events
-
ItemProcessListener
監聽item-process-events
-
ItemWriteListener
監聽item-write-events
-
SkipListener
監聽skip-events
當上下文中存在適當的 bean(一個 Job
和一個 TaskLifecycleListener
)時,這些監聽器會自動配置到任何 AbstractJob
中。監聽這些事件的配置與繫結到任何其他 Spring Cloud Stream 通道的方式相同。我們的任務(執行批處理作業的任務)充當 Source
,而監聽應用程式充當 Processor
或 Sink
。
一個示例是有一個應用程式監聽 job-execution-events
通道,以瞭解作業的啟動和停止。要配置監聽應用程式,您可以按如下方式將輸入配置為 job-execution-events
。
spring.cloud.stream.bindings.input.destination=job-execution-events
類路徑中還需要包含一個繫結器實現。 |
可以在 Spring Cloud Task 專案的 samples 模組中找到一個示例批處理事件應用程式,點選這裡。 |
19.1. 將批處理事件傳送到不同的通道
Spring Cloud Task 為批處理事件提供的一個選項是更改特定監聽器可以發出訊息的通道的能力。為此,請使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>
。例如,如果 StepExecutionListener
需要將其訊息發出到名為 my-step-execution-events
的另一個通道,而不是預設的 step-execution-events
,您可以新增以下配置。
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
19.2. 停用批處理事件
要停用所有批處理事件的監聽器功能,請使用以下配置
spring.cloud.task.batch.events.enabled=false
要停用特定的批處理事件,請使用以下配置
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
以下列表顯示了可以停用的各個監聽器
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
19.3. 批處理事件的發出順序
預設情況下,批處理事件具有 Ordered.LOWEST_PRECEDENCE
。要更改此值(例如,更改為 5),請使用以下配置。
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5
附錄
20. 任務倉庫 Schema
本附錄提供了任務倉庫中使用的資料庫 schema 的 ERD。

20.1. 表資訊
儲存任務執行資訊。
列名 | 必需 | 型別 | 欄位長度 | 備註 |
---|---|---|---|---|
TASK_EXECUTION_ID |
是 |
BIGINT |
X |
Spring Cloud Task Framework 在應用程式啟動時根據 |
START_TIME |
否 |
DATETIME(6) |
X |
Spring Cloud Task Framework 在應用程式啟動時設定該值。 |
END_TIME |
否 |
DATETIME(6) |
X |
Spring Cloud Task Framework 在應用程式退出時設定該值。 |
TASK_NAME |
否 |
VARCHAR |
100 |
Spring Cloud Task Framework 在應用程式啟動時會將其設定為 "Application",除非使用者使用 |
EXIT_CODE |
否 |
INTEGER |
X |
遵循 Spring Boot 預設值,除非使用者如這裡討論的那樣覆蓋。 |
EXIT_MESSAGE |
否 |
VARCHAR |
2500 |
使用者定義,如這裡討論的那樣。 |
ERROR_MESSAGE |
否 |
VARCHAR |
2500 |
Spring Cloud Task Framework 在應用程式退出時設定該值。 |
LAST_UPDATED |
是 |
TIMESTAMP |
X |
Spring Cloud Task Framework 在應用程式啟動時設定該值。如果記錄是在任務外部建立的,則必須在記錄建立時填充該值。 |
EXTERNAL_EXECUTION_ID |
否 |
VARCHAR |
250 |
如果設定了 |
PARENT_TASK_EXECUTION_ID |
否 |
BIGINT |
X |
如果設定了 |
儲存任務執行使用的引數
列名 | 必需 | 型別 | 欄位長度 |
---|---|---|---|
TASK_EXECUTION_ID |
是 |
BIGINT |
X |
TASK_PARAM |
否 |
VARCHAR |
2500 |
用於將任務執行與批處理執行連結起來。
列名 | 必需 | 型別 | 欄位長度 |
---|---|---|---|
TASK_EXECUTION_ID |
是 |
BIGINT |
X |
JOB_EXECUTION_ID |
是 |
BIGINT |
X |
用於這裡討論的 single-instance-enabled
功能。
列名 | 必需 | 型別 | 欄位長度 | 備註 |
---|---|---|---|---|
LOCK_KEY |
是 |
CHAR |
36 |
此鎖的 UUID |
REGION |
是 |
VARCHAR |
100 |
使用者可以使用此欄位建立一組鎖。 |
CLIENT_ID |
是 |
CHAR |
36 |
包含要鎖定應用程式名稱的任務執行 ID。 |
CREATED_DATE |
是 |
DATETIME |
X |
條目建立日期 |
每種資料庫型別設定表的 DDL 可以在這裡找到。 |
20.2. SQL Server
預設情況下,Spring Cloud Task 使用序列表來確定 TASK_EXECUTION
表的 TASK_EXECUTION_ID
。然而,在使用 SQL Server 同時啟動多個任務時,這可能導致 TASK_SEQ
表發生死鎖。解決方法是刪除 TASK_EXECUTION_SEQ
表並使用相同的名稱建立一個序列。例如:
DROP TABLE TASK_SEQ;
CREATE SEQUENCE [DBO].[TASK_SEQ] AS BIGINT
START WITH 1
INCREMENT BY 1;
將 START WITH 設定為比您當前執行 ID 更高的值。 |
21. 構建此文件
本專案使用 Maven 生成此文件。要為您自己生成它,請執行以下命令:$ mvn clean install -DskipTests -P docs
。
22. 可觀測性元資料
22.1. 可觀測性 - 指標
下面您可以找到此專案宣告的所有指標列表。
22.1.1. 活躍任務
圍繞任務執行建立的指標。
指標名稱 spring.cloud.task
(由約定類 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定義)。型別 timer
。
指標名稱 spring.cloud.task.active
(由約定類 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定義)。型別 long task timer
。
啟動 Observation 後新增的 KeyValue 可能不會出現在 *.active 指標中。 |
Micrometer 內部使用 nanoseconds 作為基本單位。然而,每個後端會確定實際的基本單位。(例如 Prometheus 使用 seconds) |
封閉類的完全限定名 org.springframework.cloud.task.listener.TaskExecutionObservation
。
所有標籤必須以 spring.cloud.task 字首開頭! |
名稱 |
描述 |
|
CF 雲的 App ID。 |
|
CF 雲的 App 名稱。 |
|
CF 雲的 App 版本。 |
|
CF 雲的例項索引。 |
|
CF 雲的組織名稱。 |
|
CF 雲的空間 ID。 |
|
CF 雲的空間名稱。 |
|
任務執行 ID。 |
|
任務退出碼。 |
|
任務的外部執行 ID。 |
|
任務名稱度量。 |
|
任務父執行 ID。 |
|
任務狀態。可以是 success 或 failure。 |
22.1.2. 任務執行器觀測
任務執行器執行時建立的觀測。
指標名稱 spring.cloud.task.runner
(由約定類 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定義)。型別 timer
。
指標名稱 spring.cloud.task.runner.active
(由約定類 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定義)。型別 long task timer
。
啟動 Observation 後新增的 KeyValue 可能不會出現在 *.active 指標中。 |
Micrometer 內部使用 nanoseconds 作為基本單位。然而,每個後端會確定實際的基本單位。(例如 Prometheus 使用 seconds) |
封閉類的完全限定名 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation
。
所有標籤必須以 spring.cloud.task 字首開頭! |
名稱 |
描述 |
|
由 Spring Cloud Task 執行的 bean 的名稱。 |
22.2. 可觀測性 - Spans
下面您可以找到此專案宣告的所有 spans 列表。
22.2.1. 活躍任務 Span
圍繞任務執行建立的指標。
Span 名稱 spring.cloud.task
(由約定類 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定義)。
封閉類的完全限定名 org.springframework.cloud.task.listener.TaskExecutionObservation
。
所有標籤必須以 spring.cloud.task 字首開頭! |
名稱 |
描述 |
|
CF 雲的 App ID。 |
|
CF 雲的 App 名稱。 |
|
CF 雲的 App 版本。 |
|
CF 雲的例項索引。 |
|
CF 雲的組織名稱。 |
|
CF 雲的空間 ID。 |
|
CF 雲的空間名稱。 |
|
任務執行 ID。 |
|
任務退出碼。 |
|
任務的外部執行 ID。 |
|
任務名稱度量。 |
|
任務父執行 ID。 |
|
任務狀態。可以是 success 或 failure。 |
22.2.2. 任務執行器觀測 Span
任務執行器執行時建立的觀測。
Span 名稱 spring.cloud.task.runner
(由約定類 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定義)。
封閉類的完全限定名 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation
。
所有標籤必須以 spring.cloud.task 字首開頭! |
名稱 |
描述 |
|
由 Spring Cloud Task 執行的 bean 的名稱。 |