帶批次處理的 @RabbitListener

當接收 一批 訊息時,通常由容器執行分批處理,監聽器一次接收一條訊息。從版本 2.2 開始,您可以配置監聽器容器工廠和監聽器以在一個呼叫中接收整個批次,只需設定工廠的 batchListener 屬性,並將方法有效負載引數設定為 ListCollection

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setBatchListener(true);
    return factory;
}

@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
    ...
}

// or

@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
    ...
}

batchListener 屬性設定為 true 會自動關閉工廠建立的容器中的 deBatchingEnabled 容器屬性(除非 consumerBatchEnabledtrue - 參見下文)。實際上,分批處理從容器轉移到監聽器介面卡,介面卡建立傳遞給監聽器的列表。

批處理啟用的工廠不能與 多方法監聽器 一起使用。

同樣從版本 2.2 開始,當一次接收一批訊息時,最後一條訊息包含一個布林頭,設定為 true。可以透過將 @Header(AmqpHeaders.LAST_IN_BATCH) 布林值 last 引數新增到您的監聽器方法中來獲取此頭。該頭從 MessageProperties.isLastInBatch() 對映。此外,AmqpHeaders.BATCH_SIZE 在每個訊息片段中都填充了批次的大小。

此外,SimpleMessageListenerContainer 添加了一個新屬性 consumerBatchEnabled。當此屬性為 true 時,容器將建立一批訊息,最多達到 batchSize;如果在沒有新訊息到達的情況下 receiveTimeout 超時,則會交付部分批次。如果收到生產者建立的批次,則會將其分批並新增到消費者端批次中;因此,交付的實際訊息數量可能超過 batchSizebatchSize 表示從代理接收到的訊息數量。當 consumerBatchEnabled 為 true 時,deBatchingEnabled 必須為 true;容器工廠將強制執行此要求。

@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setConsumerTagStrategy(consumerTagStrategy());
    factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
    factory.setBatchSize(2);
    factory.setConsumerBatchEnabled(true);
    return factory;
}

consumerBatchEnabled@RabbitListener 一起使用時

@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
    ...
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
    ...
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
    ...
}
  • 第一個方法使用接收到的原始、未轉換的 org.springframework.amqp.core.Message 呼叫。

  • 第二個方法使用帶有轉換後的有效負載和對映的頭/屬性的 org.springframework.messaging.Message<?> 呼叫。

  • 第三個方法使用轉換後的有效負載呼叫,無法訪問頭/屬性。

您還可以新增一個 Channel 引數,這在使用 MANUAL 確認模式時經常使用。這在第三個示例中不太有用,因為您無法訪問 delivery_tag 屬性。

Spring Boot 為 consumerBatchEnabledbatchSize 提供了配置屬性,但沒有為 batchListener 提供。從版本 3.0 開始,在容器工廠上將 consumerBatchEnabled 設定為 true 也會將 batchListener 設定為 true。當 consumerBatchEnabledtrue 時,監聽器必須是批處理監聽器。

從版本 3.0 開始,監聽器方法可以消費 Collection<?>List<?>

批次模式下的監聽器不支援回覆,因為批次中的訊息與產生的單個回覆之間可能沒有關聯。非同步返回型別 仍然受批處理監聽器支援。
© . This site is unofficial and not affiliated with VMware.