程式設計模型的輔助功能

單個應用中的多個 Kafka Streams 處理器

繫結器允許在單個 Spring Cloud Stream 應用中包含多個 Kafka Streams 處理器。你可以擁有一個如下所示的應用。

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
   ...
}

在這種情況下,繫結器將建立 3 個具有不同應用 ID 的獨立 Kafka Streams 物件(稍後詳細介紹)。但是,如果你的應用中有多個處理器,則必須告知 Spring Cloud Stream 需要啟用哪些函式。以下是如何啟用這些函式的方法。

spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess

如果你希望某些函式不會立即啟用,可以將其從此列表中移除。

當你在同一個應用中擁有一個 Kafka Streams 處理器以及透過不同繫結器處理的其他型別的 Function bean 時(例如,基於常規 Kafka Message Channel 繫結器的函式 bean),情況也是如此。

Kafka Streams 應用 ID

應用 ID 是 Kafka Streams 應用必須提供的屬性。Spring Cloud Stream Kafka Streams 繫結器允許你透過多種方式配置此應用 ID。

如果應用中只有一個處理器,則可以使用以下屬性在繫結器級別進行設定。

spring.cloud.stream.kafka.streams.binder.applicationId.

為方便起見,如果只有一個處理器,你也可以使用 spring.application.name 作為屬性來委派應用 ID。

如果應用中有多個 Kafka Streams 處理器,則需要為每個處理器設定應用 ID。在使用函式式模型的情況下,可以將其作為屬性附加到每個函式上。

例如,假設你有以下函式。

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

然後,可以使用以下繫結器級別屬性為每個函式設定應用 ID。

spring.cloud.stream.kafka.streams.binder.functions.process.applicationId

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId

對於基於函式的模型,在繫結級別設定應用 ID 的方法也有效。但是,如果你使用函式式模型,如上所示在繫結器級別為每個函式進行設定會容易得多。

對於生產環境部署,強烈建議透過配置顯式指定應用 ID。如果你正在對應用進行自動擴縮容,這一點尤其重要,在這種情況下,你需要確保部署的每個例項都具有相同的應用 ID。

如果應用未提供應用 ID,則繫結器將為你自動生成一個靜態應用 ID。這在開發場景中非常方便,因為它避免了顯式提供應用 ID 的需要。以這種方式生成的應用 ID 在應用重啟後將保持靜態。在使用函式式模型的情況下,生成的應用 ID 將是函式 bean 名稱後跟字面值 applicationID,例如,如果函式 bean 名稱是 process,則生成的應用 ID 將是 process-applicationID

設定應用 ID 摘要

  • 預設情況下,繫結器將為每個函式方法自動生成應用 ID。

  • 如果只有一個處理器,則可以使用 spring.kafka.streams.applicationIdspring.application.namespring.cloud.stream.kafka.streams.binder.applicationId

  • 如果擁有多個處理器,則可以使用屬性 spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId 為每個函式設定應用 ID。

使用函式式風格覆蓋繫結器生成的預設繫結名稱

預設情況下,在使用函式式風格時,繫結器使用上述策略生成繫結名稱,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0, process-out-0 等。如果你想覆蓋這些繫結名稱,可以透過指定以下屬性來實現。

spring.cloud.stream.function.bindings.<default binding name>。預設繫結名稱是繫結器生成的原始繫結名稱。

例如,假設你有這個函式。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

繫結器將生成名稱為 process-in-0, process-in-1process-out-0 的繫結。現在,如果你想將它們完全更改為其他名稱,例如更具領域特定性的繫結名稱,則可以如下進行。

spring.cloud.stream.function.bindings.process-in-0=users

spring.cloud.stream.function.bindings.process-in-0=regions

spring.cloud.stream.function.bindings.process-out-0=clicks

之後,你必須在這些新的繫結名稱上設定所有繫結級別的屬性。

請記住,使用上述函數語言程式設計模型時,在大多數情況下遵循預設繫結名稱是合理的。你可能仍然希望進行此覆蓋的唯一原因是,當你有大量配置屬性並且希望將繫結對映到更具領域友好性的名稱時。

設定 bootstrap server 配置

執行 Kafka Streams 應用時,必須提供 Kafka broker 伺服器資訊。如果你不提供此資訊,繫結器會假定你在預設地址 localhost:9092 上執行 broker。如果情況不是這樣,則需要覆蓋它。有幾種方法可以實現。

  • 使用 boot 屬性 - spring.kafka.bootstrapServers

  • 繫結器級別屬性 - spring.cloud.stream.kafka.streams.binder.brokers

關於繫結器級別屬性,無論你是否使用透過常規 Kafka 繫結器提供的 broker 屬性 - spring.cloud.stream.kafka.binder.brokers,都沒有關係。Kafka Streams 繫結器會首先檢查是否設定了 Kafka Streams 繫結器特定的 broker 屬性 (spring.cloud.stream.kafka.streams.binder.brokers),如果未找到,則會查詢 spring.cloud.stream.kafka.binder.brokers