訊息監聽器容器

提供了兩種 MessageListenerContainer 實現

  • KafkaMessageListenerContainer

  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer 在單個執行緒上接收所有主題或分割槽的所有訊息。ConcurrentMessageListenerContainer 委託給一個或多個 KafkaMessageListenerContainer 例項以提供多執行緒消費。

從版本 2.2.7 開始,您可以向監聽器容器新增 RecordInterceptor;它將在呼叫監聽器之前被呼叫,允許檢查或修改記錄。如果攔截器返回 null,則不會呼叫監聽器。從版本 2.7 開始,它增加了在監聽器退出(正常退出或丟擲異常)後呼叫的額外方法。此外,從版本 2.7 開始,新增了 BatchInterceptor,為批次監聽器提供類似的功能。另外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供對 Consumer<?, ?> 的訪問。例如,這可以用於在攔截器中訪問消費者指標。

您不應該在這些攔截器中執行任何影響消費者位置和/或已提交偏移量的方法;容器需要管理這些資訊。
如果攔截器修改了記錄(透過建立一個新記錄),主題、分割槽和偏移量必須保持不變,以避免意外的副作用,例如記錄丟失。

CompositeRecordInterceptorCompositeBatchInterceptor 可用於呼叫多個攔截器。

預設情況下,從版本 2.8 開始,使用事務時,攔截器在事務開始之前被呼叫。您可以將監聽器容器的 interceptBeforeTx 屬性設定為 false,以便在事務開始後呼叫攔截器。從版本 2.9 開始,這適用於任何事務管理器,而不僅僅是 KafkaAwareTransactionManager。例如,這使得攔截器能夠參與由容器啟動的 JDBC 事務。

從版本 2.3.8、2.4.6 開始,當併發度大於 1 時,ConcurrentMessageListenerContainer 現在支援靜態成員資格(Static Membership)group.instance.id 的字尾為 -n,其中 n1 開始。這與增加的 session.timeout.ms 一起使用,可以減少重平衡事件,例如在應用例項重新啟動時。

使用 KafkaMessageListenerContainer

提供了以下建構函式

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

它接收一個 ConsumerFactory 以及主題和分割槽的資訊,以及其他配置,這些都在 ContainerProperties 物件中。ContainerProperties 提供了以下建構函式

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

第一個建構函式接受一個 TopicPartitionOffset 引數陣列,以明確指示容器使用哪些分割槽(使用 consumer 的 assign() 方法)以及可選的初始偏移量。正值預設是絕對偏移量。負值預設是相對於分割槽內當前最後一個偏移量的相對值。提供了 TopicPartitionOffset 的一個建構函式,它接受一個額外的 boolean 引數。如果此引數為 true,則初始偏移量(正或負)是相對於此消費者當前位置的相對值。偏移量在容器啟動時應用。第二個接受一個主題陣列,Kafka 根據 group.id 屬性分配分割槽——在組內分佈分割槽。第三個使用正則表示式 Pattern 選擇主題。

要將 MessageListener 分配給容器,您可以在建立容器時使用 ContainerProps.setMessageListener 方法。以下示例展示瞭如何操作

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

請注意,建立 DefaultKafkaConsumerFactory 時,使用如上所示只接受屬性的建構函式意味著鍵和值的 Deserializer 類是從配置中獲取的。或者,可以將 Deserializer 例項傳遞給 DefaultKafkaConsumerFactory 的建構函式,用於鍵和/或值,在這種情況下,所有 Consumers 將共享相同的例項。另一種選擇是提供 Supplier<Deserializer> (從版本 2.3 開始),用於為每個 Consumer 獲取單獨的 Deserializer 例項

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

有關 ContainerProperties 的各種屬性的更多資訊,請參閱其Javadoc

從版本 2.1.1 開始,引入了一個名為 logContainerConfig 的新屬性。當此屬性為 trueINFO 級別日誌啟用時,每個監聽器容器會寫入一條日誌訊息,總結其配置屬性。

預設情況下,主題偏移量提交的日誌記錄級別是 DEBUG。從版本 2.1.2 開始,ContainerProperties 中新增了一個名為 commitLogLevel 的屬性,允許您指定這些訊息的日誌級別。例如,要將日誌級別更改為 INFO,您可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);

從版本 2.2 開始,新增了一個名為 missingTopicsFatal 的容器屬性(預設值:從 2.3.4 開始為 false)。如果配置的任何主題在 broker 上不存在,這將阻止容器啟動。如果容器配置為監聽主題模式(正則表示式),則此屬性不適用。之前,容器執行緒會在 consumer.poll() 方法內迴圈等待主題出現,同時記錄大量訊息。除了日誌之外,沒有任何跡象表明存在問題。

從版本 2.8 開始,引入了一個新的容器屬性 authExceptionRetryInterval。當從 KafkaConsumer 獲取到任何 AuthenticationExceptionAuthorizationException 時,這會導致容器重試獲取訊息。例如,當配置的使用者被拒絕讀取某個主題或憑據不正確時,可能會發生這種情況。定義 authExceptionRetryInterval 允許容器在授予適當許可權後恢復。

預設情況下,未配置間隔 - 認證和授權錯誤被視為致命錯誤,會導致容器停止。

從版本 2.8 開始,建立消費者工廠時,如果您以物件形式(在建構函式中或透過 setter)提供反序列化器,工廠將呼叫 configure() 方法,並使用配置屬性對其進行配置。

使用 ConcurrentMessageListenerContainer

其唯一的建構函式與 KafkaListenerContainer 的建構函式類似。以下清單顯示了建構函式的簽名

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它還有一個 concurrency 屬性。例如,container.setConcurrency(3) 會建立三個 KafkaMessageListenerContainer 例項。

如果容器屬性配置為主題(或主題模式),Kafka 會使用其組管理功能在消費者之間分配分割槽。

監聽多個主題時,預設的分割槽分配可能與您的預期不同。例如,如果您有三個主題,每個主題有五個分割槽,並且您想使用 concurrency=15,您會看到只有五個活動消費者,每個消費者被分配到每個主題的一個分割槽,而其他 10 個消費者處於空閒狀態。這是因為預設的 Kafka ConsumerPartitionAssignorRangeAssignor(參見其 Javadoc)。對於這種情況,您可能需要考慮使用 RoundRobinAssignor,它會將分割槽分佈到所有消費者上。然後,每個消費者被分配到一個主題或分割槽。要更改 ConsumerPartitionAssignor,您可以在提供給 DefaultKafkaConsumerFactory 的屬性中設定 partition.assignment.strategy 消費者屬性(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。

使用 Spring Boot 時,您可以按如下方式設定策略

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

當容器屬性配置了 TopicPartitionOffsets 時,ConcurrentMessageListenerContainer 會將 TopicPartitionOffset 例項分佈到委託的 KafkaMessageListenerContainer 例項上。

例如,如果提供了六個 TopicPartitionOffset 例項,且 concurrency3;每個容器將獲得兩個分割槽。對於五個 TopicPartitionOffset 例項,兩個容器將獲得兩個分割槽,第三個獲得一個。如果 concurrency 大於 TopicPartitions 的數量,則 concurrency 會向下調整,使得每個容器獲得一個分割槽。

client.id 屬性(如果設定)會附加 -n,其中 n 對應於併發的消費者例項。當啟用 JMX 時,這對於為 MBeans 提供唯一的名稱是必需的。

從版本 1.3 開始,MessageListenerContainer 提供了訪問底層 KafkaConsumer 指標的功能。對於 ConcurrentMessageListenerContainermetrics() 方法返回所有目標 KafkaMessageListenerContainer 例項的指標。這些指標按底層 KafkaConsumer 提供的 client-id 分組到 Map<MetricName, ? extends Metric> 中。

從版本 2.3 開始,ContainerProperties 提供了一個 idleBetweenPolls 選項,允許監聽器容器中的主迴圈在 KafkaConsumer.poll() 呼叫之間休眠。實際的休眠間隔是提供的選項以及 max.poll.interval.ms 消費者配置與當前記錄批處理時間差的最小值。

提交偏移量

提供了幾種提交偏移量的選項。如果 enable.auto.commit 消費者屬性為 true,Kafka 會根據其配置自動提交偏移量。如果為 false,容器支援多種 AckMode 設定(在以下列表中描述)。預設的 AckModeBATCH。從版本 2.3 開始,除非在配置中明確設定,否則框架會將 enable.auto.commit 設定為 false。以前,如果未設定此屬性,則使用 Kafka 的預設值(true)。

消費者 poll() 方法返回一個或多個 ConsumerRecordsMessageListener 會為每條記錄呼叫一次。以下列表描述了容器在每種 AckMode 下采取的操作(不使用事務時)

  • RECORD:在監聽器處理完記錄返回後提交偏移量。

  • BATCH:在處理完 poll() 返回的所有記錄後提交偏移量。

  • TIME:在處理完 poll() 返回的所有記錄後提交偏移量,前提是自上次提交以來超過了 ackTime

  • COUNT:在處理完 poll() 返回的所有記錄後提交偏移量,前提是自上次提交以來收到了 ackCount 條記錄。

  • COUNT_TIME:類似於 TIMECOUNT,但只要任一條件為 true,就會執行提交。

  • MANUAL:訊息監聽器負責 acknowledge() Acknowledgment

  • MANUAL_IMMEDIATE:當監聽器呼叫 Acknowledgment.acknowledge() 方法時立即提交偏移量。

使用事務時,偏移量會發送到事務中,其語義等同於 RECORDBATCH,具體取決於監聽器型別(記錄或批次)。

MANUALMANUAL_IMMEDIATE 要求監聽器是 AcknowledgingMessageListenerBatchAcknowledgingMessageListener。參見訊息監聽器

根據 syncCommits 容器屬性,使用消費者上的 commitSync()commitAsync() 方法。syncCommits 預設值為 true;另請參閱 setSyncCommitTimeout。參見 setCommitCallback 以獲取非同步提交的結果;預設的回撥是 LoggingCommitCallback,它記錄錯誤(並在 debug 級別記錄成功)。

由於監聽器容器有自己的提交偏移量機制,它傾向於將 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 設定為 false。從版本 2.3 開始,除非在消費者工廠或容器的消費者屬性覆蓋中明確設定,否則它會無條件地將其設定為 false。

Acknowledgment 提供了以下方法

public interface Acknowledgment {

    void acknowledge();

}

此方法允許監聽器控制何時提交偏移量。

從版本 2.3 開始,Acknowledgment 介面增加了兩個額外的方法 nack(long sleep)nack(int index, long sleep)。第一個用於記錄監聽器,第二個用於批次監聽器。為您的監聽器型別呼叫錯誤的方法將丟擲 IllegalStateException

如果您想提交部分批次,使用 nack(),在使用事務時,將 AckMode 設定為 MANUAL;呼叫 nack() 會將成功處理的記錄的偏移量傳送到事務。
nack() 只能在呼叫您的監聽器的消費者執行緒上呼叫。
使用亂序提交(Out of Order Commits)時不允許使用 nack()

對於記錄監聽器,呼叫 nack() 時,任何待提交的偏移量都會被提交,上次 poll() 返回的剩餘記錄會被丟棄,並對其分割槽執行 seek 操作,以便在下一次 poll() 時重新投遞失敗和未處理的記錄。透過設定 sleep 引數,可以在重新投遞之前暫停消費者。此功能類似於當容器配置了 DefaultErrorHandler 時丟擲異常。

nack() 會暫停整個監聽器及其所有已分配的分割槽,暫停時長為指定的 sleep 持續時間。

使用批次監聽器時,您可以指定批次內發生失敗的記錄索引。呼叫 nack() 時,會提交索引之前記錄的偏移量,並對失敗和丟棄的記錄所在的分割槽執行 seek 操作,以便在下一次 poll() 時重新投遞它們。

有關更多資訊,請參閱容器錯誤處理器

消費者在休眠期間會暫停,以便我們繼續輪詢 broker 以保持消費者活躍。實際的休眠時間及其精度取決於容器的 pollTimeout,預設為 5 秒。最小休眠時間等於 pollTimeout,並且所有休眠時間都將是它的倍數。對於較短的休眠時間或為了提高精度,可以考慮減少容器的 pollTimeout

從版本 3.0.10 開始,批次監聽器可以使用 Acknowledgment 引數上的 acknowledge(index) 方法來提交批次中部分記錄的偏移量。呼叫此方法時,索引處記錄(以及之前所有記錄)的偏移量將被提交。在執行部分批次提交後呼叫 acknowledge() 將提交批次剩餘記錄的偏移量。以下限制適用

  • 需要 AckMode.MANUAL_IMMEDIATE

  • 方法必須在監聽器執行緒上呼叫

  • 監聽器必須消費 List 而不是原始的 ConsumerRecords

  • 索引必須在列表元素的範圍內

  • 索引必須大於之前呼叫中使用的索引

這些限制將被強制執行,根據違規情況,方法將丟擲 IllegalArgumentExceptionIllegalStateException

監聽器容器自動啟動

監聽器容器實現了 SmartLifecycle,並且 autoStartup 預設值為 true。容器在較晚的階段啟動(Integer.MAX-VALUE - 100)。其他實現了 SmartLifecycle 並處理來自監聽器資料的元件應在更早的階段啟動。- 100 為後續階段留出空間,以便在容器啟動後自動啟動元件。