出站分割槽支援
Kafka Streams 處理器通常將處理後的輸出傳送到出站 Kafka 主題。如果出站主題已分割槽,並且處理器需要將出站資料傳送到特定分割槽,則應用需要提供一個型別為 StreamPartitioner
的 bean。有關更多詳細資訊,請參閱 StreamPartitioner。讓我們看一些示例。
這是我們已經多次看到的同一個處理器,
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
這是輸出繫結目標
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
如果主題 outputTopic
有 4 個分割槽,如果您不提供分割槽策略,Kafka Streams 將使用預設分割槽策略,這可能不是您根據特定用例想要的結果。假設您希望將任何與 spring
匹配的鍵傳送到分割槽 0,將 cloud
傳送到分割槽 1,將 stream
傳送到分割槽 2,以及將其他所有內容傳送到分割槽 3。您需要在應用中執行以下操作。
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
這是一個基礎實現,但是,您可以訪問記錄的鍵/值、主題名稱和總分割槽數。因此,如有需要,您可以實現複雜的分割槽策略。
您還需要在應用配置中提供此 bean 名稱。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
應用中的每個輸出主題都需要像這樣單獨配置。