配置選項
本節包含 Apache Kafka Binder 使用的配置選項。
有關 Binder 的通用配置選項和屬性,請參閱核心文件中的繫結屬性。
Kafka Binder 屬性
- spring.cloud.stream.kafka.binder.brokers
-
Kafka Binder 連線的 Broker 列表。
預設值:
localhost
。 - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers
允許指定帶埠或不帶埠資訊的主機(例如,host1,host2:port2
)。此屬性在 Broker 列表中未配置埠時設定預設埠。預設值:
9092
。 - spring.cloud.stream.kafka.binder.configuration
-
Key/Value Map,包含傳遞給 Binder 建立的所有客戶端(生產者和消費者)的客戶端屬性。由於這些屬性同時用於生產者和消費者,因此其使用應限於通用屬性——例如,安全設定。透過此配置提供的未知 Kafka 生產者或消費者屬性會被過濾掉,不允許傳播。這裡的屬性會覆蓋在 Boot 中設定的任何屬性。
預設值:空 Map。
- spring.cloud.stream.kafka.binder.consumerProperties
-
Key/Value Map,包含任意 Kafka 客戶端消費者屬性。除了支援已知的 Kafka 消費者屬性外,此處也允許使用未知消費者屬性。這裡的屬性會覆蓋在 Boot 中以及上述
configuration
屬性中設定的任何屬性。預設值:空 Map。
- spring.cloud.stream.kafka.binder.headers
-
Binder 傳輸的自定義 header 列表。僅在使用舊版本應用 (⇐ 1.3.x) 且
kafka-clients
版本低於 0.11.0.0 時需要。更新版本原生支援 header。預設值:空。
- spring.cloud.stream.kafka.binder.healthTimeout
-
獲取分割槽資訊等待的時間,單位為秒。如果此計時器過期,健康狀態將報告為 down。
預設值:60。
- spring.cloud.stream.kafka.binder.requiredAcks
-
Broker 上所需的 acks 數量。參見 Kafka 文件中關於生產者
acks
屬性的說明。預設值:
1
。 - spring.cloud.stream.kafka.binder.minPartitionCount
-
僅在設定了
autoCreateTopics
或autoAddPartitions
時生效。Binder 在其生產或消費資料的 Topic 上配置的全域性最小分割槽數。它可能被生產者的partitionCount
設定或生產者的instanceCount * concurrency
設定(如果其中一個更大)所覆蓋。預設值:
1
。 - spring.cloud.stream.kafka.binder.producerProperties
-
Key/Value Map,包含任意 Kafka 客戶端生產者屬性。除了支援已知的 Kafka 生產者屬性外,此處也允許使用未知生產者屬性。這裡的屬性會覆蓋在 Boot 中以及上述
configuration
屬性中設定的任何屬性。預設值:空 Map。
- spring.cloud.stream.kafka.binder.replicationFactor
-
如果
autoCreateTopics
啟用,則為自動建立的 Topic 的副本因子。可以在每個 Binding 上覆蓋。如果您使用的是 2.4 之前的 Kafka Broker 版本,則此值應至少設定為 1
。從 3.0.8 版本開始,Binder 使用-1
作為預設值,這表示將使用 Broker 的 'default.replication.factor' 屬性來確定副本數量。請與您的 Kafka Broker 管理員確認是否存在需要最小副本因子的策略,如果是,則通常default.replication.factor
將與該值匹配,應使用-1
,除非您需要大於最小值的副本因子。預設值:
-1
。 - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果設定為
true
,Binder 會自動建立新的 Topic。如果設定為false
,Binder 則依賴於已配置的 Topic。在後一種情況下,如果 Topic 不存在,Binder 將無法啟動。此設定獨立於 Broker 的 auto.create.topics.enable
設定,並且不會影響它。如果伺服器設定為自動建立 Topic,它們可能會作為元資料檢索請求的一部分建立,並使用預設的 Broker 設定。預設值:
true
。 - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果設定為
true
,Binder 會在需要時建立新的分割槽。如果設定為false
,Binder 則依賴於已配置 Topic 的分割槽大小。如果目標 Topic 的分割槽數小於預期值,Binder 將無法啟動。預設值:
false
。 - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
啟用 Binder 中的事務。參見 Kafka 文件中的
transaction.id
和spring-kafka
文件中的事務。啟用事務後,單個producer
屬性將被忽略,所有生產者都使用spring.cloud.stream.kafka.binder.transaction.producer.*
屬性。預設值
null
(無事務) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事務性 Binder 中生產者的全域性生產者屬性。參見
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
和Kafka 生產者屬性以及所有 Binder 支援的通用生產者屬性。預設值:參見各個生產者屬性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
用於將
spring-messaging
header 與 Kafka header 之間進行對映的KafkaHeaderMapper
Bean 名稱。例如,如果您希望定製使用 JSON 反序列化 header 的BinderHeaderMapper
Bean 中的受信任包,可以使用此屬性。如果沒有透過此屬性將自定義的BinderHeaderMapper
Bean 提供給 Binder,那麼 Binder 將查詢名稱為kafkaBinderHeaderMapper
且型別為BinderHeaderMapper
的 header mapper Bean,然後再回退到 Binder 建立的預設BinderHeaderMapper
。預設值:無。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
標誌,用於在 Topic 上的任何分割槽(無論接收資料的消費者是哪個)被發現沒有 Leader 時,將 Binder 健康狀態設定為
down
。預設值:
true
。 - spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
當 truststore 或 keystore 證書位置以非本地檔案系統資源(
org.springframework.core.io.Resource
支援的資源,例如 CLASSPATH, HTTP 等)形式給出時,Binder 會將資源從路徑(可轉換為org.springframework.core.io.Resource
)複製到檔案系統上的某個位置。這對於 Broker 級別的證書(ssl.truststore.location
和ssl.keystore.location
)和用於 Schema Registry 的證書(schema.registry.ssl.truststore.location
和schema.registry.ssl.keystore.location
)都適用。請注意,truststore 和 keystore 的位置路徑必須在spring.cloud.stream.kafka.binder.configuration…
下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location
、spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location
等。檔案將複製到為此屬性值指定的目錄,該目錄必須是檔案系統上已存在且執行應用程式的程序可寫入的目錄。如果未設定此值且證書檔案是非本地檔案系統資源,則它將被複制到透過System.getProperty("java.io.tmpdir")
返回的系統臨時目錄。即使此值存在,但目錄無法在檔案系統上找到或不可寫入時,情況也是如此。預設值:無。
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled
-
當設定為 true 時,每次訪問度量指標時都會計算每個消費者主題的偏移量滯後度量。當設定為 false 時,僅使用週期性計算的偏移量滯後。
預設值:true
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
-
計算每個消費者主題偏移量滯後的間隔。當
metrics.defaultOffsetLagMetricsEnabled
被停用或計算時間過長時,使用此值。預設值:60 秒
- spring.cloud.stream.kafka.binder.enableObservation
-
在此 Binder 中的所有繫結上啟用 Micrometer 觀察登錄檔。
預設值:false
- spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup
-
KafkaHealthIndicator
元資料消費者group.id
。HealthIndicator
使用此消費者查詢正在使用的 Topic 的元資料。預設值:無。
Kafka 消費者屬性
以下屬性僅適用於 Kafka 消費者,並且必須以 spring.cloud.stream.kafka.bindings.<channelName>.consumer.
為字首。
為了避免重複,Spring Cloud Stream 支援為所有通道設定值,格式為 spring.cloud.stream.kafka.default.consumer.<property>=<value> 。 |
- admin.configuration
-
自版本 2.1.1 起,此屬性已被棄用,推薦使用
topic.properties
,未來版本將移除對它的支援。 - admin.replicas-assignment
-
自版本 2.1.1 起,此屬性已被棄用,推薦使用
topic.replicas-assignment
,未來版本將移除對它的支援。 - admin.replication-factor
-
自版本 2.1.1 起,此屬性已被棄用,推薦使用
topic.replication-factor
,未來版本將移除對它的支援。 - autoRebalanceEnabled
-
當為
true
時,Topic 分割槽會在消費者組成員之間自動重新平衡。當為false
時,每個消費者根據spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
分配一組固定的分割槽。這要求在每個啟動的例項上適當地設定spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
屬性。在這種情況下,spring.cloud.stream.instanceCount
屬性的值通常必須大於 1。預設值:
true
。 - ackEachRecord
-
當
autoCommitOffset
為true
時,此設定決定是否在處理每個記錄後提交偏移量。預設情況下,在處理完consumer.poll()
返回的批次中的所有記錄後提交偏移量。Poll 返回的記錄數量可以透過max.poll.records
Kafka 屬性控制,該屬性透過消費者configuration
屬性設定。將其設定為true
可能會導致效能下降,但這樣做可以減少發生故障時重新投遞記錄的可能性。此外,請參閱 Binder 的requiredAcks
屬性,它也影響提交偏移量的效能。此屬性自 3.1 版本起已被棄用,推薦使用ackMode
。如果未設定ackMode
且未啟用批次模式,將使用RECORD
確認模式。預設值:
false
。 - autoCommitOffset
-
從版本 3.1 開始,此屬性已被棄用。有關替代方案的詳細資訊,請參見
ackMode
。訊息處理完成後是否自動提交偏移量。如果設定為false
,則入站訊息中將存在一個 key 為kafka_acknowledgment
、型別為org.springframework.kafka.support.Acknowledgment
的 header。應用程式可以使用此 header 來確認訊息。有關詳細資訊,請參見示例部分。當此屬性設定為false
時,Kafka Binder 將確認模式設定為org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
,並且應用程式負責確認記錄。另請參閱ackEachRecord
。預設值:
true
。 - ackMode
-
指定容器確認模式。這基於 Spring Kafka 中定義的
AckMode
列舉。如果ackEachRecord
屬性設定為true
且消費者未處於批次模式,則將使用RECORD
確認模式,否則,使用透過此屬性提供的確認模式。 - autoCommitOnError
-
在 Poll 式消費者中,如果設定為
true
,則在發生錯誤時始終自動提交。如果未設定(預設值)或為 false,則在 Poll 式消費者中不會自動提交。請注意,此屬性僅適用於 Poll 式消費者。預設值:未設定。
- resetOffsets
-
是否將消費者上的偏移量重置為
startOffset
提供的值。如果提供了KafkaBindingRebalanceListener
,則必須設定為 false;參見Rebalance 監聽器。有關此屬性的更多資訊,請參見重置偏移量。預設值:
false
。 - startOffset
-
新組的起始偏移量。允許的值:
earliest
和latest
。如果透過spring.cloud.stream.bindings.<channelName>.group
為消費者“繫結”顯式設定了消費者組,則 'startOffset' 設定為earliest
。否則,對於匿名消費者組,它設定為latest
。有關此屬性的更多資訊,請參見重置偏移量。預設值:
null
(等同於earliest
)。 - enableDlq
-
當設定為 true 時,為消費者啟用 DLQ 行為。預設情況下,導致錯誤的訊息會被轉發到名為
error.<destination>.<group>
的主題。可以透過設定dlqName
屬性或定義型別為DlqDestinationResolver
的@Bean
來配置 DLQ 主題名稱。當錯誤數量相對較少且重放整個原始主題可能過於繁瑣時,這提供了一種替代 Kafka 更常見的重放場景的選項。有關更多資訊,請參見Kafka DLQ 處理。從 2.0 版本開始,傳送到 DLQ 主題的訊息會增加以下 header:x-original-topic
、x-exception-message
和x-exception-stacktrace
,型別為byte[]
。預設情況下,失敗的記錄會發送到 DLQ 主題中與原始記錄相同的分割槽號。有關如何更改此行為的資訊,請參見DLQ 分割槽選擇。當destinationIsPattern
為true
時不允許設定此屬性。預設值:
false
。 - dlqPartitions
-
當
enableDlq
為 true 且未設定此屬性時,將建立一個與主主題擁有相同分割槽數的死信主題。通常,死信記錄會發送到死信主題中與原始記錄相同的分割槽。此行為可以更改;參見DLQ 分割槽選擇。如果此屬性設定為1
且沒有DqlPartitionFunction
bean,所有死信記錄將寫入分割槽0
。如果此屬性大於1
,則必須提供一個DlqPartitionFunction
bean。請注意,實際分割槽數受 Binder 的minPartitionCount
屬性影響。預設值:無
- configuration
-
Map,包含通用 Kafka 消費者屬性的 Key/Value 對。除了 Kafka 消費者屬性外,此處還可以傳遞其他配置屬性。例如,應用程式需要的一些屬性,如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
。此處不能設定bootstrap.servers
屬性;如果需要連線到多個叢集,請使用多 Binder 支援。預設值:空 Map。
- dlqName
-
接收錯誤訊息的 DLQ 主題名稱。
預設值:
null
(如果未指定,導致錯誤的訊息將轉發到名為error.<destination>.<group>
的主題)。 - dlqProducerProperties
-
使用此屬性,可以設定特定於 DLQ 的生產者屬性。所有透過 Kafka 生產者屬性可用的屬性都可以透過此屬性設定。當消費者上啟用原生解碼(即
useNativeDecoding: true
)時,應用程式必須為 DLQ 提供相應的 key/value 序列化器。這必須以dlqProducerProperties.configuration.key.serializer
和dlqProducerProperties.configuration.value.serializer
的形式提供。預設值:預設 Kafka 生產者屬性。
- standardHeaders
-
指示入站通道介面卡填充哪些標準 header。允許的值:
none
、id
、timestamp
或both
。在使用原生反序列化且第一個接收訊息的元件需要一個 ID(例如配置為使用 JDBC 訊息儲存的聚合器)時很有用。預設值:無
- converterBeanName
-
實現
RecordMessageConverter
的 Bean 名稱。用於入站通道介面卡中,替換預設的MessagingMessageConverter
。預設值:
null
- idleEventInterval
-
事件之間的間隔,單位為毫秒,這些事件指示最近沒有收到訊息。使用
ApplicationListener<ListenerContainerIdleEvent>
來接收這些事件。有關使用示例,請參見暫停-恢復。預設值:
30000
- destinationIsPattern
-
當為 true 時,目標被視為由 Broker 用於匹配 Topic 名稱的正則表示式
Pattern
。當為 true 時,Topic 不會被 Provision,並且不允許使用enableDlq
,因為 Binder 在 Provision 階段不知道 Topic 名稱。請注意,檢測匹配模式的新 Topic 所花費的時間由消費者屬性metadata.max.age.ms
控制,該屬性(在撰寫本文時)預設為 300,000 毫秒(5 分鐘)。可以使用上述configuration
屬性進行配置。預設值:
false
- topic.properties
-
Map,用於 Provision 新 Topic 時使用的 Kafka Topic 屬性——例如,
spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
預設值:無。
- topic.replicas-assignment
-
Map<Integer, List<Integer>>,用於副本分配,其中 key 是分割槽,value 是分配。用於 Provision 新 Topic 時。參見
kafka-clients
jar 中NewTopic
的 Javadoc。預設值:無。
- topic.replication-factor
-
Provision Topic 時使用的副本因子。覆蓋 Binder 範圍的設定。如果存在
replicas-assignments
,則忽略。預設值:無(使用 Binder 範圍的預設值 -1)。
- pollTimeout
-
Poll 式消費者中用於 Polling 的超時時間。
預設值:5 秒。
- transactionManager
-
KafkaAwareTransactionManager
的 Bean 名稱,用於覆蓋此繫結的 Binder 事務管理器。通常在需要使用ChainedKafkaTransactionManaager
將另一個事務與 Kafka 事務同步時需要。為了實現記錄的精確一次消費和生產,消費者和生產者繫結都必須配置相同的事務管理器。預設值:無。
- txCommitRecovered
-
使用事務性 Binder 時,預設情況下,恢復的記錄(例如,重試耗盡並將記錄傳送到死信主題時)的偏移量將透過新事務提交。將此屬性設定為
false
可禁止提交恢復記錄的偏移量。預設值:true。
- commonErrorHandlerBeanName
-
每個消費者繫結使用的
CommonErrorHandler
Bean 名稱。如果存在,此使用者提供的CommonErrorHandler
將優先於 Binder 定義的任何其他錯誤處理器。如果應用程式不想使用ListenerContainerCustomizer
並隨後檢查目標/組組合來設定錯誤處理器,這是一種方便的方式來表達錯誤處理器。預設值:無。
Kafka 生產者屬性
以下屬性僅適用於 Kafka 生產者,並且必須以 spring.cloud.stream.kafka.bindings.<channelName>.producer.
為字首。
為了避免重複,Spring Cloud Stream 支援為所有通道設定值,格式為 spring.cloud.stream.kafka.default.producer.<property>=<value> 。 |
- admin.configuration
-
自版本 2.1.1 起,此屬性已被棄用,推薦使用
topic.properties
,未來版本將移除對它的支援。 - admin.replicas-assignment
-
自版本 2.1.1 起,此屬性已被棄用,推薦使用
topic.replicas-assignment
,未來版本將移除對它的支援。 - admin.replication-factor
-
自版本 2.1.1 起,此屬性已被棄用,推薦使用
topic.replication-factor
,未來版本將移除對它的支援。 - bufferSize
-
Kafka 生產者在傳送前嘗試批次處理的資料量的上限,單位為位元組。
預設值:
16384
。 - sync
-
生產者是否是同步的。
預設值:
false
。 - sendTimeoutExpression
-
一個 SpEL 表示式,針對出站訊息進行評估,用於評估啟用同步釋出時等待 ack 的時間——例如,
headers['mySendTimeout']
。超時值以毫秒為單位。在 3.0 版本之前,除非使用原生編碼,否則無法使用 payload,因為評估此表示式時,payload 已經採用byte[]
的形式。現在,表示式在轉換 payload 之前進行評估。預設值:無。
- batchTimeout
-
生產者在傳送訊息之前等待多長時間,以便在同一批次中累積更多訊息。(通常,生產者根本不等待,只是簡單地傳送在上次傳送進行時累積的所有訊息。)非零值可能會增加吞吐量,但會犧牲延遲。
預設值:
0
。 - messageKeyExpression
-
一個 SpEL 表示式,針對出站訊息進行評估,用於填充生產的 Kafka 訊息的 key——例如,
headers['myKey']
。在 3.0 版本之前,除非使用原生編碼,否則無法使用 payload,因為評估此表示式時,payload 已經採用byte[]
的形式。現在,表示式在轉換 payload 之前進行評估。對於常規處理器(Function<String, String>
或Function<Message<?>, Message<?>
),如果生產的 key 需要與來自 Topic 的入站 key 相同,可以如下設定此屬性:spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']
對於反應式函式,需要注意一個重要的警告。在這種情況下,由應用程式負責手動將 header 從入站訊息複製到出站訊息。您可以設定 header,例如myKey
並像上面建議的那樣使用headers['myKey']
,或者為了方便,只需設定KafkaHeaders.MESSAGE_KEY
header,就完全不需要設定此屬性了。預設值:無。
- headerPatterns
-
逗號分隔的簡單模式列表,用於匹配 Spring messaging header 並將其對映到
ProducerRecord
中的 Kafka Header。模式可以以萬用字元(星號)開頭或結尾。模式可以透過字首!
來否定。匹配在第一個匹配(正向或負向)後停止。例如,!ask,as*
將透過ash
但不會透過ask
。id
和timestamp
永遠不會被對映。預設值:
*
(所有頭資訊 - 除了id
和timestamp
) - configuration
-
包含通用 Kafka 生產者屬性的鍵/值對 Map。
bootstrap.servers
屬性不能在此處設定;如果需要連線到多個叢集,請使用多繫結器支援。預設值:空 Map。
- topic.properties
-
一個
Map
,包含在配置新主題時使用的 Kafka 主題屬性,例如:spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
- topic.replicas-assignment
-
Map<Integer, List<Integer>>,用於副本分配,其中 key 是分割槽,value 是分配。用於 Provision 新 Topic 時。參見
kafka-clients
jar 中NewTopic
的 Javadoc。預設值:無。
- topic.replication-factor
-
Provision Topic 時使用的副本因子。覆蓋 Binder 範圍的設定。如果存在
replicas-assignments
,則忽略。預設值:無(使用 Binder 範圍的預設值 -1)。
- useTopicHeader
-
設定為
true
以使用出站訊息中KafkaHeaders.TOPIC
訊息頭的值覆蓋預設繫結目標(主題名稱)。如果頭資訊不存在,則使用預設繫結目標。預設值:
false
。 - recordMetadataChannel
-
一個
MessageChannel
的 bean 名稱,用於傳送成功的傳送結果;該 bean 必須存在於應用程式上下文中。傳送到該通道的訊息是已傳送的訊息(如果經過轉換),並附帶一個額外的頭資訊KafkaHeaders.RECORD_METADATA
。該頭資訊包含一個由 Kafka 客戶端提供的RecordMetadata
物件;它包括記錄寫入主題的分割槽和偏移量。ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
失敗的傳送會進入生產者錯誤通道(如果已配置);請參閱 Kafka 錯誤通道。
預設值:null。
Kafka 繫結器使用生產者的 partitionCount 設定作為提示,以建立具有給定分割槽數的主題(結合 minPartitionCount ,兩者中的最大值將被使用)。在為繫結器配置 minPartitionCount 併為應用程式配置 partitionCount 時請謹慎,因為會使用較大的值。如果已存在一個分割槽數較少的主題且 autoAddPartitions 被停用(預設設定),則繫結器啟動失敗。如果已存在一個分割槽數較少的主題且 autoAddPartitions 被啟用,則會新增新分割槽。如果已存在一個分割槽數大於 (minPartitionCount 或 partitionCount ) 中較大值的主題,則使用現有分割槽數。 |
- compression
-
設定
compression.type
生產者屬性。支援的值包括none
、gzip
、snappy
、lz4
和zstd
。如果您將kafka-clients
jar 版本覆蓋到 2.1.0(或更高版本),如 Spring for Apache Kafka 文件 中所述,並希望使用zstd
壓縮,請使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd
。預設值:無。
- transactionManager
-
KafkaAwareTransactionManager
的 Bean 名稱,用於覆蓋此繫結的 Binder 事務管理器。通常在需要使用ChainedKafkaTransactionManaager
將另一個事務與 Kafka 事務同步時需要。為了實現記錄的精確一次消費和生產,消費者和生產者繫結都必須配置相同的事務管理器。預設值:無。
- closeTimeout
-
關閉生產者時等待的超時時間,單位為秒。
預設值:
30
- allowNonTransactional
-
通常,與事務性繫結器關聯的所有輸出繫結將在新事務中釋出(如果當前沒有進行中的事務)。此屬性允許您覆蓋該行為。如果設定為 true,則釋出到此輸出繫結的記錄將不會在事務中執行,除非當前已有進行中的事務。
預設值:
false