提示、技巧和秘訣
使用 Kafka 實現簡單的 DLQ
問題描述
作為一名開發人員,我希望編寫一個消費應用程式來處理 Kafka 主題中的記錄。但是,如果在處理過程中發生錯誤,我不希望應用程式完全停止。相反,我希望將錯誤記錄傳送到 DLT(死信主題),然後繼續處理新記錄。
解決方案
解決此問題的方法是使用 Spring Cloud Stream 中的 DLQ 功能。為了便於討論,我們假設以下是我們的處理器函式。
@Bean
public Consumer<byte[]> processData() {
return s -> {
throw new RuntimeException();
};
}
這是一個非常簡單的函式,它對所有處理的記錄都丟擲異常,但您可以採用此函式並將其擴充套件到任何其他類似情況。
為了將錯誤記錄傳送到 DLT,我們需要提供以下配置。
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
為了啟用 DLQ,應用程式必須提供一個組名。匿名消費者不能使用 DLQ 功能。我們還需要透過將 Kafka 消費者繫結上的 enableDLQ 屬性設定為 true 來啟用 DLQ。最後,我們可以選擇透過在 Kafka 消費者繫結上提供 dlqName 來提供 DLT 名稱,否則在這種情況下它預設為 error.input-topic.my-group。
請注意,在上面提供的示例消費者中,有效載荷的型別是 byte[]。預設情況下,Kafka 繫結器中的 DLQ 生產者期望有效載荷型別為 byte[]。如果不是這種情況,那麼我們需要提供適當的序列化器配置。例如,讓我們將消費者函式重寫如下
@Bean
public Consumer<String> processData() {
return s -> {
throw new RuntimeException();
};
}
現在,我們需要告訴 Spring Cloud Stream,我們希望在寫入 DLT 時如何序列化資料。以下是此場景的修改配置
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
帶高階重試選項的 DLQ
解決方案
如果您遵循了上述秘訣,那麼在處理遇到錯誤時,您將獲得 Kafka 繫結器中內建的預設重試選項。
預設情況下,繫結器最多重試 3 次,初始延遲為 1 秒,每次退避的乘數為 2.0,最大延遲為 10 秒。您可以更改所有這些配置,如下所示
spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
如果您願意,您還可以透過提供布林值對映來提供可重試異常列表。例如,
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
預設情況下,對映中未列出的任何異常都將重試。如果不需要,可以透過提供以下內容來停用它,
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
您還可以提供自己的 RetryTemplate 並將其標記為 @StreamRetryTemplate,繫結器將掃描並使用它。當您需要更復雜的重試策略時,這會很有用。
如果您有多個 @StreamRetryTemplate bean,那麼您可以使用以下屬性指定您的繫結需要哪個:
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
使用 DLQ 處理反序列化錯誤
解決方案
當 Kafka 消費者丟擲不可恢復的反序列化異常時,Spring Cloud Stream 提供的正常 DLQ 機制將無濟於事。這是因為,此異常甚至在消費者 poll() 方法返回之前就發生了。Spring for Apache Kafka 專案提供了一些很好的方法來幫助繫結器處理這種情況。讓我們來探討一下。
假設這是我們的函式
@Bean
public Consumer<String> functionName() {
return s -> {
System.out.println(s);
};
}
這是一個採用 String 引數的簡單函式。
我們希望繞過 Spring Cloud Stream 提供的訊息轉換器,而改用原生反序列化器。對於 String 型別,這沒什麼意義,但對於 AVRO 等更復雜的型別,您必須依賴外部反序列化器,因此希望將轉換委託給 Kafka。
現在,當消費者接收到資料時,假設有一個導致反序列化錯誤的壞記錄,例如,可能有人傳遞了一個 Integer 而不是一個 String。在這種情況下,如果您不在應用程式中做任何事情,異常將透過鏈傳播,您的應用程式最終將退出。
為了處理此問題,您可以新增一個 ListenerContainerCustomizer @Bean,它配置一個 DefaultErrorHandler。此 DefaultErrorHandler 配置了一個 DeadLetterPublishingRecoverer。我們還需要為消費者配置一個 ErrorHandlingDeserializer。這聽起來很複雜,但實際上,在這種情況下它歸結為這 3 個 bean。
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setCommonErrorHandler(errorHandler);
};
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
讓我們分析一下它們中的每一個。第一個是 ListenerContainerCustomizer bean,它接受一個 DefaultErrorHandler。現在,容器已使用該特定錯誤處理程式進行自定義。您可以在此處瞭解有關容器自定義的更多資訊。
第二個 bean 是配置為釋出到 DLT 的 DefaultErrorHandler。有關 DefaultErrorHandler 的更多詳細資訊,請參閱此處。
第三個 bean 是 DeadLetterPublishingRecoverer,它最終負責傳送到 DLT。預設情況下,DLT 主題命名為 ORIGINAL_TOPIC_NAME.DLT。但是您可以更改它。有關詳細資訊,請參閱文件。
我們還需要透過應用程式配置配置一個 ErrorHandlingDeserializer。
ErrorHandlingDeserializer 委託給實際的反序列化器。如果出現錯誤,它會將記錄的鍵/值設定為 null,幷包含訊息的原始位元組。然後它在頭部設定異常並將此記錄傳遞給監聽器,然後監聽器呼叫註冊的錯誤處理程式。
以下是所需的配置
spring.cloud.stream:
function:
definition: functionName
bindings:
functionName-in-0:
group: group-name
destination: input-topic
consumer:
use-native-decoding: true
kafka:
bindings:
functionName-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
我們透過繫結上的 configuration 屬性提供 ErrorHandlingDeserializer。我們還指示要委託的實際反序列化器是 StringDeserializer。
請記住,上述 dlq 屬性均與本秘訣中的討論無關。它們純粹用於處理任何應用程式級別的錯誤。
Kafka 繫結器中的基本偏移量管理
解決方案
我們鼓勵您閱讀有關此內容的文件部分,以對其有透徹的瞭解。
簡而言之
Kafka 預設支援兩種型別的起始偏移量 - earliest 和 latest。它們的語義從名稱上就可以不言自明。
假設您是第一次執行消費者。如果您的 Spring Cloud Stream 應用程式中缺少 group.id,那麼它就成為一個匿名消費者。每當您有一個匿名消費者時,Spring Cloud Stream 應用程式預設會從主題分割槽中 latest 可用的偏移量開始。另一方面,如果您明確指定 group.id,那麼 Spring Cloud Stream 應用程式預設會從主題分割槽中 earliest 可用的偏移量開始。
在上述兩種情況(具有顯式組和匿名組的消費者)下,可以透過使用屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset 並將其設定為 earliest 或 latest 來切換起始偏移量。
現在,假設您之前已經執行過消費者,並且現在再次啟動它。在這種情況下,上述情況中的起始偏移量語義不適用,因為消費者找到了已為消費者組提交的偏移量(對於匿名消費者,儘管應用程式不提供 group.id,但繫結器將為您自動生成一個)。它只是從上次提交的偏移量繼續。即使提供了 startOffset 值,這也是正確的。
但是,您可以透過使用 resetOffsets 屬性來覆蓋消費者從上次提交的偏移量開始的預設行為。為此,請將屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets 設定為 true(預設值為 false)。然後確保提供 startOffset 值(earliest 或 latest)。當您這樣做並啟動消費者應用程式時,每次啟動時,它都會像第一次啟動一樣啟動,並忽略分割槽的任何已提交偏移量。
在 Kafka 中尋求任意偏移量
問題描述
使用 Kafka 繫結器,我知道它可以將偏移量設定為 earliest 或 latest,但我有一個需求,需要將偏移量定位到中間的某個任意偏移量。有什麼方法可以使用 Spring Cloud Stream Kafka 繫結器實現此目的嗎?
解決方案
前面我們看到了 Kafka 繫結器如何處理基本的偏移量管理。預設情況下,繫結器不允許您回溯到任意偏移量,至少透過我們在該秘訣中看到的機制是如此。但是,繫結器提供了一些低階策略來實現此用例。讓我們來探討一下。
首先,當您要重置為除 earliest 或 latest 之外的任意偏移量時,請確保將 resetOffsets 配置保留為其預設值,即 false。然後您必須提供一個型別為 KafkaBindingRebalanceListener 的自定義 bean,該 bean 將注入到所有消費者繫結中。它是一個帶有一些預設方法的介面,但這是我們感興趣的方法
/**
* Invoked when partitions are initially assigned or after a rebalance. Applications
* might only want to perform seek operations on an initial assignment. While the
* 'initial' argument is true for each thread (when concurrency is greater than 1),
* implementations should keep track of exactly which partitions have been sought.
* There is a race in that a rebalance could occur during startup and so a topic/
* partition that has been sought on one thread may be re-assigned to another
* thread and you may not wish to re-seek it at that time.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment on the current thread.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
// do nothing
}
讓我們看看細節。
本質上,此方法將在主題分割槽的初始分配期間或再平衡之後每次呼叫。為了更好地說明,我們假設我們的主題是 foo 並且它有 4 個分割槽。最初,我們只在該組中啟動一個消費者,並且此消費者將從所有分割槽消費。當消費者第一次啟動時,所有 4 個分割槽都將進行初始分配。但是,我們不希望分割槽從預設值開始消費(earliest,因為我們定義了一個組),而是希望每個分割槽在尋求到任意偏移量後開始消費。想象一下您有一個業務案例需要從以下特定偏移量消費。
Partition start offset
0 1000
1 2000
2 2000
3 1000
這可以透過如下實現上述方法來實現。
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
if (initial) {
partitions.forEach(tp -> {
if (topicPartitionOffset.containsKey(tp)) {
final Long offset = topicPartitionOffset.get(tp);
try {
consumer.seek(tp, offset);
}
catch (Exception e) {
// Handle exceptions carefully.
}
}
});
}
}
這只是一個粗略的實現。現實世界的用例比這複雜得多,您需要相應地進行調整,但這確實為您提供了一個基本的草圖。當消費者 seek 失敗時,它可能會丟擲一些執行時異常,您需要決定在這種情況下如何處理。
[[what-if-we-start-a-second-consumer-with-the-same-group-id?]] === 如果我們啟動第二個具有相同組 ID 的消費者會怎樣?
當我們新增第二個消費者時,會發生再平衡,並且一些分割槽將被移動。假設新消費者獲得了分割槽 2 和 3。當這個新的 Spring Cloud Stream 消費者呼叫 onPartitionsAssigned 方法時,它將看到這是該消費者上分割槽 2 和 3 的初始分配。因此,它會因為對 initial 引數的條件檢查而執行 seek 操作。對於第一個消費者,它現在只擁有分割槽 0 和 1。但是,對於這個消費者來說,這僅僅是一個再平衡事件,不被認為是初始分配。因此,它不會因為對 initial 引數的條件檢查而重新尋求到給定偏移量。
[[how-do-i-manually-acknowledge-using-kafka-binder?]] == 如何使用 Kafka 繫結器手動確認?
解決方案
預設情況下,Kafka 繫結器委託給 Spring for Apache Kafka 專案中的預設提交設定。Spring Kafka 中的預設 ackMode 是 batch。有關詳細資訊,請參閱此處。
在某些情況下,您希望停用此預設提交行為並依賴手動提交。以下步驟允許您執行此操作。
將屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode 設定為 MANUAL 或 MANUAL_IMMEDIATE。當這樣設定時,消費者方法收到的訊息中將存在一個名為 kafka_acknowledgment(來自 KafkaHeaders.ACKNOWLEDGMENT)的頭。
例如,假設這是您的消費者方法。
@Bean
public Consumer<Message<String>> myConsumer() {
return msg -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
然後將屬性 spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode 設定為 MANUAL 或 MANUAL_IMMEDIATE。
[[how-do-i-override-the-default-binding-names-in-spring-cloud-stream?]] == 如何覆蓋 Spring Cloud Stream 中的預設繫結名稱?
解決方案
假設這是您的函式簽名。
@Bean
public Function<String, String> uppercase(){
...
}
預設情況下,Spring Cloud Stream 將建立如下繫結。
-
uppercase-in-0
-
uppercase-out-0
您可以使用以下屬性將這些繫結覆蓋為其他名稱。
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
此後,所有繫結屬性都必須在新名稱 my-transformer-in 和 my-transformer-out 上進行。
這是另一個使用 Kafka Streams 和多個輸入的示例。
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}
預設情況下,Spring Cloud Stream 將為此函式建立三個不同的繫結名稱。
-
processOrder-in-0
-
processOrder-in-1
-
processOrder-out-0
每次您想要在此類繫結上設定一些配置時,都必須使用這些繫結名稱。您不喜歡這樣,並且希望使用更符合領域友好且更易讀的繫結名稱,例如:
-
orders
-
accounts
-
enrichedOrders
您可以透過簡單地設定這三個屬性來實現這一點
-
spring.cloud.stream.function.bindings.processOrder-in-0=orders
-
spring.cloud.stream.function.bindings.processOrder-in-1=accounts
-
spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders
一旦您這樣做了,它將覆蓋預設的繫結名稱,並且您想要在它們上設定的任何屬性都必須使用這些新的繫結名稱。
[[how-do-i-send-a-message-key-as-part-of-my-record?]] == 如何將訊息鍵作為記錄的一部分發送?
解決方案
通常需要您將關聯資料結構(如對映)作為帶有鍵和值的記錄傳送。Spring Cloud Stream 允許您以直接的方式實現這一點。以下是實現此目的的基本藍圖,但您可能需要根據您的特定用例進行調整。
這是一個示例生產者方法(又名 Supplier)。
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
這是一個簡單的函式,它傳送一個帶有 String 有效載荷的訊息,但也有一個鍵。請注意,我們使用 KafkaHeaders.MESSAGE_KEY 將鍵設定為訊息頭。
如果您想更改預設的 kafka_messageKey 鍵,那麼在配置中,我們需要指定此屬性
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
請注意,我們使用繫結名稱 supplier-out-0,因為這是我們的函式名稱,請相應更新。
然後,我們在生成訊息時使用這個新鍵。
[[how-do-i-use-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?]] == 如何使用原生序列化器和反序列化器而不是 Spring Cloud Stream 完成的訊息轉換?
問題描述
我不想使用 Spring Cloud Stream 中的訊息轉換器,而是想在 Kafka 中使用原生序列化器和反序列化器。預設情況下,Spring Cloud Stream 使用其內部內建訊息轉換器處理此轉換。我如何繞過它並將責任委託給 Kafka?
解決方案
這真的很容易做到。
您所要做的就是提供以下屬性以啟用原生序列化。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
然後,您還需要設定序列化器。有兩種方法可以做到這一點。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
或者使用繫結器配置。
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
當使用繫結器方式時,它應用於所有繫結,而將它們設定在繫結級別是針對每個繫結的。
在反序列化方面,您只需要將反序列化器作為配置提供。
例如,
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
您也可以在繫結器級別設定它們。
有一個可選屬性可以設定為強制原生解碼。
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
然而,在 Kafka 繫結器的情況下,這是不必要的,因為當它到達繫結器時,Kafka 已經使用配置的反序列化器對其進行了反序列化。
解釋 Kafka Streams 繫結器中的偏移量重置工作原理
解決方案
在檢視解決方案之前,讓我們看以下場景。
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
(s, t) -> s.join(t, ...)
...
}
我們有一個 BiConsumer bean,它需要兩個輸入繫結。在這種情況下,第一個繫結用於 KStream,第二個繫結用於 KTable。當第一次執行此應用程式時,預設情況下,兩個繫結都從 earliest 偏移量開始。如果由於某些要求,我希望從 latest 偏移量開始怎麼辦?您可以透過啟用以下屬性來實現這一點。
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
如果您只想讓一個繫結從 latest 偏移量開始,而另一個繫結從預設的 earliest 消費者開始,那麼請將後一個繫結從配置中刪除。
請記住,一旦存在已提交的偏移量,這些設定將不生效,並且已提交的偏移量將優先。
在 Kafka 中跟蹤記錄成功傳送(生產)
解決方案
讓我們假設應用程式中有以下供應商。
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
然後,我們需要定義一個新的 MessageChannel bean 來捕獲所有成功的傳送資訊。
@Bean
public MessageChannel fooRecordChannel() {
return new DirectChannel();
}
接下來,在應用程式配置中定義此屬性以提供 recordMetadataChannel 的 bean 名稱。
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
此時,成功的傳送資訊將傳送到 fooRecordChannel。
您可以編寫一個 IntegrationFlow 如下所示來檢視資訊。
@Bean
public IntegrationFlow integrationFlow() {
return f -> f.channel("fooRecordChannel")
.handle((payload, messageHeaders) -> payload);
}
在 handle 方法中,payload 是傳送到 Kafka 的內容,訊息頭包含一個名為 kafka_recordMetadata 的特殊鍵。其值是一個 RecordMetadata,其中包含有關主題分割槽、當前偏移量等資訊。
在 Kafka 中新增自定義頭對映器
解決方案
在正常情況下,這應該沒問題。
想象一下,您有以下生產者。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
在消費者端,您應該仍然看到頭“foo”,以下內容不應給您帶來任何問題。
@Bean
public Consumer<Message<String>> consume() {
return s -> {
final String foo = (String)s.getHeaders().get("foo");
System.out.println(foo);
};
}
如果您在應用程式中提供自定義頭對映器,那麼這將不起作用。假設您在應用程式中有一個空的 KafkaHeaderMapper。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
}
};
}
如果這是您的實現,那麼您將錯過消費者端的 foo 頭。您可能在這些 KafkaHeaderMapper 方法中包含一些邏輯。您需要以下內容來填充 foo 頭。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String foo = (String) headers.get("foo");
target.add("foo", foo.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header foo = source.lastHeader("foo");
target.put("foo", new String(foo.value()));
}
}
這將正確地將 foo 頭從生產者填充到消費者。
關於 id 頭的特別說明
在 Spring Cloud Stream 中,id 頭是一個特殊的頭,但有些應用程式可能希望擁有特殊的自定義 id 頭——例如 custom-id 或 ID 或 Id。第一個(custom-id)將在沒有任何自定義頭對映器的情況下從生產者傳播到消費者。但是,如果您使用框架保留的 id 頭的一個變體(例如 ID、Id、iD 等)進行生產,那麼您將遇到框架內部的問題。有關此用例的更多上下文,請參閱此StackOverflow 帖子。在這種情況下,您必須使用自定義 KafkaHeaderMapper 來對映區分大小寫的 id 頭。例如,假設您有以下生產者。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
上面的 Id 頭將從消費端消失,因為它與框架 id 頭衝突。您可以提供自定義 KafkaHeaderMapper 來解決此問題。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String myId = (String) headers.get("Id");
target.add("Id", myId.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header Id = source.lastHeader("Id");
target.put("Id", new String(Id.value()));
}
};
}
透過這樣做,id 和 Id 頭都將從生產者端提供給消費者端。
以事務方式生產到多個主題
解決方案
在 Kafka 繫結器中使用事務支援進行事務處理,然後提供一個 AfterRollbackProcessor。為了生產到多個主題,請使用 StreamBridge API。
以下是此程式碼片段
@Autowired
StreamBridge bridge;
@Bean
Consumer<String> input() {
return str -> {
System.out.println(str);
this.bridge.send("left", str.toUpperCase());
this.bridge.send("right", str.toLowerCase());
if (str.equals("Fail")) {
throw new RuntimeException("test");
}
};
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
return (container, dest, group) -> {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
container.setAfterRollbackProcessor(rollbackProcessor);
};
}
DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
return new DefaultAfterRollbackProcessor<>(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
所需配置
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
為了測試,您可以使用以下內容
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
System.in.read();
template.send("input", "Fail".getBytes());
template.send("input", "Good".getBytes());
};
}
一些重要注意事項
請確保您的應用程式配置中沒有任何 DLQ 設定,因為我們手動配置 DLT(預設情況下,它將根據初始消費者函式釋出到名為 input.DLT 的主題)。此外,將消費者繫結上的 maxAttempts 重置為 1,以避免繫結器重試。在上面的示例中,它最多重試總共 3 次(初始嘗試 + FixedBackoff 中的 2 次嘗試)。
有關如何測試此程式碼的更多詳細資訊,請參閱StackOverflow 帖子。如果您使用 Spring Cloud Stream 透過新增更多消費者函式來測試它,請務必將消費者繫結上的 isolation-level 設定為 read-committed。
此StackOverflow 帖子也與此討論相關。
執行多個可輪詢消費者時應避免的陷阱
解決方案
假設我有以下定義
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
執行應用程式時,Kafka 消費者會生成一個 client.id(類似於 consumer-my-group-1)。對於每個執行的應用程式例項,此 client.id 將相同,從而導致意外問題。
為了解決這個問題,您可以在應用程式的每個例項上新增以下屬性
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
有關更多詳細資訊,請參閱此GitHub 問題。