冪等接收者企業整合模式

從 4.1 版本開始,Spring Integration 提供了 冪等接收者 (Idempotent Receiver) 企業整合模式的一種實現。這是一個功能性模式,所有的冪等性邏輯應在應用程式中實現。然而,為了簡化決策過程,提供了 IdempotentReceiverInterceptor 元件。這是一個應用於 MessageHandler.handleMessage() 方法的 AOP Advice,可以根據其配置來 `filter` 請求訊息或將其標記為 `duplicate`。

之前,你可以透過在 <filter/> (參見 過濾器) 中使用自定義的 MessageSelector 來實現此模式,例如。然而,由於此模式真正定義的是端點的行為而非自身就是一個端點,冪等接收者實現不提供端點元件。相反,它被應用於在應用程式中宣告的端點。

IdempotentReceiverInterceptor 的邏輯基於提供的 MessageSelector;如果訊息未被該選擇器接受,則會透過將 duplicateMessage 訊息頭設定為 true 來豐富該訊息。目標 MessageHandler(或下游流)可以查閱此訊息頭以實現正確的冪等性邏輯。如果 IdempotentReceiverInterceptor 配置了 discardChannelthrowExceptionOnRejection = true,則重複訊息不會發送到目標 MessageHandler.handleMessage(),而是被丟棄。如果你想丟棄(對重複訊息不做任何處理),discardChannel 應配置一個 NullChannel,例如預設的 nullChannel bean。

為了在訊息之間維護狀態並提供比較訊息以實現冪等性的能力,我們提供了 MetadataStoreSelector。它接受一個 MessageProcessor 實現(基於 Message 建立查詢鍵)和一個可選的 ConcurrentMetadataStore元資料儲存)。更多資訊請參閱 MetadataStoreSelector Javadoc。你還可以透過額外的 MessageProcessor 定製 ConcurrentMetadataStorevalue。預設情況下,MetadataStoreSelector 使用 timestamp 訊息頭。

通常,如果鍵不存在現有值,選擇器會選擇訊息進行接受。在某些情況下,比較鍵的當前值和新值以確定是否應接受訊息會很有用。從 5.3 版本開始,提供了 compareValues 屬性,它引用了一個 BiPredicate<String, String>;第一個引數是舊值;返回 true 表示接受訊息並在 MetadataStore 中用新值替換舊值。這對於減少鍵的數量很有用;例如,處理檔案中的行時,可以將檔名儲存在鍵中,將當前行號儲存在值中。然後,在重新啟動後,可以跳過已經處理的行。請參閱 拆分檔案中的冪等下游處理 以獲取示例。

為方便起見,MetadataStoreSelector 的選項可以直接在 <idempotent-receiver> 元件上配置。以下列表顯示了所有可能的屬性

<idempotent-receiver
        id=""  (1)
        endpoint=""  (2)
        selector=""  (3)
        discard-channel=""  (4)
        metadata-store=""  (5)
        key-strategy=""  (6)
        key-expression=""  (7)
        value-strategy=""  (8)
        value-expression=""  (9)
        compare-values="" (10)
        throw-exception-on-rejection="" />  (11)
1 IdempotentReceiverInterceptor bean 的 ID。可選。
2 此攔截器應用的消費者端點名稱或模式。用逗號 (,) 分隔名稱(模式),例如 endpoint="aaa, bbb*, ccc, *ddd, eee*fff"。與這些模式匹配的端點 bean 名稱用於檢索目標端點的 MessageHandler bean(使用其 `.handler` 字尾),並將 IdempotentReceiverInterceptor 應用於這些 bean。必需。
3 一個 MessageSelector bean 引用。與 metadata-storekey-strategy (key-expression) 互斥。當未提供 selector 時,key-strategykey-strategy-expression 之一是必需的。
4 指定當 IdempotentReceiverInterceptor 不接受訊息時將其傳送到的通道。省略時,重複訊息會帶著 duplicateMessage 訊息頭轉發給處理器。可選。
5 一個 ConcurrentMetadataStore 引用。由底層 MetadataStoreSelector 使用。與 selector 互斥。可選。預設的 MetadataStoreSelector 使用一個內部的 SimpleMetadataStore,它不會在應用程式執行之間維護狀態。
6 一個 MessageProcessor 引用。由底層 MetadataStoreSelector 使用。從請求訊息評估 idempotentKey。與 selectorkey-expression 互斥。當未提供 selector 時,key-strategykey-strategy-expression 之一是必需的。
7 用於填充 ExpressionEvaluatingMessageProcessor 的 SpEL 表示式。由底層 MetadataStoreSelector 使用。透過使用請求訊息作為評估上下文根物件來評估 idempotentKey。與 selectorkey-strategy 互斥。當未提供 selector 時,key-strategykey-strategy-expression 之一是必需的。
8 一個 MessageProcessor 引用。由底層 MetadataStoreSelector 使用。從請求訊息評估 idempotentKeyvalue。與 selectorvalue-expression 互斥。預設情況下,'MetadataStoreSelector' 使用 'timestamp' 訊息頭作為元資料 'value'。
9 用於填充 ExpressionEvaluatingMessageProcessor 的 SpEL 表示式。由底層 MetadataStoreSelector 使用。透過使用請求訊息作為評估上下文根物件來評估 idempotentKeyvalue。與 selectorvalue-strategy 互斥。預設情況下,'MetadataStoreSelector' 使用 'timestamp' 訊息頭作為元資料 'value'。
10 引用一個 BiPredicate<String, String> bean,它允許你透過比較鍵的舊值和新值來有選擇地接受訊息;預設情況下為 null
11 如果 IdempotentReceiverInterceptor 拒絕訊息是否丟擲異常。預設為 false。無論是否提供了 discard-channel,此設定都有效。

對於 Java 配置,Spring Integration 提供了方法級別的 @IdempotentReceiver 註解。它用於標記帶有訊息處理註解(如 @ServiceActivator@Router 等)的 method,以指定哪些 IdempotentReceiverInterceptor 物件應用於此端點。以下示例展示瞭如何使用 @IdempotentReceiver 註解

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

使用 Java DSL 時,可以將攔截器新增到端點的 advice 鏈中,示例如下

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
IdempotentReceiverInterceptor 僅為 MessageHandler.handleMessage(Message<?>) 方法設計。從 4.3.1 版本開始,它實現了 HandleMessageAdvice,並以 AbstractHandleMessageAdvice 作為基類,以實現更好的解耦。有關更多資訊,請參閱 處理訊息 Advice