消費記錄

在上述 uppercase 函式中,我們將記錄作為 Flux<String> 消費,然後將其作為 Flux<String> 生成。有時您可能需要以原始接收格式(即 ReceiverRecord)接收記錄。下面就是一個這樣的函式。

@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
    return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}

在此函式中,請注意,我們將記錄作為 Flux<ReceiverRecord<byte[], byte[]>> 消費,然後將其作為 Flux<String> 生成。ReceiverRecord 是基本的接收記錄,它是 Reactor Kafka 中專用的 Kafka ConsumerRecord。當使用響應式 Kafka 繫結器時,上述函式將使您能夠訪問每個傳入記錄的 ReceiverRecord 型別。但是,在這種情況下,您需要為 RecordMessageConverter 提供自定義實現。預設情況下,響應式 Kafka 繫結器使用 MessagingMessageConverter,它從 ConsumerRecord 轉換有效負載和頭部。因此,當您的處理器方法接收到它時,有效負載已經從接收到的記錄中提取並傳遞給方法,就像我們上面看到的第一個函式一樣。透過在應用程式中提供自定義 RecordMessageConverter 實現,您可以覆蓋預設行為。例如,如果您想將記錄作為原始 Flux<ReceiverRecord<byte[], byte[]>> 消費,那麼您可以在應用程式中提供以下 bean 定義。

@Bean
RecordMessageConverter fullRawReceivedRecord() {
    return new RecordMessageConverter() {

        private final RecordMessageConverter converter = new MessagingMessageConverter();

        @Override
        public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                Consumer<?, ?> consumer, Type payloadType) {
            return MessageBuilder.withPayload(record).build();
        }

        @Override
        public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
            return this.converter.fromMessage(message, defaultTopic);
        }

    };
}

然後,您需要指示框架為所需的繫結使用此轉換器。以下是基於我們的 lowercase 函式的示例。

spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"

lowercase-in-0 是我們 lowercase 函式的輸入繫結名稱。對於出站(lowercase-out-0),我們仍然使用常規的 MessagingMessageConverter

在上面的 toMessage 實現中,我們接收原始的 ConsumerRecord(由於我們在響應式繫結器上下文中,因此是 ReceiverRecord),然後將其包裝在 Message 中。然後,作為 ReceiverRecord 的該訊息有效負載將提供給使用者方法。

如果 reactiveAutoCommitfalse(預設),則呼叫 rec.receiverOffset().acknowledge()(或 commit())以使偏移量被提交;如果 reactiveAutoCommittrue,則 flux 提供 ConsumerRecord。有關更多資訊,請參閱 reactor-kafka 文件和 javadoc。

© . This site is unofficial and not affiliated with VMware.