出站分割槽支援
一個 Kafka Streams 處理器通常將處理後的輸出傳送到出站 Kafka 主題。如果出站主題已分割槽,並且處理器需要將出站資料傳送到特定分割槽,則應用程式需要提供一個型別為 StreamPartitioner 的 bean。有關更多詳細資訊,請參閱 StreamPartitioner。讓我們看一些示例。
這是我們已經多次看到的同一個處理器,
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
這是輸出繫結目的地
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
如果主題 outputTopic 有 4 個分割槽,如果您不提供分割槽策略,Kafka Streams 將使用預設分割槽策略,這可能不是您希望的結果,具體取決於特定用例。假設您想將所有與 spring 匹配的鍵傳送到分割槽 0,將 cloud 傳送到分割槽 1,將 stream 傳送到分割槽 2,將所有其他鍵傳送到分割槽 3。您需要在應用程式中執行以下操作。
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
這是一個基本實現,但是,您可以訪問記錄的鍵/值、主題名稱和總分割槽數。因此,如果需要,您可以實現複雜的分割槽策略。
您還需要將此 bean 名稱與應用程式配置一起提供。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
應用程式中的每個輸出主題都需要像這樣單獨配置。