多繫結器,結合基於 Kafka Streams 的繫結器和常規 Kafka 繫結器
您的應用中可以同時包含基於常規 Kafka 繫結器的函式/消費者/供應者,以及基於 Kafka Streams 的處理器。但是,您不能在單個函式或消費者中混合使用它們兩者。
以下是一個示例,展示瞭如何在同一個應用中同時使用這兩種基於繫結器的元件。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
這是配置中的相關部分
spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果您的應用與上述示例相同,但需要處理兩個不同的 Kafka 叢集,事情會變得稍微複雜一些。例如,常規的 process
處理邏輯作用於 Kafka 叢集 1 和叢集 2(從叢集 1 接收資料併發送到叢集 2),而 Kafka Streams 處理器作用於 Kafka 叢集 2。在這種情況下,您必須使用 Spring Cloud Stream 提供的多繫結器功能。
在這種場景下,您的配置可能會如下變化。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
請注意上述配置。我們有兩種繫結器型別,但總共有 3 個繫結器:第一個是基於叢集 1 的常規 Kafka 繫結器(kafka1
),然後是基於叢集 2 的另一個 Kafka 繫結器(kafka2
),最後是 kstream
繫結器(kafka3
)。應用中的第一個處理器從 kafka1
接收資料併發布到 kafka2
,儘管這兩個繫結器都基於常規 Kafka 繫結器,但它們連線的是不同的叢集。第二個處理器是 Kafka Streams 處理器,它從 kafka3
消費資料,kafka3
與 kafka2
是同一個叢集,但繫結器型別不同。
由於 Kafka Streams 繫結器家族中有三種不同的繫結器型別 - kstream
、ktable
和 globalktable
- 如果您的應用有多個基於這些繫結器的繫結,則需要顯式地將其提供為繫結器型別。
例如,如果您有一個如下所示的處理器:
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
那麼,在多繫結器場景下,必須將其配置如下。請注意,這僅在您擁有真正的多繫結器場景時才需要,即單個應用中存在多個處理器處理多個叢集的情況。在這種情況下,需要為繫結明確指定繫結器,以便與其他處理器的繫結器型別和叢集區分開來。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.