接收訊息

本文件描述瞭如何在 Spring 中使用 JMS 接收訊息。

同步接收

雖然 JMS 通常與非同步處理相關聯,但你也可以同步消費訊息。JmsTemplateJmsClient 上的 receive(..) 方法提供了此功能。在同步接收期間,呼叫執行緒會阻塞,直到有訊息可用。這可能是一個危險的操作,因為呼叫執行緒可能會無限期地阻塞。receiveTimeout 屬性指定接收器在放棄等待訊息之前應該等待多長時間。

非同步接收:訊息驅動 POJO

Spring 還透過使用 @JmsListener 註解支援註解監聽器端點,並提供了開放的基礎設施以程式設計方式註冊端點。這是目前為止設定非同步接收器最方便的方式。有關更多詳細資訊,請參閱啟用監聽器端點註解

與 EJB 世界中的訊息驅動 Bean (MDB) 類似,訊息驅動 POJO (MDP) 充當 JMS 訊息的接收器。對 MDP 的一個限制(但請參閱使用 MessageListenerAdapter)是它必須實現 jakarta.jms.MessageListener 介面。請注意,如果你的 POJO 在多個執行緒上接收訊息,那麼確保你的實現是執行緒安全的非常重要。

以下示例展示了一個簡單的 MDP 實現:

  • Java

  • Kotlin

public class ExampleListener implements MessageListener {

	public void onMessage(Message message) {
		if (message instanceof TextMessage textMessage) {
			try {
				System.out.println(textMessage.getText());
			}
			catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		}
		else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}
}
class ExampleListener : MessageListener {

	override fun onMessage(message: Message) {
		if (message is TextMessage) {
			try {
				println(message.text)
			} catch (ex: JMSException) {
				throw RuntimeException(ex)
			}
		} else {
			throw IllegalArgumentException("Message must be of type TextMessage")
		}
	}
}

一旦你實現了你的 MessageListener,就該建立訊息監聽器容器了。

以下示例展示瞭如何定義和配置 Spring 附帶的一個訊息監聽器容器(在本例中為 DefaultMessageListenerContainer):

  • Java

  • Kotlin

  • Xml

@Bean
ExampleListener messageListener() {
	return new ExampleListener();
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

請參閱各種訊息監聽器容器的 Spring javadoc(所有這些容器都實現了 MessageListenerContainer),以獲取每個實現支援的功能的完整描述。

使用 SessionAwareMessageListener 介面

SessionAwareMessageListener 介面是 Spring 特有的介面,它提供了與 JMS MessageListener 介面類似的約定,但同時允許訊息處理方法訪問接收 Message 的 JMS Session。以下清單顯示了 SessionAwareMessageListener 介面的定義:

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

	void onMessage(Message message, Session session) throws JMSException;
}

如果你希望你的 MDP 能夠響應任何收到的訊息(透過使用 onMessage(Message, Session) 方法中提供的 Session),你可以選擇讓你的 MDP 實現此介面(優先於標準的 JMS MessageListener 介面)。Spring 附帶的所有訊息監聽器容器實現都支援實現 MessageListenerSessionAwareMessageListener 介面的 MDP。實現 SessionAwareMessageListener 的類有一個警告,即它們透過介面與 Spring 繫結。是否使用它的選擇完全取決於你作為應用程式開發人員或架構師。

請注意,SessionAwareMessageListener 介面的 onMessage(..) 方法丟擲 JMSException。與標準 JMS MessageListener 介面不同,在使用 SessionAwareMessageListener 介面時,客戶端程式碼有責任處理任何丟擲的異常。

使用 MessageListenerAdapter

MessageListenerAdapter 類是 Spring 非同步訊息支援的最後一個元件。簡而言之,它允許你將幾乎任何類公開為 MDP(儘管有一些限制)。

考慮以下介面定義:

  • Java

  • Kotlin

public interface MessageDelegate {

	void handleMessage(String message);

	void handleMessage(Map message);

	void handleMessage(byte[] message);

	void handleMessage(Serializable message);
}
interface MessageDelegate {
	fun handleMessage(message: String)
	fun handleMessage(message: Map<*, *>)
	fun handleMessage(message: ByteArray)
	fun handleMessage(message: Serializable)
}

請注意,儘管該介面既不擴充套件 MessageListener 介面也不擴充套件 SessionAwareMessageListener 介面,但你仍然可以使用 MessageListenerAdapter 類將其用作 MDP。另請注意,各種訊息處理方法如何根據它們可以接收和處理的各種 Message 型別的內容進行強型別化。

現在考慮 MessageDelegate 介面的以下實現:

  • Java

  • Kotlin

public class DefaultMessageDelegate implements MessageDelegate {

	@Override
	public void handleMessage(String message) {
		// ...
	}

	@Override
	public void handleMessage(Map message) {
		// ...
	}

	@Override
	public void handleMessage(byte[] message) {
		// ...
	}

	@Override
	public void handleMessage(Serializable message) {
		// ...
	}
}
class DefaultMessageDelegate : MessageDelegate {

	override fun handleMessage(message: String) {
		// ...
	}

	override fun handleMessage(message: Map<*, *>) {
		// ...
	}

	override fun handleMessage(message: ByteArray) {
		// ...
	}

	override fun handleMessage(message: Serializable) {
		// ...
	}
}

特別注意,MessageDelegate 介面的上述實現 (DefaultMessageDelegate 類) 根本沒有任何 JMS 依賴。它確實是一個 POJO,我們可以透過以下配置將其轉換為 MDP:

  • Java

  • Kotlin

  • Xml

@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
	return new MessageListenerAdapter(messageDelegate);
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
	return MessageListenerAdapter(messageDelegate)
}

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultMessageDelegate"/>
	</constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

下一個示例展示了另一個 MDP,它只能處理接收 JMS TextMessage 訊息。請注意,訊息處理方法實際上叫做 receiveMessageListenerAdapter 中訊息處理方法的預設名稱是 handleMessage),但它是可配置的(你可以在本節後面看到)。另請注意,receive(..) 方法是強型別化的,只能接收和響應 JMS TextMessage 訊息。以下清單顯示了 TextMessageDelegate 介面的定義:

  • Java

  • Kotlin

public interface TextMessageDelegate {

	void receive(TextMessage message);
}
interface TextMessageDelegate {
	fun receive(message: TextMessage)
}

以下清單顯示了一個實現 TextMessageDelegate 介面的類:

  • Java

  • Kotlin

public class DefaultTextMessageDelegate implements TextMessageDelegate {

	@Override
	public void receive(TextMessage message) {
		// ...
	}
}
class DefaultTextMessageDelegate : TextMessageDelegate {

	override fun receive(message: TextMessage) {
		// ...
	}
}

伴隨的 MessageListenerAdapter 的配置如下:

  • Java

  • Kotlin

  • Xml

@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
	MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
	messageListener.setDefaultListenerMethod("receive");
	// We don't want automatic message context extraction
	messageListener.setMessageConverter(null);
	return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
	setDefaultListenerMethod("receive")
	// We don't want automatic message context extraction
	setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultTextMessageDelegate"/>
	</constructor-arg>
	<property name="defaultListenerMethod" value="receive"/>
	<!-- we don't want automatic message context extraction -->
	<property name="messageConverter">
		<null/>
	</property>
</bean>

請注意,如果 messageListener 接收到的 JMS Message 型別不是 TextMessage,則會丟擲(並隨後吞噬)IllegalStateExceptionMessageListenerAdapter 類的另一個功能是,如果處理程式方法返回非空值,則能夠自動傳送響應 Message。考慮以下介面和類:

  • Java

  • Kotlin

public interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	fun receive(message: TextMessage): String
}
  • Java

  • Kotlin

public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {

	@Override
	public String receive(TextMessage message) {
		return "message";
	}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {

	override fun receive(message: TextMessage): String {
		return "message"
	}
}

如果您將 DefaultResponsiveTextMessageDelegateMessageListenerAdapter 結合使用,從 'receive(..)' 方法執行返回的任何非空值(在預設配置中)都會轉換為 TextMessage。生成的 TextMessage 將被髮送到原始 Message 的 JMS Reply-To 屬性中定義的 Destination(如果存在),或者傳送到 MessageListenerAdapter 上設定的預設 Destination(如果已配置)。如果未找到 Destination,則會丟擲 InvalidDestinationException(請注意,此異常不會被吞噬並會沿著呼叫堆疊傳播)。

在事務中處理訊息

在事務中呼叫訊息監聽器只需要重新配置監聽器容器即可。

您可以透過監聽器容器定義上的 sessionTransacted 標誌啟用本地資源事務。然後,每個訊息監聽器呼叫都在一個活動的 JMS 事務中操作,如果監聽器執行失敗,訊息接收將被回滾。傳送響應訊息(透過 SessionAwareMessageListener)是同一本地事務的一部分,但任何其他資源操作(例如資料庫訪問)都獨立執行。這通常需要在監聽器實現中進行重複訊息檢測,以涵蓋資料庫處理已提交但訊息處理未能提交的情況。

考慮以下 bean 定義:

  • Java

  • Kotlin

  • Xml

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		isSessionTransacted = true
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="sessionTransacted" value="true"/>
</bean>

要參與外部管理的事務,你需要配置一個事務管理器,並使用支援外部管理事務的監聽器容器(通常是 DefaultMessageListenerContainer)。

要配置訊息監聽器容器以參與 XA 事務,你需要配置一個 JtaTransactionManager(它預設委託給 Jakarta EE 伺服器的事務子系統)。請注意,底層的 JMS ConnectionFactory 需要支援 XA 並正確註冊到你的 JTA 事務協調器。(檢查你的 Jakarta EE 伺服器的 JNDI 資源配置。)這使得訊息接收以及(例如)資料庫訪問可以成為同一事務的一部分(具有統一的提交語義,但以 XA 事務日誌開銷為代價)。

以下 bean 定義建立了一個事務管理器:

  • Java

  • Kotlin

  • Xml

@Bean
JtaTransactionManager transactionManager()  {
	return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然後,我們需要將其新增到我們之前的容器配置中。容器會處理其餘的事情。以下示例展示瞭如何操作:

  • Java

  • Kotlin

  • Xml

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
				 transactionManager: JtaTransactionManager) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		setTransactionManager(transactionManager)
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="transactionManager" ref="transactionManager"/>
</bean>
© . This site is unofficial and not affiliated with VMware.