reactive() 端點
從 5.5 版本開始,ConsumerEndpointSpec 提供了一個 reactive() 配置屬性,帶有一個可選的定製器 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>。此選項將目標端點配置為 ReactiveStreamsConsumer 例項,而不管輸入通道型別如何,該通道型別透過 IntegrationReactiveUtils.messageChannelToFlux() 轉換為 Flux。所提供的函式用於 Flux.transform() 運算子,以定製(publishOn()、log()、doOnNext() 等)來自輸入通道的反應式流源。
以下示例演示瞭如何獨立於最終訂閱者和生產者更改輸入通道的釋出執行緒到該 DirectChannel
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.transformWith(t -> t
.<String, Integer>transformer(Integer::parseInt)
.reactive(flux -> flux.publishOn(Schedulers.parallel()))
)
.get();
}
有關更多資訊,請參閱 反應式流支援。