消費記錄
在上面的 upppercase
函式中,我們以 Flux<String>
格式消費記錄,然後以 Flux<String>
格式生產記錄。有時您可能需要以原始接收格式(即 ReceiverRecord
)接收記錄。這裡就是一個這樣的函式。
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
在此函式中,請注意,我們以 Flux<ReceiverRecord<byte[], byte[]>>
格式消費記錄,然後以 Flux<String>
格式生產記錄。ReceiverRecord
是基本接收記錄,它是 Reactor Kafka 中專門的 Kafka ConsumerRecord
。使用 reactive Kafka binder 時,上述函式將使您能夠訪問每個入站記錄的 ReceiverRecord
型別。但是,在這種情況下,您需要為 RecordMessageConverter 提供自定義實現。預設情況下,reactive Kafka binder 使用 MessagingMessageConverter,它從 ConsumerRecord
轉換載荷和頭部。因此,當您的處理方法接收到它時,載荷已經從接收到的記錄中提取出來,並像我們上面看到的第一個函式那樣傳遞給方法。透過在應用程式中提供自定義的 RecordMessageConverter
實現,您可以覆蓋預設行為。例如,如果您想以原始的 Flux<ReceiverRecord<byte[], byte[]>>
格式消費記錄,那麼您可以在應用程式中提供以下 bean 定義。
@Bean
RecordMessageConverter fullRawReceivedRecord() {
return new RecordMessageConverter() {
private final RecordMessageConverter converter = new MessagingMessageConverter();
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
return MessageBuilder.withPayload(record).build();
}
@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}
};
}
然後,您需要指示框架為所需的繫結使用此轉換器。這裡是一個基於我們 lowercase
函式的示例。
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"
lowercase-in-0
是我們 lowercase
函式的輸入繫結名稱。對於出站(lowercase-out-0
),我們仍然使用常規的 MessagingMessageConverter
。
在上面的 toMessage
實現中,我們接收原始的 ConsumerRecord
(由於處於 reactive binder 上下文,所以是 ReceiverRecord
),然後將其包裝在 Message
中。然後將作為 ReceiverRecord
的訊息載荷提供給使用者方法。
如果 reactiveAutoCommit
為 false
(預設值),則呼叫 rec.receiverOffset().acknowledge()
(或 commit()
)以提交偏移量;如果 reactiveAutoCommit
為 true
,則 flux 將直接提供 ConsumerRecord
。有關更多資訊,請參閱 reactor-kafka
文件和 javadocs。