容器工廠
如@KafkaListener
註解中所述,ConcurrentKafkaListenerContainerFactory
用於為帶有註解的方法建立容器。
從 2.2 版本開始,您可以使用同一個工廠建立任何 ConcurrentMessageListenerContainer
。如果您想建立具有類似屬性的多個容器,或者希望使用一些外部配置的工廠,例如 Spring Boot 自動配置提供的工廠,這可能會很有用。容器建立後,您可以進一步修改其屬性,其中許多屬性是透過使用 container.getContainerProperties()
設定的。以下示例配置了一個 ConcurrentMessageListenerContainer
@Bean
public ConcurrentMessageListenerContainer<String, String>(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}
以這種方式建立的容器不會新增到端點註冊中心。它們應該作為 @Bean 定義建立,以便註冊到應用程式上下文中。 |
從 2.3.4 版本開始,您可以在工廠中新增一個 ContainerCustomizer
,以便在容器建立和配置後進一步配置每個容器。
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setContainerCustomizer(container -> { /* customize the container */ });
return factory;
}
從 3.1 版本開始,還可以透過在 KafkaListener 註解上指定 'ContainerPostProcessor' 的 bean 名稱來對單個監聽器應用相同型別的自定義配置。
@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> customContainerPostProcessor() {
return container -> { /* customize the container */ };
}
...
@KafkaListener(..., containerPostProcessor="customContainerPostProcessor", ...)