程式設計模型輔助功能

單個應用程式中的多個 Kafka Streams 處理器

Binder 允許在單個 Spring Cloud Stream 應用程式中擁有多個 Kafka Streams 處理器。你可以擁有一個如下所示的應用程式。

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
   ...
}

在這種情況下,binder 將建立 3 個獨立的 Kafka Streams 物件,它們具有不同的應用程式 ID(下面將詳細介紹)。但是,如果應用程式中有多個處理器,則必須告訴 Spring Cloud Stream 哪些函式需要被啟用。以下是啟用函式的方法。

spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess

如果你希望某些函式不要立即啟用,可以將其從列表中移除。

當你在同一個應用程式中擁有一個 Kafka Streams 處理器和透過不同 binder 處理的其他型別的 Function bean(例如,基於常規 Kafka 訊息通道 binder 的函式 bean)時,這也是成立的。

Kafka Streams 應用程式 ID

應用程式 ID 是 Kafka Streams 應用程式必須提供的屬性。Spring Cloud Stream Kafka Streams binder 允許你透過多種方式配置此應用程式 ID。

如果應用程式中只有一個處理器,則可以使用以下屬性在 binder 級別設定它

spring.cloud.stream.kafka.streams.binder.applicationId.

為了方便,如果只有一個處理器,你還可以使用 spring.application.name 作為屬性來委託應用程式 ID。

如果應用程式中有多個 Kafka Streams 處理器,則需要為每個處理器設定應用程式 ID。在函式式模型中,你可以將其作為屬性附加到每個函式。

例如,假設你有以下函式。

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

然後你可以使用以下 binder 級別屬性為每個函式設定應用程式 ID。

spring.cloud.stream.kafka.streams.binder.functions.process.applicationId

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId

對於基於函式的模型,在繫結級別設定應用程式 ID 的方法也適用。但是,如果你使用函式式模型,如我們上面所見,在 binder 級別為每個函式進行設定要容易得多。

對於生產部署,強烈建議透過配置明確指定應用程式 ID。如果你正在自動擴充套件應用程式,這一點尤其關鍵,在這種情況下,你需要確保每個例項都以相同的應用程式 ID 進行部署。

如果應用程式未提供應用程式 ID,則 binder 將為你自動生成一個靜態應用程式 ID。這在開發場景中很方便,因為它避免了顯式提供應用程式 ID 的需要。以這種方式生成的應用程式 ID 在應用程式重啟時將是靜態的。在函式式模型中,生成的應用程式 ID 將是函式 bean 名稱後跟字面量 applicationID,例如,如果 process 是函式 bean 名稱,則為 process-applicationID

設定應用程式 ID 摘要

  • 預設情況下,binder 將為每個函式方法自動生成應用程式 ID。

  • 如果只有一個處理器,則可以使用 spring.kafka.streams.applicationIdspring.application.namespring.cloud.stream.kafka.streams.binder.applicationId

  • 如果具有多個處理器,則可以透過屬性 spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId 為每個函式設定應用程式 ID。

使用函式式風格覆蓋 binder 生成的預設繫結名稱

預設情況下,當使用函式式風格時,binder 使用上述策略生成繫結名稱,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。如果你想覆蓋這些繫結名稱,可以透過指定以下屬性來實現。

spring.cloud.stream.function.bindings.<default binding name>。預設繫結名稱是 binder 生成的原始繫結名稱。

例如,假設你有以下函式。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

Binder 將生成名稱為 process-in-0process-in-1process-out-0 的繫結。現在,如果你想將它們完全更改為其他名稱,也許是更具領域特定性的繫結名稱,那麼可以按如下方式進行。

spring.cloud.stream.function.bindings.process-in-0=users

spring.cloud.stream.function.bindings.process-in-0=regions

spring.cloud.stream.function.bindings.process-out-0=clicks

之後,你必須在新繫結名稱上設定所有繫結級別的屬性。

請記住,對於上面描述的函數語言程式設計模型,遵守預設繫結名稱在大多數情況下都是有意義的。你可能仍然想要進行這種覆蓋的唯一原因是當你擁有大量配置屬性並且希望將繫結對映到更友好的領域名稱時。

設定引導伺服器配置

執行 Kafka Streams 應用程式時,必須提供 Kafka 代理伺服器資訊。如果你不提供此資訊,則 binder 期望你正在預設的 localhost:9092 上執行代理。如果不是這種情況,則需要覆蓋它。有幾種方法可以做到這一點。

  • 使用啟動屬性 - spring.kafka.bootstrapServers

  • Binder 級別屬性 - spring.cloud.stream.kafka.streams.binder.brokers

對於 binder 級別屬性,無論你是否使用透過常規 Kafka binder 提供的代理屬性 - spring.cloud.stream.kafka.binder.brokers,都沒有關係。Kafka Streams binder 將首先檢查是否設定了 Kafka Streams binder 特定的代理屬性 (spring.cloud.stream.kafka.streams.binder.brokers),如果未找到,則會查詢 spring.cloud.stream.kafka.binder.brokers

© . This site is unofficial and not affiliated with VMware.