配置選項

本節包含 Kafka Streams 繫結器使用的配置選項。

有關繫結器的通用配置選項和屬性,請參閱 核心文件

Kafka Streams 繫結器屬性

以下屬性在繫結器級別可用,並且必須以 spring.cloud.stream.kafka.streams.binder. 為字首。Kafka Streams 繫結器中重複使用的任何 Kafka 繫結器提供的屬性必須以 spring.cloud.stream.kafka.streams.binder 為字首,而不是 spring.cloud.stream.kafka.binder。此規則的唯一例外是當定義 Kafka 引導伺服器屬性時,此時任一字首都有效。

configuration

包含與 Apache Kafka Streams API 相關的鍵/值對的對映。此屬性必須以 spring.cloud.stream.kafka.streams.binder. 為字首。以下是使用此屬性的一些示例。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有關可用於流配置的所有屬性的更多資訊,請參閱 Apache Kafka Streams 文件中的 StreamsConfig JavaDocs。您可以透過此屬性設定 StreamsConfig 中的所有配置。當使用此屬性時,它適用於整個應用程式,因為這是一個繫結器級別的屬性。如果應用程式中有多個處理器,所有處理器都將獲取這些屬性。在像 application.id 這樣的屬性情況下,這會變得有問題,因此您必須仔細檢查如何使用此繫結器級別的 configuration 屬性對映 StreamsConfig 中的屬性。

functions.<function-bean-name>.applicationId

僅適用於函式式處理器。這可以用於在應用程式中為每個函式設定應用程式 ID。在有多個函式的情況下,這是一種方便的設定應用程式 ID 的方法。

functions.<function-bean-name>.configuration

僅適用於函式式處理器。包含與 Apache Kafka Streams API 相關的鍵/值對的對映。這類似於上面描述的繫結器級別的 configuration 屬性,但此級別的 configuration 屬性僅限於命名函式。當您有多個處理器並且希望根據特定函式限制對配置的訪問時,您可能需要使用此屬性。此處可以使用所有 StreamsConfig 屬性。

brokers

Broker URL

預設值:localhost

zkNodes

Zookeeper URL

預設值:localhost

deserializationExceptionHandler

反序列化錯誤處理程式型別。此處理程式在繫結器級別應用,因此適用於應用程式中的所有輸入繫結。有一種方法可以在消費者繫結級別以更精細的方式控制它。可能的值為 - logAndContinuelogAndFailskipAndContinuesendToDlq

預設值:logAndFail

applicationId

方便地在繫結器級別為 Kafka Streams 應用程式全域性設定 application.id。如果應用程式包含多個函式,則應不同地設定應用程式 ID。有關設定應用程式 ID 的詳細討論,請參閱上面。

預設值:應用程式將生成一個靜態應用程式 ID。有關更多詳細資訊,請參閱應用程式 ID 部分。

stateStoreRetry.maxAttempts

嘗試連線狀態儲存的最大次數。

預設值:1

stateStoreRetry.backoffPeriod

重試連線狀態儲存時的退避週期。

預設值:1000 毫秒

consumerProperties

繫結器級別的任意消費者屬性。

producerProperties

繫結器級別的任意生產者屬性。

includeStoppedProcessorsForHealthCheck

當處理器繫結透過執行器停止時,此處理器預設不參與健康檢查。將此屬性設定為 true 以啟用所有處理器的健康檢查,包括那些當前透過繫結執行器端點停止的處理器。

預設值:false

Kafka Streams 生產者屬性

以下屬性適用於 Kafka Streams 生產者,並且必須以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer. 為字首。為方便起見,如果存在多個輸出繫結且它們都需要一個公共值,則可以使用 spring.cloud.stream.kafka.streams.default.producer. 字首進行配置。

keySerde

要使用的鍵序列化/反序列化器

預設值:請參閱上面關於訊息反序列化/序列化的討論

valueSerde

要使用的值序列化/反序列化器

預設值:請參閱上面關於訊息反序列化/序列化的討論

useNativeEncoding

啟用/停用原生編碼的標誌

預設值:true

streamPartitionerBeanName

要在消費者端使用的自定義出站分割槽器 bean 名稱。應用程式可以提供自定義的 StreamPartitioner 作為 Spring bean,並且此 bean 的名稱可以提供給生產者以替代預設分割槽器。

預設值:請參閱上面關於出站分割槽支援的討論。

producedAs

處理器生產到的 Sink 元件的自定義名稱。

預設值:none (由 Kafka Streams 生成)

Kafka Streams 消費者屬性

以下屬性適用於 Kafka Streams 消費者,並且必須以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer. 為字首。為方便起見,如果存在多個輸入繫結且它們都需要一個公共值,則可以使用 spring.cloud.stream.kafka.streams.default.consumer. 字首進行配置。

applicationId

為每個輸入繫結設定 application.id。

預設值:如上所述。

keySerde

要使用的鍵序列化/反序列化器

預設值:請參閱上面關於訊息反序列化/序列化的討論

valueSerde

要使用的值序列化/反序列化器

預設值:請參閱上面關於訊息反序列化/序列化的討論

materializedAs

當使用傳入 KTable 型別時要具體化的狀態儲存

預設值:none

cachingDisabled

停用具體化 KTable 的快取。設定為 true 時,呼叫 Materialized 物件的 withCachingDisabled()。設定為 false 時,呼叫 Materialized 物件的 withCachingEnabled()

預設值:false

loggingDisabled

停用具體化 KTable 的日誌記錄。設定為 true 時,呼叫 Materialized 物件的 withLoggingDisabled()

預設值:false

useNativeDecoding

啟用/停用原生解碼的標誌

預設值:true

dlqName

DLQ 主題名稱。

預設值:請參閱上面關於錯誤處理和 DLQ 的討論。

startOffset

如果沒有已提交的偏移量可供消費,則從該偏移量開始消費。這主要用於消費者首次從主題消費時。Kafka Streams 使用 earliest 作為預設策略,繫結器也使用相同的預設策略。可以使用此屬性將其覆蓋為 latest

預設值:earliest

注意:在消費者上使用 resetOffsets 對 Kafka Streams 繫結器沒有任何影響。與基於訊息通道的繫結器不同,Kafka Streams 繫結器不會按需跳轉到開頭或結尾。

deserializationExceptionHandler

反序列化錯誤處理程式型別。此處理程式按每個消費者繫結應用,而不是之前描述的繫結器級別屬性。可能的值為 - logAndContinuelogAndFailskipAndContinuesendToDlq

預設值:logAndFail

timestampExtractorBeanName

要在消費者端使用的特定時間戳提取器 bean 名稱。應用程式可以提供 TimestampExtractor 作為 Spring bean,並且此 bean 的名稱可以提供給消費者以替代預設時間戳提取器。

預設值:請參閱上面關於時間戳提取器的討論。

eventTypes

此繫結支援的事件型別,以逗號分隔。

預設值:none

eventTypeHeaderKey

透過此繫結傳入的每條記錄上的事件型別頭鍵。

預設值:event_type

consumedAs

處理器消費的源元件的自定義名稱。

預設值:none (由 Kafka Streams 生成)

關於併發的特別說明

在 Kafka Streams 中,您可以使用 num.stream.threads 屬性控制處理器可以建立的執行緒數。這可以透過上面描述的繫結器、函式、生產者或消費者級別的各種 configuration 選項來完成。您還可以為此目的使用核心 Spring Cloud Stream 提供的 concurrency 屬性。使用此屬性時,您需要在消費者上使用它。當您有多個輸入繫結時,請將其設定在第一個輸入繫結上。例如,當設定 spring.cloud.stream.bindings.process-in-0.consumer.concurrency 時,它將被繫結器轉換為 num.stream.threads。如果您有多個處理器,並且一個處理器定義了繫結級別併發,而其他處理器沒有,則沒有繫結級別併發的處理器將回退到透過 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads 指定的繫結器範圍屬性。如果此繫結器配置不可用,則應用程式將使用 Kafka Streams 設定的預設值。

© . This site is unofficial and not affiliated with VMware.