擴充套件與並行處理

許多批處理問題可以使用單執行緒、單程序的 Job 來解決,因此在考慮更復雜的實現之前,始終應該仔細檢查它們是否滿足您的需求。首先測量一個實際 Job 的效能,看看最簡單的實現是否滿足您的需求。即使使用標準硬體,您也可以在不到一分鐘的時間內讀寫一個幾百兆位元組的檔案。

當您準備開始使用一些並行處理來實現 Job 時,Spring Batch 提供了多種選項,本章將對此進行介紹,儘管有些特性在其他地方已經涵蓋。從高層次來看,並行處理有兩種模式:

  • 單程序、多執行緒

  • 多程序

這些也可以細分為以下類別:

  • 多執行緒 Step(單程序)

  • 並行 Step(單程序)

  • Step 的遠端 Chunking(多程序)

  • 分割槽 Step(單程序或多程序)

首先,我們回顧單程序選項。然後,我們回顧多程序選項。

多執行緒 Step

開始並行處理最簡單的方法是將 TaskExecutor 新增到您的 Step 配置中。

  • Java

  • XML

使用 Java 配置時,可以將 TaskExecutor 新增到 Step 中,如下例所示:

Java 配置
@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.build();
}

例如,您可以向 tasklet 新增屬性,如下所示:

<step id="loading">
    <tasklet task-executor="taskExecutor">...</tasklet>
</step>

在此示例中,taskExecutor 是對實現 TaskExecutor 介面的另一個 bean 定義的引用。TaskExecutor 是一個標準的 Spring 介面,有關可用實現的詳細資訊,請參閱 Spring 使用者指南。最簡單的多執行緒 TaskExecutorSimpleAsyncTaskExecutor

前述配置的結果是,Step 透過在單獨的執行執行緒中讀取、處理和寫入每個 item chunk(每個提交間隔)來執行。請注意,這意味著 item 的處理沒有固定順序,並且與單執行緒情況相比,一個 chunk 可能包含非連續的 item。除了任務執行器施加的任何限制(例如它是否由執行緒池支援)之外,tasklet 配置還有一個限制(預設值:4)。您可能需要增加此限制以確保執行緒池得到充分利用。

  • Java

  • XML

使用 Java 配置時,builder 提供了對限制的訪問,如下所示:

Java 配置
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.throttleLimit(20)
				.build();
}

例如,您可以增加限制,如下所示:

<step id="loading"> <tasklet
    task-executor="taskExecutor"
    throttle-limit="20">...</tasklet>
</step>

另請注意,您的 Step 中使用的任何池化資源(例如 DataSource)可能會對併發性施加限制。確保這些資源中的池至少與 Step 中所需的併發執行緒數一樣大。

限制棄用

從 v5.0 開始,限制已被棄用,沒有替代方案。如果您想替換預設 TaskExecutorRepeatTemplate 中的當前限制機制,您需要提供一個自定義的 RepeatOperations 實現(基於帶有有界任務佇列的 TaskExecutor),並使用 StepBuilder#stepOperations 將其設定在 Step 上。

Java 配置
@Bean
public Step sampleStep(RepeatOperations customRepeatOperations, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.stepOperations(customRepeatOperations)
				.build();
}

對於某些常見的批處理用例,使用多執行緒 Step 實現存在一些實際限制。Step 中的許多參與者(例如 reader 和 writer)都是有狀態的。如果狀態沒有按執行緒隔離,則這些元件無法在多執行緒 Step 中使用。特別是,Spring Batch 中的大多數 reader 和 writer 都不是為多執行緒使用而設計的。但是,可以使用無狀態或執行緒安全的 reader 和 writer,Spring Batch Samples 中有一個示例(名為 parallelJob),展示瞭如何使用程序指示器(參見防止狀態持久化)來跟蹤資料庫輸入表中已處理的 item。

Spring Batch 提供了一些 ItemWriterItemReader 的實現。通常,它們會在 Javadoc 中說明是否是執行緒安全的,或者在併發環境中如何避免問題。如果 Javadoc 中沒有資訊,您可以檢查實現以檢視是否存在任何狀態。如果 reader 不是執行緒安全的,您可以使用提供的 SynchronizedItemStreamReader 對其進行裝飾,或者在您自己的同步 delegator 中使用它。您可以同步對 read() 的呼叫,只要處理和寫入是 chunk 中最耗時的部分,您的 Step 仍然可能比單執行緒配置更快地完成。

並行 Step

只要需要並行處理的應用程式邏輯可以分解為不同的職責並分配給單個 Step,就可以在單個程序中並行處理。並行 Step 執行易於配置和使用。

  • Java

  • XML

使用 Java 配置時,與 step3 並行執行 Step (step1,step2) 非常簡單,如下所示:

Java 配置
@Bean
public Job job(JobRepository jobRepository) {
    return new JobBuilder("job", jobRepository)
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

例如,與 step3 並行執行 Step (step1,step2) 非常簡單,如下所示:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

可配置的任務執行器用於指定應執行各個流程的 TaskExecutor 實現。預設是 SyncTaskExecutor,但需要非同步 TaskExecutor 才能並行執行 Step。請注意,Job 確保 split 中的每個流程在聚合退出狀態並轉換之前完成。

更多詳細資訊請參閱Split Flows 部分。

遠端 Chunking

在遠端 chunking 中,Step 處理被分割到多個程序中,這些程序透過某些中介軟體相互通訊。下圖展示了該模式:

Remote Chunking
圖 1. 遠端 Chunking

管理器元件是一個單程序,而 workers 是多個遠端程序。如果管理器不是瓶頸,則此模式效果最佳,因此處理必須比讀取 item 更耗時(實際情況通常如此)。

管理器是 Spring Batch Step 的實現,其 ItemWriter 被替換為一個通用版本,該版本知道如何將 item chunk 作為訊息傳送到中介軟體。workers 是正在使用的任何中介軟體的標準監聽器(例如,對於 JMS,它們將是 MesssageListener 實現),它們的作用是透過 ChunkProcessor 介面,使用標準的 ItemWriterItemProcessorItemWriter 來處理 item chunk。使用此模式的一個優點是 reader、processor 和 writer 元件都是現成的(與 Step 的本地執行使用的相同)。item 是動態劃分的,並且工作是透過中介軟體共享的,因此,如果監聽器都是積極消費者,則負載均衡是自動的。

中介軟體必須是持久的,具有保證的傳遞和每個訊息的單個消費者。JMS 是顯而易見的候選方案,但在網格計算和共享記憶體產品領域存在其他選項(例如 JavaSpaces)。

更多詳細資訊請參閱Spring Batch 整合 - 遠端 Chunking 部分。

分割槽

Spring Batch 還提供了一個 SPI,用於對 Step 執行進行分割槽並在遠端執行。在這種情況下,遠端參與者是 Step 例項,它們也可以輕鬆地配置用於本地處理。下圖展示了該模式:

Partitioning Overview
圖 2. 分割槽

Job 在左側作為一系列 Step 例項執行,其中一個 Step 例項被標記為管理器。圖中的 workers 都是相同的 Step 例項,實際上可以取代管理器,從而為 Job 帶來相同的結果。workers 通常是遠端服務,但也可能是本地執行執行緒。管理器在此模式中傳送給 workers 的訊息不需要是持久的或具有保證的傳遞。JobRepository 中的 Spring Batch 元資料確保每個 worker 在每個 Job 執行中只執行一次。

Spring Batch 中的 SPI 由 Step 的一個特殊實現(稱為 PartitionStep)和需要針對特定環境實現的兩個策略介面組成。策略介面是 PartitionHandlerStepExecutionSplitter,以下時序圖展示了它們的作用:

Partitioning SPI
圖 3. 分割槽 SPI

在這種情況下,右側的 Step 是“遠端” worker,因此,可能有很多物件和/或程序扮演此角色,而 PartitionStep 顯示為驅動執行。

  • Java

  • XML

以下示例展示了使用 Java 配置時的 PartitionStep 配置:

Java 配置
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}

與多執行緒 Step 的 throttleLimit 方法類似,gridSize 方法可防止任務執行器被單個 Step 的請求飽和。

以下示例展示了使用 XML 配置時的 PartitionStep 配置:

<step id="step1.manager">
    <partition step="step1" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</step>

與多執行緒 Step 的 throttle-limit 屬性類似,grid-size 屬性可防止任務執行器被單個 Step 的請求飽和。

Spring Batch Samples 的單元測試套件(參見 partition*Job.xml 配置)有一個簡單的示例,您可以複製和擴充套件。

Spring Batch 為分割槽建立名為 step1:partition0 等等的 Step 執行。許多人更喜歡為了保持一致性而將管理器 Step 命名為 step1:manager。您可以使用 Step 的別名(透過指定 name 屬性而不是 id 屬性)。

PartitionHandler

PartitionHandler 是知道 remoting 或網格環境結構的元件。它能夠將 StepExecution 請求傳送到遠端 Step 例項,並以某種特定於結構的格式包裝,例如 DTO。它無需知道如何分割輸入資料或如何聚合多個 Step 執行的結果。一般來說,它可能也無需瞭解彈性或故障轉移,因為這些在許多情況下是結構的特性。無論如何,Spring Batch 總是提供獨立於結構的重啟能力。失敗的 Job 總是可以重啟,在這種情況下,只有失敗的 Steps 會被重新執行。

PartitionHandler 介面可以針對各種結構型別擁有專門的實現,包括簡單的 RMI remoting、EJB remoting、自定義 web service、JMS、Java Spaces、共享記憶體網格(如 Terracotta 或 Coherence)以及網格執行結構(如 GridGain)。Spring Batch 不包含任何專有網格或 remoting 結構的實現。

然而,Spring Batch 確實提供了 PartitionHandler 的一個有用實現,它使用 Spring 的 TaskExecutor 策略在本地的獨立執行執行緒中執行 Step 例項。此實現稱為 TaskExecutorPartitionHandler

  • Java

  • XML

您可以使用 Java 配置明確配置 TaskExecutorPartitionHandler,如下所示:

Java 配置
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .partitioner("step1", partitioner())
        .partitionHandler(partitionHandler())
        .build();
}

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}

TaskExecutorPartitionHandler 是使用前面顯示的 XML namespace 配置的 Step 的預設設定。您也可以明確配置它,如下所示:

<step id="step1.manager">
    <partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

gridSize 屬性決定了要建立的獨立 Step 執行的數量,因此可以將其與 TaskExecutor 中執行緒池的大小相匹配。或者,可以將其設定為大於可用執行緒的數量,這會使工作塊變小。

TaskExecutorPartitionHandler 對於 IO 密集型 Step 例項很有用,例如複製大量檔案或將檔案系統複製到內容管理系統中。它也可以透過提供作為遠端呼叫的代理的 Step 實現(例如使用 Spring Remoting)來用於遠端執行。

Partitioner

Partitioner 責任更簡單:僅為新的 Step 執行生成執行上下文作為輸入引數(無需擔心重啟)。它有一個方法,如下介面定義所示:

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

此方法的返回值將每個 Step 執行的唯一名稱(String 型別)與 ExecutionContext 形式的輸入引數相關聯。這些名稱隨後在 Batch 元資料中作為分割槽 StepExecions 的 Step 名稱出現。ExecutionContext 只是一個鍵值對的集合,因此它可能包含主鍵範圍、行號或輸入檔案的位置。然後,遠端 Step 通常透過使用 #{…​} 佔位符(Step 範圍內的後期繫結)繫結到上下文輸入,如下一節所示。

Step 執行的名稱(Partitioner 返回的 Map 中的 key)在 Job 的 Step 執行中必須是唯一的,但沒有其他特定要求。最簡單的方法(並使名稱對使用者有意義)是使用 prefix+suffix 命名約定,其中 prefix 是正在執行的 Step 的名稱(該名稱在 Job 中本身是唯一的),suffix 只是一個計數器。框架中有一個使用此約定的 SimplePartitioner

您可以使用一個可選介面 PartitionNameProvider 來獨立於分割槽本身提供分割槽名稱。如果 Partitioner 實現了此介面,則在重啟時只查詢名稱。如果分割槽操作很耗時,這可以是一個有用的最佳化。PartitionNameProvider 提供的名稱必須與 Partitioner 提供的名稱匹配。

將輸入資料繫結到 Step

對於由 PartitionHandler 執行的 Step,擁有相同的配置以及在執行時從 ExecutionContext 繫結其輸入引數是非常高效的。使用 Spring Batch 的 StepScope 特性(更多詳細資訊請參見後期繫結部分)可以輕鬆實現這一點。例如,如果 Partitioner 建立帶有屬性鍵 fileNameExecutionContext 例項,並且該鍵指向每個 Step 呼叫不同的檔案(或目錄),則 Partitioner 的輸出可能類似於下表的內容:

表 1. Partitioner 提供的面向目錄處理的 Step 執行名稱到執行上下文示例

Step 執行名稱 (key)

ExecutionContext (value)

filecopy:partition0

fileName=/home/data/one

filecopy:partition1

fileName=/home/data/two

filecopy:partition2

fileName=/home/data/three

然後可以使用後期繫結到執行上下文將檔名繫結到 Step。

  • Java

  • XML

以下示例展示瞭如何在 Java 中定義後期繫結:

Java 配置
@Bean
public MultiResourceItemReader itemReader(
	@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
	return new MultiResourceItemReaderBuilder<String>()
			.delegate(fileReader())
			.name("itemReader")
			.resources(resources)
			.build();
}

以下示例展示瞭如何在 XML 中定義後期繫結:

XML 配置
<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>