接收訊息
本文件描述瞭如何在 Spring 中使用 JMS 接收訊息。
同步接收
雖然 JMS 通常與非同步處理相關聯,但你也可以同步消費訊息。JmsTemplate 和 JmsClient 上的 receive(..) 方法提供了此功能。在同步接收期間,呼叫執行緒會阻塞,直到有訊息可用。這可能是一個危險的操作,因為呼叫執行緒可能會無限期地阻塞。receiveTimeout 屬性指定接收器在放棄等待訊息之前應該等待多長時間。
非同步接收:訊息驅動 POJO
Spring 還透過使用 @JmsListener 註解支援註解監聽器端點,並提供了開放的基礎設施以程式設計方式註冊端點。這是目前為止設定非同步接收器最方便的方式。有關更多詳細資訊,請參閱啟用監聽器端點註解。 |
與 EJB 世界中的訊息驅動 Bean (MDB) 類似,訊息驅動 POJO (MDP) 充當 JMS 訊息的接收器。對 MDP 的一個限制(但請參閱使用 MessageListenerAdapter)是它必須實現 jakarta.jms.MessageListener 介面。請注意,如果你的 POJO 在多個執行緒上接收訊息,那麼確保你的實現是執行緒安全的非常重要。
以下示例展示了一個簡單的 MDP 實現:
-
Java
-
Kotlin
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
class ExampleListener : MessageListener {
override fun onMessage(message: Message) {
if (message is TextMessage) {
try {
println(message.text)
} catch (ex: JMSException) {
throw RuntimeException(ex)
}
} else {
throw IllegalArgumentException("Message must be of type TextMessage")
}
}
}
一旦你實現了你的 MessageListener,就該建立訊息監聽器容器了。
以下示例展示瞭如何定義和配置 Spring 附帶的一個訊息監聽器容器(在本例中為 DefaultMessageListenerContainer):
-
Java
-
Kotlin
-
Xml
@Bean
ExampleListener messageListener() {
return new ExampleListener();
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
請參閱各種訊息監聽器容器的 Spring javadoc(所有這些容器都實現了 MessageListenerContainer),以獲取每個實現支援的功能的完整描述。
使用 SessionAwareMessageListener 介面
SessionAwareMessageListener 介面是 Spring 特有的介面,它提供了與 JMS MessageListener 介面類似的約定,但同時允許訊息處理方法訪問接收 Message 的 JMS Session。以下清單顯示了 SessionAwareMessageListener 介面的定義:
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
如果你希望你的 MDP 能夠響應任何收到的訊息(透過使用 onMessage(Message, Session) 方法中提供的 Session),你可以選擇讓你的 MDP 實現此介面(優先於標準的 JMS MessageListener 介面)。Spring 附帶的所有訊息監聽器容器實現都支援實現 MessageListener 或 SessionAwareMessageListener 介面的 MDP。實現 SessionAwareMessageListener 的類有一個警告,即它們透過介面與 Spring 繫結。是否使用它的選擇完全取決於你作為應用程式開發人員或架構師。
請注意,SessionAwareMessageListener 介面的 onMessage(..) 方法丟擲 JMSException。與標準 JMS MessageListener 介面不同,在使用 SessionAwareMessageListener 介面時,客戶端程式碼有責任處理任何丟擲的異常。
使用 MessageListenerAdapter
MessageListenerAdapter 類是 Spring 非同步訊息支援的最後一個元件。簡而言之,它允許你將幾乎任何類公開為 MDP(儘管有一些限制)。
考慮以下介面定義:
-
Java
-
Kotlin
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
interface MessageDelegate {
fun handleMessage(message: String)
fun handleMessage(message: Map<*, *>)
fun handleMessage(message: ByteArray)
fun handleMessage(message: Serializable)
}
請注意,儘管該介面既不擴充套件 MessageListener 介面也不擴充套件 SessionAwareMessageListener 介面,但你仍然可以使用 MessageListenerAdapter 類將其用作 MDP。另請注意,各種訊息處理方法如何根據它們可以接收和處理的各種 Message 型別的內容進行強型別化。
現在考慮 MessageDelegate 介面的以下實現:
-
Java
-
Kotlin
public class DefaultMessageDelegate implements MessageDelegate {
@Override
public void handleMessage(String message) {
// ...
}
@Override
public void handleMessage(Map message) {
// ...
}
@Override
public void handleMessage(byte[] message) {
// ...
}
@Override
public void handleMessage(Serializable message) {
// ...
}
}
class DefaultMessageDelegate : MessageDelegate {
override fun handleMessage(message: String) {
// ...
}
override fun handleMessage(message: Map<*, *>) {
// ...
}
override fun handleMessage(message: ByteArray) {
// ...
}
override fun handleMessage(message: Serializable) {
// ...
}
}
特別注意,MessageDelegate 介面的上述實現 (DefaultMessageDelegate 類) 根本沒有任何 JMS 依賴。它確實是一個 POJO,我們可以透過以下配置將其轉換為 MDP:
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
return new MessageListenerAdapter(messageDelegate);
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
return MessageListenerAdapter(messageDelegate)
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下一個示例展示了另一個 MDP,它只能處理接收 JMS TextMessage 訊息。請注意,訊息處理方法實際上叫做 receive(MessageListenerAdapter 中訊息處理方法的預設名稱是 handleMessage),但它是可配置的(你可以在本節後面看到)。另請注意,receive(..) 方法是強型別化的,只能接收和響應 JMS TextMessage 訊息。以下清單顯示了 TextMessageDelegate 介面的定義:
-
Java
-
Kotlin
public interface TextMessageDelegate {
void receive(TextMessage message);
}
interface TextMessageDelegate {
fun receive(message: TextMessage)
}
以下清單顯示了一個實現 TextMessageDelegate 介面的類:
-
Java
-
Kotlin
public class DefaultTextMessageDelegate implements TextMessageDelegate {
@Override
public void receive(TextMessage message) {
// ...
}
}
class DefaultTextMessageDelegate : TextMessageDelegate {
override fun receive(message: TextMessage) {
// ...
}
}
伴隨的 MessageListenerAdapter 的配置如下:
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
messageListener.setDefaultListenerMethod("receive");
// We don't want automatic message context extraction
messageListener.setMessageConverter(null);
return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
setDefaultListenerMethod("receive")
// We don't want automatic message context extraction
setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
請注意,如果 messageListener 接收到的 JMS Message 型別不是 TextMessage,則會丟擲(並隨後吞噬)IllegalStateException。MessageListenerAdapter 類的另一個功能是,如果處理程式方法返回非空值,則能夠自動傳送響應 Message。考慮以下介面和類:
-
Java
-
Kotlin
public interface ResponsiveTextMessageDelegate {
// Notice the return type...
String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {
// Notice the return type...
fun receive(message: TextMessage): String
}
-
Java
-
Kotlin
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
@Override
public String receive(TextMessage message) {
return "message";
}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {
override fun receive(message: TextMessage): String {
return "message"
}
}
如果您將 DefaultResponsiveTextMessageDelegate 與 MessageListenerAdapter 結合使用,從 'receive(..)' 方法執行返回的任何非空值(在預設配置中)都會轉換為 TextMessage。生成的 TextMessage 將被髮送到原始 Message 的 JMS Reply-To 屬性中定義的 Destination(如果存在),或者傳送到 MessageListenerAdapter 上設定的預設 Destination(如果已配置)。如果未找到 Destination,則會丟擲 InvalidDestinationException(請注意,此異常不會被吞噬並會沿著呼叫堆疊傳播)。
在事務中處理訊息
在事務中呼叫訊息監聽器只需要重新配置監聽器容器即可。
您可以透過監聽器容器定義上的 sessionTransacted 標誌啟用本地資源事務。然後,每個訊息監聽器呼叫都在一個活動的 JMS 事務中操作,如果監聽器執行失敗,訊息接收將被回滾。傳送響應訊息(透過 SessionAwareMessageListener)是同一本地事務的一部分,但任何其他資源操作(例如資料庫訪問)都獨立執行。這通常需要在監聽器實現中進行重複訊息檢測,以涵蓋資料庫處理已提交但訊息處理未能提交的情況。
考慮以下 bean 定義:
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
isSessionTransacted = true
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
要參與外部管理的事務,你需要配置一個事務管理器,並使用支援外部管理事務的監聽器容器(通常是 DefaultMessageListenerContainer)。
要配置訊息監聽器容器以參與 XA 事務,你需要配置一個 JtaTransactionManager(它預設委託給 Jakarta EE 伺服器的事務子系統)。請注意,底層的 JMS ConnectionFactory 需要支援 XA 並正確註冊到你的 JTA 事務協調器。(檢查你的 Jakarta EE 伺服器的 JNDI 資源配置。)這使得訊息接收以及(例如)資料庫訪問可以成為同一事務的一部分(具有統一的提交語義,但以 XA 事務日誌開銷為代價)。
以下 bean 定義建立了一個事務管理器:
-
Java
-
Kotlin
-
Xml
@Bean
JtaTransactionManager transactionManager() {
return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然後,我們需要將其新增到我們之前的容器配置中。容器會處理其餘的事情。以下示例展示瞭如何操作:
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
transactionManager: JtaTransactionManager) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
setTransactionManager(transactionManager)
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>