啟用監聽器端點註解
要啟用對 `@RabbitListener` 註解的支援,可以將 `@EnableRabbit` 新增到您的一個 `@Configuration` 類中。以下示例展示瞭如何做到這一點
@Configuration
@EnableRabbit
public class AppConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setContainerCustomizer(container -> /* customize the container */);
return factory;
}
}
自 2.0 版本起,DirectMessageListenerContainerFactory
也可用了。它建立 DirectMessageListenerContainer
例項。
有關幫助您在 SimpleRabbitListenerContainerFactory 和 DirectRabbitListenerContainerFactory 之間進行選擇的資訊,請參閱 選擇容器。 |
從 2.2.2 版本開始,您可以提供一個 ContainerCustomizer
實現(如上所示)。這可用於在容器建立和配置後對其進行進一步配置;例如,您可以使用此功能設定容器工廠未公開的屬性。
2.4.8 版本提供了 CompositeContainerCustomizer
,適用於希望應用多個定製器的情況。
預設情況下,基礎設施會查詢一個名為 rabbitListenerContainerFactory
的 bean,作為用於建立訊息監聽器容器的工廠來源。在這種情況下,忽略 RabbitMQ 基礎設施設定,processOrder
方法可以使用三個核心輪詢執行緒和最多十個執行緒的執行緒池大小進行呼叫。
您可以為每個註解定製要使用的監聽器容器工廠,或者透過實現 RabbitListenerConfigurer
介面配置一個顯式預設值。僅當至少有一個端點註冊時未指定容器工廠時,才需要預設值。有關完整詳細資訊和示例,請參閱 Javadoc。
容器工廠提供了新增 MessagePostProcessor
例項的方法,這些例項在接收訊息後(呼叫監聽器之前)和傳送回覆之前應用。
有關回復的資訊,請參閱 回覆管理。
從 2.0.6 版本開始,您可以向監聽器容器工廠新增一個 RetryTemplate
和 RecoveryCallback
。它用於傳送回覆時。當重試耗盡時,會呼叫 RecoveryCallback
。您可以使用 SendRetryContextAccessor
從上下文中獲取資訊。以下示例展示瞭如何做到這一點
factory.setRetryTemplate(retryTemplate);
factory.setReplyRecoveryCallback(ctx -> {
Message failed = SendRetryContextAccessor.getMessage(ctx);
Address replyTo = SendRetryContextAccessor.getAddress(ctx);
Throwable t = ctx.getLastThrowable();
...
return null;
});
如果您偏好 XML 配置,可以使用 <rabbit:annotation-driven>
元素。任何使用 `@RabbitListener` 註解的 bean 都會被檢測到。
對於 SimpleRabbitListenerContainer
例項,您可以使用類似於以下的 XML 配置
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="concurrentConsumers" value="3"/>
<property name="maxConcurrentConsumers" value="10"/>
</bean>
對於 DirectMessageListenerContainer
例項,您可以使用類似於以下的 XML 配置
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="consumersPerQueue" value="3"/>
</bean>
從 2.0 版本開始,`@RabbitListener` 註解有一個 concurrency
屬性。它支援 SpEL 表示式(#{…}
)和屬性佔位符(${…}
)。它的含義和允許的值取決於容器型別,具體如下
-
對於
DirectMessageListenerContainer
,該值必須是單個整數值,它設定容器上的consumersPerQueue
屬性。 -
對於
SimpleRabbitListenerContainer
,該值可以是單個整數值,用於設定容器上的concurrentConsumers
屬性;或者它可以採用m-n
的形式,其中m
是concurrentConsumers
屬性,n
是maxConcurrentConsumers
屬性。
在任一情況下,此設定都會覆蓋工廠上的設定。以前,如果您的監聽器需要不同的併發性,則必須定義不同的容器工廠。
該註解還允許透過 autoStartup
和 executor
(自 2.2 版本起)註解屬性覆蓋工廠的 autoStartup
和 taskExecutor
屬性。為每個監聽器使用不同的 executor 可能有助於在日誌和執行緒轉儲中識別與每個監聽器關聯的執行緒。
2.2 版本還添加了 ackMode
屬性,允許您覆蓋容器工廠的 acknowledgeMode
屬性。
@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public void manual1(String in, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
...
channel.basicAck(tag, false);
}