使用 @SendTo 轉發監聽器結果

從版本 2.0 開始,如果您還將一個 @KafkaListener@SendTo 註解一起使用,並且方法呼叫返回一個結果,則該結果將轉發到 @SendTo 指定的主題。

@SendTo 的值可以有幾種形式

  • @SendTo("someTopic") 路由到字面主題。

  • @SendTo("#{someExpression}") 路由到透過在應用程式上下文初始化期間評估表示式一次確定的主題。

  • @SendTo("!{someExpression}") 路由到透過在執行時評估表示式確定的主題。評估的 #root 物件有三個屬性

    • request: 入站 ConsumerRecord(或批處理偵聽器的 ConsumerRecords 物件)。

    • source: 從 request 轉換而來的 org.springframework.messaging.Message<?>

    • result: 方法返回結果。

  • @SendTo(無屬性):這被視為 !{source.headers['kafka_replyTopic']}(從版本 2.1.3 開始)。

從版本 2.1.11 和 2.2.1 開始,屬性佔位符在 @SendTo 值中解析。

表示式評估的結果必須是一個表示主題名稱的 String。以下示例展示了使用 @SendTo 的各種方式

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}
為了支援 @SendTo,必須向偵聽器容器工廠提供一個 KafkaTemplate(在其 replyTemplate 屬性中),用於傳送回覆。這應該是一個 KafkaTemplate,而不是用於客戶端請求/回覆處理的 ReplyingKafkaTemplate。當使用 Spring Boot 時,它會自動將模板配置到工廠中;當配置您自己的工廠時,必須按以下示例所示進行設定。

從版本 2.2 開始,您可以向偵聽器容器工廠新增一個 ReplyHeadersConfigurer。它用於確定您要在回覆訊息中設定哪些標頭。以下示例展示瞭如何新增 ReplyHeadersConfigurer

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
    return factory;
}

如果您願意,還可以新增更多標頭。以下示例展示瞭如何實現

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {

      @Override
      public boolean shouldCopy(String headerName, Object headerValue) {
        return false;
      }

      @Override
      public Map<String, Object> additionalHeaders() {
        return Collections.singletonMap("qux", "fiz");
      }

    });
    return factory;
}

當您使用 @SendTo 時,您必須配置 ConcurrentKafkaListenerContainerFactory 並在其 replyTemplate 屬性中配置一個 KafkaTemplate 來執行傳送。Spring Boot 將自動連線其自動配置的模板(或任何單個例項存在的模板)。

除非您使用 請求/回覆語義,否則只使用簡單的 send(topic, value) 方法,因此您可能希望建立一個子類來生成分割槽或鍵。以下示例展示瞭如何實現
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
    return new KafkaTemplate<String, String>(producerFactory()) {

        @Override
        public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
            return super.send(topic, partitionForData(data), keyForData(data), data);
        }

        ...

    };
}

如果偵聽器方法返回 Message<?>Collection<Message<?>>,則偵聽器方法負責為回覆設定訊息標頭。例如,當處理來自 ReplyingKafkaTemplate 的請求時,您可能會執行以下操作

@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
        @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader("someOtherHeader", "someValue")
            .build();
}

當使用請求/回覆語義時,目標分割槽可以由傳送方請求。

您可以為 @KafkaListener 方法新增 @SendTo 註解,即使沒有返回結果。這是為了允許配置一個 errorHandler,它可以將有關訊息傳遞失敗的資訊轉發到某個主題。以下示例展示瞭如何實現

@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
        errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
    throw new RuntimeException("fail");
}

@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
    return (m, e) -> {
        return ... // some information about the failure and input data
    };
}

有關更多資訊,請參閱 處理異常

如果偵聽器方法返回一個 Iterable,預設情況下,每個元素作為值傳送一個記錄。從版本 2.3.5 開始,將 @KafkaListener 上的 splitIterables 屬性設定為 false,整個結果將作為單個 ProducerRecord 的值傳送。這需要在回覆模板的生產者配置中有一個合適的序列化器。但是,如果回覆是 Iterable<Message<?>>,則該屬性將被忽略,並且每個訊息都將單獨傳送。
© . This site is unofficial and not affiliated with VMware.