外部化批處理流程執行
到目前為止討論的整合方法表明了 Spring Integration 像一個外殼一樣封裝 Spring Batch 的用例。然而,Spring Batch 也可以在內部使用 Spring Integration。透過這種方法,Spring Batch 使用者可以將專案甚至塊的處理委託給外部程序。這使您可以解除安裝複雜的處理。Spring Batch Integration 為以下功能提供專門支援:
-
遠端分塊
-
遠端分割槽
遠端分塊
下圖展示了當您將 Spring Batch 與 Spring Integration 結合使用時,遠端分塊的一種工作方式
更進一步,您還可以透過使用 ChunkMessageChannelItemWriter(由 Spring Batch Integration 提供)來外部化分塊處理,該寫入器傳送專案並收集結果。一旦傳送,Spring Batch 會繼續讀取和分組專案,而無需等待結果。相反,ChunkMessageChannelItemWriter 的職責是收集結果並將其重新整合到 Spring Batch 流程中。
藉助 Spring Integration,您可以完全控制程序的併發性(例如,透過使用 QueueChannel 而不是 DirectChannel)。此外,透過依賴 Spring Integration 豐富的通道介面卡集合(例如 JMS 和 AMQP),您可以將批處理作業的塊分發到外部系統進行處理。
-
Java
-
XML
具有遠端分塊步驟的作業在 Java 中的配置可能類似於以下內容
public Job chunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("personJob", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.<Person, Person>chunk(200, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}
具有遠端分塊步驟的作業在 XML 中的配置可能類似於以下內容
<job id="personJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
</tasklet>
...
</step>
</job>
ItemReader 引用指向您希望在管理器上讀取資料的 bean。ItemWriter 引用指向一個特殊的 ItemWriter(稱為 ChunkMessageChannelItemWriter),如前所述。處理器(如果有)在管理器配置中省略,因為它是在工作器上配置的。在實現用例時,您應該檢查任何附加元件屬性,例如節流限制等。
-
Java
-
XML
以下 Java 配置提供了基本的管理器設定
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://:61616");
return factory;
}
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(requests())
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
.get();
}
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public QueueChannel replies() {
return new QueueChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
.channel(replies())
.get();
}
/*
* Configure the ChunkMessageChannelItemWriter
*/
@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
messagingTemplate.setReceiveTimeout(2000);
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
= new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}
以下 XML 配置提供了基本的管理器設定
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://:61616"/>
</bean>
<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>
<bean id="messagingTemplate"
class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="requests"/>
<property name="receiveTimeout" value="2000"/>
</bean>
<bean id="itemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step">
<property name="messagingOperations" ref="messagingTemplate"/>
<property name="replyChannel" ref="replies"/>
</bean>
<int:channel id="replies">
<int:queue/>
</int:channel>
<int-jms:message-driven-channel-adapter id="jmsReplies"
destination-name="replies"
channel="replies"/>
上述配置為我們提供了許多 bean。我們使用 ActiveMQ 以及 Spring Integration 提供的入站和出站 JMS 介面卡來配置我們的訊息中介軟體。如所示,我們的 itemWriter bean(由我們的作業步驟引用)使用 ChunkMessageChannelItemWriter 透過配置的中介軟體寫入塊。
現在我們可以繼續工作器配置,如下例所示
-
Java
-
XML
以下示例顯示了 Java 中的工作器配置
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://:61616");
return factory;
}
/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.get();
}
/*
* Configure outbound flow (replies going to the manager)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(replies())
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
.get();
}
/*
* Configure the ChunkProcessorChunkHandler
*/
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
ChunkProcessor<Integer> chunkProcessor
= new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
= new ChunkProcessorChunkHandler<>();
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
return chunkProcessorChunkHandler;
}
以下示例顯示了 XML 中的工作器配置
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://:61616"/>
</bean>
<int:channel id="requests"/>
<int:channel id="replies"/>
<int-jms:message-driven-channel-adapter id="incomingRequests"
destination-name="requests"
channel="requests"/>
<int-jms:outbound-channel-adapter id="outgoingReplies"
destination-name="replies"
channel="replies">
</int-jms:outbound-channel-adapter>
<int:service-activator id="serviceActivator"
input-channel="requests"
output-channel="replies"
ref="chunkProcessorChunkHandler"
method="handleChunk"/>
<bean id="chunkProcessorChunkHandler"
class="org.springframework.batch.integration.chunk.ChunkProcessorChunkRequestHandler">
<property name="chunkProcessor">
<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
<property name="itemWriter">
<bean class="io.spring.sbi.PersonItemWriter"/>
</property>
<property name="itemProcessor">
<bean class="io.spring.sbi.PersonItemProcessor"/>
</property>
</bean>
</property>
</bean>
這些配置項中的大多數在管理器配置中應該看起來很熟悉。工作器不需要訪問 Spring Batch JobRepository,也不需要訪問實際的作業配置檔案。主要關注的 bean 是 chunkProcessorChunkHandler。ChunkProcessorChunkRequestHandler 的 chunkProcessor 屬性接受一個配置好的 SimpleChunkProcessor,您可以在其中提供對您的 ItemWriter(以及可選的 ItemProcessor)的引用,當工作器從管理器接收塊時,該寫入器和處理器將在工作器上執行。
有關更多資訊,請參閱“可伸縮性”一章中關於遠端分塊的部分。
從版本 4.1 開始,Spring Batch Integration 引入了 @EnableBatchIntegration 註解,可用於簡化遠端分塊設定。此註解提供兩個可以在應用程式上下文中自動裝配的 bean:
-
RemoteChunkingManagerStepBuilderFactory:配置管理器步驟 -
RemoteChunkingWorkerBuilder:配置遠端工作器整合流
這些 API 負責配置許多元件,如下圖所示
在管理器端,RemoteChunkingManagerStepBuilderFactory 允許您透過宣告以下內容來配置管理器步驟:
-
用於讀取專案並將其傳送給工作器的專案讀取器
-
用於向工作器傳送請求的輸出通道(“出站請求”)
-
用於接收來自工作器回覆的輸入通道(“入站回覆”)
您無需顯式配置 ChunkMessageChannelItemWriter 和 MessagingTemplate。(如果您發現有理由這樣做,仍然可以顯式配置它們)。
在工作器端,RemoteChunkingWorkerBuilder 允許您配置工作器以:
-
為每個請求呼叫
ChunkProcessorChunkRequestHandler的handleChunk方法,並使用配置的ItemProcessor和ItemWriter -
為每個請求呼叫
ChunkProcessorChunkRequestHandler的handleChunk方法,並使用配置的ItemProcessor和ItemWriter -
在輸出通道(“出站回覆”)上向管理器傳送回覆
您無需顯式配置 SimpleChunkProcessor 和 ChunkProcessorChunkRequestHandler。(如果您發現有理由這樣做,仍然可以顯式配置它們)。
以下示例展示瞭如何使用這些 API
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public TaskletStep managerStep() {
return this.managerStepBuilderFactory.get("managerStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests sent to workers
.inputChannel(replies()) // replies received from workers
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemoteChunkingWorkerBuilder workerBuilder;
@Bean
public IntegrationFlow workerFlow() {
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests()) // requests received from the manager
.outputChannel(replies()) // replies sent to the manager
.build();
}
// Middleware beans setup omitted
}
}
您可以在此處找到遠端分塊作業的完整示例。
遠端分割槽
下圖顯示了典型的遠端分割槽情況
另一方面,當瓶頸不是專案處理而是相關的 I/O 時,遠端分割槽非常有用。透過遠端分割槽,您可以將工作傳送給執行完整 Spring Batch 步驟的工作器。因此,每個工作器都有自己的 ItemReader、ItemProcessor 和 ItemWriter。為此,Spring Batch Integration 提供了 MessageChannelPartitionHandler。
PartitionHandler 介面的此實現使用 MessageChannel 例項向遠端工作器傳送指令並接收其響應。這為與遠端工作器通訊所使用的傳輸(例如 JMS 和 AMQP)提供了一個很好的抽象。
“可伸縮性”一章中關於遠端分割槽的部分概述了配置遠端分割槽所需的理念和元件,並展示了使用預設 TaskExecutorPartitionHandler 在單獨的本地執行執行緒中進行分割槽的示例。對於到多個 JVM 的遠端分割槽,需要兩個額外的元件:
-
遠端處理架構或網格環境
-
支援所需遠端處理架構或網格環境的
PartitionHandler實現
與遠端分塊類似,您可以使用 JMS 作為“遠端處理架構”。在這種情況下,如前所述,使用 MessageChannelPartitionHandler 例項作為 PartitionHandler 實現。
-
Java
-
XML
以下示例假設存在一個已分割槽作業,並著重於 Java 中的 MessageChannelPartitionHandler 和 JMS 配置
/*
* Configuration of the manager side
*/
@Bean
public PartitionHandler partitionHandler() {
MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
partitionHandler.setStepName("step1");
partitionHandler.setGridSize(3);
partitionHandler.setReplyChannel(outboundReplies());
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(outboundRequests());
template.setReceiveTimeout(100000);
partitionHandler.setMessagingOperations(template);
return partitionHandler;
}
@Bean
public QueueChannel outboundReplies() {
return new QueueChannel();
}
@Bean
public DirectChannel outboundRequests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsRequests() {
return IntegrationFlow.from("outboundRequests")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("requestsQueue"))
.get();
}
@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setProcessorBean(partitionHandler());
aggregatorFactoryBean.setOutputChannel(outboundReplies());
// configure other propeties of the aggregatorFactoryBean
return aggregatorFactoryBean;
}
@Bean
public DirectChannel inboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundJmsStaging() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("stagingQueue"))
.channel(inboundStaging())
.get();
}
/*
* Configuration of the worker side
*/
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobExplorer(jobExplorer);
stepExecutionRequestHandler.setStepLocator(stepLocator());
return stepExecutionRequestHandler;
}
@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
return stepExecutionRequestHandler();
}
@Bean
public DirectChannel inboundRequests() {
return new DirectChannel();
}
public IntegrationFlow inboundJmsRequests() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("requestsQueue"))
.channel(inboundRequests())
.get();
}
@Bean
public DirectChannel outboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsStaging() {
return IntegrationFlow.from("outboundStaging")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("stagingQueue"))
.get();
}
以下示例假設存在一個已分割槽作業,並著重於 XML 中的 MessageChannelPartitionHandler 和 JMS 配置
<bean id="partitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
<property name="stepName" value="step1"/>
<property name="gridSize" value="3"/>
<property name="replyChannel" ref="outbound-replies"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="outbound-requests"/>
<property name="receiveTimeout" value="100000"/>
</bean>
</property>
</bean>
<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
channel="outbound-requests"/>
<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
channel="inbound-requests"/>
<bean id="stepExecutionRequestHandler"
class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
<property name="jobExplorer" ref="jobExplorer"/>
<property name="stepLocator" ref="stepLocator"/>
</bean>
<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
output-channel="outbound-staging"/>
<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
channel="outbound-staging"/>
<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
channel="inbound-staging"/>
<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
output-channel="outbound-replies"/>
<int:channel id="outbound-replies">
<int:queue/>
</int:channel>
<bean id="stepLocator"
class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />
您還必須確保分割槽 handler 屬性對映到 partitionHandler bean。
-
Java
-
XML
以下示例將 Java 中的分割槽 handler 屬性對映到 partitionHandler
public Job personJob(JobRepository jobRepository) {
return new JobBuilder("personJob", jobRepository)
.start(new StepBuilder("step1.manager", jobRepository)
.partitioner("step1.worker", partitioner())
.partitionHandler(partitionHandler())
.build())
.build();
}
以下示例將 XML 中的分割槽 handler 屬性對映到 partitionHandler
<job id="personJob">
<step id="step1.manager">
<partition partitioner="partitioner" handler="partitionHandler"/>
...
</step>
</job>
您可以在此處找到遠端分割槽作業的完整示例。
您可以使用 @EnableBatchIntegration 註解來簡化遠端分割槽設定。此註解提供兩個對遠端分割槽有用的 bean:
-
RemotePartitioningManagerStepBuilderFactory:配置管理器步驟 -
RemotePartitioningWorkerStepBuilderFactory:配置工作器步驟
這些 API 負責配置許多元件,如下圖所示
在管理器端,RemotePartitioningManagerStepBuilderFactory 允許您透過宣告以下內容來配置管理器步驟:
-
用於分割槽資料的
Partitioner -
用於向工作器傳送請求的輸出通道(“出站請求”)
-
用於接收來自工作器回覆的輸入通道(“入站回覆”)(當配置回覆聚合時)
-
輪詢間隔和超時引數(當配置作業倉庫輪詢時)
您無需顯式配置 MessageChannelPartitionHandler 和 MessagingTemplate。(如果您發現有理由這樣做,仍然可以顯式配置它們)。
在工作器端,RemotePartitioningWorkerStepBuilderFactory 允許您配置工作器以:
-
為每個請求呼叫
ChunkProcessorChunkRequestHandler的handleChunk方法,並使用配置的ItemProcessor和ItemWriter -
為每個請求呼叫
StepExecutionRequestHandler的handle方法 -
在輸出通道(“出站回覆”)上向管理器傳送回覆
您無需顯式配置 StepExecutionRequestHandler。(如果您發現有理由這樣做,仍然可以顯式配置它)。
以下示例展示瞭如何使用這些 API
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public Step managerStep() {
return this.managerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner())
.gridSize(10)
.outputChannel(outgoingRequestsToWorkers())
.inputChannel(incomingRepliesFromWorkers())
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
@Bean
public Step workerStep() {
return this.workerStepBuilderFactory
.get("workerStep")
.inputChannel(incomingRequestsFromManager())
.outputChannel(outgoingRepliesToManager())
.chunk(100)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
// Middleware beans setup omitted
}
}