子元素

當此 GatewayPollableChannel 接收訊息時,您必須提供一個全域性預設 Poller 或向 Job Launching Gateway 提供一個 Poller 子元素。

  • Java

  • XML

以下示例展示瞭如何在 Java 中提供 Poller

Java 配置
@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
    jobLaunchingGateway.setOutputChannel(replyChannel());
    return jobLaunchingGateway;
}

以下示例展示瞭如何在 XML 中提供 Poller

XML 配置
<batch-int:job-launching-gateway request-channel="queueChannel"
    reply-channel="replyChannel" job-launcher="jobLauncher">
  <int:poller fixed-rate="1000">
</batch-int:job-launching-gateway>

提供資訊性訊息的反饋

由於 Spring Batch job 可能執行很長時間,提供進度資訊通常至關重要。例如,利益相關者可能希望在批處理 job 的部分或全部失敗時收到通知。Spring Batch 透過以下方式支援收集此資訊:

  • 主動輪詢

  • 事件驅動的監聽器

當非同步啟動 Spring Batch job 時(例如,透過使用 Job Launching Gateway),會返回一個 JobExecution 例項。因此,您可以使用 JobExecution.getJobId() 透過使用 JobExplorerJobRepository 中檢索更新的 JobExecution 例項來持續輪詢狀態更新。但是,這被認為是次優的,推薦使用事件驅動的方法。

因此,Spring Batch 提供了監聽器,包括三種最常用的監聽器:

  • StepListener

  • ChunkListener

  • JobExecutionListener

在下圖所示的示例中,Spring Batch job 已配置了 StepExecutionListener。因此,Spring Integration 接收並處理任何 Step 執行前或執行後事件。例如,您可以使用 Router 檢查收到的 StepExecution。根據檢查結果,可以發生各種事情(例如,將訊息路由到郵件出站通道介面卡),以便可以根據某些條件傳送電子郵件通知。

Handling Informational Messages
圖 1. 處理資訊性訊息

以下兩部分示例展示瞭如何配置監聽器以便將訊息傳送到用於 StepExecution 事件的 Gateway 並將其輸出記錄到 logging-channel-adapter

首先,建立通知整合 bean。

  • Java

  • XML

以下示例展示瞭如何在 Java 中建立通知整合 bean

Java 配置
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
    LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
    adapter.setLoggerName("TEST_LOGGER");
    adapter.setLogExpressionString("headers.id + ': ' + payload");
    return adapter;
}

@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}
您需要在配置中新增 @IntegrationComponentScan 註解。

以下示例展示瞭如何在 XML 中建立通知整合 bean

XML 配置
<int:channel id="stepExecutionsChannel"/>

<int:gateway id="notificationExecutionsListener"
    service-interface="org.springframework.batch.core.StepExecutionListener"
    default-request-channel="stepExecutionsChannel"/>

<int:logging-channel-adapter channel="stepExecutionsChannel"/>

其次,修改您的 job 以新增 Step 級別的監聽器。

  • Java

  • XML

以下示例展示瞭如何在 Java 中新增 Step 級別的監聽器

Java 配置
public Job importPaymentsJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    return new JobBuilder("importPayments", jobRepository)
        .start(new StepBuilder("step1", jobRepository)
                .chunk(200, transactionManager)
                .listener(notificationExecutionsListener())
                // ...
                .build();
              )
        .build();
}

以下示例展示瞭如何在 XML 中新增 Step 級別的監聽器

XML 配置
<job id="importPayments">
    <step id="step1">
        <tasklet ../>
            <chunk ../>
            <listeners>
                <listener ref="notificationExecutionsListener"/>
            </listeners>
        </tasklet>
        ...
    </step>
</job>

非同步處理器

非同步處理器可幫助您擴充套件 Item 的處理能力。在非同步處理器用例中,一個 AsyncItemProcessor 充當排程器,在新執行緒上執行 Item 的 ItemProcessor 邏輯。Item 完成後,將 Future 傳遞給 AsyncItemWriter 進行寫入。

因此,您可以透過使用非同步 Item 處理來提高效能,這基本上使您可以實現分叉-合併(fork-join)場景。一旦所有結果可用,AsyncItemWriter 就會收集結果並寫回 Chunk。

  • Java

  • XML

以下示例展示瞭如何在 Java 中配置 AsyncItemProcessor

Java 配置
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
    AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
    asyncItemProcessor.setTaskExecutor(taskExecutor);
    asyncItemProcessor.setDelegate(itemProcessor);
    return asyncItemProcessor;
}

以下示例展示瞭如何在 XML 中配置 AsyncItemProcessor

XML 配置
<bean id="processor"
    class="org.springframework.batch.integration.async.AsyncItemProcessor">
  <property name="delegate">
    <bean class="your.ItemProcessor"/>
  </property>
  <property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
  </property>
</bean>

delegate 屬性引用您的 ItemProcessor bean,而 taskExecutor 屬性引用您選擇的 TaskExecutor

  • Java

  • XML

以下示例展示瞭如何在 Java 中配置 AsyncItemWriter

Java 配置
@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
    AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
    asyncItemWriter.setDelegate(itemWriter);
    return asyncItemWriter;
}

以下示例展示瞭如何在 XML 中配置 AsyncItemWriter

XML 配置
<bean id="itemWriter"
    class="org.springframework.batch.integration.async.AsyncItemWriter">
  <property name="delegate">
    <bean id="itemWriter" class="your.ItemWriter"/>
  </property>
</bean>

同樣,delegate 屬性實際上是您的 ItemWriter bean 的引用。

外部化批處理執行

到目前為止討論的整合方法提出了 Spring Integration 像外殼一樣包裹 Spring Batch 的用例。然而,Spring Batch 也可以在內部使用 Spring Integration。透過這種方法,Spring Batch 使用者可以將 Item 甚至 Chunk 的處理委託給外部程序。這使您可以分載複雜的處理。Spring Batch Integration 為以下方面提供了專門的支援:

  • 遠端 Chunking

  • 遠端 Partitioning

遠端 Chunking

下圖展示了將 Spring Batch 與 Spring Integration 一起使用時,遠端 Chunking 工作的一種方式

Remote Chunking
圖 2. 遠端 Chunking

更進一步,您還可以透過使用 ChunkMessageChannelItemWriter(由 Spring Batch Integration 提供)來外部化 Chunk 處理,它傳送 Item 並收集結果。一旦傳送,Spring Batch 會繼續讀取和分組 Item 的過程,而無需等待結果。相反,ChunkMessageChannelItemWriter 的責任是收集結果並將其整合回 Spring Batch 程序中。

使用 Spring Integration,您可以完全控制程序的併發性(例如,透過使用 QueueChannel 而不是 DirectChannel)。此外,透過依賴 Spring Integration 豐富的通道介面卡集合(如 JMS 和 AMQP),您可以將批處理 Job 的 Chunk 分發到外部系統進行處理。

  • Java

  • XML

包含需要遠端 Chunking 的 Step 的 Job 在 Java 中的配置可能與以下類似

Java 配置
public Job chunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
     return new JobBuilder("personJob", jobRepository)
             .start(new StepBuilder("step1", jobRepository)
                     .<Person, Person>chunk(200, transactionManager)
                     .reader(itemReader())
                     .writer(itemWriter())
                     .build())
             .build();
 }

包含需要遠端 Chunking 的 Step 的 Job 在 XML 中的配置可能與以下類似

XML 配置
<job id="personJob">
  <step id="step1">
    <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
    </tasklet>
    ...
  </step>
</job>

ItemReader 引用指向您希望在管理器上用於讀取資料的 bean。ItemWriter 引用指向一個特殊的 ItemWriter(稱為 ChunkMessageChannelItemWriter),如前所述。處理器(如果有)被排除在管理器配置之外,因為它在 worker 上配置。在實現您的用例時,您應檢查任何其他元件屬性,例如節流限制等。

  • Java

  • XML

以下 Java 配置提供了基本的管理器設定

Java 配置
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://:61616");
    return factory;
}

/*
 * Configure outbound flow (requests going to workers)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(requests())
            .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
            .get();
}

/*
 * Configure inbound flow (replies coming from workers)
 */
@Bean
public QueueChannel replies() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
            .channel(replies())
            .get();
}

/*
 * Configure the ChunkMessageChannelItemWriter
 */
@Bean
public ItemWriter<Integer> itemWriter() {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requests());
    messagingTemplate.setReceiveTimeout(2000);
    ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
            = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
    chunkMessageChannelItemWriter.setReplyChannel(replies());
    return chunkMessageChannelItemWriter;
}

以下 XML 配置提供了基本的管理器設定

XML 配置
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://:61616"/>
</bean>

<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>

<bean id="messagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate">
  <property name="defaultChannel" ref="requests"/>
  <property name="receiveTimeout" value="2000"/>
</bean>

<bean id="itemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step">
  <property name="messagingOperations" ref="messagingTemplate"/>
  <property name="replyChannel" ref="replies"/>
</bean>

<int:channel id="replies">
  <int:queue/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsReplies"
    destination-name="replies"
    channel="replies"/>

前面的配置為我們提供了一些 bean。我們使用 ActiveMQ 以及 Spring Integration 提供的入站和出站 JMS 介面卡來配置我們的訊息中介軟體。如所示,我們的 itemWriter bean(由我們的 job step 引用)使用 ChunkMessageChannelItemWriter 透過配置的中介軟體寫入 Chunk。

現在我們可以繼續進行 worker 配置,如下例所示

  • Java

  • XML

以下示例展示了 Java 中的 worker 配置

Java 配置
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://:61616");
    return factory;
}

/*
 * Configure inbound flow (requests coming from the manager)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
            .channel(requests())
            .get();
}

/*
 * Configure outbound flow (replies going to the manager)
 */
@Bean
public DirectChannel replies() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlow
            .from(replies())
            .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
            .get();
}

/*
 * Configure the ChunkProcessorChunkHandler
 */
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
    ChunkProcessor<Integer> chunkProcessor
            = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
    ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
            = new ChunkProcessorChunkHandler<>();
    chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
    return chunkProcessorChunkHandler;
}

以下示例展示了 XML 中的 worker 配置

XML 配置
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://:61616"/>
</bean>

<int:channel id="requests"/>
<int:channel id="replies"/>

<int-jms:message-driven-channel-adapter id="incomingRequests"
    destination-name="requests"
    channel="requests"/>

<int-jms:outbound-channel-adapter id="outgoingReplies"
    destination-name="replies"
    channel="replies">
</int-jms:outbound-channel-adapter>

<int:service-activator id="serviceActivator"
    input-channel="requests"
    output-channel="replies"
    ref="chunkProcessorChunkHandler"
    method="handleChunk"/>

<bean id="chunkProcessorChunkHandler"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
  <property name="chunkProcessor">
    <bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
      <property name="itemWriter">
        <bean class="io.spring.sbi.PersonItemWriter"/>
      </property>
      <property name="itemProcessor">
        <bean class="io.spring.sbi.PersonItemProcessor"/>
      </property>
    </bean>
  </property>
</bean>

這些配置項中的大多數應該看起來與管理器配置相似。Worker 不需要訪問 Spring Batch JobRepository 或實際的 job 配置檔案。主要的關注 bean 是 chunkProcessorChunkHandlerChunkProcessorChunkHandlerchunkProcessor 屬性接受一個配置好的 SimpleChunkProcessor,您可以在其中提供對您的 ItemWriter(以及可選的 ItemProcessor)的引用,當 worker 從管理器接收到 Chunk 時,這些元件將在 worker 上執行。

有關更多資訊,請參閱“可伸縮性”章節中關於遠端 Chunking 的部分。

從版本 4.1 開始,Spring Batch Integration 引入了 @EnableBatchIntegration 註解,可用於簡化遠端 Chunking 設定。此註解提供了兩個可以在您的應用程式上下文中自動裝配的 bean:

  • RemoteChunkingManagerStepBuilderFactory: 配置管理器 Step

  • RemoteChunkingWorkerBuilder: 配置遠端 worker 整合流程

這些 API 負責配置許多元件,如下圖所示

Remote Chunking Configuration
圖 3. 遠端 Chunking 配置

在管理器端,RemoteChunkingManagerStepBuilderFactory 允許您透過宣告以下內容來配置管理器 Step:

  • 用於讀取 Item 並將其傳送給 worker 的 Item Reader

  • 用於向 worker 傳送請求的出站通道(“Outgoing requests”)

  • 用於接收 worker 回覆的入站通道(“Incoming replies”)

您無需顯式配置 ChunkMessageChannelItemWriterMessagingTemplate。(如果您有理由這樣做,仍然可以顯式配置它們)。

在 worker 端,RemoteChunkingWorkerBuilder 允許您配置 worker 以:

  • 監聽管理器在入站通道(“Incoming requests”)上傳送的請求

  • 使用配置好的 ItemProcessorItemWriter 對每個請求呼叫 ChunkProcessorChunkHandlerhandleChunk 方法

  • 在出站通道(“Outgoing replies”)上向管理器傳送回覆

您無需顯式配置 SimpleChunkProcessorChunkProcessorChunkHandler。(如果您有理由這樣做,仍然可以顯式配置它們)。

以下示例展示瞭如何使用這些 API

@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public TaskletStep managerStep() {
            return this.managerStepBuilderFactory.get("managerStep")
                       .chunk(100)
                       .reader(itemReader())
                       .outputChannel(requests()) // requests sent to workers
                       .inputChannel(replies())   // replies received from workers
                       .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemoteChunkingWorkerBuilder workerBuilder;

        @Bean
        public IntegrationFlow workerFlow() {
            return this.workerBuilder
                       .itemProcessor(itemProcessor())
                       .itemWriter(itemWriter())
                       .inputChannel(requests()) // requests received from the manager
                       .outputChannel(replies()) // replies sent to the manager
                       .build();
        }

        // Middleware beans setup omitted

    }

}

您可以在此處找到遠端 Chunking Job 的完整示例。

遠端 Partitioning

下圖展示了典型的遠端 Partitioning 情況

Remote Partitioning
圖 4. 遠端 Partitioning

另一方面,遠端 Partitioning 在不是 Item 處理而是相關的 I/O 導致瓶頸時很有用。使用遠端 Partitioning,您可以將工作傳送給執行完整 Spring Batch Step 的 worker。因此,每個 worker 都有自己的 ItemReaderItemProcessorItemWriter。為此,Spring Batch Integration 提供了 MessageChannelPartitionHandler

PartitionHandler 介面的此實現使用 MessageChannel 例項向遠端 worker 傳送指令並接收其響應。這提供了一個很好的抽象,使其與用於與遠端 worker 通訊的傳輸方式(如 JMS 和 AMQP)分離。

“可伸縮性”章節中討論遠端 Partitioning 的部分概述了配置遠端 Partitioning 所需的概念和元件,並展示了使用預設 TaskExecutorPartitionHandler 在單獨的本地執行執行緒中進行 Partitioning 的示例。對於到多個 JVM 的遠端 Partitioning,還需要兩個額外的元件:

  • 一個遠端呼叫框架或網格環境

  • 支援所需的遠端呼叫框架或網格環境的 PartitionHandler 實現

與遠端 Chunking 類似,您可以使用 JMS 作為“遠端呼叫框架”。在這種情況下,如前所述,使用 MessageChannelPartitionHandler 例項作為 PartitionHandler 實現。

  • Java

  • XML

以下示例假設存在一個已 Partitioning 的 Job,並重點介紹 Java 中的 MessageChannelPartitionHandler 和 JMS 配置

Java 配置
/*
 * Configuration of the manager side
 */
@Bean
public PartitionHandler partitionHandler() {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
    partitionHandler.setStepName("step1");
    partitionHandler.setGridSize(3);
    partitionHandler.setReplyChannel(outboundReplies());
    MessagingTemplate template = new MessagingTemplate();
    template.setDefaultChannel(outboundRequests());
    template.setReceiveTimeout(100000);
    partitionHandler.setMessagingOperations(template);
    return partitionHandler;
}

@Bean
public QueueChannel outboundReplies() {
    return new QueueChannel();
}

@Bean
public DirectChannel outboundRequests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsRequests() {
    return IntegrationFlow.from("outboundRequests")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("requestsQueue"))
            .get();
}

@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(partitionHandler());
    aggregatorFactoryBean.setOutputChannel(outboundReplies());
    // configure other propeties of the aggregatorFactoryBean
    return aggregatorFactoryBean;
}

@Bean
public DirectChannel inboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsStaging() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("stagingQueue"))
            .channel(inboundStaging())
            .get();
}

/*
 * Configuration of the worker side
 */
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobExplorer(jobExplorer);
    stepExecutionRequestHandler.setStepLocator(stepLocator());
    return stepExecutionRequestHandler;
}

@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
    return stepExecutionRequestHandler();
}

@Bean
public DirectChannel inboundRequests() {
    return new DirectChannel();
}

public IntegrationFlow inboundJmsRequests() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("requestsQueue"))
            .channel(inboundRequests())
            .get();
}

@Bean
public DirectChannel outboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsStaging() {
    return IntegrationFlow.from("outboundStaging")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("stagingQueue"))
            .get();
}

以下示例假設存在一個已 Partitioning 的 Job,並重點介紹 XML 中的 MessageChannelPartitionHandler 和 JMS 配置

XML 配置
<bean id="partitionHandler"
   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="step1"/>
  <property name="gridSize" value="3"/>
  <property name="replyChannel" ref="outbound-replies"/>
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="outbound-requests"/>
      <property name="receiveTimeout" value="100000"/>
    </bean>
  </property>
</bean>

<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
    channel="outbound-requests"/>

<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
    channel="inbound-requests"/>

<bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer"/>
  <property name="stepLocator" ref="stepLocator"/>
</bean>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
    output-channel="outbound-staging"/>

<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
    channel="outbound-staging"/>

<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
    channel="inbound-staging"/>

<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
    output-channel="outbound-replies"/>

<int:channel id="outbound-replies">
  <int:queue/>
</int:channel>

<bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />

您還必須確保 Partition 的 handler 屬性對映到 partitionHandler bean。

  • Java

  • XML

以下示例展示瞭如何將 Java 中的 Partition handler 屬性對映到 partitionHandler

Java 配置
	public Job personJob(JobRepository jobRepository) {
		return new JobBuilder("personJob", jobRepository)
				.start(new StepBuilder("step1.manager", jobRepository)
						.partitioner("step1.worker", partitioner())
						.partitionHandler(partitionHandler())
						.build())
				.build();
	}

以下示例展示瞭如何將 XML 中的 Partition handler 屬性對映到 partitionHandler

XML 配置
<job id="personJob">
  <step id="step1.manager">
    <partition partitioner="partitioner" handler="partitionHandler"/>
    ...
  </step>
</job>

您可以在此處找到遠端 Partitioning Job 的完整示例。

您可以使用 @EnableBatchIntegration 註解來簡化遠端 Partitioning 設定。此註解提供了兩個對遠端 Partitioning 有用的 bean:

  • RemotePartitioningManagerStepBuilderFactory: 配置管理器 Step

  • RemotePartitioningWorkerStepBuilderFactory: 配置 worker Step

這些 API 負責配置許多元件,如下圖所示

Remote Partitioning Configuration (with job repository polling)
圖 5. 遠端 Partitioning 配置(帶 Job 倉庫輪詢)
Remote Partitioning Configuration (with replies aggregation)
圖 6. 遠端 Partitioning 配置(帶回復聚合)

在管理器端,RemotePartitioningManagerStepBuilderFactory 允許您透過宣告以下內容來配置管理器 Step:

  • 用於 Partition 資料的 Partitioner

  • 用於向 worker 傳送請求的出站通道(“Outgoing requests”)

  • 用於接收 worker 回覆的入站通道(“Incoming replies”)(配置回覆聚合時)

  • 輪詢間隔和超時引數(配置 Job 倉庫輪詢時)

您無需顯式配置 MessageChannelPartitionHandlerMessagingTemplate。(如果您有理由這樣做,仍然可以顯式配置它們)。

在 worker 端,RemotePartitioningWorkerStepBuilderFactory 允許您配置 worker 以:

  • 監聽管理器在入站通道(“Incoming requests”)上傳送的請求

  • 對每個請求呼叫 StepExecutionRequestHandlerhandle 方法

  • 在出站通道(“Outgoing replies”)上向管理器傳送回覆

您無需顯式配置 StepExecutionRequestHandler。(如果您有理由這樣做,仍然可以顯式配置它)。

以下示例展示瞭如何使用這些 API

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {

    @Configuration
    public static class ManagerConfiguration {

        @Autowired
        private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;

        @Bean
        public Step managerStep() {
                 return this.managerStepBuilderFactory
                    .get("managerStep")
                    .partitioner("workerStep", partitioner())
                    .gridSize(10)
                    .outputChannel(outgoingRequestsToWorkers())
                    .inputChannel(incomingRepliesFromWorkers())
                    .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

        @Bean
        public Step workerStep() {
                 return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(incomingRequestsFromManager())
                    .outputChannel(outgoingRepliesToManager())
                    .chunk(100)
                    .reader(itemReader())
                    .processor(itemProcessor())
                    .writer(itemWriter())
                    .build();
        }

        // Middleware beans setup omitted

    }

}