技巧、竅門和配方

Kafka 簡單 DLQ

問題陳述

作為開發者,我想編寫一個從 Kafka 主題處理記錄的消費者應用。但是,如果在處理過程中發生錯誤,我不希望應用完全停止。相反,我希望將出錯的記錄傳送到 DLT(死信主題),然後繼續處理新的記錄。

解決方案

解決此問題的方法是使用 Spring Cloud Stream 中的 DLQ 功能。為了本次討論的目的,我們假設以下是我們的處理器函式。

@Bean
public Consumer<byte[]> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

這是一個非常簡單的函式,它對其處理的所有記錄都丟擲異常,但您可以採用此函式並將其擴充套件到任何其他類似情況。

為了將出錯的記錄傳送到 DLT,我們需要提供以下配置。

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq

為了啟用 DLQ,應用必須提供一個組名。匿名消費者無法使用 DLQ 功能。我們還需要透過將 Kafka 消費者繫結的 enableDLQ 屬性設定為 true 來啟用 DLQ。最後,我們可以選擇透過在 Kafka 消費者繫結上提供 dlqName 來指定 DLT 名稱,否則在此例中它將預設為 error.input-topic.my-group

請注意,在上面提供的示例消費者中,負載的型別是 byte[]。預設情況下,Kafka Binder 中的 DLQ 生產者期望負載型別為 byte[]。如果不是這種情況,那麼我們需要提供適當的序列化器配置。例如,讓我們將消費者函式重寫如下:

@Bean
public Consumer<String> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

現在,我們需要告訴 Spring Cloud Stream,當寫入 DLT 時,我們希望如何序列化資料。以下是針對此場景修改後的配置:

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq
         dlqProducerProperties:
           configuration:
             value.serializer: org.apache.kafka.common.serialization.StringSerializer

帶高階重試選項的 DLQ

問題陳述

這與上面的配方類似,但作為開發者,我希望配置重試的處理方式。

解決方案

如果您遵循了上面的配方,那麼當處理遇到錯誤時,您將獲得內置於 Kafka Binder 中的預設重試選項。

預設情況下,Binder 會重試最多 3 次,初始延遲為一秒,每次回退的乘數為 2.0,最大延遲為 10 秒。您可以按如下方式更改所有這些配置:

spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval

如果您願意,還可以透過提供一個布林值對映來提供可重試異常的列表。例如:

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false

預設情況下,任何未在上述對映中列出的異常都將重試。如果不需要這樣,則可以透過提供以下配置來停用它:

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false

您還可以提供自己的 RetryTemplate,並將其標記為 @StreamRetryTemplate,Binder 將會掃描並使用它。這對於您想要更復雜的重試策略和策略時很有用。

如果您有多個 @StreamRetryTemplate bean,則可以使用以下屬性指定您的繫結需要哪一個:

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>

處理帶有 DLQ 的反序列化錯誤

問題陳述

我的處理器在 Kafka 消費者中遇到了反序列化異常。我期望 Spring Cloud Stream DLQ 機制能夠捕獲這種情況,但它沒有。我該如何處理?

解決方案

當 Kafka 消費者丟擲不可恢復的反序列化異常時,Spring Cloud Stream 提供的常規 DLQ 機制將無濟於事。這是因為此異常甚至在消費者方法的 poll() 返回之前就發生了。Spring for Apache Kafka 專案提供了一些很好的方法來幫助 Binder 處理這種情況。讓我們來探討一下。

假設這是我們的函式:

@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}

這是一個採用 String 引數的簡單函式。

我們希望繞過 Spring Cloud Stream 提供的訊息轉換器,轉而使用原生的反序列化器。對於 String 型別,這樣做意義不大,但對於 AVRO 等更復雜的型別,您必須依賴外部反序列化器,因此希望將轉換委託給 Kafka。

現在當消費者接收到資料時,假設有一個導致反序列化錯誤的壞記錄,例如有人傳遞了一個 Integer 而不是 String。在這種情況下,如果您不在應用中做任何處理,異常將沿著呼叫鏈傳播,最終導致您的應用退出。

為了處理這種情況,您可以新增一個 ListenerContainerCustomizer@Bean,它配置了一個 DefaultErrorHandler。這個 DefaultErrorHandler 配置了一個 DeadLetterPublishingRecoverer。我們還需要為消費者配置一個 ErrorHandlingDeserializer。聽起來很複雜,但實際上,在這種情況下,它歸結為這 3 個 bean:

	@Bean
	public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
		return (container, dest, group) -> {
			container.setCommonErrorHandler(errorHandler);
		};
	}
	@Bean
	public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
		return new DefaultErrorHandler(deadLetterPublishingRecoverer);
	}
	@Bean
	public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
		return new DeadLetterPublishingRecoverer(bytesTemplate);
	}

讓我們分析一下它們。第一個是 ListenerContainerCustomizer bean,它接受一個 DefaultErrorHandler。容器現在會使用特定的錯誤處理器進行定製。您可以在這裡瞭解更多關於容器定製的資訊。

第二個 bean 是 DefaultErrorHandler,它配置為釋出到 DLT。有關 DefaultErrorHandler 的更多詳細資訊,請參閱這裡

第三個 bean 是 DeadLetterPublishingRecoverer,它最終負責傳送到 DLT。預設情況下,DLT 主題的名稱為 ORIGINAL_TOPIC_NAME.DLT。不過您可以更改它。有關更多詳細資訊,請參閱文件

我們還需要透過應用配置來配置一個ErrorHandlingDeserializer

ErrorHandlingDeserializer 將委託給實際的反序列化器。如果發生錯誤,它將記錄的 key/value 設定為 null,幷包含訊息的原始位元組。然後它會在 header 中設定異常,並將此記錄傳遞給監聽器,監聽器隨後會呼叫註冊的錯誤處理器。

以下是所需的配置:

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
       use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

我們透過繫結上的 configuration 屬性提供了 ErrorHandlingDeserializer。我們還指出實際委託的反序列化器是 StringDeserializer

請記住,上面的任何 dlq 屬性都與本配方中的討論無關。它們純粹是為了處理任何應用級別的錯誤。

Kafka Binder 中的基本 offset 管理

問題陳述

我想編寫一個 Spring Cloud Stream Kafka 消費者應用,但不確定它是如何管理 Kafka 消費者 offset 的。您能解釋一下嗎?

解決方案

我們鼓勵您閱讀關於此主題的文件部分,以獲得全面的理解。

以下是其要點:

Kafka 預設支援兩種型別的起始 offset - earliestlatest。它們的語義從名稱中就可以看出。

假設您是第一次執行消費者。如果在您的 Spring Cloud Stream 應用中缺少 group.id,那麼它就會成為一個匿名消費者。無論何時您有一個匿名消費者,Spring Cloud Stream 應用預設都會從主題分割槽中可用的 latest offset 開始。另一方面,如果您明確指定了 group.id,則 Spring Cloud Stream 應用預設會從主題分割槽中可用的 earliest offset 開始。

在上述兩種情況(具有顯式組的消費者和匿名組的消費者)下,可以使用屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset 將起始 offset 切換為 earliestlatest

現在,假設您之前已經執行過消費者,現在再次啟動它。在這種情況下,上述情況中的起始 offset 語義不再適用,因為消費者會找到一個已經為消費者組提交的 offset(對於匿名消費者,儘管應用沒有提供 group.id,但 Binder 會自動為您生成一個)。它只會從最後提交的 offset 開始。即使您提供了 startOffset 值,情況也是如此。

但是,您可以使用 resetOffsets 屬性來覆蓋消費者從上次提交的 offset 開始的預設行為。要做到這一點,將屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets 設定為 true(預設為 false)。然後確保您提供了 startOffset 值(可以是 earliestlatest)。當您這樣做並啟動消費者應用時,每次啟動都會像第一次啟動一樣,忽略分割槽的所有已提交 offset。

在 Kafka 中定位到任意 offset

問題陳述

使用 Kafka Binder,我知道它可以將 offset 設定為 earliestlatest,但我需要將 offset 定位到中間的某個任意 offset。Spring Cloud Stream Kafka Binder 有什麼辦法實現這一點嗎?

解決方案

之前我們看到了 Kafka Binder 如何處理基本的 offset 管理。預設情況下,Binder 不允許您回退到任意 offset,至少透過我們在該配方中看到的機制不行。但是,Binder 提供了一些底層策略來實現此用例。讓我們來探討一下。

首先,當您想要重置到除 earliestlatest 之外的任意 offset 時,請確保將 resetOffsets 配置保留其預設值 false。然後,您必須提供一個型別為 KafkaBindingRebalanceListener 的自定義 bean,它將被注入到所有消費者繫結中。這是一個介面,帶有一些預設方法,但我們關注的是以下方法:

/**
	 * Invoked when partitions are initially assigned or after a rebalance. Applications
	 * might only want to perform seek operations on an initial assignment. While the
	 * 'initial' argument is true for each thread (when concurrency is greater than 1),
	 * implementations should keep track of exactly which partitions have been sought.
	 * There is a race in that a rebalance could occur during startup and so a topic/
	 * partition that has been sought on one thread may be re-assigned to another
	 * thread and you may not wish to re-seek it at that time.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment on the current thread.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions, boolean initial) {
		// do nothing
	}

讓我們看看詳細資訊。

本質上,此方法將在主題分割槽的初始分配期間或再均衡後每次呼叫。為了更好地說明,假設我們的主題是 foo,它有 4 個分割槽。最初,我們只在該組中啟動一個消費者,此消費者將消費所有分割槽。當消費者第一次啟動時,所有 4 個分割槽都會獲得初始分配。但是,我們不想讓分割槽從預設值(因為我們定義了組,所以是 earliest)開始消費,而是希望每個分割槽在定位到任意 offset 後開始消費。想象一下,您有一個業務場景需要從以下特定 offset 開始消費:

Partition   start offset

0           1000
1           2000
2           2000
3           1000

這可以透過如下實現上述方法來實現:

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}

這只是一個簡單的實現。實際的用例比這複雜得多,您需要根據情況進行調整,但這無疑為您提供了一個基本框架。當消費者 seek 失敗時,可能會丟擲一些執行時異常,您需要決定在這些情況下如何處理。

[[what-if-we-start-a-second-consumer-with-the-same-group-id?]] === 如果我們啟動第二個具有相同組 ID 的消費者怎麼辦?

當我們新增第二個消費者時,會發生再均衡,並且一些分割槽將移位。假設新消費者獲得分割槽 23。當這個新的 Spring Cloud Stream 消費者呼叫 onPartitionsAssigned 方法時,它會看到這是此消費者上分割槽 23 的初始分配。因此,由於對 initial 引數進行了條件檢查,它將執行 seek 操作。對於第一個消費者,它現在只有分割槽 01。但是,對於此消費者而言,這僅僅是一個再均衡事件,不被視為初始分配。因此,由於對 initial 引數進行了條件檢查,它將不會重新定位到給定的 offset。

[[how-do-i-manually-acknowledge-using-kafka-binder?]] == 如何使用 Kafka Binder 手動確認?

問題陳述

使用 Kafka Binder,我想在我的消費者中手動確認訊息。我該怎麼做?

解決方案

預設情況下,Kafka Binder 委託給 Spring for Apache Kafka 專案中的預設提交設定。Spring Kafka 中的預設 ackModebatch。有關更多詳細資訊,請參閱這裡

在某些情況下,您需要停用此預設提交行為,並依賴手動提交。以下步驟允許您這樣做。

將屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode 設定為 MANUALMANUAL_IMMEDIATE。當這樣設定時,消費者方法接收到的訊息中將包含一個名為 kafka_acknowledgment(來自 KafkaHeaders.ACKNOWLEDGMENT)的 header。

例如,假設這是您的消費者方法:

@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
        }
    };
}

然後將屬性 spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode 設定為 MANUALMANUAL_IMMEDIATE

[[how-do-i-override-the-default-binding-names-in-spring-cloud-stream?]] == 如何在 Spring Cloud Stream 中覆蓋預設繫結名稱?

問題陳述

Spring Cloud Stream 根據函式定義和簽名建立預設繫結,但如何覆蓋這些名稱以使用更符合領域友好的名稱?

解決方案

假設以下是您的函式簽名:

@Bean
public Function<String, String> uppercase(){
...
}

預設情況下,Spring Cloud Stream 將按如下方式建立繫結:

  1. uppercase-in-0

  2. uppercase-out-0

您可以使用以下屬性將這些繫結覆蓋為其他名稱:

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out

在此之後,所有繫結屬性都必須在新名稱 my-transformer-inmy-transformer-out 上設定。

這是另一個帶有 Kafka Streams 和多個輸入的示例:

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}

預設情況下,Spring Cloud Stream 將為此函式建立三個不同的繫結名稱:

  1. processOrder-in-0

  2. processOrder-in-1

  3. processOrder-out-0

每次要對此繫結設定某些配置時,都必須使用這些繫結名稱。您不喜歡這樣,並且希望使用更符合領域友好的、更易讀的繫結名稱,例如:

  1. orders

  2. accounts

  3. enrichedOrders

您只需設定這三個屬性即可輕鬆實現:

  1. spring.cloud.stream.function.bindings.processOrder-in-0=orders

  2. spring.cloud.stream.function.bindings.processOrder-in-1=accounts

  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders

完成此操作後,它將覆蓋預設的繫結名稱,您要設定的任何屬性都必須基於這些新的繫結名稱。

[[how-do-i-send-a-message-key-as-part-of-my-record?]] == 如何在我的記錄中包含訊息 key?

問題陳述

我需要與記錄的 payload 一起傳送一個 key,Spring Cloud Stream 有辦法做到這一點嗎?

解決方案

這非常容易做到。以下是實現此目的的基本藍圖,但您可能需要根據您的具體用例進行調整。

以下是示例生產者方法(即 Supplier):

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

這是一個簡單的函式,它傳送一個帶有 String payload 但也帶有 key 的訊息。請注意,我們使用 KafkaHeaders.MESSAGE_KEY 將 key 設定為訊息 header。

如果您想更改預設的 key(預設為 kafka_messageKey),則需要在配置中指定此屬性:

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']

請注意,我們使用繫結名稱 supplier-out-0,因為這是我們的函式名稱,請相應更新。

然後,我們在生成訊息時使用這個新的 key。

[[how-do-i-use-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?]] == 如何使用原生的序列化器和反序列化器,而不是 Spring Cloud Stream 的訊息轉換?

問題陳述

我不想使用 Spring Cloud Stream 中的訊息轉換器,而是想使用 Kafka 中的原生 Serializer 和 Deserializer。預設情況下,Spring Cloud Stream 使用其內建的訊息轉換器來處理此轉換。我如何繞過它並將責任委託給 Kafka?

解決方案

這真的很容易做到。

您只需提供以下屬性即可啟用原生序列化:

spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true

然後,您還需要設定序列化器。有兩種方法可以做到這一點。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

或者使用 Binder 配置:

spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

使用 Binder 方式時,它應用於所有繫結,而在繫結級別設定它們則針對每個繫結。

在反序列化方面,您只需將反序列化器作為配置提供即可。

例如:

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

您也可以在 Binder 級別設定它們。

有一個可選屬性,您可以設定它來強制進行原生解碼。

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true

但是,對於 Kafka Binder,這是不必要的,因為它到達 Binder 時,Kafka 已經使用配置的反序列化器對其進行了反序列化。

解釋 Kafka Streams Binder 中的 offset 重置如何工作

問題陳述

預設情況下,Kafka Streams Binder 對於新消費者始終從 earliest offset 開始。有時,應用程式需要從 latest offset 開始,這可能是有益的或必需的。Kafka Streams Binder 允許您做到這一點。

解決方案

在我們檢視解決方案之前,讓我們看看以下場景。

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
    (s, t) -> s.join(t, ...)
    ...
}

我們有一個需要兩個輸入繫結的 BiConsumer bean。在這種情況下,第一個繫結用於 KStream,第二個繫結用於 KTable。當第一次執行此應用程式時,預設情況下,兩個繫結都從 earliest offset 開始。如果由於某些要求,我只想從 latest offset 開始怎麼辦?您可以透過啟用以下屬性來實現此目的。

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest

如果您只想讓一個繫結從 latest offset 開始,而另一個繫結從預設的 earliest 開始,則將後一個繫結排除在配置之外即可。

請記住,一旦存在已提交的 offset,這些設定就將 **不** 會生效,並且已提交的 offset 具有優先權。

跟蹤 Kafka 中記錄成功傳送(生產)的情況

問題陳述

我有一個 Kafka 生產者應用,我想跟蹤所有成功的傳送。

解決方案

假設我們在應用程式中有以下 Supplier:

@Bean
	public Supplier<Message<String>> supplier() {
		return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
	}

然後,我們需要定義一個新的 MessageChannel bean 來捕獲所有成功的傳送資訊。

@Bean
	public MessageChannel fooRecordChannel() {
		return new DirectChannel();
	}

接下來,在應用程式配置中定義此屬性,為 recordMetadataChannel 提供 bean 名稱。

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel

此時,成功的傳送資訊將傳送到 fooRecordChannel

您可以按如下方式編寫 IntegrationFlow 以檢視資訊。

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> payload);
}

handle 方法中,payload 是傳送到 Kafka 的內容,訊息 header 中包含一個特殊 key kafka_recordMetadata。其值是一個 RecordMetadata,包含主題分割槽、當前 offset 等資訊。

在 Kafka 中新增自定義 header mapper

問題陳述

我有一個 Kafka 生產者應用程式,它設定了一些 header,但在消費者應用程式中它們不見了。這是為什麼?

解決方案

正常情況下,這應該沒有問題。

想象一下,您有以下生產者:

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

在消費者端,您仍然應該看到 header "foo",並且以下程式碼不應該有任何問題:

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

如果您在應用程式中提供了自定義 header mapper,那麼這將不起作用。假設您在應用程式中有一個空的 KafkaHeaderMapper

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

如果這是您的實現,那麼您在消費者端將丟失 foo header。很可能,您在這些 KafkaHeaderMapper 方法內部有一些邏輯。您需要以下程式碼來填充 foo header:

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
			target.put("foo", new String(foo.value()));
        }
    }

這將正確地將 foo header 從生產者端傳遞到消費者端。

關於 id header 的特別說明

在 Spring Cloud Stream 中,id header 是一個特殊的 header,但有些應用程式可能希望有特殊的自定義 id header - 例如 custom-idIDId。第一個 (custom-id) 將在沒有自定義 header mapper 的情況下從生產者傳播到消費者。但是,如果您使用框架保留的 id header 的變體(例如 ID, Id, iD 等)進行生產,則會遇到框架內部的問題。有關此用例的更多上下文,請參閱此Stack Overflow 帖子。在這種情況下,您必須使用自定義 KafkaHeaderMapper 來對映區分大小寫的 id header。例如,假設您有以下生產者:

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

上面示例中的 Id header 將會從消費端丟失,因為它與框架的 id header 衝突。您可以提供一個自定義 KafkaHeaderMapper 來解決此問題。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
			target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
			target.put("Id", new String(Id.value()));
        }
    };
}

透過這樣做,idId header 都將從生產者端可用於消費者端。

事務中生產到多個主題

問題陳述

如何將事務性訊息生產到多個 Kafka 主題?

有關更多上下文,請參閱此Stack Overflow 問題

解決方案

使用 Kafka Binder 中的事務支援進行事務處理,然後提供一個 AfterRollbackProcessor。為了生產到多個主題,使用 StreamBridge API。

以下是實現此目的的程式碼片段:

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}

所需配置

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1

為了測試,您可以使用以下配置:

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}

一些重要說明

請確保您的應用程式配置中沒有 DLQ 設定,因為我們手動配置 DLT(預設情況下它將釋出到名為 input.DLT 的主題,基於初始消費者函式)。此外,將消費者繫結的 maxAttempts 重置為 1,以避免 Binder 進行重試。在上面的示例中,總共會嘗試最多 3 次(初始嘗試 + FixedBackoff 中的 2 次嘗試)。

有關如何測試此程式碼的更多詳細資訊,請參閱Stack Overflow 帖子。如果您正在使用 Spring Cloud Stream 透過新增更多消費者函式來測試它,請確保將消費者繫結上的 isolation-level 設定為 read-committed

這篇Stack Overflow 帖子也與此討論相關。

執行多個 pollable 消費者時要避免的陷阱

問題陳述

如何執行多個 pollable 消費者例項併為每個例項生成唯一的 client.id

解決方案

假設我有以下定義:

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group

執行應用程式時,Kafka 消費者會生成一個 client.id(類似於 consumer-my-group-1)。對於執行的每個應用程式例項,此 client.id 將是相同的,從而導致意外問題。

為了解決此問題,您可以在應用程式的每個例項上新增以下屬性

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}

有關更多詳細資訊,請參閱此 GitHub 問題