選擇性地手動啟動 Kafka Streams 處理器

雖然上面描述的方法將透過 StreamsBuilderFactoryManager 無條件地將 auto start false 應用於應用程式中的所有 Kafka Streams 處理器,但通常我們希望只有單獨選擇的 Kafka Streams 處理器不自動啟動。例如,假設您的應用程式中有三個不同的函式(處理器),並且其中一個處理器,您不希望它在應用程式啟動時啟動。下面是這種情況的一個示例。

@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {

}

@Bean
public Consumer<KStream<?, ?>> process2() {

}

@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {

}

在上述場景中,如果您將 spring.kafka.streams.auto-startup 設定為 false,那麼在應用程式啟動時,所有處理器都不會自動啟動。在這種情況下,您必須如上所述,透過呼叫底層 StreamsBuilderFactoryManagerstart() 方法來以程式設計方式啟動它們。然而,如果我們的用例是隻想選擇性地停用一個處理器,那麼您必須在該處理器的單個繫結上設定 auto-startup。假設我們不希望 process3 函式自動啟動。這是一個具有兩個輸入繫結的 BiFunction - process3-in-0process3-in-1。為了避免該處理器自動啟動,您可以選擇這些輸入繫結中的任何一個並在其上設定 auto-startup。您選擇哪個繫結無關緊要;如果您願意,可以在兩個繫結上都將 auto-startup 設定為 false,但一個就足夠了。因為它們共享同一個工廠 Bean,您不必在兩個繫結上都將 autoStartup 設定為 false,但為了清晰起見,這樣做可能更有意義。

以下是您可以用來停用此處理器自動啟動的 Spring Cloud Stream 屬性。

spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false

spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false

然後,您可以手動啟動處理器,可以使用 REST 端點或使用 BindingsEndpoint API,如下所示。為此,您需要確保 classpath 中包含 Spring Boot actuator 依賴。

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST https://:8080/actuator/bindings/process3-in-0

@Autowired
BindingsEndpoint endpoint;

@Bean
public ApplicationRunner runner() {
    return args -> {
        endpoint.changeState("process3-in-0", State.STARTED);
    };
}

有關此機制的更多詳細資訊,請參見參考文件中的此部分

如本節所述,透過停用 auto-startup 來控制繫結時,請注意這僅適用於消費者繫結。換句話說,如果您使用生產者繫結 process3-out-0,它在停用處理器自動啟動方面沒有任何效果,儘管此生產者繫結與消費者繫結使用相同的 StreamsBuilderFactoryBean