訊息監聽器容器

提供了兩個 MessageListenerContainer 實現

  • KafkaMessageListenerContainer

  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer 在單個執行緒上接收來自所有主題或分割槽的所有訊息。ConcurrentMessageListenerContainer 委託給一個或多個 KafkaMessageListenerContainer 例項以提供多執行緒消費。

從版本 2.2.7 開始,你可以向監聽器容器新增一個 RecordInterceptor;它將在呼叫監聽器之前被呼叫,允許檢查或修改記錄。如果攔截器返回 null,則不呼叫監聽器。從版本 2.7 開始,它具有在監聽器退出(正常退出或丟擲異常)後呼叫的附加方法。此外,從版本 2.7 開始,現在有一個 BatchInterceptor,為 批次監聽器 提供類似的功能。此外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供了對 Consumer<?, ?> 的訪問。例如,這可以用於在攔截器中訪問消費者指標。

你不應在這些攔截器中執行任何影響消費者位置和/或已提交偏移量的方法;容器需要管理此類資訊。
如果攔截器修改了記錄(透過建立新記錄),則 topicpartitionoffset 必須保持不變,以避免意外的副作用,例如記錄丟失。

CompositeRecordInterceptorCompositeBatchInterceptor 可用於呼叫多個攔截器。

從版本 4.0 開始,AbstractKafkaListenerContainerFactoryAbstractMessageListenerContainergetRecordInterceptor()getBatchInterceptor() 公開為公共方法。如果返回的攔截器是 CompositeRecordInterceptorCompositeBatchInterceptor 的例項,則即使在建立了擴充套件 AbstractMessageListenerContainer 的容器例項並已配置 RecordInterceptorBatchInterceptor 之後,也可以向其新增額外的 RecordInterceptorBatchInterceptor 例項。以下示例顯示瞭如何實現:

public void configureRecordInterceptor(AbstractKafkaListenerContainerFactory<Integer, String> containerFactory) {
    CompositeRecordInterceptor compositeInterceptor;

    RecordInterceptor<Integer, String> previousInterceptor = containerFactory.getRecordInterceptor();
    if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) {
        compositeInterceptor = interceptor;
    } else {
        compositeInterceptor = new CompositeRecordInterceptor<>();
        containerFactory.setRecordInterceptor(compositeInterceptor);
        if (previousInterceptor != null) {
            compositeInterceptor.addRecordInterceptor(previousInterceptor);
        }
    }

    RecordInterceptor<Integer, String> recordInterceptor1 = new RecordInterceptor() {...};
    RecordInterceptor<Integer, String> recordInterceptor2 = new RecordInterceptor() {...};

    compositeInterceptor.addRecordInterceptor(recordInterceptor1);
    compositeInterceptor.addRecordInterceptor(recordInterceptor2);
}

預設情況下,從版本 2.8 開始,當使用事務時,攔截器在事務開始之前被呼叫。你可以將監聽器容器的 interceptBeforeTx 屬性設定為 false,以便在事務開始後呼叫攔截器。從版本 2.9 開始,這將適用於任何事務管理器,而不僅僅是 KafkaAwareTransactionManager。這允許,例如,攔截器參與由容器啟動的 JDBC 事務。

從版本 2.3.8、2.4.6 開始,當併發度大於 1 時,ConcurrentMessageListenerContainer 現在支援 靜態成員資格group.instance.id 會以 -n 作為字尾,其中 n1 開始。這與增加的 session.timeout.ms 一起,可用於減少重新平衡事件,例如,當應用程式例項重新啟動時。

使用 KafkaMessageListenerContainer

提供以下建構函式

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

它在一個 ContainerProperties 物件中接收 ConsumerFactory 以及關於主題和分割槽的資訊,以及其他配置。ContainerProperties 具有以下建構函式

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

第一個建構函式接受一個 TopicPartitionOffset 引數陣列,以明確指示容器使用哪些分割槽(使用消費者 assign() 方法)以及可選的初始偏移量。正值預設是絕對偏移量。負值預設是相對於分割槽內當前最後一個偏移量。提供了一個接受附加 boolean 引數的 TopicPartitionOffset 建構函式。如果此引數為 true,則初始偏移量(正或負)相對於此消費者的當前位置。偏移量在容器啟動時應用。第二個建構函式接受一個主題陣列,Kafka 根據 group.id 屬性分配分割槽 — 在組中分發分割槽。第三個建構函式使用正則表示式 Pattern 選擇主題。

要將 MessageListener 分配給容器,可以在建立容器時使用 ContainerProps.setMessageListener 方法。以下示例展示瞭如何實現:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

請注意,當建立 DefaultKafkaConsumerFactory 時,使用僅接受屬性的建構函式(如上所示)意味著鍵和值 Deserializer 類是從配置中獲取的。或者,可以將 Deserializer 例項傳遞給 DefaultKafkaConsumerFactory 建構函式用於鍵和/或值,在這種情況下,所有消費者共享相同的例項。另一個選項是提供 Supplier<Deserializer>(從版本 2.3 開始),它將用於為每個 Consumer 獲取單獨的 Deserializer 例項。

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

有關您可以設定的各種屬性的更多資訊,請參閱 ContainerPropertiesJavadoc

從版本 2.1.1 開始,提供了一個名為 logContainerConfig 的新屬性。當 true 且啟用 INFO 日誌時,每個監聽器容器都會寫入一條日誌訊息,總結其配置屬性。

預設情況下,主題偏移量提交的日誌記錄在 DEBUG 日誌級別執行。從版本 2.1.2 開始,ContainerProperties 中的一個名為 commitLogLevel 的屬性允許你指定這些訊息的日誌級別。例如,要將日誌級別更改為 INFO,你可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);

從版本 2.2 開始,添加了一個名為 missingTopicsFatal 的新容器屬性(預設值:從 2.3.4 開始為 false)。如果任何配置的主題不存在於代理上,此屬性將阻止容器啟動。如果容器配置為偵聽主題模式(正則表示式),則此屬性不適用。以前,容器執行緒在 consumer.poll() 方法內迴圈,等待主題出現,同時記錄大量訊息。除了日誌之外,沒有跡象表明存在問題。

從版本 2.8 開始,引入了一個新的容器屬性 authExceptionRetryInterval。這使得容器在從 KafkaConsumer 獲取任何 AuthenticationExceptionAuthorizationException 後重試獲取訊息。這可能發生在,例如,配置的使用者被拒絕訪問讀取某個主題或憑據不正確時。定義 authExceptionRetryInterval 允許容器在授予正確許可權後恢復。

預設情況下,未配置間隔 - 身份驗證和授權錯誤被視為致命錯誤,導致容器停止。

從版本 2.8 開始,當建立消費者工廠時,如果你提供反序列化器作為物件(在建構函式中或透過設定器),工廠將呼叫 configure() 方法來用配置屬性配置它們。

使用 ConcurrentMessageListenerContainer

唯一的建構函式與 KafkaListenerContainer 建構函式類似。以下列表顯示了建構函式的簽名

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它還有一個 concurrency 屬性。例如,container.setConcurrency(3) 會建立三個 KafkaMessageListenerContainer 例項。

如果容器屬性配置為主題(或主題模式),Kafka 會使用其組管理功能將分割槽分配給消費者。

當監聽多個主題時,預設分割槽分配可能與你預期不符。例如,如果你有三個主題,每個主題有五個分割槽,並且你想使用 concurrency=15,你只會看到五個活躍的消費者,每個消費者分配每個主題的一個分割槽,而其他 10 個消費者處於空閒狀態。這是因為預設的 Kafka ConsumerPartitionAssignorRangeAssignor(請參閱其 Javadoc)。對於這種情況,你可能需要考慮使用 RoundRobinAssignor,它將分割槽分配給所有消費者。然後,每個消費者被分配一個主題或分割槽。要更改 ConsumerPartitionAssignor,你可以在提供給 DefaultKafkaConsumerFactory 的屬性中設定 partition.assignment.strategy 消費者屬性(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。

使用 Spring Boot 時,您可以按如下方式設定策略

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

當容器屬性配置了 TopicPartitionOffset 時,ConcurrentMessageListenerContainer 會將 TopicPartitionOffset 例項分發給委託的 KafkaMessageListenerContainer 例項。

例如,如果提供了六個 TopicPartitionOffset 例項,並且 concurrency3;每個容器將獲得兩個分割槽。對於五個 TopicPartitionOffset 例項,兩個容器獲得兩個分割槽,第三個獲得一個。如果 concurrency 大於 TopicPartitions 的數量,則 concurrency 將向下調整,使得每個容器獲得一個分割槽。

client.id 屬性(如果設定)會附加 -n,其中 n 是與併發性對應的消費者例項。當啟用 JMX 時,這是為 MBean 提供唯一名稱所必需的。

從版本 1.3 開始,MessageListenerContainer 提供了對底層 KafkaConsumer 指標的訪問。對於 ConcurrentMessageListenerContainermetrics() 方法返回所有目標 KafkaMessageListenerContainer 例項的指標。這些指標按底層 KafkaConsumer 提供的 client-id 分組到 Map<MetricName, ? extends Metric> 中。

從版本 2.3 開始,ContainerProperties 提供了 idleBetweenPolls 選項,允許監聽器容器中的主迴圈在 KafkaConsumer.poll() 呼叫之間休眠。實際的休眠間隔是根據提供的選項以及 max.poll.interval.ms 消費者配置與當前記錄批處理時間之間的差異的最小值來選擇的。

提交偏移量

提供了幾個提交偏移量的選項。如果 enable.auto.commit 消費者屬性為 true,Kafka 會根據其配置自動提交偏移量。如果為 false,容器支援幾種 AckMode 設定(在下一列表中描述)。預設的 AckModeBATCH。從版本 2.3 開始,除非在配置中明確設定,否則框架會將 enable.auto.commit 設定為 false。此前,如果未設定此屬性,則使用 Kafka 預設值(true)。

消費者 poll() 方法返回一個或多個 ConsumerRecordsMessageListener 為每個記錄呼叫。以下列表描述了容器對每個 AckMode(不使用事務時)採取的操作

  • RECORD:處理記錄後,當監聽器返回時提交偏移量。

  • BATCH:當 poll() 返回的所有記錄都已處理完畢時,提交偏移量。

  • TIME:當 poll() 返回的所有記錄都已處理完畢,並且自上次提交以來已超過 ackTime 時,提交偏移量。

  • COUNT:當 poll() 返回的所有記錄都已處理完畢,並且自上次提交以來已接收到 ackCount 條記錄時,提交偏移量。

  • COUNT_TIME:類似於 TIMECOUNT,但如果任一條件為 true,則執行提交。

  • MANUAL:訊息監聽器負責 acknowledge() Acknowledgment。之後,應用與 BATCH 相同的語義。

  • MANUAL_IMMEDIATE:當監聽器呼叫 Acknowledgment.acknowledge() 方法時立即提交偏移量。

使用事務時,偏移量會發送到事務,其語義等同於 RECORDBATCH,具體取決於監聽器型別(記錄或批次)。

MANUALMANUAL_IMMEDIATE 要求監聽器是 AcknowledgingMessageListenerBatchAcknowledgingMessageListener。請參閱 訊息監聽器

根據 syncCommits 容器屬性,將使用消費者上的 commitSync()commitAsync() 方法。syncCommits 預設為 true;另請參閱 setSyncCommitTimeout。參閱 setCommitCallback 以獲取非同步提交的結果;預設的回撥是 LoggingCommitCallback,它記錄錯誤(並在除錯級別記錄成功)。

由於監聽器容器有自己的提交偏移量機制,因此它更傾向於將 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 設定為 false。從版本 2.3 開始,除非在消費者工廠或容器的消費者屬性覆蓋中明確設定,否則它會無條件地將其設定為 false。

Acknowledgment 具有以下方法

public interface Acknowledgment {

    void acknowledge();

}

此方法允許監聽器控制何時提交偏移量。

從版本 2.3 開始,Acknowledgment 介面新增了兩個方法 nack(long sleep)nack(int index, long sleep)。第一個用於記錄監聽器,第二個用於批次監聽器。為你的監聽器型別呼叫錯誤的方法將丟擲 IllegalStateException

如果要提交部分批次,請使用 nack()。當使用事務時,將 AckMode 設定為 MANUAL;呼叫 nack() 會將成功處理的記錄的偏移量傳送到事務。
nack() 只能在呼叫你的監聽器的消費者執行緒上呼叫。
使用 亂序提交 時不允許使用 nack()

對於記錄監聽器,當呼叫 nack() 時,任何待處理的偏移量都會被提交,上次輪詢中剩餘的記錄會被丟棄,並且會對它們的分割槽執行查詢操作,以便失敗的記錄和未處理的記錄在下一次 poll() 時重新投遞。可以透過設定 sleep 引數來在重新投遞之前暫停消費者。這與當容器配置了 DefaultErrorHandler 時丟擲異常的功能類似。

nack() 會暫停整個監聽器,包括所有分配的分割槽,持續指定的休眠時間。

當使用批次監聽器時,您可以指定批處理中發生故障的索引。當呼叫 nack() 時,索引之前的記錄的偏移量將被提交,並對失敗和已丟棄記錄的分割槽執行查詢,以便它們將在下一次 poll() 時重新投遞。

有關更多資訊,請參閱容器錯誤處理器

消費者在休眠期間會暫停,以便我們繼續輪詢代理以保持消費者活躍。實際的休眠時間及其解析度取決於容器的 pollTimeout,預設為 5 秒。最小休眠時間等於 pollTimeout,所有休眠時間都將是它的倍數。對於較小的休眠時間或為了提高其準確性,請考慮減少容器的 pollTimeout

從版本 3.0.10 開始,批處理監聽器可以使用 Acknowledgment 引數上的 acknowledge(index) 提交批處理的部分偏移量。當呼叫此方法時,索引處的記錄(以及所有先前的記錄)的偏移量將被提交。在執行部分批處理提交後呼叫 acknowledge() 將提交批處理剩餘部分的偏移量。以下限制適用

  • 需要 AckMode.MANUAL_IMMEDIATE

  • 該方法必須在監聽器執行緒上呼叫

  • 監聽器必須消費一個 List 而不是原始的 ConsumerRecords

  • 索引必須在列表元素的範圍內

  • 索引必須大於之前呼叫中使用的索引

這些限制是強制執行的,該方法將根據違規情況丟擲 IllegalArgumentExceptionIllegalStateException

監聽器容器自動啟動

監聽器容器實現 SmartLifecycle,預設情況下 autoStartuptrue。容器在較晚的階段(Integer.MAX-VALUE - 100)啟動。其他實現 SmartLifecycle 的元件,為了處理來自監聽器的資料,應該在較早的階段啟動。- 100 為後續階段留出了空間,以便在容器之後啟用元件自動啟動。

© . This site is unofficial and not affiliated with VMware.