容器工廠

@KafkaListener 註解中所述,ConcurrentKafkaListenerContainerFactory 用於為帶有註解的方法建立容器。

從 2.2 版本開始,您可以使用同一個工廠建立任何 ConcurrentMessageListenerContainer。如果您想建立具有類似屬性的多個容器,或者希望使用一些外部配置的工廠,例如 Spring Boot 自動配置提供的工廠,這可能會很有用。容器建立後,您可以進一步修改其屬性,其中許多屬性是透過使用 container.getContainerProperties() 設定的。以下示例配置了一個 ConcurrentMessageListenerContainer

@Bean
public ConcurrentMessageListenerContainer<String, String>(
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> container =
        factory.createContainer("topic1", "topic2");
    container.setMessageListener(m -> { ... } );
    return container;
}
以這種方式建立的容器不會新增到端點註冊中心。它們應該作為 @Bean 定義建立,以便註冊到應用程式上下文中。

從 2.3.4 版本開始,您可以在工廠中新增一個 ContainerCustomizer,以便在容器建立和配置後進一步配置每個容器。

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setContainerCustomizer(container -> { /* customize the container */ });
    return factory;
}

從 3.1 版本開始,還可以透過在 KafkaListener 註解上指定 'ContainerPostProcessor' 的 bean 名稱來對單個監聽器應用相同型別的自定義配置。

@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> customContainerPostProcessor() {
    return container -> { /* customize the container */ };
}

...

@KafkaListener(..., containerPostProcessor="customContainerPostProcessor", ...)