Spring Integration Interaction

Spring Integration Framework 擴充套件了 Spring 程式設計模型以支援眾所周知的企業整合模式。它支援基於 Spring 的應用程式中的輕量級訊息傳遞,並透過宣告式介面卡支援與外部系統的整合。它還提供了一個高階 DSL 來將各種操作(端點)組合成一個邏輯整合流。憑藉這種 DSL 配置的 lambda 風格,Spring Integration 已經具有良好的java.util.function介面採用率。 @MessagingGateway 代理介面也可以作為FunctionConsumer,根據 Spring Cloud Function 環境,可以將其註冊到函式目錄中。有關其對函式支援的更多資訊,請參見 Spring Integration 參考手冊

另一方面,從版本4.0.3開始,Spring Cloud Function 引入了一個spring-cloud-function-integration模組,該模組為從 Spring Integration DSL 角度與FunctionCatalog的互動提供了更深入、更特定於雲且基於自動配置的 API。 FunctionFlowBuilder透過FunctionCatalog自動配置和自動裝配,並表示目標IntegrationFlow例項的特定於函式的 DSL 的入口點。除了標準的IntegrationFlow.from()工廠(為了方便起見),FunctionFlowBuilder公開了一個fromSupplier(String supplierDefinition)工廠,用於在提供的FunctionCatalog中查詢目標Supplier。然後,此FunctionFlowBuilder會引導到FunctionFlowDefinition。此FunctionFlowDefinitionIntegrationFlowExtension的實現,並公開apply(String functionDefinition)accept(String consumerDefinition)運算子,分別從FunctionCatalog中查詢FunctionConsumer。有關更多資訊,請參見其 Javadoc。

以下示例演示了與IntegrationFlow API 的其餘功能一起使用的FunctionFlowBuilder

@Configuration
public class IntegrationConfiguration {

    @Bean
    Supplier<byte[]> simpleByteArraySupplier() {
        return "simple test data"::getBytes;
    }

    @Bean
    Function<String, String> upperCaseFunction() {
        return String::toUpperCase;
    }

    @Bean
    BlockingQueue<String> results() {
        return new LinkedBlockingQueue<>();
    }

    @Bean
    Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
        return results::add;
    }

    @Bean
    QueueChannel wireTapChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
        return functionFlowBuilder
                .fromSupplier("simpleByteArraySupplier")
                .wireTap("wireTapChannel")
                .apply("upperCaseFunction")
                .log(LoggingHandler.Level.WARN)
                .accept("simpleStringConsumer");
    }

}

由於FunctionCatalog.lookup()功能不僅限於簡單的函式名稱,因此函式組合功能也可以在提到的apply()accept()運算子中使用

@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .from("functionCompositionInput")
            .accept("upperCaseFunction|simpleStringConsumer");
}

當我們將預定義函式的自動配置依賴項新增到 Spring Cloud 應用程式中時,此 API 變得更加相關。例如Stream Applications專案,除了應用程式映像外,還提供了具有各種整合用例的函式的工件,例如debezium-supplierelasticsearch-consumeraggregator-function等。

以下配置分別基於http-supplierspel-functionfile-consumer

@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
            .<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
            .channel(c -> c.flux())
            .apply("spelFunction")
            .<String, String>transform(String::toUpperCase)
            .accept("fileConsumer");
}

我們需要做的另一件事是將它們的配置新增到application.properties中(如果需要)

http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt