Spring Integration 互動
Spring Integration 框架擴充套件了 Spring 程式設計模型,以支援眾所周知的企業整合模式。它支援基於 Spring 的應用程式中的輕量級訊息傳遞,並透過宣告式介面卡支援與外部系統的整合。它還提供了一個高階 DSL,可以將各種操作(端點)組合成一個邏輯整合流。透過這種 DSL 配置的 lambda 風格,Spring Integration 已經很好地採用了 java.util.function 介面。@MessagingGateway 代理介面也可以作為 Function 或 Consumer,根據 Spring Cloud Function 環境,可以將其註冊到函式目錄中。有關 Spring Integration 對函式的支援的更多資訊,請參閱 參考手冊。
另一方面,從版本 4.0.3 開始,Spring Cloud Function 引入了一個 spring-cloud-function-integration 模組,該模組提供了一個更深入、更特定於雲且基於自動配置的 API,用於從 Spring Integration DSL 角度與 FunctionCatalog 互動。FunctionFlowBuilder 透過 FunctionCatalog 自動配置和自動裝配,並表示目標 IntegrationFlow 例項的函式特定 DSL 的入口點。除了標準 IntegrationFlow.from() 工廠(為了方便)之外,FunctionFlowBuilder 還公開了一個 fromSupplier(String supplierDefinition) 工廠,用於在提供的 FunctionCatalog 中查詢目標 Supplier。然後,此 FunctionFlowBuilder 導向 FunctionFlowDefinition。此 FunctionFlowDefinition 是 IntegrationFlowExtension 的實現,並公開 apply(String functionDefinition) 和 accept(String consumerDefinition) 運算子,用於分別從 FunctionCatalog 中查詢 Function 或 Consumer。有關更多資訊,請參閱它們的 Javadoc。
以下示例演示了 FunctionFlowBuilder 的實際應用以及 IntegrationFlow API 其餘部分的強大功能。
@Configuration
public class IntegrationConfiguration {
@Bean
Supplier<byte[]> simpleByteArraySupplier() {
return "simple test data"::getBytes;
}
@Bean
Function<String, String> upperCaseFunction() {
return String::toUpperCase;
}
@Bean
BlockingQueue<String> results() {
return new LinkedBlockingQueue<>();
}
@Bean
Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
return results::add;
}
@Bean
QueueChannel wireTapChannel() {
return new QueueChannel();
}
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("simpleByteArraySupplier")
.wireTap("wireTapChannel")
.apply("upperCaseFunction")
.log(LoggingHandler.Level.WARN)
.accept("simpleStringConsumer");
}
}
由於 FunctionCatalog.lookup() 功能不僅限於簡單的函式名稱,因此函式組合功能也可以用於上述 apply() 和 accept() 運算子。
@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.from("functionCompositionInput")
.accept("upperCaseFunction|simpleStringConsumer");
}
當我們向 Spring Cloud 應用程式中新增預定義函式的自動配置依賴項時,此 API 變得更加相關。例如,Stream Applications 專案除了應用程式映像之外,還提供了用於各種整合用例的函式工件,例如 debezium-supplier、elasticsearch-consumer、aggregator-function 等。
以下配置分別基於 http-supplier、spel-function 和 file-consumer。
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
.<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
.channel(c -> c.flux())
.apply("spelFunction")
.<String, String>transform(String::toUpperCase)
.accept("fileConsumer");
}
我們還需要做的就是將它們的配置新增到 application.properties 中(如果需要)。
http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt