死信主題處理
啟用 DLQ
要啟用 DLQ,基於 Kafka Binder 的應用必須透過屬性 spring.cloud.stream.bindings.<binding-name>.group
提供一個消費者組。匿名消費者組(即應用未顯式提供組的情況)無法啟用 DLQ 功能。
當應用希望將出錯的記錄傳送到 DLQ 主題時,該應用必須啟用 DLQ 功能,因為預設情況下此功能未啟用。要啟用 DLQ,必須將屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.enable-dlq
設定為 true。
啟用 DLQ 後,在處理發生錯誤並且根據 spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts
屬性耗盡所有重試次數後,該記錄將被髮送到 DLQ 主題。
預設情況下,max-attempts
屬性設定為三。當 max-attempts
屬性大於 1
且 DLQ 已啟用時,您將看到重試遵循 max-attempts
屬性的設定。如果未啟用 DLQ(這是預設設定),則 max-attempts
屬性對重試的處理方式沒有任何影響。在這種情況下,重試將回退到 Spring for Apache Kafka 中容器的預設設定,即重試 10 次。如果應用希望在 DLQ 被停用時完全停用重試,則將 max-attempts
屬性設定為 1
將不起作用。在這種情況下,要完全停用重試,您需要提供一個 ListenerContainerCustomizer
並使用適當的 Backoff
設定。以下是一個示例。
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, destinationName, group) -> {
var commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0l));
container.setCommonErrorHandler(commonErrorHandler);
};
}
透過這種方式,將停用預設的容器行為,並且不會嘗試任何重試。如上所述,啟用 DLQ 時,Binder 設定將具有優先權。
處理死信主題中的記錄
由於框架無法預測使用者希望如何處理進入死信佇列的訊息,因此它不提供任何標準機制來處理它們。如果死信的原因是暫時的,您可能希望將訊息路由回原始主題。然而,如果問題是永久性的,可能會導致無限迴圈。本主題中的示例 Spring Boot 應用展示瞭如何將這些訊息路由回原始主題,但在三次嘗試後,它會將它們移至一個“停放”主題。該應用是另一個 Spring Cloud Stream 應用,它從死信主題讀取。當連續 5 秒未收到訊息時,它會退出。
示例假設原始目標主題是 so8400out
,消費者組是 so8400
。
有幾種策略可供考慮
-
考慮僅在主應用未執行時執行重新路由。否則,臨時錯誤的重試次數會非常快地耗盡。
-
或者,使用兩階段方法:使用此應用將訊息路由到第三個主題,再使用另一個應用從該主題路由回主主題。
以下程式碼清單展示了示例應用
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private StreamBridge streamBridge;
@Bean
public Function<Message<?>, Message<?>> reRoute() {
return failed -> {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, retries + 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
};
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, exiting");
return;
}
}
}
}