手動啟動 Kafka Streams 處理器

Spring Cloud Stream Kafka Streams Binder 在 Spring for Apache Kafka 的 StreamsBuilderFactoryBean 之上提供了一個名為 StreamsBuilderFactoryManager 的抽象。這個管理器 API 用於控制基於 Binder 的應用中每個處理器對應的多個 StreamsBuilderFactoryBean。因此,在使用 Binder 時,如果您想手動控制應用中各個 StreamsBuilderFactoryBean 物件的自動啟動,您需要使用 StreamsBuilderFactoryManager。您可以使用屬性 spring.kafka.streams.auto-startup 並將其設定為 false 來關閉處理器的自動啟動。然後,在應用中,您可以使用如下所示的方式,透過 StreamsBuilderFactoryManager 來啟動處理器。

@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
    return args -> {
        sbfm.start();
    };
}

當您希望應用在主執行緒中啟動,並讓 Kafka Streams 處理器獨立啟動時,此功能非常有用。例如,當您有一個大型狀態儲存需要恢復時,如果處理器像預設情況一樣正常啟動,這可能會阻塞您的應用啟動。如果您正在使用某種存活探針機制(例如在 Kubernetes 上),它可能會認為應用已宕機並嘗試重啟。為了糾正這種情況,您可以將 spring.kafka.streams.auto-startup 設定為 false 並遵循上述方法。

請記住,當使用 Spring Cloud Stream Binder 時,您不是直接處理來自 Spring for Apache Kafka 的 StreamsBuilderFactoryBean,而是 StreamsBuilderFactoryManager,因為 StreamsBuilderFactoryBean 物件是由 Binder 內部管理的。