Apache Camel 支援

Spring Integration 提供了 API 和配置,用於與在同一應用上下文中宣告的 Apache Camel 端點進行通訊。

你需要將此依賴項新增到你的專案中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-camel</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-camel:6.4.4"

Spring Integration 和 Apache Camel 都實現了企業整合模式,並提供了方便的方式來組合它們,但這兩個專案在 API 和抽象實現上採用了不同的方法。Spring Integration 完全依賴於 Spring Core 的依賴注入容器。它使用許多其他 Spring 專案(如 Spring Data、Spring AMQP、Spring for Apache Kafka 等)來實現其通道介面卡。它還使用 MessageChannel 抽象作為一級公民,開發者在組合其整合流時需要了解這一點。另一方面,Apache Camel 不提供訊息通道的一級公民抽象,並建議透過隱藏在 API 背後的內部交換來組合其路由。此外,它還需要一些額外的依賴項和配置才能在 Spring 應用中使用。

即使最終的企業整合解決方案的各個部分如何實現並不重要,開發者體驗和高生產力也是需要考慮的因素。因此,開發者可能會出於許多原因選擇一個框架而不是另一個,或者在某些目標系統支援存在空白時兩者都使用。Spring Integration 和 Apache Camel 應用可以透過許多外部協議互相互動,它們都為這些協議實現了通道介面卡。例如,一個 Spring Integration 流可以將記錄釋出到 Apache Kafka 主題,該主題由消費者端的 Apache Camel 端點消費。或者,一個 Apache Camel 路由可以將資料寫入 SFTP 檔案目錄,該目錄由 Spring Integration 的 SFTP 入站通道介面卡輪詢。或者,在同一個 Spring 應用上下文中,它們可以透過 ApplicationEvent 抽象進行通訊。

為了簡化開發過程並避免不必要的網路跳躍,Apache Camel 提供了一個模組,用於透過訊息通道與 Spring Integration 進行通訊。所需的只是應用上下文中的 MessageChannel 引用,用於傳送或消費訊息。當 Apache Camel 路由是訊息流的發起者,而 Spring Integration 僅作為解決方案的一部分扮演支援角色時,這種方式效果很好。

為了提供類似的開發者體驗,Spring Integration 現在提供了一個通道介面卡,用於呼叫 Apache Camel 端點,並且可選地等待回覆。沒有入站通道介面卡,因為從 Spring Integration API 和抽象的角度來看,訂閱 MessageChannel 來消費 Apache Camel 訊息就足夠了。

Apache Camel 出站通道介面卡

CamelMessageHandlerAbstractReplyProducingMessageHandler 的實現,可以工作在單向(預設)和請求-回覆兩種模式下。它使用 org.apache.camel.ProducerTemplateorg.apache.camel.Endpoint 傳送(或傳送並接收)。互動模式可以透過 ExchangePattern 選項控制(該選項可以在執行時透過 SpEL 表示式對請求訊息進行評估)。目標 Apache Camel 端點可以明確配置,或配置為在執行時評估的 SpEL 表示式。否則,它會回退到 ProducerTemplate 上提供的 defaultEndpoint。除了指定端點,還可以提供內聯的、明確的 LambdaRouteBuilder,例如,用於呼叫 Spring Integration 中沒有通道介面卡支援的 Apache Camel 元件。

此外,可以提供一個 HeaderMapper<org.apache.camel.Message>CamelHeaderMapper 是預設實現),用於確定在 Spring Integration 和 Apache Camel 訊息之間對映哪些訊息頭。預設情況下,所有訊息頭都會被對映。

CamelMessageHandler 支援 async 模式,它呼叫 ProducerTemplate.asyncSend() 並生成一個 CompletableFuture 用於回覆處理(如果存在回覆)。

exchangeProperties 可以透過 SpEL 表示式進行自定義,該表示式必須評估為一個 Map

如果未提供 ProducerTemplate,則會透過從應用上下文中解析的 CamelContext bean 建立一個。

@Bean
@ServiceActivator(inputChannel = "sendToCamel")
CamelMessageHandler camelService(ProducerTemplate producerTemplate) {
    CamelHeaderMapper headerMapper = new CamelHeaderMapper();
    headerMapper.setOutboundHeaderNames("");
    headerMapper.setInboundHeaderNames("testHeader");

    CamelMessageHandler camelMessageHandler = new CamelMessageHandler(producerTemplate);
    camelMessageHandler.setEndpointUri("direct:simple");
    camelMessageHandler.setExchangePatternExpression(spelExpressionParser.parseExpression("headers.exchangePattern"));
    camelMessageHandler.setHeaderMapper(headerMapper);
    return camelMessageHandler;
}

對於 Java DSL 流定義,此通道介面卡可以透過 Camel 工廠提供的幾種變體進行配置

@Bean
IntegrationFlow camelFlow() {
    return f -> f
            .handle(Camel.gateway().endpointUri("direct:simple"))
            .handle(Camel.route(this::camelRoute))
            .handle(Camel.handler().endpointUri("log:com.mycompany.order?level=WARN"));
}

private void camelRoute(RouteBuilder routeBuilder) {
    routeBuilder.from("direct:inbound").transform(routeBuilder.simple("${body.toUpperCase()}"));
}