提供的 Advice 類

除了提供應用 AOP advice 類的通用機制外,Spring Integration 還提供了以下開箱即用的 advice 實現:

重試 Advice

重試 advice (`o.s.i.handler.advice.RequestHandlerRetryAdvice`) 利用了 Spring Retry 專案提供的豐富重試機制。`spring-retry` 的核心元件是 `RetryTemplate`,它允許配置複雜的重試場景,包括 `RetryPolicy` 和 `BackoffPolicy` 策略(有多種實現)以及 `RecoveryCallback` 策略,用於確定重試次數耗盡時要採取的操作。

無狀態重試

無狀態重試是指重試活動完全在 advice 中處理的情況。執行緒暫停(如果配置了)並重試該操作。

有狀態重試

有狀態重試是指重試狀態在 advice 中管理,但丟擲異常後由呼叫者重新提交請求的情況。有狀態重試的一個例子是當我們需要訊息發起方(例如 JMS)負責重新提交,而不是在當前執行緒上執行時。有狀態重試需要某種機制來檢測重複提交。

有關 `spring-retry` 的更多資訊,請參閱該專案的 Javadoc 以及 Spring Batch 的參考文件,`spring-retry` 最初源於此。

預設的回退行為是不進行回退。重試會立即嘗試。使用導致執行緒在嘗試之間暫停的回退策略可能會導致效能問題,包括過多的記憶體使用和執行緒飢餓。在高併發環境中,應謹慎使用回退策略。

配置重試 Advice

本節的示例使用以下始終丟擲異常的 ``

public class FailingService {

    public void service(String message) {
        throw new RuntimeException("error");
    }
}
簡單無狀態重試

預設的 `RetryTemplate` 具有 `SimpleRetryPolicy`,它會嘗試三次。沒有 `BackOffPolicy`,因此三次嘗試是連續進行的,中間沒有延遲。沒有 `RecoveryCallback`,因此最終失敗重試後會將異常拋給呼叫者。在 Spring Integration 環境中,可以使用入站端點上的 `error-channel` 來處理這個最終異常。以下示例使用了 `RetryTemplate` 並顯示了其 `DEBUG` 輸出

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
帶 Recovery 的簡單無狀態重試

以下示例在前一個示例中添加了 `RecoveryCallback`,並使用 `ErrorMessageSendingRecoverer` 將 `ErrorMessage` 傳送到通道

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
帶自定義策略和 Recovery 的無狀態重試

為了更精細的控制,我們可以為 advice 提供一個定製的 `RetryTemplate`。本示例繼續使用 `SimpleRetryPolicy`,但將嘗試次數增加到四次。它還添加了 `ExponentialBackoffPolicy`,其中第一次重試等待一秒,第二次等待五秒,第三次等待 25 秒(總共四次嘗試)。以下列表展示了示例及其 `DEBUG` 輸出

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
            <property name="retryTemplate" ref="retryTemplate" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="4" />
        </bean>
    </property>
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="1000" />
            <property name="multiplier" value="5.0" />
            <property name="maxInterval" value="60000" />
        </bean>
    </property>
</bean>

27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
27.071 DEBUG [task-scheduler-1]Retry: count=0
27.080 DEBUG [task-scheduler-1]Sleeping for 1000
28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1
28.081 DEBUG [task-scheduler-1]Retry: count=1
28.081 DEBUG [task-scheduler-1]Sleeping for 5000
33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2
33.082 DEBUG [task-scheduler-1]Retry: count=2
33.083 DEBUG [task-scheduler-1]Sleeping for 25000
58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3
58.083 DEBUG [task-scheduler-1]Retry: count=3
58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4
58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4
58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
無狀態重試的名稱空間支援

從 4.0 版本開始,得益於對重試 advice 的名稱空間支援,前面的配置可以大大簡化,示例如下

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <ref bean="retrier" />
    </int:request-handler-advice-chain>
</int:service-activator>

<int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
    <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>

在前面的示例中,advice 被定義為一個頂級 bean,以便可以在多個 `request-handler-advice-chain` 例項中使用。您也可以直接在鏈中定義 advice,示例如下

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
            <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
        </int:retry-advice>
    </int:request-handler-advice-chain>
</int:service-activator>

一個 `` 可以有一個 `` 或 `` 子元素,也可以沒有子元素。沒有子元素的 `` 不使用回退。如果沒有 `recovery-channel`,則在重試次數耗盡時丟擲異常。名稱空間只能用於無狀態重試。

對於更復雜的環境(自定義策略等),請使用普通的 `` 定義。

帶 Recovery 的簡單有狀態重試

為了使重試有狀態,我們需要為 advice 提供一個 `RetryStateGenerator` 實現。此類用於識別訊息是否是重新提交的,以便 `RetryTemplate` 可以確定該訊息當前的重試狀態。框架提供了 `SpelExpressionRetryStateGenerator`,它使用 SpEL 表示式來確定訊息識別符號。本示例再次使用預設策略(三次嘗試,無回退)。與無狀態重試一樣,這些策略也可以自定義。以下列表展示了示例及其 `DEBUG` 輸出

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="retryStateGenerator">
                <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator">
                    <constructor-arg value="headers['jms_messageId']" />
                </bean>
            </property>
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
24.368 DEBUG [Container#0-1]Retry: count=0
24.387 DEBUG [Container#0-1]Checking for rethrow: count=1
24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1
24.387 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
25.412 DEBUG [Container#0-1]Retry: count=1
25.413 DEBUG [Container#0-1]Checking for rethrow: count=2
25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2
25.413 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
26.418 DEBUG [Container#0-1]Retry: count=2
26.419 DEBUG [Container#0-1]Checking for rethrow: count=3
26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3
26.419 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3
27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]

如果您將前面的示例與無狀態示例進行比較,您會發現,在有狀態重試中,每次失敗時都會將異常拋給呼叫者。

重試的異常分類

Spring Retry 在確定哪些異常可以觸發重試方面具有很大的靈活性。預設配置會對所有異常進行重試,並且異常分類器會檢視頂級異常。如果您將其配置為僅在 `MyException` 上重試,並且您的應用程式丟擲了一個 `SomeOtherException`,而其原因是 `MyException`,則不會發生重試。

從 Spring Retry 1.0.3 開始,`BinaryExceptionClassifier` 有一個名為 `traverseCauses` 的屬性(預設為 `false`)。當設定為 `true` 時,它會遍歷異常原因,直到找到匹配項或遍歷完所有原因。

要將此分類器用於重試,請使用接受最大嘗試次數、`Exception` 物件 `Map` 和 `traverseCauses` 布林值的建構函式建立 `SimpleRetryPolicy`。然後您可以將此策略注入到 `RetryTemplate` 中。

在這種情況下需要 `traverseCauses`,因為使用者異常可能被包裝在 `MessagingException` 中。

熔斷器 Advice

熔斷器模式的基本思想是,如果服務當前不可用,就不要浪費時間(和資源)去嘗試使用它。`o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice` 實現了此模式。當熔斷器處於關閉狀態時,端點嘗試呼叫服務。如果連續失敗達到一定次數,熔斷器將進入開啟狀態。當處於開啟狀態時,新請求會“快速失敗”,在一段時間過去之前不會嘗試呼叫服務。

當該時間過去後,熔斷器被設定為半開狀態。在此狀態下,即使只有一次嘗試失敗,熔斷器也會立即進入開啟狀態。如果嘗試成功,熔斷器將進入關閉狀態,在這種情況下,直到再次發生配置的連續失敗次數之前,它不會再次進入開啟狀態。任何成功的嘗試都會將失敗狀態重置為零,以確定何時熔斷器可能再次進入開啟狀態。

通常,此 advice 可能用於外部服務,這些服務可能需要一些時間才會失敗(例如嘗試建立網路連線時的超時)。

`RequestHandlerCircuitBreakerAdvice` 有兩個屬性:`threshold` 和 `halfOpenAfter`。`threshold` 屬性表示在熔斷器開啟之前需要發生的連續失敗次數,預設為 `5`。`halfOpenAfter` 屬性表示在最後一次失敗後,熔斷器在嘗試下一個請求之前等待的時間,預設為 1000 毫秒。

以下示例配置了一個熔斷器並顯示其 `DEBUG` 和 `ERROR` 輸出

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator

在前面的示例中,`threshold` 設定為 `2`,`halfOpenAfter` 設定為 `12` 秒。每隔 5 秒到達一個新請求。前兩次嘗試呼叫了服務。第三次和第四次失敗並丟擲異常,表明熔斷器已開啟。第五個請求被嘗試是因為該請求發生在最後一次失敗後 15 秒。第六次嘗試立即失敗,因為熔斷器立即進入了開啟狀態。

表示式求值 Advice

最後一個提供的 advice 類是 `o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice`。這個 advice 比其他兩個更通用。它提供了一種機制,可以在傳送到端點的原始入站訊息上評估表示式。在成功或失敗後,都可以評估單獨的表示式。可選地,可以將包含評估結果以及輸入訊息的訊息傳送到訊息通道。

此 advice 的典型用例可能是與 `` 一起使用,例如在傳輸成功時將檔案移動到某個目錄,或者在失敗時移動到另一個目錄

該 advice 具有屬性,可以在成功時設定表示式,在失敗時設定表示式,併為每個情況設定相應的通道。對於成功的情況,傳送到 `successChannel` 的訊息是 `AdviceMessage`,其有效負載是表示式評估的結果。一個名為 `inputMessage` 的附加屬性包含傳送到處理器的原始訊息。傳送到 `failureChannel` 的訊息(當處理器丟擲異常時)是 `ErrorMessage`,其有效負載為 `MessageHandlingExpressionEvaluatingAdviceException`。與所有 `MessagingException` 例項一樣,此有效負載具有 `failedMessage` 和 `cause` 屬性,以及一個名為 `evaluationResult` 的附加屬性,其中包含表示式評估的結果。

從版本 5.1.3 開始,如果配置了通道但未提供表示式,則使用預設表示式對訊息的 `payload` 進行求值。

預設情況下,當在 advice 範圍內丟擲異常時,在評估任何 `failureExpression` 後,該異常會拋給呼叫者。如果您希望抑制丟擲異常,請將 `trapException` 屬性設定為 `true`。以下 advice 顯示瞭如何使用 Java DSL 配置 `advice`

@SpringBootApplication
public class EerhaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }

    @Bean
    public IntegrationFlow advised() {
        return f -> f.<String>handle((payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

}

限速器 Advice

限速器 advice (`RateLimiterRequestHandlerAdvice`) 可以確保端點不會因請求過多而過載。當超過限速時,請求將進入阻塞狀態。

此 advice 的典型用例可能是外部服務提供商不允許每分鐘超過 `n` 個請求的情況。

`RateLimiterRequestHandlerAdvice` 的實現完全基於 Resilience4j 專案,並且需要注入 `RateLimiter` 或 `RateLimiterConfig`。也可以配置預設值和/或自定義名稱。

以下示例配置了一個限速器 advice,限制為每 1 秒一個請求

@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}

@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
		adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}

快取 Advice

從 5.2 版本開始,引入了 `CacheRequestHandlerAdvice`。它基於 Spring Framework 中的快取抽象,並與 `@Caching` 註解家族提供的概念和功能保持一致。其內部邏輯基於 `CacheAspectSupport` 擴充套件,在 `AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage` 方法周圍進行快取操作的代理,並將請求 `Message` 作為引數。這個 advice 可以配置一個 SpEL 表示式或一個 `Function` 來計算快取鍵。請求 `Message` 可以作為 SpEL 求值上下文的根物件,或作為 `Function` 的輸入引數。預設情況下,使用請求訊息的 `payload` 作為快取鍵。當預設快取操作為 `CacheableOperation` 時,`CacheRequestHandlerAdvice` 必須配置 `cacheNames`,或者配置任意一組 `CacheOperation`s。每個 `CacheOperation` 都可以單獨配置,或者共享選項,例如 `CacheManager`、`CacheResolver` 和 `CacheErrorHandler`,這些可以從 `CacheRequestHandlerAdvice` 配置中重用。此配置功能類似於 Spring Framework 的 `@CacheConfig` 和 `@Caching` 註解組合。如果未提供 `CacheManager`,預設情況下會從 `CacheAspectSupport` 中的 `BeanFactory` 解析單個 bean。

以下示例配置了兩個使用不同快取操作集的 advice

@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}

@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}

@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}

@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
    adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}