版本 3.0.4
© 2009-2022 VMware, Inc. 保留所有權利。
您可以為自己使用和分發此文件的副本,前提是您不對這些副本收取任何費用,並且每份副本(無論是印刷版還是電子版)都包含此版權宣告。
前言
1. 關於文件
Spring Cloud Task 參考指南提供 html、 pdf 和 epub 格式。最新副本可在 docs.spring.io/spring-cloud-task/docs/current-SNAPSHOT/reference/html/ 獲取。
您可以為自己使用和分發此文件的副本,前提是您不對這些副本收取任何費用,並且每份副本(無論是印刷版還是電子版)都包含此版權宣告。
2. 獲取幫助
使用 Spring Cloud Task 遇到問題?我們樂意幫助!
-
提問。我們會在 stackoverflow.com 上監控標記為
spring-cloud-task的問題。 -
在 github.com/spring-cloud/spring-cloud-task/issues 上報告 Spring Cloud Task 的 bug。
| Spring Cloud Task 的所有內容,包括文件,都是開源的。如果您發現文件有問題,或者您只是想改進它們,請 參與進來。 |
3. 第一步
如果您剛開始接觸 Spring Cloud Task 或一般的“Spring”,我們建議您閱讀 入門 章節。
要從頭開始,請閱讀以下部分
要學習本教程,請閱讀 開發您的第一個 Spring Cloud Task 應用程式
要執行您的示例,請閱讀 執行示例
入門
如果您剛開始使用 Spring Cloud Task,您應該閱讀本節。在這裡,我們將回答基本的“是什麼?”、“如何做?”和“為什麼?”等問題。我們首先對 Spring Cloud Task 進行簡單介紹。然後,我們構建一個 Spring Cloud Task 應用程式,並在過程中討論一些核心原則。
4. Spring Cloud Task 簡介
Spring Cloud Task 使建立短生命週期的微服務變得容易。它提供了在生產環境中按需執行短生命週期 JVM 程序的功能。
5. 系統要求
您需要安裝 Java (Java 17 或更高版本)。要構建,您還需要安裝 Maven。
5.1. 資料庫要求
Spring Cloud Task 使用關係型資料庫儲存已執行任務的結果。雖然您可以在沒有資料庫的情況下開始開發任務(任務狀態會作為任務倉庫更新的一部分進行日誌記錄),但對於生產環境,您需要使用受支援的資料庫。Spring Cloud Task 目前支援以下資料庫:
-
DB2
-
H2
-
HSQLDB
-
MySql
-
Oracle
-
Postgres
-
SqlServer
6. 開發您的第一個 Spring Cloud Task 應用程式
一個好的起點是一個簡單的“Hello, World!”應用程式,因此我們建立 Spring Cloud Task 等效應用程式以突出框架的功能。大多數 IDE 對 Apache Maven 都有很好的支援,因此我們將其用作此專案的構建工具。
spring.io 網站包含許多使用 Spring Boot 的“入門”指南。如果您需要解決特定問題,請先在那裡檢視。您可以透過訪問 Spring Initializr 並建立一個新專案來簡化以下步驟。這樣做會自動生成新的專案結構,以便您可以立即開始編寫程式碼。我們建議嘗試使用 Spring Initializr 來熟悉它。 |
6.1. 使用 Spring Initializr 建立 Spring Task 專案
現在我們可以建立並測試一個在控制檯列印 Hello, World! 的應用程式。
為此:
-
訪問 Spring Initializr 網站。
-
建立一個新的 Maven 專案,Group 名稱為
io.spring.demo,Artifact 名稱為helloworld。 -
在 Dependencies 文字框中,鍵入
task,然後選擇Cloud Task依賴項。 -
在 Dependencies 文字框中,鍵入
jdbc,然後選擇JDBC依賴項。 -
在 Dependencies 文字框中,鍵入
h2,然後選擇H2。(或您喜歡的資料庫) -
點選 Generate Project 按鈕
-
-
解壓 helloworld.zip 檔案,並將專案匯入您喜歡的 IDE。
6.2. 編寫程式碼
為了完成我們的應用程式,我們需要用以下內容更新生成的 HelloworldApplication,以便它啟動一個任務。
package io.spring.Helloworld;
@SpringBootApplication
@EnableTask
public class HelloworldApplication {
@Bean
public ApplicationRunner applicationRunner() {
return new HelloWorldApplicationRunner();
}
public static void main(String[] args) {
SpringApplication.run(HelloworldApplication.class, args);
}
public static class HelloWorldApplicationRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("Hello, World!");
}
}
}
雖然看起來很小,但其中包含了許多內容。有關 Spring Boot 的具體細節,請參閱 Spring Boot 參考文件。
現在我們可以開啟 src/main/resources 中的 application.properties 檔案。我們需要在 application.properties 中配置兩個屬性:
-
application.name:設定應用程式名稱(它被轉換為任務名稱) -
logging.level:將 Spring Cloud Task 的日誌記錄設定為DEBUG,以便檢視正在發生的事情。
以下示例展示瞭如何同時執行這兩項操作
logging.level.org.springframework.cloud.task=DEBUG
spring.application.name=helloWorld
6.2.1. 任務自動配置
當包含 Spring Cloud Task Starter 依賴項時,Task 會自動配置所有 bean 以啟動其功能。此配置的一部分註冊了 TaskRepository 及其基礎設施。
在我們的演示中,TaskRepository 使用嵌入式 H2 資料庫來記錄任務的結果。這個 H2 嵌入式資料庫對於生產環境來說不是一個實用的解決方案,因為一旦任務結束,H2 資料庫就會消失。然而,為了快速入門體驗,我們可以在我們的示例中使用它,同時將該倉庫中正在更新的內容回顯到日誌中。在 配置 部分(本文件的後面),我們介紹瞭如何自定義 Spring Cloud Task 提供的各個部分的配置。
當我們的示例應用程式執行時,Spring Boot 會啟動我們的 HelloWorldCommandLineRunner 並將“Hello, World!”訊息輸出到標準輸出。TaskLifecycleListener 在倉庫中記錄任務的開始和結束。
6.2.2. main 方法
main 方法作為任何 Java 應用程式的入口點。我們的 main 方法委託給 Spring Boot 的 SpringApplication 類。
6.2.3. ApplicationRunner
Spring 包含多種引導應用程式邏輯的方法。Spring Boot 透過其 *Runner 介面(CommandLineRunner 或 ApplicationRunner)提供了一種方便且有組織的方法。一個行為良好的任務可以透過使用這些執行器中的一個來引導任何邏輯。
任務的生命週期被認為是:在 *Runner#run 方法執行之前開始,直到所有方法都完成。Spring Boot 允許應用程式使用多個 *Runner 實現,Spring Cloud Task 也是如此。
透過 CommandLineRunner 或 ApplicationRunner 以外的機制(例如,使用 InitializingBean#afterPropertiesSet)引導的任何處理都不會被 Spring Cloud Task 記錄。 |
6.3. 執行示例
此時,我們的應用程式應該可以工作了。由於此應用程式基於 Spring Boot,我們可以透過在應用程式根目錄中使用 $ mvn spring-boot:run 從命令列執行它,如下例所示(及其輸出)
$ mvn clean spring-boot:run
....... . . .
....... . . . (Maven log output here)
....... . . .
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.0.3.RELEASE)
2018-07-23 17:44:34.426 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : Starting HelloworldApplication on Glenns-MBP-2.attlocal.net with PID 1978 (/Users/glennrenfro/project/helloworld/target/classes started by glennrenfro in /Users/glennrenfro/project/helloworld)
2018-07-23 17:44:34.430 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : No active profile set, falling back to default profiles: default
2018-07-23 17:44:34.472 INFO 1978 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1d24f32d: startup date [Mon Jul 23 17:44:34 EDT 2018]; root of context hierarchy
2018-07-23 17:44:35.280 INFO 1978 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2018-07-23 17:44:35.410 INFO 1978 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2018-07-23 17:44:35.419 DEBUG 1978 --- [ main] o.s.c.t.c.SimpleTaskConfiguration : Using org.springframework.cloud.task.configuration.DefaultTaskConfigurer TaskConfigurer
2018-07-23 17:44:35.420 DEBUG 1978 --- [ main] o.s.c.t.c.DefaultTaskConfigurer : No EntityManager was found, using DataSourceTransactionManager
2018-07-23 17:44:35.522 DEBUG 1978 --- [ main] o.s.c.t.r.s.TaskRepositoryInitializer : Initializing task schema for h2 database
2018-07-23 17:44:35.525 INFO 1978 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executing SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql]
2018-07-23 17:44:35.558 INFO 1978 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executed SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql] in 33 ms.
2018-07-23 17:44:35.728 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2018-07-23 17:44:35.730 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Bean with name 'dataSource' has been autodetected for JMX exposure
2018-07-23 17:44:35.733 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2018-07-23 17:44:35.738 INFO 1978 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2018-07-23 17:44:35.762 DEBUG 1978 --- [ main] o.s.c.t.r.support.SimpleTaskRepository : Creating: TaskExecution{executionId=0, parentExecutionId=null, exitCode=null, taskName='application', startTime=Mon Jul 23 17:44:35 EDT 2018, endTime=null, exitMessage='null', externalExecutionId='null', errorMessage='null', arguments=[]}
2018-07-23 17:44:35.772 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : Started HelloworldApplication in 1.625 seconds (JVM running for 4.764)
Hello, World!
2018-07-23 17:44:35.782 DEBUG 1978 --- [ main] o.s.c.t.r.support.SimpleTaskRepository : Updating: TaskExecution with executionId=1 with the following {exitCode=0, endTime=Mon Jul 23 17:44:35 EDT 2018, exitMessage='null', errorMessage='null'}
前面的輸出有三行我們感興趣的內容
-
SimpleTaskRepository記錄了TaskRepository中條目的建立。 -
我們的
CommandLineRunner的執行,由“Hello, World!”輸出演示。 -
SimpleTaskRepository記錄了TaskRepository中任務的完成。
| 一個簡單的任務應用程式可以在 Spring Cloud Task 專案的 samples 模組中找到這裡。 |
功能
本節將更詳細地介紹 Spring Cloud Task,包括如何使用它、如何配置它以及適當的擴充套件點。
7. Spring Cloud Task 的生命週期
在大多數情況下,現代雲環境是圍繞不期望結束的程序執行而設計的。如果它們確實結束了,它們通常會被重新啟動。雖然大多數平臺確實有某種方法可以執行一個在結束後不重新啟動的程序,但該執行的結果通常不會以可消費的方式維護。Spring Cloud Task 提供了在環境中執行短生命週期程序並記錄結果的能力。這樣做允許圍繞短生命週期程序以及透過訊息整合任務來實現長時間執行服務的微服務架構。
雖然此功能在雲環境中很有用,但在傳統部署模型中也可能出現同樣的問題。當使用 cron 等排程程式執行 Spring Boot 應用程式時,能夠在應用程式完成之後監視其結果會很有用。
Spring Cloud Task 採取的方法是,Spring Boot 應用程式可以有開始和結束,並且仍然成功。批處理應用程式是預期結束(並且通常是短生命週期)的程序如何提供幫助的一個示例。
Spring Cloud Task 記錄給定任務的生命週期事件。大多數長時間執行的程序(以大多數 Web 應用程式為典型)不儲存其生命週期事件。Spring Cloud Task 的核心任務則會。
生命週期由單個任務執行組成。這是配置為任務(即具有 Sprint Cloud Task 依賴項)的 Spring Boot 應用程式的物理執行。
在任務開始時,在任何 CommandLineRunner 或 ApplicationRunner 實現執行之前,會在 TaskRepository 中建立一個記錄開始事件的條目。此事件透過 Spring Framework 觸發 SmartLifecycle#start 來觸發。這表示系統所有 Bean 都已準備好使用,並且在執行 Spring Boot 提供的任何 CommandLineRunner 或 ApplicationRunner 實現之前發生。
任務的記錄僅在 ApplicationContext 成功引導時發生。如果上下文完全無法引導,則不記錄任務的執行。 |
在 Spring Boot 的所有 *Runner#run 呼叫完成後或 ApplicationContext 失敗(由 ApplicationFailedEvent 指示)時,任務執行的結果會在儲存庫中更新。
如果應用程式要求在任務完成時(所有 *Runner#run 方法已呼叫且任務儲存庫已更新)關閉 ApplicationContext,請將屬性 spring.cloud.task.closecontextEnabled 設定為 true。 |
7.1. TaskExecution
儲存在 TaskRepository 中的資訊在 TaskExecution 類中建模,幷包含以下資訊
| 欄位 | 描述 |
|---|---|
|
任務執行的唯一 ID。 |
|
從 |
|
任務的名稱,由配置的 |
|
任務開始時間,由 |
|
任務完成時間,由 |
|
退出時可用的任何資訊。這可以透過 |
|
如果異常是任務結束的原因(由 |
|
作為命令列引數傳遞給可執行引導應用程式的字串命令列引數列表。 |
7.2. 對映退出程式碼
當任務完成時,它會嘗試向作業系統返回一個退出程式碼。如果我們檢視我們的 原始示例,我們可以看到我們沒有控制應用程式的這方面。因此,如果丟擲異常,JVM 會返回一個可能對您除錯有用也可能沒用的程式碼。
因此,Spring Boot 提供了一個介面 ExitCodeExceptionMapper,它允許您將未捕獲的異常對映到退出程式碼。這樣做可以讓您在退出程式碼級別指示出了什麼問題。此外,透過以這種方式對映退出程式碼,Spring Cloud Task 會記錄返回的退出程式碼。
如果任務因 SIG-INT 或 SIG-TERM 終止,則除非程式碼中另有規定,否則退出程式碼為零。
| 任務執行時,退出程式碼在儲存庫中儲存為 null。任務完成後,根據本節前面所述的準則儲存相應的退出程式碼。 |
8. 配置
Spring Cloud Task 提供了開箱即用的配置,如 DefaultTaskConfigurer 和 SimpleTaskConfiguration 類中定義。本節將介紹預設值以及如何根據您的需求自定義 Spring Cloud Task。
8.1. DataSource
Spring Cloud Task 使用資料來源來儲存任務執行的結果。預設情況下,我們提供了一個 H2 記憶體例項,以提供一種簡單的開發引導方法。但是,在生產環境中,您可能希望配置自己的 DataSource。
如果您的應用程式只使用一個 DataSource,並且該資料來源既用作您的業務模式又用作任務儲存庫,則您只需提供任何 DataSource(最簡單的方法是透過 Spring Boot 的配置約定)。此 DataSource 將由 Spring Cloud Task 自動用於儲存庫。
如果您的應用程式使用多個 DataSource,則需要使用適當的 DataSource 配置任務儲存庫。此自定義可以透過 TaskConfigurer 的實現來完成。
8.2. 表字首
TaskRepository 的一個可修改屬性是任務表的表字首。預設情況下,它們都以 TASK_ 為字首。TASK_EXECUTION 和 TASK_EXECUTION_PARAMS 是兩個示例。但是,修改此字首可能有一些原因。如果需要將模式名稱新增到表名稱前面,或者在同一模式中需要多組任務表,則必須更改表字首。您可以透過將 spring.cloud.task.tablePrefix 設定為您需要的字首來完成此操作,如下所示
spring.cloud.task.tablePrefix=yourPrefix
透過使用 spring.cloud.task.tablePrefix,使用者承擔建立符合任務表模式標準但又根據使用者業務需求進行修改的任務表的責任。您可以將 Spring Cloud Task Schema DDL 作為建立您自己的任務 DDL 的指南,如此處所示。
8.3. 啟用/停用表初始化
如果您正在建立任務表並且不希望 Spring Cloud Task 在任務啟動時建立它們,請將 spring.cloud.task.initialize-enabled 屬性設定為 false,如下所示
spring.cloud.task.initialize-enabled=false
它預設為 true。
屬性 spring.cloud.task.initialize.enable 已棄用。 |
8.4. 外部生成的任務 ID
在某些情況下,您可能希望允許任務請求時間與基礎設施實際啟動任務時間之間的差異。Spring Cloud Task 允許您在請求任務時建立 TaskExecution。然後將生成的 TaskExecution 的執行 ID 傳遞給任務,以便任務可以透過任務的生命週期更新 TaskExecution。
可以透過在引用儲存 TaskExecution 物件的資料庫的 TaskRepository 實現上呼叫 createTaskExecution 方法來建立 TaskExecution。
要將您的任務配置為使用生成的 TaskExecutionId,請新增以下屬性
spring.cloud.task.executionid=yourtaskId
8.5. 外部任務 ID
Spring Cloud Task 允許您為每個 TaskExecution 儲存一個外部任務 ID。要將您的任務配置為使用生成的 TaskExecutionId,請新增以下屬性
spring.cloud.task.external-execution-id=<externalTaskId>
8.6. 父任務 ID
Spring Cloud Task 允許您為每個 TaskExecution 儲存一個父任務 ID。例如,一個任務執行另一個任務或多個任務,並且您希望記錄哪個任務啟動了每個子任務。要將您的任務配置為設定父 TaskExecutionId,請在子任務上新增以下屬性
spring.cloud.task.parent-execution-id=<parentExecutionTaskId>
8.7. TaskConfigurer
TaskConfigurer 是一個策略介面,允許您自定義 Spring Cloud Task 元件的配置方式。預設情況下,我們提供了 DefaultTaskConfigurer,它提供邏輯預設值:基於 Map 的記憶體中元件(如果沒有提供 DataSource,則適用於開發)和基於 JDBC 的元件(如果 DataSource 可用,則適用)。
TaskConfigurer 允許您配置三個主要元件
| 元件 | 描述 | 預設(由 DefaultTaskConfigurer 提供) |
|---|---|---|
|
要使用的 |
|
|
要使用的 |
|
|
在執行任務更新時使用的事務管理器。 |
如果使用了 |
您可以透過建立 TaskConfigurer 介面的自定義實現來定製上表中描述的任何元件。通常,擴充套件 DefaultTaskConfigurer(如果未找到 TaskConfigurer 則提供)並重寫所需的 getter 就足夠了。但是,可能需要從頭開始實現您自己的。
除非使用者將其用於提供要作為 Spring Bean 公開的實現,否則不應直接使用 TaskConfigurer 中的 getter 方法。 |
8.8. 任務執行監聽器
TaskExecutionListener 允許您註冊在任務生命週期期間發生的特定事件的監聽器。為此,請建立一個實現 TaskExecutionListener 介面的類。實現 TaskExecutionListener 介面的類將收到以下事件的通知
-
onTaskStartup:在將TaskExecution儲存到TaskRepository之前。 -
onTaskEnd:在更新TaskRepository中的TaskExecution條目並標記任務的最終狀態之前。 -
onTaskFailed:當任務丟擲未處理的異常時,在呼叫onTaskEnd方法之前。
Spring Cloud Task 還允許您使用以下方法註解將 TaskExecution 監聽器新增到 Bean 中的方法
-
@BeforeTask:在將TaskExecution儲存到TaskRepository之前。 -
@AfterTask:在更新TaskRepository中的TaskExecution條目並標記任務的最終狀態之前。 -
@FailedTask:當任務丟擲未處理的異常時,在呼叫@AfterTask方法之前。
以下示例顯示了正在使用的三個註解
public class MyBean {
@BeforeTask
public void methodA(TaskExecution taskExecution) {
}
@AfterTask
public void methodB(TaskExecution taskExecution) {
}
@FailedTask
public void methodC(TaskExecution taskExecution, Throwable throwable) {
}
}
在鏈中比 TaskLifecycleListener 存在更早地插入 ApplicationListener 可能會導致意外的影響。 |
8.8.1. 任務執行監聽器丟擲的異常
如果 TaskExecutionListener 事件處理程式丟擲異常,則該事件處理程式的所有監聽器處理都將停止。例如,如果三個 onTaskStartup 監聽器已啟動,並且第一個 onTaskStartup 事件處理程式丟擲異常,則其他兩個 onTaskStartup 方法不會被呼叫。但是,TaskExecutionListeners 的其他事件處理程式(onTaskEnd 和 onTaskFailed)將被呼叫。
當 TaskExecutionListener 事件處理程式丟擲異常時返回的退出程式碼是 ExitCodeEvent 報告的退出程式碼。如果沒有發出 ExitCodeEvent,則評估丟擲的異常以檢視其是否為 ExitCodeGenerator 型別。如果是,則返回 ExitCodeGenerator 的退出程式碼。否則,返回 1。
如果在 onTaskStartup 方法中丟擲異常,則應用程式的退出程式碼將為 1。如果在 onTaskEnd 或 onTaskFailed 方法中丟擲異常,則應用程式的退出程式碼將是根據上述規則確定的退出程式碼。
如果 onTaskStartup、onTaskEnd 或 onTaskFailed 中丟擲異常,您不能使用 ExitCodeExceptionMapper 覆蓋應用程式的退出程式碼。 |
8.8.2. 退出訊息
您可以透過使用 TaskExecutionListener 以程式設計方式設定任務的退出訊息。這是透過設定 TaskExecution 的 exitMessage 來完成的,該訊息隨後會傳遞給 TaskExecutionListener。以下示例展示了一個使用 @AfterTask ExecutionListener 註解的方法
@AfterTask
public void afterMe(TaskExecution taskExecution) {
taskExecution.setExitMessage("AFTER EXIT MESSAGE");
}
可以在任何監聽器事件(onTaskStartup、onTaskFailed 和 onTaskEnd)中設定 ExitMessage。三個監聽器的優先順序順序如下
-
onTaskEnd -
onTaskFailed -
onTaskStartup
例如,如果您為 onTaskStartup 和 onTaskFailed 監聽器設定了 exitMessage,並且任務在沒有失敗的情況下結束,則 onTaskStartup 的 exitMessage 將儲存在儲存庫中。否則,如果發生故障,則儲存 onTaskFailed 的 exitMessage。此外,如果您使用 onTaskEnd 監聽器設定 exitMessage,則 onTaskEnd 的 exitMessage 將覆蓋 onTaskStartup 和 onTaskFailed 的退出訊息。
8.9. 限制 Spring Cloud Task 例項
Spring Cloud Task 允許您確定給定任務名稱的某個任務只能同時執行一個例項。為此,您需要建立 任務名稱 併為每個任務執行設定 spring.cloud.task.single-instance-enabled=true。當第一個任務執行正在執行時,任何其他嘗試執行具有相同 任務名稱 和 spring.cloud.task.single-instance-enabled=true 的任務都將失敗,並出現以下錯誤訊息:Task with name "application" is already running. spring.cloud.task.single-instance-enabled 的預設值為 false。以下示例展示瞭如何將 spring.cloud.task.single-instance-enabled 設定為 true
spring.cloud.task.single-instance-enabled=true or false
要使用此功能,您必須將以下 Spring Integration 依賴項新增到您的應用程式中
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
| 如果任務因啟用此功能且另一個任務正在執行具有相同任務名稱而失敗,則應用程式的退出程式碼將為 1。 |
8.9.1. Spring AOT 和本機編譯的單例項用法
在建立原生編譯應用程式時使用 Spring Cloud Task 的單例項功能,您需要在構建時啟用該功能。為此,請新增 process-aot 執行並將 spring.cloud.task.single-step-instance-enabled=true 設定為 JVM 引數,如下所示
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
<configuration>
<jvmArguments>
-Dspring.cloud.task.single-instance-enabled=true
</jvmArguments>
</configuration>
</execution>
</executions>
</plugin>
8.10. 啟用 ApplicationRunner 和 CommandLineRunner 的觀察
要為 ApplicationRunner 或 CommandLineRunner 啟用任務觀測,請將 spring.cloud.task.observation.enabled 設定為 true。
使用 SimpleMeterRegistry 啟用觀測的示例任務應用程式可以在此處找到。
8.11. 停用 Spring Cloud Task 自動配置
在某些情況下,Spring Cloud Task 不應該為某個實現自動配置,您可以停用 Task 的自動配置。這可以透過將以下註解新增到您的任務應用程式來完成
@EnableAutoConfiguration(exclude={SimpleTaskAutoConfiguration.class})
您還可以透過將 spring.cloud.task.autoconfiguration.enabled 屬性設定為 false 來停用任務自動配置。
8.12. 關閉上下文
如果應用程式要求在任務完成時(所有 *Runner#run 方法已呼叫且任務儲存庫已更新)關閉 ApplicationContext,請將屬性 spring.cloud.task.closecontextEnabled 設定為 true。
另一種關閉上下文的情況是當任務執行完成但應用程式不終止時。在這些情況下,上下文保持開啟狀態,因為已分配了一個執行緒(例如:如果您正在使用 TaskExecutor)。在這些情況下,在啟動任務時將 spring.cloud.task.closecontextEnabled 屬性設定為 true。這將關閉應用程式的上下文,一旦任務完成。從而允許應用程式終止。
8.13. 啟用任務指標
Spring Cloud Task 與 Micrometer 整合,併為其執行的任務建立觀測。要啟用任務可觀測性整合,您必須將 spring-boot-starter-actuator、您首選的登錄檔實現(如果您想釋出指標)和 micrometer-tracing(如果您想釋出跟蹤資料)新增到您的任務應用程式中。以下是使用 Influx 啟用任務可觀測性和指標的 Maven 依賴項示例
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-influx</artifactId>
<scope>runtime</scope>
</dependency>
8.14. Spring Task 和 Spring Cloud Task 屬性
“任務”一詞在行業中經常使用。例如,Spring Boot 提供了 spring.task,而 Spring Cloud Task 提供了 spring.cloud.task 屬性。這在過去造成了一些混淆,認為這兩組屬性直接相關。然而,它們代表了 Spring 生態系統中提供的兩組不同的功能。
-
spring.task指的是配置ThreadPoolTaskScheduler的屬性。 -
spring.cloud.task指的是配置 Spring Cloud Task 功能的屬性。
批處理
本節將詳細介紹 Spring Cloud Task 與 Spring Batch 的整合。本節涵蓋了跟蹤作業執行與其執行的任務之間的關聯,以及透過 Spring Cloud Deployer 進行遠端分割槽。
9. 將作業執行與執行它的任務關聯
Spring Boot 提供了在 uber-jar 中執行批處理作業的便利。Spring Boot 對此功能的支援使開發人員可以在該執行中執行多個批處理作業。Spring Cloud Task 提供了將作業的執行(作業執行)與任務的執行關聯起來的能力,以便可以相互追溯。
Spring Cloud Task 透過使用 TaskBatchExecutionListener 來實現此功能。預設情況下,此監聽器會在任何同時配置了 Spring Batch 作業(透過在上下文中定義了一個 Job 型別的 bean)和類路徑上存在 spring-cloud-task-batch jar 的上下文中自動配置。該監聽器會被注入到所有符合這些條件的作業中。
9.1. 覆蓋 TaskBatchExecutionListener
為了防止監聽器被注入到當前上下文中的任何批處理作業中,您可以使用標準的 Spring Boot 機制停用自動配置。
要僅將監聽器注入到上下文中的特定作業中,請覆蓋 batchTaskExecutionListenerBeanPostProcessor 並提供作業 bean ID 列表,如下例所示
public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
TaskBatchExecutionListenerBeanPostProcessor postProcessor =
new TaskBatchExecutionListenerBeanPostProcessor();
postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));
return postProcessor;
}
| 您可以在 Spring Cloud Task 專案的 samples 模組中找到一個示例批處理應用程式,此處。 |
10. 遠端分割槽
Spring Cloud Deployer 提供了在大多數雲基礎設施上啟動基於 Spring Boot 的應用程式的功能。DeployerPartitionHandler 和 DeployerStepExecutionHandler 將工作者步驟執行的啟動委託給 Spring Cloud Deployer。
要配置 DeployerStepExecutionHandler,您必須提供一個表示要執行的 Spring Boot uber-jar 的 Resource、一個 TaskLauncherHandler 和一個 JobExplorer。您可以配置任何環境屬性以及一次執行的最大工作者數量、輪詢結果的間隔(預設為 10 秒)和超時(預設為 -1 或無超時)。以下示例展示瞭如何配置此 PartitionHandler
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer) throws Exception {
MavenProperties mavenProperties = new MavenProperties();
mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
new MavenProperties.RemoteRepository(repository))));
Resource resource =
MavenResource.parse(String.format("%s:%s:%s",
"io.spring.cloud",
"partitioned-batch-job",
"1.1.0.RELEASE"), mavenProperties);
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler.setCommandLineArgsProvider(
new PassThroughCommandLineArgsProvider(commandLineArgs));
partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
partitionHandler.setMaxWorkers(2);
partitionHandler.setApplicationName("PartitionedBatchJobTask");
return partitionHandler;
}
| 將環境變數傳遞給分割槽時,每個分割槽可能位於具有不同環境設定的不同機器上。因此,您應該只傳遞所需的那些環境變數。 |
請注意,在上面的示例中,我們已將最大工作者數量設定為 2。設定最大工作者數量可確定應同時執行的最大分割槽數量。
要執行的 Resource 預計是一個 Spring Boot uber-jar,其中 DeployerStepExecutionHandler 在當前上下文中配置為 CommandLineRunner。前面示例中列舉的倉庫應該是 uber-jar 所在的遠端倉庫。管理器和工作者都應該能夠訪問用作作業倉庫和任務倉庫的相同資料儲存。一旦底層基礎設施引導了 Spring Boot jar 並且 Spring Boot 啟動了 DeployerStepExecutionHandler,步驟處理程式就會執行請求的 Step。以下示例展示瞭如何配置 DeployerStepExecutionHandler
@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
DeployerStepExecutionHandler handler =
new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
return handler;
}
| 您可以在 Spring Cloud Task 專案的 samples 模組中找到一個示例遠端分割槽應用程式,此處。 |
10.1. 非同步啟動遠端批處理分割槽
預設情況下,批處理分割槽是按順序啟動的。但是,在某些情況下,這可能會影響效能,因為每次啟動都會阻塞,直到資源(例如:在 Kubernetes 中提供 Pod)被提供。在這些情況下,您可以為 DeployerPartitionHandler 提供一個 ThreadPoolTaskExecutor。這將根據 ThreadPoolTaskExecutor 的配置啟動遠端批處理分割槽。例如
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setThreadNamePrefix("default_task_executor_thread");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
Resource resource = this.resourceLoader
.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
"workerStep", taskRepository, executor);
...
}
我們需要關閉上下文,因為使用 ThreadPoolTaskExecutor 會使執行緒處於活動狀態,因此應用程式不會終止。要適當關閉應用程式,我們需要將 spring.cloud.task.closecontextEnabled 屬性設定為 true。 |
10.2. 在 Kubernetes 平臺開發批處理分割槽應用程式的注意事項
-
在 Kubernetes 平臺上部署分割槽應用程式時,您必須使用以下依賴項來獲取 Spring Cloud Kubernetes Deployer
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId> </dependency> -
任務應用程式及其分割槽的應用程式名稱需要遵循以下正則表示式模式:
[a-z0-9]([-a-z0-9]*[a-z0-9])。否則,將丟擲異常。
11. 批處理資訊訊息
Spring Cloud Task 提供了批處理作業發出資訊訊息的功能。“Spring Batch 事件”部分詳細介紹了此功能。
12. 批處理作業退出程式碼
如 前文 所述,Spring Cloud Task 應用程式支援記錄任務執行的退出程式碼。但是,在您在任務中執行 Spring Batch 作業的情況下,無論 Batch Job Execution 如何完成,使用預設的 Batch/Boot 行為時,任務的結果始終為零。請記住,任務是一個引導應用程式,並且從任務返回的退出程式碼與引導應用程式相同。要覆蓋此行為並允許任務在批處理作業返回 FAILED 的 BatchStatus 時返回非零退出程式碼,請將 spring.cloud.task.batch.fail-on-job-failure 設定為 true。然後退出程式碼可以是 1(預設值)或基於 指定的 ExitCodeGenerator)。
此功能使用了一個新的 ApplicationRunner,它取代了 Spring Boot 提供的那個。預設情況下,它配置為相同的順序。但是,如果您想自定義 ApplicationRunner 執行的順序,可以透過設定 spring.cloud.task.batch.applicationRunnerOrder 屬性來設定其順序。要使任務根據批處理作業執行的結果返回退出程式碼,您需要編寫自己的 CommandLineRunner。
單步批處理作業啟動器
本節將介紹如何使用 Spring Cloud Task 中包含的啟動器,開發一個包含單個 Step 的 Spring Batch Job。此啟動器允許您透過配置定義 ItemReader、ItemWriter 或完整的單步 Spring Batch Job。有關 Spring Batch 及其功能的更多資訊,請參閱 Spring Batch 文件。
要獲取 Maven 啟動器,請將以下內容新增到您的構建中
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-single-step-batch-job</artifactId>
<version>2.3.0</version>
</dependency>
要獲取 Gradle 啟動器,請將以下內容新增到您的構建中
compile "org.springframework.cloud:spring-cloud-starter-single-step-batch-job:2.3.0"
13. 定義作業
您可以使用啟動器來定義最少的 ItemReader 或 ItemWriter,或者定義完整的 Job。在本節中,我們將定義配置 Job 所需的屬性。
13.1. 屬性
首先,啟動器提供了一組屬性,讓您可以配置包含一個 Step 的 Job 的基本資訊
| 財產 | 型別 | 預設值 | 描述 |
|---|---|---|---|
|
|
|
作業名稱。 |
|
|
|
步驟名稱。 |
|
|
|
每個事務處理的項數。 |
配置上述屬性後,您將擁有一個包含單個基於塊的步驟的作業。此基於塊的步驟讀取、處理並寫入 Map<String, Object> 例項作為項。但是,該步驟尚未執行任何操作。您需要配置一個 ItemReader、一個可選的 ItemProcessor 和一個 ItemWriter 以使其執行某些操作。要配置其中之一,您可以使用屬性並配置已提供自動配置的選項之一,或者您可以使用標準的 Spring 配置機制配置您自己的。
如果您配置自己的,則輸入和輸出型別必須與步驟中的其他型別匹配。此啟動器中的 ItemReader 實現和 ItemWriter 實現都使用 Map<String, Object> 作為輸入和輸出項。 |
14. ItemReader 實現的自動配置
此啟動器為四種不同的 ItemReader 實現提供了自動配置:AmqpItemReader、FlatFileItemReader、JdbcCursorItemReader 和 KafkaItemReader。在本節中,我們將概述如何使用提供的自動配置來配置這些實現。
14.1. AmqpItemReader
您可以使用 AmqpItemReader 從 AMQP 佇列或主題讀取資料。此 ItemReader 實現的自動配置取決於兩組配置。首先是 AmqpTemplate 的配置。您可以自己配置此項,也可以使用 Spring Boot 提供的自動配置。請參閱 Spring Boot AMQP 文件。配置 AmqpTemplate 後,您可以透過設定以下屬性來啟用批處理功能以支援它
| 財產 | 型別 | 預設值 | 描述 |
|---|---|---|---|
|
|
|
如果為 |
|
|
|
指示是否應註冊 |
有關更多資訊,請參閱 AmqpItemReader 文件。
14.2. FlatFileItemReader
FlatFileItemReader 允許您從平面檔案(例如 CSV 和其他檔案格式)讀取資料。要從檔案讀取資料,您可以透過正常的 Spring 配置(LineTokenizer、RecordSeparatorPolicy、FieldSetMapper、LineMapper 或 SkippedLinesCallback)自行提供一些元件。您還可以使用以下屬性配置讀取器
| 財產 | 型別 | 預設值 | 描述 |
|---|---|---|---|
|
|
|
確定是否應儲存狀態以進行重新啟動。 |
|
|
|
用於在 |
|
|
|
要從檔案中讀取的最大項數。 |
|
|
0 |
已讀取的項數。在重新啟動時使用。 |
|
|
空列表 |
指示檔案中註釋行(要忽略的行)的字串列表。 |
|
|
|
要讀取的資源。 |
|
|
|
如果設定為 |
|
|
|
讀取檔案時使用的編碼。 |
|
|
0 |
指示在檔案開頭要跳過的行數。 |
|
|
|
指示檔案是否為分隔檔案(CSV 和其他格式)。此屬性或 |
|
|
|
如果讀取分隔檔案,則指示要解析的分隔符。 |
|
|
|
用於確定用於引用值的字元。 |
|
|
空列表 |
用於確定記錄中要包含在項中的欄位的索引列表。 |
|
|
|
指示檔案的記錄是否按列號解析。此屬性或 |
|
|
空列表 |
用於解析固定寬度記錄的列範圍列表。請參閱 Range 文件。 |
|
|
|
從記錄中解析出的每個欄位的名稱列表。這些名稱是此 |
|
|
|
如果設定為 |
14.3. JdbcCursorItemReader
JdbcCursorItemReader 對關係資料庫執行查詢,並迭代結果遊標(ResultSet)以提供結果項。此自動配置允許您提供 PreparedStatementSetter、RowMapper 或兩者。您還可以使用以下屬性配置 JdbcCursorItemReader
| 財產 | 型別 | 預設值 | 描述 |
|---|---|---|---|
|
|
|
確定是否應儲存狀態以進行重新啟動。 |
|
|
|
用於在 |
|
|
|
要從檔案中讀取的最大項數。 |
|
|
0 |
已讀取的項數。在重新啟動時使用。 |
|
|
向驅動程式提供的提示,指示每次呼叫資料庫系統時要檢索的記錄數。為了獲得最佳效能,您通常希望將其設定為與塊大小匹配。 |
|
|
|
要從資料庫讀取的最大項數。 |
|
|
|
查詢超時前的毫秒數。 |
|
|
|
|
確定讀取器在處理時是否應忽略 SQL 警告。 |
|
|
|
指示是否應在每次讀取後驗證遊標位置,以驗證 |
|
|
|
指示驅動程式是否支援遊標的絕對定位。 |
|
|
|
指示連線是否與其他處理共享(因此是事務的一部分)。 |
|
|
|
要讀取的 SQL 查詢。 |
您還可以使用以下屬性專門為讀取器指定 JDBC DataSource:。JdbcCursorItemReader 屬性
| 財產 | 型別 | 預設值 | 描述 |
|---|---|---|---|
|
|
|
確定是否應啟用 |
|
|
|
資料庫的 JDBC URL。 |
|
|
|
資料庫的登入使用者名稱。 |
|
|
|
資料庫的登入密碼。 |
|
|
|
JDBC 驅動程式的完全限定名稱。 |
如果未指定 jdbccursoritemreader_datasource,則 JDBCCursorItemReader 將使用預設的 DataSource。 |
14.4. KafkaItemReader
從 Kafka 主題攝取分割槽資料非常有用,這正是 KafkaItemReader 可以做到的。要配置 KafkaItemReader,需要兩部分配置。首先,需要使用 Spring Boot 的 Kafka 自動配置來配置 Kafka(請參閱 Spring Boot Kafka 文件)。配置 Spring Boot 的 Kafka 屬性後,您可以透過設定以下屬性來配置 KafkaItemReader 本身
| 財產 | 型別 | 預設值 | 描述 |
|---|---|---|---|
|
|
|
用於在 |
|
|
|
要讀取的主題名稱。 |
|
|
空列表 |
要讀取的分割槽索引列表。 |
|
|
30 |
|
|
|
|
確定是否應儲存狀態以進行重新啟動。 |
請參閱 KafkaItemReader 文件。
14.5. 本機編譯
單步批處理的優勢在於,當您使用 JVM 時,它允許您在執行時動態選擇要使用的讀取器和寫入器 Bean。但是,當您使用原生編譯時,必須在構建時而不是在執行時確定讀取器和寫入器。以下示例演示瞭如何實現這一點
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
<configuration>
<jvmArguments>
-Dspring.batch.job.flatfileitemreader.name=fooReader
-Dspring.batch.job.flatfileitemwriter.name=fooWriter
</jvmArguments>
</configuration>
</execution>
</executions>
</plugin>
15. ItemProcessor 配置
如果 ApplicationContext 中有一個 ItemProcessor 可用,則單步批處理作業自動配置將接受該 ItemProcessor。如果找到正確型別(ItemProcessor<Map<String, Object>, Map<String, Object>>)的處理器,它將被自動注入到步驟中。
16. ItemWriter 實現的自動配置
此啟動器為與支援的 ItemReader 實現相匹配的 ItemWriter 實現提供自動配置:AmqpItemWriter、FlatFileItemWriter、JdbcItemWriter 和 KafkaItemWriter。本節介紹如何使用自動配置來配置受支援的 ItemWriter。
16.1. AmqpItemWriter
要寫入 RabbitMQ 佇列,您需要兩組配置。首先,您需要一個 AmqpTemplate。最簡單的方法是使用 Spring Boot 的 RabbitMQ 自動配置。請參閱 Spring Boot AMQP 文件。
配置 AmqpTemplate 後,您可以透過設定以下屬性來配置 AmqpItemWriter
| 財產 | 型別 | 預設值 | 描述 |
|---|---|---|---|
|
|
|
如果為 |
|
|
|
指示是否應註冊 |
16.2. FlatFileItemWriter
要將檔案作為步驟的輸出寫入,您可以配置 FlatFileItemWriter。自動配置接受已明確配置的元件(例如 LineAggregator、FieldExtractor、FlatFileHeaderCallback 或 FlatFileFooterCallback)以及透過設定以下指定屬性配置的元件
| 財產 | 型別 | 預設值 | 描述 |
|---|---|---|---|
|
|
|
要讀取的資源。 |
|
|
|
指示輸出檔案是否為分隔檔案。如果為 |
|
|
|
指示輸出檔案是否為格式化檔案。如果為 |
|
|
|
用於為格式化檔案生成輸出的格式。格式化透過 |
|
|
|
生成檔案時使用的 |
|
|
0 |
記錄的最大長度。如果為 0,則大小無限制。 |
|
|
0 |
最小記錄長度。 |
|
|
|
用於分隔分隔檔案中欄位的 |
|
|
|
寫入檔案時使用的編碼。 |
|
|
|
指示檔案在重新整理時是否應強制同步到磁碟。 |
|
|
|
從記錄中解析出的每個欄位的名稱列表。這些名稱是此 |
|
|
|
指示如果找到輸出檔案,是否應追加到檔案。 |
|
|
|
用於在輸出檔案中分隔行的 |
|
|
|
用於在 |
|
|
|
確定是否應儲存狀態以進行重新啟動。 |
|
|
|
如果設定為 |
|
|
|
如果設定為 |
|
|
|
指示讀取器是否為事務佇列(指示讀取的項在失敗時返回到佇列)。 |
16.3. JdbcBatchItemWriter
要將步驟的輸出寫入關係資料庫,此啟動器提供了自動配置 JdbcBatchItemWriter 的能力。自動配置允許您透過設定以下屬性提供您自己的 ItemPreparedStatementSetter 或 ItemSqlParameterSourceProvider 以及配置選項
| 財產 | 型別 | 預設值 | 描述 |
|---|---|---|---|
|
|
|
用於在 |
|
|
|
用於插入每個項的 SQL。 |
|
|
|
是否驗證每次插入是否至少更新一條記錄。 |
您還可以使用以下屬性專門為寫入器指定 JDBC DataSource:。JdbcBatchItemWriter 屬性
| 財產 | 型別 | 預設值 | 描述 |
|---|---|---|---|
|
|
|
確定是否應啟用 |
|
|
|
資料庫的 JDBC URL。 |
|
|
|
資料庫的登入使用者名稱。 |
|
|
|
資料庫的登入密碼。 |
|
|
|
JDBC 驅動程式的完全限定名稱。 |
如果未指定 jdbcbatchitemwriter_datasource,則 JdbcBatchItemWriter 將使用預設的 DataSource。 |
16.4. KafkaItemWriter
要將步驟輸出寫入 Kafka 主題,您需要 KafkaItemWriter。此啟動器透過使用兩個來源的功能為 KafkaItemWriter 提供自動配置。首先是 Spring Boot 的 Kafka 自動配置。(請參閱 Spring Boot Kafka 文件。)其次,此啟動器允許您配置寫入器上的兩個屬性。
| 財產 | 型別 | 預設值 | 描述 |
|---|---|---|---|
|
|
|
要寫入的 Kafka 主題。 |
|
|
|
傳遞給寫入器的所有項是否都作為刪除事件傳送到主題。 |
有關 KafkaItemWriter 的配置選項的更多資訊,請參閱 KafkaItemWiter 文件。
16.5. Spring AOT
當您將 Spring AOT 與單步批處理啟動器一起使用時,您必須在編譯時設定讀取器和寫入器名稱屬性(除非您為讀取器和/或寫入器建立 Bean)。為此,您必須在 Maven 外掛或 Gradle 外掛的啟動引數或環境變數中包含您希望使用的讀取器和寫入器名稱。例如,如果您希望在 Maven 中啟用 FlatFileItemReader 和 FlatFileItemWriter,它將如下所示
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
</execution>
</executions>
<configuration>
<arguments>
<argument>--spring.batch.job.flatfileitemreader.name=foobar</argument>
<argument>--spring.batch.job.flatfileitemwriter.name=fooWriter</argument>
</arguments>
</configuration>
</plugin>
Spring Cloud Stream 整合
任務本身可能很有用,但將任務整合到更大的生態系統中可以使其用於更復雜的處理和編排。本節涵蓋 Spring Cloud Task 與 Spring Cloud Stream 的整合選項。
17. 從 Spring Cloud Stream 啟動任務
您可以從流中啟動任務。為此,建立一個偵聽包含 TaskLaunchRequest 作為其有效負載的訊息的接收器。TaskLaunchRequest 包含
-
uri: 要執行的任務工件的 URI。 -
applicationName: 與任務關聯的名稱。如果未設定 applicationName,TaskLaunchRequest將生成一個由以下內容組成的任務名稱:Task-<UUID>。 -
commandLineArguments: 包含任務命令列引數的列表。 -
environmentProperties: 包含任務要使用的環境變數的對映。 -
deploymentProperties: 包含部署器用於部署任務的屬性的對映。
| 如果有效負載是不同型別,則接收器會丟擲異常。 |
例如,可以建立一個流,其中包含一個處理器,該處理器從 HTTP 源獲取資料並建立包含 TaskLaunchRequest 的 GenericMessage,並將訊息傳送到其輸出通道。然後任務接收器將從其輸入通道接收訊息並啟動任務。
要建立任務接收器,您只需要建立一個包含 EnableTaskLauncher 註解的 Spring Boot 應用程式,如下例所示
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
Spring Cloud Task 專案的 示例模組 包含一個示例 Sink 和 Processor。要將這些示例安裝到您的本地 maven 倉庫中,請從 spring-cloud-task-samples 目錄執行 maven 構建,並將 skipInstall 屬性設定為 false,如下例所示
mvn clean install
maven.remoteRepositories.springRepo.url 屬性必須設定為 uber-jar 所在的遠端倉庫的位置。如果未設定,則沒有遠端倉庫,因此它只依賴於本地倉庫。 |
17.1. Spring Cloud Data Flow
要在 Spring Cloud Data Flow 中建立流,您必須首先註冊我們建立的任務接收器應用程式。在以下示例中,我們使用 Spring Cloud Data Flow shell 註冊 Processor 和 Sink 示例應用程式
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
以下示例展示瞭如何從 Spring Cloud Data Flow shell 建立流
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
18. Spring Cloud Task 事件
Spring Cloud Task 提供了在任務透過 Spring Cloud Stream 通道執行時,透過 Spring Cloud Stream 通道發出事件的能力。任務監聽器用於在名為 task-events 的訊息通道上釋出 TaskExecution。此功能會自動裝配到任何具有 spring-cloud-stream、spring-cloud-stream-<binder> 並在其類路徑上定義了任務的任務中。
要停用事件發射監聽器,請將 spring.cloud.task.events.enabled 屬性設定為 false。 |
透過定義適當的類路徑,以下任務會在 task-events 通道(在任務開始和結束時)發出 TaskExecution 作為事件
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
| 還需要在類路徑中包含一個繫結器實現。 |
| Spring Cloud Task 專案的 samples 模組中提供了任務事件示例應用程式,點選此處檢視。 |
18.1. 停用特定任務事件
要停用任務事件,可以將 spring.cloud.task.events.enabled 屬性設定為 false。
19. Spring Batch 事件
當透過任務執行 Spring Batch 作業時,Spring Cloud Task 可以配置為根據 Spring Batch 中可用的 Spring Batch 監聽器發出資訊性訊息。具體來說,當透過 Spring Cloud Task 執行時,以下 Spring Batch 監聽器會自動配置到每個批處理作業中,並在關聯的 Spring Cloud Stream 通道上發出訊息
-
JobExecutionListener監聽job-execution-events -
StepExecutionListener監聽step-execution-events -
ChunkListener監聽chunk-events -
ItemReadListener監聽item-read-events -
ItemProcessListener監聽item-process-events -
ItemWriteListener監聽item-write-events -
SkipListener監聽skip-events
當上下文中存在適當的 Bean(一個 Job 和一個 TaskLifecycleListener)時,這些監聽器會自動配置到任何 AbstractJob 中。監聽這些事件的配置方式與繫結到任何其他 Spring Cloud Stream 通道的方式相同。我們的任務(執行批處理作業的任務)充當 Source,而監聽應用程式充當 Processor 或 Sink。
一個示例可以是有一個應用程式監聽 job-execution-events 通道以獲取作業的開始和停止。要配置監聽應用程式,您可以將輸入配置為 job-execution-events,如下所示
spring.cloud.stream.bindings.input.destination=job-execution-events
| 還需要在類路徑中包含一個繫結器實現。 |
| Spring Cloud Task 專案的 samples 模組中提供了批處理事件示例應用程式,點選此處檢視。 |
19.1. 將批處理事件傳送到不同通道
Spring Cloud Task 為批處理事件提供的選項之一是能夠更改特定監聽器可以發出訊息的通道。為此,請使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>。例如,如果 StepExecutionListener 需要將其訊息發出到名為 my-step-execution-events 的另一個通道,而不是預設的 step-execution-events,您可以新增以下配置
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
19.2. 停用批處理事件
要停用所有批處理事件的監聽器功能,請使用以下配置
spring.cloud.task.batch.events.enabled=false
要停用特定的批處理事件,請使用以下配置
spring.cloud.task.batch.events.<batch event listener>.enabled=false:
以下列表顯示了您可以停用的各個監聽器
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
19.3. 批處理事件的發出順序
預設情況下,批處理事件具有 Ordered.LOWEST_PRECEDENCE。要更改此值(例如,更改為 5),請使用以下配置
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5
附錄
20. 任務倉庫模式
本附錄提供了任務倉庫中使用的資料庫模式的 ERD。
20.1. 表資訊
儲存任務執行資訊。
| 列名 | 必需 | 型別 | 欄位長度 | 備註 |
|---|---|---|---|---|
TASK_EXECUTION_ID |
TRUE |
BIGINT |
X |
Spring Cloud Task 框架在應用程式啟動時,從 |
START_TIME |
FALSE |
DATETIME(6) |
X |
Spring Cloud Task 框架在應用程式啟動時設定該值。 |
END_TIME |
FALSE |
DATETIME(6) |
X |
Spring Cloud Task 框架在應用程式退出時設定該值。 |
TASK_NAME |
FALSE |
VARCHAR |
100 |
除非使用者使用 |
EXIT_CODE |
FALSE |
INTEGER |
X |
遵循 Spring Boot 預設值,除非使用者根據此處討論的進行覆蓋。 |
EXIT_MESSAGE |
FALSE |
VARCHAR |
2500 |
使用者自定義,如此處討論。 |
ERROR_MESSAGE |
FALSE |
VARCHAR |
2500 |
Spring Cloud Task 框架在應用程式退出時設定該值。 |
LAST_UPDATED |
TRUE |
TIMESTAMP |
X |
Spring Cloud Task 框架在應用程式啟動時設定該值。如果記錄是在任務外部建立的,則必須在記錄建立時填充該值。 |
EXTERNAL_EXECUTION_ID |
FALSE |
VARCHAR |
250 |
如果設定了 |
PARENT_TASK_EXECUTION_ID |
FALSE |
BIGINT |
X |
如果設定了 |
儲存任務執行使用的引數
| 列名 | 必需 | 型別 | 欄位長度 |
|---|---|---|---|
TASK_EXECUTION_ID |
TRUE |
BIGINT |
X |
TASK_PARAM |
FALSE |
VARCHAR |
2500 |
用於將任務執行連結到批處理執行。
| 列名 | 必需 | 型別 | 欄位長度 |
|---|---|---|---|
TASK_EXECUTION_ID |
TRUE |
BIGINT |
X |
JOB_EXECUTION_ID |
TRUE |
BIGINT |
X |
用於 此處 討論的 single-instance-enabled 功能。
| 列名 | 必需 | 型別 | 欄位長度 | 備註 |
|---|---|---|---|---|
LOCK_KEY |
TRUE |
CHAR |
36 |
此鎖的 UUID |
REGION |
TRUE |
VARCHAR |
100 |
使用者可以使用此欄位建立一組鎖。 |
CLIENT_ID |
TRUE |
CHAR |
36 |
包含要鎖定應用程式名稱的任務執行 ID。 |
CREATED_DATE |
TRUE |
DATETIME |
X |
條目建立日期 |
| 每種資料庫型別設定表的 DDL 可以在此處找到。 |
20.2. SQL Server
預設情況下,Spring Cloud Task 使用序列表來確定 TASK_EXECUTION 表的 TASK_EXECUTION_ID。但是,當在 SQL Server 上同時啟動多個任務時,這可能會導致 TASK_SEQ 表死鎖。解決方案是刪除 TASK_EXECUTION_SEQ 表並使用相同的名稱建立一個序列。例如:
DROP TABLE TASK_SEQ;
CREATE SEQUENCE [DBO].[TASK_SEQ] AS BIGINT
START WITH 1
INCREMENT BY 1;
將 START WITH 設定為高於當前執行 ID 的值。 |
21. 構建此文件
本專案使用 Maven 生成此文件。要自行生成,請執行以下命令:$ mvn clean install -DskipTests -P docs。
22. 可觀察性元資料
22.1. 可觀察性 - 指標
以下是本專案宣告的所有指標列表。
22.1.1. 任務活躍
圍繞任務執行建立的指標。
指標名稱 spring.cloud.task(由約定類 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention 定義)。 型別 timer。
指標名稱 spring.cloud.task.active(由約定類 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention 定義)。 型別 long task timer。
| 在啟動觀測後新增的鍵值可能會從 *.active 指標中缺失。 |
Micrometer 內部使用 納秒 作為基本單位。但是,每個後端確定實際的基本單位。(即 Prometheus 使用秒) |
封閉類的完全限定名稱 org.springframework.cloud.task.listener.TaskExecutionObservation。
所有標籤必須以 spring.cloud.task 字首開頭! |
名稱 |
描述 |
|
CF 雲的應用程式 ID。 |
|
CF 雲的應用程式名稱。 |
|
CF 雲的應用程式版本。 |
|
CF 雲的例項索引。 |
|
CF 雲的組織名稱。 |
|
CF 雲的空間 ID。 |
|
CF 雲的空間名稱。 |
|
任務執行 ID。 |
|
任務退出程式碼。 |
|
任務的外部執行 ID。 |
|
任務名稱測量。 |
|
任務父級執行 ID。 |
|
任務狀態。可以是成功或失敗。 |
22.1.2. 任務執行器觀察
任務執行器執行時建立的觀測。
指標名稱 spring.cloud.task.runner(由約定類 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention 定義)。 型別 timer。
指標名稱 spring.cloud.task.runner.active(由約定類 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention 定義)。 型別 long task timer。
| 在啟動觀測後新增的鍵值可能會從 *.active 指標中缺失。 |
Micrometer 內部使用 納秒 作為基本單位。但是,每個後端確定實際的基本單位。(即 Prometheus 使用秒) |
封閉類的完全限定名稱 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation。
所有標籤必須以 spring.cloud.task 字首開頭! |
名稱 |
描述 |
|
由 Spring Cloud Task 執行的 bean 的名稱。 |
22.2. 可觀察性 - 跨度
以下是本專案宣告的所有 Span 列表。
22.2.1. 任務活躍跨度
圍繞任務執行建立的指標。
跨度名稱 spring.cloud.task(由約定類 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention 定義)。
封閉類的完全限定名稱 org.springframework.cloud.task.listener.TaskExecutionObservation。
所有標籤必須以 spring.cloud.task 字首開頭! |
名稱 |
描述 |
|
CF 雲的應用程式 ID。 |
|
CF 雲的應用程式名稱。 |
|
CF 雲的應用程式版本。 |
|
CF 雲的例項索引。 |
|
CF 雲的組織名稱。 |
|
CF 雲的空間 ID。 |
|
CF 雲的空間名稱。 |
|
任務執行 ID。 |
|
任務退出程式碼。 |
|
任務的外部執行 ID。 |
|
任務名稱測量。 |
|
任務父級執行 ID。 |
|
任務狀態。可以是成功或失敗。 |
22.2.2. 任務執行器觀察跨度
任務執行器執行時建立的觀測。
跨度名稱 spring.cloud.task.runner(由約定類 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention 定義)。
封閉類的完全限定名稱 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation。
所有標籤必須以 spring.cloud.task 字首開頭! |
名稱 |
描述 |
|
由 Spring Cloud Task 執行的 bean 的名稱。 |