使用 Spring JMS

本節介紹如何使用 Spring 的 JMS 元件。

使用 JmsTemplate

JmsTemplate 類是 JMS 核心包中的中心類。它簡化了 JMS 的使用,因為它在傳送或同步接收訊息時處理資源的建立和釋放。

使用 JmsTemplate 的程式碼只需要實現回撥介面,這些介面提供了清晰定義的高階契約。MessageCreator 回撥介面在給定由 JmsTemplate 中的呼叫程式碼提供的 Session 時建立訊息。為了允許更復雜的 JMS API 使用,SessionCallback 提供 JMS 會話,而 ProducerCallback 暴露一個 SessionMessageProducer 對。

JMS API 暴露了兩種傳送方法,一種接收投遞模式、優先順序和存活時間作為服務質量 (QOS) 引數,另一種不接收 QOS 引數並使用預設值。由於 JmsTemplate 有許多傳送方法,為了避免傳送方法數量上的重複,QOS 引數的設定被暴露為 bean 屬性。類似地,同步接收呼叫的超時值可以透過使用 setReceiveTimeout 屬性來設定。

一些 JMS 提供者允許透過配置 ConnectionFactory 來管理性地設定預設 QOS 值。這樣做會導致對 MessageProducer 例項的 send 方法 (send(Destination destination, Message message)) 的呼叫使用的預設 QOS 值與 JMS 規範中指定的值不同。為了提供一致的 QOS 值管理,JmsTemplate 因此必須透過將布林屬性 isExplicitQosEnabled 設定為 true 來專門啟用使用自己的 QOS 值。

為了方便,JmsTemplate 還暴露了一個基本的請求-回覆操作,允許傳送訊息並在操作過程中建立的臨時佇列上等待回覆。

一旦配置完成,JmsTemplate 類的例項是執行緒安全的。這很重要,因為它意味著你可以配置一個 JmsTemplate 的單個例項,然後安全地將這個共享引用注入到多個協作物件中。需要明確的是,JmsTemplate 是有狀態的,因為它維護了對 ConnectionFactory 的引用,但這種狀態不是會話狀態。

自 Spring Framework 4.1 起,JmsMessagingTemplate 構建在 JmsTemplate 之上,並提供了與訊息抽象(即 org.springframework.messaging.Message)的整合。這允許你以通用方式建立要傳送的訊息。

連線

JmsTemplate 需要引用一個 ConnectionFactoryConnectionFactory 是 JMS 規範的一部分,作為使用 JMS 的入口點。它被客戶端應用用作與 JMS 提供者建立連線的工廠,並封裝了各種配置引數,其中許多是供應商特定的,例如 SSL 配置選項。

在 EJB 中使用 JMS 時,供應商提供了 JMS 介面的實現,以便它們可以參與宣告式事務管理並執行連線和會話的池化。為了使用這種實現,Jakarta EE 容器通常要求你在 EJB 或 servlet 部署描述符內部將 JMS 連線工廠宣告為 resource-ref。為了確保在 EJB 內部使用 JmsTemplate 時能夠利用這些特性,客戶端應用應確保它引用的是 ConnectionFactory 的託管實現。

快取訊息資源

標準 API 涉及建立許多中間物件。要傳送訊息,需要執行以下 'API' 流程:

ConnectionFactory->Connection->Session->MessageProducer->send

ConnectionFactorySend 操作之間,會建立和銷燬三個中間物件。為了最佳化資源使用並提高效能,Spring 提供了兩種 ConnectionFactory 實現。

使用 SingleConnectionFactory

Spring 提供了一個 ConnectionFactory 介面的實現,SingleConnectionFactory,它在所有 createConnection() 呼叫中返回相同的 Connection 並忽略對 close() 的呼叫。這對於測試和獨立環境非常有用,這樣同一連線可以用於跨越任意數量事務的多個 JmsTemplate 呼叫。SingleConnectionFactory 接收一個標準 ConnectionFactory 的引用,該引用通常來自 JNDI。

使用 CachingConnectionFactory

CachingConnectionFactory 擴充套件了 SingleConnectionFactory 的功能,並增加了 SessionMessageProducerMessageConsumer 例項的快取。初始快取大小設定為 1。你可以使用 sessionCacheSize 屬性來增加快取會話的數量。請注意,實際快取會話的數量會多於該數字,因為會話根據它們的確認模式進行快取,所以當 sessionCacheSize 設定為一時,最多可以有四個快取的會話例項(每種確認模式一個)。MessageProducerMessageConsumer 例項在其所屬會話內快取,並在快取時也考慮生產者和消費者的獨特屬性。MessageProducer 根據其目的地進行快取。MessageConsumer 根據由目的地、選擇器、noLocal 投遞標誌和持久訂閱名稱(如果建立持久消費者)組成的鍵進行快取。

臨時佇列和主題(TemporaryQueue/TemporaryTopic)的 MessageProducer 和 MessageConsumer 將永遠不會被快取。遺憾的是,WebLogic JMS 恰好在其常規目的地實現上實現了臨時佇列/主題介面,錯誤地指示其所有目的地都無法快取。請在 WebLogic 上使用不同的連線池/快取,或為 WebLogic 定製 CachingConnectionFactory

目的地管理

目的地,作為 ConnectionFactory 例項,是 JMS 管理物件,你可以將其儲存和檢索在 JNDI 中。配置 Spring 應用上下文時,你可以使用 JNDI 的 JndiObjectFactoryBean 工廠類或 <jee:jndi-lookup> 對你的物件引用 JMS 目的地進行依賴注入。然而,如果應用中有大量目的地,或者存在 JMS 提供者特有的高階目的地管理功能,這種策略通常會很繁瑣。這類高階目的地管理的示例包括動態目的地的建立或對層級目的地名稱空間的支援。JmsTemplate 將目的地名稱的解析委託給實現 DestinationResolver 介面的 JMS 目的地物件。DynamicDestinationResolverJmsTemplate 使用的預設實現,用於解析動態目的地。還提供了 JndiDestinationResolver 作為 JNDI 中包含的目的地的服務定位器,並可選地回退到 DynamicDestinationResolver 中的行為。

通常,JMS 應用中使用的目的地僅在執行時可知,因此在部署應用時無法透過管理方式建立。這通常是因為互動系統元件之間存在共享應用邏輯,這些邏輯根據眾所周知的命名約定在執行時建立目的地。儘管動態目的地的建立不是 JMS 規範的一部分,但大多數供應商都提供了此功能。動態目的地使用使用者定義的名稱建立,這使其與臨時目的地不同,並且通常不註冊在 JNDI 中。建立動態目的地所使用的 API 因提供者而異,因為與目的地關聯的屬性是供應商特定的。然而,供應商有時會做出一個簡單的實現選擇,即忽略 JMS 規範中的警告,並使用 TopicSessioncreateTopic(String topicName) 方法或 QueueSessioncreateQueue(String queueName) 方法建立具有預設目的地屬性的新目的地。根據供應商實現的不同,DynamicDestinationResolver 隨後也可以建立一個物理目的地,而不僅僅是解析一個現有的。

布林屬性 pubSubDomain 用於配置 JmsTemplate,使其瞭解正在使用哪種 JMS 域。預設情況下,此屬性的值為 false,表示使用點對點域,即 Queues。此屬性(由 JmsTemplate 使用)透過 DestinationResolver 介面的實現來決定動態目的地解析的行為。

你還可以透過 defaultDestination 屬性為 JmsTemplate 配置一個預設目的地。對於不指定特定目的地的傳送和接收操作,將使用預設目的地。

訊息監聽器容器

在 EJB 世界中,JMS 訊息最常見的用途之一是驅動訊息驅動 Bean (MDB)。Spring 提供了一種建立訊息驅動 POJO (MDP) 的解決方案,這種方式不將使用者繫結到 EJB 容器。(有關 Spring MDP 支援的詳細介紹,請參閱非同步接收:訊息驅動 POJO。)端點方法可以用 @JmsListener 註解——有關詳細資訊,請參閱註解驅動的監聽器端點

訊息監聽器容器用於從 JMS 訊息佇列接收訊息並驅動注入其中的 MessageListener。監聽器容器負責所有訊息接收的執行緒化,並將訊息分派給監聽器進行處理。訊息監聽器容器是 MDP 和訊息提供者之間的中介,負責註冊接收訊息、參與事務、資源獲取和釋放、異常轉換等等。這使你能夠編寫與接收訊息(並可能對其進行響應)相關的(可能很複雜的)業務邏輯,並將樣板化的 JMS 基礎設施關注點委託給框架。

Spring 提供了兩個標準的 JMS 訊息監聽器容器,每個容器都有一套專門的功能集。

使用 SimpleMessageListenerContainer

這個訊息監聽器容器是兩個標準型別中較簡單的一個。它在啟動時建立固定數量的 JMS 會話和消費者,使用標準的 JMS MessageConsumer.setMessageListener() 方法註冊監聽器,並由 JMS 提供者負責執行監聽器回撥。這個變體不允許動態適應執行時需求,也不允許參與外部管理的事務。在相容性方面,它非常接近獨立 JMS 規範的精神,但通常與 Jakarta EE 的 JMS 限制不相容。

雖然 SimpleMessageListenerContainer 不允許參與外部管理的事務,但它確實支援原生的 JMS 事務。要啟用此功能,你可以將 sessionTransacted 標誌切換為 true,或者在 XML 名稱空間中將 acknowledge 屬性設定為 transacted。監聽器丟擲的異常將導致回滾,並且訊息會被重新投遞。或者,可以考慮使用 CLIENT_ACKNOWLEDGE 模式,該模式在發生異常時也提供重新投遞,但不使用事務性 Session 例項,因此不在事務協議中包含任何其他 Session 操作(例如傳送響應訊息)。
預設的 AUTO_ACKNOWLEDGE 模式無法提供足夠的可靠性保證。當監聽器執行失敗時(因為提供者在呼叫監聽器後會自動確認每條訊息,不會將異常傳播給提供者)或者當監聽器容器關閉時(你可以透過設定 acceptMessagesWhileStopping 標誌來配置),訊息可能會丟失。如果需要可靠性保證(例如,用於可靠的佇列處理和持久主題訂閱),請確保使用事務性會話。

使用 DefaultMessageListenerContainer

這個訊息監聽器容器在大多數情況下被使用。與 SimpleMessageListenerContainer 不同,這個容器變體允許動態適應執行時需求,並能夠參與外部管理的事務。當配置了 JtaTransactionManager 時,每條收到的訊息都會註冊到一個 XA 事務中。因此,處理過程可以利用 XA 事務語義。這個監聽器容器在對 JMS 提供者的低要求、高階功能(例如參與外部管理的事務)以及與 Jakarta EE 環境的相容性之間取得了很好的平衡。

你可以自定義容器的快取級別。請注意,當沒有啟用快取時,每次接收訊息都會建立一個新的連線和一個新的會話。在高負載下結合非持久訂閱使用,這可能導致訊息丟失。在這種情況下,請務必使用適當的快取級別。

當代理伺服器宕機時,這個容器也具備恢復能力。預設情況下,一個簡單的 BackOff 實現會每五秒重試一次。你可以指定一個自定義的 BackOff 實現,以獲得更細粒度的恢復選項。請參閱 ExponentialBackOff 作為示例。

與其同胞容器 (SimpleMessageListenerContainer) 類似,DefaultMessageListenerContainer 支援原生 JMS 事務並允許自定義確認模式。如果對你的場景可行,強烈建議優先使用此方式而非外部管理的事務——也就是說,如果你能夠接受在 JVM 死亡情況下偶爾出現的訊息重複。你的業務邏輯中的自定義重複訊息檢測步驟可以處理這種情況——例如,透過檢查業務實體是否存在或檢查協議表。任何此類安排都比替代方案效率顯著更高:透過 XA 事務包裝你的整個處理過程(透過配置你的 DefaultMessageListenerContainer 使用 JtaTransactionManager),以覆蓋 JMS 訊息的接收以及訊息監聽器中業務邏輯的執行(包括資料庫操作等)。
預設的 AUTO_ACKNOWLEDGE 模式無法提供足夠的可靠性保證。當監聽器執行失敗時(因為提供者在呼叫監聽器後會自動確認每條訊息,不會將異常傳播給提供者)或者當監聽器容器關閉時(你可以透過設定 acceptMessagesWhileStopping 標誌來配置),訊息可能會丟失。如果需要可靠性保證(例如,用於可靠的佇列處理和持久主題訂閱),請確保使用事務性會話。

事務管理

Spring 提供了一個 JmsTransactionManager,它管理單個 JMS ConnectionFactory 的事務。這使得 JMS 應用能夠利用 Spring 的託管事務特性,如資料訪問章節的事務管理部分所述。JmsTransactionManager 執行本地資源事務,將指定的 ConnectionFactory 的 JMS Connection/Session 對繫結到執行緒。JmsTemplate 會自動檢測到此類事務性資源並相應地進行操作。

在 Jakarta EE 環境中,ConnectionFactory 會連線和會話例項進行池化,因此這些資源可以在事務中高效地重用。在獨立環境中,使用 Spring 的 SingleConnectionFactory 會導致共享的 JMS Connection,每個事務都有其自己的獨立 Session。或者,可以考慮使用特定於提供者的池化介面卡,例如 ActiveMQ 的 PooledConnectionFactory 類。

你還可以將 JmsTemplateJtaTransactionManager 和支援 XA 的 JMS ConnectionFactory 一起使用,以執行分散式事務。請注意,這需要使用 JTA 事務管理器以及經過正確 XA 配置的 ConnectionFactory。(請查閱你的 Jakarta EE 伺服器或 JMS 提供者的文件。)

在託管和非託管事務環境中重用程式碼時,使用 JMS API 從 Connection 建立 Session 可能會令人困惑。這是因為 JMS API 只有一個建立 Session 的工廠方法,並且它需要事務和確認模式的值。在託管環境中,設定這些值是環境事務基礎設施的責任,因此這些值會被廠商提供的 JMS Connection 包裝器忽略。當你在非託管環境中使用 JmsTemplate 時,你可以透過使用屬性 sessionTransactedsessionAcknowledgeMode 來指定這些值。當你將 PlatformTransactionManagerJmsTemplate 一起使用時,模板總是會獲得一個事務性的 JMS Session