冪等接收器企業整合模式

從版本 4.1 開始,Spring Integration 提供了 冪等接收器 企業整合模式的實現。它是一個功能性模式,所有冪等性邏輯都應該在應用程式中實現。然而,為了簡化決策過程,提供了 IdempotentReceiverInterceptor 元件。這是一個 AOP Advice,應用於 MessageHandler.handleMessage() 方法,可以根據其配置 過濾 請求訊息或將其標記為 重複

以前,您可以透過在 <filter/> 中使用自定義的 MessageSelector 來實現此模式(請參閱 Filter),例如。但是,由於此模式真正定義了端點的行為,而不是自身作為一個端點,因此冪等接收器實現不提供端點元件。相反,它應用於在應用程式中宣告的端點。

IdempotentReceiverInterceptor 的邏輯基於提供的 MessageSelector,如果訊息不被該選擇器接受,則會使用 duplicateMessage 標頭設定為 true 來豐富該訊息。目標 MessageHandler(或下游流)可以查詢此標頭以實現正確的冪等性邏輯。如果 IdempotentReceiverInterceptor 配置了 discardChannelthrowExceptionOnRejection = true,則重複訊息不會發送到目標 MessageHandler.handleMessage()。相反,它被丟棄。如果要丟棄(對)重複訊息不做任何操作,則應將 discardChannel 配置為 NullChannel,例如預設的 nullChannel bean。

為了在訊息之間保持狀態並提供比較訊息以實現冪等性的能力,我們提供了 MetadataStoreSelector。它接受一個 MessageProcessor 實現(根據 Message 建立查詢鍵)和一個可選的 ConcurrentMetadataStoreMetadata Store)。有關更多資訊,請參閱 MetadataStoreSelector Javadoc。您還可以透過使用額外的 MessageProcessor 來自定義 ConcurrentMetadataStorevalue。預設情況下,MetadataStoreSelector 使用 timestamp 訊息標頭。

通常,如果沒有鍵的現有值,選擇器會選擇一條訊息以接受。在某些情況下,比較鍵的當前值和新值以確定是否應接受訊息很有用。從版本 5.3 開始,提供了 compareValues 屬性,它引用了一個 BiPredicate<String, String>;第一個引數是舊值;返回 true 表示接受訊息並在 MetadataStore 中用新值替換舊值。這對於減少鍵的數量很有用;例如,在處理檔案中的行時,可以將檔名儲存在鍵中,將當前行號儲存在值中。然後,重新啟動後,可以跳過已處理的行。請參閱 冪等下游處理拆分檔案 以獲取示例。

為了方便起見,MetadataStoreSelector 選項可以直接在 <idempotent-receiver> 元件上配置。以下列表顯示了所有可能的屬性

<idempotent-receiver
        id=""  (1)
        endpoint=""  (2)
        selector=""  (3)
        discard-channel=""  (4)
        metadata-store=""  (5)
        key-strategy=""  (6)
        key-expression=""  (7)
        value-strategy=""  (8)
        value-expression=""  (9)
        compare-values="" (10)
        throw-exception-on-rejection="" />  (11)
1 IdempotentReceiverInterceptor bean 的 ID。可選。
2 此攔截器應用到的消費者端點名稱或模式。用逗號 (,) 分隔名稱(模式),例如 endpoint="aaa, bbb*, ccc, *ddd, eee*fff"。匹配這些模式的端點 bean 名稱將用於檢索目標端點的 MessageHandler bean(使用其 .handler 字尾),並且 IdempotentReceiverInterceptor 將應用於這些 bean。必填。
3 一個 MessageSelector bean 引用。與 metadata-storekey-strategy (key-expression) 互斥。當未提供 selector 時,需要 key-strategykey-strategy-expression 之一。
4 標識當 IdempotentReceiverInterceptor 不接受訊息時將訊息傳送到的通道。省略時,重複訊息將轉發到處理器,並帶有 duplicateMessage 標頭。可選。
5 一個 ConcurrentMetadataStore 引用。由底層的 MetadataStoreSelector 使用。與 selector 互斥。可選。預設的 MetadataStoreSelector 使用一個內部 SimpleMetadataStore,它不會在應用程式執行之間保持狀態。
6 一個 MessageProcessor 引用。由底層的 MetadataStoreSelector 使用。從請求訊息評估 idempotentKey。與 selectorkey-expression 互斥。當未提供 selector 時,需要 key-strategykey-strategy-expression 之一。
7 用於填充 ExpressionEvaluatingMessageProcessor 的 SpEL 表示式。由底層的 MetadataStoreSelector 使用。使用請求訊息作為評估上下文根物件來評估 idempotentKey。與 selectorkey-strategy 互斥。當未提供 selector 時,需要 key-strategykey-strategy-expression 之一。
8 一個 MessageProcessor 引用。由底層的 MetadataStoreSelector 使用。從請求訊息中評估 idempotentKeyvalue。與 selectorvalue-expression 互斥。預設情況下,'MetadataStoreSelector' 使用 'timestamp' 訊息標頭作為元資料 'value'。
9 一個 SpEL 表示式,用於填充 ExpressionEvaluatingMessageProcessor。由底層的 MetadataStoreSelector 使用。使用請求訊息作為評估上下文根物件來評估 idempotentKeyvalue。與 selectorvalue-strategy 互斥。預設情況下,'MetadataStoreSelector' 使用 'timestamp' 訊息標頭作為元資料 'value'。
10 BiPredicate<String, String> bean 的引用,該 bean 允許您透過比較鍵的舊值和新值來有選擇地選擇訊息;預設為 null
11 如果 IdempotentReceiverInterceptor 拒絕訊息是否丟擲異常。預設為 false。無論是否提供了 discard-channel,它都會被應用。

對於 Java 配置,Spring Integration 提供了方法級別的 @IdempotentReceiver 註解。它用於標記帶有訊息註解(@ServiceActivator@Router 等)的 方法,以指定哪些 IdempotentReceiverInterceptor 物件應用於此端點。以下示例展示瞭如何使用 @IdempotentReceiver 註解

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

當您使用 Java DSL 時,可以將攔截器新增到端點的 advice 鏈中,如下例所示

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
IdempotentReceiverInterceptor 僅設計用於 MessageHandler.handleMessage(Message<?>) 方法。從版本 4.3.1 開始,它實現了 HandleMessageAdvice,以 AbstractHandleMessageAdvice 作為基類,以實現更好的解耦。有關更多資訊,請參閱 處理訊息 Advice
© . This site is unofficial and not affiliated with VMware.