監聽器容器屬性

表 1. ContainerProperties 屬性
屬性 預設值 描述

ackCount

1

ackModeCOUNTCOUNT_TIME 時,在提交待處理偏移量之前的記錄數。

adviceChain

null

包裹訊息監聽器的 Advice 物件鏈(例如 MethodInterceptor 環繞通知),按順序呼叫。

ackMode

BATCH

控制偏移量的提交頻率 - 參見 提交偏移量

ackTime

5000

ackModeTIMECOUNT_TIME 時,待處理偏移量在經過該毫秒數後提交。

assignmentCommitOption

LATEST_ONLY _NO_TX

是否在分配時提交初始位置;預設情況下,僅當 ConsumerConfig.AUTO_OFFSET_RESET_CONFIGlatest 且即使存在事務管理器也不會在事務中執行時,才會提交初始偏移量。有關可用選項的更多資訊,請參閱 ContainerProperties.AssignmentCommitOption 的 JavaDocs。

asyncAcks

false

啟用亂序提交(參見 手動提交偏移量);消費者被暫停,提交被延遲直到填補空白。

authExceptionRetryInterval

null

當不為 null 時,表示 Kafka 客戶端丟擲 AuthenticationExceptionAuthorizationException 時,兩次 Poll 之間休眠的 Duration。當為 null 時,此類異常被視為致命錯誤,容器將停止。

batchRecoverAfterRollback

false

設定為 true 以啟用批次恢復,參見 回滾後處理器

clientId

(空字串)

client.id 消費者屬性的字首。覆蓋消費者工廠的 client.id 屬性;在併發容器中,每個消費者例項會新增 -n 字尾。

checkDeserExWhenKeyNull

false

設定為 true 以在接收到 null 鍵時始終檢查 DeserializationException 頭。當消費者程式碼無法確定 ErrorHandlingDeserializer 已被配置時(例如在使用委託反序列化器時),此屬性非常有用。

checkDeserExWhenValueNull

false

設定為 true 以在接收到 null 值時始終檢查 DeserializationException 頭。當消費者程式碼無法確定 ErrorHandlingDeserializer 已被配置時(例如在使用委託反序列化器時),此屬性非常有用。

commitCallback

null

當存在且 syncCommitsfalse 時,提交完成後呼叫的回撥。

commitLogLevel

DEBUG

與提交偏移量相關的日誌的日誌級別。

consumerRebalanceListener

null

一個再平衡監聽器;參見 再平衡監聽器

commitRetries

3

使用 syncCommits 設定為 true 時,設定 RetriableCommitFailedException 的重試次數。預設 3 次(總共嘗試 4 次)。

consumerStartTimeout

30s

在記錄錯誤日誌之前等待消費者啟動的時間;例如,如果您使用執行緒數不足的任務執行器,可能會發生這種情況。

deliveryAttemptHeader

false

參見 投遞嘗試次數頭

eosMode

V2

精確一次語義模式;參見 精確一次語義

fixTxOffsets

false

當消費事務性生產者產生的記錄,且消費者位於分割槽末尾時,由於用於指示事務提交/回滾的偽記錄,以及可能存在的已回滾記錄,Lag 可能被錯誤地報告為大於零。這不影響消費者的功能,但一些使用者對“Lag”非零表示擔憂。將此屬性設定為 true,容器將糾正此類誤報的偏移量。檢查在下一次 Poll 之前執行,以避免給提交處理增加顯著複雜性。在撰寫本文時,僅當消費者配置了 isolation.level=read_committedmax.poll.records 大於 1 時,Lag 才會得到糾正。更多資訊請參閱 KAFKA-10683

groupId

null

覆蓋消費者 group.id 屬性;由 @KafkaListeneridgroupId 屬性自動設定。

idleBeforeDataMultiplier

5.0

在收到任何記錄之前應用的 idleEventInterval 乘數。收到記錄後,不再應用該乘數。自 2.8 版本起可用。

idleBetweenPolls

0

用於透過在兩次 Poll 之間休眠執行緒來減慢投遞速度。處理一批記錄的時間加上此值必須小於 max.poll.interval.ms 消費者屬性。

idleEventInterval

null

設定後,啟用 ListenerContainerIdleEvents 的釋出,參見 應用事件檢測空閒和無響應的消費者。另請參閱 idleBeforeDataMultiplier

idlePartitionEventInterval

null

設定後,啟用 ListenerContainerIdlePartitionEvents 的釋出,參見 應用事件檢測空閒和無響應的消費者

kafkaConsumerProperties

用於覆蓋消費者工廠上配置的任意消費者屬性。

kafkaAwareTransactionManager

null

參見 事務

listenerTaskExecutor

SimpleAsyncTaskExecutor

一個用於執行消費者執行緒的任務執行器。預設執行器建立的執行緒命名為 <name>-C-n;對於 KafkaMessageListenerContainer,名稱是 Bean 名稱;對於 ConcurrentMessageListenerContainer,名稱是 Bean 名稱加上 -m 字尾,其中 m 對每個子容器遞增。參見 容器執行緒命名

logContainerConfig

false

設定為 true 以在 INFO 級別記錄所有容器屬性。

messageListener

null

訊息監聽器。

micrometerEnabled

true

是否為消費者執行緒維護 Micrometer 計時器。

micrometerTags

要新增到 Micrometer 度量的靜態標籤對映。

micrometerTagsProvider

null

一個根據消費者記錄提供動態標籤的函式。

missingTopicsFatal

false

當為 true 時,如果配置的 topic(s) 不存在於 broker 上,則阻止容器啟動。

monitorInterval

30s

檢查消費者執行緒狀態以獲取 NonResponsiveConsumerEvents 的頻率。參見 noPollThresholdpollTimeout

noPollThreshold

3.0

乘以 pollTimeOut 以確定是否釋出 NonResponsiveConsumerEvent。參見 monitorInterval

observationConvention

null

設定後,根據消費者記錄中的資訊,為計時器和跟蹤新增動態標籤。

observationEnabled

false

設定為 true 以啟用透過 Micrometer 進行觀察。

offsetAndMetadataProvider

null

一個 OffsetAndMetadata 的提供者;預設情況下,提供者建立一個帶有空元資料的偏移量和元資料。提供者提供了一種自定義元資料的方式。

onlyLogRecordMetadata

false

設定為 false 以記錄完整的消費者記錄(在錯誤、除錯日誌等中),而不僅僅是 topic-partition@offset

pauseImmediate

false

當容器暫停時,在處理當前記錄後停止處理,而不是在處理完上次 Poll 的所有記錄後停止;剩餘的記錄會保留在記憶體中,並在容器恢復時傳遞給監聽器。

pollTimeout

5000

傳遞給 Consumer.poll() 的超時時間,單位為毫秒。

pollTimeoutWhilePaused

100

當容器處於暫停狀態時,傳遞給 Consumer.poll() 的超時時間(單位為毫秒)。

restartAfterAuthExceptions

false

如果容器因授權/認證異常而停止,則設定為 True 以重啟容器。

scheduler

ThreadPoolTaskScheduler

一個用於執行消費者監控任務的排程器。

shutdownTimeout

10000

阻塞 stop() 方法直到所有消費者停止並在釋出容器停止事件之前的最大時間(單位為毫秒)。

stopContainerWhenFenced

false

如果丟擲 ProducerFencedException,則停止監聽器容器。更多資訊請參見 回滾後處理器

stopImmediate

false

當容器停止時,在處理當前記錄後停止處理,而不是在處理完上次 Poll 的所有記錄後停止。

subBatchPerPartition

見描述。

使用批次監聽器時,如果此屬性為 true,則將 Poll 結果分割成子批次(每個分割槽一個)後呼叫監聽器。預設值為 false

syncCommitTimeout

null

syncCommitstrue 時使用的超時時間。未設定時,容器將嘗試確定 default.api.timeout.ms 消費者屬性並使用該值;否則將使用 60 秒。

syncCommits

true

是否對偏移量使用同步或非同步提交;參見 commitCallback

topics topicPattern topicPartitions

不適用

配置的 topic(s)、topic 模式或顯式分配的 topics/partitions。互斥;必須至少提供其中之一;由 ContainerProperties 建構函式強制執行。

transactionManager

null

自 3.2 版本起已棄用,參見 [kafkaAwareTransactionManager], 其他事務管理器

表 2. AbstractMessageListenerContainer 屬性
屬性 預設值 描述

afterRollbackProcessor

DefaultAfterRollbackProcessor

一個在事務回滾後呼叫的 AfterRollbackProcessor

applicationEventPublisher

應用上下文

事件釋出器。

batchErrorHandler

見描述。

已棄用 - 參見 commonErrorHandler

batchInterceptor

null

設定一個 BatchInterceptor 以在呼叫批次監聽器之前呼叫;不適用於記錄監聽器。另請參見 interceptBeforeTx

beanName

bean 名稱

容器的 Bean 名稱;子容器會新增 -n 字尾。

commonErrorHandler

見描述。

當提供了 transactionManager 且使用了 DefaultAfterRollbackProcessor 時,預設是 DefaultErrorHandlernull。參見 容器錯誤處理器

containerProperties

ContainerProperties

容器屬性例項。

groupId

見描述。

如果存在,則是 containerProperties.groupId,否則是消費者工廠的 group.id 屬性。

interceptBeforeTx

true

確定是在事務開始之前還是之後呼叫 recordInterceptor

listenerId

見描述。

使用者配置的容器的 Bean 名稱或 @KafkaListenerid 屬性。

listenerInfo

null

要填充到 KafkaHeaders.LISTENER_INFO 頭中的值。對於 @KafkaListener,此值來自 info 屬性。此頭可以在各種地方使用,例如在 RecordInterceptorRecordFilterStrategy 以及監聽器程式碼本身中。

pauseRequested

(只讀)

如果已請求暫停消費者,則為 True。

recordInterceptor

null

設定一個 RecordInterceptor 以在呼叫記錄監聽器之前呼叫;不適用於批次監聽器。另請參見 interceptBeforeTx

topicCheckTimeout

30s

missingTopicsFatal 容器屬性為 true 時,等待 describeTopics 操作完成的時間,單位為秒。

表 3. KafkaMessageListenerContainer 屬性
屬性 預設值 描述

assignedPartitions

(只讀)

當前分配給此容器的分割槽(顯式或非顯式)。

clientIdSuffix

null

由併發容器使用,為每個子容器的消費者提供唯一的 client.id

containerPaused

不適用

如果已請求暫停並且消費者已實際暫停,則為 True。

表 4. ConcurrentMessageListenerContainer 屬性
屬性 預設值 描述

alwaysClientIdSuffix

true

concurrency 僅為 1 時,設定為 false 以禁止向 client.id 消費者屬性新增字尾。

assignedPartitions

(只讀)

當前分配給此容器的子 KafkaMessageListenerContainers 的分割槽集合(顯式或非顯式)。

concurrency

1

要管理的子 KafkaMessageListenerContainers 的數量。

containerPaused

不適用

如果已請求暫停並且所有子容器的消費者已實際暫停,則為 True。

containers

不適用

對所有子 KafkaMessageListenerContainers 的引用。