Spring Integration 互動

Spring Integration 框架擴充套件了 Spring 程式設計模型,以支援眾所周知的企業整合模式。它支援基於 Spring 的應用程式中的輕量級訊息傳遞,並透過宣告式介面卡支援與外部系統的整合。它還提供了一個高階 DSL,可以將各種操作(端點)組合成一個邏輯整合流。透過這種 DSL 配置的 lambda 風格,Spring Integration 已經很好地採用了 java.util.function 介面。@MessagingGateway 代理介面也可以作為 FunctionConsumer,根據 Spring Cloud Function 環境,可以將其註冊到函式目錄中。有關 Spring Integration 對函式的支援的更多資訊,請參閱 參考手冊

另一方面,從版本 4.0.3 開始,Spring Cloud Function 引入了一個 spring-cloud-function-integration 模組,該模組提供了一個更深入、更特定於雲且基於自動配置的 API,用於從 Spring Integration DSL 角度與 FunctionCatalog 互動。FunctionFlowBuilder 透過 FunctionCatalog 自動配置和自動裝配,並表示目標 IntegrationFlow 例項的函式特定 DSL 的入口點。除了標準 IntegrationFlow.from() 工廠(為了方便)之外,FunctionFlowBuilder 還公開了一個 fromSupplier(String supplierDefinition) 工廠,用於在提供的 FunctionCatalog 中查詢目標 Supplier。然後,此 FunctionFlowBuilder 導向 FunctionFlowDefinition。此 FunctionFlowDefinitionIntegrationFlowExtension 的實現,並公開 apply(String functionDefinition)accept(String consumerDefinition) 運算子,用於分別從 FunctionCatalog 中查詢 FunctionConsumer。有關更多資訊,請參閱它們的 Javadoc。

以下示例演示了 FunctionFlowBuilder 的實際應用以及 IntegrationFlow API 其餘部分的強大功能。

@Configuration
public class IntegrationConfiguration {

    @Bean
    Supplier<byte[]> simpleByteArraySupplier() {
        return "simple test data"::getBytes;
    }

    @Bean
    Function<String, String> upperCaseFunction() {
        return String::toUpperCase;
    }

    @Bean
    BlockingQueue<String> results() {
        return new LinkedBlockingQueue<>();
    }

    @Bean
    Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
        return results::add;
    }

    @Bean
    QueueChannel wireTapChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
        return functionFlowBuilder
                .fromSupplier("simpleByteArraySupplier")
                .wireTap("wireTapChannel")
                .apply("upperCaseFunction")
                .log(LoggingHandler.Level.WARN)
                .accept("simpleStringConsumer");
    }

}

由於 FunctionCatalog.lookup() 功能不僅限於簡單的函式名稱,因此函式組合功能也可以用於上述 apply()accept() 運算子。

@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .from("functionCompositionInput")
            .accept("upperCaseFunction|simpleStringConsumer");
}

當我們向 Spring Cloud 應用程式中新增預定義函式的自動配置依賴項時,此 API 變得更加相關。例如,Stream Applications 專案除了應用程式映像之外,還提供了用於各種整合用例的函式工件,例如 debezium-supplierelasticsearch-consumeraggregator-function 等。

以下配置分別基於 http-supplierspel-functionfile-consumer

@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
            .<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
            .channel(c -> c.flux())
            .apply("spelFunction")
            .<String, String>transform(String::toUpperCase)
            .accept("fileConsumer");
}

我們還需要做的就是將它們的配置新增到 application.properties 中(如果需要)。

http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt
© . This site is unofficial and not affiliated with VMware.