消費記錄
在上述 uppercase 函式中,我們將記錄作為 Flux<String> 消費,然後將其作為 Flux<String> 生成。有時您可能需要以原始接收格式(即 ReceiverRecord)接收記錄。下面就是一個這樣的函式。
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
在此函式中,請注意,我們將記錄作為 Flux<ReceiverRecord<byte[], byte[]>> 消費,然後將其作為 Flux<String> 生成。ReceiverRecord 是基本的接收記錄,它是 Reactor Kafka 中專用的 Kafka ConsumerRecord。當使用響應式 Kafka 繫結器時,上述函式將使您能夠訪問每個傳入記錄的 ReceiverRecord 型別。但是,在這種情況下,您需要為 RecordMessageConverter 提供自定義實現。預設情況下,響應式 Kafka 繫結器使用 MessagingMessageConverter,它從 ConsumerRecord 轉換有效負載和頭部。因此,當您的處理器方法接收到它時,有效負載已經從接收到的記錄中提取並傳遞給方法,就像我們上面看到的第一個函式一樣。透過在應用程式中提供自定義 RecordMessageConverter 實現,您可以覆蓋預設行為。例如,如果您想將記錄作為原始 Flux<ReceiverRecord<byte[], byte[]>> 消費,那麼您可以在應用程式中提供以下 bean 定義。
@Bean
RecordMessageConverter fullRawReceivedRecord() {
return new RecordMessageConverter() {
private final RecordMessageConverter converter = new MessagingMessageConverter();
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
return MessageBuilder.withPayload(record).build();
}
@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}
};
}
然後,您需要指示框架為所需的繫結使用此轉換器。以下是基於我們的 lowercase 函式的示例。
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"
lowercase-in-0 是我們 lowercase 函式的輸入繫結名稱。對於出站(lowercase-out-0),我們仍然使用常規的 MessagingMessageConverter。
在上面的 toMessage 實現中,我們接收原始的 ConsumerRecord(由於我們在響應式繫結器上下文中,因此是 ReceiverRecord),然後將其包裝在 Message 中。然後,作為 ReceiverRecord 的該訊息有效負載將提供給使用者方法。
如果 reactiveAutoCommit 為 false(預設),則呼叫 rec.receiverOffset().acknowledge()(或 commit())以使偏移量被提交;如果 reactiveAutoCommit 為 true,則 flux 提供 ConsumerRecord。有關更多資訊,請參閱 reactor-kafka 文件和 javadoc。