重試關鍵業務邏輯

在某些場景中,您可能需要重試對應用程式至關重要的部分業務邏輯。這可能涉及到對關係資料庫的外部呼叫,或者從 Kafka Streams 處理器呼叫 REST 端點。這些呼叫可能由於各種原因失敗,例如網路問題或遠端服務不可用。通常情況下,如果您可以再次嘗試這些失敗,它們可能會自行解決。預設情況下,Kafka Streams 繫結器為所有輸入繫結建立 RetryTemplate bean。

如果函式具有以下簽名,

@Bean
public java.util.function.Consumer<KStream<Object, String>> process()

並且使用預設繫結名稱,RetryTemplate 將被註冊為 process-in-0-RetryTemplate。這遵循了繫結名稱 (process-in-0) 後跟字面量 -RetryTemplate 的約定。在有多個輸入繫結的情況下,每個繫結都會有一個單獨的 RetryTemplate bean 可用。如果應用程式中存在自定義的 RetryTemplate bean,並透過 spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName 提供,則它將優先於任何輸入繫結級別的重試模板配置屬性。

一旦從繫結中注入 RetryTemplate 到應用程式中,它就可以用於重試應用程式的任何關鍵部分。以下是一個示例:

@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {

    return input -> input
            .process(() -> new Processor<Object, String>() {
                @Override
                public void init(ProcessorContext processorContext) {
                }

                @Override
                public void process(Object o, String s) {
                    retryTemplate.execute(context -> {
                       //Critical business logic goes here.
                    });
                }

                @Override
                public void close() {
                }
            });
}

或者您可以使用自定義的 RetryTemplate,如下所示。

@EnableAutoConfiguration
public static class CustomRetryTemplateApp {

    @Bean
    @StreamRetryTemplate
    RetryTemplate fooRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1);

        retryTemplate.setBackOffPolicy(backOffPolicy);
        retryTemplate.setRetryPolicy(retryPolicy);

        return retryTemplate;
    }

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input -> input
                .process(() -> new Processor<Object, String>() {
                    @Override
                    public void init(ProcessorContext processorContext) {
                    }

                    @Override
                    public void process(Object o, String s) {
                        fooRetryTemplate().execute(context -> {
                           //Critical business logic goes here.
                        });

                    }

                    @Override
                    public void close() {
                    }
                });
    }
}

請注意,當重試耗盡時,預設情況下,將丟擲最後一個異常,導致處理器終止。如果您希望處理異常並繼續處理,可以將 RecoveryCallback 新增到 execute 方法中:以下是一個示例。

retryTemplate.execute(context -> {
    //Critical business logic goes here.
    }, context -> {
       //Recovery logic goes here.
       return null;
    ));

有關 RetryTemplate、重試策略、回退策略等更多資訊,請參閱 Spring Retry 專案。

© . This site is unofficial and not affiliated with VMware.