傳送訊息

本節介紹如何傳送訊息。

使用 KafkaTemplate

本節介紹如何使用 KafkaTemplate 傳送訊息。

概述

KafkaTemplate 封裝了一個生產者,並提供了便捷方法將資料傳送到 Kafka 主題。以下列表顯示了 KafkaTemplate 中的相關方法

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

有關更多詳細資訊,請參閱 Javadoc

sendDefault API 要求已向模板提供了預設主題。

該 API 將 timestamp 作為引數,並將此時間戳儲存在記錄中。使用者提供的時間戳如何儲存取決於 Kafka 主題上配置的時間戳型別。如果主題配置為使用 CREATE_TIME,則記錄(如果未指定則生成)使用者指定的時間戳。如果主題配置為使用 LOG_APPEND_TIME,則忽略使用者指定的時間戳,代理將新增本地代理時間。

metricspartitionsFor 方法委託給底層 Producer 上的相同方法。execute 方法提供了對底層 Producer 的直接訪問。

從 Spring 元件傳送訊息時,如果依賴於透過 NewTopic bean 自動建立主題,請避免使用 @PostConstruct 方法。@PostConstruct 在應用程式上下文完全就緒之前執行,這可能會導致乾淨的代理上出現 UnknownTopicOrPartitionException

相反,請考慮以下替代方案

  • 使用 ApplicationListener<ContextRefreshedEvent> 以確保上下文在傳送之前完全重新整理。

  • 實現 SmartLifecycle 以在 KafkaAdmin bean 完成其初始化後啟動。

  • 外部預建立主題。

要使用模板,您可以配置一個生產者工廠,並在模板的建構函式中提供它。以下示例展示瞭如何實現

@Bean
public ProducerFactory producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map producerConfigs() {
    Map props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/41/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate kafkaTemplate() {
    return new KafkaTemplate(producerFactory());
}

從 2.5 版本開始,您現在可以覆蓋工廠的 ProducerConfig 屬性,以使用與同一工廠不同的生產者配置建立模板。

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

請注意,型別為 ProducerFactory<?, ?> 的 bean(例如 Spring Boot 自動配置的 bean)可以引用為具有不同窄化泛型型別。

您還可以使用標準 <bean/> 定義配置模板。

然後,要使用模板,您可以呼叫其方法之一。

當您使用帶有 Message<?> 引數的方法時,主題、分割槽、鍵和時間戳資訊在訊息頭中提供,其中包含以下項

  • KafkaHeaders.TOPIC

  • KafkaHeaders.PARTITION

  • KafkaHeaders.KEY

  • KafkaHeaders.TIMESTAMP

訊息有效負載是資料。

或者,您可以為 KafkaTemplate 配置一個 ProducerListener,以獲取傳送結果(成功或失敗)的非同步回撥,而不是等待 Future 完成。以下列表顯示了 ProducerListener 介面的定義

public interface ProducerListener<K, V> {

    default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
	}

    default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
	}

}

預設情況下,模板配置了一個 LoggingProducerListener,它記錄錯誤並在傳送成功時不做任何操作。

為了方便起見,如果您只想實現其中一個方法,則提供了預設方法實現。

請注意,傳送方法返回一個 CompletableFuture<SendResult>。您可以向偵聽器註冊回撥以非同步接收發送結果。以下示例展示瞭如何實現

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

SendResult 有兩個屬性:ProducerRecordRecordMetadata。有關這些物件的資訊,請參閱 Kafka API 文件。

Throwable 可以強制轉換為 KafkaProducerException;其 producerRecord 屬性包含失敗的記錄。

如果您希望阻塞傳送執行緒以等待結果,可以呼叫 future 的 get() 方法;建議使用帶有超時的版本。如果您已設定 linger.ms,您可能希望在等待之前呼叫 flush(),或者為了方便起見,模板有一個帶有 autoFlush 引數的建構函式,該引數使模板在每次傳送時呼叫 flush()。只有在設定了 linger.ms 生產者屬性並希望立即傳送部分批次時才需要重新整理。

示例

本節展示了向 Kafka 傳送訊息的示例

示例 1. 非阻塞 (非同步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<String, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}
示例 2. 阻塞 (同步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

請注意,ExecutionException 的原因是帶有 producerRecord 屬性的 KafkaProducerException

使用 RoutingKafkaTemplate

從 2.5 版本開始,您可以根據目標 topic 名稱,在執行時使用 RoutingKafkaTemplate 選擇生產者。

路由模板不支援事務、executeflushmetrics 操作,因為這些操作的主題是未知的。

模板需要一個從 java.util.regex.PatternProducerFactory<Object, Object> 例項的對映。此對映應該是有序的(例如 LinkedHashMap),因為它按順序遍歷;您應該在開頭新增更具體的模式。

以下簡單的 Spring Boot 應用程式提供了一個示例,說明如何使用同一模板傳送到不同的主題,每個主題都使用不同的值序列化器。

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

此示例對應的 @KafkaListener 顯示在 註解屬性 中。

要實現類似結果的另一種技術,但具有向同一主題傳送不同型別的額外功能,請參閱 委派序列化器和反序列化器

使用 DefaultKafkaProducerFactory

使用 KafkaTemplate 中所述,ProducerFactory 用於建立生產者。

當不使用 事務 時,預設情況下,DefaultKafkaProducerFactory 會建立一個由所有客戶端使用的單例生產者,如 KafkaProducer JavaDocs 中所建議。但是,如果您在模板上呼叫 flush(),這可能會導致使用同一生產者的其他執行緒出現延遲。從 2.3 版本開始,DefaultKafkaProducerFactory 有一個新屬性 producerPerThread。當設定為 true 時,工廠將為每個執行緒建立(並快取)一個單獨的生產者,以避免此問題。

producerPerThreadtrue 時,當不再需要生產者時,使用者程式碼必須在工廠上呼叫 closeThreadBoundProducer()。這將物理關閉生產者並將其從 ThreadLocal 中刪除。呼叫 reset()destroy() 不會清理這些生產者。

建立 DefaultKafkaProducerFactory 時,可以透過呼叫只接受屬性 Map 的建構函式(請參閱 使用 KafkaTemplate 中的示例)從配置中獲取鍵和/或值 Serializer 類,或者可以將 Serializer 例項傳遞給 DefaultKafkaProducerFactory 建構函式(在這種情況下,所有 Producer 共享相同的例項)。或者,您可以提供 Supplier<Serializer>(從 2.3 版本開始),它們將用於為每個 Producer 獲取單獨的 Serializer 例項

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

從 2.5.10 版本開始,您現在可以在工廠建立後更新生產者屬性。例如,如果您必須在憑據更改後更新 SSL 金鑰/信任儲存位置,這可能很有用。這些更改不會影響現有生產者例項;呼叫 reset() 關閉任何現有生產者,以便使用新屬性建立新生產者。

您不能將事務性生產者工廠更改為非事務性,反之亦然。

現在提供了兩個新方法

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);

從 2.8 版本開始,如果您將序列化器作為物件提供(在建構函式中或透過 setter),工廠將呼叫 configure() 方法以使用配置屬性配置它們。

使用 ReplyingKafkaTemplate

2.1.3 版本引入了 KafkaTemplate 的子類以提供請求/回覆語義。該類名為 ReplyingKafkaTemplate,並有兩個額外的方法;以下顯示了方法簽名

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

結果是一個 CompletableFuture,它被非同步填充結果(或異常,如果超時)。結果還有一個 sendFuture 屬性,它是呼叫 KafkaTemplate.send() 的結果。您可以使用此 future 來確定傳送操作的結果。

如果使用第一個方法,或者 replyTimeout 引數為 null,則使用模板的 defaultReplyTimeout 屬性(預設為 5 秒)。

從 2.8.8 版本開始,模板有一個新方法 waitForAssignment。如果回覆容器配置了 auto.offset.reset=latest,這很有用,可以避免在容器初始化之前傳送請求和回覆。

當使用手動分割槽分配(無組管理)時,等待時間必須大於容器的 pollTimeout 屬性,因為直到第一次輪詢完成之後才會傳送通知。

以下 Spring Boot 應用程式展示瞭如何使用此功能的示例

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

請注意,我們可以使用 Boot 的自動配置容器工廠來建立回覆容器。

如果正在為回覆使用非平凡的反序列化器,請考慮使用委託給您配置的反序列化器的 ErrorHandlingDeserializer。當如此配置時,RequestReplyFuture 將異常完成,您可以捕獲 ExecutionException,其 cause 屬性中包含 DeserializationException

從 2.6.7 版本開始,除了檢測 DeserializationException 之外,如果提供了 replyErrorChecker 函式,模板將呼叫它。如果它返回異常,future 將異常完成。

這是一個例子

template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause() instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}

模板設定一個標頭(預設為 KafkaHeaders.CORRELATION_ID),伺服器端必須將其回顯。

在這種情況下,以下 @KafkaListener 應用程式會響應

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

@KafkaListener 基礎設施回顯關聯 ID 並確定回覆主題。

有關傳送回覆的更多資訊,請參閱 使用 @SendTo 轉發偵聽器結果。模板使用預設標頭 KafKaHeaders.REPLY_TOPIC 來指示回覆要傳送到的主題。

從 2.2 版本開始,模板嘗試從配置的回覆容器中檢測回覆主題或分割槽。如果容器配置為偵聽單個主題或單個 TopicPartitionOffset,則將其用於設定回覆標頭。如果容器配置為其他方式,則使用者必須設定回覆標頭。在這種情況下,在初始化期間會寫入一條 INFO 日誌訊息。以下示例使用 KafkaHeaders.REPLY_TOPIC

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

當您配置單個回覆 TopicPartitionOffset 時,只要每個例項偵聽不同的分割槽,就可以為多個模板使用相同的回覆主題。當配置單個回覆主題時,每個例項必須使用不同的 group.id。在這種情況下,所有例項都接收每個回覆,但只有傳送請求的例項才能找到關聯 ID。這對於自動伸縮可能很有用,但會增加額外的網路流量開銷以及丟棄每個不需要的回覆的少量成本。當您使用此設定時,我們建議您將模板的 sharedReplyTopic 設定為 true,這將把意外回覆的日誌級別從預設的 ERROR 降低到 DEBUG。

以下是配置回覆容器以使用相同共享回覆主題的示例

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}
如果您有多個客戶端例項並且您沒有按照前一段中的討論進行配置,則每個例項都需要一個專用的回覆主題。另一種方法是設定 KafkaHeaders.REPLY_PARTITION 併為每個例項使用專用分割槽。Header 包含一個四位元組整數(大端序)。伺服器必須使用此標頭將回復路由到正確的分割槽(@KafkaListener 執行此操作)。在這種情況下,回覆容器不能使用 Kafka 的組管理功能,並且必須配置為偵聽固定分割槽(透過在其 ContainerProperties 建構函式中使用 TopicPartitionOffset)。
JsonKafkaHeaderMapper 需要 Jackson 在類路徑上(用於 @KafkaListener)。如果不可用,訊息轉換器將沒有標頭對映器,因此您必須使用 SimpleKafkaHeaderMapper 配置 MessagingMessageConverter,如前所示。

預設情況下,使用 3 個標頭

  • KafkaHeaders.CORRELATION_ID - 用於將回復與請求關聯起來

  • KafkaHeaders.REPLY_TOPIC - 用於告訴伺服器回覆到哪裡

  • KafkaHeaders.REPLY_PARTITION - (可選) 用於告訴伺服器回覆到哪個分割槽

這些標頭名稱由 @KafkaListener 基礎設施用於路由回覆。

從 2.3 版本開始,您可以自定義標頭名稱 - 模板有 3 個屬性 correlationHeaderNamereplyTopicHeaderNamereplyPartitionHeaderName。如果您的伺服器不是 Spring 應用程式(或不使用 @KafkaListener),這很有用。

相反,如果請求應用程式不是 Spring 應用程式並且將關聯資訊放在不同的標頭中,則從 3.0 版本開始,您可以在偵聽器容器工廠上配置自定義 correlationHeaderName,並且該標頭將被回顯。以前,偵聽器必須回顯自定義關聯標頭。

使用 Message<?> 進行請求/回覆

2.7 版本向 ReplyingKafkaTemplate 添加了傳送和接收 spring-messagingMessage<?> 抽象的方法

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);

這些將使用模板的預設 replyTimeout,還有過載版本可以在方法呼叫中接受超時。

如果消費者的 Deserializer 或模板的 MessageConverter 可以在沒有任何額外資訊的情況下轉換有效負載(透過配置或回覆訊息中的型別元資料),則使用第一個方法。

如果您需要為返回型別提供型別資訊以協助訊息轉換器,則使用第二個方法。這還允許同一個模板接收不同型別,即使回覆中沒有型別元資料,例如當伺服器端不是 Spring 應用程式時。以下是後者的一個示例

模板 Bean
  • Java

  • Kotlin

@Bean
ReplyingKafkaTemplate<String, String, String> template(
        ProducerFactory<String, String> pf,
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> replyContainer =
            factory.createContainer("replies");
    replyContainer.getContainerProperties().setGroupId("request.replies");
    ReplyingKafkaTemplate<String, String, String> template =
            new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setMessageConverter(new ByteArrayJacksonJsonMessageConverter());
    template.setDefaultTopic("requests");
    return template;
}
@Bean
fun template(
    pf: ProducerFactory<String, String>,
    factory: ConcurrentKafkaListenerContainerFactory<String, String>
): ReplyingKafkaTemplate<String, String, String> {
    val replyContainer = factory.createContainer("replies")
    replyContainer.containerProperties.setGroupId("request.replies")
    val template = ReplyingKafkaTemplate<String, String, String>(pf, replyContainer)
    template.messageConverter = ByteArrayJacksonJsonMessageConverter()
    template.setDefaultTopic("requests")
    return template
}
使用模板
  • Java

  • Kotlin

RequestReplyTypedMessageFuture<String, String, Thing> future1 =
        template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
                new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());

RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
        template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
                new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String, String, Thing> =
    template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
        object : ParameterizedTypeReference<Thing>() {})
log.info(future1.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val thing = future1.get(10, TimeUnit.SECONDS).payload
log.info(thing.toString())

val future2: RequestReplyTypedMessageFuture<String, String, List<Thing>> =
    template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
        object : ParameterizedTypeReference<List<Thing>>() {})
log.info(future2.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2.get(10, TimeUnit.SECONDS).payload
things.forEach { thing1 -> log.info(thing1.toString()) }

回覆型別 Message<?>

@KafkaListener 返回 Message<?> 時,在 2.5 版本之前,需要填充回覆主題和關聯 ID 標頭。在此示例中,我們使用來自請求的回覆主題標頭

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

這也顯示瞭如何為回覆記錄設定鍵。

從 2.5 版本開始,框架將檢測這些標頭是否缺失,並使用主題填充它們 - 無論是從 @SendTo 值確定的主題,還是傳入的 KafkaHeaders.REPLY_TOPIC 標頭(如果存在)。它還將回顯傳入的 KafkaHeaders.CORRELATION_IDKafkaHeaders.REPLY_PARTITION(如果存在)。

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42)
            .build();
}

回覆中的原始記錄鍵

從 3.3 版本開始,來自傳入請求的 Kafka 記錄鍵(如果存在)將保留在回覆記錄中。這僅適用於單記錄請求/回覆場景。當偵聽器是批次或返回型別是集合時,由應用程式決定透過將回復記錄包裝在 Message 型別中來指定要使用的鍵。

聚合多個回覆

使用 ReplyingKafkaTemplate 中的模板嚴格用於單個請求/回覆場景。對於單個訊息的多個接收者返回回覆的情況,您可以使用 AggregatingReplyingKafkaTemplate。這是 分散-聚合企業整合模式 客戶端的實現。

ReplyingKafkaTemplate 類似,AggregatingReplyingKafkaTemplate 建構函式接受一個生產者工廠和一個偵聽器容器來接收回復;它有第三個引數 BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy,每次收到回覆時都會諮詢它;當謂詞返回 true 時,ConsumerRecord 的集合將用於完成 sendAndReceive 方法返回的 Future

還有一個額外的屬性 returnPartialOnTimeout(預設為 false)。當此設定為 true 時,不會使用 KafkaReplyTimeoutException 完成 future,而是部分結果正常完成 future(只要至少收到一條回覆記錄)。

從 2.3.5 版本開始,謂詞也會在超時後呼叫(如果 returnPartialOnTimeouttrue)。第一個引數是當前記錄列表;第二個是如果此呼叫是由於超時,則為 true。謂詞可以修改記錄列表。

AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                        coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);

請注意,返回型別是 ConsumerRecord,其值是 ConsumerRecord 的集合。這個“外部”ConsumerRecord 不是一個“真實”的記錄,它是由模板合成的,作為請求收到的實際回覆記錄的持有者。當正常釋放發生時(釋放策略返回 true),主題設定為 aggregatedResults;如果 returnPartialOnTimeout 為 true 並且發生超時(並且至少收到一條回覆記錄),則主題設定為 partialResultsAfterTimeout。模板為這些“主題”名稱提供了常量靜態變數

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a normal release by the release strategy.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a timeout.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

Collection 中真實的 ConsumerRecord 包含收到回覆的實際主題。

回覆的偵聽器容器必須配置為 AckMode.MANUALAckMode.MANUAL_IMMEDIATE;消費者屬性 enable.auto.commit 必須為 false(2.3 版本以來的預設值)。為了避免任何訊息丟失的可能性,模板僅在沒有未完成的請求時提交偏移量,即當最後一個未完成的請求被釋放策略釋放時。重新平衡後,可能會出現重複的回覆傳遞;這些將被忽略任何正在進行的請求;當收到已釋放回復的重複回覆時,您可能會看到錯誤日誌訊息。
如果您將 ErrorHandlingDeserializer 與此聚合模板一起使用,框架將不會自動檢測 DeserializationException。相反,記錄(值為 null)將完整返回,並在標頭中包含反序列化異常。建議應用程式呼叫實用方法 ReplyingKafkaTemplate.checkDeserialization() 方法來確定是否發生反序列化異常。有關更多資訊,請參閱其 JavaDocs。replyErrorChecker 也不會為此聚合模板呼叫;您應該對回覆的每個元素執行檢查。
© . This site is unofficial and not affiliated with VMware.