reactive() 端點

從 5.5 版本開始,ConsumerEndpointSpec 提供了一個 reactive() 配置屬性,帶有一個可選的 customizer Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>。此選項將目標端點配置為 ReactiveStreamsConsumer 例項,這與輸入通道型別無關,輸入通道透過 IntegrationReactiveUtils.messageChannelToFlux() 轉換為 Flux。提供的函式從 Flux.transform() 運算子中使用,用於自定義來自輸入通道的響應式流源(例如 publishOn()log()doOnNext() 等)。

以下示例演示瞭如何獨立於最終訂閱者和生產者更改來自輸入通道的釋出執行緒到該 DirectChannel

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlow
            .from("inputChannel")
            .transformWith(t -> t
                              .<String, Integer>transformer(Integer::parseInt)
                              .reactive(flux -> flux.publishOn(Schedulers.parallel()))
            )
            .get();
}

更多資訊請參閱Reactive Streams 支援