容器工廠
如@KafkaListener 註解中所述,ConcurrentKafkaListenerContainerFactory 用於為帶註解的方法建立容器。
從版本 2.2 開始,您可以使用相同的工廠來建立任何 ConcurrentMessageListenerContainer。如果您希望建立具有相似屬性的多個容器,或者希望使用某些外部配置的工廠(例如 Spring Boot 自動配置提供的工廠),這可能會很有用。容器建立後,您可以進一步修改其屬性,其中許多屬性是透過使用 container.getContainerProperties() 設定的。以下示例配置了一個 ConcurrentMessageListenerContainer
@Bean
public ConcurrentMessageListenerContainer<String, String>(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}
以這種方式建立的容器不會新增到端點登錄檔中。它們應該作為 @Bean 定義建立,以便它們註冊到應用程式上下文中。 |
從版本 2.3.4 開始,您可以向工廠新增 ContainerCustomizer,以在容器建立和配置後進一步配置每個容器。
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setContainerCustomizer(container -> { /* customize the container */ });
return factory;
}
從版本 3.1 開始,還可以透過在 KafkaListener 註解上指定 'ContainerPostProcessor' 的 bean 名稱,在單個監聽器上應用相同型別的自定義。
@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> customContainerPostProcessor() {
return container -> { /* customize the container */ };
}
...
@KafkaListener(..., containerPostProcessor="customContainerPostProcessor", ...)