多繫結器,結合基於 Kafka Streams 的繫結器和常規 Kafka 繫結器

您的應用中可以同時包含基於常規 Kafka 繫結器的函式/消費者/供應者,以及基於 Kafka Streams 的處理器。但是,您不能在單個函式或消費者中混合使用它們兩者。

以下是一個示例,展示瞭如何在同一個應用中同時使用這兩種基於繫結器的元件。

@Bean
public Function<String, String> process() {
    return s -> s;
}

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {

    return input -> input;
}

這是配置中的相關部分

spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar

如果您的應用與上述示例相同,但需要處理兩個不同的 Kafka 叢集,事情會變得稍微複雜一些。例如,常規的 process 處理邏輯作用於 Kafka 叢集 1 和叢集 2(從叢集 1 接收資料併發送到叢集 2),而 Kafka Streams 處理器作用於 Kafka 叢集 2。在這種情況下,您必須使用 Spring Cloud Stream 提供的多繫結器功能。

在這種場景下,您的配置可能會如下變化。

# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster

spring.cloud.function.definition=process;kstreamProcess

# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2

# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3

請注意上述配置。我們有兩種繫結器型別,但總共有 3 個繫結器:第一個是基於叢集 1 的常規 Kafka 繫結器(kafka1),然後是基於叢集 2 的另一個 Kafka 繫結器(kafka2),最後是 kstream 繫結器(kafka3)。應用中的第一個處理器從 kafka1 接收資料併發布到 kafka2,儘管這兩個繫結器都基於常規 Kafka 繫結器,但它們連線的是不同的叢集。第二個處理器是 Kafka Streams 處理器,它從 kafka3 消費資料,kafka3kafka2 是同一個叢集,但繫結器型別不同。

由於 Kafka Streams 繫結器家族中有三種不同的繫結器型別 - kstreamktableglobalktable - 如果您的應用有多個基於這些繫結器的繫結,則需要顯式地將其提供為繫結器型別。

例如,如果您有一個如下所示的處理器:

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    ...
}

那麼,在多繫結器場景下,必須將其配置如下。請注意,這僅在您擁有真正的多繫結器場景時才需要,即單個應用中存在多個處理器處理多個叢集的情況。在這種情況下,需要為繫結明確指定繫結器,以便與其他處理器的繫結器型別和叢集區分開來。

spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}

spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1  #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2  #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3  #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream

# rest of the configuration is omitted.