提示、技巧和秘訣

使用 Kafka 實現簡單的 DLQ

問題描述

作為一名開發人員,我希望編寫一個消費應用程式來處理 Kafka 主題中的記錄。但是,如果在處理過程中發生錯誤,我不希望應用程式完全停止。相反,我希望將錯誤記錄傳送到 DLT(死信主題),然後繼續處理新記錄。

解決方案

解決此問題的方法是使用 Spring Cloud Stream 中的 DLQ 功能。為了便於討論,我們假設以下是我們的處理器函式。

@Bean
public Consumer<byte[]> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

這是一個非常簡單的函式,它對所有處理的記錄都丟擲異常,但您可以採用此函式並將其擴充套件到任何其他類似情況。

為了將錯誤記錄傳送到 DLT,我們需要提供以下配置。

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq

為了啟用 DLQ,應用程式必須提供一個組名。匿名消費者不能使用 DLQ 功能。我們還需要透過將 Kafka 消費者繫結上的 enableDLQ 屬性設定為 true 來啟用 DLQ。最後,我們可以選擇透過在 Kafka 消費者繫結上提供 dlqName 來提供 DLT 名稱,否則在這種情況下它預設為 error.input-topic.my-group

請注意,在上面提供的示例消費者中,有效載荷的型別是 byte[]。預設情況下,Kafka 繫結器中的 DLQ 生產者期望有效載荷型別為 byte[]。如果不是這種情況,那麼我們需要提供適當的序列化器配置。例如,讓我們將消費者函式重寫如下

@Bean
public Consumer<String> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

現在,我們需要告訴 Spring Cloud Stream,我們希望在寫入 DLT 時如何序列化資料。以下是此場景的修改配置

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq
         dlqProducerProperties:
           configuration:
             value.serializer: org.apache.kafka.common.serialization.StringSerializer

帶高階重試選項的 DLQ

問題描述

這與上面的秘訣類似,但作為一名開發人員,我希望配置重試的處理方式。

解決方案

如果您遵循了上述秘訣,那麼在處理遇到錯誤時,您將獲得 Kafka 繫結器中內建的預設重試選項。

預設情況下,繫結器最多重試 3 次,初始延遲為 1 秒,每次退避的乘數為 2.0,最大延遲為 10 秒。您可以更改所有這些配置,如下所示

spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval

如果您願意,您還可以透過提供布林值對映來提供可重試異常列表。例如,

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false

預設情況下,對映中未列出的任何異常都將重試。如果不需要,可以透過提供以下內容來停用它,

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false

您還可以提供自己的 RetryTemplate 並將其標記為 @StreamRetryTemplate,繫結器將掃描並使用它。當您需要更復雜的重試策略時,這會很有用。

如果您有多個 @StreamRetryTemplate bean,那麼您可以使用以下屬性指定您的繫結需要哪個:

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>

使用 DLQ 處理反序列化錯誤

問題描述

我的處理器在 Kafka 消費者中遇到反序列化異常。我原以為 Spring Cloud Stream DLQ 機制會捕獲這種情況,但它沒有。我該如何處理?

解決方案

當 Kafka 消費者丟擲不可恢復的反序列化異常時,Spring Cloud Stream 提供的正常 DLQ 機制將無濟於事。這是因為,此異常甚至在消費者 poll() 方法返回之前就發生了。Spring for Apache Kafka 專案提供了一些很好的方法來幫助繫結器處理這種情況。讓我們來探討一下。

假設這是我們的函式

@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}

這是一個採用 String 引數的簡單函式。

我們希望繞過 Spring Cloud Stream 提供的訊息轉換器,而改用原生反序列化器。對於 String 型別,這沒什麼意義,但對於 AVRO 等更復雜的型別,您必須依賴外部反序列化器,因此希望將轉換委託給 Kafka。

現在,當消費者接收到資料時,假設有一個導致反序列化錯誤的壞記錄,例如,可能有人傳遞了一個 Integer 而不是一個 String。在這種情況下,如果您不在應用程式中做任何事情,異常將透過鏈傳播,您的應用程式最終將退出。

為了處理此問題,您可以新增一個 ListenerContainerCustomizer @Bean,它配置一個 DefaultErrorHandler。此 DefaultErrorHandler 配置了一個 DeadLetterPublishingRecoverer。我們還需要為消費者配置一個 ErrorHandlingDeserializer。這聽起來很複雜,但實際上,在這種情況下它歸結為這 3 個 bean。

	@Bean
	public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
		return (container, dest, group) -> {
			container.setCommonErrorHandler(errorHandler);
		};
	}
	@Bean
	public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
		return new DefaultErrorHandler(deadLetterPublishingRecoverer);
	}
	@Bean
	public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
		return new DeadLetterPublishingRecoverer(bytesTemplate);
	}

讓我們分析一下它們中的每一個。第一個是 ListenerContainerCustomizer bean,它接受一個 DefaultErrorHandler。現在,容器已使用該特定錯誤處理程式進行自定義。您可以在此處瞭解有關容器自定義的更多資訊。

第二個 bean 是配置為釋出到 DLTDefaultErrorHandler。有關 DefaultErrorHandler 的更多詳細資訊,請參閱此處

第三個 bean 是 DeadLetterPublishingRecoverer,它最終負責傳送到 DLT。預設情況下,DLT 主題命名為 ORIGINAL_TOPIC_NAME.DLT。但是您可以更改它。有關詳細資訊,請參閱文件

我們還需要透過應用程式配置配置一個 ErrorHandlingDeserializer

ErrorHandlingDeserializer 委託給實際的反序列化器。如果出現錯誤,它會將記錄的鍵/值設定為 null,幷包含訊息的原始位元組。然後它在頭部設定異常並將此記錄傳遞給監聽器,然後監聽器呼叫註冊的錯誤處理程式。

以下是所需的配置

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
       use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

我們透過繫結上的 configuration 屬性提供 ErrorHandlingDeserializer。我們還指示要委託的實際反序列化器是 StringDeserializer

請記住,上述 dlq 屬性均與本秘訣中的討論無關。它們純粹用於處理任何應用程式級別的錯誤。

Kafka 繫結器中的基本偏移量管理

問題描述

我想編寫一個 Spring Cloud Stream Kafka 消費者應用程式,但不確定它如何管理 Kafka 消費者偏移量。您能解釋一下嗎?

解決方案

我們鼓勵您閱讀有關此內容的文件部分,以對其有透徹的瞭解。

簡而言之

Kafka 預設支援兩種型別的起始偏移量 - earliestlatest。它們的語義從名稱上就可以不言自明。

假設您是第一次執行消費者。如果您的 Spring Cloud Stream 應用程式中缺少 group.id,那麼它就成為一個匿名消費者。每當您有一個匿名消費者時,Spring Cloud Stream 應用程式預設會從主題分割槽中 latest 可用的偏移量開始。另一方面,如果您明確指定 group.id,那麼 Spring Cloud Stream 應用程式預設會從主題分割槽中 earliest 可用的偏移量開始。

在上述兩種情況(具有顯式組和匿名組的消費者)下,可以透過使用屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset 並將其設定為 earliestlatest 來切換起始偏移量。

現在,假設您之前已經執行過消費者,並且現在再次啟動它。在這種情況下,上述情況中的起始偏移量語義不適用,因為消費者找到了已為消費者組提交的偏移量(對於匿名消費者,儘管應用程式不提供 group.id,但繫結器將為您自動生成一個)。它只是從上次提交的偏移量繼續。即使提供了 startOffset 值,這也是正確的。

但是,您可以透過使用 resetOffsets 屬性來覆蓋消費者從上次提交的偏移量開始的預設行為。為此,請將屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets 設定為 true(預設值為 false)。然後確保提供 startOffset 值(earliestlatest)。當您這樣做並啟動消費者應用程式時,每次啟動時,它都會像第一次啟動一樣啟動,並忽略分割槽的任何已提交偏移量。

在 Kafka 中尋求任意偏移量

問題描述

使用 Kafka 繫結器,我知道它可以將偏移量設定為 earliestlatest,但我有一個需求,需要將偏移量定位到中間的某個任意偏移量。有什麼方法可以使用 Spring Cloud Stream Kafka 繫結器實現此目的嗎?

解決方案

前面我們看到了 Kafka 繫結器如何處理基本的偏移量管理。預設情況下,繫結器不允許您回溯到任意偏移量,至少透過我們在該秘訣中看到的機制是如此。但是,繫結器提供了一些低階策略來實現此用例。讓我們來探討一下。

首先,當您要重置為除 earliestlatest 之外的任意偏移量時,請確保將 resetOffsets 配置保留為其預設值,即 false。然後您必須提供一個型別為 KafkaBindingRebalanceListener 的自定義 bean,該 bean 將注入到所有消費者繫結中。它是一個帶有一些預設方法的介面,但這是我們感興趣的方法

/**
	 * Invoked when partitions are initially assigned or after a rebalance. Applications
	 * might only want to perform seek operations on an initial assignment. While the
	 * 'initial' argument is true for each thread (when concurrency is greater than 1),
	 * implementations should keep track of exactly which partitions have been sought.
	 * There is a race in that a rebalance could occur during startup and so a topic/
	 * partition that has been sought on one thread may be re-assigned to another
	 * thread and you may not wish to re-seek it at that time.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment on the current thread.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions, boolean initial) {
		// do nothing
	}

讓我們看看細節。

本質上,此方法將在主題分割槽的初始分配期間或再平衡之後每次呼叫。為了更好地說明,我們假設我們的主題是 foo 並且它有 4 個分割槽。最初,我們只在該組中啟動一個消費者,並且此消費者將從所有分割槽消費。當消費者第一次啟動時,所有 4 個分割槽都將進行初始分配。但是,我們不希望分割槽從預設值開始消費(earliest,因為我們定義了一個組),而是希望每個分割槽在尋求到任意偏移量後開始消費。想象一下您有一個業務案例需要從以下特定偏移量消費。

Partition   start offset

0           1000
1           2000
2           2000
3           1000

這可以透過如下實現上述方法來實現。

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}

這只是一個粗略的實現。現實世界的用例比這複雜得多,您需要相應地進行調整,但這確實為您提供了一個基本的草圖。當消費者 seek 失敗時,它可能會丟擲一些執行時異常,您需要決定在這種情況下如何處理。

[[what-if-we-start-a-second-consumer-with-the-same-group-id?]] === 如果我們啟動第二個具有相同組 ID 的消費者會怎樣?

當我們新增第二個消費者時,會發生再平衡,並且一些分割槽將被移動。假設新消費者獲得了分割槽 23。當這個新的 Spring Cloud Stream 消費者呼叫 onPartitionsAssigned 方法時,它將看到這是該消費者上分割槽 23 的初始分配。因此,它會因為對 initial 引數的條件檢查而執行 seek 操作。對於第一個消費者,它現在只擁有分割槽 01。但是,對於這個消費者來說,這僅僅是一個再平衡事件,不被認為是初始分配。因此,它不會因為對 initial 引數的條件檢查而重新尋求到給定偏移量。

[[how-do-i-manually-acknowledge-using-kafka-binder?]] == 如何使用 Kafka 繫結器手動確認?

問題描述

使用 Kafka 繫結器,我想在我的消費者中手動確認訊息。我該怎麼做?

解決方案

預設情況下,Kafka 繫結器委託給 Spring for Apache Kafka 專案中的預設提交設定。Spring Kafka 中的預設 ackModebatch。有關詳細資訊,請參閱此處

在某些情況下,您希望停用此預設提交行為並依賴手動提交。以下步驟允許您執行此操作。

將屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode 設定為 MANUALMANUAL_IMMEDIATE。當這樣設定時,消費者方法收到的訊息中將存在一個名為 kafka_acknowledgment(來自 KafkaHeaders.ACKNOWLEDGMENT)的頭。

例如,假設這是您的消費者方法。

@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
        }
    };
}

然後將屬性 spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode 設定為 MANUALMANUAL_IMMEDIATE

[[how-do-i-override-the-default-binding-names-in-spring-cloud-stream?]] == 如何覆蓋 Spring Cloud Stream 中的預設繫結名稱?

問題描述

Spring Cloud Stream 根據函式定義和簽名建立預設繫結,但我如何將它們覆蓋為更符合領域友好的名稱?

解決方案

假設這是您的函式簽名。

@Bean
public Function<String, String> uppercase(){
...
}

預設情況下,Spring Cloud Stream 將建立如下繫結。

  1. uppercase-in-0

  2. uppercase-out-0

您可以使用以下屬性將這些繫結覆蓋為其他名稱。

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out

此後,所有繫結屬性都必須在新名稱 my-transformer-inmy-transformer-out 上進行。

這是另一個使用 Kafka Streams 和多個輸入的示例。

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}

預設情況下,Spring Cloud Stream 將為此函式建立三個不同的繫結名稱。

  1. processOrder-in-0

  2. processOrder-in-1

  3. processOrder-out-0

每次您想要在此類繫結上設定一些配置時,都必須使用這些繫結名稱。您不喜歡這樣,並且希望使用更符合領域友好且更易讀的繫結名稱,例如:

  1. orders

  2. accounts

  3. enrichedOrders

您可以透過簡單地設定這三個屬性來實現這一點

  1. spring.cloud.stream.function.bindings.processOrder-in-0=orders

  2. spring.cloud.stream.function.bindings.processOrder-in-1=accounts

  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders

一旦您這樣做了,它將覆蓋預設的繫結名稱,並且您想要在它們上設定的任何屬性都必須使用這些新的繫結名稱。

[[how-do-i-send-a-message-key-as-part-of-my-record?]] == 如何將訊息鍵作為記錄的一部分發送?

問題描述

我需要將一個鍵與記錄的有效載荷一起傳送,Spring Cloud Stream 有辦法實現嗎?

解決方案

通常需要您將關聯資料結構(如對映)作為帶有鍵和值的記錄傳送。Spring Cloud Stream 允許您以直接的方式實現這一點。以下是實現此目的的基本藍圖,但您可能需要根據您的特定用例進行調整。

這是一個示例生產者方法(又名 Supplier)。

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

這是一個簡單的函式,它傳送一個帶有 String 有效載荷的訊息,但也有一個鍵。請注意,我們使用 KafkaHeaders.MESSAGE_KEY 將鍵設定為訊息頭。

如果您想更改預設的 kafka_messageKey 鍵,那麼在配置中,我們需要指定此屬性

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']

請注意,我們使用繫結名稱 supplier-out-0,因為這是我們的函式名稱,請相應更新。

然後,我們在生成訊息時使用這個新鍵。

[[how-do-i-use-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?]] == 如何使用原生序列化器和反序列化器而不是 Spring Cloud Stream 完成的訊息轉換?

問題描述

我不想使用 Spring Cloud Stream 中的訊息轉換器,而是想在 Kafka 中使用原生序列化器和反序列化器。預設情況下,Spring Cloud Stream 使用其內部內建訊息轉換器處理此轉換。我如何繞過它並將責任委託給 Kafka?

解決方案

這真的很容易做到。

您所要做的就是提供以下屬性以啟用原生序列化。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true

然後,您還需要設定序列化器。有兩種方法可以做到這一點。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

或者使用繫結器配置。

spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

當使用繫結器方式時,它應用於所有繫結,而將它們設定在繫結級別是針對每個繫結的。

在反序列化方面,您只需要將反序列化器作為配置提供。

例如,

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

您也可以在繫結器級別設定它們。

有一個可選屬性可以設定為強制原生解碼。

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true

然而,在 Kafka 繫結器的情況下,這是不必要的,因為當它到達繫結器時,Kafka 已經使用配置的反序列化器對其進行了反序列化。

解釋 Kafka Streams 繫結器中的偏移量重置工作原理

問題描述

預設情況下,Kafka Streams 繫結器始終從新消費者的最早偏移量開始。有時,應用程式需要或要求從最新偏移量開始。Kafka Streams 繫結器允許您這樣做。

解決方案

在檢視解決方案之前,讓我們看以下場景。

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
    (s, t) -> s.join(t, ...)
    ...
}

我們有一個 BiConsumer bean,它需要兩個輸入繫結。在這種情況下,第一個繫結用於 KStream,第二個繫結用於 KTable。當第一次執行此應用程式時,預設情況下,兩個繫結都從 earliest 偏移量開始。如果由於某些要求,我希望從 latest 偏移量開始怎麼辦?您可以透過啟用以下屬性來實現這一點。

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest

如果您只想讓一個繫結從 latest 偏移量開始,而另一個繫結從預設的 earliest 消費者開始,那麼請將後一個繫結從配置中刪除。

請記住,一旦存在已提交的偏移量,這些設定將生效,並且已提交的偏移量將優先。

在 Kafka 中跟蹤記錄成功傳送(生產)

問題描述

我有一個 Kafka 生產者應用程式,我想跟蹤所有成功的傳送。

解決方案

讓我們假設應用程式中有以下供應商。

@Bean
	public Supplier<Message<String>> supplier() {
		return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
	}

然後,我們需要定義一個新的 MessageChannel bean 來捕獲所有成功的傳送資訊。

@Bean
	public MessageChannel fooRecordChannel() {
		return new DirectChannel();
	}

接下來,在應用程式配置中定義此屬性以提供 recordMetadataChannel 的 bean 名稱。

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel

此時,成功的傳送資訊將傳送到 fooRecordChannel

您可以編寫一個 IntegrationFlow 如下所示來檢視資訊。

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> payload);
}

handle 方法中,payload 是傳送到 Kafka 的內容,訊息頭包含一個名為 kafka_recordMetadata 的特殊鍵。其值是一個 RecordMetadata,其中包含有關主題分割槽、當前偏移量等資訊。

在 Kafka 中新增自定義頭對映器

問題描述

我有一個 Kafka 生產者應用程式,它設定了一些頭,但它們在消費者應用程式中丟失了。這是為什麼?

解決方案

在正常情況下,這應該沒問題。

想象一下,您有以下生產者。

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

在消費者端,您應該仍然看到頭“foo”,以下內容不應給您帶來任何問題。

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

如果您在應用程式中提供自定義頭對映器,那麼這將不起作用。假設您在應用程式中有一個空的 KafkaHeaderMapper

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

如果這是您的實現,那麼您將錯過消費者端的 foo 頭。您可能在這些 KafkaHeaderMapper 方法中包含一些邏輯。您需要以下內容來填充 foo 頭。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
			target.put("foo", new String(foo.value()));
        }
    }

這將正確地將 foo 頭從生產者填充到消費者。

關於 id 頭的特別說明

在 Spring Cloud Stream 中,id 頭是一個特殊的頭,但有些應用程式可能希望擁有特殊的自定義 id 頭——例如 custom-idIDId。第一個(custom-id)將在沒有任何自定義頭對映器的情況下從生產者傳播到消費者。但是,如果您使用框架保留的 id 頭的一個變體(例如 IDIdiD 等)進行生產,那麼您將遇到框架內部的問題。有關此用例的更多上下文,請參閱此StackOverflow 帖子。在這種情況下,您必須使用自定義 KafkaHeaderMapper 來對映區分大小寫的 id 頭。例如,假設您有以下生產者。

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

上面的 Id 頭將從消費端消失,因為它與框架 id 頭衝突。您可以提供自定義 KafkaHeaderMapper 來解決此問題。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
			target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
			target.put("Id", new String(Id.value()));
        }
    };
}

透過這樣做,idId 頭都將從生產者端提供給消費者端。

以事務方式生產到多個主題

問題描述

我如何以事務方式生產訊息到多個 Kafka 主題?

有關更多上下文,請參閱此StackOverflow 問題

解決方案

在 Kafka 繫結器中使用事務支援進行事務處理,然後提供一個 AfterRollbackProcessor。為了生產到多個主題,請使用 StreamBridge API。

以下是此程式碼片段

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}

所需配置

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1

為了測試,您可以使用以下內容

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}

一些重要注意事項

請確保您的應用程式配置中沒有任何 DLQ 設定,因為我們手動配置 DLT(預設情況下,它將根據初始消費者函式釋出到名為 input.DLT 的主題)。此外,將消費者繫結上的 maxAttempts 重置為 1,以避免繫結器重試。在上面的示例中,它最多重試總共 3 次(初始嘗試 + FixedBackoff 中的 2 次嘗試)。

有關如何測試此程式碼的更多詳細資訊,請參閱StackOverflow 帖子。如果您使用 Spring Cloud Stream 透過新增更多消費者函式來測試它,請務必將消費者繫結上的 isolation-level 設定為 read-committed

StackOverflow 帖子也與此討論相關。

執行多個可輪詢消費者時應避免的陷阱

問題描述

如何執行多個可輪詢消費者例項併為每個例項生成唯一的 client.id

解決方案

假設我有以下定義

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group

執行應用程式時,Kafka 消費者會生成一個 client.id(類似於 consumer-my-group-1)。對於每個執行的應用程式例項,此 client.id 將相同,從而導致意外問題。

為了解決這個問題,您可以在應用程式的每個例項上新增以下屬性

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}

有關更多詳細資訊,請參閱此GitHub 問題

© . This site is unofficial and not affiliated with VMware.