訊息監聽器容器
提供了兩個 MessageListenerContainer 實現
-
KafkaMessageListenerContainer -
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer 在單個執行緒上接收來自所有主題或分割槽的所有訊息。ConcurrentMessageListenerContainer 委託給一個或多個 KafkaMessageListenerContainer 例項以提供多執行緒消費。
從版本 2.2.7 開始,你可以向監聽器容器新增一個 RecordInterceptor;它將在呼叫監聽器之前被呼叫,允許檢查或修改記錄。如果攔截器返回 null,則不呼叫監聽器。從版本 2.7 開始,它具有在監聽器退出(正常退出或丟擲異常)後呼叫的附加方法。此外,從版本 2.7 開始,現在有一個 BatchInterceptor,為 批次監聽器 提供類似的功能。此外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供了對 Consumer<?, ?> 的訪問。例如,這可以用於在攔截器中訪問消費者指標。
| 你不應在這些攔截器中執行任何影響消費者位置和/或已提交偏移量的方法;容器需要管理此類資訊。 |
如果攔截器修改了記錄(透過建立新記錄),則 topic、partition 和 offset 必須保持不變,以避免意外的副作用,例如記錄丟失。 |
CompositeRecordInterceptor 和 CompositeBatchInterceptor 可用於呼叫多個攔截器。
從版本 4.0 開始,AbstractKafkaListenerContainerFactory 和 AbstractMessageListenerContainer 將 getRecordInterceptor() 和 getBatchInterceptor() 公開為公共方法。如果返回的攔截器是 CompositeRecordInterceptor 或 CompositeBatchInterceptor 的例項,則即使在建立了擴充套件 AbstractMessageListenerContainer 的容器例項並已配置 RecordInterceptor 或 BatchInterceptor 之後,也可以向其新增額外的 RecordInterceptor 或 BatchInterceptor 例項。以下示例顯示瞭如何實現:
public void configureRecordInterceptor(AbstractKafkaListenerContainerFactory<Integer, String> containerFactory) {
CompositeRecordInterceptor compositeInterceptor;
RecordInterceptor<Integer, String> previousInterceptor = containerFactory.getRecordInterceptor();
if (previousInterceptor instanceof CompositeRecordInterceptor interceptor) {
compositeInterceptor = interceptor;
} else {
compositeInterceptor = new CompositeRecordInterceptor<>();
containerFactory.setRecordInterceptor(compositeInterceptor);
if (previousInterceptor != null) {
compositeInterceptor.addRecordInterceptor(previousInterceptor);
}
}
RecordInterceptor<Integer, String> recordInterceptor1 = new RecordInterceptor() {...};
RecordInterceptor<Integer, String> recordInterceptor2 = new RecordInterceptor() {...};
compositeInterceptor.addRecordInterceptor(recordInterceptor1);
compositeInterceptor.addRecordInterceptor(recordInterceptor2);
}
預設情況下,從版本 2.8 開始,當使用事務時,攔截器在事務開始之前被呼叫。你可以將監聽器容器的 interceptBeforeTx 屬性設定為 false,以便在事務開始後呼叫攔截器。從版本 2.9 開始,這將適用於任何事務管理器,而不僅僅是 KafkaAwareTransactionManager。這允許,例如,攔截器參與由容器啟動的 JDBC 事務。
從版本 2.3.8、2.4.6 開始,當併發度大於 1 時,ConcurrentMessageListenerContainer 現在支援 靜態成員資格。group.instance.id 會以 -n 作為字尾,其中 n 從 1 開始。這與增加的 session.timeout.ms 一起,可用於減少重新平衡事件,例如,當應用程式例項重新啟動時。
使用 KafkaMessageListenerContainer
提供以下建構函式
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它在一個 ContainerProperties 物件中接收 ConsumerFactory 以及關於主題和分割槽的資訊,以及其他配置。ContainerProperties 具有以下建構函式
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一個建構函式接受一個 TopicPartitionOffset 引數陣列,以明確指示容器使用哪些分割槽(使用消費者 assign() 方法)以及可選的初始偏移量。正值預設是絕對偏移量。負值預設是相對於分割槽內當前最後一個偏移量。提供了一個接受附加 boolean 引數的 TopicPartitionOffset 建構函式。如果此引數為 true,則初始偏移量(正或負)相對於此消費者的當前位置。偏移量在容器啟動時應用。第二個建構函式接受一個主題陣列,Kafka 根據 group.id 屬性分配分割槽 — 在組中分發分割槽。第三個建構函式使用正則表示式 Pattern 選擇主題。
要將 MessageListener 分配給容器,可以在建立容器時使用 ContainerProps.setMessageListener 方法。以下示例展示瞭如何實現:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
請注意,當建立 DefaultKafkaConsumerFactory 時,使用僅接受屬性的建構函式(如上所示)意味著鍵和值 Deserializer 類是從配置中獲取的。或者,可以將 Deserializer 例項傳遞給 DefaultKafkaConsumerFactory 建構函式用於鍵和/或值,在這種情況下,所有消費者共享相同的例項。另一個選項是提供 Supplier<Deserializer>(從版本 2.3 開始),它將用於為每個 Consumer 獲取單獨的 Deserializer 例項。
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
有關您可以設定的各種屬性的更多資訊,請參閱 ContainerProperties 的 Javadoc。
從版本 2.1.1 開始,提供了一個名為 logContainerConfig 的新屬性。當 true 且啟用 INFO 日誌時,每個監聽器容器都會寫入一條日誌訊息,總結其配置屬性。
預設情況下,主題偏移量提交的日誌記錄在 DEBUG 日誌級別執行。從版本 2.1.2 開始,ContainerProperties 中的一個名為 commitLogLevel 的屬性允許你指定這些訊息的日誌級別。例如,要將日誌級別更改為 INFO,你可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);。
從版本 2.2 開始,添加了一個名為 missingTopicsFatal 的新容器屬性(預設值:從 2.3.4 開始為 false)。如果任何配置的主題不存在於代理上,此屬性將阻止容器啟動。如果容器配置為偵聽主題模式(正則表示式),則此屬性不適用。以前,容器執行緒在 consumer.poll() 方法內迴圈,等待主題出現,同時記錄大量訊息。除了日誌之外,沒有跡象表明存在問題。
從版本 2.8 開始,引入了一個新的容器屬性 authExceptionRetryInterval。這使得容器在從 KafkaConsumer 獲取任何 AuthenticationException 或 AuthorizationException 後重試獲取訊息。這可能發生在,例如,配置的使用者被拒絕訪問讀取某個主題或憑據不正確時。定義 authExceptionRetryInterval 允許容器在授予正確許可權後恢復。
| 預設情況下,未配置間隔 - 身份驗證和授權錯誤被視為致命錯誤,導致容器停止。 |
從版本 2.8 開始,當建立消費者工廠時,如果你提供反序列化器作為物件(在建構函式中或透過設定器),工廠將呼叫 configure() 方法來用配置屬性配置它們。
使用 ConcurrentMessageListenerContainer
唯一的建構函式與 KafkaListenerContainer 建構函式類似。以下列表顯示了建構函式的簽名
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它還有一個 concurrency 屬性。例如,container.setConcurrency(3) 會建立三個 KafkaMessageListenerContainer 例項。
如果容器屬性配置為主題(或主題模式),Kafka 會使用其組管理功能將分割槽分配給消費者。
|
當監聽多個主題時,預設分割槽分配可能與你預期不符。例如,如果你有三個主題,每個主題有五個分割槽,並且你想使用 使用 Spring Boot 時,您可以按如下方式設定策略
|
當容器屬性配置了 TopicPartitionOffset 時,ConcurrentMessageListenerContainer 會將 TopicPartitionOffset 例項分發給委託的 KafkaMessageListenerContainer 例項。
例如,如果提供了六個 TopicPartitionOffset 例項,並且 concurrency 為 3;每個容器將獲得兩個分割槽。對於五個 TopicPartitionOffset 例項,兩個容器獲得兩個分割槽,第三個獲得一個。如果 concurrency 大於 TopicPartitions 的數量,則 concurrency 將向下調整,使得每個容器獲得一個分割槽。
client.id 屬性(如果設定)會附加 -n,其中 n 是與併發性對應的消費者例項。當啟用 JMX 時,這是為 MBean 提供唯一名稱所必需的。 |
從版本 1.3 開始,MessageListenerContainer 提供了對底層 KafkaConsumer 指標的訪問。對於 ConcurrentMessageListenerContainer,metrics() 方法返回所有目標 KafkaMessageListenerContainer 例項的指標。這些指標按底層 KafkaConsumer 提供的 client-id 分組到 Map<MetricName, ? extends Metric> 中。
從版本 2.3 開始,ContainerProperties 提供了 idleBetweenPolls 選項,允許監聽器容器中的主迴圈在 KafkaConsumer.poll() 呼叫之間休眠。實際的休眠間隔是根據提供的選項以及 max.poll.interval.ms 消費者配置與當前記錄批處理時間之間的差異的最小值來選擇的。
提交偏移量
提供了幾個提交偏移量的選項。如果 enable.auto.commit 消費者屬性為 true,Kafka 會根據其配置自動提交偏移量。如果為 false,容器支援幾種 AckMode 設定(在下一列表中描述)。預設的 AckMode 是 BATCH。從版本 2.3 開始,除非在配置中明確設定,否則框架會將 enable.auto.commit 設定為 false。此前,如果未設定此屬性,則使用 Kafka 預設值(true)。
消費者 poll() 方法返回一個或多個 ConsumerRecords。MessageListener 為每個記錄呼叫。以下列表描述了容器對每個 AckMode(不使用事務時)採取的操作
-
RECORD:處理記錄後,當監聽器返回時提交偏移量。 -
BATCH:當poll()返回的所有記錄都已處理完畢時,提交偏移量。 -
TIME:當poll()返回的所有記錄都已處理完畢,並且自上次提交以來已超過ackTime時,提交偏移量。 -
COUNT:當poll()返回的所有記錄都已處理完畢,並且自上次提交以來已接收到ackCount條記錄時,提交偏移量。 -
COUNT_TIME:類似於TIME和COUNT,但如果任一條件為true,則執行提交。 -
MANUAL:訊息監聽器負責acknowledge()Acknowledgment。之後,應用與BATCH相同的語義。 -
MANUAL_IMMEDIATE:當監聽器呼叫Acknowledgment.acknowledge()方法時立即提交偏移量。
使用事務時,偏移量會發送到事務,其語義等同於 RECORD 或 BATCH,具體取決於監聽器型別(記錄或批次)。
MANUAL 和 MANUAL_IMMEDIATE 要求監聽器是 AcknowledgingMessageListener 或 BatchAcknowledgingMessageListener。請參閱 訊息監聽器。 |
根據 syncCommits 容器屬性,將使用消費者上的 commitSync() 或 commitAsync() 方法。syncCommits 預設為 true;另請參閱 setSyncCommitTimeout。參閱 setCommitCallback 以獲取非同步提交的結果;預設的回撥是 LoggingCommitCallback,它記錄錯誤(並在除錯級別記錄成功)。
由於監聽器容器有自己的提交偏移量機制,因此它更傾向於將 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 設定為 false。從版本 2.3 開始,除非在消費者工廠或容器的消費者屬性覆蓋中明確設定,否則它會無條件地將其設定為 false。
Acknowledgment 具有以下方法
public interface Acknowledgment {
void acknowledge();
}
此方法允許監聽器控制何時提交偏移量。
從版本 2.3 開始,Acknowledgment 介面新增了兩個方法 nack(long sleep) 和 nack(int index, long sleep)。第一個用於記錄監聽器,第二個用於批次監聽器。為你的監聽器型別呼叫錯誤的方法將丟擲 IllegalStateException。
如果要提交部分批次,請使用 nack()。當使用事務時,將 AckMode 設定為 MANUAL;呼叫 nack() 會將成功處理的記錄的偏移量傳送到事務。 |
nack() 只能在呼叫你的監聽器的消費者執行緒上呼叫。 |
使用 亂序提交 時不允許使用 nack()。 |
對於記錄監聽器,當呼叫 nack() 時,任何待處理的偏移量都會被提交,上次輪詢中剩餘的記錄會被丟棄,並且會對它們的分割槽執行查詢操作,以便失敗的記錄和未處理的記錄在下一次 poll() 時重新投遞。可以透過設定 sleep 引數來在重新投遞之前暫停消費者。這與當容器配置了 DefaultErrorHandler 時丟擲異常的功能類似。
nack() 會暫停整個監聽器,包括所有分配的分割槽,持續指定的休眠時間。 |
當使用批次監聽器時,您可以指定批處理中發生故障的索引。當呼叫 nack() 時,索引之前的記錄的偏移量將被提交,並對失敗和已丟棄記錄的分割槽執行查詢,以便它們將在下一次 poll() 時重新投遞。
有關更多資訊,請參閱容器錯誤處理器。
消費者在休眠期間會暫停,以便我們繼續輪詢代理以保持消費者活躍。實際的休眠時間及其解析度取決於容器的 pollTimeout,預設為 5 秒。最小休眠時間等於 pollTimeout,所有休眠時間都將是它的倍數。對於較小的休眠時間或為了提高其準確性,請考慮減少容器的 pollTimeout。 |
從版本 3.0.10 開始,批處理監聽器可以使用 Acknowledgment 引數上的 acknowledge(index) 提交批處理的部分偏移量。當呼叫此方法時,索引處的記錄(以及所有先前的記錄)的偏移量將被提交。在執行部分批處理提交後呼叫 acknowledge() 將提交批處理剩餘部分的偏移量。以下限制適用
-
需要
AckMode.MANUAL_IMMEDIATE -
該方法必須在監聽器執行緒上呼叫
-
監聽器必須消費一個
List而不是原始的ConsumerRecords -
索引必須在列表元素的範圍內
-
索引必須大於之前呼叫中使用的索引
這些限制是強制執行的,該方法將根據違規情況丟擲 IllegalArgumentException 或 IllegalStateException。