入站通道介面卡

通常,訊息流從入站通道介面卡(例如 <int-jdbc:inbound-channel-adapter>)開始。介面卡配置了 <poller>,並要求 MessageSource<?> 定期生成訊息。Java DSL 也允許從 MessageSource<?> 開始一個 IntegrationFlow。為此,IntegrationFlow 流式 API 提供了一個過載的 IntegrationFlow.from(MessageSource<?> messageSource) 方法。您可以將 MessageSource<?> 配置為一個 bean,並將其作為該方法的引數提供。IntegrationFlow.from() 的第二個引數是一個 Consumer<SourcePollingChannelAdapterSpec> lambda,它允許您為 SourcePollingChannelAdapter 提供選項(例如 PollerMetadataSmartLifecycle)。以下示例展示瞭如何使用流式 API 和 lambda 來建立 IntegrationFlow

@Bean
public MessageSource<Object> jdbcMessageSource() {
    return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}

@Bean
public IntegrationFlow pollingFlow() {
    return IntegrationFlow.from(jdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
            .transform(Transformers.toJson())
            .channel("furtherProcessChannel")
            .get();
}

對於那些沒有直接構建 Message 物件要求的場景,您可以使用基於 java.util.function.SupplierIntegrationFlow.fromSupplier() 變體。Supplier.get() 的結果會被自動封裝到 Message 中(如果它本身不是 Message)。