冪等接收者企業整合模式
從 4.1 版本開始,Spring Integration 提供了 冪等接收者 (Idempotent Receiver) 企業整合模式的一種實現。這是一個功能性模式,所有的冪等性邏輯應在應用程式中實現。然而,為了簡化決策過程,提供了 IdempotentReceiverInterceptor
元件。這是一個應用於 MessageHandler.handleMessage()
方法的 AOP Advice
,可以根據其配置來 `filter` 請求訊息或將其標記為 `duplicate`。
之前,你可以透過在 <filter/>
(參見 過濾器) 中使用自定義的 MessageSelector
來實現此模式,例如。然而,由於此模式真正定義的是端點的行為而非自身就是一個端點,冪等接收者實現不提供端點元件。相反,它被應用於在應用程式中宣告的端點。
IdempotentReceiverInterceptor
的邏輯基於提供的 MessageSelector
;如果訊息未被該選擇器接受,則會透過將 duplicateMessage
訊息頭設定為 true
來豐富該訊息。目標 MessageHandler
(或下游流)可以查閱此訊息頭以實現正確的冪等性邏輯。如果 IdempotentReceiverInterceptor
配置了 discardChannel
或 throwExceptionOnRejection = true
,則重複訊息不會發送到目標 MessageHandler.handleMessage()
,而是被丟棄。如果你想丟棄(對重複訊息不做任何處理),discardChannel
應配置一個 NullChannel
,例如預設的 nullChannel
bean。
為了在訊息之間維護狀態並提供比較訊息以實現冪等性的能力,我們提供了 MetadataStoreSelector
。它接受一個 MessageProcessor
實現(基於 Message
建立查詢鍵)和一個可選的 ConcurrentMetadataStore
(元資料儲存)。更多資訊請參閱 MetadataStoreSelector
Javadoc。你還可以透過額外的 MessageProcessor
定製 ConcurrentMetadataStore
的 value
。預設情況下,MetadataStoreSelector
使用 timestamp
訊息頭。
通常,如果鍵不存在現有值,選擇器會選擇訊息進行接受。在某些情況下,比較鍵的當前值和新值以確定是否應接受訊息會很有用。從 5.3 版本開始,提供了 compareValues
屬性,它引用了一個 BiPredicate<String, String>
;第一個引數是舊值;返回 true
表示接受訊息並在 MetadataStore
中用新值替換舊值。這對於減少鍵的數量很有用;例如,處理檔案中的行時,可以將檔名儲存在鍵中,將當前行號儲存在值中。然後,在重新啟動後,可以跳過已經處理的行。請參閱 拆分檔案中的冪等下游處理 以獲取示例。
為方便起見,MetadataStoreSelector
的選項可以直接在 <idempotent-receiver>
元件上配置。以下列表顯示了所有可能的屬性
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
1 | IdempotentReceiverInterceptor bean 的 ID。可選。 |
2 | 此攔截器應用的消費者端點名稱或模式。用逗號 (, ) 分隔名稱(模式),例如 endpoint="aaa, bbb*, ccc, *ddd, eee*fff" 。與這些模式匹配的端點 bean 名稱用於檢索目標端點的 MessageHandler bean(使用其 `.handler` 字尾),並將 IdempotentReceiverInterceptor 應用於這些 bean。必需。 |
3 | 一個 MessageSelector bean 引用。與 metadata-store 和 key-strategy (key-expression) 互斥。當未提供 selector 時,key-strategy 或 key-strategy-expression 之一是必需的。 |
4 | 指定當 IdempotentReceiverInterceptor 不接受訊息時將其傳送到的通道。省略時,重複訊息會帶著 duplicateMessage 訊息頭轉發給處理器。可選。 |
5 | 一個 ConcurrentMetadataStore 引用。由底層 MetadataStoreSelector 使用。與 selector 互斥。可選。預設的 MetadataStoreSelector 使用一個內部的 SimpleMetadataStore ,它不會在應用程式執行之間維護狀態。 |
6 | 一個 MessageProcessor 引用。由底層 MetadataStoreSelector 使用。從請求訊息評估 idempotentKey 。與 selector 和 key-expression 互斥。當未提供 selector 時,key-strategy 或 key-strategy-expression 之一是必需的。 |
7 | 用於填充 ExpressionEvaluatingMessageProcessor 的 SpEL 表示式。由底層 MetadataStoreSelector 使用。透過使用請求訊息作為評估上下文根物件來評估 idempotentKey 。與 selector 和 key-strategy 互斥。當未提供 selector 時,key-strategy 或 key-strategy-expression 之一是必需的。 |
8 | 一個 MessageProcessor 引用。由底層 MetadataStoreSelector 使用。從請求訊息評估 idempotentKey 的 value 。與 selector 和 value-expression 互斥。預設情況下,'MetadataStoreSelector' 使用 'timestamp' 訊息頭作為元資料 'value'。 |
9 | 用於填充 ExpressionEvaluatingMessageProcessor 的 SpEL 表示式。由底層 MetadataStoreSelector 使用。透過使用請求訊息作為評估上下文根物件來評估 idempotentKey 的 value 。與 selector 和 value-strategy 互斥。預設情況下,'MetadataStoreSelector' 使用 'timestamp' 訊息頭作為元資料 'value'。 |
10 | 引用一個 BiPredicate<String, String> bean,它允許你透過比較鍵的舊值和新值來有選擇地接受訊息;預設情況下為 null 。 |
11 | 如果 IdempotentReceiverInterceptor 拒絕訊息是否丟擲異常。預設為 false 。無論是否提供了 discard-channel ,此設定都有效。 |
對於 Java 配置,Spring Integration 提供了方法級別的 @IdempotentReceiver
註解。它用於標記帶有訊息處理註解(如 @ServiceActivator
、@Router
等)的 method
,以指定哪些 IdempotentReceiverInterceptor
物件應用於此端點。以下示例展示瞭如何使用 @IdempotentReceiver
註解
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
使用 Java DSL 時,可以將攔截器新增到端點的 advice 鏈中,示例如下
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
IdempotentReceiverInterceptor 僅為 MessageHandler.handleMessage(Message<?>) 方法設計。從 4.3.1 版本開始,它實現了 HandleMessageAdvice ,並以 AbstractHandleMessageAdvice 作為基類,以實現更好的解耦。有關更多資訊,請參閱 處理訊息 Advice。 |