@RabbitListener 批次處理
當接收 一批訊息時,通常由容器執行反批次處理,並逐條呼叫監聽器。從 2.2 版本開始,您可以配置監聽器容器工廠和監聽器,以在一次呼叫中接收整個批次。只需設定工廠的 batchListener
屬性為 true,並將方法引數(payload)設定為 List
或 Collection
即可。
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}
@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
...
}
// or
@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
...
}
將 batchListener
屬性設定為 true 會自動關閉工廠建立的容器中的 deBatchingEnabled
容器屬性(除非 consumerBatchEnabled
為 true - 參見下文)。實際上,反批次處理從容器轉移到了監聽器介面卡,介面卡建立了傳遞給監聽器的列表。
啟用了批次處理的工廠不能與 多方法監聽器一起使用。
同樣從 2.2 版本開始,當逐條接收批次訊息時,最後一條訊息包含一個設定為 true
的布林頭部。透過向您的監聽器方法新增 @Header(AmqpHeaders.LAST_IN_BATCH) boolean last
引數可以獲取此頭部。該頭部對映自 MessageProperties.isLastInBatch()
。此外,AmqpHeaders.BATCH_SIZE
在每個訊息片段中都會填充批次的大小。
此外,SimpleMessageListenerContainer
中添加了一個新屬性 consumerBatchEnabled
。當此屬性為 true 時,容器將建立一個最多包含 batchSize
條訊息的批次;如果在 receiveTimeout
超時時間內沒有新訊息到達,則會交付部分批次。如果接收到生產者建立的批次,它將被反批次處理並新增到消費者端的批次中;因此實際交付的訊息數量可能會超過 batchSize
,batchSize
表示從 Broker 接收到的訊息數量。當 consumerBatchEnabled
為 true 時,deBatchingEnabled
也必須為 true;容器工廠將強制執行此要求。
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}
當使用 consumerBatchEnabled
與 @RabbitListener
時
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
...
}
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
...
}
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
...
}
-
第一個例子使用接收到的原始、未轉換的
org.springframework.amqp.core.Message
s 進行呼叫。 -
第二個例子使用經過轉換 payload 並映射了 headers/properties 的
org.springframework.messaging.Message<?>
s 進行呼叫。 -
第三個例子使用轉換後的 payload 進行呼叫,無法訪問 headers/properties。
您還可以新增一個 Channel
引數,這在使用 MANUAL
確認模式時經常用到。這對於第三個例子不太有用,因為您無法訪問 delivery_tag
屬性。
Spring Boot 為 consumerBatchEnabled
和 batchSize
提供了配置屬性,但沒有為 batchListener
提供。從 3.0 版本開始,在容器工廠上將 consumerBatchEnabled
設定為 true
也會將 batchListener
設定為 true
。當 consumerBatchEnabled
為 true
時,監聽器必須是批次監聽器。
從 3.0 版本開始,監聽器方法可以消費 Collection<?>
或 List<?>
。
批次模式下的監聽器不支援回覆,因為批次中的訊息與產生的單個回覆之間可能沒有關聯。非同步返回型別仍然支援批次監聽器。 |