技巧、竅門和配方
Kafka 簡單 DLQ
問題陳述
作為開發者,我想編寫一個從 Kafka 主題處理記錄的消費者應用。但是,如果在處理過程中發生錯誤,我不希望應用完全停止。相反,我希望將出錯的記錄傳送到 DLT(死信主題),然後繼續處理新的記錄。
解決方案
解決此問題的方法是使用 Spring Cloud Stream 中的 DLQ 功能。為了本次討論的目的,我們假設以下是我們的處理器函式。
@Bean
public Consumer<byte[]> processData() {
return s -> {
throw new RuntimeException();
};
}
這是一個非常簡單的函式,它對其處理的所有記錄都丟擲異常,但您可以採用此函式並將其擴充套件到任何其他類似情況。
為了將出錯的記錄傳送到 DLT,我們需要提供以下配置。
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
為了啟用 DLQ,應用必須提供一個組名。匿名消費者無法使用 DLQ 功能。我們還需要透過將 Kafka 消費者繫結的 enableDLQ
屬性設定為 true
來啟用 DLQ。最後,我們可以選擇透過在 Kafka 消費者繫結上提供 dlqName
來指定 DLT 名稱,否則在此例中它將預設為 error.input-topic.my-group
。
請注意,在上面提供的示例消費者中,負載的型別是 byte[]
。預設情況下,Kafka Binder 中的 DLQ 生產者期望負載型別為 byte[]
。如果不是這種情況,那麼我們需要提供適當的序列化器配置。例如,讓我們將消費者函式重寫如下:
@Bean
public Consumer<String> processData() {
return s -> {
throw new RuntimeException();
};
}
現在,我們需要告訴 Spring Cloud Stream,當寫入 DLT 時,我們希望如何序列化資料。以下是針對此場景修改後的配置:
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
帶高階重試選項的 DLQ
解決方案
如果您遵循了上面的配方,那麼當處理遇到錯誤時,您將獲得內置於 Kafka Binder 中的預設重試選項。
預設情況下,Binder 會重試最多 3 次,初始延遲為一秒,每次回退的乘數為 2.0,最大延遲為 10 秒。您可以按如下方式更改所有這些配置:
spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
如果您願意,還可以透過提供一個布林值對映來提供可重試異常的列表。例如:
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
預設情況下,任何未在上述對映中列出的異常都將重試。如果不需要這樣,則可以透過提供以下配置來停用它:
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
您還可以提供自己的 RetryTemplate
,並將其標記為 @StreamRetryTemplate
,Binder 將會掃描並使用它。這對於您想要更復雜的重試策略和策略時很有用。
如果您有多個 @StreamRetryTemplate
bean,則可以使用以下屬性指定您的繫結需要哪一個:
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
處理帶有 DLQ 的反序列化錯誤
解決方案
當 Kafka 消費者丟擲不可恢復的反序列化異常時,Spring Cloud Stream 提供的常規 DLQ 機制將無濟於事。這是因為此異常甚至在消費者方法的 poll()
返回之前就發生了。Spring for Apache Kafka 專案提供了一些很好的方法來幫助 Binder 處理這種情況。讓我們來探討一下。
假設這是我們的函式:
@Bean
public Consumer<String> functionName() {
return s -> {
System.out.println(s);
};
}
這是一個採用 String
引數的簡單函式。
我們希望繞過 Spring Cloud Stream 提供的訊息轉換器,轉而使用原生的反序列化器。對於 String
型別,這樣做意義不大,但對於 AVRO 等更復雜的型別,您必須依賴外部反序列化器,因此希望將轉換委託給 Kafka。
現在當消費者接收到資料時,假設有一個導致反序列化錯誤的壞記錄,例如有人傳遞了一個 Integer
而不是 String
。在這種情況下,如果您不在應用中做任何處理,異常將沿著呼叫鏈傳播,最終導致您的應用退出。
為了處理這種情況,您可以新增一個 ListenerContainerCustomizer
的 @Bean
,它配置了一個 DefaultErrorHandler
。這個 DefaultErrorHandler
配置了一個 DeadLetterPublishingRecoverer
。我們還需要為消費者配置一個 ErrorHandlingDeserializer
。聽起來很複雜,但實際上,在這種情況下,它歸結為這 3 個 bean:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setCommonErrorHandler(errorHandler);
};
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
讓我們分析一下它們。第一個是 ListenerContainerCustomizer
bean,它接受一個 DefaultErrorHandler
。容器現在會使用特定的錯誤處理器進行定製。您可以在這裡瞭解更多關於容器定製的資訊。
第二個 bean 是 DefaultErrorHandler
,它配置為釋出到 DLT
。有關 DefaultErrorHandler
的更多詳細資訊,請參閱這裡。
第三個 bean 是 DeadLetterPublishingRecoverer
,它最終負責傳送到 DLT
。預設情況下,DLT
主題的名稱為 ORIGINAL_TOPIC_NAME.DLT。不過您可以更改它。有關更多詳細資訊,請參閱文件。
我們還需要透過應用配置來配置一個ErrorHandlingDeserializer。
ErrorHandlingDeserializer
將委託給實際的反序列化器。如果發生錯誤,它將記錄的 key/value 設定為 null,幷包含訊息的原始位元組。然後它會在 header 中設定異常,並將此記錄傳遞給監聽器,監聽器隨後會呼叫註冊的錯誤處理器。
以下是所需的配置:
spring.cloud.stream:
function:
definition: functionName
bindings:
functionName-in-0:
group: group-name
destination: input-topic
consumer:
use-native-decoding: true
kafka:
bindings:
functionName-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
我們透過繫結上的 configuration
屬性提供了 ErrorHandlingDeserializer
。我們還指出實際委託的反序列化器是 StringDeserializer
。
請記住,上面的任何 dlq 屬性都與本配方中的討論無關。它們純粹是為了處理任何應用級別的錯誤。
Kafka Binder 中的基本 offset 管理
解決方案
我們鼓勵您閱讀關於此主題的文件部分,以獲得全面的理解。
以下是其要點:
Kafka 預設支援兩種型別的起始 offset - earliest
和 latest
。它們的語義從名稱中就可以看出。
假設您是第一次執行消費者。如果在您的 Spring Cloud Stream 應用中缺少 group.id,那麼它就會成為一個匿名消費者。無論何時您有一個匿名消費者,Spring Cloud Stream 應用預設都會從主題分割槽中可用的 latest
offset 開始。另一方面,如果您明確指定了 group.id,則 Spring Cloud Stream 應用預設會從主題分割槽中可用的 earliest
offset 開始。
在上述兩種情況(具有顯式組的消費者和匿名組的消費者)下,可以使用屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset
將起始 offset 切換為 earliest
或 latest
。
現在,假設您之前已經執行過消費者,現在再次啟動它。在這種情況下,上述情況中的起始 offset 語義不再適用,因為消費者會找到一個已經為消費者組提交的 offset(對於匿名消費者,儘管應用沒有提供 group.id,但 Binder 會自動為您生成一個)。它只會從最後提交的 offset 開始。即使您提供了 startOffset
值,情況也是如此。
但是,您可以使用 resetOffsets
屬性來覆蓋消費者從上次提交的 offset 開始的預設行為。要做到這一點,將屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets
設定為 true
(預設為 false
)。然後確保您提供了 startOffset
值(可以是 earliest
或 latest
)。當您這樣做並啟動消費者應用時,每次啟動都會像第一次啟動一樣,忽略分割槽的所有已提交 offset。
在 Kafka 中定位到任意 offset
問題陳述
使用 Kafka Binder,我知道它可以將 offset 設定為 earliest
或 latest
,但我需要將 offset 定位到中間的某個任意 offset。Spring Cloud Stream Kafka Binder 有什麼辦法實現這一點嗎?
解決方案
之前我們看到了 Kafka Binder 如何處理基本的 offset 管理。預設情況下,Binder 不允許您回退到任意 offset,至少透過我們在該配方中看到的機制不行。但是,Binder 提供了一些底層策略來實現此用例。讓我們來探討一下。
首先,當您想要重置到除 earliest
或 latest
之外的任意 offset 時,請確保將 resetOffsets
配置保留其預設值 false
。然後,您必須提供一個型別為 KafkaBindingRebalanceListener
的自定義 bean,它將被注入到所有消費者繫結中。這是一個介面,帶有一些預設方法,但我們關注的是以下方法:
/**
* Invoked when partitions are initially assigned or after a rebalance. Applications
* might only want to perform seek operations on an initial assignment. While the
* 'initial' argument is true for each thread (when concurrency is greater than 1),
* implementations should keep track of exactly which partitions have been sought.
* There is a race in that a rebalance could occur during startup and so a topic/
* partition that has been sought on one thread may be re-assigned to another
* thread and you may not wish to re-seek it at that time.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment on the current thread.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
// do nothing
}
讓我們看看詳細資訊。
本質上,此方法將在主題分割槽的初始分配期間或再均衡後每次呼叫。為了更好地說明,假設我們的主題是 foo
,它有 4 個分割槽。最初,我們只在該組中啟動一個消費者,此消費者將消費所有分割槽。當消費者第一次啟動時,所有 4 個分割槽都會獲得初始分配。但是,我們不想讓分割槽從預設值(因為我們定義了組,所以是 earliest
)開始消費,而是希望每個分割槽在定位到任意 offset 後開始消費。想象一下,您有一個業務場景需要從以下特定 offset 開始消費:
Partition start offset
0 1000
1 2000
2 2000
3 1000
這可以透過如下實現上述方法來實現:
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
if (initial) {
partitions.forEach(tp -> {
if (topicPartitionOffset.containsKey(tp)) {
final Long offset = topicPartitionOffset.get(tp);
try {
consumer.seek(tp, offset);
}
catch (Exception e) {
// Handle exceptions carefully.
}
}
});
}
}
這只是一個簡單的實現。實際的用例比這複雜得多,您需要根據情況進行調整,但這無疑為您提供了一個基本框架。當消費者 seek
失敗時,可能會丟擲一些執行時異常,您需要決定在這些情況下如何處理。
[[what-if-we-start-a-second-consumer-with-the-same-group-id?]] === 如果我們啟動第二個具有相同組 ID 的消費者怎麼辦?
當我們新增第二個消費者時,會發生再均衡,並且一些分割槽將移位。假設新消費者獲得分割槽 2
和 3
。當這個新的 Spring Cloud Stream 消費者呼叫 onPartitionsAssigned
方法時,它會看到這是此消費者上分割槽 2
和 3
的初始分配。因此,由於對 initial
引數進行了條件檢查,它將執行 seek 操作。對於第一個消費者,它現在只有分割槽 0
和 1
。但是,對於此消費者而言,這僅僅是一個再均衡事件,不被視為初始分配。因此,由於對 initial
引數進行了條件檢查,它將不會重新定位到給定的 offset。
[[how-do-i-manually-acknowledge-using-kafka-binder?]] == 如何使用 Kafka Binder 手動確認?
解決方案
預設情況下,Kafka Binder 委託給 Spring for Apache Kafka 專案中的預設提交設定。Spring Kafka 中的預設 ackMode
是 batch
。有關更多詳細資訊,請參閱這裡。
在某些情況下,您需要停用此預設提交行為,並依賴手動提交。以下步驟允許您這樣做。
將屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode
設定為 MANUAL
或 MANUAL_IMMEDIATE
。當這樣設定時,消費者方法接收到的訊息中將包含一個名為 kafka_acknowledgment
(來自 KafkaHeaders.ACKNOWLEDGMENT
)的 header。
例如,假設這是您的消費者方法:
@Bean
public Consumer<Message<String>> myConsumer() {
return msg -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
然後將屬性 spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode
設定為 MANUAL
或 MANUAL_IMMEDIATE
。
[[how-do-i-override-the-default-binding-names-in-spring-cloud-stream?]] == 如何在 Spring Cloud Stream 中覆蓋預設繫結名稱?
解決方案
假設以下是您的函式簽名:
@Bean
public Function<String, String> uppercase(){
...
}
預設情況下,Spring Cloud Stream 將按如下方式建立繫結:
-
uppercase-in-0
-
uppercase-out-0
您可以使用以下屬性將這些繫結覆蓋為其他名稱:
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
在此之後,所有繫結屬性都必須在新名稱 my-transformer-in
和 my-transformer-out
上設定。
這是另一個帶有 Kafka Streams 和多個輸入的示例:
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}
預設情況下,Spring Cloud Stream 將為此函式建立三個不同的繫結名稱:
-
processOrder-in-0
-
processOrder-in-1
-
processOrder-out-0
每次要對此繫結設定某些配置時,都必須使用這些繫結名稱。您不喜歡這樣,並且希望使用更符合領域友好的、更易讀的繫結名稱,例如:
-
orders
-
accounts
-
enrichedOrders
您只需設定這三個屬性即可輕鬆實現:
-
spring.cloud.stream.function.bindings.processOrder-in-0=orders
-
spring.cloud.stream.function.bindings.processOrder-in-1=accounts
-
spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders
完成此操作後,它將覆蓋預設的繫結名稱,您要設定的任何屬性都必須基於這些新的繫結名稱。
[[how-do-i-send-a-message-key-as-part-of-my-record?]] == 如何在我的記錄中包含訊息 key?
解決方案
這非常容易做到。以下是實現此目的的基本藍圖,但您可能需要根據您的具體用例進行調整。
以下是示例生產者方法(即 Supplier
):
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
這是一個簡單的函式,它傳送一個帶有 String
payload 但也帶有 key 的訊息。請注意,我們使用 KafkaHeaders.MESSAGE_KEY
將 key 設定為訊息 header。
如果您想更改預設的 key(預設為 kafka_messageKey
),則需要在配置中指定此屬性:
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
請注意,我們使用繫結名稱 supplier-out-0
,因為這是我們的函式名稱,請相應更新。
然後,我們在生成訊息時使用這個新的 key。
[[how-do-i-use-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?]] == 如何使用原生的序列化器和反序列化器,而不是 Spring Cloud Stream 的訊息轉換?
問題陳述
我不想使用 Spring Cloud Stream 中的訊息轉換器,而是想使用 Kafka 中的原生 Serializer 和 Deserializer。預設情況下,Spring Cloud Stream 使用其內建的訊息轉換器來處理此轉換。我如何繞過它並將責任委託給 Kafka?
解決方案
這真的很容易做到。
您只需提供以下屬性即可啟用原生序列化:
spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
然後,您還需要設定序列化器。有兩種方法可以做到這一點。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
或者使用 Binder 配置:
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
使用 Binder 方式時,它應用於所有繫結,而在繫結級別設定它們則針對每個繫結。
在反序列化方面,您只需將反序列化器作為配置提供即可。
例如:
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
您也可以在 Binder 級別設定它們。
有一個可選屬性,您可以設定它來強制進行原生解碼。
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
但是,對於 Kafka Binder,這是不必要的,因為它到達 Binder 時,Kafka 已經使用配置的反序列化器對其進行了反序列化。
解釋 Kafka Streams Binder 中的 offset 重置如何工作
問題陳述
預設情況下,Kafka Streams Binder 對於新消費者始終從 earliest
offset 開始。有時,應用程式需要從 latest
offset 開始,這可能是有益的或必需的。Kafka Streams Binder 允許您做到這一點。
解決方案
在我們檢視解決方案之前,讓我們看看以下場景。
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
(s, t) -> s.join(t, ...)
...
}
我們有一個需要兩個輸入繫結的 BiConsumer
bean。在這種情況下,第一個繫結用於 KStream
,第二個繫結用於 KTable
。當第一次執行此應用程式時,預設情況下,兩個繫結都從 earliest
offset 開始。如果由於某些要求,我只想從 latest
offset 開始怎麼辦?您可以透過啟用以下屬性來實現此目的。
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
如果您只想讓一個繫結從 latest
offset 開始,而另一個繫結從預設的 earliest
開始,則將後一個繫結排除在配置之外即可。
請記住,一旦存在已提交的 offset,這些設定就將 **不** 會生效,並且已提交的 offset 具有優先權。
跟蹤 Kafka 中記錄成功傳送(生產)的情況
解決方案
假設我們在應用程式中有以下 Supplier:
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
然後,我們需要定義一個新的 MessageChannel
bean 來捕獲所有成功的傳送資訊。
@Bean
public MessageChannel fooRecordChannel() {
return new DirectChannel();
}
接下來,在應用程式配置中定義此屬性,為 recordMetadataChannel
提供 bean 名稱。
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
此時,成功的傳送資訊將傳送到 fooRecordChannel
。
您可以按如下方式編寫 IntegrationFlow
以檢視資訊。
@Bean
public IntegrationFlow integrationFlow() {
return f -> f.channel("fooRecordChannel")
.handle((payload, messageHeaders) -> payload);
}
在 handle
方法中,payload 是傳送到 Kafka 的內容,訊息 header 中包含一個特殊 key kafka_recordMetadata
。其值是一個 RecordMetadata
,包含主題分割槽、當前 offset 等資訊。
在 Kafka 中新增自定義 header mapper
解決方案
正常情況下,這應該沒有問題。
想象一下,您有以下生產者:
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
在消費者端,您仍然應該看到 header "foo",並且以下程式碼不應該有任何問題:
@Bean
public Consumer<Message<String>> consume() {
return s -> {
final String foo = (String)s.getHeaders().get("foo");
System.out.println(foo);
};
}
如果您在應用程式中提供了自定義 header mapper,那麼這將不起作用。假設您在應用程式中有一個空的 KafkaHeaderMapper
:
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
}
};
}
如果這是您的實現,那麼您在消費者端將丟失 foo
header。很可能,您在這些 KafkaHeaderMapper
方法內部有一些邏輯。您需要以下程式碼來填充 foo
header:
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String foo = (String) headers.get("foo");
target.add("foo", foo.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header foo = source.lastHeader("foo");
target.put("foo", new String(foo.value()));
}
}
這將正確地將 foo
header 從生產者端傳遞到消費者端。
關於 id header 的特別說明
在 Spring Cloud Stream 中,id
header 是一個特殊的 header,但有些應用程式可能希望有特殊的自定義 id header - 例如 custom-id
或 ID
或 Id
。第一個 (custom-id
) 將在沒有自定義 header mapper 的情況下從生產者傳播到消費者。但是,如果您使用框架保留的 id
header 的變體(例如 ID
, Id
, iD
等)進行生產,則會遇到框架內部的問題。有關此用例的更多上下文,請參閱此Stack Overflow 帖子。在這種情況下,您必須使用自定義 KafkaHeaderMapper
來對映區分大小寫的 id header。例如,假設您有以下生產者:
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
上面示例中的 Id
header 將會從消費端丟失,因為它與框架的 id
header 衝突。您可以提供一個自定義 KafkaHeaderMapper
來解決此問題。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String myId = (String) headers.get("Id");
target.add("Id", myId.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header Id = source.lastHeader("Id");
target.put("Id", new String(Id.value()));
}
};
}
透過這樣做,id
和 Id
header 都將從生產者端可用於消費者端。
事務中生產到多個主題
解決方案
使用 Kafka Binder 中的事務支援進行事務處理,然後提供一個 AfterRollbackProcessor
。為了生產到多個主題,使用 StreamBridge
API。
以下是實現此目的的程式碼片段:
@Autowired
StreamBridge bridge;
@Bean
Consumer<String> input() {
return str -> {
System.out.println(str);
this.bridge.send("left", str.toUpperCase());
this.bridge.send("right", str.toLowerCase());
if (str.equals("Fail")) {
throw new RuntimeException("test");
}
};
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
return (container, dest, group) -> {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
container.setAfterRollbackProcessor(rollbackProcessor);
};
}
DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
return new DefaultAfterRollbackProcessor<>(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
所需配置
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
為了測試,您可以使用以下配置:
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
System.in.read();
template.send("input", "Fail".getBytes());
template.send("input", "Good".getBytes());
};
}
一些重要說明
請確保您的應用程式配置中沒有 DLQ 設定,因為我們手動配置 DLT(預設情況下它將釋出到名為 input.DLT
的主題,基於初始消費者函式)。此外,將消費者繫結的 maxAttempts
重置為 1
,以避免 Binder 進行重試。在上面的示例中,總共會嘗試最多 3 次(初始嘗試 + FixedBackoff
中的 2 次嘗試)。
有關如何測試此程式碼的更多詳細資訊,請參閱Stack Overflow 帖子。如果您正在使用 Spring Cloud Stream 透過新增更多消費者函式來測試它,請確保將消費者繫結上的 isolation-level
設定為 read-committed
。
這篇Stack Overflow 帖子也與此討論相關。
執行多個 pollable 消費者時要避免的陷阱
解決方案
假設我有以下定義:
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
執行應用程式時,Kafka 消費者會生成一個 client.id(類似於 consumer-my-group-1
)。對於執行的每個應用程式例項,此 client.id
將是相同的,從而導致意外問題。
為了解決此問題,您可以在應用程式的每個例項上新增以下屬性
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
有關更多詳細資訊,請參閱此 GitHub 問題。