入站通道介面卡
通常,訊息流從入站通道介面卡(例如 <int-jdbc:inbound-channel-adapter>
)開始。介面卡配置了 <poller>
,並要求 MessageSource<?>
定期生成訊息。Java DSL 也允許從 MessageSource<?>
開始一個 IntegrationFlow
。為此,IntegrationFlow
流式 API 提供了一個過載的 IntegrationFlow.from(MessageSource<?> messageSource)
方法。您可以將 MessageSource<?>
配置為一個 bean,並將其作為該方法的引數提供。IntegrationFlow.from()
的第二個引數是一個 Consumer<SourcePollingChannelAdapterSpec>
lambda,它允許您為 SourcePollingChannelAdapter
提供選項(例如 PollerMetadata
或 SmartLifecycle
)。以下示例展示瞭如何使用流式 API 和 lambda 來建立 IntegrationFlow
@Bean
public MessageSource<Object> jdbcMessageSource() {
return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}
@Bean
public IntegrationFlow pollingFlow() {
return IntegrationFlow.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
.transform(Transformers.toJson())
.channel("furtherProcessChannel")
.get();
}
對於那些沒有直接構建 Message
物件要求的場景,您可以使用基於 java.util.function.Supplier
的 IntegrationFlow.fromSupplier()
變體。Supplier.get()
的結果會被自動封裝到 Message
中(如果它本身不是 Message
)。