監聽器容器屬性

表 1. ContainerProperties 屬性
財產 預設值 描述

ackCount

1

ackModeCOUNTCOUNT_TIME 時,在提交待定偏移量之前的記錄數量。

adviceChain

null

一個 Advice 物件鏈(例如圍繞訊息偵聽器的 MethodInterceptor 建議),按順序呼叫。

ackMode

BATCH

控制偏移量提交的頻率 - 請參閱提交偏移量

ackTime

5000

ackModeTIMECOUNT_TIME 時,待定偏移量提交之前的時間(毫秒)。

assignmentCommitOption

LATEST_ONLY _NO_TX

是否在分配時提交初始位置;預設情況下,只有當 ConsumerConfig.AUTO_OFFSET_RESET_CONFIGlatest 時才會提交初始偏移量,並且即使存在事務管理器,也不會在事務中執行。有關可用選項的更多資訊,請參閱 ContainerProperties.AssignmentCommitOption 的 JavaDocs。

asyncAcks

啟用亂序提交(請參閱手動提交偏移量);消費者暫停,直到填充間隙後才提交。

authExceptionRetryInterval

null

當不為 null 時,Kafka 客戶端丟擲 AuthenticationExceptionAuthorizationException 異常時,在兩次輪詢之間休眠的 Duration。當為 null 時,此類異常被認為是致命的,容器將停止。

batchRecoverAfterRollback

設定為 true 以啟用批處理恢復,請參閱回滾後處理器

clientId

(空字串)

client.id 消費者屬性的字首。覆蓋消費者工廠的 client.id 屬性;在併發容器中,-n 作為字尾新增到每個消費者例項。

checkDeserExWhenKeyNull

當收到 null key 時,設定為 true 以始終檢查 DeserializationException 標頭。當消費者程式碼無法確定已配置 ErrorHandlingDeserializer 時(例如使用委託反序列化器時)很有用。

checkDeserExWhenValueNull

當收到 null value 時,設定為 true 以始終檢查 DeserializationException 標頭。當消費者程式碼無法確定已配置 ErrorHandlingDeserializer 時(例如使用委託反序列化器時)很有用。

commitCallback

null

當存在且 syncCommitsfalse 時,提交完成後呼叫的回撥。

commitLogLevel

DEBUG

與提交偏移量相關的日誌的日誌級別。

consumerRebalanceListener

null

一個再平衡偵聽器;請參閱再平衡偵聽器

commitRetries

3

當使用 syncCommits 設定為 true 時,設定 RetriableCommitFailedException 的重試次數。預設 3 次(總共 4 次嘗試)。

consumerStartTimeout

30s

在記錄錯誤之前等待消費者啟動的時間;這可能會發生,例如,如果使用執行緒不足的任務執行器。

deliveryAttemptHeader

請參閱投遞嘗試標頭

eosMode

V2

精確一次語義模式;請參閱精確一次語義

fixTxOffsets

當消費由事務性生產者生成且消費者位於分割槽的末尾的記錄時,由於用於指示事務提交/回滾的偽記錄以及可能存在的已回滾記錄,滯後可能錯誤地報告為大於零。這不會在功能上影響消費者,但有些使用者擔心“滯後”不是零。將此屬性設定為 true,容器將糾正此類錯誤報告的偏移量。在下次輪詢之前執行檢查,以避免增加提交處理的顯著複雜性。在撰寫本文時,僅當消費者配置了 isolation.level=read_committedmax.poll.records 大於 1 時,滯後才會得到糾正。有關更多資訊,請參閱KAFKA-10683

groupId

null

覆蓋消費者 group.id 屬性;由 @KafkaListeneridgroupId 屬性自動設定。

idleBeforeDataMultiplier

5.0

應用於在收到任何記錄之前應用的 idleEventInterval 的乘數。收到記錄後,不再應用乘數。自版本 2.8 起可用。

idleBetweenPolls

0

用於透過在輪詢之間休眠執行緒來減慢交付速度。處理一批記錄的時間加上此值必須小於 max.poll.interval.ms 消費者屬性。

idleEventInterval

null

設定後,啟用 ListenerContainerIdleEvent 的釋出,請參閱應用程式事件檢測空閒和無響應的消費者。另請參閱 idleBeforeDataMultiplier

idlePartitionEventInterval

null

設定後,啟用 ListenerContainerIdlePartitionEvent 的釋出,請參閱應用程式事件檢測空閒和無響應的消費者

kafkaConsumerProperties

用於覆蓋在消費者工廠上配置的任何任意消費者屬性。

kafkaAwareTransactionManager

null

請參閱事務

listenerTaskExecutor

SimpleAsyncTaskExecutor

一個任務執行器,用於執行消費者執行緒。預設執行器建立名為 <name>-C-n 的執行緒;對於 KafkaMessageListenerContainer,名稱是 bean 名稱;對於 ConcurrentMessageListenerContainer,名稱是 bean 名稱後加上 -m,其中 m 為每個子容器遞增。請參閱容器執行緒命名

logContainerConfig

設定為 true 以在 INFO 級別記錄所有容器屬性。

messageListener

null

訊息偵聽器。

micrometerEnabled

true

是否為消費者執行緒維護 Micrometer 計時器。

micrometerTags

要新增到 Micrometer 指標的靜態標籤對映。

micrometerTagsProvider

null

一個根據消費者記錄提供動態標籤的函式。

missingTopicsFatal

當為 true 時,如果配置的主題在 broker 上不存在,則阻止容器啟動。

monitorInterval

30s

檢查消費者執行緒狀態以獲取 NonResponsiveConsumerEvent 的頻率。請參閱 noPollThresholdpollTimeout

noPollThreshold

3.0

乘以 pollTimeOut 以確定是否釋出 NonResponsiveConsumerEvent。請參閱 monitorInterval

observationConvention

null

設定後,根據消費者記錄中的資訊向計時器和跟蹤新增動態標籤。

observationEnabled

設定為 true 以透過 Micrometer 啟用觀察。

offsetAndMetadataProvider

null

一個 OffsetAndMetadata 提供者;預設情況下,提供者建立帶有空元資料的偏移量和元資料。提供者提供了一種自定義元資料的方法。

onlyLogRecordMetadata

設定為 false 以記錄完整的消費者記錄(在錯誤、除錯日誌等中),而不僅僅是 topic-partition@offset

pauseImmediate

當容器暫停時,在當前記錄處理後停止處理,而不是在處理完上次輪詢的所有記錄後停止;剩餘的記錄保留在記憶體中,當容器恢復時將傳遞給偵聽器。

pollTimeout

5000

傳遞給 Consumer.poll() 的超時時間(毫秒)。

pollTimeoutWhilePaused

100

當容器處於暫停狀態時,傳遞給 Consumer.poll() 的超時時間(毫秒)。

restartAfterAuthExceptions

如果容器因授權/認證異常而停止,則為 true 以重新啟動容器。

scheduler

ThreadPoolTaskScheduler

一個排程器,用於執行消費者監視任務。

shutdownTimeout

10000

阻塞 stop() 方法的最長時間(毫秒),直到所有消費者停止併發布容器停止事件。

stopContainerWhenFenced

如果丟擲 ProducerFencedException,則停止偵聽器容器。有關更多資訊,請參閱回滾後處理器

stopImmediate

當容器停止時,在當前記錄處理後停止處理,而不是在處理完上次輪詢的所有記錄後停止。

subBatchPerPartition

請參閱描述。

使用批處理偵聽器時,如果此值為 true,則偵聽器將使用輪詢結果(按分割槽拆分為子批處理)進行呼叫。預設值為 false

syncCommitTimeout

null

syncCommitstrue 時使用的超時時間。未設定時,容器將嘗試確定 default.api.timeout.ms 消費者屬性並使用它;否則將使用 60 秒。

syncCommits

true

是否對偏移量使用同步或非同步提交;請參閱 commitCallback

topics topicPattern topicPartitions

不適用

配置的主題、主題模式或顯式分配的主題/分割槽。互斥;必須至少提供一個;由 ContainerProperties 建構函式強制執行。

transactionManager

null

自 3.2 起已棄用,請參閱[kafkaAwareTransactionManager]其他事務管理器

表 2. AbstractMessageListenerContainer 屬性
財產 預設值 描述

afterRollbackProcessor

DefaultAfterRollbackProcessor

在事務回滾後呼叫的 AfterRollbackProcessor

applicationEventPublisher

應用程式上下文

事件釋出者。

batchErrorHandler

請參閱描述。

已棄用 - 請參閱 commonErrorHandler

batchInterceptor

null

設定一個 BatchInterceptor,在呼叫批處理偵聽器之前呼叫;不適用於記錄偵聽器。另請參閱 interceptBeforeTx

beanName

bean 名稱

容器的 bean 名稱;子容器的字尾為 -n

commonErrorHandler

請參閱描述。

DefaultErrorHandlernull,當提供 transactionManager 時,使用 DefaultAfterRollbackProcessor。請參閱容器錯誤處理器

containerProperties

ContainerProperties

容器屬性例項。

groupId

請參閱描述。

如果存在,則為 containerProperties.groupId,否則為消費者工廠的 group.id 屬性。

interceptBeforeTx

true

確定 recordInterceptor 是在事務開始之前還是之後呼叫。

listenerId

請參閱描述。

使用者配置容器的 bean 名稱或 @KafkaListenerid 屬性。

listenerInfo

null

一個用於填充 KafkaHeaders.LISTENER_INFO 標頭的值。對於 @KafkaListener,此值從 info 屬性獲取。此標頭可用於各種地方,例如 RecordInterceptorRecordFilterStrategy 和偵聽器程式碼本身。

pauseRequested

(只讀)

如果已請求消費者暫停,則為 True。

recordInterceptor

null

設定一個 RecordInterceptor,在呼叫記錄偵聽器之前呼叫;不適用於批處理偵聽器。另請參閱 interceptBeforeTx

topicCheckTimeout

30s

missingTopicsFatal 容器屬性為 true 時,等待 describeTopics 操作完成的時間(秒)。

表 3. KafkaMessageListenerContainer 屬性
財產 預設值 描述

assignedPartitions

(只讀)

當前分配給此容器的分割槽(顯式或不顯式)。

clientIdSuffix

null

由併發容器用於為每個子容器的消費者提供唯一的 client.id

containerPaused

不適用

如果已請求暫停且消費者已實際暫停,則為 True。

表 4. ConcurrentMessageListenerContainer 屬性
財產 預設值 描述

alwaysClientIdSuffix

true

設定為 false 可阻止在 concurrency 僅為 1 時向 client.id 消費者屬性新增字尾。

assignedPartitions

(只讀)

此容器的子 KafkaMessageListenerContainer 當前分配的分割槽的總和(顯式或不顯式)。

concurrency

1

要管理的子 KafkaMessageListenerContainer 的數量。

containerPaused

不適用

如果已請求暫停且所有子容器的消費者已實際暫停,則為 True。

containers

不適用

對所有子 KafkaMessageListenerContainer 的引用。

© . This site is unofficial and not affiliated with VMware.