單步批處理作業啟動器

本節詳細介紹如何使用 Spring Cloud Task 中包含的啟動器開發一個包含單個 Step 的 Spring Batch Job。此啟動器允許您使用配置來定義一個 ItemReader、一個 ItemWriter 或一個完整的單步 Spring Batch Job。有關 Spring Batch 及其功能的更多資訊,請參閱 Spring Batch 文件

要獲取 Maven 啟動器,請將以下內容新增到您的構建中

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-single-step-batch-job</artifactId>
    <version>2.3.0</version>
</dependency>

要獲取 Gradle 啟動器,請將以下內容新增到您的構建中

compile "org.springframework.cloud:spring-cloud-starter-single-step-batch-job:2.3.0"

定義 Job

您可以使用該啟動器定義最小的 ItemReaderItemWriter,或者定義完整的 Job。在本節中,我們將定義配置 Job 所需的屬性。

屬性

首先,此啟動器提供了一組屬性,允許您配置包含一個 Step 的 Job 的基本資訊

表 1. Job 屬性
屬性 型別 預設值 描述

spring.batch.job.jobName

String

null

Job 的名稱。

spring.batch.job.stepName

String

null

Step 的名稱。

spring.batch.job.chunkSize

Integer

null

每個事務要處理的項數。

配置上述屬性後,您將獲得一個包含單個、基於塊的 Step 的 Job。這個基於塊的 Step 會讀取、處理和寫入 Map<String, Object> 例項作為項。但是,該 Step 尚未執行任何操作。您需要配置一個 ItemReader、一個可選的 ItemProcessor 和一個 ItemWriter 來使其執行某些操作。要配置其中之一,您可以使用屬性並配置提供了自動配置的選項之一,或者您可以使用標準 Spring 配置機制配置自己的實現。

如果您自行配置,輸入和輸出型別必須與 Step 中的其他部分匹配。此啟動器中的 ItemReader 實現和 ItemWriter 實現都使用 Map<String, Object> 作為輸入和輸出項。

ItemReader 實現的自動配置

此啟動器為四種不同的 ItemReader 實現提供了自動配置:AmqpItemReaderFlatFileItemReaderJdbcCursorItemReaderKafkaItemReader。本節概述瞭如何使用提供的自動配置來配置其中每種實現。

AmqpItemReader

您可以使用 AmqpItemReader 從 AMQP 佇列或主題讀取資料。此 ItemReader 實現的自動配置取決於兩組配置。第一組是 AmqpTemplate 的配置。您可以自行配置,也可以使用 Spring Boot 提供的自動配置。請參閱 Spring Boot AMQP 文件。配置 AmqpTemplate 後,您可以透過設定以下屬性來啟用批處理功能以支援它

表 2. AmqpItemReader 屬性
屬性 型別 預設值 描述

spring.batch.job.amqpitemreader.enabled

boolean

false

如果為 true,將執行自動配置。

spring.batch.job.amqpitemreader.jsonConverterEnabled

boolean

true

指示是否應註冊 Jackson2JsonMessageConverter 來解析訊息。

有關更多資訊,請參閱 AmqpItemReader 文件

FlatFileItemReader

FlatFileItemReader 允許您從平面檔案(例如 CSV 和其他檔案格式)讀取資料。要從檔案讀取,您可以透過正常的 Spring 配置自行提供一些元件(LineTokenizerRecordSeparatorPolicyFieldSetMapperLineMapperSkippedLinesCallback)。您還可以使用以下屬性來配置讀取器

表 3. FlatFileItemReader 屬性
屬性 型別 預設值 描述

spring.batch.job.flatfileitemreader.saveState

boolean

true

確定是否應為重啟儲存狀態。

spring.batch.job.flatfileitemreader.name

String

null

用於在 ExecutionContext 中提供唯一鍵的名稱。

spring.batch.job.flatfileitemreader.maxItemcount

int

Integer.MAX_VALUE

從檔案讀取的最大項數。

spring.batch.job.flatfileitemreader.currentItemCount

int

0

已讀取的項數。用於重啟。

spring.batch.job.flatfileitemreader.comments

List<String>

空列表

表示檔案中註釋行(將被忽略的行)的 String 列表。

spring.batch.job.flatfileitemreader.resource

Resource

null

要讀取的資源。

spring.batch.job.flatfileitemreader.strict

boolean

true

如果設定為 true,則在找不到資源時讀取器會丟擲異常。

spring.batch.job.flatfileitemreader.encoding

String

FlatFileItemReader.DEFAULT_CHARSET

讀取檔案時使用的編碼。

spring.batch.job.flatfileitemreader.linesToSkip

int

0

指示在檔案開頭要跳過的行數。

spring.batch.job.flatfileitemreader.delimited

boolean

false

指示檔案是否為分隔符檔案(CSV 和其他格式)。此屬性或 spring.batch.job.flatfileitemreader.fixedLength 只能有一個同時為 true

spring.batch.job.flatfileitemreader.delimiter

String

DelimitedLineTokenizer.DELIMITER_COMMA

如果讀取分隔符檔案,指示用於解析的分隔符。

spring.batch.job.flatfileitemreader.quoteCharacter

char

DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER

用於確定用於引用值的字元。

spring.batch.job.flatfileitemreader.includedFields

List<Integer>

空列表

一個索引列表,用於確定記錄中要包含在項中的欄位。

spring.batch.job.flatfileitemreader.fixedLength

boolean

false

指示檔案的記錄是否按列號解析。此屬性或 spring.batch.job.flatfileitemreader.delimited 只能有一個同時為 true

spring.batch.job.flatfileitemreader.ranges

List<Range>

空列表

用於解析固定寬度記錄的列範圍列表。請參閱 Range 文件

spring.batch.job.flatfileitemreader.names

String []

null

從記錄解析出的每個欄位的名稱列表。這些名稱是此 ItemReader 返回的項中 Map<String, Object> 中的鍵。

spring.batch.job.flatfileitemreader.parsingStrict

boolean

true

如果設定為 true,則如果欄位無法對映,對映將失敗。

JdbcCursorItemReader

JdbcCursorItemReader 對關係資料庫執行查詢,並遍歷結果遊標(ResultSet)以提供結果項。此自動配置允許您提供一個 PreparedStatementSetter、一個 RowMapper 或兩者。您還可以使用以下屬性來配置 JdbcCursorItemReader

表 4. JdbcCursorItemReader 屬性
屬性 型別 預設值 描述

spring.batch.job.jdbccursoritemreader.saveState

boolean

true

確定是否應為重啟儲存狀態。

spring.batch.job.jdbccursoritemreader.name

String

null

用於在 ExecutionContext 中提供唯一鍵的名稱。

spring.batch.job.jdbccursoritemreader.maxItemcount

int

Integer.MAX_VALUE

從檔案讀取的最大項數。

spring.batch.job.jdbccursoritemreader.currentItemCount

int

0

已讀取的項數。用於重啟。

spring.batch.job.jdbccursoritemreader.fetchSize

int

給驅動程式的提示,指示每次呼叫資料庫系統時要檢索多少條記錄。為了獲得最佳效能,通常希望將其設定為與塊大小匹配。

spring.batch.job.jdbccursoritemreader.maxRows

int

從資料庫讀取的最大項數。

spring.batch.job.jdbccursoritemreader.queryTimeout

int

查詢超時的時間(毫秒)。

spring.batch.job.jdbccursoritemreader.ignoreWarnings

boolean

true

確定讀取器在處理時是否應忽略 SQL 警告。

spring.batch.job.jdbccursoritemreader.verifyCursorPosition

boolean

true

指示每次讀取後是否應驗證遊標位置,以確認 RowMapper 未移動遊標。

spring.batch.job.jdbccursoritemreader.driverSupportsAbsolute

boolean

false

指示驅動程式是否支援遊標的絕對定位。

spring.batch.job.jdbccursoritemreader.useSharedExtendedConnection

boolean

false

指示連線是否與其他處理共享(因此是事務的一部分)。

spring.batch.job.jdbccursoritemreader.sql

String

null

用於讀取的 SQL 查詢。

您還可以使用以下屬性專門為讀取器指定 JDBC DataSource:.`JdbcCursorItemReader` 屬性

屬性 型別 預設值 描述

spring.batch.job.jdbccursoritemreader.datasource.enable

boolean

false

確定是否應啟用 JdbcCursorItemReaderDataSource

jdbccursoritemreader.datasource.url

String

null

資料庫的 JDBC URL。

jdbccursoritemreader.datasource.username

String

null

資料庫的登入使用者名稱。

jdbccursoritemreader.datasource.password

String

null

資料庫的登入密碼。

jdbccursoritemreader.datasource.driver-class-name

String

null

JDBC 驅動程式的完全限定名。

如果未指定 jdbccursoritemreader_datasourceJDBCCursorItemReader 將使用預設的 DataSource

KafkaItemReader

從 Kafka 主題攝取分割槽資料非常有用,而這正是 KafkaItemReader 可以做到的。要配置 KafkaItemReader,需要兩部分配置。首先,需要使用 Spring Boot 的 Kafka 自動配置來配置 Kafka(請參閱 Spring Boot Kafka 文件)。配置 Spring Boot 中的 Kafka 屬性後,您可以透過設定以下屬性來配置 KafkaItemReader 本身

表 5. KafkaItemReader 屬性
屬性 型別 預設值 描述

spring.batch.job.kafkaitemreader.name

String

null

用於在 ExecutionContext 中提供唯一鍵的名稱。

spring.batch.job.kafkaitemreader.topic

String

null

要從中讀取的主題名稱。

spring.batch.job.kafkaitemreader.partitions

List<Integer>

空列表

要從中讀取的分割槽索引列表。

spring.batch.job.kafkaitemreader.pollTimeOutInSeconds

long

30

poll() 操作的超時時間。

spring.batch.job.kafkaitemreader.saveState

boolean

true

確定是否應為重啟儲存狀態。

請參閱 KafkaItemReader 文件

本機編譯

單步批處理的優勢在於,當您使用 JVM 時,它允許您在執行時動態選擇要使用的讀取器和寫入器 Bean。但是,當您使用本機編譯時,必須在構建時而不是執行時確定讀取器和寫入器。以下示例演示瞭如何做到這一點

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            <configuration>
                <jvmArguments>
                    -Dspring.batch.job.flatfileitemreader.name=fooReader
                    -Dspring.batch.job.flatfileitemwriter.name=fooWriter
                </jvmArguments>
            </configuration>
        </execution>
    </executions>
</plugin>

ItemProcessor 配置

單步批處理作業自動配置會接受一個 ItemProcessor(如果在 ApplicationContext 中可用)。如果找到型別正確的 ItemProcessorItemProcessor<Map<String, Object>, Map<String, Object>>),它將被自動注入到 Step 中。

ItemWriter 實現的自動配置

此啟動器為與受支援的 ItemReader 實現匹配的 ItemWriter 實現提供了自動配置:AmqpItemWriterFlatFileItemWriterJdbcItemWriterKafkaItemWriter。本節介紹瞭如何使用自動配置來配置受支援的 ItemWriter

AmqpItemWriter

要寫入 RabbitMQ 佇列,您需要兩組配置。首先,您需要一個 AmqpTemplate。最簡單的方法是使用 Spring Boot 的 RabbitMQ 自動配置。請參閱 Spring Boot AMQP 文件

配置 AmqpTemplate 後,您可以透過設定以下屬性來配置 AmqpItemWriter

表 6. AmqpItemWriter 屬性
屬性 型別 預設值 描述

spring.batch.job.amqpitemwriter.enabled

boolean

false

如果為 true,將執行自動配置。

spring.batch.job.amqpitemwriter.jsonConverterEnabled

boolean

true

指示是否應註冊 Jackson2JsonMessageConverter 來轉換訊息。

FlatFileItemWriter

要將檔案作為 Step 的輸出寫入,您可以配置 FlatFileItemWriter。自動配置接受已顯式配置的元件(例如 LineAggregatorFieldExtractorFlatFileHeaderCallbackFlatFileFooterCallback)以及透過設定以下指定屬性配置的元件

表 7. FlatFileItemWriter 屬性
屬性 型別 預設值 描述

spring.batch.job.flatfileitemwriter.resource

Resource

null

要讀取的資源。

spring.batch.job.flatfileitemwriter.delimited

boolean

false

指示輸出檔案是否為分隔符檔案。如果為 true,則 spring.batch.job.flatfileitemwriter.formatted 必須為 false

spring.batch.job.flatfileitemwriter.formatted

boolean

false

指示輸出檔案是否為格式化檔案。如果為 true,則 spring.batch.job.flatfileitemwriter.delimited 必須為 false

spring.batch.job.flatfileitemwriter.format

String

null

用於生成格式化檔案輸出的格式。格式化透過使用 String.format 完成。

spring.batch.job.flatfileitemwriter.locale

Locale

Locale.getDefault()

生成檔案時使用的 Locale

spring.batch.job.flatfileitemwriter.maximumLength

int

0

記錄的最大長度。如果為 0,則大小不受限制。

spring.batch.job.flatfileitemwriter.minimumLength

int

0

最小記錄長度。

spring.batch.job.flatfileitemwriter.delimiter

String

,

用於分隔分隔符檔案中欄位的 String

spring.batch.job.flatfileitemwriter.encoding

String

FlatFileItemReader.DEFAULT_CHARSET

寫入檔案時使用的編碼。

spring.batch.job.flatfileitemwriter.forceSync

boolean

false

指示在重新整理時檔案是否應強制同步到磁碟。

spring.batch.job.flatfileitemwriter.names

String []

null

從記錄解析出的每個欄位的名稱列表。這些名稱是此 ItemWriter 接收的項中 Map<String, Object> 的鍵。

spring.batch.job.flatfileitemwriter.append

boolean

false

指示如果找到輸出檔案,是否應追加寫入。

spring.batch.job.flatfileitemwriter.lineSeparator

String

FlatFileItemWriter.DEFAULT_LINE_SEPARATOR

用於分隔輸出檔案中行的 String

spring.batch.job.flatfileitemwriter.name

String

null

用於在 ExecutionContext 中提供唯一鍵的名稱。

spring.batch.job.flatfileitemwriter.saveState

boolean

true

確定是否應為重啟儲存狀態。

spring.batch.job.flatfileitemwriter.shouldDeleteIfEmpty

boolean

false

如果設定為 true,則在 Job 完成時刪除空檔案(沒有輸出)。

spring.batch.job.flatfileitemwriter.shouldDeleteIfExists

boolean

true

如果設定為 true 並且在應有的位置找到輸出檔案,則在 Step 開始之前將其刪除。

spring.batch.job.flatfileitemwriter.transactional

boolean

FlatFileItemWriter.DEFAULT_TRANSACTIONAL

指示讀取器是否是事務性佇列(表示讀取的項在失敗時會返回到佇列)。

JdbcBatchItemWriter

要將 Step 的輸出寫入關係資料庫,此啟動器提供了自動配置 JdbcBatchItemWriter 的能力。自動配置允許您提供自己的 ItemPreparedStatementSetterItemSqlParameterSourceProvider,並透過設定以下屬性來配置選項

表 8. JdbcBatchItemWriter 屬性
屬性 型別 預設值 描述

spring.batch.job.jdbcbatchitemwriter.name

String

null

用於在 ExecutionContext 中提供唯一鍵的名稱。

spring.batch.job.jdbcbatchitemwriter.sql

String

null

用於插入每個項的 SQL。

spring.batch.job.jdbcbatchitemwriter.assertUpdates

boolean

true

是否驗證每次插入都會導致至少一條記錄被更新。

您還可以使用以下屬性專門為寫入器指定 JDBC DataSource:.`JdbcBatchItemWriter` 屬性

屬性 型別 預設值 描述

spring.batch.job.jdbcbatchitemwriter.datasource.enable

boolean

false

確定是否應啟用 JdbcCursorItemReaderDataSource

jdbcbatchitemwriter.datasource.url

String

null

資料庫的 JDBC URL。

jdbcbatchitemwriter.datasource.username

String

null

資料庫的登入使用者名稱。

jdbcbatchitemwriter.datasource.password

String

null

資料庫的登入密碼。

jdbcbatchitemreader.datasource.driver-class-name

String

null

JDBC 驅動程式的完全限定名。

如果未指定 jdbcbatchitemwriter_datasourceJdbcBatchItemWriter 將使用預設的 DataSource

KafkaItemWriter

要將 Step 輸出寫入 Kafka 主題,您需要 KafkaItemWriter。此啟動器透過利用兩個來源的能力為 KafkaItemWriter 提供了自動配置。首先是 Spring Boot 的 Kafka 自動配置。(請參閱 Spring Boot Kafka 文件。)其次,此啟動器允許您配置寫入器上的兩個屬性。

表 9. KafkaItemWriter 屬性
屬性 型別 預設值 描述

spring.batch.job.kafkaitemwriter.topic

String

null

要寫入的 Kafka 主題。

spring.batch.job.kafkaitemwriter.delete

boolean

false

傳遞給寫入器的項是否都應作為刪除事件傳送到主題。

有關 KafkaItemWriter 配置選項的更多資訊,請參閱 KafkaItemWiter 文件

Spring AOT

當將 Spring AOT 與單步批處理啟動器一起使用時,您必須在編譯時設定讀取器和寫入器的名稱屬性(除非您為讀取器或寫入器建立 Bean)。為此,您必須在 boot maven 外掛或 gradle 外掛中將您希望使用的讀取器和寫入器名稱作為引數或環境變數包含進去。例如,如果您希望在 Maven 中啟用 FlatFileItemReaderFlatFileItemWriter,它將如下所示

    <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <executions>
            <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            </execution>
        </executions>
        <configuration>
            <arguments>
                <argument>--spring.batch.job.flatfileitemreader.name=foobar</argument>
                <argument>--spring.batch.job.flatfileitemwriter.name=fooWriter</argument>
            </arguments>
        </configuration>
    </plugin>