帶批次處理的 @RabbitListener
當接收 一批 訊息時,通常由容器執行分批處理,監聽器一次接收一條訊息。從版本 2.2 開始,您可以配置監聽器容器工廠和監聽器以在一個呼叫中接收整個批次,只需設定工廠的 batchListener 屬性,並將方法有效負載引數設定為 List 或 Collection。
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}
@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
...
}
// or
@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
...
}
將 batchListener 屬性設定為 true 會自動關閉工廠建立的容器中的 deBatchingEnabled 容器屬性(除非 consumerBatchEnabled 為 true - 參見下文)。實際上,分批處理從容器轉移到監聽器介面卡,介面卡建立傳遞給監聽器的列表。
批處理啟用的工廠不能與 多方法監聽器 一起使用。
同樣從版本 2.2 開始,當一次接收一批訊息時,最後一條訊息包含一個布林頭,設定為 true。可以透過將 @Header(AmqpHeaders.LAST_IN_BATCH) 布林值 last 引數新增到您的監聽器方法中來獲取此頭。該頭從 MessageProperties.isLastInBatch() 對映。此外,AmqpHeaders.BATCH_SIZE 在每個訊息片段中都填充了批次的大小。
此外,SimpleMessageListenerContainer 添加了一個新屬性 consumerBatchEnabled。當此屬性為 true 時,容器將建立一批訊息,最多達到 batchSize;如果在沒有新訊息到達的情況下 receiveTimeout 超時,則會交付部分批次。如果收到生產者建立的批次,則會將其分批並新增到消費者端批次中;因此,交付的實際訊息數量可能超過 batchSize,batchSize 表示從代理接收到的訊息數量。當 consumerBatchEnabled 為 true 時,deBatchingEnabled 必須為 true;容器工廠將強制執行此要求。
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}
當 consumerBatchEnabled 與 @RabbitListener 一起使用時
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
...
}
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
...
}
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
...
}
-
第一個方法使用接收到的原始、未轉換的
org.springframework.amqp.core.Message呼叫。 -
第二個方法使用帶有轉換後的有效負載和對映的頭/屬性的
org.springframework.messaging.Message<?>呼叫。 -
第三個方法使用轉換後的有效負載呼叫,無法訪問頭/屬性。
您還可以新增一個 Channel 引數,這在使用 MANUAL 確認模式時經常使用。這在第三個示例中不太有用,因為您無法訪問 delivery_tag 屬性。
Spring Boot 為 consumerBatchEnabled 和 batchSize 提供了配置屬性,但沒有為 batchListener 提供。從版本 3.0 開始,在容器工廠上將 consumerBatchEnabled 設定為 true 也會將 batchListener 設定為 true。當 consumerBatchEnabled 為 true 時,監聽器必須是批處理監聽器。
從版本 3.0 開始,監聽器方法可以消費 Collection<?> 或 List<?>。
| 批次模式下的監聽器不支援回覆,因為批次中的訊息與產生的單個回覆之間可能沒有關聯。非同步返回型別 仍然受批處理監聽器支援。 |