配置選項
本節包含 Kafka Streams 繫結器使用的配置選項。
有關繫結器的通用配置選項和屬性,請參閱 核心文件。
Kafka Streams 繫結器屬性
以下屬性在繫結器級別可用,並且必須以 spring.cloud.stream.kafka.streams.binder. 為字首。Kafka Streams 繫結器中重複使用的任何 Kafka 繫結器提供的屬性必須以 spring.cloud.stream.kafka.streams.binder 為字首,而不是 spring.cloud.stream.kafka.binder。此規則的唯一例外是當定義 Kafka 引導伺服器屬性時,此時任一字首都有效。
- configuration
-
包含與 Apache Kafka Streams API 相關的鍵/值對的對映。此屬性必須以
spring.cloud.stream.kafka.streams.binder.為字首。以下是使用此屬性的一些示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有關可用於流配置的所有屬性的更多資訊,請參閱 Apache Kafka Streams 文件中的 StreamsConfig JavaDocs。您可以透過此屬性設定 StreamsConfig 中的所有配置。當使用此屬性時,它適用於整個應用程式,因為這是一個繫結器級別的屬性。如果應用程式中有多個處理器,所有處理器都將獲取這些屬性。在像 application.id 這樣的屬性情況下,這會變得有問題,因此您必須仔細檢查如何使用此繫結器級別的 configuration 屬性對映 StreamsConfig 中的屬性。
- functions.<function-bean-name>.applicationId
-
僅適用於函式式處理器。這可以用於在應用程式中為每個函式設定應用程式 ID。在有多個函式的情況下,這是一種方便的設定應用程式 ID 的方法。
- functions.<function-bean-name>.configuration
-
僅適用於函式式處理器。包含與 Apache Kafka Streams API 相關的鍵/值對的對映。這類似於上面描述的繫結器級別的
configuration屬性,但此級別的configuration屬性僅限於命名函式。當您有多個處理器並且希望根據特定函式限制對配置的訪問時,您可能需要使用此屬性。此處可以使用所有StreamsConfig屬性。 - brokers
-
Broker URL
預設值:
localhost - zkNodes
-
Zookeeper URL
預設值:
localhost - deserializationExceptionHandler
-
反序列化錯誤處理程式型別。此處理程式在繫結器級別應用,因此適用於應用程式中的所有輸入繫結。有一種方法可以在消費者繫結級別以更精細的方式控制它。可能的值為 -
logAndContinue、logAndFail、skipAndContinue或sendToDlq預設值:
logAndFail - applicationId
-
方便地在繫結器級別為 Kafka Streams 應用程式全域性設定 application.id。如果應用程式包含多個函式,則應不同地設定應用程式 ID。有關設定應用程式 ID 的詳細討論,請參閱上面。
預設值:應用程式將生成一個靜態應用程式 ID。有關更多詳細資訊,請參閱應用程式 ID 部分。
- stateStoreRetry.maxAttempts
-
嘗試連線狀態儲存的最大次數。
預設值:1
- stateStoreRetry.backoffPeriod
-
重試連線狀態儲存時的退避週期。
預設值:1000 毫秒
- consumerProperties
-
繫結器級別的任意消費者屬性。
- producerProperties
-
繫結器級別的任意生產者屬性。
- includeStoppedProcessorsForHealthCheck
-
當處理器繫結透過執行器停止時,此處理器預設不參與健康檢查。將此屬性設定為
true以啟用所有處理器的健康檢查,包括那些當前透過繫結執行器端點停止的處理器。預設值:false
Kafka Streams 生產者屬性
以下屬性僅適用於 Kafka Streams 生產者,並且必須以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer. 為字首。為方便起見,如果存在多個輸出繫結且它們都需要一個公共值,則可以使用 spring.cloud.stream.kafka.streams.default.producer. 字首進行配置。
- keySerde
-
要使用的鍵序列化/反序列化器
預設值:請參閱上面關於訊息反序列化/序列化的討論
- valueSerde
-
要使用的值序列化/反序列化器
預設值:請參閱上面關於訊息反序列化/序列化的討論
- useNativeEncoding
-
啟用/停用原生編碼的標誌
預設值:
true。 - streamPartitionerBeanName
-
要在消費者端使用的自定義出站分割槽器 bean 名稱。應用程式可以提供自定義的
StreamPartitioner作為 Spring bean,並且此 bean 的名稱可以提供給生產者以替代預設分割槽器。預設值:請參閱上面關於出站分割槽支援的討論。
- producedAs
-
處理器生產到的 Sink 元件的自定義名稱。
預設值:
none(由 Kafka Streams 生成)
Kafka Streams 消費者屬性
以下屬性適用於 Kafka Streams 消費者,並且必須以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer. 為字首。為方便起見,如果存在多個輸入繫結且它們都需要一個公共值,則可以使用 spring.cloud.stream.kafka.streams.default.consumer. 字首進行配置。
- applicationId
-
為每個輸入繫結設定 application.id。
預設值:如上所述。
- keySerde
-
要使用的鍵序列化/反序列化器
預設值:請參閱上面關於訊息反序列化/序列化的討論
- valueSerde
-
要使用的值序列化/反序列化器
預設值:請參閱上面關於訊息反序列化/序列化的討論
- materializedAs
-
當使用傳入 KTable 型別時要具體化的狀態儲存
預設值:
none。 - cachingDisabled
-
停用具體化 KTable 的快取。設定為
true時,呼叫Materialized物件的withCachingDisabled()。設定為false時,呼叫Materialized物件的withCachingEnabled()。預設值:
false。 - loggingDisabled
-
停用具體化 KTable 的日誌記錄。設定為
true時,呼叫Materialized物件的withLoggingDisabled()。預設值:
false。 - useNativeDecoding
-
啟用/停用原生解碼的標誌
預設值:
true。 - dlqName
-
DLQ 主題名稱。
預設值:請參閱上面關於錯誤處理和 DLQ 的討論。
- startOffset
-
如果沒有已提交的偏移量可供消費,則從該偏移量開始消費。這主要用於消費者首次從主題消費時。Kafka Streams 使用
earliest作為預設策略,繫結器也使用相同的預設策略。可以使用此屬性將其覆蓋為latest。預設值:
earliest。
注意:在消費者上使用 resetOffsets 對 Kafka Streams 繫結器沒有任何影響。與基於訊息通道的繫結器不同,Kafka Streams 繫結器不會按需跳轉到開頭或結尾。
- deserializationExceptionHandler
-
反序列化錯誤處理程式型別。此處理程式按每個消費者繫結應用,而不是之前描述的繫結器級別屬性。可能的值為 -
logAndContinue、logAndFail、skipAndContinue或sendToDlq預設值:
logAndFail - timestampExtractorBeanName
-
要在消費者端使用的特定時間戳提取器 bean 名稱。應用程式可以提供
TimestampExtractor作為 Spring bean,並且此 bean 的名稱可以提供給消費者以替代預設時間戳提取器。預設值:請參閱上面關於時間戳提取器的討論。
- eventTypes
-
此繫結支援的事件型別,以逗號分隔。
預設值:
none - eventTypeHeaderKey
-
透過此繫結傳入的每條記錄上的事件型別頭鍵。
預設值:
event_type - consumedAs
-
處理器消費的源元件的自定義名稱。
預設值:
none(由 Kafka Streams 生成)
關於併發的特別說明
在 Kafka Streams 中,您可以使用 num.stream.threads 屬性控制處理器可以建立的執行緒數。這可以透過上面描述的繫結器、函式、生產者或消費者級別的各種 configuration 選項來完成。您還可以為此目的使用核心 Spring Cloud Stream 提供的 concurrency 屬性。使用此屬性時,您需要在消費者上使用它。當您有多個輸入繫結時,請將其設定在第一個輸入繫結上。例如,當設定 spring.cloud.stream.bindings.process-in-0.consumer.concurrency 時,它將被繫結器轉換為 num.stream.threads。如果您有多個處理器,並且一個處理器定義了繫結級別併發,而其他處理器沒有,則沒有繫結級別併發的處理器將回退到透過 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads 指定的繫結器範圍屬性。如果此繫結器配置不可用,則應用程式將使用 Kafka Streams 設定的預設值。