註解驅動的監聽器端點
非同步接收訊息最簡單的方法是使用註解驅動的監聽器端點基礎設施。簡而言之,它允許你將一個託管 bean 的方法暴露為 JMS 監聽器端點。以下示例展示瞭如何使用它:
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(String data) { ... }
}
前述示例的目的是,每當 jakarta.jms.Destination myDestination 上有訊息可用時,processOrder 方法就會相應地被呼叫(在這種情況下,使用 JMS 訊息的內容,類似於 MessageListenerAdapter 所提供的功能)。
註解端點基礎設施在幕後為每個註解方法建立了一個訊息監聽器容器,它透過 JmsListenerContainerFactory 實現。這樣的容器不會註冊到應用程式上下文,但可以透過 JmsListenerEndpointRegistry bean 輕鬆定位,以便進行管理。
@JmsListener 在 Java 8 中是可重複的註解,因此你可以透過新增額外的 @JmsListener 宣告將多個 JMS 目標與同一個方法關聯起來。 |
啟用監聽器端點註解
要啟用 @JmsListener 註解支援,你可以在其中一個 @Configuration 類中新增 @EnableJms,如以下示例所示:
-
Java
-
Kotlin
-
Xml
@Configuration
@EnableJms
public class JmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory,
DestinationResolver destinationResolver) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(destinationResolver);
factory.setSessionTransacted(true);
factory.setConcurrency("3-10");
return factory;
}
}
@Configuration
@EnableJms
class JmsConfiguration {
@Bean
fun jmsListenerContainerFactory(connectionFactory: ConnectionFactory, destinationResolver: DestinationResolver) =
DefaultJmsListenerContainerFactory().apply {
setConnectionFactory(connectionFactory)
setDestinationResolver(destinationResolver)
setSessionTransacted(true)
setConcurrency("3-10")
}
}
<jms:annotation-driven/>
<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationResolver" ref="destinationResolver"/>
<property name="sessionTransacted" value="true"/>
<property name="concurrency" value="3-10"/>
</bean>
預設情況下,基礎設施會查詢名為 jmsListenerContainerFactory 的 bean 作為建立訊息監聽器容器的工廠來源。在這種情況下(並且忽略 JMS 基礎設施設定),你可以使用三個執行緒的核心池大小和十個執行緒的最大池大小來呼叫 processOrder 方法。
你可以自定義每個註解使用的監聽器容器工廠,或者透過實現 JmsListenerConfigurer 介面來配置一個顯式預設值。僅當至少一個端點未指定容器工廠就註冊時,才需要預設值。有關詳細資訊和示例,請參閱實現 JmsListenerConfigurer 的類的 javadoc。
程式設計方式註冊端點
JmsListenerEndpoint 提供了 JMS 端點的模型,並負責為該模型配置容器。除了透過 JmsListener 註解檢測到的端點之外,該基礎設施還允許你以程式設計方式配置端點。以下示例展示瞭如何實現:
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("myJmsEndpoint");
endpoint.setDestination("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
}
在前面的示例中,我們使用了 SimpleJmsListenerEndpoint,它提供了要呼叫的實際 MessageListener。然而,你也可以構建自己的端點變體來描述自定義呼叫機制。
請注意,你可以完全跳過使用 @JmsListener,只通過 JmsListenerConfigurer 以程式設計方式註冊你的端點。
註解端點方法簽名
到目前為止,我們一直在端點中注入一個簡單的 String,但它實際上可以有一個非常靈活的方法簽名。在以下示例中,我們將其重寫為注入帶有自定義頭資訊的 Order:
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(Order order, @Header("order_type") String orderType) {
...
}
}
你可以在 JMS 監聽器端點中注入的主要元素如下:
-
原始的
jakarta.jms.Message或其任何子類(前提是它與傳入訊息型別匹配)。 -
jakarta.jms.Session,用於可選地訪問原生 JMS API(例如,用於傳送自定義回覆)。 -
表示傳入 JMS 訊息的
org.springframework.messaging.Message。請注意,此訊息包含自定義和標準頭資訊(由JmsHeaders定義)。 -
使用
@Header註解的方法引數,用於提取特定的頭值,包括標準 JMS 頭。 -
一個使用
@Headers註解的引數,它還必須可賦值給java.util.Map,以便訪問所有頭。 -
未註解且不是受支援型別(
Message或Session)的元素被視為有效負載。你可以透過使用@Payload註解引數來明確這一點。你還可以透過新增額外的@Valid來啟用驗證。
注入 Spring 的 Message 抽象的能力特別有用,可以利用儲存在傳輸特定訊息中的所有資訊,而無需依賴傳輸特定 API。以下示例展示瞭如何實現:
@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }
方法引數的處理由 DefaultMessageHandlerMethodFactory 提供,你可以進一步自定義它以支援額外的方法引數。你也可以在那裡自定義轉換和驗證支援。
例如,如果我們要確保 Order 在處理之前有效,我們可以使用 @Valid 註解有效負載並配置必要的驗證器,如以下示例所示:
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setValidator(myValidator());
return factory;
}
}
響應管理
MessageListenerAdapter 中現有的支援已允許你的方法具有非 void 返回型別。在這種情況下,呼叫的結果被封裝在一個 jakarta.jms.Message 中,傳送到原始訊息的 JMSReplyTo 頭中指定的目標,或者傳送到監聽器上配置的預設目標。你現在可以使用訊息抽象的 @SendTo 註解設定該預設目標。
假設我們的 processOrder 方法現在應該返回一個 OrderStatus,我們可以將其編寫為自動傳送響應,如以下示例所示:
@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果你有幾個帶有 @JmsListener 註解的方法,你也可以在類級別放置 @SendTo 註解,以共享一個預設的回覆目標。 |
如果你需要以與傳輸無關的方式設定額外的頭資訊,你可以返回一個 Message,方法類似於以下內容:
@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
如果你需要在執行時計算響應目標,你可以將響應封裝在一個 JmsResponse 例項中,該例項還提供執行時要使用的目標。我們可以將前面的示例重寫如下:
@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
// order processing
Message<OrderStatus> response = MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
return JmsResponse.forQueue(response, "status");
}
最後,如果你需要為響應指定一些 QoS 值,例如優先順序或存活時間,你可以相應地配置 JmsListenerContainerFactory,如以下示例所示:
@Configuration
@EnableJms
public class AppConfig {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
QosSettings replyQosSettings = new QosSettings();
replyQosSettings.setPriority(2);
replyQosSettings.setTimeToLive(10000);
factory.setReplyQosSettings(replyQosSettings);
return factory;
}
}