選擇性地手動啟動 Kafka Streams 處理器
雖然上面描述的方法將透過 StreamsBuilderFactoryManager
無條件地將 auto start false
應用於應用程式中的所有 Kafka Streams 處理器,但通常我們希望只有單獨選擇的 Kafka Streams 處理器不自動啟動。例如,假設您的應用程式中有三個不同的函式(處理器),並且其中一個處理器,您不希望它在應用程式啟動時啟動。下面是這種情況的一個示例。
@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {
}
@Bean
public Consumer<KStream<?, ?>> process2() {
}
@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {
}
在上述場景中,如果您將 spring.kafka.streams.auto-startup
設定為 false
,那麼在應用程式啟動時,所有處理器都不會自動啟動。在這種情況下,您必須如上所述,透過呼叫底層 StreamsBuilderFactoryManager
的 start()
方法來以程式設計方式啟動它們。然而,如果我們的用例是隻想選擇性地停用一個處理器,那麼您必須在該處理器的單個繫結上設定 auto-startup
。假設我們不希望 process3
函式自動啟動。這是一個具有兩個輸入繫結的 BiFunction
- process3-in-0
和 process3-in-1
。為了避免該處理器自動啟動,您可以選擇這些輸入繫結中的任何一個並在其上設定 auto-startup
。您選擇哪個繫結無關緊要;如果您願意,可以在兩個繫結上都將 auto-startup
設定為 false
,但一個就足夠了。因為它們共享同一個工廠 Bean,您不必在兩個繫結上都將 autoStartup
設定為 false
,但為了清晰起見,這樣做可能更有意義。
以下是您可以用來停用此處理器自動啟動的 Spring Cloud Stream 屬性。
spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
或
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false
然後,您可以手動啟動處理器,可以使用 REST 端點或使用 BindingsEndpoint
API,如下所示。為此,您需要確保 classpath 中包含 Spring Boot actuator 依賴。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST https://:8080/actuator/bindings/process3-in-0
或
@Autowired
BindingsEndpoint endpoint;
@Bean
public ApplicationRunner runner() {
return args -> {
endpoint.changeState("process3-in-0", State.STARTED);
};
}
有關此機制的更多詳細資訊,請參見參考文件中的此部分。
如本節所述,透過停用 auto-startup 來控制繫結時,請注意這僅適用於消費者繫結。換句話說,如果您使用生產者繫結 process3-out-0 ,它在停用處理器自動啟動方面沒有任何效果,儘管此生產者繫結與消費者繫結使用相同的 StreamsBuilderFactoryBean 。 |