傳送訊息

本節介紹如何傳送訊息。

使用 KafkaTemplate

本節介紹如何使用 KafkaTemplate 傳送訊息。

概覽

KafkaTemplate 封裝了一個生產者,並提供了方便的方法將資料傳送到 Kafka topic。以下列表展示了 KafkaTemplate 中的相關方法

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

詳情請參閱 Javadoc

sendDefault API 要求為模板提供了預設 topic。

該 API 接受 timestamp 引數,並將此時間戳儲存在記錄中。使用者提供的時間戳的儲存方式取決於 Kafka topic 上配置的時間戳型別。如果 topic 配置為使用 CREATE_TIME,則記錄使用者指定的時間戳(如果未指定則生成)。如果 topic 配置為使用 LOG_APPEND_TIME,則忽略使用者指定的時間戳,代理會新增本地代理時間。

metricspartitionsFor 方法委託給底層 Producer 上的同名方法。execute 方法提供對底層 Producer 的直接訪問。

要使用模板,您可以配置一個生產者工廠並在模板的建構函式中提供它。以下示例展示瞭如何這樣做

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

從 2.5 版本開始,您現在可以覆蓋工廠的 ProducerConfig 屬性,以便使用同一工廠建立具有不同生產者配置的模板。

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

請注意,型別為 ProducerFactory<?, ?> 的 bean(例如 Spring Boot 自動配置的 bean)可以引用不同的窄化泛型型別。

您還可以使用標準 <bean/> 定義來配置模板。

然後,要使用模板,您可以呼叫它的一個方法。

當您使用帶有 Message<?> 引數的方法時,topic、分割槽、key 和時間戳資訊會包含在訊息頭部中,具體包括以下項

  • KafkaHeaders.TOPIC

  • KafkaHeaders.PARTITION

  • KafkaHeaders.KEY

  • KafkaHeaders.TIMESTAMP

訊息 payload 是資料。

(可選)您可以為 KafkaTemplate 配置一個 ProducerListener,以便獲得傳送結果(成功或失敗)的非同步回撥,而不是等待 Future 完成。以下列表顯示了 ProducerListener 介面的定義

public interface ProducerListener<K, V> {

    default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
	}

    default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
	}

}

預設情況下,模板配置了一個 LoggingProducerListener,它會記錄錯誤,並在傳送成功時不做任何事情。

為了方便起見,提供了預設方法實現,以防您只想實現其中的一個方法。

請注意,傳送方法返回一個 CompletableFuture<SendResult>。您可以向監聽器註冊回撥,以非同步接收發送結果。以下示例展示瞭如何這樣做

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

SendResult 有兩個屬性:ProducerRecordRecordMetadata。有關這些物件的資訊,請參閱 Kafka API 文件。

Throwable 可以被轉換為 KafkaProducerException;它的 producerRecord 屬性包含失敗的記錄。

如果您希望阻塞傳送執行緒以等待結果,可以呼叫 future 的 get() 方法;建議使用帶有超時引數的方法。如果您設定了 linger.ms,您可能希望在等待之前呼叫 flush(),或者為了方便,模板有一個帶有 autoFlush 引數的建構函式,這會使得模板在每次傳送時都呼叫 flush()。只有在您設定了 linger.ms 生產者屬性並且想立即傳送部分批次時,才需要進行重新整理。

示例

本節展示了向 Kafka 傳送訊息的示例

示例 1. 非阻塞 (非同步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<String, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}
阻塞 (同步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

請注意,ExecutionException 的原因是一個帶有 producerRecord 屬性的 KafkaProducerException

使用 RoutingKafkaTemplate

從 2.5 版本開始,您可以使用 RoutingKafkaTemplate 在執行時根據目標 topic 名稱選擇生產者。

由於這些操作的 topic 未知,路由模板**不支援**事務、executeflushmetrics 操作。

該模板需要一個從 java.util.regex.PatternProducerFactory<Object, Object> 例項的對映。此對映應有序(例如 LinkedHashMap),因為它會按順序遍歷;您應該在開頭新增更具體的模式。

以下是一個簡單的 Spring Boot 應用程式,提供了一個示例,說明如何使用同一個模板向不同的 topic 傳送訊息,每個 topic 使用不同的值序列化器。

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

此示例對應的 @KafkaListener 位於註解屬性中。

有關實現類似結果的另一種技術,同時具有向同一 topic 傳送不同型別的附加功能,請參閱委託序列化器和反序列化器

使用 DefaultKafkaProducerFactory

使用 KafkaTemplate 中所示,ProducerFactory 用於建立生產者。

在使用事務時,預設情況下,DefaultKafkaProducerFactory 會建立一個由所有客戶端使用的單例生產者,如 KafkaProducer JavaDocs 中推薦的那樣。但是,如果您在模板上呼叫 flush(),這可能會導致使用同一生產者的其他執行緒出現延遲。從 2.3 版本開始,DefaultKafkaProducerFactory 有一個新屬性 producerPerThread。當設定為 true 時,工廠會為每個執行緒建立一個(並快取)單獨的生產者,以避免此問題。

producerPerThreadtrue 時,當不再需要生產者時,使用者程式碼**必須**呼叫工廠上的 closeThreadBoundProducer()。這會物理關閉生產者並將其從 ThreadLocal 中移除。呼叫 reset()destroy() 不會清理這些生產者。

建立 DefaultKafkaProducerFactory 時,可以透過呼叫只接受屬性 Map 的建構函式(參見使用 KafkaTemplate 中的示例)從配置中獲取 key 和/或 value 的 Serializer 類,或者可以將 Serializer 例項傳遞給 DefaultKafkaProducerFactory 建構函式(在這種情況下,所有 Producer 共享相同的例項)。或者,您可以提供 Supplier<Serializer>(從 2.3 版本開始),用於為每個 Producer 獲取單獨的 Serializer 例項

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

從 2.5.10 版本開始,您現在可以在建立工廠後更新生產者屬性。例如,如果您必須在憑證更改後更新 SSL key/trust 儲存位置,這可能很有用。更改不會影響現有的生產者例項;呼叫 reset() 關閉任何現有生產者,以便使用新屬性建立新生產者。

您不能將事務性生產者工廠更改為非事務性,反之亦然。

現在提供了兩個新方法

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);

從 2.8 版本開始,如果您將序列化器作為物件提供(在建構函式中或透過 setter),工廠將呼叫 configure() 方法,使用配置屬性配置它們。

使用 ReplyingKafkaTemplate

2.1.3 版本引入了 KafkaTemplate 的一個子類,用於提供請求/響應語義。該類名為 ReplyingKafkaTemplate,並有兩個額外的方法;以下顯示了方法簽名

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

結果是一個 CompletableFuture,它會非同步填充結果(或超時異常)。結果還有一個 sendFuture 屬性,這是呼叫 KafkaTemplate.send() 的結果。您可以使用此 future 來確定傳送操作的結果。

如果使用第一個方法,或者 replyTimeout 引數為 null,則使用模板的 defaultReplyTimeout 屬性(預設為 5 秒)。

從 2.8.8 版本開始,模板新增了一個方法 waitForAssignment。如果回覆容器配置了 auto.offset.reset=latest,這非常有用,可以避免在容器初始化之前傳送請求和收到回覆。

在使用手動分割槽分配(無組管理)時,等待的持續時間必須大於容器的 pollTimeout 屬性,因為通知只有在第一次 poll 完成後才會傳送。

以下 Spring Boot 應用程式展示瞭如何使用該功能的一個示例

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

請注意,我們可以使用 Boot 的自動配置容器工廠來建立回覆容器。

如果回覆使用了非平凡的反序列化器,請考慮使用一個委託給您配置的反序列化器的ErrorHandlingDeserializer。這樣配置後,RequestReplyFuture 將以異常方式完成,您可以捕獲 ExecutionException,其 cause 屬性中包含 DeserializationException

從 2.6.7 版本開始,除了檢測 DeserializationExceptions 外,如果提供了 replyErrorChecker 函式,模板還會呼叫它。如果它返回一個異常,future 將以異常方式完成。

這是一個示例

template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause() instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}

模板會設定一個頭部(預設名為 KafkaHeaders.CORRELATION_ID),伺服器端必須將其回顯回來。

在這種情況下,以下 @KafkaListener 應用程式會響應

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

@KafkaListener 基礎設施會回顯關聯 ID 並確定回覆 topic。

有關傳送回覆的更多資訊,請參閱使用 @SendTo 轉發監聽器結果。模板使用預設頭部 KafKaHeaders.REPLY_TOPIC 來指示回覆應傳送到的 topic。

從 2.2 版本開始,模板會嘗試從配置的回覆容器中檢測回覆 topic 或分割槽。如果容器配置為監聽單個 topic 或單個 TopicPartitionOffset,則使用它來設定回覆頭部。如果容器的配置不同,使用者必須自行設定回覆頭部。在這種情況下,初始化期間會寫入一條 INFO 日誌訊息。以下示例使用 KafkaHeaders.REPLY_TOPIC

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

當您配置單個回覆 TopicPartitionOffset 時,只要每個例項監聽不同的分割槽,就可以為多個模板使用相同的回覆 topic。當配置單個回覆 topic 時,每個例項必須使用不同的 group.id。在這種情況下,所有例項都會收到每個回覆,但只有傳送請求的例項會找到關聯 ID。這可能對自動伸縮有用,但這會帶來額外的網路流量開銷以及丟棄每個不需要的回覆的少量成本。當您使用此設定時,我們建議您將模板的 sharedReplyTopic 設定為 true,這將把意外回覆的日誌級別從預設的 ERROR 降低到 DEBUG。

以下是配置回覆容器使用相同共享回覆 topic 的示例

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}
如果您有多個客戶端例項,並且沒有按照前一段中討論的方式配置它們,則每個例項需要一個專用的回覆 topic。另一種方法是設定 KafkaHeaders.REPLY_PARTITION 併為每個例項使用專用的分割槽。Header 包含一個四位元組的 int(大端)。伺服器必須使用此頭部將回復路由到正確的分割槽(@KafkaListener 會這樣做)。但是,在這種情況下,回覆容器不能使用 Kafka 的組管理功能,並且必須配置為監聽一個固定分割槽(透過在其 ContainerProperties 建構函式中使用 TopicPartitionOffset)。
DefaultKafkaHeaderMapper 要求 Jackson 位於 classpath 中(供 @KafkaListener 使用)。如果不可用,訊息轉換器將沒有頭部對映器,因此您必須配置一個帶有 SimpleKafkaHeaderMapperMessagingMessageConverter,如前面所示。

預設情況下,使用 3 個頭部

  • KafkaHeaders.CORRELATION_ID - 用於將回復與請求關聯

  • KafkaHeaders.REPLY_TOPIC - 用於告訴伺服器回覆到哪裡

  • KafkaHeaders.REPLY_PARTITION - (可選)用於告訴伺服器回覆到哪個分割槽

這些頭部名稱由 @KafkaListener 基礎設施用於路由回覆。

從 2.3 版本開始,您可以自定義頭部名稱 - 模板有 3 個屬性:correlationHeaderNamereplyTopicHeaderNamereplyPartitionHeaderName。如果您的伺服器不是 Spring 應用程式(或不使用 @KafkaListener),這非常有用。

反之,如果請求應用程式不是 Spring 應用程式,並且將關聯資訊放在不同的頭部中,從 3.0 版本開始,您可以在監聽器容器工廠上配置自定義的 correlationHeaderName,並且該頭部將被回顯回來。之前,監聽器必須回顯自定義關聯頭部。

使用 Message<?> 進行請求/回覆

2.7 版本在 ReplyingKafkaTemplate 中添加了方法,用於傳送和接收 spring-messagingMessage<?> 抽象

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);

這些方法將使用模板的預設 replyTimeout,也有過載版本可以在方法呼叫中指定超時時間。

如果 consumer 的 Deserializer 或模板的 MessageConverter 可以在沒有任何額外資訊的情況下轉換 payload(無論是透過配置還是回覆訊息中的型別元資料),請使用第一個方法。

如果您需要為返回型別提供型別資訊以輔助訊息轉換器,請使用第二個方法。這還允許同一個模板接收不同型別,即使回覆中沒有型別元資料,例如伺服器端不是 Spring 應用程式的情況。以下是後者的一個示例

模板 Bean
  • Java

  • Kotlin

@Bean
ReplyingKafkaTemplate<String, String, String> template(
        ProducerFactory<String, String> pf,
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> replyContainer =
            factory.createContainer("replies");
    replyContainer.getContainerProperties().setGroupId("request.replies");
    ReplyingKafkaTemplate<String, String, String> template =
            new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setMessageConverter(new ByteArrayJsonMessageConverter());
    template.setDefaultTopic("requests");
    return template;
}
@Bean
fun template(
    pf: ProducerFactory<String?, String>?,
    factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
    val replyContainer = factory.createContainer("replies")
    replyContainer.containerProperties.groupId = "request.replies"
    val template = ReplyingKafkaTemplate(pf, replyContainer)
    template.messageConverter = ByteArrayJsonMessageConverter()
    template.defaultTopic = "requests"
    return template
}
使用模板
  • Java

  • Kotlin

RequestReplyTypedMessageFuture<String, String, Thing> future1 =
        template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
                new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());

RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
        template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
                new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
        object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())

val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
        object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })

回覆型別 Message<?>

@KafkaListener 返回 Message<?> 時,在 2.5 版本之前,需要填充回覆 topic 和關聯 id 頭部。在此示例中,我們使用請求中的回覆 topic 頭部

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

這也展示瞭如何在回覆記錄上設定 key。

從 2.5 版本開始,框架將檢測這些頭部是否缺失,並使用 topic 填充它們 - 要麼是根據 @SendTo 值確定的 topic,要麼是傳入的 KafkaHeaders.REPLY_TOPIC 頭部(如果存在)。如果存在傳入的 KafkaHeaders.CORRELATION_IDKafkaHeaders.REPLY_PARTITION,它也會回顯它們。

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42)
            .build();
}

回覆中的原始記錄 Key

從 3.3 版本開始,傳入請求中的 Kafka 記錄 key(如果存在)將在回覆記錄中保留。這僅適用於單記錄請求/回覆場景。當監聽器是批處理或返回型別是集合時,應用程式需要透過將回復記錄包裝在 Message 型別中來指定要使用的 key。

聚合多個回覆

使用 ReplyingKafkaTemplate 中介紹的模板嚴格用於單請求/回覆場景。對於單個訊息有多個接收者返回回覆的情況,您可以使用 AggregatingReplyingKafkaTemplate。這是散播-聚集企業整合模式的客戶端實現。

ReplyingKafkaTemplate 類似,AggregatingReplyingKafkaTemplate 的建構函式接受一個生產者工廠和一個監聽器容器來接收回復;它有第三個引數 BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy,每次收到回覆時都會呼叫它;當謂詞返回 true 時,ConsumerRecord 的集合用於完成 sendAndReceive 方法返回的 Future

還有一個附加屬性 returnPartialOnTimeout(預設為 false)。當此屬性設定為 true 時,未來不會以 KafkaReplyTimeoutException 完成,而是會以部分結果正常完成(只要收到了至少一個回覆記錄)。

從 2.3.5 版本開始,在超時後(如果 returnPartialOnTimeouttrue)也會呼叫謂詞。第一個引數是當前的記錄列表;第二個引數表示如果此呼叫是由於超時引起的,則為 true。謂詞可以修改記錄列表。

AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                        coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);

請注意,返回型別是一個 ConsumerRecord,其值是 ConsumerRecord 的集合。"外部" ConsumerRecord 不是一個“真實”的記錄,它是模板合成的,用於儲存為請求收到的實際回覆記錄。當正常釋放發生(釋放策略返回 true)時,topic 設定為 aggregatedResults;如果 returnPartialOnTimeout 為 true 並且發生超時(並且收到了至少一個回覆記錄),則 topic 設定為 partialResultsAfterTimeout。模板為這些“topic”名稱提供了靜態常量變數

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a normal release by the release strategy.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a timeout.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

Collection 中的真實 ConsumerRecord 包含收到回覆的實際 topic。

回覆的監聽器容器**必須**配置為 AckMode.MANUALAckMode.MANUAL_IMMEDIATE;消費者屬性 enable.auto.commit 必須為 false(從 2.3 版本開始的預設值)。為了避免任何丟失訊息的可能性,模板只在沒有未完成的請求時提交偏移量,即當最後一個未完成的請求被釋放策略釋放時。再平衡後,可能會出現重複的回覆投遞;對於任何正在進行的請求,這些重複投遞將被忽略;當已釋放的回覆收到重複回覆時,您可能會看到錯誤日誌訊息。
如果您在此聚合模板中使用ErrorHandlingDeserializer,框架將不會自動檢測 DeserializationException。相反,記錄(值為 null)將完整返回,反序列化異常將位於頭部中。建議應用程式呼叫實用方法 ReplyingKafkaTemplate.checkDeserialization() 來確定是否發生了反序列化異常。更多資訊請參閱其 JavaDocs。此聚合模板也不會呼叫 replyErrorChecker;您應該對回覆的每個元素執行檢查。