訊息監聽器容器
提供了兩種 MessageListenerContainer 實現
-
KafkaMessageListenerContainer -
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer 在單個執行緒上接收所有主題或分割槽的所有訊息。ConcurrentMessageListenerContainer 委託給一個或多個 KafkaMessageListenerContainer 例項以提供多執行緒消費。
從版本 2.2.7 開始,您可以向監聽器容器新增 RecordInterceptor;它將在呼叫監聽器之前被呼叫,允許檢查或修改記錄。如果攔截器返回 null,則不會呼叫監聽器。從版本 2.7 開始,它增加了在監聽器退出(正常退出或丟擲異常)後呼叫的額外方法。此外,從版本 2.7 開始,新增了 BatchInterceptor,為批次監聽器提供類似的功能。另外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供對 Consumer<?, ?> 的訪問。例如,這可以用於在攔截器中訪問消費者指標。
| 您不應該在這些攔截器中執行任何影響消費者位置和/或已提交偏移量的方法;容器需要管理這些資訊。 |
| 如果攔截器修改了記錄(透過建立一個新記錄),主題、分割槽和偏移量必須保持不變,以避免意外的副作用,例如記錄丟失。 |
CompositeRecordInterceptor 和 CompositeBatchInterceptor 可用於呼叫多個攔截器。
預設情況下,從版本 2.8 開始,使用事務時,攔截器在事務開始之前被呼叫。您可以將監聽器容器的 interceptBeforeTx 屬性設定為 false,以便在事務開始後呼叫攔截器。從版本 2.9 開始,這適用於任何事務管理器,而不僅僅是 KafkaAwareTransactionManager。例如,這使得攔截器能夠參與由容器啟動的 JDBC 事務。
從版本 2.3.8、2.4.6 開始,當併發度大於 1 時,ConcurrentMessageListenerContainer 現在支援靜態成員資格(Static Membership)。group.instance.id 的字尾為 -n,其中 n 從 1 開始。這與增加的 session.timeout.ms 一起使用,可以減少重平衡事件,例如在應用例項重新啟動時。
使用 KafkaMessageListenerContainer
提供了以下建構函式
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它接收一個 ConsumerFactory 以及主題和分割槽的資訊,以及其他配置,這些都在 ContainerProperties 物件中。ContainerProperties 提供了以下建構函式
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一個建構函式接受一個 TopicPartitionOffset 引數陣列,以明確指示容器使用哪些分割槽(使用 consumer 的 assign() 方法)以及可選的初始偏移量。正值預設是絕對偏移量。負值預設是相對於分割槽內當前最後一個偏移量的相對值。提供了 TopicPartitionOffset 的一個建構函式,它接受一個額外的 boolean 引數。如果此引數為 true,則初始偏移量(正或負)是相對於此消費者當前位置的相對值。偏移量在容器啟動時應用。第二個接受一個主題陣列,Kafka 根據 group.id 屬性分配分割槽——在組內分佈分割槽。第三個使用正則表示式 Pattern 選擇主題。
要將 MessageListener 分配給容器,您可以在建立容器時使用 ContainerProps.setMessageListener 方法。以下示例展示瞭如何操作
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
請注意,建立 DefaultKafkaConsumerFactory 時,使用如上所示只接受屬性的建構函式意味著鍵和值的 Deserializer 類是從配置中獲取的。或者,可以將 Deserializer 例項傳遞給 DefaultKafkaConsumerFactory 的建構函式,用於鍵和/或值,在這種情況下,所有 Consumers 將共享相同的例項。另一種選擇是提供 Supplier<Deserializer> (從版本 2.3 開始),用於為每個 Consumer 獲取單獨的 Deserializer 例項
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
有關 ContainerProperties 的各種屬性的更多資訊,請參閱其Javadoc。
從版本 2.1.1 開始,引入了一個名為 logContainerConfig 的新屬性。當此屬性為 true 且 INFO 級別日誌啟用時,每個監聽器容器會寫入一條日誌訊息,總結其配置屬性。
預設情況下,主題偏移量提交的日誌記錄級別是 DEBUG。從版本 2.1.2 開始,ContainerProperties 中新增了一個名為 commitLogLevel 的屬性,允許您指定這些訊息的日誌級別。例如,要將日誌級別更改為 INFO,您可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);。
從版本 2.2 開始,新增了一個名為 missingTopicsFatal 的容器屬性(預設值:從 2.3.4 開始為 false)。如果配置的任何主題在 broker 上不存在,這將阻止容器啟動。如果容器配置為監聽主題模式(正則表示式),則此屬性不適用。之前,容器執行緒會在 consumer.poll() 方法內迴圈等待主題出現,同時記錄大量訊息。除了日誌之外,沒有任何跡象表明存在問題。
從版本 2.8 開始,引入了一個新的容器屬性 authExceptionRetryInterval。當從 KafkaConsumer 獲取到任何 AuthenticationException 或 AuthorizationException 時,這會導致容器重試獲取訊息。例如,當配置的使用者被拒絕讀取某個主題或憑據不正確時,可能會發生這種情況。定義 authExceptionRetryInterval 允許容器在授予適當許可權後恢復。
| 預設情況下,未配置間隔 - 認證和授權錯誤被視為致命錯誤,會導致容器停止。 |
從版本 2.8 開始,建立消費者工廠時,如果您以物件形式(在建構函式中或透過 setter)提供反序列化器,工廠將呼叫 configure() 方法,並使用配置屬性對其進行配置。
使用 ConcurrentMessageListenerContainer
其唯一的建構函式與 KafkaListenerContainer 的建構函式類似。以下清單顯示了建構函式的簽名
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它還有一個 concurrency 屬性。例如,container.setConcurrency(3) 會建立三個 KafkaMessageListenerContainer 例項。
如果容器屬性配置為主題(或主題模式),Kafka 會使用其組管理功能在消費者之間分配分割槽。
|
監聽多個主題時,預設的分割槽分配可能與您的預期不同。例如,如果您有三個主題,每個主題有五個分割槽,並且您想使用 使用 Spring Boot 時,您可以按如下方式設定策略
|
當容器屬性配置了 TopicPartitionOffsets 時,ConcurrentMessageListenerContainer 會將 TopicPartitionOffset 例項分佈到委託的 KafkaMessageListenerContainer 例項上。
例如,如果提供了六個 TopicPartitionOffset 例項,且 concurrency 為 3;每個容器將獲得兩個分割槽。對於五個 TopicPartitionOffset 例項,兩個容器將獲得兩個分割槽,第三個獲得一個。如果 concurrency 大於 TopicPartitions 的數量,則 concurrency 會向下調整,使得每個容器獲得一個分割槽。
client.id 屬性(如果設定)會附加 -n,其中 n 對應於併發的消費者例項。當啟用 JMX 時,這對於為 MBeans 提供唯一的名稱是必需的。 |
從版本 1.3 開始,MessageListenerContainer 提供了訪問底層 KafkaConsumer 指標的功能。對於 ConcurrentMessageListenerContainer,metrics() 方法返回所有目標 KafkaMessageListenerContainer 例項的指標。這些指標按底層 KafkaConsumer 提供的 client-id 分組到 Map<MetricName, ? extends Metric> 中。
從版本 2.3 開始,ContainerProperties 提供了一個 idleBetweenPolls 選項,允許監聽器容器中的主迴圈在 KafkaConsumer.poll() 呼叫之間休眠。實際的休眠間隔是提供的選項以及 max.poll.interval.ms 消費者配置與當前記錄批處理時間差的最小值。
提交偏移量
提供了幾種提交偏移量的選項。如果 enable.auto.commit 消費者屬性為 true,Kafka 會根據其配置自動提交偏移量。如果為 false,容器支援多種 AckMode 設定(在以下列表中描述)。預設的 AckMode 是 BATCH。從版本 2.3 開始,除非在配置中明確設定,否則框架會將 enable.auto.commit 設定為 false。以前,如果未設定此屬性,則使用 Kafka 的預設值(true)。
消費者 poll() 方法返回一個或多個 ConsumerRecords。MessageListener 會為每條記錄呼叫一次。以下列表描述了容器在每種 AckMode 下采取的操作(不使用事務時)
-
RECORD:在監聽器處理完記錄返回後提交偏移量。 -
BATCH:在處理完poll()返回的所有記錄後提交偏移量。 -
TIME:在處理完poll()返回的所有記錄後提交偏移量,前提是自上次提交以來超過了ackTime。 -
COUNT:在處理完poll()返回的所有記錄後提交偏移量,前提是自上次提交以來收到了ackCount條記錄。 -
COUNT_TIME:類似於TIME和COUNT,但只要任一條件為true,就會執行提交。 -
MANUAL:訊息監聽器負責acknowledge()Acknowledgment。 -
MANUAL_IMMEDIATE:當監聽器呼叫Acknowledgment.acknowledge()方法時立即提交偏移量。
使用事務時,偏移量會發送到事務中,其語義等同於 RECORD 或 BATCH,具體取決於監聽器型別(記錄或批次)。
MANUAL 和 MANUAL_IMMEDIATE 要求監聽器是 AcknowledgingMessageListener 或 BatchAcknowledgingMessageListener。參見訊息監聽器。 |
根據 syncCommits 容器屬性,使用消費者上的 commitSync() 或 commitAsync() 方法。syncCommits 預設值為 true;另請參閱 setSyncCommitTimeout。參見 setCommitCallback 以獲取非同步提交的結果;預設的回撥是 LoggingCommitCallback,它記錄錯誤(並在 debug 級別記錄成功)。
由於監聽器容器有自己的提交偏移量機制,它傾向於將 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 設定為 false。從版本 2.3 開始,除非在消費者工廠或容器的消費者屬性覆蓋中明確設定,否則它會無條件地將其設定為 false。
Acknowledgment 提供了以下方法
public interface Acknowledgment {
void acknowledge();
}
此方法允許監聽器控制何時提交偏移量。
從版本 2.3 開始,Acknowledgment 介面增加了兩個額外的方法 nack(long sleep) 和 nack(int index, long sleep)。第一個用於記錄監聽器,第二個用於批次監聽器。為您的監聽器型別呼叫錯誤的方法將丟擲 IllegalStateException。
如果您想提交部分批次,使用 nack(),在使用事務時,將 AckMode 設定為 MANUAL;呼叫 nack() 會將成功處理的記錄的偏移量傳送到事務。 |
nack() 只能在呼叫您的監聽器的消費者執行緒上呼叫。 |
使用亂序提交(Out of Order Commits)時不允許使用 nack()。 |
對於記錄監聽器,呼叫 nack() 時,任何待提交的偏移量都會被提交,上次 poll() 返回的剩餘記錄會被丟棄,並對其分割槽執行 seek 操作,以便在下一次 poll() 時重新投遞失敗和未處理的記錄。透過設定 sleep 引數,可以在重新投遞之前暫停消費者。此功能類似於當容器配置了 DefaultErrorHandler 時丟擲異常。
nack() 會暫停整個監聽器及其所有已分配的分割槽,暫停時長為指定的 sleep 持續時間。 |
使用批次監聽器時,您可以指定批次內發生失敗的記錄索引。呼叫 nack() 時,會提交索引之前記錄的偏移量,並對失敗和丟棄的記錄所在的分割槽執行 seek 操作,以便在下一次 poll() 時重新投遞它們。
有關更多資訊,請參閱容器錯誤處理器。
消費者在休眠期間會暫停,以便我們繼續輪詢 broker 以保持消費者活躍。實際的休眠時間及其精度取決於容器的 pollTimeout,預設為 5 秒。最小休眠時間等於 pollTimeout,並且所有休眠時間都將是它的倍數。對於較短的休眠時間或為了提高精度,可以考慮減少容器的 pollTimeout。 |
從版本 3.0.10 開始,批次監聽器可以使用 Acknowledgment 引數上的 acknowledge(index) 方法來提交批次中部分記錄的偏移量。呼叫此方法時,索引處記錄(以及之前所有記錄)的偏移量將被提交。在執行部分批次提交後呼叫 acknowledge() 將提交批次剩餘記錄的偏移量。以下限制適用
-
需要
AckMode.MANUAL_IMMEDIATE -
方法必須在監聽器執行緒上呼叫
-
監聽器必須消費
List而不是原始的ConsumerRecords -
索引必須在列表元素的範圍內
-
索引必須大於之前呼叫中使用的索引
這些限制將被強制執行,根據違規情況,方法將丟擲 IllegalArgumentException 或 IllegalStateException。