使用 `@SendTo` 轉發監聽器結果

從 2.0 版本開始,如果您同時使用 `@SendTo` 註解標記 `@KafkaListener`,並且方法呼叫返回結果,則該結果將轉發到 `@SendTo` 指定的主題。

`@SendTo` 值可以有幾種形式

  • `@SendTo("someTopic")` 路由到字面值主題。

  • `@SendTo("#{someExpression}")` 路由到在應用上下文初始化期間評估表示式確定的主題。

  • `@SendTo("!{someExpression}")` 路由到在執行時評估表示式確定的主題。評估的 `#root` 物件有三個屬性

    • `request`:入站的 `ConsumerRecord`(對於批次監聽器,則是 `ConsumerRecords` 物件)。

    • `source`:從 `request` 轉換而來的 `org.springframework.messaging.Message`。

    • `result`:方法返回的結果。

  • `@SendTo`(無屬性):這被視為 `!{source.headers['kafka_replyTopic']}`(從 2.1.3 版本開始)。

從 2.1.11 和 2.2.1 版本開始,屬性佔位符會在 `@SendTo` 值中被解析。

表示式評估的結果必須是表示主題名稱的 `String`。以下示例展示了使用 `@SendTo` 的各種方式

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}
為了支援 `@SendTo`,必須為監聽器容器工廠提供一個 `KafkaTemplate`(在其 `replyTemplate` 屬性中),用於傳送回覆。這應該是一個 `KafkaTemplate`,而不是用於客戶端請求/回覆處理的 `ReplyingKafkaTemplate`。使用 Spring Boot 時,它會自動將模板配置到工廠中;配置自己的工廠時,必須按照以下示例所示進行設定。

從 2.2 版本開始,您可以向監聽器容器工廠新增一個 `ReplyHeadersConfigurer`。會諮詢它以確定要在回覆訊息中設定哪些頭。以下示例展示瞭如何新增 `ReplyHeadersConfigurer`

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
    return factory;
}

如果需要,您還可以新增更多頭。以下示例展示瞭如何操作

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {

      @Override
      public boolean shouldCopy(String headerName, Object headerValue) {
        return false;
      }

      @Override
      public Map<String, Object> additionalHeaders() {
        return Collections.singletonMap("qux", "fiz");
      }

    });
    return factory;
}

使用 `@SendTo` 時,您必須在 `ConcurrentKafkaListenerContainerFactory` 的 `replyTemplate` 屬性中配置一個 `KafkaTemplate` 來執行傳送。Spring Boot 會自動注入其自動配置的模板(或者如果存在單個例項,則注入該例項)。

除非您使用 請求/回覆語義,否則只使用簡單的 `send(topic, value)` 方法,因此您可能希望建立一個子類來生成分割槽或鍵。以下示例展示瞭如何操作
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
    return new KafkaTemplate<String, String>(producerFactory()) {

        @Override
        public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
            return super.send(topic, partitionForData(data), keyForData(data), data);
        }

        ...

    };
}

如果監聽器方法返回 `Message` 或 `Collection>`,則監聽器方法負責設定回覆的訊息頭。例如,處理來自 `ReplyingKafkaTemplate` 的請求時,您可以執行以下操作

@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
        @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader("someOtherHeader", "someValue")
            .build();
}

使用 請求/回覆 語義時,目標分割槽可以由傳送者請求。

即使沒有返回結果,您也可以使用 `@SendTo` 註解標記 `@KafkaListener` 方法。這允許配置一個 `errorHandler`,它可以將關於失敗訊息投遞的資訊轉發到某個主題。以下示例展示瞭如何操作

@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
        errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
    throw new RuntimeException("fail");
}

@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
    return (m, e) -> {
        return ... // some information about the failure and input data
    };
}

有關更多資訊,請參閱 異常處理

如果監聽器方法返回 `Iterable`,預設情況下會為每個元素髮送一條記錄作為值。從 2.3.5 版本開始,將 `@KafkaListener` 的 `splitIterables` 屬性設定為 `false`,整個結果將作為單個 `ProducerRecord` 的值傳送。這需要在回覆模板的生產者配置中配置合適的序列化器。但是,如果回覆是 `Iterable>`,則該屬性將被忽略,每條訊息將單獨傳送。