提供的 Advice 類
除了提供應用 AOP 建議類的通用機制外,Spring Integration 還提供了以下開箱即用的建議實現:
-
RequestHandlerRetryAdvice(在重試建議中描述) -
RequestHandlerCircuitBreakerAdvice(在斷路器建議中描述) -
ExpressionEvaluatingRequestHandlerAdvice(在表示式建議中描述) -
RateLimiterRequestHandlerAdvice(在速率限制器建議中描述) -
CacheRequestHandlerAdvice(在快取建議中描述) -
ReactiveRequestHandlerAdvice(在響應式建議中描述) -
ContextHolderRequestHandlerAdvice(在上下文持有者建議中描述) -
LockRequestHandlerAdvice(在鎖定建議中描述)
重試建議
重試建議(o.s.i.handler.advice.RequestHandlerRetryAdvice)利用了 Spring Framework 中重試支援提供的豐富重試機制。此建議的核心元件是 RetryTemplate,它允許配置複雜的重試場景,包括 RetryPolicy 以及 RecoveryCallback 策略,以確定重試耗盡時要採取的操作。
- 無狀態重試
-
無狀態重試是指重試活動完全在建議中處理的情況。執行緒會暫停(如果配置如此),然後重試該操作。
- 有狀態重試
-
有狀態重試是指重試狀態在建議中管理,但會丟擲異常並且呼叫者重新提交請求的情況。有狀態重試的一個例子是,我們希望訊息發起者(例如 JMS)負責重新提交,而不是在當前執行緒上執行。有狀態重試需要某種機制來檢測重試的提交。為此,
RequestHandlerRetryAdvice公開了stateKeyFunction、newMessagePredicate和stateCacheSize屬性。其中後兩個屬性僅在提供了第一個屬性時才有意義。本質上,stateKeyFunction是將RequestHandlerRetryAdvice邏輯從無狀態切換到有狀態的指示器。newMessagePredicate的含義是根據要處理的訊息重新整理鍵的現有重試狀態。stateCacheSize預設為100,當有更多新的重試狀態到來時,較舊的條目會從快取中刪除。也許這些舊訊息不再從上游流重新傳遞,例如,訊息代理根據其重新傳遞策略將這些訊息死信。
| 預設的退避行為是不退避。重試會立即嘗試。使用導致執行緒在嘗試之間暫停的退避策略可能會導致效能問題,包括過多的記憶體使用和執行緒飢餓。在高併發環境中,應謹慎使用退避策略。 |
配置重試建議
本節中的示例使用以下始終丟擲異常的 @ServiceActivator
public class FailingService {
@ServiceActivator(inputChannel = "input", adviceChain = "retryAdvice")
public void service(String message) {
throw new RuntimeException("error");
}
}
- 簡單的無狀態重試
-
預設的
RetryPolicy是嘗試三次,加上對目標MessageHandler的原始呼叫。預設情況下沒有退避,因此三次嘗試會背靠背地進行,嘗試之間沒有延遲。沒有RecoveryCallback,因此在最終重試失敗後,結果是將異常拋給呼叫者。在 Spring Integration 環境中,此最終異常可以透過在入站端點上使用error-channel來處理。以下示例使用RequestHandlerRetryAdvice的預設配置@Bean RequestHandlerRetryAdvice retryAdvice() { return new RequestHandlerRetryAdvice(); } - 帶恢復功能的簡單無狀態重試
-
以下示例在前一個示例中添加了一個
RecoveryCallback,並使用ErrorMessageSendingRecoverer將ErrorMessage傳送到一個通道@Bean RequestHandlerRetryAdvice retryAdvice(MessageChannel recoveryChannel) { RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice(); requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel)); return requestHandlerRetryAdvice; } - 具有自定義策略和恢復功能的無狀態重試
-
為了更復雜,可以為
RequestHandlerRetryAdvice提供自定義的RetryPolicy。此示例繼續使用簡單的RetryPolicy,但將嘗試次數增加到四次。它還添加了一個ExponentialBackoff,其中第一次重試等待一秒,第二次等待五秒,第三次等待 25 秒(總共四次嘗試)。以下清單顯示了此類配置的示例@Bean RequestHandlerRetryAdvice retryAdvice() { RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice(); requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel())); RetryPolicy retryPolicy = RetryPolicy.builder() .maxRetries(4) .delay(Duration.ofSeconds(1)) .multiplier(5.0) .maxDelay(Duration.ofMinutes(1)) .build(); requestHandlerRetryAdvice.setRetryPolicy(retryPolicy); return requestHandlerRetryAdvice; } - 無狀態重試的名稱空間支援
-
以下示例演示瞭如何使用 Spring Integration XML 名稱空間及其自定義標籤配置
RequestHandlerRetryAdvice<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <ref bean="retrier" /> </int:request-handler-advice-chain> </int:service-activator> <int:handler-retry-advice id="retrier" max-retries="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:handler-retry-advice>在前面的示例中,建議被定義為頂級 bean,以便可以在多個
request-handler-advice-chain例項中使用。您也可以直接在鏈中定義建議,如下例所示<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <int:retry-advice id="retrier" max-retries="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:retry-advice> </int:request-handler-advice-chain> </int:service-activator><handler-retry-advice>可以有一個<fixed-back-off>或<exponential-back-off>子元素,也可以沒有子元素。沒有子元素的<handler-retry-advice>不使用退避。如果沒有recovery-channel,則在重試耗盡時丟擲異常。名稱空間只能用於無狀態重試。對於更復雜的環境(自定義策略等),請使用正常的
<bean>定義。 - 帶恢復功能的簡單有狀態重試
-
要使重試有狀態,必須為
RequestHandlerRetryAdvice例項提供Function<Message<?>, Object> stateKeyFunction。此函式用於將訊息識別為重新提交,以便RequestHandlerRetryAdvice可以確定此訊息的當前重試狀態。有狀態重試背後的想法是不阻塞當前執行緒,而是快取此訊息的重試狀態並將MessageHandler失敗重新拋回給呼叫者。通常,這適用於能夠重新提交(或重新傳遞)事件的訊息發起者,例如,使用nack的 RabbitMQ 或具有 seek 功能的 Apache Kafka 等訊息代理;或者在消費回滾後的 JMS。如果還沒有快取狀態(或者Predicate<Message<?>> newMessagePredicate為該訊息返回true),則MessageHandler呼叫被視為第一次呼叫,在其失敗時,基於BackOffExecution的內部RetryState將快取到上述鍵下。在下一條訊息到達時,快取狀態為Thread.sleep()提供一個退避間隔,然後嘗試呼叫MessageHandler。如果此退避間隔等於BackOffExecution.STOP(例如,已達到maxAttempts),則表示此訊息不再重試:整個重試周期被視為已耗盡,並將相應的RetryException拋回給呼叫者,或者在提供了RecoveryCallback時用於呼叫RecoveryCallback。通常,異常處理邏輯和退避執行與無狀態行為類似,唯一的區別是執行緒不會在所有maxAttempts期間阻塞。由訊息發起者來重新傳遞訊息以進行下一次重試呼叫。
斷路器建議
斷路器模式的一般思想是,如果服務當前不可用,則不要浪費時間(和資源)嘗試使用它。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice 實現了此模式。當斷路器處於關閉狀態時,端點嘗試呼叫服務。如果一定數量的連續嘗試失敗,斷路器將進入開啟狀態。當處於開啟狀態時,新請求“快速失敗”,並且在一段時間過期之前不會嘗試呼叫服務。
當時間到期時,斷路器被設定為半開狀態。在此狀態下,即使只有一次嘗試失敗,斷路器也會立即進入開啟狀態。如果嘗試成功,斷路器將進入關閉狀態,在這種情況下,它不會再次進入開啟狀態,直到再次發生配置的連續失敗次數。任何成功的嘗試都會將狀態重置為零失敗,以確定斷路器何時可能再次進入開啟狀態。
通常,此建議可能用於外部服務,其中可能需要一些時間才能失敗,例如嘗試建立網路連線時超時。
RequestHandlerCircuitBreakerAdvice 有兩個屬性:threshold 和 halfOpenAfter。threshold 屬性表示斷路器進入開啟狀態所需的連續失敗次數。它預設為 5。halfOpenAfter 屬性表示在上次失敗後,斷路器等待多長時間才嘗試另一個請求。預設值為 1000 毫秒。
以下示例配置了一個斷路器並顯示了其 DEBUG 和 ERROR 輸出
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
在前面的示例中,閾值設定為 2,halfOpenAfter 設定為 12 秒。每 5 秒到達一個新請求。前兩次嘗試呼叫了服務。第三次和第四次嘗試失敗,並丟擲異常,表明斷路器已開啟。第五個請求被嘗試,因為該請求是在上次失敗後 15 秒進行的。第六次嘗試立即失敗,因為斷路器立即進入開啟狀態。
表示式求值建議
最後一個提供的建議類是 o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice。此建議比其他兩個建議更通用。它提供了一種機制,用於評估傳送到端點的原始入站訊息上的表示式。成功或失敗後,可以使用單獨的表示式進行評估。可選地,包含評估結果和輸入訊息的訊息可以傳送到訊息通道。
此建議的典型用例可能是與 <ftp:outbound-channel-adapter/> 結合使用,也許在傳輸成功時將檔案移動到某個目錄,或者在失敗時移動到另一個目錄
該建議具有在成功時設定表示式、在失敗時設定表示式以及各自通道的屬性。對於成功的情況,傳送到 successChannel 的訊息是 AdviceMessage,其有效負載是表示式求值的結果。一個附加屬性,稱為 inputMessage,包含傳送給處理程式的原始訊息。傳送到 failureChannel 的訊息(當處理程式丟擲異常時)是 ErrorMessage,其有效負載為 MessageHandlingExpressionEvaluatingAdviceException。與所有 MessagingException 例項一樣,此有效負載具有 failedMessage 和 cause 屬性,以及一個名為 evaluationResult 的附加屬性,其中包含表示式求值的結果。
從版本 5.1.3 開始,如果配置了通道但未提供表示式,則使用預設表示式求值為訊息的 payload。 |
預設情況下,當在建議範圍內丟擲異常時,在評估任何 failureExpression 之後,該異常會拋給呼叫者。如果您希望抑制丟擲異常,請將 trapException 屬性設定為 true。以下建議演示瞭如何使用 Java DSL 配置 advice
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
速率限制器建議
速率限制器建議(RateLimiterRequestHandlerAdvice)可確保端點不會因請求而過載。當超出速率限制時,請求將進入阻塞狀態。
此建議的典型用例可能是外部服務提供商不允許每分鐘的請求數超過 n。
RateLimiterRequestHandlerAdvice 實現完全基於 Resilience4j 專案,並且需要注入 RateLimiter 或 RateLimiterConfig。也可以使用預設值和/或自定義名稱進行配置。
以下示例配置了一個速率限制器建議,每 1 秒請求一次
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
快取建議
從版本 5.2 開始,引入了 CacheRequestHandlerAdvice。它基於 Spring Framework 中的快取抽象,並與 @Caching 註解族提供的概念和功能保持一致。內部邏輯基於 CacheAspectSupport 擴充套件,其中快取操作的代理是在 AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage 方法周圍完成的,以請求 Message<?> 作為引數。此建議可以使用 SpEL 表示式或 Function 配置以評估快取鍵。請求 Message<?> 可作為 SpEL 評估上下文的根物件,或作為 Function 輸入引數。預設情況下,請求訊息的 payload 用於快取鍵。CacheRequestHandlerAdvice 必須使用 cacheNames 進行配置,當預設快取操作是 CacheableOperation 時,或者使用一組任意的 CacheOperation。每個 CacheOperation 都可以單獨配置,也可以共享選項,例如 CacheManager、CacheResolver 和 CacheErrorHandler,可以從 CacheRequestHandlerAdvice 配置中重用。此配置功能類似於 Spring Framework 的 @CacheConfig 和 @Caching 註解組合。如果未提供 CacheManager,則預設情況下從 CacheAspectSupport 中的 BeanFactory 解析單個 bean。
以下示例配置了兩個具有不同快取操作集的建議
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}