應用事件

監聽器容器及其 consumer 會發布以下 Spring 應用事件

  • ConsumerStartingEvent: 當 consumer 執行緒首次啟動時釋出,在開始 polling 之前。

  • ConsumerStartedEvent: 當 consumer 即將開始 polling 時釋出。

  • ConsumerFailedToStartEvent: 如果在 consumerStartTimeout 容器屬性指定的時間內沒有釋出 ConsumerStartingEvent,則釋出此事件。此事件可能表明配置的任務執行器沒有足夠的執行緒來支援其使用的容器及其併發性。出現此情況時也會記錄錯誤訊息。

  • ListenerContainerIdleEvent: 如果在 idleEventInterval(如果配置)時間內沒有收到訊息時釋出。

  • ListenerContainerNoLongerIdleEvent: 在之前釋出 ListenerContainerIdleEvent 後,收到一條記錄時釋出。

  • ListenerContainerPartitionIdleEvent: 如果在 idlePartitionEventInterval(如果配置)時間內未從該分割槽收到訊息時釋出。

  • ListenerContainerPartitionNoLongerIdleEvent: 從之前釋出過 ListenerContainerPartitionIdleEvent 的分割槽收到一條記錄時釋出。

  • NonResponsiveConsumerEvent: 當 consumer 似乎阻塞在 poll 方法中時釋出。

  • ConsumerPartitionPausedEvent: 當分割槽暫停時,由每個 consumer 釋出。

  • ConsumerPartitionResumedEvent: 當分割槽恢復時,由每個 consumer 釋出。

  • ConsumerPausedEvent: 當容器暫停時,由每個 consumer 釋出。

  • ConsumerResumedEvent: 當容器恢復時,由每個 consumer 釋出。

  • ConsumerStoppingEvent: 在停止之前,由每個 consumer 釋出。

  • ConsumerStoppedEvent: consumer 關閉後釋出。詳見 執行緒安全

  • ConsumerRetryAuthEvent: 當 consumer 認證或授權失敗並正在重試時釋出。

  • ConsumerRetryAuthSuccessfulEvent: 當認證或授權重試成功時釋出。只有在之前發生過 ConsumerRetryAuthEvent 時才會發生。

  • ContainerStoppedEvent: 當所有 consumer 都已停止時釋出。

  • ConcurrentContainerStoppedEvent: 當 ConcurrentMessageListenerContainer 已停止時釋出。

預設情況下,應用上下文的事件多播器在呼叫執行緒上呼叫事件監聽器。如果將多播器更改為使用非同步執行器,則當事件包含對 consumer 的引用時,不得呼叫任何 Consumer 方法。

ListenerContainerIdleEvent 具有以下屬性

  • source: 釋出事件的監聽器容器例項。

  • container: 監聽器容器,如果源容器是子容器,則為父監聽器容器。

  • id: 監聽器 ID(或容器 Bean 名稱)。

  • idleTime: 釋出事件時容器已空閒的時間。

  • topicPartitions: 生成事件時分配給容器的 topic 和分割槽。

  • consumer: Kafka Consumer 物件的引用。例如,如果之前呼叫了 consumer 的 pause() 方法,則在收到事件時可以 resume()

  • paused: 容器當前是否已暫停。詳見 暫停和恢復監聽器容器

ListenerContainerNoLongerIdleEvent 具有相同的屬性,除了 idleTimepaused

ListenerContainerPartitionIdleEvent 具有以下屬性

  • source: 釋出事件的監聽器容器例項。

  • container: 監聽器容器,如果源容器是子容器,則為父監聽器容器。

  • id: 監聽器 ID(或容器 Bean 名稱)。

  • idleTime: 釋出事件時分割槽消費已空閒的時間。

  • topicPartition: 觸發事件的 topic 和分割槽。

  • consumer: Kafka Consumer 物件的引用。例如,如果之前呼叫了 consumer 的 pause() 方法,則在收到事件時可以 resume()

  • paused: 該 consumer 的該分割槽消費當前是否已暫停。詳見 暫停和恢復監聽器容器

ListenerContainerPartitionNoLongerIdleEvent 具有相同的屬性,除了 idleTimepaused

NonResponsiveConsumerEvent 具有以下屬性

  • source: 釋出事件的監聽器容器例項。

  • container: 監聽器容器,如果源容器是子容器,則為父監聽器容器。

  • id: 監聽器 ID(或容器 Bean 名稱)。

  • timeSinceLastPoll: 容器上次呼叫 poll() 之前的時間。

  • topicPartitions: 生成事件時分配給容器的 topic 和分割槽。

  • consumer: Kafka Consumer 物件的引用。例如,如果之前呼叫了 consumer 的 pause() 方法,則在收到事件時可以 resume()

  • paused: 容器當前是否已暫停。詳見 暫停和恢復監聽器容器

ConsumerPausedEventConsumerResumedEventConsumerStopping 事件具有以下屬性

  • source: 釋出事件的監聽器容器例項。

  • container: 監聽器容器,如果源容器是子容器,則為父監聽器容器。

  • partitions: 涉及的 TopicPartition 例項。

ConsumerPartitionPausedEventConsumerPartitionResumedEvent 事件具有以下屬性

  • source: 釋出事件的監聽器容器例項。

  • container: 監聽器容器,如果源容器是子容器,則為父監聽器容器。

  • partition: 涉及的 TopicPartition 例項。

ConsumerRetryAuthEvent 事件具有以下屬性

  • source: 釋出事件的監聽器容器例項。

  • container: 監聽器容器,如果源容器是子容器,則為父監聽器容器。

  • 原因:

    • AUTHENTICATION - 事件因認證異常而釋出。

    • AUTHORIZATION - 事件因授權異常而釋出。

ConsumerStartingEventConsumerStartedEventConsumerFailedToStartEventConsumerStoppedEventConsumerRetryAuthSuccessfulEventContainerStoppedEvent 事件具有以下屬性

  • source: 釋出事件的監聽器容器例項。

  • container: 監聽器容器,如果源容器是子容器,則為父監聽器容器。

所有容器(無論是子容器還是父容器)都發布 ContainerStoppedEvent。對於父容器,sourcecontainer 屬性是相同的。

此外,ConsumerStoppedEvent 具有以下附加屬性

  • 原因:

    • NORMAL - consumer 正常停止(容器已停止)。

    • ERROR - 丟擲了 java.lang.Error

    • FENCED - 事務性 producer 被隔離 (fenced),並且 stopContainerWhenFenced 容器屬性為 true

    • AUTH - 丟擲了 AuthenticationExceptionAuthorizationException,並且未配置 authExceptionRetryInterval

    • NO_OFFSET - 分割槽沒有 offset,並且 auto.offset.reset 策略是 none

您可以在發生此類情況後使用此事件重新啟動容器

if (event.getReason().equals(Reason.FENCED)) {
    event.getSource(MessageListenerContainer.class).start();
}

檢測空閒和無響應的 Consumer

非同步 consumer 雖然高效,但一個問題是檢測它們何時空閒。如果一段時間沒有訊息到達,您可能需要採取一些措施。

您可以配置監聽器容器,使其在一段時間沒有訊息投遞時釋出 ListenerContainerIdleEvent。容器空閒時,每隔 idleEventInterval 毫秒就會發佈一個事件。

要配置此功能,請在容器上設定 idleEventInterval。以下示例展示瞭如何操作

@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}

以下示例展示瞭如何為 @KafkaListener 設定 idleEventInterval

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}

在這些情況下,容器空閒時每分鐘釋出一次事件。

如果由於某種原因 consumer 的 poll() 方法沒有退出,則不會接收到訊息,也無法生成空閒事件(在早期版本的 kafka-clients 中,當 broker 不可達時存在此問題)。在這種情況下,如果 poll 在 pollTimeout 屬性的 3 倍時間內沒有返回,則容器會發布 NonResponsiveConsumerEvent。預設情況下,此檢查在每個容器中每 30 秒執行一次。您可以透過在配置監聽器容器時,在 ContainerProperties 中設定 monitorInterval(預設 30 秒)和 noPollThreshold(預設 3.0)屬性來修改此行為。noPollThreshold 應大於 1.0,以避免由於競態條件導致出現虛假事件。接收到此類事件後,您可以停止容器,從而喚醒 consumer,使其能夠停止。

從版本 2.6.2 開始,如果容器釋出了 ListenerContainerIdleEvent,則在隨後收到記錄時,它將釋出 ListenerContainerNoLongerIdleEvent

事件消費

您可以透過實現 ApplicationListener 來捕獲這些事件——可以是通用的監聽器,也可以是僅接收此特定事件的監聽器。您還可以使用 Spring Framework 4.2 中引入的 @EventListener

下一個示例將 @KafkaListener@EventListener 組合到一個類中。您應該瞭解,應用監聽器會接收所有容器的事件,因此如果您想根據哪個容器空閒而採取特定行動,您可能需要檢查監聽器 ID。您也可以為此目的使用 @EventListenercondition

詳見 應用事件 以獲取有關事件屬性的資訊。

事件通常釋出在 consumer 執行緒上,因此與 Consumer 物件互動是安全的。

以下示例同時使用了 @KafkaListener@EventListener

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}
事件監聽器會看到所有容器的事件。因此,在前面的示例中,我們根據監聽器 ID 來縮小接收到的事件範圍。由於為 @KafkaListener 建立的容器支援併發,實際的容器被命名為 id-n,其中 n 是支援併發的每個例項的唯一值。這就是我們在條件中使用 startsWith 的原因。
如果您希望使用空閒事件來停止監聽器容器,則不應在呼叫監聽器的執行緒上呼叫 container.stop()。這樣做會導致延遲和不必要的日誌訊息。相反,您應該將事件交給另一個執行緒,然後由該執行緒停止容器。此外,如果容器例項是子容器,則不應 stop() 它。您應該停止併發容器。

空閒時的當前位置

請注意,您可以透過在監聽器中實現 ConsumerSeekAware 來獲取檢測到空閒時的當前位置。詳見 seek 中的 onIdleContainer()