Spring Integration Interaction
Spring Integration Framework 擴充套件了 Spring 程式設計模型以支援眾所周知的企業整合模式。它支援基於 Spring 的應用程式中的輕量級訊息傳遞,並透過宣告式介面卡支援與外部系統的整合。它還提供了一個高階 DSL 來將各種操作(端點)組合成一個邏輯整合流。憑藉這種 DSL 配置的 lambda 風格,Spring Integration 已經具有良好的java.util.function
介面採用率。 @MessagingGateway
代理介面也可以作為Function
或Consumer
,根據 Spring Cloud Function 環境,可以將其註冊到函式目錄中。有關其對函式支援的更多資訊,請參見 Spring Integration 參考手冊。
另一方面,從版本4.0.3
開始,Spring Cloud Function 引入了一個spring-cloud-function-integration
模組,該模組為從 Spring Integration DSL 角度與FunctionCatalog
的互動提供了更深入、更特定於雲且基於自動配置的 API。 FunctionFlowBuilder
透過FunctionCatalog
自動配置和自動裝配,並表示目標IntegrationFlow
例項的特定於函式的 DSL 的入口點。除了標準的IntegrationFlow.from()
工廠(為了方便起見),FunctionFlowBuilder
公開了一個fromSupplier(String supplierDefinition)
工廠,用於在提供的FunctionCatalog
中查詢目標Supplier
。然後,此FunctionFlowBuilder
會引導到FunctionFlowDefinition
。此FunctionFlowDefinition
是IntegrationFlowExtension
的實現,並公開apply(String functionDefinition)
和accept(String consumerDefinition)
運算子,分別從FunctionCatalog
中查詢Function
或Consumer
。有關更多資訊,請參見其 Javadoc。
以下示例演示了與IntegrationFlow
API 的其餘功能一起使用的FunctionFlowBuilder
@Configuration
public class IntegrationConfiguration {
@Bean
Supplier<byte[]> simpleByteArraySupplier() {
return "simple test data"::getBytes;
}
@Bean
Function<String, String> upperCaseFunction() {
return String::toUpperCase;
}
@Bean
BlockingQueue<String> results() {
return new LinkedBlockingQueue<>();
}
@Bean
Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
return results::add;
}
@Bean
QueueChannel wireTapChannel() {
return new QueueChannel();
}
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("simpleByteArraySupplier")
.wireTap("wireTapChannel")
.apply("upperCaseFunction")
.log(LoggingHandler.Level.WARN)
.accept("simpleStringConsumer");
}
}
由於FunctionCatalog.lookup()
功能不僅限於簡單的函式名稱,因此函式組合功能也可以在提到的apply()
和accept()
運算子中使用
@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.from("functionCompositionInput")
.accept("upperCaseFunction|simpleStringConsumer");
}
當我們將預定義函式的自動配置依賴項新增到 Spring Cloud 應用程式中時,此 API 變得更加相關。例如Stream Applications專案,除了應用程式映像外,還提供了具有各種整合用例的函式的工件,例如debezium-supplier
、elasticsearch-consumer
、aggregator-function
等。
以下配置分別基於http-supplier
、spel-function
和file-consumer
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
.<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
.channel(c -> c.flux())
.apply("spelFunction")
.<String, String>transform(String::toUpperCase)
.accept("fileConsumer");
}
我們需要做的另一件事是將它們的配置新增到application.properties
中(如果需要)
http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt