擴充套件和並行處理
許多批處理問題可以透過單執行緒、單程序作業解決,因此在考慮更復雜的實現之前,最好先檢查這是否能滿足您的需求。衡量一個實際作業的效能,首先看看最簡單的實現是否滿足您的需求。即使使用標準硬體,您也可以在不到一分鐘的時間內讀寫數百兆位元組的檔案。
當您準備開始實現帶有並行處理的作業時,Spring Batch 提供了一系列選項,這些選項將在本章中描述,儘管某些功能在其他地方有所涉及。從高層次上看,並行處理有兩種模式:
-
單程序,多執行緒
-
多程序
這些模式也分為以下幾類:
-
多執行緒步驟(單程序)
-
並行步驟(單程序)
-
步驟的本地分塊(單程序)
-
步驟的遠端分塊(多程序)
-
步驟分割槽(單程序或多程序)
-
遠端步驟(多程序)
首先,我們回顧單程序選項。然後我們回顧多程序選項。
多執行緒步驟
啟動並行處理最簡單的方法是向您的 Step 配置新增一個 TaskExecutor。
-
Java
-
XML
當使用 Java 配置時,您可以向步驟新增一個 TaskExecutor,如下例所示:
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}
例如,您可以向 tasklet 新增一個屬性,如下所示:
<step id="loading">
<tasklet task-executor="taskExecutor">...</tasklet>
</step>
在此示例中,taskExecutor 是對另一個實現 TaskExecutor 介面的 bean 定義的引用。TaskExecutor 是一個標準的 Spring 介面,因此請查閱 Spring 使用者指南以瞭解可用實現的詳細資訊。最簡單的多執行緒 TaskExecutor 是 SimpleAsyncTaskExecutor。
上述配置的結果是,Step 透過在單獨的執行執行緒中讀取、處理和寫入每個專案塊(每個提交間隔)來執行。請注意,這意味著專案的處理沒有固定的順序,並且與單執行緒情況相比,一個塊可能包含非連續的專案。
另請注意,您的步驟中使用的任何池化資源(例如 DataSource)都可能限制併發。請確保這些資源中的池至少與步驟中所需的併發執行緒數一樣大。
對於一些常見的批處理用例,使用多執行緒 Step 實現存在一些實際限制。Step 中的許多參與者(例如讀取器和寫入器)是有狀態的。如果狀態未按執行緒隔離,則這些元件無法在多執行緒 Step 中使用。特別是,Spring Batch 中的大多數讀取器和寫入器並非設計用於多執行緒使用。然而,可以使用無狀態或執行緒安全的讀取器和寫入器,並且 Spring Batch 示例中有一個示例(名為 parallelJob)展示瞭如何使用程序指示器(請參閱防止狀態持久化)來跟蹤資料庫輸入表中已處理的專案。
Spring Batch 提供了一些 ItemWriter 和 ItemReader 的實現。通常,它們的 Javadoc 中會說明它們是否執行緒安全,或者在併發環境中如何避免問題。如果 Javadoc 中沒有資訊,您可以檢查實現以檢視是否存在任何狀態。如果讀取器不是執行緒安全的,您可以使用提供的 SynchronizedItemStreamReader 裝飾它,或者在您自己的同步委託器中使用它。您可以同步對 read() 的呼叫,並且只要處理和寫入是塊中最昂貴的部分,您的步驟仍然可以比單執行緒配置更快地完成。
並行步驟
只要需要並行化的應用程式邏輯可以分解為不同的職責並分配給單獨的步驟,它就可以在單個程序中並行化。並行步驟執行易於配置和使用。
-
Java
-
XML
當使用 Java 配置時,並行執行步驟 (step1,step2) 和 step3 非常簡單,如下所示:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
例如,並行執行步驟 (step1,step2) 和 step3 非常簡單,如下所示:
<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
</job>
<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
可配置的任務執行器用於指定哪個 TaskExecutor 實現應該執行各個流。預設是 SyncTaskExecutor,但需要一個非同步 TaskExecutor 才能並行執行這些步驟。請注意,作業確保拆分中的每個流都在聚合退出狀態並進行轉換之前完成。
有關更多詳細資訊,請參閱 拆分流部分。
本地分塊
本地分塊是 v6.0 中的一項新功能,它允許您在同一 JVM 中使用多個執行緒在本地並行處理專案塊。當您有大量專案需要處理並希望利用多核處理器時,此功能特別有用。透過本地分塊,您可以配置面向塊的步驟,以使用多個執行緒併發處理專案塊。每個執行緒將獨立讀取、處理和寫入自己的專案塊,而步驟將管理整體執行並提交結果。
此功能可透過使用 ChunkMessageChannelItemWriter 來實現,它是一個專案寫入器,用於將塊請求從 TaskExecutor 提交到本地工作器
@Bean
public ChunkTaskExecutorItemWriter<Vet> itemWriter(ChunkProcessor<Vet> chunkProcessor) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(4);
taskExecutor.setThreadNamePrefix("worker-thread-");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.afterPropertiesSet();
return new ChunkTaskExecutorItemWriter<>(chunkProcessor, taskExecutor);
}
ChunkMessageChannelItemWriter 需要一個 TaskExecutor 來併發處理塊,以及一個 ChunkProcessor 來定義如何處理每個塊。這是一個將每個專案塊寫入關係資料庫表的塊處理器示例:
@Bean
public ChunkProcessor<Vet> chunkProcessor(DataSource dataSource, TransactionTemplate transactionTemplate) {
String sql = "insert into vets (firstname, lastname) values (?, ?)";
JdbcBatchItemWriter<Vet> itemWriter = new JdbcBatchItemWriterBuilder<Vet>().dataSource(dataSource)
.sql(sql)
.itemPreparedStatementSetter((item, ps) -> {
ps.setString(1, item.firstname());
ps.setString(2, item.lastname());
})
.build();
return (chunk, contribution) -> transactionTemplate.executeWithoutResult(transactionStatus -> {
try {
itemWriter.write(chunk);
contribution.incrementWriteCount(chunk.size());
contribution.setExitStatus(ExitStatus.COMPLETED);
}
catch (Exception e) {
transactionStatus.setRollbackOnly();
contribution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
}
});
}
您可以在本地分塊示例中找到這種擴充套件技術的示例。
遠端分塊
在遠端分塊中,Step 處理透過一些中介軟體在多個程序之間拆分。下圖顯示了該模式:
管理器元件是一個單程序,而工作器是多個遠端程序。如果管理器不是瓶頸,此模式效果最佳,因此處理必須比讀取專案更耗時(這在實踐中通常是如此)。
管理器是 Spring Batch Step 的實現,其 ItemWriter 被替換為通用版本,該版本知道如何將專案塊作為訊息傳送到中介軟體。工作器是所用中介軟體的標準偵聽器(例如,對於 JMS,它們將是 MessageListener 實現),其作用是透過 ChunkProcessor 介面,使用標準 ItemWriter 或 ItemProcessor 以及 ItemWriter 來處理專案塊。使用此模式的優點之一是讀取器、處理器和寫入器元件是現成的(與步驟的本地執行所使用的相同)。專案動態劃分,並透過中介軟體共享工作,因此,如果偵聽器都是急切的消費者,則負載均衡是自動的。
中介軟體必須是持久的,具有保證交付和每個訊息的單個消費者。JMS 是顯而易見的選擇,但網格計算和共享記憶體產品領域也存在其他選項(例如 JavaSpaces)。
有關更多詳細資訊,請參閱 Spring Batch 整合 - 遠端分塊部分。
分割槽
Spring Batch 還提供了一個 SPI,用於分割槽 Step 執行並遠端執行它。在這種情況下,遠端參與者是 Step 例項,它們可以像配置和用於本地處理一樣容易地進行配置和使用。下圖顯示了該模式:
Job 在左側作為一系列 Step 例項執行,其中一個 Step 例項被標記為管理器。此圖片中的工作器都是 Step 的相同例項,它們實際上可以取代管理器,從而為 Job 產生相同的結果。工作器通常是遠端服務,但也可能是本地執行執行緒。在此模式中,管理器傳送給工作器的訊息不需要是持久的或具有保證交付。JobRepository 中的 Spring Batch 元資料確保每個工作器針對每個 Job 執行只執行一次。</p>
Spring Batch 中的 SPI 由 Step 的特殊實現(稱為 PartitionStep)和兩個需要為特定環境實現的策略介面組成。策略介面是 PartitionHandler 和 StepExecutionSplitter,以下序列圖顯示了它們的作用:
在這種情況下,右側的 Step 是“遠端”工作器,因此,可能有許多物件和/或程序扮演此角色,並且 PartitionStep 顯示正在驅動執行。
-
Java
-
XML
以下示例顯示了使用 Java 配置時的 PartitionStep 配置:
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
}
類似於多執行緒步驟的 throttleLimit 方法,gridSize 方法阻止任務執行器被單個步驟的請求飽和。
以下示例顯示了使用 XML 配置時的 PartitionStep 配置:
<step id="step1.manager">
<partition step="step1" partitioner="partitioner">
<handler grid-size="10" task-executor="taskExecutor"/>
</partition>
</step>
類似於多執行緒步驟的 throttle-limit 屬性,grid-size 屬性阻止任務執行器被單個步驟的請求飽和。
Spring Batch 示例的單元測試套件(參見 partition*Job.xml 配置)有一個簡單的示例,您可以複製和擴充套件。
Spring Batch 為名為 step1:partition0 等的分割槽建立步驟執行。許多人為了保持一致性,更喜歡將管理器步驟命名為 step1:manager。您可以使用步驟的別名(透過指定 name 屬性而不是 id 屬性)。
PartitionHandler
PartitionHandler 是瞭解遠端或網格環境結構(fabric)的元件。它能夠將 StepExecution 請求傳送到遠端 Step 例項,這些請求被封裝在某種特定於結構(fabric-specific)的格式中,例如 DTO。它不需要知道如何拆分輸入資料,也不需要知道如何聚合多個 Step 執行的結果。一般來說,它可能也不需要知道彈性或故障轉移,因為在許多情況下這些都是結構(fabric)的特性。無論如何,Spring Batch 始終提供獨立於結構(fabric)的重啟能力。失敗的 Job 始終可以重新啟動,在這種情況下,只有失敗的 Steps 會被重新執行。
PartitionHandler 介面可以針對各種結構型別具有專門的實現,包括簡單的 RMI 遠端呼叫、EJB 遠端呼叫、自定義 Web 服務、JMS、Java Spaces、共享記憶體網格(例如 Terracotta 或 Coherence)和網格執行結構(例如 GridGain)。Spring Batch 不包含任何專有網格或遠端呼叫結構的實現。
然而,Spring Batch 確實提供了一個有用的 PartitionHandler 實現,該實現使用 Spring 的 TaskExecutor 策略在單獨的執行執行緒中本地執行 Step 例項。該實現名為 TaskExecutorPartitionHandler。
-
Java
-
XML
您可以使用 Java 配置顯式配置 TaskExecutorPartitionHandler,如下所示:
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
}
TaskExecutorPartitionHandler 是使用前面顯示的 XML 名稱空間配置的步驟的預設值。您也可以顯式配置它,如下所示:
<step id="step1.manager">
<partition step="step1" handler="handler"/>
</step>
<bean class="org.spr...TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>
gridSize 屬性決定了要建立的獨立步驟執行的數量,因此它可以與 TaskExecutor 中的執行緒池大小相匹配。或者,它可以設定為大於可用執行緒數,這會使工作塊變小。
TaskExecutorPartitionHandler 對於 I/O 密集型 Step 例項非常有用,例如複製大量檔案或將檔案系統複製到內容管理系統中。它還可以透過提供一個作為遠端呼叫代理的 Step 實現(例如使用 Spring Remoting)來用於遠端執行。
分割槽器
Partitioner 具有更簡單的職責:僅為新的步驟執行生成作為輸入引數的執行上下文(無需擔心重啟)。它只有一個方法,如下面的介面定義所示:
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
此方法的返回值將每個步驟執行的唯一名稱(String)與 ExecutionContext 形式的輸入引數相關聯。這些名稱稍後會在批處理元資料中作為分割槽 StepExecutions 中的步驟名稱出現。ExecutionContext 只是一個名稱-值對的集合,因此它可能包含一系列主鍵、行號或輸入檔案的位置。遠端 Step 通常透過使用 #{…} 佔位符(步驟範圍內的後期繫結)繫結到上下文輸入,如下一節所示。
步驟執行的名稱(Partitioner 返回的 Map 中的鍵)需要在 Job 的步驟執行中是唯一的,但沒有其他特定要求。最簡單的方法(並使名稱對使用者有意義)是使用字首+字尾命名約定,其中字首是被執行步驟的名稱(在 Job 中本身是唯一的),字尾只是一個計數器。框架中有一個 SimplePartitioner 使用此約定。
您可以使用一個可選介面 PartitionNameProvider 來提供分割槽名稱,使其與分割槽本身分離。如果 Partitioner 實現了此介面,則在重新啟動時僅查詢名稱。如果分割槽成本較高,這可能是一個有用的最佳化。PartitionNameProvider 提供的名稱必須與 Partitioner 提供的名稱匹配。
將輸入資料繫結到步驟
由 PartitionHandler 執行的步驟具有相同的配置,並且它們的輸入引數在執行時從 ExecutionContext 繫結,這是非常高效的。這很容易透過 Spring Batch 的 StepScope 功能實現(在後期繫結部分有更詳細的介紹)。例如,如果 Partitioner 建立的 ExecutionContext 例項中有一個名為 fileName 的屬性鍵,指向每個步驟呼叫不同的檔案(或目錄),則 Partitioner 的輸出可能類似於下表的內容:
步驟執行名稱(鍵) |
執行上下文(值) |
filecopy:partition0 |
fileName=/home/data/one |
filecopy:partition1 |
fileName=/home/data/two |
filecopy:partition2 |
fileName=/home/data/three |
然後,檔名可以通過後期繫結到執行上下文的方式繫結到步驟。
-
Java
-
XML
以下示例展示瞭如何在 Java 中定義後期繫結:
@Bean
public MultiResourceItemReader itemReader(
@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
}
以下示例展示瞭如何在 XML 中定義後期繫結:
<bean id="itemReader" scope="step"
class="org.spr...MultiResourceItemReader">
<property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>
遠端步驟執行
從 v6.0 開始,Spring Batch 提供了對遠端步驟執行的支援,允許您在遠端機器或叢集上執行批處理作業的步驟。此功能對於大規模批處理場景特別有用,在這種場景中,您希望將工作負載分發到多個節點以提高效能和可擴充套件性。遠端步驟執行由 RemoteStep 類提供,該類使用 Spring Integration 訊息通道來實現本地作業執行環境與遠端步驟執行器之間的通訊。
RemoteStep 被配置為一個常規步驟,透過提供遠端步驟名稱和訊息模板來向遠端工作器傳送步驟執行請求
@Bean
public Step step(MessagingTemplate messagingTemplate, JobRepository jobRepository) {
return new RemoteStep("step", "workerStep", jobRepository, messagingTemplate);
}
在工作器端,您需要定義要執行的遠端步驟(此示例中為 workerStep),並配置一個 Spring Integration 流來攔截步驟執行請求並呼叫 StepExecutionRequestHandler
@Bean
public Step workerStep(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
return new StepBuilder("workerStep", jobRepository)
// define step logic
.build();
}
/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory, JobRepository jobRepository,
StepLocator stepLocator) {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobRepository(jobRepository);
stepExecutionRequestHandler.setStepLocator(stepLocator);
return IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.handle(stepExecutionRequestHandler, "handle")
.get();
}
@Bean
public StepLocator stepLocator(BeanFactory beanFactory) {
BeanFactoryStepLocator beanFactoryStepLocator = new BeanFactoryStepLocator();
beanFactoryStepLocator.setBeanFactory(beanFactory);
return beanFactoryStepLocator;
}
您可以在遠端步驟示例中找到完整的示例。