出站分割槽支援

Kafka Streams 處理器通常將處理後的輸出傳送到出站 Kafka 主題。如果出站主題已分割槽,並且處理器需要將出站資料傳送到特定分割槽,則應用需要提供一個型別為 StreamPartitioner 的 bean。有關更多詳細資訊,請參閱 StreamPartitioner。讓我們看一些示例。

這是我們已經多次看到的同一個處理器,

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    ...
}

這是輸出繫結目標

spring.cloud.stream.bindings.process-out-0.destination: outputTopic

如果主題 outputTopic 有 4 個分割槽,如果您不提供分割槽策略,Kafka Streams 將使用預設分割槽策略,這可能不是您根據特定用例想要的結果。假設您希望將任何與 spring 匹配的鍵傳送到分割槽 0,將 cloud 傳送到分割槽 1,將 stream 傳送到分割槽 2,以及將其他所有內容傳送到分割槽 3。您需要在應用中執行以下操作。

@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
    return (t, k, v, n) -> {
        if (k.equals("spring")) {
            return 0;
        }
        else if (k.equals("cloud")) {
            return 1;
        }
        else if (k.equals("stream")) {
            return 2;
        }
        else {
            return 3;
        }
    };
}

這是一個基礎實現,但是,您可以訪問記錄的鍵/值、主題名稱和總分割槽數。因此,如有需要,您可以實現複雜的分割槽策略。

您還需要在應用配置中提供此 bean 名稱。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner

應用中的每個輸出主題都需要像這樣單獨配置。