有選擇地手動啟動 Kafka Streams 處理器
儘管上述方法將透過 StreamsBuilderFactoryManager 無條件地將 auto-start 設定為 false 應用於應用程式中的所有 Kafka Streams 處理器,但通常需要只對單獨選定的 Kafka Streams 處理器不進行自動啟動。例如,假設您的應用程式中有三個不同的函式(處理器),其中一個處理器您不希望在應用程式啟動時啟動。下面是這種情況的一個示例。
@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {
}
@Bean
public Consumer<KStream<?, ?>> process2() {
}
@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {
}
在上述場景中,如果將 spring.kafka.streams.auto-startup 設定為 false,那麼在應用程式啟動期間,所有處理器都不會自動啟動。在這種情況下,您必須如上所述透過呼叫底層 StreamsBuilderFactoryManager 上的 start() 來以程式設計方式啟動它們。但是,如果我們的用例是選擇性地停用一個處理器,那麼您必須在該處理器的單個繫結上設定 auto-startup。假設我們不希望 process3 函式自動啟動。這是一個具有兩個輸入繫結(process3-in-0 和 process3-in-1)的 BiFunction。為了避免此處理器自動啟動,您可以選擇這些輸入繫結中的任何一個,並在其上設定 auto-startup。選擇哪個繫結無關緊要;如果您願意,可以將它們兩者都設定為 auto-startup 為 false,但一個就足夠了。因為它們共享相同的工廠 bean,所以您不必在兩個繫結上都將 autoStartup 設定為 false,但為了清晰起見,這樣做可能更有意義。
這是您可以用來停用此處理器自動啟動的 Spring Cloud Stream 屬性。
spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
或
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false
然後,您可以使用 REST 端點或使用 BindingsEndpoint API 手動啟動處理器,如下所示。為此,您需要確保類路徑上存在 Spring Boot 執行器依賴項。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST https://:8080/actuator/bindings/process3-in-0
或
@Autowired
BindingsEndpoint endpoint;
@Bean
public ApplicationRunner runner() {
return args -> {
endpoint.changeState("process3-in-0", State.STARTED);
};
}
有關此機制的更多詳細資訊,請參閱參考文件中的此部分。
當透過停用 auto-startup 來控制繫結時,如本節所述,請注意這僅適用於消費者繫結。換句話說,如果您使用生產者繫結 process3-out-0,它在停用處理器自動啟動方面沒有任何效果,儘管此生產者繫結使用與消費者繫結相同的 StreamsBuilderFactoryBean。 |