使用 @SendTo 轉發監聽器結果
從版本 2.0 開始,如果您還將一個 @KafkaListener 與 @SendTo 註解一起使用,並且方法呼叫返回一個結果,則該結果將轉發到 @SendTo 指定的主題。
@SendTo 的值可以有幾種形式
-
@SendTo("someTopic")路由到字面主題。 -
@SendTo("#{someExpression}")路由到透過在應用程式上下文初始化期間評估表示式一次確定的主題。 -
@SendTo("!{someExpression}")路由到透過在執行時評估表示式確定的主題。評估的#root物件有三個屬性-
request: 入站ConsumerRecord(或批處理偵聽器的ConsumerRecords物件)。 -
source: 從request轉換而來的org.springframework.messaging.Message<?>。 -
result: 方法返回結果。
-
-
@SendTo(無屬性):這被視為!{source.headers['kafka_replyTopic']}(從版本 2.1.3 開始)。
從版本 2.1.11 和 2.2.1 開始,屬性佔位符在 @SendTo 值中解析。
表示式評估的結果必須是一個表示主題名稱的 String。以下示例展示了使用 @SendTo 的各種方式
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
為了支援 @SendTo,必須向偵聽器容器工廠提供一個 KafkaTemplate(在其 replyTemplate 屬性中),用於傳送回覆。這應該是一個 KafkaTemplate,而不是用於客戶端請求/回覆處理的 ReplyingKafkaTemplate。當使用 Spring Boot 時,它會自動將模板配置到工廠中;當配置您自己的工廠時,必須按以下示例所示進行設定。 |
從版本 2.2 開始,您可以向偵聽器容器工廠新增一個 ReplyHeadersConfigurer。它用於確定您要在回覆訊息中設定哪些標頭。以下示例展示瞭如何新增 ReplyHeadersConfigurer
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
如果您願意,還可以新增更多標頭。以下示例展示瞭如何實現
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
當您使用 @SendTo 時,您必須配置 ConcurrentKafkaListenerContainerFactory 並在其 replyTemplate 屬性中配置一個 KafkaTemplate 來執行傳送。Spring Boot 將自動連線其自動配置的模板(或任何單個例項存在的模板)。
除非您使用 請求/回覆語義,否則只使用簡單的 send(topic, value) 方法,因此您可能希望建立一個子類來生成分割槽或鍵。以下示例展示瞭如何實現 |
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<String, String>(producerFactory()) {
@Override
public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}
...
};
}
|
如果偵聽器方法返回
|
當使用請求/回覆語義時,目標分割槽可以由傳送方請求。
|
您可以為
有關更多資訊,請參閱 處理異常。 |
如果偵聽器方法返回一個 Iterable,預設情況下,每個元素作為值傳送一個記錄。從版本 2.3.5 開始,將 @KafkaListener 上的 splitIterables 屬性設定為 false,整個結果將作為單個 ProducerRecord 的值傳送。這需要在回覆模板的生產者配置中有一個合適的序列化器。但是,如果回覆是 Iterable<Message<?>>,則該屬性將被忽略,並且每個訊息都將單獨傳送。 |