配置選項
本節包含 Apache Kafka 繫結器使用的配置選項。
有關繫結器的通用配置選項和屬性,請參閱核心文件中的繫結屬性。
Kafka 繫結器屬性
- spring.cloud.stream.kafka.binder.brokers
-
Kafka 繫結器連線的 broker 列表。
預設值:
localhost。 - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers允許指定帶埠資訊或不帶埠資訊的主機(例如,host1,host2:port2)。當 broker 列表中未配置埠時,此屬性設定預設埠。預設值:
9092。 - spring.cloud.stream.kafka.binder.configuration
-
傳遞給繫結器建立的所有客戶端的客戶端屬性(包括生產者和消費者)的鍵/值對映。由於這些屬性同時被生產者和消費者使用,因此應將其用途限制為通用屬性,例如安全設定。透過此配置提供的未知 Kafka 生產者或消費者屬性會被過濾掉,不允許傳播。此處的屬性會覆蓋 boot 中設定的任何屬性。
預設值:空對映。
- spring.cloud.stream.kafka.binder.consumerProperties
-
任意 Kafka 客戶端消費者屬性的鍵/值對映。除了支援已知的 Kafka 消費者屬性外,此處也允許未知的消費者屬性。此處的屬性會覆蓋 boot 中和上面
configuration屬性中設定的任何屬性。預設值:空對映。
- spring.cloud.stream.kafka.binder.headers
-
由繫結器傳輸的自定義頭列表。僅在與使用
kafka-clients版本 < 0.11.0.0 的舊版應用程式(⇐ 1.3.x)通訊時才需要。更新的版本原生支援頭。預設值:空。
- spring.cloud.stream.kafka.binder.healthTimeout
-
獲取分割槽資訊等待時間,單位為秒。如果此計時器到期,則健康狀態報告為 down。
預設值:60。
- spring.cloud.stream.kafka.binder.requiredAcks
-
broker 上所需的 acks 數量。請參閱 Kafka 文件中生產者
acks屬性的說明。預設值:
1。 - spring.cloud.stream.kafka.binder.minPartitionCount
-
僅當
autoCreateTopics或autoAddPartitions設定為 true 時才生效。繫結器在其生產或消費資料的 topics 上配置的全域性最小分割槽數。它可以被生產者的partitionCount設定或生產者的instanceCount * concurrency設定的值(如果兩者之一更大)覆蓋。預設值:
1。 - spring.cloud.stream.kafka.binder.producerProperties
-
任意 Kafka 客戶端生產者屬性的鍵/值對映。除了支援已知的 Kafka 生產者屬性外,此處也允許未知的生產者屬性。此處的屬性會覆蓋 boot 中和上面
configuration屬性中設定的任何屬性。預設值:空對映。
- spring.cloud.stream.kafka.binder.replicationFactor
-
如果
autoCreateTopics處於活動狀態,則自動建立 topic 的複製因子。可以在每個繫結上覆蓋。如果您使用的 Kafka broker 版本早於 2.4,則此值應設定為至少 1。從 3.0.8 版本開始,繫結器使用-1作為預設值,表示將使用 broker 的 'default.replication.factor' 屬性來確定副本數量。請與您的 Kafka broker 管理員核實,檢視是否存在要求最小複製因子的策略,如果是這樣,通常default.replication.factor將匹配該值,並且應使用-1,除非您需要大於最小複製因子的複製因子。預設值:
-1。 - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果設定為
true,繫結器會自動建立新 topic。如果設定為false,繫結器依賴於已配置的 topic。在後一種情況下,如果 topic 不存在,繫結器將無法啟動。此設定與 broker 的 auto.create.topics.enable設定無關,也不影響它。如果伺服器設定為自動建立 topic,它們可能會作為元資料檢索請求的一部分建立,並使用預設的 broker 設定。預設值:
true。 - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果設定為
true,繫結器會在需要時建立新分割槽。如果設定為false,繫結器依賴於已配置的 topic 分割槽大小。如果目標 topic 的分割槽數小於預期值,繫結器將無法啟動。預設值:
false。 - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
在繫結器中啟用事務。請參閱 Kafka 文件中的
transaction.id和spring-kafka文件中的事務。啟用事務時,單個producer屬性將被忽略,所有生產者都使用spring.cloud.stream.kafka.binder.transaction.producer.*屬性。預設值
null(無事務) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事務繫結器中生產者的全域性生產者屬性。請參閱
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix和Kafka 生產者屬性以及所有繫結器支援的通用生產者屬性。預設值:請參閱單獨的生產者屬性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
用於將
spring-messaging頭對映到 Kafka 頭以及從 Kafka 頭對映的KafkaHeaderMapper的 bean 名稱。例如,如果您希望自定義使用 JSON 反序列化頭的BinderHeaderMapperbean 中的受信任包,請使用此屬性。如果此自定義BinderHeaderMapperbean 未透過此屬性提供給繫結器,則繫結器將查詢名為kafkaBinderHeaderMapper且型別為BinderHeaderMapper的頭對映器 bean,然後回退到繫結器建立的預設BinderHeaderMapper。預設值:無。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
當 topic 上的任何分割槽(無論哪個消費者正在從中接收資料)被發現沒有 leader 時,將繫結器健康狀態設定為
down的標誌。預設值:
true。 - spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
當信任庫或金鑰庫證書位置作為非本地檔案系統資源(org.springframework.core.io.Resource 支援的資源,例如 CLASSPATH、HTTP 等)提供時,繫結器會將資源從路徑(可轉換為 org.springframework.core.io.Resource)複製到檔案系統上的某個位置。這對於 broker 級別的證書(
ssl.truststore.location和ssl.keystore.location)和用於 schema 登錄檔的證書(schema.registry.ssl.truststore.location和schema.registry.ssl.keystore.location)都適用。請記住,信任庫和金鑰庫位置路徑必須在spring.cloud.stream.kafka.binder.configuration…下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location、spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location等。檔案將複製到此屬性值指定的位置,該位置必須是檔案系統上應用程式執行程序可寫的現有目錄。如果此值未設定且證書檔案是非本地檔案系統資源,則它將複製到System.getProperty("java.io.tmpdir")返回的系統臨時目錄。如果此值存在但找不到目錄或不可寫,則也適用。預設值:無。
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled
-
當設定為 true 時,每個消費者 topic 的 offset 滯後指標會在每次訪問指標時計算。當設定為 false 時,僅使用定期計算的 offset 滯後。
預設值:true
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
-
計算每個消費者 topic 的 offset 滯後的間隔。當
metrics.defaultOffsetLagMetricsEnabled被停用或其計算時間過長時,將使用此值。預設值:60 秒
- spring.cloud.stream.kafka.binder.enableObservation
-
在此繫結器中的所有繫結上啟用 Micrometer 觀察登錄檔。
預設值:false
- spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup
-
KafkaHealthIndicator元資料消費者group.id。此消費者由HealthIndicator用於查詢所用 topic 的元資料。預設值:無。
Kafka 消費者屬性
以下屬性僅適用於 Kafka 消費者,並且必須以 spring.cloud.stream.kafka.bindings.<channelName>.consumer. 為字首。
為避免重複,Spring Cloud Stream 支援以 spring.cloud.stream.kafka.default.consumer.<property>=<value> 的格式設定所有通道的值。 |
- admin.configuration
-
自 2.1.1 版本起,此屬性已棄用,取而代之的是
topic.properties,並將在未來版本中移除對其的支援。 - admin.replicas-assignment
-
自 2.1.1 版本起,此屬性已棄用,取而代之的是
topic.replicas-assignment,並將在未來版本中移除對其的支援。 - admin.replication-factor
-
自 2.1.1 版本起,此屬性已棄用,取而代之的是
topic.replication-factor,並將在未來版本中移除對其的支援。 - autoRebalanceEnabled
-
當
true時,topic 分割槽會在消費者組的成員之間自動重新平衡。當false時,每個消費者根據spring.cloud.stream.instanceCount和spring.cloud.stream.instanceIndex分配一組固定的分割槽。這要求在每個啟動的例項上適當地設定spring.cloud.stream.instanceCount和spring.cloud.stream.instanceIndex屬性。在這種情況下,spring.cloud.stream.instanceCount屬性的值通常必須大於 1。預設值:
true。 - ackEachRecord
-
當
autoCommitOffset為true時,此設定決定是否在處理每個記錄後提交 offset。預設情況下,在處理完consumer.poll()返回的批次中的所有記錄後提交 offset。透過消費者configuration屬性設定的max.poll.recordsKafka 屬性可以控制 poll 返回的記錄數。將其設定為true可能會導致效能下降,但這樣做可以減少發生故障時重新傳遞記錄的可能性。另請參閱繫結器requiredAcks屬性,它也影響提交 offset 的效能。此屬性自 3.1 版本起已棄用,取而代之的是使用ackMode。如果未設定ackMode且未啟用批處理模式,則將使用RECORD確認模式。預設值:
false。 - autoCommitOffset
-
從 3.1 版本開始,此屬性已棄用。有關替代方案的更多詳細資訊,請參閱
ackMode。訊息處理後是否自動提交 offset。如果設定為false,則入站訊息中會存在一個鍵為kafka_acknowledgment型別為org.springframework.kafka.support.Acknowledgment的頭。應用程式可以使用此頭來確認訊息。有關詳細資訊,請參閱示例部分。當此屬性設定為false時,Kafka 繫結器會將 ack 模式設定為org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,並且應用程式負責確認記錄。另請參閱ackEachRecord。預設值:
true。 - ackMode
-
指定容器確認模式。這基於 Spring Kafka 中定義的 AckMode 列舉。如果
ackEachRecord屬性設定為true且消費者不處於批處理模式,則這將使用RECORD的確認模式,否則,使用透過此屬性提供的確認模式。 - autoCommitOnError
-
在可輪詢消費者中,如果設定為
true,則始終在出錯時自動提交。如果未設定(預設)或為 false,則在可輪詢消費者中不會自動提交。請注意,此屬性僅適用於可輪詢消費者。預設值:未設定。
- resetOffsets
-
是否將消費者上的 offset 重置為由 startOffset 提供的值。如果提供了
KafkaBindingRebalanceListener,則必須為 false;請參閱重新平衡監聽器。有關此屬性的更多資訊,請參閱重置 offset。預設值:
false。 - startOffset
-
新組的起始 offset。允許的值:
earliest和latest。如果為消費者 'binding' 顯式設定了消費者組(透過spring.cloud.stream.bindings.<channelName>.group),則 'startOffset' 設定為earliest。否則,對於anonymous消費者組,它設定為latest。有關此屬性的更多資訊,請參閱重置 offset。預設值:null(相當於
earliest)。 - enableDlq
-
當設定為 true 時,它為消費者啟用 DLQ 行為。預設情況下,導致錯誤的訊息會轉發到名為
error.<destination>.<group>的 topic。DLQ topic 名稱可以透過設定dlqName屬性或定義型別為DlqDestinationResolver的@Bean來配置。這為更常見的 Kafka 重放場景提供了一個替代選項,適用於錯誤數量相對較少且重放整個原始 topic 可能過於繁瑣的情況。有關更多資訊,請參閱kafka dlq 處理。從 2.0 版本開始,傳送到 DLQ topic 的訊息會增強以下頭:x-original-topic、x-exception-message和x-exception-stacktrace,型別為byte[]。預設情況下,失敗的記錄會發送到 DLQ topic 中與原始記錄相同的分割槽號。有關如何更改此行為的資訊,請參閱dlq 分割槽選擇。當destinationIsPattern為true時不允許。預設值:
false。 - dlqPartitions
-
當
enableDlq為 true 且未設定此屬性時,將建立一個死信 topic,其分割槽數與主 topic 相同。通常,死信記錄會發送到死信 topic 中與原始記錄相同的分割槽。此行為可以更改;請參閱dlq 分割槽選擇。如果此屬性設定為1並且沒有DqlPartitionFunctionbean,則所有死信記錄都將寫入分割槽0。如果此屬性大於1,則必須提供DlqPartitionFunctionbean。請注意,實際分割槽計數受繫結器minPartitionCount屬性的影響。預設值:
none - configuration
-
包含通用 Kafka 消費者屬性的鍵/值對對映。除了 Kafka 消費者屬性外,此處還可以傳遞其他配置屬性。例如,應用程式需要的一些屬性,如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar。此處不能設定bootstrap.servers屬性;如果需要連線到多個叢集,請使用多繫結器支援。預設值:空對映。
- dlqName
-
接收錯誤訊息的 DLQ topic 的名稱。
預設值:null(如果未指定,則導致錯誤的訊息會轉發到名為
error.<destination>.<group>的 topic)。 - dlqProducerProperties
-
使用此屬性可以設定 DLQ 特定的生產者屬性。所有透過 Kafka 生產者屬性可用的屬性都可以透過此屬性設定。當消費者上啟用原生解碼(即 useNativeDecoding: true)時,應用程式必須為 DLQ 提供相應的鍵/值序列化器。這必須以
dlqProducerProperties.configuration.key.serializer和dlqProducerProperties.configuration.value.serializer的形式提供。預設值:預設 Kafka 生產者屬性。
- standardHeaders
-
指示入站通道介面卡填充哪些標準頭。允許的值:
none、id、timestamp或both。如果使用原生反序列化且接收訊息的第一個元件需要id(例如配置為使用 JDBC 訊息儲存的聚合器),則此屬性非常有用。預設值:
none - converterBeanName
-
實現
RecordMessageConverter的 bean 名稱。在入站通道介面卡中使用,以替換預設的MessagingMessageConverter。預設值:
null - idleEventInterval
-
表示最近沒有收到訊息的事件之間的間隔,單位為毫秒。使用
ApplicationListener<ListenerContainerIdleEvent>接收這些事件。有關使用示例,請參閱暫停-恢復。預設值:
30000 - destinationIsPattern
-
當 true 時,目標被視為正則表示式
Pattern,用於由 broker 匹配 topic 名稱。當 true 時,不提供 topic,並且不允許enableDlq,因為繫結器在 provision 階段不知道 topic 名稱。請注意,檢測匹配模式的新 topic 所花費的時間由消費者屬性metadata.max.age.ms控制,該屬性(在撰寫本文時)預設為 300,000ms(5 分鐘)。這可以使用上面的configuration屬性進行配置。預設值:
false - topic.properties
-
一個 Kafka topic 屬性的
Map,用於 provision 新 topic,例如spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0預設值:無。
- topic.replicas-assignment
-
一個 Map<Integer, List<Integer>> 的副本分配,其中鍵是分割槽,值是分配。用於 provision 新 topic。請參閱
kafka-clientsjar 中的NewTopicJavadoc。預設值:無。
- topic.replication-factor
-
provision topic 時使用的複製因子。覆蓋繫結器範圍的設定。如果存在
replicas-assignments,則忽略。預設值:無(使用繫結器範圍的預設值 -1)。
- pollTimeout
-
可輪詢消費者中用於輪詢的超時時間。
預設值:5 秒。
- 事務管理器
-
KafkaAwareTransactionManager的 Bean 名稱,用於覆蓋此繫結的繫結器事務管理器。如果您想使用ChainedKafkaTransactionManaager將另一個事務與 Kafka 事務同步,通常需要此屬性。為了實現記錄的精確一次消費和生產,消費者和生產者繫結都必須配置相同的事務管理器。預設值:無。
- txCommitRecovered
-
當使用事務繫結器時,恢復的記錄的 offset(例如,當重試耗盡且記錄被髮送到死信 topic 時)將透過新事務提交,這是預設行為。將此屬性設定為
false會阻止提交恢復記錄的 offset。預設值:true。
- commonErrorHandlerBeanName
-
每個消費者繫結使用的
CommonErrorHandlerbean 名稱。當存在時,此使用者提供的CommonErrorHandler優先於繫結器定義的任何其他錯誤處理器。如果應用程式不想使用ListenerContainerCustomizer然後檢查目標/組組合來設定錯誤處理器,這是一種方便表達錯誤處理器的方式。預設值:無。
Kafka 生產者屬性
以下屬性僅適用於 Kafka 生產者,並且必須以 spring.cloud.stream.kafka.bindings.<channelName>.producer. 為字首。
為避免重複,Spring Cloud Stream 支援以 spring.cloud.stream.kafka.default.producer.<property>=<value> 的格式設定所有通道的值。 |
- admin.configuration
-
自 2.1.1 版本起,此屬性已棄用,取而代之的是
topic.properties,並將在未來版本中移除對其的支援。 - admin.replicas-assignment
-
自 2.1.1 版本起,此屬性已棄用,取而代之的是
topic.replicas-assignment,並將在未來版本中移除對其的支援。 - admin.replication-factor
-
自 2.1.1 版本起,此屬性已棄用,取而代之的是
topic.replication-factor,並將在未來版本中移除對其的支援。 - bufferSize
-
Kafka 生產者在傳送前嘗試批次處理的資料的上限,單位為位元組。
預設值:
16384。 - sync
-
生產者是否是同步的。
預設值:
false。 - sendTimeoutExpression
-
一個針對出站訊息進行評估的 SpEL 表示式,用於在啟用同步釋出時評估等待 ack 的時間,例如
headers['mySendTimeout']。超時時間的值以毫秒為單位。在 3.0 版本之前,除非使用原生編碼,否則無法使用 payload,因為在評估此表示式時,payload 已經採用byte[]的形式。現在,在轉換 payload 之前評估表示式。預設值:
none。 - batchTimeout
-
生產者等待允許更多訊息在同一批次中累積然後傳送訊息的時間。(通常,生產者根本不等待,只需傳送在前一次傳送正在進行時累積的所有訊息。)非零值可能會以延遲為代價提高吞吐量。
預設值:
0。 - messageKeyExpression
-
一個針對出站訊息進行評估的 SpEL 表示式,用於填充生成的 Kafka 訊息的鍵,例如
headers['myKey']。在 3.0 版本之前,除非使用原生編碼,否則無法使用 payload,因為在評估此表示式時,payload 已經採用byte[]的形式。現在,在轉換 payload 之前評估表示式。對於常規處理器(Function<String, String>或Function<Message<?>, Message<?>),如果生成的鍵需要與來自 topic 的入站鍵相同,則可以按如下方式設定此屬性:spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']對於反應式函式,需要記住一個重要的注意事項。在這種情況下,應用程式需要手動將頭從入站訊息複製到出站訊息。您可以設定頭,例如myKey並使用上面建議的headers['myKey'],或者為了方便起見,只需設定KafkaHeaders.MESSAGE_KEY頭,您根本不需要設定此屬性。預設值:
none。 - headerPatterns
-
一個逗號分隔的簡單模式列表,用於匹配 Spring messaging 頭,以對映到
ProducerRecord中的 KafkaHeaders。模式可以以萬用字元(星號)開頭或結尾。模式可以透過加字首!來否定。匹配在第一個匹配(正或負)之後停止。例如!ask,as*將透過ash但不會透過ask。id和timestamp永遠不會被對映。預設值:
*(所有頭 - 除了id和timestamp) - configuration
-
包含通用 Kafka 生產者屬性的鍵/值對對映。此處不能設定
bootstrap.servers屬性;如果需要連線到多個叢集,請使用多繫結器支援。預設值:空對映。
- topic.properties
-
一個 Kafka topic 屬性的
Map,用於 provision 新 topic,例如spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0 - topic.replicas-assignment
-
一個 Map<Integer, List<Integer>> 的副本分配,其中鍵是分割槽,值是分配。用於 provision 新 topic。請參閱
kafka-clientsjar 中的NewTopicJavadoc。預設值:無。
- topic.replication-factor
-
provision topic 時使用的複製因子。覆蓋繫結器範圍的設定。如果存在
replicas-assignments,則忽略。預設值:無(使用繫結器範圍的預設值 -1)。
- useTopicHeader
-
設定為
true以使用出站訊息中KafkaHeaders.TOPIC訊息頭的值覆蓋預設的繫結目標(topic 名稱)。如果頭不存在,則使用預設的繫結目標。預設值:
false。 - recordMetadataChannel
-
一個
MessageChannel的 bean 名稱,用於傳送成功的傳送結果;該 bean 必須存在於應用程式上下文中。傳送到通道的訊息是已傳送的訊息(如果進行了任何轉換,則為轉換後的訊息),並帶有一個附加頭KafkaHeaders.RECORD_METADATA。該頭包含 Kafka 客戶端提供的RecordMetadata物件;它包括記錄寫入 topic 的分割槽和 offset。ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)失敗的傳送會轉到生產者錯誤通道(如果已配置);請參閱Kafka 錯誤通道。
預設值:null。
Kafka 繫結器使用生產者的 partitionCount 設定作為提示,以建立具有給定分割槽數的 topic(結合 minPartitionCount,兩者中的最大值將作為所使用的值)。在為繫結器配置 minPartitionCount 和為應用程式配置 partitionCount 時要謹慎,因為將使用較大的值。如果已存在分割槽數較小的 topic 且 autoAddPartitions 被停用(預設),則繫結器將無法啟動。如果已存在分割槽數較小的 topic 且 autoAddPartitions 被啟用,則會新增新分割槽。如果已存在分割槽數大於 (minPartitionCount 或 partitionCount) 最大值的 topic,則使用現有分割槽數。 |
- compression
-
設定
compression.type生產者屬性。支援的值為none、gzip、snappy、lz4和zstd。如果您將kafka-clientsjar 覆蓋到 2.1.0(或更高版本),如 Spring for Apache Kafka 文件中所述,並希望使用zstd壓縮,請使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd。預設值:
none。 - 事務管理器
-
KafkaAwareTransactionManager的 Bean 名稱,用於覆蓋此繫結的繫結器事務管理器。如果您想使用ChainedKafkaTransactionManaager將另一個事務與 Kafka 事務同步,通常需要此屬性。為了實現記錄的精確一次消費和生產,消費者和生產者繫結都必須配置相同的事務管理器。預設值:無。
- closeTimeout
-
關閉生產者時等待的超時時間,單位為秒。
預設值:
30 - allowNonTransactional
-
通常,與事務繫結器關聯的所有輸出繫結都將在新事務中釋出,如果尚未進行事務。此屬性允許您覆蓋該行為。如果設定為 true,則釋出到此輸出繫結的記錄將不會在事務中執行,除非事務已經在進行中。
預設值:
false