接收訊息
本節介紹如何在 Spring 中使用 JMS 接收訊息。
同步接收
雖然 JMS 通常與非同步處理相關聯,但你也可以同步消費訊息。過載的 receive(..)
方法提供了此功能。在同步接收期間,呼叫執行緒會阻塞,直到訊息可用。這可能是一個危險的操作,因為呼叫執行緒可能會無限期地阻塞。receiveTimeout
屬性指定了接收器在放棄等待訊息之前應該等待多長時間。
非同步接收:訊息驅動的 POJO (MDP)
Spring 還透過使用 `@JmsListener` 註解支援註解監聽器端點,並提供開放的基礎設施來透過程式設計方式註冊端點。這是迄今為止設定非同步接收器最方便的方法。更多詳情請參閱 啟用監聽器端點註解。 |
與 EJB 世界中的訊息驅動 Bean (MDB) 類似,訊息驅動 POJO (MDP) 充當 JMS 訊息的接收器。對 MDP 的一個限制(但請參閱 使用 MessageListenerAdapter
)是它必須實現 jakarta.jms.MessageListener
介面。請注意,如果你的 POJO 在多個執行緒上接收訊息,務必確保你的實現是執行緒安全的。
以下示例展示了一個簡單的 MDP 實現
-
Java
-
Kotlin
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
class ExampleListener : MessageListener {
override fun onMessage(message: Message) {
if (message is TextMessage) {
try {
println(message.text)
} catch (ex: JMSException) {
throw RuntimeException(ex)
}
} else {
throw IllegalArgumentException("Message must be of type TextMessage")
}
}
}
一旦你實現了 MessageListener
,就可以建立一個訊息監聽器容器了。
以下示例展示瞭如何定義和配置 Spring 自帶的一種訊息監聽器容器(在本例中為 DefaultMessageListenerContainer
)
-
Java
-
Kotlin
-
Xml
@Bean
ExampleListener messageListener() {
return new ExampleListener();
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
請參閱各種訊息監聽器容器(它們都實現了 MessageListenerContainer)的 Spring javadoc,以獲取每種實現支援功能的完整描述。
使用 SessionAwareMessageListener
介面
SessionAwareMessageListener
介面是一個 Spring 特有的介面,它提供了一個與 JMS MessageListener
介面類似的契約,但同時也允許訊息處理方法訪問接收到 Message
的 JMS Session
。以下列表顯示了 SessionAwareMessageListener
介面的定義
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
如果你希望你的 MDP 能夠響應任何接收到的訊息(透過使用 onMessage(Message, Session)
方法中提供的 Session
),你可以選擇讓你的 MDP 實現此介面(優先於標準的 JMS MessageListener
介面)。Spring 自帶的所有訊息監聽器容器實現都支援實現 MessageListener
或 SessionAwareMessageListener
介面的 MDP。實現了 SessionAwareMessageListener
的類有一個注意事項:它們透過介面與 Spring 耦合。是否使用它完全取決於你作為應用開發者或架構師的決定。
請注意,SessionAwareMessageListener
介面的 onMessage(..)
方法會丟擲 JMSException
。與標準的 JMS MessageListener
介面不同,當使用 SessionAwareMessageListener
介面時,客戶端程式碼有責任處理所有丟擲的異常。
使用 MessageListenerAdapter
MessageListenerAdapter
類是 Spring 非同步訊息支援中的最後一個元件。簡而言之,它允許你將幾乎任何類公開為 MDP(儘管存在一些限制)。
考慮以下介面定義
-
Java
-
Kotlin
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
interface MessageDelegate {
fun handleMessage(message: String)
fun handleMessage(message: Map<*, *>)
fun handleMessage(message: ByteArray)
fun handleMessage(message: Serializable)
}
注意,儘管該介面既沒有擴充套件 MessageListener
介面也沒有擴充套件 SessionAwareMessageListener
介面,但你仍然可以使用 MessageListenerAdapter
類將其用作 MDP。另請注意,各種訊息處理方法是如何根據它們可以接收和處理的各種 Message
型別的內容進行強型別化的。
現在考慮 MessageDelegate
介面的以下實現
-
Java
-
Kotlin
public class DefaultMessageDelegate implements MessageDelegate {
@Override
public void handleMessage(String message) {
// ...
}
@Override
public void handleMessage(Map message) {
// ...
}
@Override
public void handleMessage(byte[] message) {
// ...
}
@Override
public void handleMessage(Serializable message) {
// ...
}
}
class DefaultMessageDelegate : MessageDelegate {
override fun handleMessage(message: String) {
// ...
}
override fun handleMessage(message: Map<*, *>) {
// ...
}
override fun handleMessage(message: ByteArray) {
// ...
}
override fun handleMessage(message: Serializable) {
// ...
}
}
特別請注意,上述 MessageDelegate
介面的實現(即 DefaultMessageDelegate
類)完全沒有 JMS 依賴。它確實是一個 POJO,我們可以透過以下配置將其變成 MDP。
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
return new MessageListenerAdapter(messageDelegate);
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
return MessageListenerAdapter(messageDelegate)
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下一個示例展示了另一個只能處理接收 JMS TextMessage
訊息的 MDP。注意訊息處理方法實際上是如何被稱為 receive
的(在 MessageListenerAdapter
中,訊息處理方法名稱預設為 handleMessage
),但它是可配置的(如本節後面所示)。另請注意,receive(..)
方法是強型別化的,只接收並響應 JMS TextMessage
訊息。以下列表顯示了 TextMessageDelegate
介面的定義
-
Java
-
Kotlin
public interface TextMessageDelegate {
void receive(TextMessage message);
}
interface TextMessageDelegate {
fun receive(message: TextMessage)
}
以下列表顯示了一個實現 TextMessageDelegate
介面的類
-
Java
-
Kotlin
public class DefaultTextMessageDelegate implements TextMessageDelegate {
@Override
public void receive(TextMessage message) {
// ...
}
}
class DefaultTextMessageDelegate : TextMessageDelegate {
override fun receive(message: TextMessage) {
// ...
}
}
相應的 MessageListenerAdapter
配置如下
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
messageListener.setDefaultListenerMethod("receive");
// We don't want automatic message context extraction
messageListener.setMessageConverter(null);
return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
setDefaultListenerMethod("receive")
// We don't want automatic message context extraction
setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
請注意,如果 messageListener
接收到的 JMS Message
型別不是 TextMessage
,將丟擲 IllegalStateException
(並隨後被吞沒)。MessageListenerAdapter
類的另一個功能是,如果處理方法返回非 void 值,則能夠自動傳送響應 Message
。考慮以下介面和類
-
Java
-
Kotlin
public interface ResponsiveTextMessageDelegate {
// Notice the return type...
String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {
// Notice the return type...
fun receive(message: TextMessage): String
}
-
Java
-
Kotlin
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
@Override
public String receive(TextMessage message) {
return "message";
}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {
override fun receive(message: TextMessage): String {
return "message"
}
}
如果你將 DefaultResponsiveTextMessageDelegate
與 MessageListenerAdapter
結合使用,從 `'receive(..)'` 方法執行返回的任何非 null 值(在預設配置下)將被轉換為 TextMessage
。然後,生成的 TextMessage
將傳送到原始 Message
的 JMS Reply-To
屬性中定義的 Destination
(如果存在)或 MessageListenerAdapter
上設定的預設 Destination
(如果已配置)。如果未找到 Destination
,將丟擲 InvalidDestinationException
(注意,此異常不會被吞沒,而是會沿著呼叫棧向上層傳播)。
在事務中處理訊息
在事務中呼叫訊息監聽器只需要重新配置監聽器容器。
你可以透過監聽器容器定義上的 sessionTransacted
標誌啟用本地資源事務。這樣,每次訊息監聽器呼叫都會在活躍的 JMS 事務中執行,如果監聽器執行失敗,訊息接收將被回滾。傳送響應訊息(透過 SessionAwareMessageListener
)是同一本地事務的一部分,但任何其他資源操作(如資料庫訪問)則獨立執行。這通常需要在監聽器實現中進行重複訊息檢測,以應對資料庫處理已提交但訊息處理未能提交的情況。
考慮以下 Bean 定義
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
isSessionTransacted = true
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
要參與外部管理的事務,你需要配置一個事務管理器,並使用支援外部管理事務的監聽器容器(通常是 DefaultMessageListenerContainer
)。
要為 XA 事務參與配置訊息監聽器容器,你需要配置一個 JtaTransactionManager
(它預設會委託給 Jakarta EE 伺服器的事務子系統)。請注意,底層的 JMS ConnectionFactory
需要具備 XA 能力,並已在你的 JTA 事務協調器中正確註冊。(請檢查你的 Jakarta EE 伺服器的 JNDI 資源配置。)這樣,訊息接收以及(例如)資料庫訪問就可以作為同一事務的一部分(具有統一的提交語義,但會增加 XA 事務日誌開銷)。
以下 Bean 定義建立了一個事務管理器
-
Java
-
Kotlin
-
Xml
@Bean
JtaTransactionManager transactionManager() {
return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然後我們需要將其新增到之前的容器配置中。容器會處理其餘的事情。以下示例展示瞭如何操作
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
transactionManager: JtaTransactionManager) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
setTransactionManager(transactionManager)
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>