配置選項

本節包含 Kafka Streams binder 使用的配置選項。

有關 Binder 的通用配置選項和屬性,請參閱核心文件

Kafka Streams Binder 屬性

以下屬性可在 Binder 級別使用,並且必須以 spring.cloud.stream.kafka.streams.binder. 為字首。在 Kafka Streams binder 中重複使用的任何 Kafka binder 提供的屬性必須以 spring.cloud.stream.kafka.streams.binder 為字首,而不是 spring.cloud.stream.kafka.binder。此規則的唯一例外是定義 Kafka 引導伺服器屬性時,此時任何一個字首都可以使用。

configuration

包含與 Apache Kafka Streams API 相關屬性的鍵/值對 Map。此屬性必須以 spring.cloud.stream.kafka.streams.binder. 為字首。以下是一些使用此屬性的示例。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有關可能包含在 Streams 配置中的所有屬性的更多資訊,請參閱 Apache Kafka Streams 文件中的 StreamsConfig JavaDocs。您可以從 StreamsConfig 設定的所有配置都可以透過此屬性進行設定。使用此屬性時,它適用於整個應用程式,因為它是一個 Binder 級別的屬性。如果應用程式中有多個處理器,所有這些處理器都將獲取這些屬性。對於像 application.id 這樣的屬性,這將成為問題,因此您必須仔細檢查如何使用此 Binder 級別的 configuration 屬性來對映來自 StreamsConfig 的屬性。

functions.<function-bean-name>.applicationId

僅適用於函式式處理器。此屬性可用於在應用程式中為每個函式設定 application ID。在多個函式的情況下,這是設定 application ID 的便捷方式。

functions.<function-bean-name>.configuration

僅適用於函式式處理器。包含與 Apache Kafka Streams API 相關屬性的鍵/值對 Map。這類似於上面描述的 Binder 級別的 configuration 屬性,但此級別的 configuration 屬性僅針對指定函式進行限制。當您有多個處理器並且希望根據特定函式限制對配置的訪問時,您可能希望使用此屬性。所有 StreamsConfig 屬性都可以在這裡使用。

brokers

Broker URL

預設值: localhost

zkNodes

Zookeeper URL

預設值: localhost

deserializationExceptionHandler

反序列化錯誤處理器型別。此處理器應用於 Binder 級別,因此應用於應用程式中的所有輸入繫結。可以在消費者繫結級別以更精細的方式對其進行控制。可能的值包括 - logAndContinue, logAndFail, skipAndContinuesendToDlq

預設值: logAndFail

applicationId

一種方便的方式,可在 Binder 級別全域性設定 Kafka Streams 應用的 application.id。如果應用程式包含多個函式,則 application id 應設定不同。詳細資訊請參見上文關於設定 application id 的討論。

預設值: 應用程式將生成一個靜態 application ID。有關更多詳細資訊,請參閱 application ID 部分。

stateStoreRetry.maxAttempts

嘗試連線狀態儲存的最大次數。

預設值: 1

stateStoreRetry.backoffPeriod

重試時嘗試連線狀態儲存的回退週期。

預設值: 1000 ms

consumerProperties

Binder 級別的任意消費者屬性。

producerProperties

Binder 級別的任意生產者屬性。

includeStoppedProcessorsForHealthCheck

當透過 actuator 停止處理器繫結時,預設情況下該處理器將不參與健康檢查。將此屬性設定為 true 以啟用所有處理器的健康檢查,包括當前透過 bindings actuator endpoint 停止的處理器。

預設值: false

Kafka Streams 生產者屬性

以下屬性*僅*適用於 Kafka Streams 生產者,且必須以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer. 為字首。為方便起見,如果存在多個輸出繫結且它們都需要一個公共值,則可以使用字首 spring.cloud.stream.kafka.streams.default.producer. 進行配置。

keySerde

要使用的 key serde

預設值: 請參閱上面關於訊息序列化/反序列化的討論

valueSerde

要使用的 value serde

預設值: 請參閱上面關於訊息序列化/反序列化的討論

useNativeEncoding

啟用/停用原生編碼的標誌

預設值: true

streamPartitionerBeanName

要在消費者端使用的自定義出站分割槽器 Bean 名稱。應用程式可以將自定義 StreamPartitioner 作為 Spring Bean 提供,並將此 Bean 的名稱提供給生產者使用,而不是使用預設分割槽器。

預設值: 請參閱上面關於出站分割槽支援的討論。

producedAs

處理器生成到的 Sink 元件的自定義名稱。

預設值: none (由 Kafka Streams 生成)

Kafka Streams 消費者屬性

以下屬性適用於 Kafka Streams 消費者,且必須以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer. 為字首。為方便起見,如果存在多個輸入繫結且它們都需要一個公共值,則可以使用字首 spring.cloud.stream.kafka.streams.default.consumer. 進行配置。

applicationId

為每個輸入繫結設定 application.id。

預設值: 請參閱上文。

keySerde

要使用的 key serde

預設值: 請參閱上面關於訊息序列化/反序列化的討論

valueSerde

要使用的 value serde

預設值: 請參閱上面關於訊息序列化/反序列化的討論

materializedAs

使用傳入 KTable 型別時要物化(materialize)的狀態儲存

預設值: none

useNativeDecoding

啟用/停用原生解碼的標誌

預設值: true

dlqName

DLQ 主題名稱。

預設值: 請參閱上文關於錯誤處理和 DLQ 的討論。

startOffset

如果沒有已提交的 offset 可供消費,則從此 offset 開始消費。這主要用於消費者首次消費主題時。Kafka Streams 使用 earliest 作為預設策略,Binder 也使用相同的預設值。可以使用此屬性將其覆蓋為 latest

預設值: earliest

注意: 在消費者上使用 resetOffsets 對 Kafka Streams binder 沒有影響。與基於訊息通道的 binder 不同,Kafka Streams binder 不會按需定址到開頭或結尾。

deserializationExceptionHandler

反序列化錯誤處理器型別。此處理器應用於每個消費者繫結,而不是應用於之前描述的 Binder 級別屬性。可能的值包括 - logAndContinue, logAndFail, skipAndContinuesendToDlq

預設值: logAndFail

timestampExtractorBeanName

要在消費者端使用的特定時間戳提取器 Bean 名稱。應用程式可以將 TimestampExtractor 作為 Spring Bean 提供,並將此 Bean 的名稱提供給消費者使用,而不是使用預設提取器。

預設值: 請參閱上文關於時間戳提取器的討論。

eventTypes

此繫結支援的事件型別列表,以逗號分隔。

預設值: none

eventTypeHeaderKey

透過此繫結的每個傳入記錄上的事件型別頭鍵。

預設值: event_type

consumedAs

處理器消費來源的 Source 元件的自定義名稱。

預設值: none (由 Kafka Streams 生成)

關於併發性的特別說明

在 Kafka Streams 中,您可以使用 num.stream.threads 屬性控制處理器可以建立的執行緒數。您可以使用上述在 Binder、函式、生產者或消費者級別描述的各種 configuration 選項來實現這一點。您也可以使用核心 Spring Cloud Stream 為此目的提供的 concurrency 屬性。使用此屬性時,需要在消費者上進行設定。當您有多個輸入繫結時,將其設定在第一個輸入繫結上。例如,設定 spring.cloud.stream.bindings.process-in-0.consumer.concurrency 時,它將被 Binder 轉換為 num.stream.threads。如果您有多個處理器,並且一個處理器定義了繫結級別的併發性,而其他處理器沒有,那麼那些沒有繫結級別併發性的處理器將回退到透過 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads 指定的 Binder 範圍屬性。如果此 Binder 配置不可用,則應用程式將使用 Kafka Streams 設定的預設值。