啟用監聽器端點註解
要啟用 @RabbitListener 註解支援,您可以將 @EnableRabbit 新增到您的一個 @Configuration 類中。以下示例展示瞭如何實現:
@Configuration
@EnableRabbit
public class AppConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setContainerCustomizer(container -> /* customize the container */);
return factory;
}
}
自 2.0 版本起,還提供了 DirectMessageListenerContainerFactory。它建立 DirectMessageListenerContainer 例項。
有關如何選擇 SimpleRabbitListenerContainerFactory 和 DirectRabbitListenerContainerFactory 的資訊,請參閱 選擇容器。 |
從 2.2.2 版本開始,您可以提供一個 ContainerCustomizer 實現(如上所示)。這可用於在容器建立和配置後進一步配置容器;例如,您可以使用它來設定容器工廠未公開的屬性。
2.4.8 版本提供了 CompositeContainerCustomizer,用於您希望應用多個定製器的情況。
預設情況下,基礎設施會查詢名為 rabbitListenerContainerFactory 的 bean 作為工廠的來源,該工廠用於建立訊息監聽器容器。在這種情況下,忽略 RabbitMQ 基礎設施設定,可以使用三個核心執行緒池大小和十個最大執行緒池大小的執行緒呼叫 processOrder 方法。
您可以定製監聽器容器工廠,以便將其用於每個註解,或者透過實現 RabbitListenerConfigurer 介面來配置一個顯式預設值。僅當至少一個端點未註冊特定容器工廠時才需要預設值。有關完整詳細資訊和示例,請參閱 Javadoc。
容器工廠提供新增 MessagePostProcessor 例項的方法,這些例項在接收訊息後(呼叫監聽器之前)和傳送回覆之前應用。
有關回復的資訊,請參閱 回覆管理。
從 2.0.6 版本開始,您可以將 RetryTemplate 和 RecoveryCallback 新增到監聽器容器工廠。它在傳送回覆時使用。當重試耗盡時,會呼叫 RecoveryCallback。
如果您更喜歡 XML 配置,可以使用 <rabbit:annotation-driven> 元素。任何用 @RabbitListener 註解的 bean 都會被檢測到。
對於 SimpleRabbitListenerContainer 例項,您可以使用類似於以下內容的 XML:
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="concurrentConsumers" value="3"/>
<property name="maxConcurrentConsumers" value="10"/>
</bean>
對於 DirectMessageListenerContainer 例項,您可以使用類似於以下內容的 XML:
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="consumersPerQueue" value="3"/>
</bean>
從 2.0 版本開始,@RabbitListener 註解具有 concurrency 屬性。它支援 SpEL 表示式(#{…})和屬性佔位符(${…})。其含義和允許的值取決於容器型別,如下所示:
-
對於
DirectMessageListenerContainer,該值必須是單個整數值,它設定容器上的consumersPerQueue屬性。 -
對於
SimpleRabbitListenerContainer,該值可以是單個整數值,它設定容器上的concurrentConsumers屬性,或者它可以具有m-n的形式,其中m是concurrentConsumers屬性,n是maxConcurrentConsumers屬性。
在任何一種情況下,此設定都會覆蓋工廠上的設定。以前,如果您的監聽器需要不同的併發性,則必須定義不同的容器工廠。
該註解還允許透過 autoStartup 和 executor(自 2.2 起)註解屬性覆蓋工廠的 autoStartup 和 taskExecutor 屬性。為每個監聽器使用不同的執行器可能有助於在日誌和執行緒轉儲中識別與每個監聽器關聯的執行緒。
2.2 版本還添加了 ackMode 屬性,它允許您覆蓋容器工廠的 acknowledgeMode 屬性。
@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public void manual1(String in, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
...
channel.basicAck(tag, false);
}