接收訊息

本節介紹如何在 Spring 中使用 JMS 接收訊息。

同步接收

雖然 JMS 通常與非同步處理相關聯,但你也可以同步消費訊息。過載的 receive(..) 方法提供了此功能。在同步接收期間,呼叫執行緒會阻塞,直到訊息可用。這可能是一個危險的操作,因為呼叫執行緒可能會無限期地阻塞。receiveTimeout 屬性指定了接收器在放棄等待訊息之前應該等待多長時間。

非同步接收:訊息驅動的 POJO (MDP)

Spring 還透過使用 `@JmsListener` 註解支援註解監聽器端點,並提供開放的基礎設施來透過程式設計方式註冊端點。這是迄今為止設定非同步接收器最方便的方法。更多詳情請參閱 啟用監聽器端點註解

與 EJB 世界中的訊息驅動 Bean (MDB) 類似,訊息驅動 POJO (MDP) 充當 JMS 訊息的接收器。對 MDP 的一個限制(但請參閱 使用 MessageListenerAdapter)是它必須實現 jakarta.jms.MessageListener 介面。請注意,如果你的 POJO 在多個執行緒上接收訊息,務必確保你的實現是執行緒安全的。

以下示例展示了一個簡單的 MDP 實現

  • Java

  • Kotlin

public class ExampleListener implements MessageListener {

	public void onMessage(Message message) {
		if (message instanceof TextMessage textMessage) {
			try {
				System.out.println(textMessage.getText());
			}
			catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		}
		else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}
}
class ExampleListener : MessageListener {

	override fun onMessage(message: Message) {
		if (message is TextMessage) {
			try {
				println(message.text)
			} catch (ex: JMSException) {
				throw RuntimeException(ex)
			}
		} else {
			throw IllegalArgumentException("Message must be of type TextMessage")
		}
	}
}

一旦你實現了 MessageListener,就可以建立一個訊息監聽器容器了。

以下示例展示瞭如何定義和配置 Spring 自帶的一種訊息監聽器容器(在本例中為 DefaultMessageListenerContainer

  • Java

  • Kotlin

  • Xml

@Bean
ExampleListener messageListener() {
	return new ExampleListener();
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

請參閱各種訊息監聽器容器(它們都實現了 MessageListenerContainer)的 Spring javadoc,以獲取每種實現支援功能的完整描述。

使用 SessionAwareMessageListener 介面

SessionAwareMessageListener 介面是一個 Spring 特有的介面,它提供了一個與 JMS MessageListener 介面類似的契約,但同時也允許訊息處理方法訪問接收到 Message 的 JMS Session。以下列表顯示了 SessionAwareMessageListener 介面的定義

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

	void onMessage(Message message, Session session) throws JMSException;
}

如果你希望你的 MDP 能夠響應任何接收到的訊息(透過使用 onMessage(Message, Session) 方法中提供的 Session),你可以選擇讓你的 MDP 實現此介面(優先於標準的 JMS MessageListener 介面)。Spring 自帶的所有訊息監聽器容器實現都支援實現 MessageListenerSessionAwareMessageListener 介面的 MDP。實現了 SessionAwareMessageListener 的類有一個注意事項:它們透過介面與 Spring 耦合。是否使用它完全取決於你作為應用開發者或架構師的決定。

請注意,SessionAwareMessageListener 介面的 onMessage(..) 方法會丟擲 JMSException。與標準的 JMS MessageListener 介面不同,當使用 SessionAwareMessageListener 介面時,客戶端程式碼有責任處理所有丟擲的異常。

使用 MessageListenerAdapter

MessageListenerAdapter 類是 Spring 非同步訊息支援中的最後一個元件。簡而言之,它允許你將幾乎任何類公開為 MDP(儘管存在一些限制)。

考慮以下介面定義

  • Java

  • Kotlin

public interface MessageDelegate {

	void handleMessage(String message);

	void handleMessage(Map message);

	void handleMessage(byte[] message);

	void handleMessage(Serializable message);
}
interface MessageDelegate {
	fun handleMessage(message: String)
	fun handleMessage(message: Map<*, *>)
	fun handleMessage(message: ByteArray)
	fun handleMessage(message: Serializable)
}

注意,儘管該介面既沒有擴充套件 MessageListener 介面也沒有擴充套件 SessionAwareMessageListener 介面,但你仍然可以使用 MessageListenerAdapter 類將其用作 MDP。另請注意,各種訊息處理方法是如何根據它們可以接收和處理的各種 Message 型別的內容進行強型別化的。

現在考慮 MessageDelegate 介面的以下實現

  • Java

  • Kotlin

public class DefaultMessageDelegate implements MessageDelegate {

	@Override
	public void handleMessage(String message) {
		// ...
	}

	@Override
	public void handleMessage(Map message) {
		// ...
	}

	@Override
	public void handleMessage(byte[] message) {
		// ...
	}

	@Override
	public void handleMessage(Serializable message) {
		// ...
	}
}
class DefaultMessageDelegate : MessageDelegate {

	override fun handleMessage(message: String) {
		// ...
	}

	override fun handleMessage(message: Map<*, *>) {
		// ...
	}

	override fun handleMessage(message: ByteArray) {
		// ...
	}

	override fun handleMessage(message: Serializable) {
		// ...
	}
}

特別請注意,上述 MessageDelegate 介面的實現(即 DefaultMessageDelegate 類)完全沒有 JMS 依賴。它確實是一個 POJO,我們可以透過以下配置將其變成 MDP。

  • Java

  • Kotlin

  • Xml

@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
	return new MessageListenerAdapter(messageDelegate);
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
	return MessageListenerAdapter(messageDelegate)
}

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultMessageDelegate"/>
	</constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

下一個示例展示了另一個只能處理接收 JMS TextMessage 訊息的 MDP。注意訊息處理方法實際上是如何被稱為 receive 的(在 MessageListenerAdapter 中,訊息處理方法名稱預設為 handleMessage),但它是可配置的(如本節後面所示)。另請注意,receive(..) 方法是強型別化的,只接收並響應 JMS TextMessage 訊息。以下列表顯示了 TextMessageDelegate 介面的定義

  • Java

  • Kotlin

public interface TextMessageDelegate {

	void receive(TextMessage message);
}
interface TextMessageDelegate {
	fun receive(message: TextMessage)
}

以下列表顯示了一個實現 TextMessageDelegate 介面的類

  • Java

  • Kotlin

public class DefaultTextMessageDelegate implements TextMessageDelegate {

	@Override
	public void receive(TextMessage message) {
		// ...
	}
}
class DefaultTextMessageDelegate : TextMessageDelegate {

	override fun receive(message: TextMessage) {
		// ...
	}
}

相應的 MessageListenerAdapter 配置如下

  • Java

  • Kotlin

  • Xml

@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
	MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
	messageListener.setDefaultListenerMethod("receive");
	// We don't want automatic message context extraction
	messageListener.setMessageConverter(null);
	return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
	setDefaultListenerMethod("receive")
	// We don't want automatic message context extraction
	setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultTextMessageDelegate"/>
	</constructor-arg>
	<property name="defaultListenerMethod" value="receive"/>
	<!-- we don't want automatic message context extraction -->
	<property name="messageConverter">
		<null/>
	</property>
</bean>

請注意,如果 messageListener 接收到的 JMS Message 型別不是 TextMessage,將丟擲 IllegalStateException(並隨後被吞沒)。MessageListenerAdapter 類的另一個功能是,如果處理方法返回非 void 值,則能夠自動傳送響應 Message。考慮以下介面和類

  • Java

  • Kotlin

public interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	fun receive(message: TextMessage): String
}
  • Java

  • Kotlin

public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {

	@Override
	public String receive(TextMessage message) {
		return "message";
	}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {

	override fun receive(message: TextMessage): String {
		return "message"
	}
}

如果你將 DefaultResponsiveTextMessageDelegateMessageListenerAdapter 結合使用,從 `'receive(..)'` 方法執行返回的任何非 null 值(在預設配置下)將被轉換為 TextMessage。然後,生成的 TextMessage 將傳送到原始 Message 的 JMS Reply-To 屬性中定義的 Destination(如果存在)或 MessageListenerAdapter 上設定的預設 Destination(如果已配置)。如果未找到 Destination,將丟擲 InvalidDestinationException(注意,此異常不會被吞沒,而是會沿著呼叫棧向上層傳播)。

在事務中處理訊息

在事務中呼叫訊息監聽器只需要重新配置監聽器容器。

你可以透過監聽器容器定義上的 sessionTransacted 標誌啟用本地資源事務。這樣,每次訊息監聽器呼叫都會在活躍的 JMS 事務中執行,如果監聽器執行失敗,訊息接收將被回滾。傳送響應訊息(透過 SessionAwareMessageListener)是同一本地事務的一部分,但任何其他資源操作(如資料庫訪問)則獨立執行。這通常需要在監聽器實現中進行重複訊息檢測,以應對資料庫處理已提交但訊息處理未能提交的情況。

考慮以下 Bean 定義

  • Java

  • Kotlin

  • Xml

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		isSessionTransacted = true
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="sessionTransacted" value="true"/>
</bean>

要參與外部管理的事務,你需要配置一個事務管理器,並使用支援外部管理事務的監聽器容器(通常是 DefaultMessageListenerContainer)。

要為 XA 事務參與配置訊息監聽器容器,你需要配置一個 JtaTransactionManager(它預設會委託給 Jakarta EE 伺服器的事務子系統)。請注意,底層的 JMS ConnectionFactory 需要具備 XA 能力,並已在你的 JTA 事務協調器中正確註冊。(請檢查你的 Jakarta EE 伺服器的 JNDI 資源配置。)這樣,訊息接收以及(例如)資料庫訪問就可以作為同一事務的一部分(具有統一的提交語義,但會增加 XA 事務日誌開銷)。

以下 Bean 定義建立了一個事務管理器

  • Java

  • Kotlin

  • Xml

@Bean
JtaTransactionManager transactionManager()  {
	return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然後我們需要將其新增到之前的容器配置中。容器會處理其餘的事情。以下示例展示瞭如何操作

  • Java

  • Kotlin

  • Xml

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
				 transactionManager: JtaTransactionManager) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		setTransactionManager(transactionManager)
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="transactionManager" ref="transactionManager"/>
</bean>