StreamsBuilderFactoryBean 配置器
通常需要定製建立 KafkaStreams
物件的 StreamsBuilderFactoryBean
。基於 Spring Kafka 提供的底層支援,該 binder 允許您定製 StreamsBuilderFactoryBean
。您可以使用 StreamsBuilderFactoryBeanConfigurer
來自定義 StreamsBuilderFactoryBean
本身。然後,一旦透過此配置器訪問到 StreamsBuilderFactoryBean
,您就可以使用 KafkaStreamsCustomzier
定製相應的 KafkaStreams
。這兩個定製器都是 Spring for Apache Kafka 專案的一部分。
以下是使用 StreamsBuilderFactoryBeanConfigurer
的示例。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
上述示例說明了您可以對 StreamsBuilderFactoryBean
進行的定製。您基本上可以呼叫 StreamsBuilderFactoryBean
的任何可用變動操作來進行定製。該定製器將在工廠 Bean 啟動之前由 binder 呼叫。
一旦您訪問到 StreamsBuilderFactoryBean
,您還可以定製底層的 KafkaStreams
物件。以下是實現這一點的藍圖。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizer
將在底層的 KafkaStreams
啟動之前由 StreamsBuilderFactoryBeabn
呼叫。
整個應用中只能有一個 StreamsBuilderFactoryBeanConfigurer
。那麼如何考慮多個 Kafka Streams 處理器呢?因為它們每一個都由獨立的 StreamsBuilderFactoryBean
物件支援。在這種情況下,如果這些處理器需要不同的定製,那麼應用需要基於應用 ID 應用一些過濾器。
例如,
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
使用 StreamsBuilderFactoryBeanConfigurer 註冊全域性狀態儲存
如上所述,binder 沒有直接提供一種註冊全域性狀態儲存的功能。為此,您需要透過 StreamsBuilderFactoryBeanConfigurer
使用定製器。以下是如何實現。
@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
return streamsBuilderFactoryBean -> {
try {
streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
@Override
public void configureBuilder(StreamsBuilder builder) {
builder.addGlobalStore(
...
);
}
});
}
catch (Exception e) {
}
};
}
對 StreamsBuilder
的任何定製都必須透過 KafkaStreamsInfrastructureCustomizer
完成,如上所示。如果呼叫 StreamsBuilderFactoryBean#getObject()
來獲取 StreamsBuilder
物件,這可能無法工作,因為該 Bean 可能處於初始化階段,從而導致一些迴圈依賴問題。
如果您有多個處理器,您需要透過使用應用 ID 過濾掉其他 StreamsBuilderFactoryBean
物件,將全域性狀態儲存附加到正確的 StreamsBuilder
,如上所述。
使用 StreamsBuilderFactoryBeanConfigurer 註冊生產異常處理器
在錯誤處理部分,我們指出 binder 沒有直接提供處理生產異常的方式。儘管如此,您仍然可以使用 StreamsBuilderFactoryBean
定製器來註冊生產異常處理器。請參閱下文。
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
同樣,如果您有多個處理器,您可能希望針對正確的 StreamsBuilderFactoryBean
適當設定它。您也可以使用配置屬性新增此類生產異常處理器(有關更多資訊,請參閱下文),但如果您選擇採用程式設計方法,這是一個選項。