配置選項

本節包含 Apache Kafka Binder 使用的配置選項。

有關 Binder 的通用配置選項和屬性,請參閱核心文件中的繫結屬性

Kafka Binder 屬性

spring.cloud.stream.kafka.binder.brokers

Kafka Binder 連線的 Broker 列表。

預設值:localhost

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers 允許指定帶埠或不帶埠資訊的主機(例如,host1,host2:port2)。此屬性在 Broker 列表中未配置埠時設定預設埠。

預設值:9092

spring.cloud.stream.kafka.binder.configuration

Key/Value Map,包含傳遞給 Binder 建立的所有客戶端(生產者和消費者)的客戶端屬性。由於這些屬性同時用於生產者和消費者,因此其使用應限於通用屬性——例如,安全設定。透過此配置提供的未知 Kafka 生產者或消費者屬性會被過濾掉,不允許傳播。這裡的屬性會覆蓋在 Boot 中設定的任何屬性。

預設值:空 Map。

spring.cloud.stream.kafka.binder.consumerProperties

Key/Value Map,包含任意 Kafka 客戶端消費者屬性。除了支援已知的 Kafka 消費者屬性外,此處也允許使用未知消費者屬性。這裡的屬性會覆蓋在 Boot 中以及上述 configuration 屬性中設定的任何屬性。

預設值:空 Map。

spring.cloud.stream.kafka.binder.headers

Binder 傳輸的自定義 header 列表。僅在使用舊版本應用 (⇐ 1.3.x) 且 kafka-clients 版本低於 0.11.0.0 時需要。更新版本原生支援 header。

預設值:空。

spring.cloud.stream.kafka.binder.healthTimeout

獲取分割槽資訊等待的時間,單位為秒。如果此計時器過期,健康狀態將報告為 down。

預設值:60。

spring.cloud.stream.kafka.binder.requiredAcks

Broker 上所需的 acks 數量。參見 Kafka 文件中關於生產者 acks 屬性的說明。

預設值:1

spring.cloud.stream.kafka.binder.minPartitionCount

僅在設定了 autoCreateTopicsautoAddPartitions 時生效。Binder 在其生產或消費資料的 Topic 上配置的全域性最小分割槽數。它可能被生產者的 partitionCount 設定或生產者的 instanceCount * concurrency 設定(如果其中一個更大)所覆蓋。

預設值:1

spring.cloud.stream.kafka.binder.producerProperties

Key/Value Map,包含任意 Kafka 客戶端生產者屬性。除了支援已知的 Kafka 生產者屬性外,此處也允許使用未知生產者屬性。這裡的屬性會覆蓋在 Boot 中以及上述 configuration 屬性中設定的任何屬性。

預設值:空 Map。

spring.cloud.stream.kafka.binder.replicationFactor

如果 autoCreateTopics 啟用,則為自動建立的 Topic 的副本因子。可以在每個 Binding 上覆蓋。

如果您使用的是 2.4 之前的 Kafka Broker 版本,則此值應至少設定為 1。從 3.0.8 版本開始,Binder 使用 -1 作為預設值,這表示將使用 Broker 的 'default.replication.factor' 屬性來確定副本數量。請與您的 Kafka Broker 管理員確認是否存在需要最小副本因子的策略,如果是,則通常 default.replication.factor 將與該值匹配,應使用 -1,除非您需要大於最小值的副本因子。

預設值:-1

spring.cloud.stream.kafka.binder.autoCreateTopics

如果設定為 true,Binder 會自動建立新的 Topic。如果設定為 false,Binder 則依賴於已配置的 Topic。在後一種情況下,如果 Topic 不存在,Binder 將無法啟動。

此設定獨立於 Broker 的 auto.create.topics.enable 設定,並且不會影響它。如果伺服器設定為自動建立 Topic,它們可能會作為元資料檢索請求的一部分建立,並使用預設的 Broker 設定。

預設值:true

spring.cloud.stream.kafka.binder.autoAddPartitions

如果設定為 true,Binder 會在需要時建立新的分割槽。如果設定為 false,Binder 則依賴於已配置 Topic 的分割槽大小。如果目標 Topic 的分割槽數小於預期值,Binder 將無法啟動。

預設值:false

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

啟用 Binder 中的事務。參見 Kafka 文件中的 transaction.idspring-kafka 文件中的事務。啟用事務後,單個 producer 屬性將被忽略,所有生產者都使用 spring.cloud.stream.kafka.binder.transaction.producer.* 屬性。

預設值 null(無事務)

spring.cloud.stream.kafka.binder.transaction.producer.*

事務性 Binder 中生產者的全域性生產者屬性。參見 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixKafka 生產者屬性以及所有 Binder 支援的通用生產者屬性。

預設值:參見各個生產者屬性。

spring.cloud.stream.kafka.binder.headerMapperBeanName

用於將 spring-messaging header 與 Kafka header 之間進行對映的 KafkaHeaderMapper Bean 名稱。例如,如果您希望定製使用 JSON 反序列化 header 的 BinderHeaderMapper Bean 中的受信任包,可以使用此屬性。如果沒有透過此屬性將自定義的 BinderHeaderMapper Bean 提供給 Binder,那麼 Binder 將查詢名稱為 kafkaBinderHeaderMapper 且型別為 BinderHeaderMapper 的 header mapper Bean,然後再回退到 Binder 建立的預設 BinderHeaderMapper

預設值:無。

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader

標誌,用於在 Topic 上的任何分割槽(無論接收資料的消費者是哪個)被發現沒有 Leader 時,將 Binder 健康狀態設定為 down

預設值:true

spring.cloud.stream.kafka.binder.certificateStoreDirectory

當 truststore 或 keystore 證書位置以非本地檔案系統資源(org.springframework.core.io.Resource 支援的資源,例如 CLASSPATH, HTTP 等)形式給出時,Binder 會將資源從路徑(可轉換為 org.springframework.core.io.Resource)複製到檔案系統上的某個位置。這對於 Broker 級別的證書(ssl.truststore.locationssl.keystore.location)和用於 Schema Registry 的證書(schema.registry.ssl.truststore.locationschema.registry.ssl.keystore.location)都適用。請注意,truststore 和 keystore 的位置路徑必須在 spring.cloud.stream.kafka.binder.configuration…​ 下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.locationspring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location 等。檔案將複製到為此屬性值指定的目錄,該目錄必須是檔案系統上已存在且執行應用程式的程序可寫入的目錄。如果未設定此值且證書檔案是非本地檔案系統資源,則它將被複制到透過 System.getProperty("java.io.tmpdir") 返回的系統臨時目錄。即使此值存在,但目錄無法在檔案系統上找到或不可寫入時,情況也是如此。

預設值:無。

spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled

當設定為 true 時,每次訪問度量指標時都會計算每個消費者主題的偏移量滯後度量。當設定為 false 時,僅使用週期性計算的偏移量滯後。

預設值:true

spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval

計算每個消費者主題偏移量滯後的間隔。當 metrics.defaultOffsetLagMetricsEnabled 被停用或計算時間過長時,使用此值。

預設值:60 秒

spring.cloud.stream.kafka.binder.enableObservation

在此 Binder 中的所有繫結上啟用 Micrometer 觀察登錄檔。

預設值:false

spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup

KafkaHealthIndicator 元資料消費者 group.idHealthIndicator 使用此消費者查詢正在使用的 Topic 的元資料。

預設值:無。

Kafka 消費者屬性

以下屬性僅適用於 Kafka 消費者,並且必須以 spring.cloud.stream.kafka.bindings.<channelName>.consumer. 為字首。

為了避免重複,Spring Cloud Stream 支援為所有通道設定值,格式為 spring.cloud.stream.kafka.default.consumer.<property>=<value>
admin.configuration

自版本 2.1.1 起,此屬性已被棄用,推薦使用 topic.properties,未來版本將移除對它的支援。

admin.replicas-assignment

自版本 2.1.1 起,此屬性已被棄用,推薦使用 topic.replicas-assignment,未來版本將移除對它的支援。

admin.replication-factor

自版本 2.1.1 起,此屬性已被棄用,推薦使用 topic.replication-factor,未來版本將移除對它的支援。

autoRebalanceEnabled

當為 true 時,Topic 分割槽會在消費者組成員之間自動重新平衡。當為 false 時,每個消費者根據 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 分配一組固定的分割槽。這要求在每個啟動的例項上適當地設定 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 屬性。在這種情況下,spring.cloud.stream.instanceCount 屬性的值通常必須大於 1。

預設值:true

ackEachRecord

autoCommitOffsettrue 時,此設定決定是否在處理每個記錄後提交偏移量。預設情況下,在處理完 consumer.poll() 返回的批次中的所有記錄後提交偏移量。Poll 返回的記錄數量可以透過 max.poll.records Kafka 屬性控制,該屬性透過消費者 configuration 屬性設定。將其設定為 true 可能會導致效能下降,但這樣做可以減少發生故障時重新投遞記錄的可能性。此外,請參閱 Binder 的 requiredAcks 屬性,它也影響提交偏移量的效能。此屬性自 3.1 版本起已被棄用,推薦使用 ackMode。如果未設定 ackMode 且未啟用批次模式,將使用 RECORD 確認模式。

預設值:false

autoCommitOffset

從版本 3.1 開始,此屬性已被棄用。有關替代方案的詳細資訊,請參見 ackMode。訊息處理完成後是否自動提交偏移量。如果設定為 false,則入站訊息中將存在一個 key 為 kafka_acknowledgment、型別為 org.springframework.kafka.support.Acknowledgment 的 header。應用程式可以使用此 header 來確認訊息。有關詳細資訊,請參見示例部分。當此屬性設定為 false 時,Kafka Binder 將確認模式設定為 org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,並且應用程式負責確認記錄。另請參閱 ackEachRecord

預設值:true

ackMode

指定容器確認模式。這基於 Spring Kafka 中定義的 AckMode 列舉。如果 ackEachRecord 屬性設定為 true 且消費者未處於批次模式,則將使用 RECORD 確認模式,否則,使用透過此屬性提供的確認模式。

autoCommitOnError

在 Poll 式消費者中,如果設定為 true,則在發生錯誤時始終自動提交。如果未設定(預設值)或為 false,則在 Poll 式消費者中不會自動提交。請注意,此屬性僅適用於 Poll 式消費者。

預設值:未設定。

resetOffsets

是否將消費者上的偏移量重置為 startOffset 提供的值。如果提供了 KafkaBindingRebalanceListener,則必須設定為 false;參見Rebalance 監聽器。有關此屬性的更多資訊,請參見重置偏移量

預設值:false

startOffset

新組的起始偏移量。允許的值:earliestlatest。如果透過 spring.cloud.stream.bindings.<channelName>.group 為消費者“繫結”顯式設定了消費者組,則 'startOffset' 設定為 earliest。否則,對於匿名消費者組,它設定為 latest。有關此屬性的更多資訊,請參見重置偏移量

預設值:null(等同於 earliest)。

enableDlq

當設定為 true 時,為消費者啟用 DLQ 行為。預設情況下,導致錯誤的訊息會被轉發到名為 error.<destination>.<group> 的主題。可以透過設定 dlqName 屬性或定義型別為 DlqDestinationResolver@Bean 來配置 DLQ 主題名稱。當錯誤數量相對較少且重放整個原始主題可能過於繁瑣時,這提供了一種替代 Kafka 更常見的重放場景的選項。有關更多資訊,請參見Kafka DLQ 處理。從 2.0 版本開始,傳送到 DLQ 主題的訊息會增加以下 header:x-original-topicx-exception-messagex-exception-stacktrace,型別為 byte[]。預設情況下,失敗的記錄會發送到 DLQ 主題中與原始記錄相同的分割槽號。有關如何更改此行為的資訊,請參見DLQ 分割槽選擇destinationIsPatterntrue 時不允許設定此屬性。

預設值:false

dlqPartitions

enableDlq 為 true 且未設定此屬性時,將建立一個與主主題擁有相同分割槽數的死信主題。通常,死信記錄會發送到死信主題中與原始記錄相同的分割槽。此行為可以更改;參見DLQ 分割槽選擇。如果此屬性設定為 1 且沒有 DqlPartitionFunction bean,所有死信記錄將寫入分割槽 0。如果此屬性大於 1,則必須提供一個 DlqPartitionFunction bean。請注意,實際分割槽數受 Binder 的 minPartitionCount 屬性影響。

預設值:無

configuration

Map,包含通用 Kafka 消費者屬性的 Key/Value 對。除了 Kafka 消費者屬性外,此處還可以傳遞其他配置屬性。例如,應用程式需要的一些屬性,如 spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar。此處不能設定 bootstrap.servers 屬性;如果需要連線到多個叢集,請使用多 Binder 支援。

預設值:空 Map。

dlqName

接收錯誤訊息的 DLQ 主題名稱。

預設值:null(如果未指定,導致錯誤的訊息將轉發到名為 error.<destination>.<group> 的主題)。

dlqProducerProperties

使用此屬性,可以設定特定於 DLQ 的生產者屬性。所有透過 Kafka 生產者屬性可用的屬性都可以透過此屬性設定。當消費者上啟用原生解碼(即 useNativeDecoding: true)時,應用程式必須為 DLQ 提供相應的 key/value 序列化器。這必須以 dlqProducerProperties.configuration.key.serializerdlqProducerProperties.configuration.value.serializer 的形式提供。

預設值:預設 Kafka 生產者屬性。

standardHeaders

指示入站通道介面卡填充哪些標準 header。允許的值:noneidtimestampboth。在使用原生反序列化且第一個接收訊息的元件需要一個 ID(例如配置為使用 JDBC 訊息儲存的聚合器)時很有用。

預設值:無

converterBeanName

實現 RecordMessageConverter 的 Bean 名稱。用於入站通道介面卡中,替換預設的 MessagingMessageConverter

預設值:null

idleEventInterval

事件之間的間隔,單位為毫秒,這些事件指示最近沒有收到訊息。使用 ApplicationListener<ListenerContainerIdleEvent> 來接收這些事件。有關使用示例,請參見暫停-恢復

預設值:30000

destinationIsPattern

當為 true 時,目標被視為由 Broker 用於匹配 Topic 名稱的正則表示式 Pattern。當為 true 時,Topic 不會被 Provision,並且不允許使用 enableDlq,因為 Binder 在 Provision 階段不知道 Topic 名稱。請注意,檢測匹配模式的新 Topic 所花費的時間由消費者屬性 metadata.max.age.ms 控制,該屬性(在撰寫本文時)預設為 300,000 毫秒(5 分鐘)。可以使用上述 configuration 屬性進行配置。

預設值:false

topic.properties

Map,用於 Provision 新 Topic 時使用的 Kafka Topic 屬性——例如,spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0

預設值:無。

topic.replicas-assignment

Map<Integer, List<Integer>>,用於副本分配,其中 key 是分割槽,value 是分配。用於 Provision 新 Topic 時。參見 kafka-clients jar 中 NewTopic 的 Javadoc。

預設值:無。

topic.replication-factor

Provision Topic 時使用的副本因子。覆蓋 Binder 範圍的設定。如果存在 replicas-assignments,則忽略。

預設值:無(使用 Binder 範圍的預設值 -1)。

pollTimeout

Poll 式消費者中用於 Polling 的超時時間。

預設值:5 秒。

transactionManager

KafkaAwareTransactionManager 的 Bean 名稱,用於覆蓋此繫結的 Binder 事務管理器。通常在需要使用 ChainedKafkaTransactionManaager 將另一個事務與 Kafka 事務同步時需要。為了實現記錄的精確一次消費和生產,消費者和生產者繫結都必須配置相同的事務管理器。

預設值:無。

txCommitRecovered

使用事務性 Binder 時,預設情況下,恢復的記錄(例如,重試耗盡並將記錄傳送到死信主題時)的偏移量將透過新事務提交。將此屬性設定為 false 可禁止提交恢復記錄的偏移量。

預設值:true。

commonErrorHandlerBeanName

每個消費者繫結使用的 CommonErrorHandler Bean 名稱。如果存在,此使用者提供的 CommonErrorHandler 將優先於 Binder 定義的任何其他錯誤處理器。如果應用程式不想使用 ListenerContainerCustomizer 並隨後檢查目標/組組合來設定錯誤處理器,這是一種方便的方式來表達錯誤處理器。

預設值:無。

Kafka 生產者屬性

以下屬性僅適用於 Kafka 生產者,並且必須以 spring.cloud.stream.kafka.bindings.<channelName>.producer. 為字首。

為了避免重複,Spring Cloud Stream 支援為所有通道設定值,格式為 spring.cloud.stream.kafka.default.producer.<property>=<value>
admin.configuration

自版本 2.1.1 起,此屬性已被棄用,推薦使用 topic.properties,未來版本將移除對它的支援。

admin.replicas-assignment

自版本 2.1.1 起,此屬性已被棄用,推薦使用 topic.replicas-assignment,未來版本將移除對它的支援。

admin.replication-factor

自版本 2.1.1 起,此屬性已被棄用,推薦使用 topic.replication-factor,未來版本將移除對它的支援。

bufferSize

Kafka 生產者在傳送前嘗試批次處理的資料量的上限,單位為位元組。

預設值:16384

sync

生產者是否是同步的。

預設值:false

sendTimeoutExpression

一個 SpEL 表示式,針對出站訊息進行評估,用於評估啟用同步釋出時等待 ack 的時間——例如,headers['mySendTimeout']。超時值以毫秒為單位。在 3.0 版本之前,除非使用原生編碼,否則無法使用 payload,因為評估此表示式時,payload 已經採用 byte[] 的形式。現在,表示式在轉換 payload 之前進行評估。

預設值:無。

batchTimeout

生產者在傳送訊息之前等待多長時間,以便在同一批次中累積更多訊息。(通常,生產者根本不等待,只是簡單地傳送在上次傳送進行時累積的所有訊息。)非零值可能會增加吞吐量,但會犧牲延遲。

預設值:0

messageKeyExpression

一個 SpEL 表示式,針對出站訊息進行評估,用於填充生產的 Kafka 訊息的 key——例如,headers['myKey']。在 3.0 版本之前,除非使用原生編碼,否則無法使用 payload,因為評估此表示式時,payload 已經採用 byte[] 的形式。現在,表示式在轉換 payload 之前進行評估。對於常規處理器(Function<String, String>Function<Message<?>, Message<?>),如果生產的 key 需要與來自 Topic 的入站 key 相同,可以如下設定此屬性:spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey'] 對於反應式函式,需要注意一個重要的警告。在這種情況下,由應用程式負責手動將 header 從入站訊息複製到出站訊息。您可以設定 header,例如 myKey 並像上面建議的那樣使用 headers['myKey'],或者為了方便,只需設定 KafkaHeaders.MESSAGE_KEY header,就完全不需要設定此屬性了。

預設值:無。

headerPatterns

逗號分隔的簡單模式列表,用於匹配 Spring messaging header 並將其對映到 ProducerRecord 中的 Kafka Header。模式可以以萬用字元(星號)開頭或結尾。模式可以透過字首 ! 來否定。匹配在第一個匹配(正向或負向)後停止。例如,!ask,as* 將透過 ash 但不會透過 askidtimestamp 永遠不會被對映。

預設值:*(所有頭資訊 - 除了 idtimestamp

configuration

包含通用 Kafka 生產者屬性的鍵/值對 Map。bootstrap.servers 屬性不能在此處設定;如果需要連線到多個叢集,請使用多繫結器支援。

預設值:空 Map。

topic.properties

一個 Map,包含在配置新主題時使用的 Kafka 主題屬性,例如:spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0

topic.replicas-assignment

Map<Integer, List<Integer>>,用於副本分配,其中 key 是分割槽,value 是分配。用於 Provision 新 Topic 時。參見 kafka-clients jar 中 NewTopic 的 Javadoc。

預設值:無。

topic.replication-factor

Provision Topic 時使用的副本因子。覆蓋 Binder 範圍的設定。如果存在 replicas-assignments,則忽略。

預設值:無(使用 Binder 範圍的預設值 -1)。

useTopicHeader

設定為 true 以使用出站訊息中 KafkaHeaders.TOPIC 訊息頭的值覆蓋預設繫結目標(主題名稱)。如果頭資訊不存在,則使用預設繫結目標。

預設值:false

recordMetadataChannel

一個 MessageChannel 的 bean 名稱,用於傳送成功的傳送結果;該 bean 必須存在於應用程式上下文中。傳送到該通道的訊息是已傳送的訊息(如果經過轉換),並附帶一個額外的頭資訊 KafkaHeaders.RECORD_METADATA。該頭資訊包含一個由 Kafka 客戶端提供的 RecordMetadata 物件;它包括記錄寫入主題的分割槽和偏移量。

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

失敗的傳送會進入生產者錯誤通道(如果已配置);請參閱 Kafka 錯誤通道

預設值:null。

Kafka 繫結器使用生產者的 partitionCount 設定作為提示,以建立具有給定分割槽數的主題(結合 minPartitionCount,兩者中的最大值將被使用)。在為繫結器配置 minPartitionCount 併為應用程式配置 partitionCount 時請謹慎,因為會使用較大的值。如果已存在一個分割槽數較少的主題且 autoAddPartitions 被停用(預設設定),則繫結器啟動失敗。如果已存在一個分割槽數較少的主題且 autoAddPartitions 被啟用,則會新增新分割槽。如果已存在一個分割槽數大於 (minPartitionCountpartitionCount) 中較大值的主題,則使用現有分割槽數。
compression

設定 compression.type 生產者屬性。支援的值包括 nonegzipsnappylz4zstd。如果您將 kafka-clients jar 版本覆蓋到 2.1.0(或更高版本),如 Spring for Apache Kafka 文件 中所述,並希望使用 zstd 壓縮,請使用 spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd

預設值:無。

transactionManager

KafkaAwareTransactionManager 的 Bean 名稱,用於覆蓋此繫結的 Binder 事務管理器。通常在需要使用 ChainedKafkaTransactionManaager 將另一個事務與 Kafka 事務同步時需要。為了實現記錄的精確一次消費和生產,消費者和生產者繫結都必須配置相同的事務管理器。

預設值:無。

closeTimeout

關閉生產者時等待的超時時間,單位為秒。

預設值:30

allowNonTransactional

通常,與事務性繫結器關聯的所有輸出繫結將在新事務中釋出(如果當前沒有進行中的事務)。此屬性允許您覆蓋該行為。如果設定為 true,則釋出到此輸出繫結的記錄將不會在事務中執行,除非當前已有進行中的事務。

預設值:false