StreamsBuilderFactoryBean 配置器
通常需要自定義建立 KafkaStreams 物件的 StreamsBuilderFactoryBean。基於 Spring Kafka 提供的底層支援,繫結器允許您自定義 StreamsBuilderFactoryBean。您可以使用 StreamsBuilderFactoryBeanConfigurer 來自定義 StreamsBuilderFactoryBean 本身。然後,一旦您透過此配置器訪問到 StreamsBuilderFactoryBean,您就可以使用 KafkaStreamsCustomzier 來自定義相應的 KafkaStreams。這兩個自定義器都屬於 Spring for Apache Kafka 專案。
以下是使用 StreamsBuilderFactoryBeanConfigurer 的示例。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
上面所示是您可以自定義 StreamsBuilderFactoryBean 的一個示例。您基本上可以呼叫 StreamsBuilderFactoryBean 中任何可用的修改操作來對其進行自定義。繫結器將在工廠 bean 啟動之前呼叫此自定義器。
一旦您訪問到 StreamsBuilderFactoryBean,您還可以自定義底層的 KafkaStreams 物件。以下是執行此操作的藍圖。
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
StreamsBuilderFactoryBeabn 將在底層 KafkaStreams 啟動之前呼叫 KafkaStreamsCustomizer。
整個應用程式中只能有一個 StreamsBuilderFactoryBeanConfigurer。那麼我們如何處理多個 Kafka Streams 處理器呢,因為它們每個都由獨立的 StreamsBuilderFactoryBean 物件支援?在這種情況下,如果這些處理器需要不同的自定義,那麼應用程式需要根據應用程式 ID 應用一些過濾器。
例如,
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
使用 StreamsBuilderFactoryBeanConfigurer 註冊全域性狀態儲存
如上所述,繫結器不提供一流的方式來將全域性狀態儲存註冊為功能。為此,您需要透過 StreamsBuilderFactoryBeanConfigurer 使用自定義器。以下是具體操作方法。
@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
return streamsBuilderFactoryBean -> {
try {
streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
@Override
public void configureBuilder(StreamsBuilder builder) {
builder.addGlobalStore(
...
);
}
});
}
catch (Exception e) {
}
};
}
對 StreamsBuilder 的任何自定義都必須透過 KafkaStreamsInfrastructureCustomizer 完成,如上所示。如果呼叫 StreamsBuilderFactoryBean#getObject() 來獲取 StreamsBuilder 物件,則可能無法正常工作,因為 bean 可能正在初始化,從而導致一些迴圈依賴問題。
如果您有多個處理器,您需要透過使用應用程式 ID 過濾掉其他 StreamsBuilderFactoryBean 物件,將全域性狀態儲存附加到正確的 StreamsBuilder,如上所述。
使用 StreamsBuilderFactoryBeanConfigurer 註冊生產異常處理器
在錯誤處理部分,我們指出繫結器不提供一流的方式來處理生產異常。儘管如此,您仍然可以使用 StreamsBuilderFactoryBean 自定義器來註冊生產異常處理器。請參見下文。
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
再次,如果您有多個處理器,您可能希望將其適當設定為正確的 StreamsBuilderFactoryBean。您也可以使用配置屬性新增此類生產異常處理器(有關更多資訊,請參見下文),但如果您選擇採用程式設計方法,這是一個選項。