消費記錄

在上面的 upppercase 函式中,我們以 Flux<String> 格式消費記錄,然後以 Flux<String> 格式生產記錄。有時您可能需要以原始接收格式(即 ReceiverRecord)接收記錄。這裡就是一個這樣的函式。

@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
    return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}

在此函式中,請注意,我們以 Flux<ReceiverRecord<byte[], byte[]>> 格式消費記錄,然後以 Flux<String> 格式生產記錄。ReceiverRecord 是基本接收記錄,它是 Reactor Kafka 中專門的 Kafka ConsumerRecord。使用 reactive Kafka binder 時,上述函式將使您能夠訪問每個入站記錄的 ReceiverRecord 型別。但是,在這種情況下,您需要為 RecordMessageConverter 提供自定義實現。預設情況下,reactive Kafka binder 使用 MessagingMessageConverter,它從 ConsumerRecord 轉換載荷和頭部。因此,當您的處理方法接收到它時,載荷已經從接收到的記錄中提取出來,並像我們上面看到的第一個函式那樣傳遞給方法。透過在應用程式中提供自定義的 RecordMessageConverter 實現,您可以覆蓋預設行為。例如,如果您想以原始的 Flux<ReceiverRecord<byte[], byte[]>> 格式消費記錄,那麼您可以在應用程式中提供以下 bean 定義。

@Bean
RecordMessageConverter fullRawReceivedRecord() {
    return new RecordMessageConverter() {

        private final RecordMessageConverter converter = new MessagingMessageConverter();

        @Override
        public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                Consumer<?, ?> consumer, Type payloadType) {
            return MessageBuilder.withPayload(record).build();
        }

        @Override
        public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
            return this.converter.fromMessage(message, defaultTopic);
        }

    };
}

然後,您需要指示框架為所需的繫結使用此轉換器。這裡是一個基於我們 lowercase 函式的示例。

spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"

lowercase-in-0 是我們 lowercase 函式的輸入繫結名稱。對於出站(lowercase-out-0),我們仍然使用常規的 MessagingMessageConverter

在上面的 toMessage 實現中,我們接收原始的 ConsumerRecord(由於處於 reactive binder 上下文,所以是 ReceiverRecord),然後將其包裝在 Message 中。然後將作為 ReceiverRecord 的訊息載荷提供給使用者方法。

如果 reactiveAutoCommitfalse(預設值),則呼叫 rec.receiverOffset().acknowledge()(或 commit())以提交偏移量;如果 reactiveAutoCommittrue,則 flux 將直接提供 ConsumerRecord。有關更多資訊,請參閱 reactor-kafka 文件和 javadocs。