註解驅動的監聽器端點

以非同步方式接收訊息最簡單的方法是使用註解驅動的監聽器端點基礎設施。簡而言之,它允許您將託管 Bean 的方法公開為 JMS 監聽器端點。以下示例展示瞭如何使用它

@Component
public class MyService {

	@JmsListener(destination = "myDestination")
	public void processOrder(String data) { ... }
}

前面示例的思想是,每當 jakarta.jms.Destination myDestination 上的訊息可用時,processOrder 就會相應地呼叫 方法(在此示例中,使用 JMS 訊息的內容,類似於 MessageListenerAdapter 提供的功能)。

註解驅動的端點基礎設施會為每個註解方法在幕後建立一個訊息監聽器容器,透過使用 JmsListenerContainerFactory。此類容器未在 application context 註冊,但可以透過使用 JmsListenerEndpointRegistry bean 輕鬆定位以進行管理。

@JmsListener 是 Java 8 中可重複使用的註解,因此您可以透過向同一方法新增額外的 @JmsListener 宣告來將其與多個 JMS 目的地關聯。

啟用監聽器端點註解

要啟用對 @JmsListener 註解的支援,您可以將 @EnableJms 新增到您的 @Configuration 類之一中,如下例所示:

  • Java

  • Kotlin

  • Xml

@Configuration
@EnableJms
public class JmsConfiguration {

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory,
			DestinationResolver destinationResolver) {

		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory);
		factory.setDestinationResolver(destinationResolver);
		factory.setSessionTransacted(true);
		factory.setConcurrency("3-10");
		return factory;
	}
}
@Configuration
@EnableJms
class JmsConfiguration {

	@Bean
	fun jmsListenerContainerFactory(connectionFactory: ConnectionFactory, destinationResolver: DestinationResolver) =
		DefaultJmsListenerContainerFactory().apply {
			setConnectionFactory(connectionFactory)
			setDestinationResolver(destinationResolver)
			setSessionTransacted(true)
			setConcurrency("3-10")
		}
}
<jms:annotation-driven/>

<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destinationResolver" ref="destinationResolver"/>
	<property name="sessionTransacted" value="true"/>
	<property name="concurrency" value="3-10"/>
</bean>

預設情況下,基礎設施會查詢名為 jmsListenerContainerFactory 的 bean 作為建立訊息監聽器容器的工廠來源。在這種情況下(並忽略 JMS 基礎設施設定),您可以使用三個核心執行緒和十個最大執行緒的執行緒池來呼叫 processOrder 方法。

您可以為每個註解定製要使用的監聽器容器工廠,或者透過實現 JmsListenerConfigurer 介面來配置顯式預設設定。僅當至少有一個端點未註冊特定容器工廠時才需要預設設定。有關詳細資訊和示例,請參閱實現 JmsListenerConfigurer 的類的 javadoc。

程式設計式端點註冊

JmsListenerEndpoint 提供了 JMS 端點的模型,並負責為該模型配置容器。除了透過 JmsListener 註解檢測到的端點外,該基礎設施還允許您程式設計式地配置端點。以下示例展示瞭如何進行:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

	@Override
	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
		SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
		endpoint.setId("myJmsEndpoint");
		endpoint.setDestination("anotherQueue");
		endpoint.setMessageListener(message -> {
			// processing
		});
		registrar.registerEndpoint(endpoint);
	}
}

在前面的示例中,我們使用了 SimpleJmsListenerEndpoint,它提供了要呼叫的實際 MessageListener。但是,您也可以構建自己的端點變體來描述自定義呼叫機制。

請注意,您可以完全跳過使用 @JmsListener,而僅透過 JmsListenerConfigurer 程式設計式地註冊您的端點。

註解端點方法簽名

到目前為止,我們一直在端點中注入一個簡單的 String,但它實際上可以具有非常靈活的方法簽名。在以下示例中,我們將其重寫為注入帶有自定義 header 的 Order

@Component
public class MyService {

	@JmsListener(destination = "myDestination")
	public void processOrder(Order order, @Header("order_type") String orderType) {
		...
	}
}

您可以在 JMS 監聽器端點中注入的主要元素如下:

  • 原始 jakarta.jms.Message 或其任何子類(前提是它與傳入的訊息型別匹配)。

  • 用於可選訪問原生 JMS API 的 jakarta.jms.Session (例如,用於傳送自定義回覆)。

  • 代表傳入 JMS 訊息的 org.springframework.messaging.Message。請注意,此訊息包含自定義 header 和標準 header(由 JmsHeaders 定義)。

  • 使用 @Header 註解的方法引數,用於提取特定的 header 值,包括標準 JMS header。

  • 使用 @Headers 註解的引數,該引數還必須可賦值給 java.util.Map,以便訪問所有 header。

  • 未使用註解的元素,如果不是受支援的型別(MessageSession),則被視為 payload。您可以透過使用 @Payload 註解引數來明確指定這一點。您還可以透過新增額外的 @Valid 來開啟驗證。

注入 Spring 的 Message 抽象的能力特別有用,可以利用儲存在傳輸特定訊息中的所有資訊,而無需依賴傳輸特定的 API。以下示例展示瞭如何進行:

@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }

方法引數的處理由 DefaultMessageHandlerMethodFactory 提供,您可以進一步自定義它以支援其他方法引數。您也可以在此處自定義轉換和驗證支援。

例如,如果我們想確保我們的 Order 在處理之前是有效的,我們可以使用 @Valid 註解 payload 並配置必要的驗證器,如下例所示:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

	@Override
	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
		registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
	}

	@Bean
	public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
		DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
		factory.setValidator(myValidator());
		return factory;
	}
}

響應管理

MessageListenerAdapter 中已有的支援允許您方法具有非 void 返回型別。在這種情況下,方法呼叫的結果會被封裝在 jakarta.jms.Message 中,傳送到原始訊息的 JMSReplyTo header 中指定的目的地,或傳送到監聽器上配置的預設目的地。現在,您可以使用訊息抽象的 @SendTo 註解來設定該預設目的地。

假設我們的 processOrder 方法現在應該返回一個 OrderStatus,我們可以編寫它以自動傳送響應,如下例所示:

@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
	// order processing
	return status;
}
如果您有多個使用 @JmsListener 註解的方法,您也可以將 @SendTo 註解放在類級別上以共享一個預設回覆目的地。

如果您需要以傳輸無關的方式設定額外的 header,您可以返回一個 Message 而不是,方法類似於以下示例:

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
	// order processing
	return MessageBuilder
			.withPayload(status)
			.setHeader("code", 1234)
			.build();
}

如果您需要在執行時計算響應目的地,您可以將響應封裝在 JmsResponse 例項中,該例項也提供執行時使用的目的地。我們可以將前面的示例重寫如下:

@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
	// order processing
	Message<OrderStatus> response = MessageBuilder
			.withPayload(status)
			.setHeader("code", 1234)
			.build();
	return JmsResponse.forQueue(response, "status");
}

最後,如果您需要為響應指定一些 QoS values,例如 priority 或 time to live,您可以相應地配置 JmsListenerContainerFactory,如下例所示:

@Configuration
@EnableJms
public class AppConfig {

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory());
		QosSettings replyQosSettings = new QosSettings();
		replyQosSettings.setPriority(2);
		replyQosSettings.setTimeToLive(10000);
		factory.setReplyQosSettings(replyQosSettings);
		return factory;
	}
}