帶有基於 Kafka Streams 繫結器和常規 Kafka 繫結器的多繫結器

你可以有一個應用程式,其中包含基於常規 Kafka 繫結器的函式/消費者/生產者,以及一個基於 Kafka Streams 的處理器。但是,你不能在單個函式或消費者中混合使用它們。

以下是一個示例,其中你在同一個應用程式中擁有兩個基於繫結器的元件。

@Bean
public Function<String, String> process() {
    return s -> s;
}

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {

    return input -> input;
}

這是配置中的相關部分

spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar

如果你有上述相同的應用程式,但它處理兩個不同的 Kafka 叢集,例如,常規的 `process` 在 Kafka 叢集 1 和叢集 2 上都進行操作(從叢集 1 接收資料併發送到叢集 2),而 Kafka Streams 處理器在 Kafka 叢集 2 上進行操作。那麼你必須使用 Spring Cloud Stream 提供的多繫結器功能。

在這種情況下,你的配置可能會如下改變。

# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster

spring.cloud.function.definition=process;kstreamProcess

# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2

# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3

請注意以上配置。我們有兩種繫結器,但總共有 3 個繫結器,第一個是基於叢集 1 的常規 Kafka 繫結器 (`kafka1`),然後是基於叢集 2 的另一個 Kafka 繫結器 (`kafka2`),最後是 `kstream` 繫結器 (`kafka3`)。應用程式中的第一個處理器從 `kafka1` 接收資料併發布到 `kafka2`,其中兩個繫結器都基於常規 Kafka 繫結器但屬於不同叢集。第二個處理器是 Kafka Streams 處理器,它從 `kafka3` 消費資料,`kafka3` 與 `kafka2` 是同一個叢集,但繫結器型別不同。

由於 Kafka Streams 繫結器系列中有三種不同的繫結器型別——`kstream`、`ktable` 和 `globalktable`——如果你的應用程式有多個基於這些繫結器的繫結,則需要明確將其作為繫結器型別提供。

例如,如果你的處理器如下所示,

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    ...
}

那麼,這必須在多繫結器場景中配置如下。請注意,這僅在你有真正的多繫結器場景時才需要,即單個應用程式中有多個處理器處理多個叢集。在這種情況下,繫結器需要明確提供繫結,以區分其他處理器的繫結器型別和叢集。

spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}

spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1  #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2  #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3  #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream

# rest of the configuration is omitted.
© . This site is unofficial and not affiliated with VMware.