@KafkaListener 註解

@KafkaListener 註解用於將 bean 方法指定為監聽器容器的監聽器。該 bean 會被包裝在 MessagingMessageListenerAdapter 中,並配置各種特性,例如在必要時用於轉換資料以匹配方法引數的轉換器。

您可以使用 SpEL (#{…​}) 或屬性佔位符 (${…​}) 來配置註解上的大多數屬性。有關更多資訊,請參閱 Javadoc

Record 監聽器

@KafkaListener 註解為簡單的 POJO 監聽器提供了一種機制。以下示例展示瞭如何使用它

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}

該機制要求您的一個 @Configuration 類上使用 @EnableKafka 註解,並且需要一個監聽器容器工廠,該工廠用於配置底層 ConcurrentMessageListenerContainer。預設情況下,需要一個名為 kafkaListenerContainerFactory 的 bean。以下示例展示瞭如何使用 ConcurrentMessageListenerContainer

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ...
        return props;
    }
}

請注意,要設定容器屬性,必須使用工廠上的 getContainerProperties() 方法。它被用作注入到容器中的實際屬性的模板。

從 2.1.1 版本開始,您現在可以為由該註解建立的消費者設定 client.id 屬性。clientIdPrefix 會被加上字尾 -n,其中 n 是一個整數,表示使用併發時的容器編號。

從 2.2 版本開始,您現在可以使用註解本身的屬性覆蓋容器工廠的 concurrencyautoStartup 屬性。這些屬性可以是簡單值、屬性佔位符或 SpEL 表示式。以下示例展示瞭如何實現

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}

顯式分割槽分配

您還可以為 POJO 監聽器配置顯式的主題和分割槽(以及可選的初始偏移量)。以下示例展示瞭如何實現

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

您可以在 partitionspartitionOffsets 屬性中指定每個分割槽,但不能同時使用。

與大多數註解屬性一樣,您可以使用 SpEL 表示式;有關如何生成大量分割槽的示例,請參閱 手動分配所有分割槽

從 2.5.5 版本開始,您可以將初始偏移量應用於所有分配的分割槽

@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" },
             partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

* 萬用字元表示 partitions 屬性中的所有分割槽。在每個 @TopicPartition 中只能有一個帶有萬用字元的 @PartitionOffset

此外,當監聽器實現 ConsumerSeekAware 時,即使使用手動分配,現在也會呼叫 onPartitionsAssigned。例如,這允許在該時間點執行任何任意的 seek 操作。

從 2.6.4 版本開始,您可以指定一個逗號分隔的分割槽列表,或者分割槽範圍

@KafkaListener(id = "pp", autoStartup = "false",
        topicPartitions = @TopicPartition(topic = "topic1",
                partitions = "0-5, 7, 10-15"))
public void process(String in) {
    ...
}

該範圍是包含性的;上面的示例將分配分割槽 0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15

在指定初始偏移量時可以使用相同的技術

@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1",
             partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

初始偏移量將應用於所有 6 個分割槽。

自 3.2 版本以來,@PartitionOffset 支援 SeekPosition.ENDSeekPosition.BEGINNINGSeekPosition.TIMESTAMPseekPosition 匹配 SeekPosition 列舉名稱

@KafkaListener(id = "seekPositionTime", topicPartitions = {
        @TopicPartition(topic = TOPIC_SEEK_POSITION, partitionOffsets = {
                @PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
                @PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
                @PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
        })
})
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

如果 seekPosition 設定為 ENDBEGINNING,將忽略 initialOffsetrelativeToCurrent。如果 seekPosition 設定為 TIMESTAMPinitialOffset 表示時間戳。

手動確認

使用手動 AckMode 時,您還可以為監聽器提供 Acknowledgment。要啟用手動 AckMode,需要在 ContainerProperties 中將確認模式設定為適當的手動模式。以下示例還展示瞭如何使用不同的容器工廠。此自定義容器工廠必須透過呼叫 getContainerProperties() 然後呼叫其上的 setAckModeAckMode 設定為手動型別。否則,Acknowledgment 物件將為 null。

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

消費者記錄元資料

最後,關於記錄的元資料可以從訊息頭中獲取。您可以使用以下頭名稱來檢索訊息的頭

  • KafkaHeaders.OFFSET

  • KafkaHeaders.RECEIVED_KEY

  • KafkaHeaders.RECEIVED_TOPIC

  • KafkaHeaders.RECEIVED_PARTITION

  • KafkaHeaders.RECEIVED_TIMESTAMP

  • KafkaHeaders.TIMESTAMP_TYPE

從 2.5 版本開始,如果入站記錄的 key 為 null,則 RECEIVED_KEY 將不會出現;之前該 header 會填充一個 null 值。此更改是為了使框架與 spring-messaging 的約定保持一致,即不存在值為 null 的 headers。

以下示例展示瞭如何使用這些 headers

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}
引數註解(@Payload@Header)必須指定在監聽器方法的具體實現上;如果定義在介面上,則不會被檢測到。

從 2.5 版本開始,除了使用離散的 headers 外,您還可以在 ConsumerRecordMetadata 引數中接收記錄元資料。

@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
    ...
}

這包含 ConsumerRecord 中除 key 和 value 外的所有資料。

批次監聽器

從 1.1 版本開始,您可以配置 @KafkaListener 方法來接收從消費者 poll 中獲取的整個批次的消費者記錄。

批次監聽器不支援非阻塞重試

要配置監聽器容器工廠以建立批次監聽器,您可以設定 batchListener 屬性。以下示例展示瞭如何實現

@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
   return factory;
}
從 2.8 版本開始,您可以使用 @KafkaListener 註解上的 batch 屬性覆蓋工廠的 batchListener 屬性。結合 容器錯誤處理器 的變化,這允許同一個工廠用於 record 和 batch 監聽器。
從 2.9.6 版本開始,容器工廠為 recordMessageConverterbatchMessageConverter 屬性提供了單獨的 setter 方法。以前,只有一個 messageConverter 屬性,適用於 record 和 batch 監聽器。

以下示例展示瞭如何接收負載列表

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

主題、分割槽、偏移量等資訊在與負載並行的 headers 中可用。以下示例展示瞭如何使用這些 headers

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

或者,您可以接收 List 型別的 Message 物件,其中每個訊息包含其偏移量和其他詳細資訊,但這必須是方法上定義的唯一引數(除了可選的 Acknowledgment(在使用手動提交時)和/或 Consumer 引數)。以下示例展示瞭如何實現

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

在這種情況下,不會對負載執行轉換。

如果 BatchMessagingMessageConverter 配置了 RecordMessageConverter,您還可以為 Message 引數新增泛型型別,並且負載會被轉換。有關更多資訊,請參閱 批次監聽器的負載轉換

您還可以接收 ConsumerRecord 物件的列表,但這必須是方法上定義的唯一引數(除了可選的 Acknowledgment(在使用手動提交時)和 Consumer 引數)。以下示例展示瞭如何實現

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}

從 2.2 版本開始,監聽器可以接收由 poll() 方法返回的完整 ConsumerRecords 物件,使監聽器能夠訪問更多方法,例如 partitions()(返回列表中的 TopicPartition 例項)和 records(TopicPartition)(獲取選擇性記錄)。同樣,這必須是方法上的唯一引數(除了可選的 Acknowledgment(在使用手動提交時)或 Consumer 引數)。以下示例展示瞭如何實現

@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}
如果容器工廠配置了 RecordFilterStrategy,對於 ConsumerRecords 監聽器,它會被忽略,併發出 WARN 日誌訊息。只有在使用 List 形式的監聽器時,才能透過批次監聽器過濾記錄。預設情況下,記錄是逐個過濾的;從 2.8 版本開始,您可以覆蓋 filterBatch 以一次呼叫過濾整個批次。

註解屬性

從 2.0 版本開始,id 屬性(如果存在)將用作 Kafka 消費者的 group.id 屬性,覆蓋消費者工廠中配置的同名屬性(如果存在)。您也可以顯式設定 groupId 或將 idIsGroup 設定為 false,以恢復之前使用消費者工廠 group.id 的行為。

您可以在大多數註解屬性中使用屬性佔位符或 SpEL 表示式,如下例所示

@KafkaListener(topics = "${some.property}")

@KafkaListener(topics = "#{someBean.someProperty}",
    groupId = "#{someBean.someProperty}.group")

從 2.1.2 版本開始,SpEL 表示式支援一個特殊 token:__listener。這是一個偽 bean 名稱,表示此註解所在的當前 bean 例項。

考慮以下示例

@Bean
public Listener listener1() {
    return new Listener("topic1");
}

@Bean
public Listener listener2() {
    return new Listener("topic2");
}

給定前一個示例中的 bean,我們可以使用以下方式

public class Listener {

    private final String topic;

    public Listener(String topic) {
        this.topic = topic;
    }

    @KafkaListener(topics = "#{__listener.topic}",
        groupId = "#{__listener.topic}.group")
    public void listen(...) {
        ...
    }

    public String getTopic() {
        return this.topic;
    }

}

萬一(儘管不太可能)您有一個實際名為 __listener 的 bean,您可以使用 beanRef 屬性更改表示式 token。以下示例展示瞭如何實現

@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")

從 2.2.4 版本開始,您可以直接在註解上指定 Kafka 消費者屬性,這些屬性將覆蓋消費者工廠中配置的同名屬性。您不能透過這種方式指定 group.idclient.id 屬性;它們將被忽略;請使用 groupIdclientIdPrefix 註解屬性來指定它們。

這些屬性指定為單獨的字串,採用標準的 Java Properties 檔案格式:foo:barfoo=barfoo bar,如下例所示

@KafkaListener(topics = "myTopic", groupId = "group", properties = {
    "max.poll.interval.ms:60000",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})

以下是在 使用 RoutingKafkaTemplate 中的相應監聽器示例。

@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
    System.out.println("1: " + in);
}

@KafkaListener(id = "two", topics = "two",
        properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
    System.out.println("2: " + new String(in));
}