應用程式事件
以下 Spring 應用事件由監聽器容器及其消費者釋出
-
ConsumerStartingEvent:當消費者執行緒首次啟動,在它開始輪詢之前釋出。 -
ConsumerStartedEvent:當消費者即將開始輪詢時釋出。 -
ConsumerFailedToStartEvent:如果在consumerStartTimeout容器屬性所定義的時間內未釋出ConsumerStartingEvent,則釋出此事件。此事件可能表明配置的任務執行器沒有足夠的執行緒來支援它所使用的容器及其併發性。當出現此情況時,也會記錄一條錯誤訊息。 -
ListenerContainerIdleEvent:在idleEventInterval(如果已配置)內未收到任何訊息時釋出。 -
ListenerContainerNoLongerIdleEvent:在之前釋出ListenerContainerIdleEvent後,當收到一條記錄時釋出。 -
ListenerContainerPartitionIdleEvent:在idlePartitionEventInterval(如果已配置)內未從該分割槽收到任何訊息時釋出。 -
ListenerContainerPartitionNoLongerIdleEvent:在之前釋出ListenerContainerPartitionIdleEvent後,當從該分割槽收到一條記錄時釋出。 -
NonResponsiveConsumerEvent:當消費者似乎在poll方法中被阻塞時釋出。 -
ConsumerPartitionPausedEvent:當分割槽暫停時,由每個消費者釋出。 -
ConsumerPartitionResumedEvent:當分割槽恢復時,由每個消費者釋出。 -
ConsumerPausedEvent:當容器暫停時,由每個消費者釋出。 -
ConsumerResumedEvent:當容器恢復時,由每個消費者釋出。 -
ConsumerStoppingEvent:在停止之前,由每個消費者釋出。 -
ConsumerStoppedEvent:在消費者關閉後釋出。參見執行緒安全。 -
ConsumerRetryAuthEvent:當消費者的身份驗證或授權失敗並正在重試時釋出。 -
ConsumerRetryAuthSuccessfulEvent:當身份驗證或授權重試成功時釋出。只有在之前發生過ConsumerRetryAuthEvent時才會出現。 -
ContainerStoppedEvent:當所有消費者都停止時釋出。 -
ConcurrentContainerStoppedEvent:當ConcurrentMessageListenerContainer停止時釋出。
預設情況下,應用程式上下文的事件多播器在呼叫執行緒上呼叫事件監聽器。如果將多播器更改為使用非同步執行器,則當事件包含對消費者的引用時,不得呼叫任何 Consumer 方法。 |
ListenerContainerIdleEvent 具有以下屬性
-
source:釋出事件的監聽器容器例項。 -
container:監聽器容器或父監聽器容器,如果源容器是子容器。 -
id:監聽器 ID(或容器 Bean 名稱)。 -
idleTime:事件釋出時容器已空閒的時間。 -
topicPartitions:事件生成時容器分配到的主題和分割槽。 -
consumer:對 KafkaConsumer物件的引用。例如,如果之前呼叫了消費者的pause()方法,則在收到事件時可以resume()。 -
paused:容器當前是否已暫停。有關更多資訊,請參閱暫停和恢復監聽器容器。
ListenerContainerNoLongerIdleEvent 具有相同的屬性,除了 idleTime 和 paused。
ListenerContainerPartitionIdleEvent 具有以下屬性
-
source:釋出事件的監聽器容器例項。 -
container:監聽器容器或父監聽器容器,如果源容器是子容器。 -
id:監聽器 ID(或容器 Bean 名稱)。 -
idleTime:事件釋出時分割槽消費已空閒的時間。 -
topicPartition:觸發事件的主題和分割槽。 -
consumer:對 KafkaConsumer物件的引用。例如,如果之前呼叫了消費者的pause()方法,則在收到事件時可以resume()。 -
paused:該分割槽消費對於該消費者是否當前已暫停。有關更多資訊,請參閱暫停和恢復監聽器容器。
ListenerContainerPartitionNoLongerIdleEvent 具有相同的屬性,除了 idleTime 和 paused。
NonResponsiveConsumerEvent 具有以下屬性
-
source:釋出事件的監聽器容器例項。 -
container:監聽器容器或父監聽器容器,如果源容器是子容器。 -
id:監聽器 ID(或容器 Bean 名稱)。 -
timeSinceLastPoll:容器最後一次呼叫poll()之前的時間。 -
topicPartitions:事件生成時容器分配到的主題和分割槽。 -
consumer:對 KafkaConsumer物件的引用。例如,如果之前呼叫了消費者的pause()方法,則在收到事件時可以resume()。 -
paused:容器當前是否已暫停。有關更多資訊,請參閱暫停和恢復監聽器容器。
ConsumerPausedEvent、ConsumerResumedEvent 和 ConsumerStopping 事件具有以下屬性
-
source:釋出事件的監聽器容器例項。 -
container:監聽器容器或父監聽器容器,如果源容器是子容器。 -
partitions:涉及的TopicPartition例項。
ConsumerPartitionPausedEvent、ConsumerPartitionResumedEvent 事件具有以下屬性
-
source:釋出事件的監聽器容器例項。 -
container:監聽器容器或父監聽器容器,如果源容器是子容器。 -
partition:涉及的TopicPartition例項。
ConsumerRetryAuthEvent 事件具有以下屬性
-
source:釋出事件的監聽器容器例項。 -
container:監聽器容器或父監聽器容器,如果源容器是子容器。 -
reason:-
AUTHENTICATION- 事件因身份驗證異常而釋出。 -
AUTHORIZATION- 事件因授權異常而釋出。
-
ConsumerStartingEvent、ConsumerStartedEvent、ConsumerFailedToStartEvent、ConsumerStoppedEvent、ConsumerRetryAuthSuccessfulEvent 和 ContainerStoppedEvent 事件具有以下屬性
-
source:釋出事件的監聽器容器例項。 -
container:監聽器容器或父監聽器容器,如果源容器是子容器。
所有容器(無論是子容器還是父容器)都發布 ContainerStoppedEvent。對於父容器,源和容器屬性相同。
此外,ConsumerStoppedEvent 具有以下附加屬性
-
reason:-
NORMAL- 消費者正常停止(容器已停止)。 -
ABNORMAL- 消費者異常停止(容器異常停止)。 -
ERROR- 丟擲了java.lang.Error。 -
FENCED- 事務性生產者被隔離,並且stopContainerWhenFenced容器屬性為true。 -
AUTH- 丟擲了AuthenticationException或AuthorizationException,並且未配置authExceptionRetryInterval。 -
NO_OFFSET- 分割槽沒有偏移量,並且auto.offset.reset策略為none。
-
您可以使用此事件在此類情況後重新啟動容器
if (event.getReason().equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
檢測空閒和無響應的消費者
非同步消費者雖然高效,但一個問題是檢測它們何時空閒。您可能希望在一段時間內沒有訊息到達時採取一些措施。
您可以配置監聽器容器,在一段時間沒有訊息傳遞時釋出 ListenerContainerIdleEvent。當容器空閒時,每 idleEventInterval 毫秒釋出一次事件。
要配置此功能,請在容器上設定 idleEventInterval。以下示例展示瞭如何實現
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
以下示例展示瞭如何為 @KafkaListener 設定 idleEventInterval
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在這些情況下,當容器空閒時,每分鐘釋出一次事件。
如果由於某種原因,消費者 poll() 方法沒有退出,則不會收到任何訊息,也無法生成空閒事件(這在早期版本的 kafka-clients 中是一個問題,當 broker 無法訪問時)。在這種情況下,如果 poll 在 3x pollTimeout 屬性內沒有返回,則容器會發布 NonResponsiveConsumerEvent。預設情況下,此檢查在每個容器中每 30 秒執行一次。您可以透過在配置監聽器容器時設定 ContainerProperties 中的 monitorInterval(預設 30 秒)和 noPollThreshold(預設 3.0)屬性來修改此行為。noPollThreshold 應該大於 1.0,以避免由於競態條件而收到虛假事件。接收此類事件可以讓您停止容器,從而喚醒消費者以便它停止。
從 2.6.2 版本開始,如果容器已釋出 ListenerContainerIdleEvent,則在隨後收到記錄時,它將釋出 ListenerContainerNoLongerIdleEvent。
事件消費
您可以透過實現 ApplicationListener 來捕獲這些事件——可以是通用監聽器,也可以是僅接收此特定事件的窄範圍監聽器。您還可以使用 Spring Framework 4.2 中引入的 @EventListener。
下一個示例將 @KafkaListener 和 @EventListener 組合到一個類中。您應該明白,應用程式監聽器會收到所有容器的事件,因此如果您想根據哪個容器空閒來採取特定操作,您可能需要檢查監聽器 ID。您也可以為此目的使用 @EventListener 的 condition。
有關事件屬性的資訊,請參閱應用事件。
事件通常在消費者執行緒上釋出,因此可以安全地與 Consumer 物件互動。
以下示例同時使用 @KafkaListener 和 @EventListener
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
事件監聽器會看到所有容器的事件。因此,在前面的示例中,我們根據監聽器 ID 縮小了收到的事件範圍。由於為 @KafkaListener 建立的容器支援併發性,因此實際容器被命名為 id-n,其中 n 是每個例項的唯一值以支援併發性。這就是我們在條件中使用 startsWith 的原因。 |
如果您希望使用空閒事件來停止監聽器容器,則不應在呼叫監聽器的執行緒上呼叫 container.stop()。這樣做會導致延遲和不必要的日誌訊息。相反,您應該將事件交給另一個執行緒,然後該執行緒可以停止容器。此外,如果它是子容器,則不應 stop() 容器例項。您應該停止併發容器。 |
空閒時的當前位置
請注意,您可以透過在監聽器中實現 ConsumerSeekAware 來獲取檢測到空閒時的當前位置。請參閱查詢中的 onIdleContainer()。