配置選項
本節包含 Kafka Streams binder 使用的配置選項。
有關 Binder 的通用配置選項和屬性,請參閱核心文件。
Kafka Streams Binder 屬性
以下屬性可在 Binder 級別使用,並且必須以 spring.cloud.stream.kafka.streams.binder.
為字首。在 Kafka Streams binder 中重複使用的任何 Kafka binder 提供的屬性必須以 spring.cloud.stream.kafka.streams.binder
為字首,而不是 spring.cloud.stream.kafka.binder
。此規則的唯一例外是定義 Kafka 引導伺服器屬性時,此時任何一個字首都可以使用。
- configuration
-
包含與 Apache Kafka Streams API 相關屬性的鍵/值對 Map。此屬性必須以
spring.cloud.stream.kafka.streams.binder.
為字首。以下是一些使用此屬性的示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有關可能包含在 Streams 配置中的所有屬性的更多資訊,請參閱 Apache Kafka Streams 文件中的 StreamsConfig
JavaDocs。您可以從 StreamsConfig
設定的所有配置都可以透過此屬性進行設定。使用此屬性時,它適用於整個應用程式,因為它是一個 Binder 級別的屬性。如果應用程式中有多個處理器,所有這些處理器都將獲取這些屬性。對於像 application.id
這樣的屬性,這將成為問題,因此您必須仔細檢查如何使用此 Binder 級別的 configuration
屬性來對映來自 StreamsConfig
的屬性。
- functions.<function-bean-name>.applicationId
-
僅適用於函式式處理器。此屬性可用於在應用程式中為每個函式設定 application ID。在多個函式的情況下,這是設定 application ID 的便捷方式。
- functions.<function-bean-name>.configuration
-
僅適用於函式式處理器。包含與 Apache Kafka Streams API 相關屬性的鍵/值對 Map。這類似於上面描述的 Binder 級別的
configuration
屬性,但此級別的configuration
屬性僅針對指定函式進行限制。當您有多個處理器並且希望根據特定函式限制對配置的訪問時,您可能希望使用此屬性。所有StreamsConfig
屬性都可以在這裡使用。 - brokers
-
Broker URL
預設值:
localhost
- zkNodes
-
Zookeeper URL
預設值:
localhost
- deserializationExceptionHandler
-
反序列化錯誤處理器型別。此處理器應用於 Binder 級別,因此應用於應用程式中的所有輸入繫結。可以在消費者繫結級別以更精細的方式對其進行控制。可能的值包括 -
logAndContinue
,logAndFail
,skipAndContinue
或sendToDlq
預設值:
logAndFail
- applicationId
-
一種方便的方式,可在 Binder 級別全域性設定 Kafka Streams 應用的 application.id。如果應用程式包含多個函式,則 application id 應設定不同。詳細資訊請參見上文關於設定 application id 的討論。
預設值: 應用程式將生成一個靜態 application ID。有關更多詳細資訊,請參閱 application ID 部分。
- stateStoreRetry.maxAttempts
-
嘗試連線狀態儲存的最大次數。
預設值: 1
- stateStoreRetry.backoffPeriod
-
重試時嘗試連線狀態儲存的回退週期。
預設值: 1000 ms
- consumerProperties
-
Binder 級別的任意消費者屬性。
- producerProperties
-
Binder 級別的任意生產者屬性。
- includeStoppedProcessorsForHealthCheck
-
當透過 actuator 停止處理器繫結時,預設情況下該處理器將不參與健康檢查。將此屬性設定為
true
以啟用所有處理器的健康檢查,包括當前透過 bindings actuator endpoint 停止的處理器。預設值: false
Kafka Streams 生產者屬性
以下屬性*僅*適用於 Kafka Streams 生產者,且必須以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
為字首。為方便起見,如果存在多個輸出繫結且它們都需要一個公共值,則可以使用字首 spring.cloud.stream.kafka.streams.default.producer.
進行配置。
- keySerde
-
要使用的 key serde
預設值: 請參閱上面關於訊息序列化/反序列化的討論
- valueSerde
-
要使用的 value serde
預設值: 請參閱上面關於訊息序列化/反序列化的討論
- useNativeEncoding
-
啟用/停用原生編碼的標誌
預設值:
true
。 - streamPartitionerBeanName
-
要在消費者端使用的自定義出站分割槽器 Bean 名稱。應用程式可以將自定義
StreamPartitioner
作為 Spring Bean 提供,並將此 Bean 的名稱提供給生產者使用,而不是使用預設分割槽器。預設值: 請參閱上面關於出站分割槽支援的討論。
- producedAs
-
處理器生成到的 Sink 元件的自定義名稱。
預設值:
none
(由 Kafka Streams 生成)
Kafka Streams 消費者屬性
以下屬性適用於 Kafka Streams 消費者,且必須以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.
為字首。為方便起見,如果存在多個輸入繫結且它們都需要一個公共值,則可以使用字首 spring.cloud.stream.kafka.streams.default.consumer.
進行配置。
- applicationId
-
為每個輸入繫結設定 application.id。
預設值: 請參閱上文。
- keySerde
-
要使用的 key serde
預設值: 請參閱上面關於訊息序列化/反序列化的討論
- valueSerde
-
要使用的 value serde
預設值: 請參閱上面關於訊息序列化/反序列化的討論
- materializedAs
-
使用傳入 KTable 型別時要物化(materialize)的狀態儲存
預設值:
none
。 - useNativeDecoding
-
啟用/停用原生解碼的標誌
預設值:
true
。 - dlqName
-
DLQ 主題名稱。
預設值: 請參閱上文關於錯誤處理和 DLQ 的討論。
- startOffset
-
如果沒有已提交的 offset 可供消費,則從此 offset 開始消費。這主要用於消費者首次消費主題時。Kafka Streams 使用
earliest
作為預設策略,Binder 也使用相同的預設值。可以使用此屬性將其覆蓋為latest
。預設值:
earliest
。
注意: 在消費者上使用 resetOffsets
對 Kafka Streams binder 沒有影響。與基於訊息通道的 binder 不同,Kafka Streams binder 不會按需定址到開頭或結尾。
- deserializationExceptionHandler
-
反序列化錯誤處理器型別。此處理器應用於每個消費者繫結,而不是應用於之前描述的 Binder 級別屬性。可能的值包括 -
logAndContinue
,logAndFail
,skipAndContinue
或sendToDlq
預設值:
logAndFail
- timestampExtractorBeanName
-
要在消費者端使用的特定時間戳提取器 Bean 名稱。應用程式可以將
TimestampExtractor
作為 Spring Bean 提供,並將此 Bean 的名稱提供給消費者使用,而不是使用預設提取器。預設值: 請參閱上文關於時間戳提取器的討論。
- eventTypes
-
此繫結支援的事件型別列表,以逗號分隔。
預設值:
none
- eventTypeHeaderKey
-
透過此繫結的每個傳入記錄上的事件型別頭鍵。
預設值:
event_type
- consumedAs
-
處理器消費來源的 Source 元件的自定義名稱。
預設值:
none
(由 Kafka Streams 生成)
關於併發性的特別說明
在 Kafka Streams 中,您可以使用 num.stream.threads
屬性控制處理器可以建立的執行緒數。您可以使用上述在 Binder、函式、生產者或消費者級別描述的各種 configuration
選項來實現這一點。您也可以使用核心 Spring Cloud Stream 為此目的提供的 concurrency
屬性。使用此屬性時,需要在消費者上進行設定。當您有多個輸入繫結時,將其設定在第一個輸入繫結上。例如,設定 spring.cloud.stream.bindings.process-in-0.consumer.concurrency
時,它將被 Binder 轉換為 num.stream.threads
。如果您有多個處理器,並且一個處理器定義了繫結級別的併發性,而其他處理器沒有,那麼那些沒有繫結級別併發性的處理器將回退到透過 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads
指定的 Binder 範圍屬性。如果此 Binder 配置不可用,則應用程式將使用 Kafka Streams 設定的預設值。