JMS 支援

Spring Integration 提供用於接收和傳送 JMS 訊息的通道介面卡。

您需要在專案中包含此依賴項

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jms</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-jms:6.4.4"

必須透過某些 JMS 供應商特定的實現(例如 Apache ActiveMQ)顯式新增 jakarta.jms:jakarta.jms-api

實際上有兩種基於 JMS 的入站通道介面卡。第一種使用 Spring 的 JmsTemplate 基於輪詢週期進行接收。第二種是“訊息驅動”的,它依賴於 Spring 的 MessageListener 容器。出站通道介面卡使用 JmsTemplate 按需轉換併發送 JMS 訊息。

透過使用 JmsTemplateMessageListener 容器,Spring Integration 依賴於 Spring 的 JMS 支援。理解這一點很重要,因為這些介面卡上的大多數屬性都配置底層的 JmsTemplateMessageListener 容器。有關 JmsTemplateMessageListener 容器的更多詳細資訊,請參閱Spring JMS 文件

JMS 通道介面卡用於單向訊息傳遞(僅傳送或僅接收),而 Spring Integration 還提供入站和出站 JMS 閘道器,用於請求和回覆操作。入站閘道器依賴於 Spring 的一種 MessageListener 容器實現來進行訊息驅動的接收。它還能夠將返回值傳送到接收訊息提供的 reply-to 目標。出站閘道器將 JMS 訊息傳送到 request-destination(或 request-destination-namerequest-destination-expression),然後接收回復訊息。您可以顯式配置 reply-destination 引用(或 reply-destination-namereply-destination-expression)。否則,出站閘道器使用 JMS TemporaryQueue

在 Spring Integration 2.2 之前,如果需要,會為每個請求或回覆建立一個(並刪除)TemporaryQueue。從 Spring Integration 2.2 開始,您可以配置出站閘道器使用 MessageListener 容器接收回復,而不是直接使用新的(或快取的)Consumer 為每個請求接收回復。這樣配置後,並且沒有提供顯式的回覆目標,每個閘道器將使用單個 TemporaryQueue,而不是每個請求使用一個。

從版本 6.0 開始,如果 replyPubSubDomain 選項設定為 true,出站閘道器會建立 TemporaryTopic 而不是 TemporaryQueue。一些 JMS 供應商對這些目標的處理方式不同。

入站通道介面卡

入站通道介面卡需要引用單個 JmsTemplate 例項,或者同時引用 ConnectionFactoryDestination(您可以使用 'destinationName' 代替 'destination' 引用)。以下示例定義了一個引用 Destination 的入站通道介面卡

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(
                    Jms.inboundAdapter(connectionFactory)
                       .destination("inQueue"),
                    e -> e.poller(poller -> poller.fixedRate(30000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
@Bean
fun jmsInbound(connectionFactory: ConnectionFactory) =
    integrationFlow(
            Jms.inboundAdapter(connectionFactory).destination("inQueue"),
            { poller { Pollers.fixedRate(30000) } })
       {
            handle { m -> println(m.payload) }
       }
@Bean
@InboundChannelAdapter(value = "exampleChannel", poller = @Poller(fixedRate = "30000"))
public MessageSource<Object> jmsIn(ConnectionFactory connectionFactory) {
    JmsDestinationPollingSource source = new JmsDestinationPollingSource(new JmsTemplate(connectionFactory));
    source.setDestinationName("inQueue");
    return source;
}
<int-jms:inbound-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel">
    <int:poller fixed-rate="30000"/>
</int-jms:inbound-channel-adapter>
從前面的配置中可以看出,inbound-channel-adapter 是一個輪詢消費者。這意味著它在觸發時呼叫 receive()。您只應在輪詢頻率相對較低且及時性不重要的場景中使用此選項。對於所有其他場景(絕大多數基於 JMS 的用例),訊息驅動的通道介面卡 message-driven-channel-adapter稍後描述)是更好的選擇。
預設情況下,所有需要引用 ConnectionFactory 的 JMS 介面卡都會自動查詢名為 jmsConnectionFactory 的 bean。這就是為什麼您在許多示例中沒有看到 connection-factory 屬性。但是,如果您的 JMS ConnectionFactory 有不同的 bean 名稱,則需要提供該屬性。

如果 extract-payload 設定為 true(預設值),則接收到的 JMS Message 會透過 MessageConverter。當依賴預設的 SimpleMessageConverter 時,這意味著生成的 Spring Integration Message 的載荷是 JMS 訊息的主體。JMS TextMessage 會產生基於字串的載荷,JMS BytesMessage 會產生位元組陣列載荷,而 JMS ObjectMessage 的可序列化例項會成為 Spring Integration 訊息的載荷。如果您希望將原始 JMS 訊息作為 Spring Integration 訊息的載荷,請將 extractPayload 選項設定為 false

從版本 5.0.8 開始,對於 org.springframework.jms.connection.CachingConnectionFactorycacheConsumersreceive-timeout 的預設值為 -1(不等待),否則為 1 秒。JMS 入站通道介面卡根據提供的 ConnectionFactory 和選項建立一個 DynamicJmsTemplate。如果需要外部 JmsTemplate(例如在 Spring Boot 環境中),或者 ConnectionFactory 不是快取的,或者沒有 cacheConsumers,建議在預期非阻塞消費時設定 jmsTemplate.receiveTimeout(-1)

Jms.inboundAdapter(connectionFactory)
        .destination(queueName)
        .configureJmsTemplate(template -> template.receiveTimeout(-1))

事務

從版本 4.0 開始,入站通道介面卡支援 session-transacted 屬性。在早期版本中,您必須注入一個設定了 sessionTransactedtrueJmsTemplate。(介面卡確實允許您將 acknowledge 屬性設定為 transacted,但這不正確且不起作用)。

但是請注意,將 session-transacted 設定為 true 價值不大,因為事務在 receive() 操作之後、訊息傳送到 channel 之前立即提交。

如果您希望整個流程是事務性的(例如,如果下游有一個出站通道介面卡),則必須使用帶有 JmsTransactionManagertransactional 輪詢器。或者,考慮使用設定了 acknowledgetransacted(預設值)的 jms-message-driven-channel-adapter

訊息驅動通道介面卡

message-driven-channel-adapter 需要引用 Spring 的 MessageListener 容器例項(AbstractMessageListenerContainer 的任何子類),或者同時引用 ConnectionFactoryDestination(可以使用 'destinationName' 代替 'destination' 引用)。以下示例定義了一個引用 Destination 的訊息驅動通道介面卡

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
    return IntegrationFlow
            .from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
                     .destination("inQueue"))
            .channel("exampleChannel")
            .get();
}
@Bean
fun jmsMessageDrivenFlowWithContainer() =
        integrationFlow(
                Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
                             .destination("inQueue")) {
            channel("exampleChannel")
        }
@Bean
public JmsMessageDrivenEndpoint jmsIn() {
    JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
    return endpoint;
}
@Bean
public AbstractMessageListenerContainer container() {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(cf());
    container.setDestinationName("inQueue");
    return container;
}

@Bean
public ChannelPublishingJmsMessageListener listener() {
    ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
    listener.setRequestChannelName("exampleChannel");
    return listener;
}
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel"/>

訊息驅動介面卡還接受與 MessageListener 容器相關的幾個屬性。只有在您不提供 container 引用時,才會考慮這些值。在這種情況下,將根據這些屬性建立並配置 DefaultMessageListenerContainer 的例項。例如,您可以指定 transaction-manager 引用、concurrent-consumers 值以及其他一些屬性引用和值。有關更多詳細資訊,請參閱Javadoc 和 Spring Integration 的 JMS 模式 (spring-integration-jms.xsd)。

如果您有自定義的監聽器容器實現(通常是 DefaultMessageListenerContainer 的子類),您可以使用 container 屬性提供其例項的引用,或者使用 container-class 屬性提供其完全限定類名。在這種情況下,介面卡上的屬性將轉移到您的自定義容器例項。

您不能使用 Spring JMS 名稱空間元素 <jms:listener-container/><int-jms:message-driven-channel-adapter> 配置容器引用,因為該元素實際上並不引用容器。每個 <jms:listener/> 子元素都會獲得自己的 DefaultMessageListenerContainer(帶有父 <jms:listener-container/> 元素上定義的共享屬性)。您可以為每個監聽器子元素指定一個 id,並用它注入到通道介面卡中,然而,<jms:/> 名稱空間需要一個真正的監聽器。

建議為 DefaultMessageListenerContainer 配置一個常規的 <bean>,並在通道介面卡中將其用作引用。

從版本 4.2 開始,除非您提供了外部容器,否則預設的 acknowledge 模式是 transacted。在這種情況下,您應該根據需要配置容器。我們建議將 transactedDefaultMessageListenerContainer 一起使用,以避免訊息丟失。

'extract-payload' 屬性具有相同的效果,其預設值為 'true'。poller 元素不適用於訊息驅動通道介面卡,因為它會被主動呼叫。對於大多數場景,訊息驅動方法更好,因為訊息從底層 JMS 消費者接收後會立即傳遞給 MessageChannel

最後,<message-driven-channel-adapter> 元素還接受 'error-channel' 屬性。這提供了與 進入 GatewayProxyFactoryBean 中描述的相同基本功能。以下示例展示瞭如何在訊息驅動通道介面卡上設定錯誤通道

<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
    channel="exampleChannel"
    error-channel="exampleErrorChannel"/>

將前面的示例與我們稍後討論的通用閘道器配置或 JMS 'inbound-gateway' 進行比較,主要區別在於我們處於單向流中,因為這是一個 'channel-adapter',而不是閘道器。因此,從 'error-channel' 向下遊的流也應該是單向的。例如,它可以傳送到日誌處理器,或者可以連線到不同的 JMS <outbound-channel-adapter> 元素。

從主題消費時,將 pub-sub-domain 屬性設定為 true。對於持久訂閱,將 subscription-durable 設定為 true;對於共享訂閱(需要 JMS 2.0 代理,自版本 4.2 起可用),將 subscription-shared 設定為 true。使用 subscription-name 為訂閱命名。

從版本 5.1 開始,當應用仍在執行時停止端點時,底層的監聽器容器會被關閉,其共享連線和消費者也會關閉。以前,連線和消費者保持開放。要恢復到以前的行為,請將 JmsMessageDrivenEndpoint 上的 shutdownContainerOnStop 設定為 false

從版本 6.3 開始,現在可以為 ChannelPublishingJmsMessageListener 提供 RetryTemplateRecoveryCallback<Message<?>>,用於下游傳送和傳送-接收操作的重試。這些選項也暴露到 Java DSL 的 JmsMessageDrivenChannelAdapterSpec 中。

入站轉換錯誤

從版本 4.2 開始,'error-channel' 也用於處理轉換錯誤。以前,如果 JMS <message-driven-channel-adapter/><inbound-gateway/> 由於轉換錯誤而無法傳送訊息,異常會拋回給容器。如果容器配置為使用事務,則訊息會被回滾並重復重傳。轉換過程在訊息構造之前和期間發生,因此此類錯誤不會發送到 'error-channel'。現在,此類轉換異常會導致將 ErrorMessage 傳送到 'error-channel',並將異常作為 payload。如果您希望事務回滾,並且定義了 'error-channel',則 'error-channel' 上的整合流必須重新丟擲異常(或另一個異常)。如果錯誤流不丟擲異常,則事務會提交併刪除訊息。如果未定義 'error-channel',則異常會像以前一樣拋回給容器。

出站通道介面卡

JmsSendingMessageHandler 實現了 MessageHandler 介面,能夠將 Spring Integration Messages 轉換為 JMS 訊息,然後傳送到 JMS 目標。它需要 jmsTemplate 引用,或者同時需要 jmsConnectionFactorydestination 引用(可以使用 destinationName 代替 destination)。與入站通道介面卡一樣,配置此介面卡最簡單的方法是使用名稱空間支援。以下配置會生成一個介面卡,它從 exampleChannel 接收 Spring Integration 訊息,將這些訊息轉換為 JMS 訊息,然後將其傳送到 bean 名稱為 outQueue 的 JMS 目標引用

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

  • XML

@Bean
public IntegrationFlow jmsOutboundFlow() {
    return IntegrationFlow.from("exampleChannel")
                .handle(Jms.outboundAdapter(cachingConnectionFactory())
                    .destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                    .configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")));
}
@Bean
fun jmsOutboundFlow() =
        integrationFlow("exampleChannel") {
            handle(Jms.outboundAdapter(jmsConnectionFactory())
                    .apply {
                        destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                        deliveryModeFunction<Any> { DeliveryMode.NON_PERSISTENT }
                        timeToLiveExpression("10000")
                        configureJmsTemplate { it.explicitQosEnabled(true) }
                    }
            )
        }
@Bean
jmsOutboundFlow() {
    integrationFlow('exampleChannel') {
        handle(Jms.outboundAdapter(new ActiveMQConnectionFactory())
                .with {
                    destinationExpression 'headers.' + SimpMessageHeaderAccessor.DESTINATION_HEADER
                    deliveryModeFunction { DeliveryMode.NON_PERSISTENT }
                    timeToLiveExpression '10000'
                    configureJmsTemplate {
                        it.explicitQosEnabled true
                    }
                }
        )
    }
}
@Bean
@ServiceActivator(inputChannel = "exampleChannel")
public MessageHandler jmsOut() {
    JmsSendingMessageHandler handler = new JmsSendingMessageHandler(new JmsTemplate(connectionFactory));
    handler.setDestinationName("outQueue");
    return handler;
}
<int-jms:outbound-channel-adapter id="jmsOut" destination="outQueue" channel="exampleChannel"/>

與入站通道介面卡一樣,這裡也有一個 'extract-payload' 屬性。但是,對於出站介面卡來說,其含義是相反的。這個布林屬性並非應用於 JMS 訊息,而是應用於 Spring Integration 訊息的載荷。換句話說,決定是將 Spring Integration 訊息本身作為 JMS 訊息體傳遞,還是將 Spring Integration 訊息載荷作為 JMS 訊息體傳遞。預設值為 'true'。因此,如果您傳遞一個載荷是 String 的 Spring Integration 訊息,則會建立一個 JMS TextMessage。另一方面,如果您想透過 JMS 將實際的 Spring Integration 訊息傳送到另一個系統,請將其設定為 'false'。

無論載荷提取的布林值如何,只要您依賴預設轉換器或提供了另一個 MessageConverter 例項的引用,Spring Integration 的 MessageHeaders 都會對映到 JMS 屬性。(對於“入站”介面卡也是如此,只不過在這種情況下,JMS 屬性會對映到 Spring Integration 的 MessageHeaders)。

從版本 5.1 開始,可以透過配置 <int-jms:outbound-channel-adapter> (JmsSendingMessageHandler) 的 deliveryModeExpressiontimeToLiveExpression 屬性來在執行時針對請求 Spring Message 評估傳送 JMS 訊息的適當 QoS 值。DefaultJmsHeaderMapper 的新選項 setMapInboundDeliveryMode(true)setMapInboundExpiration(true) 可以作為從訊息頭獲取動態 deliveryModetimeToLive 資訊的來源

<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
                        time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>

事務

從版本 4.0 開始,出站通道介面卡支援 session-transacted 屬性。在早期版本中,您必須注入一個設定了 sessionTransactedtrueJmsTemplate。該屬性現在設定了內建預設 JmsTemplate 上的屬性。如果存在事務(可能來自上游的 message-driven-channel-adapter),則傳送操作會在同一事務中執行。否則,會啟動一個新事務。

入站閘道器

Spring Integration 的訊息驅動 JMS 入站閘道器委託給 MessageListener 容器,支援動態調整併發消費者,並且可以處理回覆。入站閘道器需要引用 ConnectionFactory 和請求 Destination(或 'requestDestinationName')。以下示例定義了一個 JMS inbound-gateway,它從 bean id 為 inQueue 引用的 JMS 佇列接收訊息,並將其傳送到名為 exampleChannel 的 Spring Integration 通道

<int-jms:inbound-gateway id="jmsInGateway"
    request-destination="inQueue"
    request-channel="exampleChannel"/>

由於閘道器提供請求-回覆行為而非單向傳送或接收行為,它們也有兩個不同的“載荷提取”屬性(如 之前討論的 通道介面卡的 'extract-payload' 設定)。對於入站閘道器,'extract-request-payload' 屬性決定是否提取接收到的 JMS Message 主體。如果為 'false',則 JMS 訊息本身成為 Spring Integration 訊息載荷。預設值為 'true'。

類似地,對於入站閘道器,'extract-reply-payload' 屬性應用於要轉換為回覆 JMS Message 的 Spring Integration 訊息。如果您想傳遞整個 Spring Integration 訊息(作為 JMS ObjectMessage 的主體),請將其值設定為 'false'。預設情況下,Spring Integration 訊息載荷也會被轉換為 JMS Message(例如,String 載荷會變成 JMS TextMessage)。

與其他任何情況一樣,閘道器呼叫可能會導致錯誤。預設情況下,生產者不會收到消費者端可能發生的錯誤的通知,並且會在等待回覆時超時。但是,有時您可能希望將錯誤情況傳回給消費者(換句話說,您可能希望透過將異常對映到訊息來將其視為有效的回覆)。為此,JMS 入站閘道器提供了訊息通道支援,可以將錯誤傳送到該通道進行處理,這可能會產生符合某個合約的回覆訊息載荷,該合約定義了呼叫者作為“錯誤”回覆所期望的內容。您可以使用 error-channel 屬性來配置此類通道,如下例所示

<int-jms:inbound-gateway request-destination="requestQueue"
          request-channel="jmsInputChannel"
          error-channel="errorTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

您可能會注意到,此示例與 進入 GatewayProxyFactoryBean 中包含的示例非常相似。這裡的想法是相同的:exceptionTransformer 可以是一個建立錯誤響應物件的 POJO,您可以引用 nullChannel 來抑制錯誤,或者您可以省略 'error-channel' 以讓異常傳播。

從主題消費時,將 pub-sub-domain 屬性設定為 true。對於持久訂閱,將 subscription-durable 設定為 true;對於共享訂閱(需要 JMS 2.0 代理,自版本 4.2 起可用),將 subscription-shared 設定為 true。使用 subscription-name 為訂閱命名。

從版本 4.2 開始,除非提供外部容器,否則預設的 acknowledge 模式是 transacted。在這種情況下,您應根據需要配置容器。我們建議您將 transactedDefaultMessageListenerContainer 一起使用,以避免訊息丟失。

從版本 5.1 開始,當應用仍在執行時停止端點時,底層監聽器容器將關閉,從而關閉其共享連線和消費者。以前,連線和消費者保持開啟狀態。要恢復到之前的行為,請將 JmsInboundGateway 上的 shutdownContainerOnStop 設定為 false

預設情況下,JmsInboundGateway 會在接收到的訊息中查詢 jakarta.jms.Message.getJMSReplyTo() 屬性來確定傳送回覆的位置。否則,可以使用靜態的 defaultReplyDestinationdefaultReplyQueueNamedefaultReplyTopicName 進行配置。此外,從版本 6.1 開始,可以在提供的 ChannelPublishingJmsMessageListener 上配置 replyToExpression,以便在請求訊息的標準 JMSReplyTo 屬性為 null 時動態確定回覆目的地。收到的 jakarta.jms.Message 用作根評估上下文物件。以下示例演示瞭如何使用 Java DSL API 配置入站 JMS 閘道器,其自定義回覆目的地從請求訊息中解析得出:

@Bean
public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(
                    Jms.inboundGateway(connectionFactory)
                            .requestDestination("requestDestination")
                            .replyToFunction(message -> message.getStringProperty("myReplyTo")))
            .<String, String>transform(String::toUpperCase)
            .get();
}

從版本 6.3 開始,Jms.inboundGateway() API 暴露了 retryTemplate()recoveryCallback() 選項,用於重試內部發送和接收操作。

出站閘道器

出站閘道器從 Spring Integration 訊息建立 JMS 訊息,並將它們傳送到 request-destination。然後,它透過使用選擇器從您配置的 reply-destination 接收或(如果沒有提供 reply-destination)透過建立 JMS TemporaryQueue(如果 replyPubSubDomain= true 則建立 TemporaryTopic)例項來處理 JMS 回覆訊息。

reply-destination(或 reply-destination-name)與快取消費者設定為 trueCachingConnectionFactory 一起使用可能會導致記憶體溢位。這是因為每個請求都會獲得一個新的消費者,該消費者帶有新的選擇器(根據 correlation-key 值進行選擇,或者在沒有 correlation-key 時根據傳送的 JMSMessageID 進行選擇)。鑑於這些選擇器是唯一的,在當前請求完成後,它們會保留在快取中(未使用)。

如果指定了回覆目的地,建議不要使用快取消費者。或者,可以考慮使用 <reply-listener/>如下所述

以下示例展示瞭如何配置出站閘道器:

<int-jms:outbound-gateway id="jmsOutGateway"
    request-destination="outQueue"
    request-channel="outboundJmsRequests"
    reply-channel="jmsReplies"/>

“outbound-gateway”的有效載荷提取屬性與“inbound-gateway”的屬性相反(參見先前的討論)。這意味著“extract-request-payload”屬性值適用於轉換為要傳送的 JMS 訊息的 Spring Integration 訊息。而“extract-reply-payload”屬性值適用於作為回覆接收並隨後轉換為 Spring Integration 訊息以傳送到“reply-channel”的 JMS 訊息,如前面的配置示例所示。

使用 <reply-listener/>

Spring Integration 2.2 引入了一種處理回覆的替代技術。如果向閘道器新增 <reply-listener/> 子元素而不是為每個回覆建立消費者,則使用 MessageListener 容器來接收回復並將它們交給請求執行緒。這提供了許多效能優勢,同時也緩解了先前警告中描述的快取消費者記憶體利用問題。

當使用沒有 reply-destination 的出站閘道器時,使用 <reply-listener/>,不是為每個請求建立一個 TemporaryQueue,而是使用一個單獨的 TemporaryQueue。(如果與 broker 的連線丟失並恢復,閘道器會根據需要建立一個額外的 TemporaryQueue)。如果 replyPubSubDomain 設定為 true,則從版本 6.0 開始,會建立一個 TemporaryTopic

使用 correlation-key 時,多個閘道器可以共享同一個回覆目的地,因為監聽器容器使用對每個閘道器唯一的選擇器。

如果指定了回覆監聽器並指定了回覆目的地(或回覆目的地名稱)但沒有提供關聯鍵,則閘道器會記錄警告並回退到 2.2 版本之前的行為。這是因為在這種情況下無法配置選擇器。因此,無法避免回覆傳送到可能配置了相同回覆目的地的不同閘道器。

請注意,在這種情況下,每個請求都會使用一個新的消費者,並且如上述警告所述,消費者會在記憶體中累積;因此,在這種情況下不應使用快取消費者。

以下示例展示了一個帶有預設屬性的回覆監聽器:

<int-jms:outbound-gateway id="jmsOutGateway"
        request-destination="outQueue"
        request-channel="outboundJmsRequests"
        reply-channel="jmsReplies">
    <int-jms:reply-listener />
</int-jms-outbound-gateway>

監聽器非常輕量級,我們預計在大多數情況下,只需要一個消費者。但是,您可以新增諸如 concurrent-consumersmax-concurrent-consumers 等屬性。有關支援的屬性的完整列表,請參閱模式檔案,以及Spring JMS 文件瞭解其含義。

空閒回覆監聽器

從版本 4.2 開始,您可以根據需要啟動回覆監聽器(並在空閒一段時間後停止它),而不是在閘道器的整個生命週期內執行。如果您在應用程式上下文中有很多閘道器(例如許多不活動的、分割槽的 Spring Batch 作業使用 Spring Integration 和 JMS 進行分割槽分發)並且它們大多數時候處於空閒狀態,這會很有用。如果所有回覆監聽器都處於活動狀態,則 JMS broker 會為每個閘道器配備一個活動消費者。透過啟用空閒超時,每個消費者僅在相應的批處理作業執行時存在(並在作業完成後短暫存在一段時間)。

參見屬性參考中的 idle-reply-listener-timeout

網關回復關聯

本節描述用於回覆關聯的機制(確保源閘道器僅接收對其請求的回覆),具體取決於閘道器的配置方式。有關此處討論的屬性的完整描述,請參見屬性參考

以下列表描述了各種場景(數字僅用於標識 — 順序無關緊要):

  1. 沒有 reply-destination* 屬性且沒有 <reply-listener>

    為每個請求建立一個 TemporaryQueue,並在請求完成(成功或失敗)後刪除。correlation-key 不相關。

  2. 提供了 reply-destination* 屬性,且未提供 <reply-listener/>correlation-key

    等於出站訊息的 JMSCorrelationID 用作消費者的訊息選擇器

    messageSelector = "JMSCorrelationID = '" + messageId + "'"

    期望響應系統在回覆的 JMSCorrelationID 中返回入站 JMSMessageID。這是一種常見模式,由 Spring Integration 入站閘道器以及 Spring 的 MessageListenerAdapter(用於訊息驅動 POJO)實現。

    使用此配置時,不應將 topic 用於回覆。回覆可能會丟失。
  3. 提供了 reply-destination* 屬性,未提供 <reply-listener/>,且 correlation-key="JMSCorrelationID"

    閘道器生成一個唯一的關聯 ID 並將其插入到 JMSCorrelationID 頭部。訊息選擇器為:

    messageSelector = "JMSCorrelationID = '" + uniqueId + "'"

    期望響應系統在回覆的 JMSCorrelationID 中返回入站 JMSCorrelationID。這是一種常見模式,由 Spring Integration 入站閘道器以及 Spring 的 MessageListenerAdapter(用於訊息驅動 POJO)實現。

  4. 提供了 reply-destination* 屬性,未提供 <reply-listener/>,且 correlation-key="myCorrelationHeader"

    閘道器生成一個唯一的關聯 ID 並將其插入到 myCorrelationHeader 訊息屬性中。correlation-key 可以是任何使用者定義的值。訊息選擇器為:

    messageSelector = "myCorrelationHeader = '" + uniqueId + "'"

    期望響應系統在回覆的 myCorrelationHeader 中返回入站 myCorrelationHeader

  5. 提供了 reply-destination* 屬性,未提供 <reply-listener/>,且 correlation-key="JMSCorrelationID*"(注意關聯鍵中的 *。)

    閘道器使用請求訊息中 jms_correlationId 頭部中的值(如果存在),並將其插入到 JMSCorrelationID 頭部中。訊息選擇器為:

    messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"

    使用者必須確保此值是唯一的。

    如果頭部不存在,閘道器行為與場景 3 相同。

    期望響應系統在回覆的 JMSCorrelationID 中返回入站 JMSCorrelationID。這是一種常見模式,由 Spring Integration 入站閘道器以及 Spring 的 MessageListenerAdapter(用於訊息驅動 POJO)實現。

  6. 未提供 reply-destination* 屬性,且提供了 <reply-listener>

    建立一個臨時佇列,用於來自此閘道器例項的所有回覆。訊息中不需要關聯資料,但出站 JMSMessageID 在閘道器內部用於將回復定向到正確的請求執行緒。

  7. 提供了 reply-destination* 屬性,提供了 <reply-listener>,且未提供 correlation-key

    不允許。

    <reply-listener/> 配置被忽略,閘道器行為與場景 2 相同。會寫入警告日誌訊息以指示此情況。

  8. 提供了 reply-destination* 屬性,提供了 <reply-listener>,且 correlation-key="JMSCorrelationID"

    閘道器具有唯一的關聯 ID,並將其與遞增值一起插入到 JMSCorrelationID 頭部(gatewayId + "_" + ++seq)。訊息選擇器為:

    messageSelector = "JMSCorrelationID LIKE '" + gatewayId%'"

    期望響應系統在回覆的 JMSCorrelationID 中返回入站 JMSCorrelationID。這是一種常見模式,由 Spring Integration 入站閘道器以及 Spring 的 MessageListenerAdapter(用於訊息驅動 POJO)實現。由於每個閘道器都有唯一的 ID,因此每個例項只接收自己的回覆。完整的關聯資料用於將回復路由到正確的請求執行緒。

  9. 提供了 reply-destination* 屬性,提供了 <reply-listener/>,且 correlation-key="myCorrelationHeader"

    閘道器具有唯一的關聯 ID,並將其與遞增值一起插入到 myCorrelationHeader 屬性中(gatewayId + "_" + ++seq)。correlation-key 可以是任何使用者定義的值。訊息選擇器為:

    messageSelector = "myCorrelationHeader LIKE '" + gatewayId%'"

    期望響應系統在回覆的 myCorrelationHeader 中返回入站 myCorrelationHeader。由於每個閘道器都有唯一的 ID,因此每個例項只接收自己的回覆。完整的關聯資料用於將回復路由到正確的請求執行緒。

  10. 提供了 reply-destination* 屬性,提供了 <reply-listener/>,且 correlation-key="JMSCorrelationID*"

    (注意關聯鍵中的 *

    不允許。

    不允許使用者提供的關聯 ID 與回覆監聽器一起使用。閘道器不會使用此配置進行初始化。

非同步閘道器

從版本 4.3 開始,現在配置出站閘道器時可以指定 async="true"(或在 Java 中使用 setAsync(true))。

預設情況下,當請求傳送到閘道器時,請求執行緒會被掛起,直到收到回覆。然後流程在該執行緒上繼續。如果 asynctrue,則在 send() 完成後立即釋放請求執行緒,回覆在監聽器容器執行緒上返回(並且流程繼續)。當閘道器在輪詢執行緒上呼叫時,這會很有用。執行緒被釋放並可用於框架內的其他任務。

async 需要 <reply-listener/>(或在使用 Java 配置時使用 setUseReplyContainer(true))。它還需要指定 correlationKey(通常是 JMSCorrelationID)。如果這兩個條件中的任何一個未滿足,async 將被忽略。

屬性參考

以下列表顯示了 outbound-gateway 的所有可用屬性:

<int-jms:outbound-gateway
    connection-factory="connectionFactory" (1)
    correlation-key="" (2)
    delivery-persistent="" (3)
    destination-resolver="" (4)
    explicit-qos-enabled="" (5)
    extract-reply-payload="true" (6)
    extract-request-payload="true" (7)
    header-mapper="" (8)
    message-converter="" (9)
    priority="" (10)
    receive-timeout="" (11)
    reply-channel="" (12)
    reply-destination="" (13)
    reply-destination-expression="" (14)
    reply-destination-name="" (15)
    reply-pub-sub-domain="" (16)
    reply-timeout="" (17)
    request-channel="" (18)
    request-destination="" (19)
    request-destination-expression="" (20)
    request-destination-name="" (21)
    request-pub-sub-domain="" (22)
    time-to-live="" (23)
    requires-reply="" (24)
    idle-reply-listener-timeout="" (25)
    async=""> (26)
  <int-jms:reply-listener /> (27)
</int-jms:outbound-gateway>
1 jakarta.jms.ConnectionFactory 的引用。預設為 jmsConnectionFactory
2 包含關聯資料以關聯響應與回覆的屬性名稱。如果省略,閘道器期望響應系統在 JMSCorrelationID 頭部中返回出站 JMSMessageID 頭部的值。如果指定,閘道器會生成一個關聯 ID 並使用它填充指定的屬性。響應系統必須在同一屬性中回顯該值。可以將其設定為 JMSCorrelationID,在這種情況下,使用標準頭部而不是 String 屬性來儲存關聯資料。當使用 <reply-container/> 時,如果提供了明確的 reply-destination,則必須指定 correlation-key。從版本 4.0.1 開始,此屬性還支援值 JMSCorrelationID*,這意味著如果出站訊息已具有 JMSCorrelationID(從 jms_correlationId 對映而來)頭部,則使用該值而不是生成一個新的關聯 ID。請注意,當使用 <reply-container/> 時,不允許使用 JMSCorrelationID* 鍵,因為容器需要在初始化期間設定訊息選擇器。
您應該理解,閘道器無法保證唯一性,如果提供的關聯 ID 不唯一,可能會發生意外的副作用。
3 一個布林值,指示投遞模式應為 DeliveryMode.PERSISTENTtrue)還是 DeliveryMode.NON_PERSISTENTfalse)。此設定僅在 explicit-qos-enabledtrue 時生效。
4 一個 DestinationResolver。預設為 DynamicDestinationResolver,它將目的地名稱對映到同名佇列或主題。
5 當設定為 true 時,它啟用質量服務(QoS)屬性的使用:prioritydelivery-modetime-to-live
6 當設定為 true(預設值)時,Spring Integration 回覆訊息的有效載荷從 JMS 回覆訊息的主體建立(使用 MessageConverter)。當設定為 false 時,整個 JMS 訊息成為 Spring Integration 訊息的有效載荷。
7 當設定為 true(預設值)時,Spring Integration 訊息的有效載荷被轉換為 JMSMessage(使用 MessageConverter)。當設定為 false 時,整個 Spring Integration Message 被轉換為 JMSMessage。在這兩種情況下,Spring Integration 訊息頭部都透過 HeaderMapper 對映到 JMS 頭部和屬性。
8 一個 HeaderMapper,用於將 Spring Integration 訊息頭部對映到 JMS 訊息頭部和屬性之間以及從 JMS 訊息頭部和屬性映射回 Spring Integration 訊息頭部。
9 MessageConverter 的引用,用於在 JMS 訊息和 Spring Integration 訊息有效載荷(或訊息,如果 extract-request-payloadfalse)之間進行轉換。預設為 SimpleMessageConverter
10 請求訊息的預設優先順序。如果存在訊息優先順序頭部,則會被覆蓋。其範圍是 09。此設定僅在 explicit-qos-enabledtrue 時生效。
11 等待回覆的時間(以毫秒為單位)。預設為 5000(五秒)。
12 回覆訊息傳送到的通道。
13 Destination 的引用,將設定為 JMSReplyTo 頭部。最多隻能允許一個 reply-destinationreply-destination-expressionreply-destination-name。如果未提供任何一個,則為對此閘道器的回覆使用 TemporaryQueue
14 一個 SpEL 表示式,評估結果為 Destination,將設定為 JMSReplyTo 頭部。表示式結果可以是 Destination 物件或 String。它由 DestinationResolver 用於解析實際的 Destination。最多隻能允許一個 reply-destinationreply-destination-expressionreply-destination-name。如果未提供任何一個,則為對此閘道器的回覆使用 TemporaryQueue
15 將設定為 JMSReplyTo 頭部的目的地名稱。它由 DestinationResolver 用於解析實際的 Destination。最多隻能允許一個 reply-destinationreply-destination-expressionreply-destination-name。如果未提供任何一個,則為對此閘道器的回覆使用 TemporaryQueue
16 當設定為 true 時,表示由 DestinationResolver 解析的任何回覆 Destination 應為 Topic 而不是 Queue
17 閘道器在向 reply-channel 傳送回覆訊息時等待的時間。這僅在 reply-channel 可能阻塞時有效 — 例如具有容量限制且當前已滿的 QueueChannel。預設為無限。
18 此閘道器接收請求訊息的通道。
19 對傳送請求訊息的 Destination 的引用。request-destinationrequest-destination-expressionrequest-destination-name 之一是必需的。您只能使用這三個屬性中的一個。
20 一個 SpEL 表示式,評估結果為傳送請求訊息的 Destination。表示式結果可以是 Destination 物件或 String。它由 DestinationResolver 用於解析實際的 Destinationrequest-destinationrequest-destination-expressionrequest-destination-name 之一是必需的。您只能使用這三個屬性中的一個。
21 傳送請求訊息的目的地名稱。它由 DestinationResolver 用於解析實際的 Destinationrequest-destinationrequest-destination-expressionrequest-destination-name 之一是必需的。您只能使用這三個屬性中的一個。
22 當設定為 true 時,表示由 DestinationResolver 解析的任何請求 Destination 應為 Topic 而不是 Queue
23 指定訊息的存活時間。此設定僅在 explicit-qos-enabledtrue 時生效。
24 指定此出站閘道器是否必須返回非空值。預設情況下,此值為 true,當底層服務在 receive-timeout 後未返回值時,將丟擲 MessageTimeoutException。請注意,如果服務永遠不期望返回回覆,最好使用 <int-jms:outbound-channel-adapter/> 而不是帶有 requires-reply="false"<int-jms:outbound-gateway/>。使用後者時,傳送執行緒會被阻塞,等待回覆直至 receive-timeout 時間結束。
25 當使用 <reply-listener /> 時,其生命週期(啟動和停止)預設與閘道器的生命週期匹配。當此值大於 0 時,容器會按需啟動(當傳送請求時)。容器會繼續執行直到至少經過此時間且沒有收到請求(並且直到沒有未完成的回覆)。容器會在下一個請求時再次啟動。停止時間是最小值,實際可能高達此值的 1.5 倍。
26 參見非同步閘道器
27 當包含此元素時,回覆由非同步 MessageListenerContainer 接收,而不是為每個回覆建立一個消費者。在許多情況下,這會更有效。

將訊息頭部對映到 JMS 訊息以及從 JMS 訊息對映

JMS 訊息可以包含元資訊,如 JMS API 頭部和簡單屬性。您可以使用 JmsHeaderMapper 將這些資訊對映到 Spring Integration 訊息頭部以及從 Spring Integration 訊息頭部映射回來。JMS API 頭部會傳遞給相應的 setter 方法(例如 setJMSReplyTo),而其他頭部則複製到 JMS Message 的一般屬性中。JMS 出站閘道器透過 JmsHeaderMapper 的預設實現進行引導,該實現將對映標準 JMS API 頭部以及基本型別或 String 訊息頭部。您還可以透過入站和出站閘道器的 header-mapper 屬性提供自定義頭部對映器。

許多 JMS 供應商特定的客戶端不允許直接在已建立的 JMS 訊息上設定 deliveryModeprioritytimeToLive 屬性。它們被認為是 QoS 屬性,因此必須傳播到目標 MessageProducer.send(message, deliveryMode, priority, timeToLive) API。因此,DefaultJmsHeaderMapper 不會將適當的 Spring Integration 頭部(或表示式結果)對映到上述 JMS 訊息屬性中。相反,JmsSendingMessageHandler 使用 DynamicJmsTemplate 將請求訊息中的頭部值傳播到 MessageProducer.send() API。要啟用此功能,必須使用 DynamicJmsTemplate 配置出站端點,並將其 explicitQosEnabled 屬性設定為 true。Spring Integration Java DSL 預設配置 DynamicJmsTemplate,但您仍然必須設定 explicitQosEnabled 屬性。
從版本 4.0 開始,JMSPriority 頭部被對映到入站訊息的標準 priority 頭部。以前,priority 頭部僅用於出站訊息。要恢復到之前的行為(即不對映入站優先順序),請將 DefaultJmsHeaderMappermapInboundPriority 屬性設定為 false
從版本 4.3 開始,DefaultJmsHeaderMapper 將標準 correlationId 頭部對映為訊息屬性,透過呼叫其 toString() 方法(correlationId 通常是 UUID,JMS 不支援)。在入站端,它被對映為 String。這獨立於 jms_correlationId 頭部,後者被對映到 JMSCorrelationID 頭部以及從 JMSCorrelationID 頭部映射回來。JMSCorrelationID 通常用於關聯請求和回覆,而 correlationId 通常用於將相關訊息組合成一組(例如與聚合器或重排序器一起使用)。

從版本 5.1 開始,DefaultJmsHeaderMapper 可以配置為對映入站 JMSDeliveryModeJMSExpiration 屬性:

@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
    DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
    mapper.setMapInboundDeliveryMode(true)
    mapper.setMapInboundExpiration(true)
    return mapper;
}

這些 JMS 屬性分別對映到 JmsHeaders.DELIVERY_MODEJmsHeaders.EXPIRATION Spring Message 頭部。

訊息轉換、編組和解組

如果需要轉換訊息,所有 JMS 介面卡和閘道器都允許您透過設定 message-converter 屬性來提供 MessageConverter。為此,請提供在同一 ApplicationContext 中可用的 MessageConverter 例項的 bean 名稱。此外,為了與編組器和解組器介面保持一致,Spring 提供了 MarshallingMessageConverter,您可以配置自己的自定義編組器和解組器。以下示例展示瞭如何操作:

<int-jms:inbound-gateway request-destination="requestQueue"
    request-channel="inbound-gateway-channel"
    message-converter="marshallingMessageConverter"/>

<bean id="marshallingMessageConverter"
    class="org.springframework.jms.support.converter.MarshallingMessageConverter">
    <constructor-arg>
        <bean class="org.bar.SampleMarshaller"/>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.bar.SampleUnmarshaller"/>
    </constructor-arg>
</bean>
當您提供自己的 MessageConverter 例項時,它仍會包裝在 HeaderMappingMessageConverter 中。這意味著 'extract-request-payload' 和 'extract-reply-payload' 屬性可以影響傳遞給您的轉換器的實際物件。HeaderMappingMessageConverter 本身委託給目標 MessageConverter,同時還將 Spring Integration MessageHeaders 對映到 JMS 訊息屬性並再次映射回來。

JMS 支援的訊息通道

前面介紹的通道介面卡和閘道器都旨在用於與外部系統整合的應用程式。入站選項假定其他系統正在向 JMS 目的地傳送 JMS 訊息,而出站選項假定其他系統正在從目的地接收訊息。這個其他系統可能是也可能不是 Spring Integration 應用程式。當然,當將 Spring Integration 訊息例項作為 JMS 訊息本身的主體傳送時(將 'extract-payload' 值設定為 false),假定其他系統基於 Spring Integration。然而,這絕不是必需的要求。這種靈活性是使用基於訊息的整合選項(抽象為“通道”(或在 JMS 中為目的地))的優勢之一。

有時,給定 JMS Destination 的生產者和消費者都打算成為同一應用程式的一部分,並在同一程序中執行。這可以透過使用一對入站和出站通道介面卡來實現。這種方法的問題是需要兩個介面卡,即使從概念上講,目標是擁有一個單一的訊息通道。自 Spring Integration 2.0 版本以來,支援更好的選項。現在可以在使用 JMS 名稱空間時定義一個單一的“通道”,如下例所示:

<int-jms:channel id="jmsChannel" queue="exampleQueue"/>

前一個示例中的通道的行為與主 Spring Integration 名稱空間中的普通 <channel/> 元素非常相似。它可以被任何端點的 input-channeloutput-channel 屬性引用。不同之處在於,此通道由名為 exampleQueue 的 JMS Queue 例項支援。這意味著生產者和消費者端點之間可以進行非同步訊息傳遞。然而,與透過在非 JMS <channel/> 元素中新增 <queue/> 元素建立的更簡單的非同步訊息通道不同,訊息不儲存在記憶體佇列中。相反,這些訊息在 JMS 訊息主體中傳遞,底層 JMS 提供程式的全部功能即可用於該通道。使用這種替代方案最常見的原因可能是利用 JMS 訊息的儲存轉發方法提供的永續性。

如果配置得當,JMS 支援的訊息通道也支援事務。換句話說,如果其傳送操作是回滾的事務的一部分,生產者實際上不會寫入事務性的 JMS 支援通道。同樣,如果接收訊息是回滾的事務的一部分,消費者不會物理地從通道中刪除 JMS 訊息。請注意,在這種場景中,生產者和消費者的事務是獨立的。這與跨沒有 <queue/> 子元素的簡單同步 <channel/> 元素傳播事務上下文有顯著差異。

由於前一個示例引用了 JMS Queue 例項,因此它充當點對點通道。另一方面,如果您需要釋出/訂閱行為,可以使用單獨的元素並引用 JMS Topic。以下示例展示瞭如何操作:

<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>

對於任何型別的 JMS 支援通道,可以提供目標名稱而不是引用,如下例所示:

<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>

<jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>

在前述示例中,目標名稱由 Spring 的預設 DynamicDestinationResolver 實現解析,但您可以提供任何 DestinationResolver 介面的實現。此外,JMS ConnectionFactory 是通道的必需屬性,但預設情況下,預期的 bean 名稱將是 jmsConnectionFactory。以下示例既提供了一個用於解析 JMS 目標名稱的自定義例項,又為 ConnectionFactory 提供了不同的名稱:

<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
    destination-resolver="customDestinationResolver"
    connection-factory="customConnectionFactory"/>

對於 <publish-subscribe-channel />,將 durable 屬性設定為 true 以實現持久訂閱,或將 subscription-shared 設定為共享訂閱(需要 JMS 2.0 broker,自版本 4.2 起可用)。使用 subscription 為訂閱命名。

使用 JMS 訊息選擇器

藉助 JMS 訊息選擇器,您可以根據 JMS 頭部和 JMS 屬性過濾 JMS 訊息。例如,如果您想監聽自定義 JMS 頭部屬性 myHeaderProperty 等於 something 的訊息,您可以指定以下表達式:

myHeaderProperty = 'something'

訊息選擇器表示式是 SQL-92 條件表示式語法的一個子集,並被定義為 Java Message Service 規範的一部分。您可以透過以下 Spring Integration JMS 元件的 XML 名稱空間配置來指定 JMS 訊息 selector 屬性:

  • JMS Channel

  • JMS Publish Subscribe Channel

  • JMS Inbound Channel Adapter

  • JMS Inbound Gateway

  • JMS Message-driven Channel Adapter

您不能使用 JMS 訊息選擇器引用訊息主體值。

JMS 示例

要嘗試這些 JMS 介面卡,請檢視 Spring Integration Samples Git 倉庫中提供的 JMS 示例:https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms

該倉庫包含兩個示例。一個提供了入站和出站通道介面卡,另一個提供了入站和出站閘道器。它們配置為使用嵌入式 ActiveMQ 程序執行,但您可以修改每個示例的 common.xml Spring 應用程式上下文檔案以支援不同的 JMS 提供程式或獨立的 ActiveMQ 程序。

換句話說,您可以拆分配置,以便入站和出站介面卡在獨立的 JVM 中執行。如果您安裝了 ActiveMQ,請修改 common.xml 檔案中的 brokerURL 屬性,以使用 tcp://:61616(而不是 vm://)。這兩個示例都接受來自 stdin 的輸入並回顯到 stdout。檢視配置以瞭解這些訊息如何透過 JMS 進行路由。