JMS 支援
Spring Integration 提供了用於接收和傳送 JMS 訊息的通道介面卡。
專案需要此依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
<version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-jms:7.0.0"
必須透過一些 JMS 廠商特定的實現(例如 Apache ActiveMQ)顯式新增 jakarta.jms:jakarta.jms-api。
實際上,有兩種基於 JMS 的入站通道介面卡。第一種使用 Spring 的 JmsTemplate 基於輪詢週期進行接收。第二種是“訊息驅動”的,依賴於 Spring MessageListener 容器。出站通道介面卡使用 JmsTemplate 按需轉換和傳送 JMS 訊息。
透過使用 JmsTemplate 和 MessageListener 容器,Spring Integration 依賴於 Spring 的 JMS 支援。理解這一點很重要,因為這些介面卡上公開的大多數屬性都配置了底層的 JmsTemplate 和 MessageListener 容器。有關 JmsTemplate 和 MessageListener 容器的更多詳細資訊,請參閱 Spring JMS 文件。
雖然 JMS 通道介面卡旨在用於單向訊息傳遞(僅傳送或僅接收),但 Spring Integration 還提供了入站和出站 JMS 閘道器,用於請求和回覆操作。入站閘道器依賴於 Spring 的 MessageListener 容器實現之一進行訊息驅動接收。它還能夠將返回值傳送到接收訊息提供的 reply-to 目標。出站閘道器將 JMS 訊息傳送到 request-destination(或 request-destination-name 或 request-destination-expression),然後接收回復訊息。您可以顯式配置 reply-destination 引用(或 reply-destination-name 或 reply-destination-expression)。否則,出站閘道器使用 JMS TemporaryQueue。
在 Spring Integration 2.2 之前,如果需要,會為每個請求或回覆建立(並刪除)一個 TemporaryQueue。從 Spring Integration 2.2 開始,您可以配置出站閘道器使用 MessageListener 容器來接收回復,而不是直接使用新的(或快取的)Consumer 來接收每個請求的回覆。當如此配置且未提供顯式回覆目標時,每個閘道器使用單個 TemporaryQueue,而不是每個請求使用一個。
從 6.0 版本開始,如果將 replyPubSubDomain 選項設定為 true,則出站閘道器會建立 TemporaryTopic 而不是 TemporaryQueue。一些 JMS 供應商對這些目標的處理方式不同。
入站通道介面卡
入站通道介面卡需要引用單個 JmsTemplate 例項或 ConnectionFactory 和 Destination(您可以使用“destinationName”代替“destination”引用)。以下示例定義了一個具有 Destination 引用的入站通道介面卡
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundAdapter(connectionFactory)
.destination("inQueue"),
e -> e.poller(poller -> poller.fixedRate(30000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
@Bean
fun jmsInbound(connectionFactory: ConnectionFactory) =
integrationFlow(
Jms.inboundAdapter(connectionFactory).destination("inQueue"),
{ poller { Pollers.fixedRate(30000) } })
{
handle { m -> println(m.payload) }
}
@Bean
@InboundChannelAdapter(value = "exampleChannel", poller = @Poller(fixedRate = "30000"))
public MessageSource<Object> jmsIn(ConnectionFactory connectionFactory) {
JmsDestinationPollingSource source = new JmsDestinationPollingSource(new JmsTemplate(connectionFactory));
source.setDestinationName("inQueue");
return source;
}
<int-jms:inbound-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel">
<int:poller fixed-rate="30000"/>
</int-jms:inbound-channel-adapter>
從上述配置中可以看出,inbound-channel-adapter 是一個輪詢消費者。這意味著當它被觸發時會呼叫 receive()。您應該只在輪詢不頻繁且及時性不重要的情況下使用此功能。對於所有其他情況(絕大多數基於 JMS 的用例),message-driven-channel-adapter(稍後描述)是更好的選擇。 |
預設情況下,所有需要引用 ConnectionFactory 的 JMS 介面卡都會自動查詢名為 jmsConnectionFactory 的 bean。這就是為什麼您在許多示例中看不到 connection-factory 屬性的原因。但是,如果您的 JMS ConnectionFactory 具有不同的 bean 名稱,則需要提供該屬性。 |
如果 extract-payload 設定為 true(預設值),則收到的 JMS 訊息將透過 MessageConverter。當依賴預設的 SimpleMessageConverter 時,這意味著生成的 Spring Integration Message 的有效負載是 JMS 訊息的正文。JMS TextMessage 生成基於字串的有效負載,JMS BytesMessage 生成位元組陣列有效負載,JMS ObjectMessage 的可序列化例項成為 Spring Integration 訊息的有效負載。如果您希望將原始 JMS 訊息作為 Spring Integration 訊息的有效負載,請將 extractPayload 選項設定為 false。
從版本 5.0.8 開始,對於 org.springframework.jms.connection.CachingConnectionFactory 和 cacheConsumers,receive-timeout 的預設值為 -1(不等待),否則為 1 秒。JMS 入站通道介面卡根據提供的 ConnectionFactory 和選項建立 DynamicJmsTemplate。如果需要外部 JmsTemplate(例如在 Spring Boot 環境中),或者 ConnectionFactory 未快取,或者沒有 cacheConsumers,建議如果期望非阻塞消費,則將 jmsTemplate.receiveTimeout(-1) 設定為 -1。
Jms.inboundAdapter(connectionFactory)
.destination(queueName)
.configureJmsTemplate(template -> template.receiveTimeout(-1))
事務
從 4.0 版本開始,入站通道介面卡支援 session-transacted 屬性。在早期版本中,您必須注入一個 JmsTemplate,其中 sessionTransacted 設定為 true。(介面卡確實允許您將 acknowledge 屬性設定為 transacted,但這不正確且不起作用)。
但是,請注意,將 session-transacted 設定為 true 幾乎沒有價值,因為事務在 receive() 操作之後立即提交,並且在訊息傳送到 channel 之前。
如果您希望整個流程都是事務性的(例如,如果存在下游出站通道介面卡),則必須使用帶有 JmsTransactionManager 的 transactional 輪詢器。或者,考慮使用 jms-message-driven-channel-adapter,並將 acknowledge 設定為 transacted(預設值)。
訊息驅動通道介面卡
message-driven-channel-adapter 需要引用 Spring MessageListener 容器的例項(AbstractMessageListenerContainer 的任何子類)或 ConnectionFactory 和 Destination(可以使用“destinationName”代替“destination”引用)。以下示例定義了一個具有 Destination 引用的訊息驅動通道介面卡
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue"))
.channel("exampleChannel")
.get();
}
@Bean
fun jmsMessageDrivenFlowWithContainer() =
integrationFlow(
Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue")) {
channel("exampleChannel")
}
@Bean
public JmsMessageDrivenEndpoint jmsIn() {
JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
return endpoint;
}
@Bean
public AbstractMessageListenerContainer container() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(cf());
container.setDestinationName("inQueue");
return container;
}
@Bean
public ChannelPublishingJmsMessageListener listener() {
ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
listener.setRequestChannelName("exampleChannel");
return listener;
}
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel"/>
|
訊息驅動介面卡還接受與 如果您有一個自定義監聽器容器實現(通常是 |
|
您不能使用 Spring JMS 名稱空間元素 建議為 |
從 4.2 版本開始,預設的 acknowledge 模式是 transacted,除非您提供外部容器。在這種情況下,您應該根據需要配置容器。我們建議將 transacted 與 DefaultMessageListenerContainer 結合使用,以避免訊息丟失。 |
'extract-payload' 屬性具有相同的效果,其預設值為 'true'。poller 元素不適用於訊息驅動通道介面卡,因為它被主動呼叫。對於大多數情況,訊息驅動方法更好,因為訊息一旦從底層 JMS 消費者接收到,就會立即傳遞到 MessageChannel。
最後,<message-driven-channel-adapter> 元素還接受“error-channel”屬性。這提供了與 進入 GatewayProxyFactoryBean 中描述的相同基本功能。以下示例顯示瞭如何在訊息驅動通道介面卡上設定錯誤通道
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
channel="exampleChannel"
error-channel="exampleErrorChannel"/>
將上述示例與通用閘道器配置或我們稍後討論的 JMS“入站閘道器”進行比較時,主要區別在於我們處於單向流中,因為這是一個“通道介面卡”,而不是閘道器。因此,“error-channel”下游的流也應該是單向的。例如,它可以傳送到日誌處理程式,或者它可以連線到另一個 JMS <outbound-channel-adapter> 元素。
從主題消費時,將 pub-sub-domain 屬性設定為 true。將 subscription-durable 設定為 true 以實現持久訂閱,或將 subscription-shared 設定為共享訂閱(需要 JMS 2.0 代理,並從 4.2 版本開始可用)。使用 subscription-name 來命名訂閱。
從版本 5.1 開始,當應用程式仍在執行時停止端點時,底層監聽器容器將關閉,從而關閉其共享連線和消費者。以前,連線和消費者保持開啟狀態。要恢復到以前的行為,請將 JmsMessageDrivenEndpoint 上的 shutdownContainerOnStop 設定為 false。
從版本 6.3 開始,ChannelPublishingJmsMessageListener 現在可以提供 RetryTemplate 和 RecoveryCallback<Message<?>>,用於下游傳送和傳送-接收操作的重試。這些選項也暴露在 Java DSL 的 JmsMessageDrivenChannelAdapterSpec 中。
入站轉換錯誤
從 4.2 版本開始,轉換錯誤也使用“error-channel”。以前,如果 JMS <message-driven-channel-adapter/> 或 <inbound-gateway/> 由於轉換錯誤而無法傳遞訊息,則會將異常拋回容器。如果容器配置為使用事務,則訊息將回滾並重復重新傳遞。轉換過程在訊息構造之前和期間發生,因此此類錯誤不會發送到“error-channel”。現在,此類轉換異常會導致 ErrorMessage 傳送到“error-channel”,異常作為 payload。如果您希望事務回滾,並且您已定義“error-channel”,則“error-channel”上的整合流必須重新丟擲異常(或其他異常)。如果錯誤流不丟擲異常,則事務將提交併刪除訊息。如果未定義“error-channel”,則異常將像以前一樣拋回容器。
出站通道介面卡
JmsSendingMessageHandler 實現了 MessageHandler 介面,能夠將 Spring Integration Messages 轉換為 JMS 訊息,然後傳送到 JMS 目標。它需要 jmsTemplate 引用或 jmsConnectionFactory 和 destination 引用(destinationName 可以代替 destination 提供)。與入站通道介面卡一樣,配置此介面卡最簡單的方法是使用名稱空間支援。以下配置生成一個介面卡,該介面卡從 exampleChannel 接收 Spring Integration 訊息,將其轉換為 JMS 訊息,並將其傳送到 bean 名稱為 outQueue 的 JMS 目標引用
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
-
Java
-
XML
@Bean
public IntegrationFlow jmsOutboundFlow() {
return IntegrationFlow.from("exampleChannel")
.handle(Jms.outboundAdapter(cachingConnectionFactory())
.destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
.configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")))
.get();
}
@Bean
fun jmsOutboundFlow() =
integrationFlow("exampleChannel") {
handle(Jms.outboundAdapter(jmsConnectionFactory())
.apply {
destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
deliveryModeFunction<Any> { DeliveryMode.NON_PERSISTENT }
timeToLiveExpression("10000")
configureJmsTemplate { it.explicitQosEnabled(true) }
}
)
}
@Bean
jmsOutboundFlow() {
integrationFlow('exampleChannel') {
handle(Jms.outboundAdapter(new ActiveMQConnectionFactory())
.with {
destinationExpression 'headers.' + SimpMessageHeaderAccessor.DESTINATION_HEADER
deliveryModeFunction { DeliveryMode.NON_PERSISTENT }
timeToLiveExpression '10000'
configureJmsTemplate {
it.explicitQosEnabled true
}
}
)
}
}
@Bean
@ServiceActivator(inputChannel = "exampleChannel")
public MessageHandler jmsOut() {
JmsSendingMessageHandler handler = new JmsSendingMessageHandler(new JmsTemplate(connectionFactory));
handler.setDestinationName("outQueue");
return handler;
}
<int-jms:outbound-channel-adapter id="jmsOut" destination="outQueue" channel="exampleChannel"/>
與入站通道介面卡一樣,有一個 'extract-payload' 屬性。但是,對於出站介面卡,其含義是相反的。布林屬性不是應用於 JMS 訊息,而是應用於 Spring Integration 訊息有效負載。換句話說,決定是將 Spring Integration 訊息本身作為 JMS 訊息正文傳遞,還是將 Spring Integration 訊息有效負載作為 JMS 訊息正文傳遞。預設值為 'true'。因此,如果您傳遞一個有效負載為 String 的 Spring Integration 訊息,則會建立一個 JMS TextMessage。另一方面,如果您想透過 JMS 將實際的 Spring Integration 訊息傳送到另一個系統,請將其設定為 'false'。
無論有效負載提取的布林值如何,只要您依賴預設轉換器或提供另一個 MessageConverter 例項的引用,Spring Integration MessageHeaders 都會對映到 JMS 屬性。(入站介面卡也是如此,只不過在那些情況下,JMS 屬性對映到 Spring Integration MessageHeaders)。 |
從版本 5.1 開始,<int-jms:outbound-channel-adapter> (JmsSendingMessageHandler) 可以配置 deliveryModeExpression 和 timeToLiveExpression 屬性,以根據請求 Spring Message 在執行時評估 JMS 訊息的適當 QoS 值。DefaultJmsHeaderMapper 的新 setMapInboundDeliveryMode(true) 和 setMapInboundExpiration(true) 選項可能有助於作為訊息頭中動態 deliveryMode 和 timeToLive 的資訊源
<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>
入站閘道器
Spring Integration 的訊息驅動 JMS 入站閘道器委託給 MessageListener 容器,支援動態調整併發消費者,並且還可以處理回覆。入站閘道器需要引用 ConnectionFactory 和請求 Destination(或“requestDestinationName”)。以下示例定義了一個 JMS inbound-gateway,它從 bean ID 為 inQueue 引用的 JMS 佇列接收,併發送到名為 exampleChannel 的 Spring Integration 通道
<int-jms:inbound-gateway id="jmsInGateway"
request-destination="inQueue"
request-channel="exampleChannel"/>
由於閘道器提供請求-回覆行為而不是單向傳送或接收行為,因此它們還具有兩個不同的“有效負載提取”屬性(如 前面討論 的通道介面卡的“extract-payload”設定)。對於入站閘道器,“extract-request-payload”屬性確定是否提取收到的 JMS 訊息正文。如果為“false”,則 JMS 訊息本身成為 Spring Integration 訊息有效負載。預設值為“true”。
同樣,對於入站閘道器,`extract-reply-payload` 屬性應用於要轉換為回覆 JMS 訊息的 Spring Integration 訊息。如果您想傳遞整個 Spring Integration 訊息(作為 JMS ObjectMessage 的正文),請將此值設定為“false”。預設情況下,Spring Integration 訊息負載也會轉換為 JMS 訊息(例如,`String` 負載會轉換為 JMS TextMessage)。
與任何其他情況一樣,閘道器呼叫可能會導致錯誤。預設情況下,生產者不會收到消費者端可能發生的錯誤的通知,並且在等待回覆時會超時。但是,有時您可能希望將錯誤條件傳回消費者(換句話說,您可能希望透過將其對映到訊息來將異常視為有效回覆)。為了實現這一點,JMS 入站閘道器支援一個訊息通道,錯誤可以傳送到該通道進行處理,從而可能導致回覆訊息有效負載符合定義呼叫者可能期望的“錯誤”回覆的某些約定。您可以使用 error-channel 屬性來配置此類通道,如以下示例所示
<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="jmsInputChannel"
error-channel="errorTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
您可能會注意到,此示例與 進入 GatewayProxyFactoryBean 中包含的示例非常相似。這裡適用相同的想法:exceptionTransformer 可以是一個建立錯誤響應物件的 POJO,您可以引用 nullChannel 來抑制錯誤,或者您可以省略 'error-channel' 以讓異常傳播。
請參閱 入站轉換錯誤。
從主題消費時,將 pub-sub-domain 屬性設定為 true。將 subscription-durable 設定為 true 以實現持久訂閱,或將 subscription-shared 設定為共享訂閱(需要 JMS 2.0 代理,並從 4.2 版本開始可用)。使用 subscription-name 來命名訂閱。
從 4.2 版本開始,預設的 acknowledge 模式是 transacted,除非提供了外部容器。在這種情況下,您應該根據需要配置容器。我們建議您將 transacted 與 DefaultMessageListenerContainer 結合使用,以避免訊息丟失。 |
從版本 5.1 開始,當應用程式仍在執行時停止端點時,底層監聽器容器將關閉,從而關閉其共享連線和消費者。以前,連線和消費者保持開啟狀態。要恢復到以前的行為,請將 JmsInboundGateway 上的 shutdownContainerOnStop 設定為 false。
預設情況下,JmsInboundGateway 會在收到的訊息中查詢 jakarta.jms.Message.getJMSReplyTo() 屬性,以確定回覆的傳送位置。否則,它可以透過靜態的 defaultReplyDestination、defaultReplyQueueName 或 defaultReplyTopicName 進行配置。此外,從 6.1 版本開始,可以在提供的 ChannelPublishingJmsMessageListener 上配置 replyToExpression,以便在請求中標準 JMSReplyTo 屬性為 null 時動態確定回覆目標。收到的 jakarta.jms.Message 用作根評估上下文物件。以下示例演示瞭如何使用 Java DSL API 配置一個 JMS 入站閘道器,該閘道器的自定義回覆目標從請求訊息中解析
@Bean
public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundGateway(connectionFactory)
.requestDestination("requestDestination")
.replyToFunction(message -> message.getStringProperty("myReplyTo")))
.<String, String>transform(String::toUpperCase)
.get();
}
從版本 6.3 開始,Jms.inboundGateway() API 暴露了 retryTemplate() 和 recoveryCallback() 選項,用於重試內部發送和接收操作。
出站閘道器
出站閘道器從 Spring Integration 訊息建立 JMS 訊息,並將其傳送到 request-destination。然後,它透過使用選擇器從您配置的 reply-destination 接收,或者在未提供 reply-destination 的情況下,透過建立 JMS TemporaryQueue(如果 replyPubSubDomain=true,則建立 TemporaryTopic)例項來處理 JMS 回覆訊息。
|
將 如果您指定了回覆目標,建議不要使用快取的消費者。或者,考慮使用 如下所述 的 |
以下示例展示瞭如何配置出站閘道器
<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies"/>
“出站閘道器”的有效負載提取屬性與“入站閘道器”的屬性(參見之前的討論)相反。這意味著“extract-request-payload”屬性值適用於轉換為 JMS 訊息以作為請求傳送的 Spring Integration 訊息。而“extract-reply-payload”屬性值適用於作為回覆接收的 JMS 訊息,然後轉換為 Spring Integration 訊息,以便隨後傳送到“reply-channel”,如上述配置示例所示。
使用 <reply-listener/>
Spring Integration 2.2 引入了一種處理回覆的替代技術。如果您向閘道器新增 <reply-listener/> 子元素,而不是為每個回覆建立消費者,則會使用 MessageListener 容器來接收回復並將其交給請求執行緒。這提供了許多效能優勢,並緩解了 前面提到的警告 中描述的快取消費者記憶體利用率問題。
當使用帶有出站閘道器的 <reply-listener/> 且沒有 reply-destination 時,將使用單個 TemporaryQueue,而不是為每個請求建立一個 TemporaryQueue。(如果與代理的連線丟失並恢復,閘道器會根據需要建立額外的 TemporaryQueue)。如果 replyPubSubDomain 設定為 true,從 6.0 版本開始,將建立 TemporaryTopic。
當使用 correlation-key 時,多個閘道器可以共享相同的回覆目標,因為監聽器容器使用每個閘道器唯一的選擇器。
|
如果您指定了回覆監聽器並指定了回覆目標(或回覆目標名稱),但未提供關聯鍵,則閘道器會記錄警告並回退到 2.2 版本之前的行為。這是因為在這種情況下無法配置選擇器。因此,無法避免回覆轉到可能配置了相同回覆目標的不同閘道器。 請注意,在這種情況下,每個請求都使用一個新的消費者,並且消費者可能會在記憶體中積累,如上述注意事項所述;因此,在這種情況下不應使用快取消費者。 |
以下示例顯示了具有預設屬性的回覆監聽器
<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies">
<int-jms:reply-listener />
</int-jms-outbound-gateway>
監聽器非常輕量級,我們預計在大多數情況下您只需要一個消費者。但是,您可以新增 concurrent-consumers、max-concurrent-consumers 等屬性。有關支援屬性的完整列表及其含義,請參閱模式和 Spring JMS 文件。
空閒回覆監聽器
從 4.2 版本開始,您可以根據需要啟動回覆監聽器(並在空閒時間後停止它),而不是在閘道器的整個生命週期內執行。如果您在應用程式上下文中有很多閘道器(其中大部分處於空閒狀態),這可能很有用。其中一種情況是,上下文中有許多(不活躍的)分割槽 Spring Batch 作業使用 Spring Integration 和 JMS 進行分割槽分發。如果所有回覆監聽器都處於活動狀態,則 JMS 代理會為每個閘道器都有一個活動的消費者。透過啟用空閒超時,每個消費者僅在相應的批處理作業執行期間(以及作業完成後的一小段時間內)存在。
請參閱 屬性參考 中的 idle-reply-listener-timeout。
網關回復關聯
本節描述了用於回覆關聯的機制(確保源閘道器僅接收對其請求的回覆),具體取決於閘道器的配置方式。有關此處討論的屬性的完整描述,請參閱 屬性參考。
以下列表描述了各種場景(數字僅用於標識——順序無關緊要)
-
無
reply-destination*屬性且無<reply-listener>為每個請求建立一個
TemporaryQueue,並在請求完成時(成功或不成功)刪除。correlation-key無關緊要。 -
提供了
reply-destination*屬性,並且沒有提供<reply-listener/>也未提供correlation-key與出站訊息相同的
JMSCorrelationID用作消費者的訊息選擇器messageSelector = "JMSCorrelationID = '" + messageId + "'"響應系統應在回覆
JMSCorrelationID中返回入站JMSMessageID。這是一種常見模式,由 Spring Integration 入站閘道器以及 Spring 的MessageListenerAdapter用於訊息驅動 POJO 實現。當您使用此配置時,不應將主題用於回覆。回覆可能會丟失。 -
提供了
reply-destination*屬性,未提供<reply-listener/>,且correlation-key="JMSCorrelationID"閘道器生成一個唯一的關聯 ID 並將其插入
JMSCorrelationID頭中。訊息選擇器是messageSelector = "JMSCorrelationID = '" + uniqueId + "'"響應系統應在回覆
JMSCorrelationID中返回入站JMSCorrelationID。這是一種常見模式,由 Spring Integration 入站閘道器以及 Spring 的MessageListenerAdapter用於訊息驅動 POJO 實現。 -
提供了
reply-destination*屬性,未提供<reply-listener/>,且correlation-key="myCorrelationHeader"閘道器生成一個唯一的關聯 ID 並將其插入
myCorrelationHeader訊息屬性中。correlation-key可以是任何使用者定義的值。訊息選擇器是messageSelector = "myCorrelationHeader = '" + uniqueId + "'"響應系統應在回覆
myCorrelationHeader中返回入站myCorrelationHeader。 -
提供了
reply-destination*屬性,未提供<reply-listener/>,且correlation-key="JMSCorrelationID*"(注意關聯鍵中的*。)閘道器使用請求訊息中
jms_correlationId頭中的值(如果存在)並將其插入JMSCorrelationID頭中。訊息選擇器是messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"使用者必須確保此值是唯一的。
如果頭不存在,則閘道器的行為與
3相同。響應系統應在回覆
JMSCorrelationID中返回入站JMSCorrelationID。這是一種常見模式,由 Spring Integration 入站閘道器以及 Spring 的MessageListenerAdapter用於訊息驅動 POJO 實現。 -
未提供
reply-destination*屬性,但提供了<reply-listener>建立了一個臨時佇列,用於此閘道器例項的所有回覆。訊息中不需要關聯資料,但出站
JMSMessageID在閘道器內部用於將回復定向到正確的請求執行緒。 -
提供了
reply-destination*屬性,提供了<reply-listener>,但未提供correlation-key不允許。
<reply-listener/>配置將被忽略,閘道器的行為與2相同。將寫入警告日誌訊息以指示此情況。 -
提供了
reply-destination*屬性,提供了<reply-listener>,且correlation-key="JMSCorrelationID"閘道器具有唯一的關聯 ID,並將其與遞增值一起插入
JMSCorrelationID頭中(gatewayId + "_" + ++seq)。訊息選擇器是messageSelector = "JMSCorrelationID LIKE '" + gatewayId%'"響應系統應在回覆
JMSCorrelationID中返回入站JMSCorrelationID。這是一種常見模式,由 Spring Integration 入站閘道器以及 Spring 的MessageListenerAdapter用於訊息驅動 POJO 實現。由於每個閘道器都有唯一的 ID,因此每個例項只獲取自己的回覆。完整的關聯資料用於將回復路由到正確的請求執行緒。 -
提供了
reply-destination*屬性,提供了<reply-listener/>,且correlation-key="myCorrelationHeader"閘道器具有唯一的關聯 ID,並將其與遞增值一起插入
myCorrelationHeader屬性中(gatewayId + "_" + ++seq)。correlation-key可以是任何使用者定義的值。訊息選擇器是messageSelector = "myCorrelationHeader LIKE '" + gatewayId%'"響應系統應在回覆
myCorrelationHeader中返回入站myCorrelationHeader。由於每個閘道器都有唯一的 ID,因此每個例項只獲取自己的回覆。完整的關聯資料用於將回復路由到正確的請求執行緒。 -
提供了
reply-destination*屬性,提供了<reply-listener/>,且correlation-key="JMSCorrelationID*"(注意關聯鍵中的
*)不允許。
不允許使用者提供的關聯 ID 與回覆監聽器一起使用。閘道器不會使用此配置進行初始化。
非同步閘道器
從 4.3 版本開始,您現在可以在配置出站閘道器時指定 async="true"(或在 Java 中指定 setAsync(true))。
預設情況下,當請求傳送到閘道器時,請求執行緒會暫停,直到收到回覆。然後流在該執行緒上繼續。如果 async 為 true,則請求執行緒在 send() 完成後立即釋放,並且回覆在監聽器容器執行緒上返回(並且流繼續)。當閘道器在輪詢器執行緒上呼叫時,這可能很有用。執行緒被釋放並可用於框架內的其他任務。
async 需要 <reply-listener/>(或在 Java 配置中設定 setUseReplyContainer(true))。它還需要指定 correlationKey(通常是 JMSCorrelationID)。如果這些條件中的任何一個不滿足,async 將被忽略。
屬性參考
以下列表顯示了 outbound-gateway 的所有可用屬性
<int-jms:outbound-gateway
connection-factory="connectionFactory" (1)
correlation-key="" (2)
delivery-persistent="" (3)
destination-resolver="" (4)
explicit-qos-enabled="" (5)
extract-reply-payload="true" (6)
extract-request-payload="true" (7)
header-mapper="" (8)
message-converter="" (9)
priority="" (10)
receive-timeout="" (11)
reply-channel="" (12)
reply-destination="" (13)
reply-destination-expression="" (14)
reply-destination-name="" (15)
reply-pub-sub-domain="" (16)
reply-timeout="" (17)
request-channel="" (18)
request-destination="" (19)
request-destination-expression="" (20)
request-destination-name="" (21)
request-pub-sub-domain="" (22)
time-to-live="" (23)
requires-reply="" (24)
idle-reply-listener-timeout="" (25)
async=""> (26)
<int-jms:reply-listener /> (27)
</int-jms:outbound-gateway>
| 1 | 引用 jakarta.jms.ConnectionFactory。預設值為 jmsConnectionFactory。 |
||
| 2 | 包含關聯資料以將響應與回覆關聯起來的屬性名稱。如果省略,閘道器期望響應系統在 JMSCorrelationID 頭中返回出站 JMSMessageID 頭的值。如果指定,閘道器將生成一個關聯 ID 並用它填充指定的屬性。響應系統必須在同一屬性中回顯該值。它可以設定為 JMSCorrelationID,在這種情況下,將使用標準頭而不是 String 屬性來儲存關聯資料。當您使用 <reply-container/> 時,如果您提供了顯式的 reply-destination,則必須指定 correlation-key。從 4.0.1 版本開始,此屬性還支援值 JMSCorrelationID*,這意味著如果出站訊息已經具有 JMSCorrelationID(從 jms_correlationId 對映)頭,則使用它而不是生成一個新的。請注意,當您使用 <reply-container/> 時不允許使用 JMSCorrelationID* 鍵,因為容器需要在初始化期間設定訊息選擇器。
|
||
| 3 | 一個布林值,指示傳遞模式應為 DeliveryMode.PERSISTENT(true)還是 DeliveryMode.NON_PERSISTENT(false)。此設定僅在 explicit-qos-enabled 為 true 時才生效。 |
||
| 4 | 一個 DestinationResolver。預設是一個 SimpleDestinationResolver,它將目標名稱對映到該名稱的佇列或主題並快取一個目標。 |
||
| 5 | 設定為 true 時,啟用質量服務屬性:priority、delivery-mode 和 time-to-live。 |
||
| 6 | 設定為 true(預設值)時,Spring Integration 回覆訊息的有效負載將從 JMS 回覆訊息的正文建立(使用 MessageConverter)。設定為 false 時,整個 JMS 訊息將成為 Spring Integration 訊息的有效負載。 |
||
| 7 | 當設定為 true(預設值)時,Spring Integration 訊息的有效負載將轉換為 JMSMessage(透過使用 MessageConverter)。當設定為 false 時,整個 Spring Integration 訊息將轉換為 JMSMessage。在這兩種情況下,Spring Integration 訊息頭都將透過 HeaderMapper 對映到 JMS 頭和屬性。 |
||
| 8 | 一個 HeaderMapper,用於將 Spring Integration 訊息頭對映到 JMS 訊息頭和屬性,以及反向對映。 |
||
| 9 | MessageConverter 的引用,用於在 JMS 訊息和 Spring Integration 訊息負載(或訊息,如果 extract-request-payload 為 false)之間進行轉換。預設是 SimpleMessageConverter。 |
||
| 10 | 請求訊息的預設優先順序。如果存在訊息優先順序頭,則會被覆蓋。其範圍為 0 到 9。此設定僅在 explicit-qos-enabled 為 true 時才生效。 |
||
| 11 | 等待回覆的時間(毫秒)。預設值為 5000(五秒)。 |
||
| 12 | 回覆訊息傳送到的通道。 | ||
| 13 | 對 Destination 的引用,該引用被設定為 JMSReplyTo 頭。最多隻允許一個 reply-destination、reply-destination-expression 或 reply-destination-name。如果未提供,則使用 TemporaryQueue 作為此閘道器的回覆。 |
||
| 14 | 評估為 Destination 的 SpEL 表示式,它將設定為 JMSReplyTo 頭。表示式可以生成 Destination 物件或 String。它由 DestinationResolver 用於解析實際的 Destination。最多隻允許一個 reply-destination、reply-destination-expression 或 reply-destination-name。如果未提供,則使用 TemporaryQueue 作為此閘道器的回覆。 |
||
| 15 | 設定為 JMSReplyTo 頭的目標名稱。它由 DestinationResolver 用於解析實際的 Destination。最多隻允許一個 reply-destination、reply-destination-expression 或 reply-destination-name。如果未提供,則使用 TemporaryQueue 作為此閘道器的回覆。 |
||
| 16 | 當設定為 true 時,表示由 DestinationResolver 解析的任何回覆 Destination 都應該是 Topic 而不是 Queue。 |
||
| 17 | 閘道器將回復訊息傳送到 reply-channel 時等待的時間。這僅在 reply-channel 可以阻塞時才生效,例如容量限制已滿的 QueueChannel。預設值為無窮大。 |
||
| 18 | 此閘道器接收請求訊息的通道。 | ||
| 19 | 對傳送請求訊息的 Destination 的引用。reply-destination、reply-destination-expression 或 reply-destination-name 中的一個為必需。您只能使用這三個屬性中的一個。 |
||
| 20 | 評估為 Destination 的 SpEL 表示式,用於傳送請求訊息。表示式可以生成 Destination 物件或 String。它由 DestinationResolver 用於解析實際的 Destination。reply-destination、reply-destination-expression 或 reply-destination-name 中的一個為必需。您只能使用這三個屬性中的一個。 |
||
| 21 | 傳送請求訊息到的目標名稱。它由 DestinationResolver 用於解析實際的 Destination。reply-destination、reply-destination-expression 或 reply-destination-name 中的一個為必需。您只能使用這三個屬性中的一個。 |
||
| 22 | 當設定為 true 時,表示由 DestinationResolver 解析的任何請求 Destination 都應該是 Topic 而不是 Queue。 |
||
| 23 | 指定訊息的存活時間。此設定僅在 explicit-qos-enabled 為 true 時才生效。 |
||
| 24 | 指定此出站閘道器是否必須返回非空值。預設情況下,此值為 true,當底層服務在 receive-timeout 後未返回值時,將丟擲 MessageTimeoutException。請注意,如果服務從不期望返回回覆,最好使用 <int-jms:outbound-channel-adapter/> 而不是 <int-jms:outbound-gateway/> 並設定 requires-reply="false"。後者會阻塞傳送執行緒,等待回覆,時間為 receive-timeout 週期。 |
||
| 25 | 當您使用 <reply-listener /> 時,它的生命週期(啟動和停止)預設與閘道器的生命週期匹配。當此值大於 0 時,容器按需啟動(當傳送請求時)。容器將繼續執行,直到至少此時間過去,沒有收到請求(並且沒有待處理的回覆)。容器將在下一個請求時再次啟動。停止時間是最小值,實際上可能高達此值的 1.5 倍。 |
||
| 26 | 請參閱 非同步閘道器。 | ||
| 27 | 當包含此元素時,回覆由非同步 MessageListenerContainer 接收,而不是為每個回覆建立消費者。在許多情況下,這可能更高效。 |
將訊息頭對映到 JMS 訊息和從 JMS 訊息對映訊息頭
JMS 訊息可以包含元資訊,例如 JMS API 訊息頭和簡單屬性。您可以使用 JmsHeaderMapper 將這些訊息頭對映到 Spring Integration 訊息頭以及從 Spring Integration 訊息頭對映。JMS API 訊息頭傳遞給適當的 setter 方法(例如 setJMSReplyTo),而其他訊息頭則複製到 JMS 訊息的通用屬性中。JMS 出站閘道器使用 JmsHeaderMapper 的預設實現進行引導,該實現將對映標準 JMS API 訊息頭以及基本型別或 String 訊息頭。您還可以透過使用入站和出站閘道器的 header-mapper 屬性來提供自定義訊息頭對映器。
許多 JMS 廠商特定的客戶端不允許直接在已建立的 JMS 訊息上設定 deliveryMode、priority 和 timeToLive 屬性。它們被認為是 QoS 屬性,因此必須傳播到目標 MessageProducer.send(message, deliveryMode, priority, timeToLive) API。因此,DefaultJmsHeaderMapper 不會將適當的 Spring Integration 訊息頭(或表示式結果)對映到上述 JMS 訊息屬性中。相反,JmsSendingMessageHandler 使用 DynamicJmsTemplate 將請求訊息中的訊息頭值傳播到 MessageProducer.send() API 中。要啟用此功能,您必須使用 explicitQosEnabled 屬性設定為 true 的 DynamicJmsTemplate 配置出站端點。Spring Integration Java DSL 預設配置 DynamicJmsTemplate,但您仍然必須設定 explicitQosEnabled 屬性。 |
從 4.0 版本開始,JMSPriority 頭會對映到入站訊息的標準 priority 頭。以前,priority 頭僅用於出站訊息。要恢復到以前的行為(即不對映入站優先順序),請將 DefaultJmsHeaderMapper 的 mapInboundPriority 屬性設定為 false。 |
從 4.3 版本開始,DefaultJmsHeaderMapper 透過呼叫其 toString() 方法將標準 correlationId 標頭對映為訊息屬性(correlationId 通常是 UUID,JMS 不支援)。在入站端,它對映為 String。這獨立於 jms_correlationId 標頭,後者對映到 JMSCorrelationID 標頭和從 JMSCorrelationID 標頭對映。JMSCorrelationID 通常用於關聯請求和回覆,而 correlationId 通常用於將相關訊息組合成一個組(例如與聚合器或重排序器一起使用)。 |
從 5.1 版本開始,DefaultJmsHeaderMapper 可以配置為對映入站 JMSDeliveryMode 和 JMSExpiration 屬性
@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
mapper.setMapInboundDeliveryMode(true)
mapper.setMapInboundExpiration(true)
return mapper;
}
這些 JMS 屬性分別對映到 JmsHeaders.DELIVERY_MODE 和 JmsHeaders.EXPIRATION Spring 訊息頭。
訊息轉換、編組和解組
如果您需要轉換訊息,所有 JMS 介面卡和閘道器都允許您透過設定 message-converter 屬性來提供 MessageConverter。為此,請提供同一 ApplicationContext 中可用的 MessageConverter 例項的 bean 名稱。此外,為了與編組器和解組器介面保持一致,Spring 提供了 MarshallingMessageConverter,您可以將其配置為您自己的自定義編組器和解組器。以下示例演示瞭如何執行此操作
<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="inbound-gateway-channel"
message-converter="marshallingMessageConverter"/>
<bean id="marshallingMessageConverter"
class="org.springframework.jms.support.converter.MarshallingMessageConverter">
<constructor-arg>
<bean class="org.bar.SampleMarshaller"/>
</constructor-arg>
<constructor-arg>
<bean class="org.bar.SampleUnmarshaller"/>
</constructor-arg>
</bean>
當您提供自己的 MessageConverter 例項時,它仍然被封裝在 HeaderMappingMessageConverter 中。這意味著“extract-request-payload”和“extract-reply-payload”屬性可能會影響傳遞給您的轉換器的實際物件。HeaderMappingMessageConverter 本身委託給目標 MessageConverter,同時還將 Spring Integration MessageHeaders 對映到 JMS 訊息屬性並再次映射回來。 |
JMS 支援的訊息通道
前面介紹的通道介面卡和閘道器都用於與外部系統整合的應用程式。入站選項假定其他系統正在將 JMS 訊息傳送到 JMS 目標,而出站選項假定其他系統正在從目標接收訊息。其他系統可能是也可能不是 Spring Integration 應用程式。當然,當將 Spring Integration 訊息例項作為 JMS 訊息本身的正文傳送時(將“extract-payload”值設定為 false),假定其他系統是基於 Spring Integration 的。但是,這絕不是一個要求。這種靈活性是使用基於訊息的整合選項和“通道”(或 JMS 情況下的目標)抽象的好處之一。
有時,給定 JMS 目標的生產者和消費者都旨在成為同一應用程式的一部分,在同一程序中執行。您可以透過使用一對入站和出站通道介面卡來實現這一點。這種方法的缺點是您需要兩個介面卡,即使從概念上講,目標是擁有單個訊息通道。Spring Integration 2.0 版本中支援更好的選項。現在可以使用 JMS 名稱空間定義單個“通道”,如以下示例所示
<int-jms:channel id="jmsChannel" queue="exampleQueue"/>
上述示例中的通道行為很像主 Spring Integration 名稱空間中的普通 <channel/> 元素。它可以由任何端點的 input-channel 和 output-channel 屬性引用。不同之處在於,此通道由名為 exampleQueue 的 JMS Queue 例項支援。這意味著在生產者和消費者端點之間可以進行非同步訊息傳遞。但是,與透過在非 JMS <channel/> 元素中新增 <queue/> 元素建立的更簡單的非同步訊息通道不同,訊息不會儲存在記憶體佇列中。相反,這些訊息在 JMS 訊息正文中傳遞,並且底層 JMS 提供程式的全部功能可用於該通道。使用此替代方案最常見的原因可能是利用 JMS 訊息的儲存轉發方法提供的永續性。
如果配置得當,JMS 支援的訊息通道也支援事務。換句話說,如果生產者的傳送操作是回滾事務的一部分,則生產者實際上不會寫入事務性 JMS 支援的通道。同樣,如果接收訊息是回滾事務的一部分,則消費者不會物理地從通道中刪除 JMS 訊息。請注意,在這種情況下,生產者和消費者事務是獨立的。這與事務上下文透過沒有 <queue/> 子元素的簡單同步 <channel/> 元素傳播的情況顯著不同。
由於上述示例引用了 JMS 佇列例項,因此它充當點對點通道。另一方面,如果您需要釋出-訂閱行為,您可以使用單獨的元素並引用 JMS 主題。以下示例顯示瞭如何執行此操作
<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>
對於任何型別的 JMS 支援通道,可以提供目標名稱而不是引用,如以下示例所示
<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>
<jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>
在前面的示例中,目標名稱由 Spring 的預設 SimpleDestinationResolver 實現解析,但您可以提供 DestinationResolver 介面的任何實現。此外,JMS ConnectionFactory 是通道的必需屬性,但預設情況下,預期的 bean 名稱將是 jmsConnectionFactory。以下示例為 JMS 目標名稱的解析和 ConnectionFactory 的不同名稱提供了一個自定義例項
<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
destination-resolver="customDestinationResolver"
connection-factory="customConnectionFactory"/>
對於 <publish-subscribe-channel />,將 durable 屬性設定為 true 以實現持久訂閱,或將 subscription-shared 設定為共享訂閱(需要 JMS 2.0 代理,並從 4.2 版本開始可用)。使用 subscription 來命名訂閱。
使用 JMS 訊息選擇器
藉助 JMS 訊息選擇器,您可以根據 JMS 訊息頭和 JMS 屬性過濾 JMS 訊息。例如,如果您想監聽自定義 JMS 訊息頭屬性 myHeaderProperty 等於 something 的訊息,您可以指定以下表達式
myHeaderProperty = 'something'
訊息選擇器表示式是 SQL-92 條件表示式語法的一個子集,並作為 Java 訊息服務 規範的一部分進行定義。您可以使用 XML 名稱空間配置為以下 Spring Integration JMS 元件指定 JMS 訊息 selector 屬性
-
JMS 通道
-
JMS 釋出訂閱通道
-
JMS 入站通道介面卡
-
JMS 入站閘道器
-
JMS 訊息驅動通道介面卡
| 您不能使用 JMS 訊息選擇器引用訊息正文值。 |
JMS 示例
要體驗這些 JMS 介面卡,請檢視 Spring Integration Samples Git 儲存庫中提供的 JMS 示例:https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms。
該儲存庫包含兩個示例。一個提供入站和出站通道介面卡,另一個提供入站和出站閘道器。它們配置為使用嵌入式 ActiveMQ 程序執行,但您可以修改每個示例的 common.xml Spring 應用程式上下文檔案,以支援不同的 JMS 提供程式或獨立的 ActiveMQ 程序。
換句話說,您可以拆分配置,以便入站和出站介面卡在單獨的 JVM 中執行。如果您已安裝 ActiveMQ,請修改 common.xml 檔案中的 brokerURL 屬性,以使用 tcp://:61616(而不是 vm://)。這兩個示例都接受來自 stdin 的輸入並回顯到 stdout。檢視配置以瞭解這些訊息如何透過 JMS 路由。