聚合器

聚合器與拆分器基本是映象,它是一種訊息處理器,接收多個訊息並將它們組合成一個訊息。實際上,聚合器通常是包含拆分器的管道中的下游消費者。

從技術上講,聚合器比拆分器更復雜,因為它是有狀態的。它必須儲存要聚合的訊息,並確定何時完成的訊息組可以進行聚合。為此,它需要一個 MessageStore

功能

聚合器透過關聯和儲存相關訊息組,直到該組被認為是完整的。此時,聚合器透過處理整個組來建立一個單一訊息,並將聚合後的訊息作為輸出傳送。

實現聚合器需要提供執行聚合的邏輯(即,從多個訊息建立單個訊息)。兩個相關概念是關聯和釋放。

關聯確定訊息如何分組進行聚合。在 Spring Integration 中,關聯預設基於 IntegrationMessageHeaderAccessor.CORRELATION_ID 訊息頭。具有相同 IntegrationMessageHeaderAccessor.CORRELATION_ID 的訊息被分組在一起。但是,您可以自定義關聯策略,以允許其他方式指定訊息應如何分組。為此,您可以實現 CorrelationStrategy(本章稍後介紹)。

為了確定訊息組何時準備好進行處理,會諮詢 ReleaseStrategy。聚合器的預設釋放策略是當序列中包含的所有訊息都存在時釋放一個組,基於 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 頭。您可以透過提供對自定義 ReleaseStrategy 實現的引用來覆蓋此預設策略。

程式設計模型

聚合 API 由多個類組成

  • 介面 MessageGroupProcessor 及其子類:MethodInvokingAggregatingMessageGroupProcessorExpressionEvaluatingMessageGroupProcessor

  • ReleaseStrategy 介面及其預設實現:SimpleSequenceSizeReleaseStrategy

  • CorrelationStrategy 介面及其預設實現:HeaderAttributeCorrelationStrategy

AggregatingMessageHandler

AggregatingMessageHandler (AbstractCorrelatingMessageHandler 的子類) 是 MessageHandler 實現,封裝了聚合器(以及其他關聯用例)的常見功能,如下所示

  • 將訊息關聯到要聚合的組中

  • MessageStore 中維護這些訊息,直到組可以被釋放

  • 決定何時可以釋放組

  • 將已釋放的組聚合為單個訊息

  • 識別並響應已過期的組

決定訊息應如何分組的責任委託給 CorrelationStrategy 例項。決定訊息組是否可以釋放的責任委託給 ReleaseStrategy 例項。

以下清單顯示了基類 AbstractAggregatingMessageGroupProcessor 的簡要亮點(實現 aggregatePayloads 方法的責任留給開發人員)

public abstract class AbstractAggregatingMessageGroupProcessor
              implements MessageGroupProcessor {

    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        // default implementation exists
    }

    protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);

}

請參閱 DefaultAggregatingMessageGroupProcessorExpressionEvaluatingMessageGroupProcessorMethodInvokingMessageGroupProcessor 作為 AbstractAggregatingMessageGroupProcessor 的開箱即用實現。

從 5.2 版本開始,AbstractAggregatingMessageGroupProcessor 提供了一個 Function<MessageGroup, Map<String, Object>> 策略,用於合併和計算輸出訊息的頭部。DefaultAggregateHeadersFunction 實現提供了邏輯,該邏輯返回組中沒有衝突的所有頭部;組中一個或多個訊息中缺失的頭部不被視為衝突。衝突的頭部將被忽略。與新引入的 DelegatingMessageGroupProcessor 一起,此函式用於任何任意(非 AbstractAggregatingMessageGroupProcessorMessageGroupProcessor 實現。本質上,框架將提供的函式注入到 AbstractAggregatingMessageGroupProcessor 例項中,並將所有其他實現包裝到 DelegatingMessageGroupProcessor 中。AbstractAggregatingMessageGroupProcessorDelegatingMessageGroupProcessor 之間的邏輯差異在於,後者在呼叫委託策略之前不預先計算頭部,並且如果委託返回 MessageAbstractIntegrationMessageBuilder,則不呼叫該函式。在這種情況下,框架假定目標實現已負責生成一組正確的頭部並填充到返回結果中。Function<MessageGroup, Map<String, Object>> 策略作為 XML 配置的 headers-function 引用屬性、Java DSL 的 AggregatorSpec.headersFunction() 選項以及純 Java 配置的 AggregatorFactoryBean.setHeadersFunction() 可用。

CorrelationStrategyAbstractCorrelatingMessageHandler 擁有,並根據 IntegrationMessageHeaderAccessor.CORRELATION_ID 訊息頭具有預設值,如下例所示

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
        CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
    ...
    this.correlationStrategy = correlationStrategy == null ?
        new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
    this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
    ...
}

至於訊息組的實際處理,預設實現是 DefaultAggregatingMessageGroupProcessor。它建立一個單個 Message,其負載是給定組接收到的負載的 List。這對於帶有拆分器、釋出-訂閱通道或上游接收者列表路由器的簡單分散-聚合實現非常有效。

在這種情況下使用釋出-訂閱通道或接收者列表路由器時,請務必啟用 apply-sequence 標誌。這樣做會新增必要的頭部:CORRELATION_IDSEQUENCE_NUMBERSEQUENCE_SIZE。此行為在 Spring Integration 中預設對拆分器啟用,但不對釋出-訂閱通道或接收者列表路由器啟用,因為這些元件可以在不需要這些頭部的各種上下文中使。

在為應用程式實現特定的聚合器策略時,您可以擴充套件 AbstractAggregatingMessageGroupProcessor 並實現 aggregatePayloads 方法。然而,有更好的解決方案,與 API 的耦合度更低,用於實現聚合邏輯,可以透過 XML 或註解進行配置。

通常,任何 POJO 都可以實現聚合演算法,如果它提供一個接受單個 java.util.List 作為引數的方法(也支援引數化列表)。此方法按如下方式呼叫以聚合訊息

  • 如果引數是 java.util.Collection<T> 並且引數型別 T 可以賦值給 Message,則將為聚合累積的整個訊息列表傳送到聚合器。

  • 如果引數是未引數化的 java.util.Collection 或引數型別不能賦值給 Message,則方法接收累積訊息的負載。

  • 如果返回型別不能賦值給 Message,則將其視為由框架自動建立的 Message 的負載。

為了程式碼簡潔並推廣低耦合、可測試性等最佳實踐,實現聚合邏輯的首選方法是透過 POJO 並使用 XML 或註解支援在應用程式中進行配置。

從 5.3 版本開始,處理訊息組後,AbstractCorrelatingMessageHandler 對具有多個巢狀級別的拆分器-聚合器場景執行 MessageBuilder.popSequenceDetails() 訊息頭修改。這僅在訊息組釋放結果不是訊息集合的情況下完成。在這種情況下,目標 MessageGroupProcessor 負責在構建這些訊息時呼叫 MessageBuilder.popSequenceDetails()

如果 MessageGroupProcessor 返回一個 Message,只有當 sequenceDetails 與組中的第一個訊息匹配時,才會在輸出訊息上執行 MessageBuilder.popSequenceDetails()。(以前,這隻在 MessageGroupProcessor 返回純負載或 AbstractIntegrationMessageBuilder 時才完成。)

此功能可以透過新的 popSequence 布林屬性控制,因此在某些情況下,當關聯詳細資訊未由標準拆分器填充時,可以停用 MessageBuilder.popSequenceDetails()。此屬性本質上撤銷了最近上游 AbstractMessageSplitterapplySequence = true 所做的事情。有關更多資訊,請參閱 拆分器

SimpleMessageGroup.getMessages() 方法返回一個 unmodifiableCollection。因此,如果聚合 POJO 方法具有 Collection<Message> 引數,則傳入的引數就是該 Collection 例項,並且當您將 SimpleMessageStore 用於聚合器時,該原始 Collection<Message> 在釋放組後被清除。因此,如果 POJO 中的 Collection<Message> 變數超出聚合器,它也會被清除。如果您希望僅將該集合原樣釋放以供進一步處理,則必須構建一個新的 Collection(例如,new ArrayList<Message>(messages))。從 4.3 版本開始,框架不再將訊息複製到新集合中,以避免不必要的額外物件建立。

在 4.2 版本之前,無法透過 XML 配置提供 MessageGroupProcessor。只能使用 POJO 方法進行聚合。現在,如果框架檢測到引用的(或內部)bean 實現了 MessageProcessor,則將其用作聚合器的輸出處理器。

如果您希望從自定義 MessageGroupProcessor 釋放物件集合作為訊息的負載,您的類應該擴充套件 AbstractAggregatingMessageGroupProcessor 並實現 aggregatePayloads()

此外,自 4.2 版本以來,提供了 SimpleMessageGroupProcessor。它返回組中的訊息集合,如前所述,這會導致釋放的訊息單獨傳送。

這使得聚合器可以作為訊息屏障,其中到達的訊息被保留,直到釋放策略觸發並將組作為一系列單獨的訊息釋放。

從 6.0 版本開始,上述拆分行為僅在組處理器為 SimpleMessageGroupProcessor 時有效。否則,對於任何其他返回 Collection<Message>MessageGroupProcessor 實現,只會發出一個回覆訊息,其負載是整個訊息集合。這種邏輯是由聚合器的規範目的決定的——透過某個鍵收集請求訊息並生成一個單一的組合訊息。

在 6.5 版本之前,如果 MessageGroupProcessor(通常是 DSL 中的 lambda)返回一個負載集合,AbstractCorrelatingMessageHandler 會因 IllegalArgumentException 而失敗,宣告只允許訊息集合。從現在起,這種限制被取消,返回的負載集合將作為聚合器發出的單個回覆訊息,其中只包含來自最後一條請求訊息的頭部。如果需要同時進行頭部聚合和負載集合,建議使用 AbstractAggregatingMessageGroupProcessor 實現,而不是普通的 MessageGroupProcessor 函式式介面。

ReleaseStrategy

ReleaseStrategy 介面定義如下

public interface ReleaseStrategy {

  boolean canRelease(MessageGroup group);

}

通常,任何 POJO 都可以實現完成決策邏輯,如果它提供一個接受單個 java.util.List 作為引數(也支援引數化列表)並返回布林值的方法。此方法在每條新訊息到達後呼叫,以決定組是否完成,如下所示

  • 如果引數是 java.util.List<T> 並且引數型別 T 可以賦值給 Message,則組中累積的整個訊息列表將傳送到該方法。

  • 如果引數是未引數化的 java.util.List 或引數型別不能賦值給 Message,則方法接收累積訊息的負載。

  • 如果訊息組已準備好進行聚合,則該方法必須返回 true,否則返回 false。

以下示例展示瞭如何將 @ReleaseStrategy 註解用於 Message 型別的 List

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<Message<?>>) {...}
}

以下示例展示瞭如何將 @ReleaseStrategy 註解用於 String 型別的 List

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<String>) {...}
}

根據前面兩個示例中的簽名,基於 POJO 的釋放策略會傳遞一個尚未釋放的訊息 Collection(如果您需要訪問整個 Message)或一個負載物件 Collection(如果型別引數不是 Message)。這滿足了大多數用例。但是,如果出於某種原因,您需要訪問完整的 MessageGroup,則應提供 ReleaseStrategy 介面的實現。

在處理可能很大的組時,您應該瞭解這些方法是如何呼叫的,因為在組釋放之前,釋放策略可能會被多次呼叫。最有效的是 ReleaseStrategy 的實現,因為聚合器可以直接呼叫它。其次最有效的是帶有 Collection<Message<?>> 引數型別的 POJO 方法。效率最低的是帶有 Collection<Something> 型別的 POJO 方法。每次呼叫釋放策略時,框架都必須將組中訊息的負載複製到新集合中(並可能嘗試將負載轉換為 Something)。使用 Collection<?> 可以避免轉換,但仍然需要建立新 Collection

由於這些原因,對於大型組,我們建議您實現 ReleaseStrategy

當組被釋放以進行聚合時,所有尚未釋放的訊息都會被處理並從組中移除。如果組也已完成(即,如果序列中的所有訊息都已到達或者沒有定義序列),則該組被標記為完成。此組的任何新訊息都會發送到丟棄通道(如果已定義)。將 expire-groups-upon-completion 設定為 true(預設為 false)會移除整個組,並且任何新訊息(與移除的組具有相同的關聯 ID)會形成一個新組。您可以使用 MessageGroupStoreReaper 和將 send-partial-result-on-expiry 設定為 true 來釋放部分序列。

從 6.5 版本開始,關聯處理程式還可以配置 discardIndividuallyOnExpiry 選項,以將整個組作為單個訊息丟棄。本質上,此訊息的負載是來自已過期組的訊息列表。僅在 sendPartialResultOnExpiry 設定為 false(預設)且提供了 dicardChannel 時有效。

為了方便丟棄遲到的訊息,聚合器必須在組釋放後保持其狀態。這最終可能導致記憶體不足的情況。為了避免這種情況,您應該考慮配置 MessageGroupStoreReaper 來移除組元資料。過期引數應設定為在達到某個點後使組過期,此後預期不會有遲到的訊息到達。有關配置收割器的資訊,請參閱 在聚合器中管理狀態:MessageGroupStore

Spring Integration 提供了 ReleaseStrategy 的實現:SimpleSequenceSizeReleaseStrategy。此實現會查詢每個到達訊息的 SEQUENCE_NUMBERSEQUENCE_SIZE 頭部,以決定訊息組何時完成並準備好進行聚合。如前所示,它也是預設策略。

在 5.0 版本之前,預設的釋放策略是 SequenceSizeReleaseStrategy,它在大組中表現不佳。使用該策略,會檢測並拒絕重複的序列號。此操作可能很耗時。

如果您正在聚合大型組,並且不需要釋放部分組,並且不需要檢測/拒絕重複序列,請考慮使用 SimpleSequenceSizeReleaseStrategy——對於這些用例,它效率更高,並且自 5.0 版本 以來,在未指定部分組釋放時,它就是預設值。

聚合大型組

4.3 版本將 SimpleMessageGroup 中訊息的預設 Collection 更改為 HashSet(之前是 BlockingQueue)。這在從大型組中移除單個訊息時成本很高(需要 O(n) 線性掃描)。雖然雜湊集通常移除速度快得多,但對於大型訊息來說可能成本很高,因為在插入和移除時都必須計算雜湊。如果您的訊息雜湊成本很高,請考慮使用其他集合型別。如 使用 MessageGroupFactory 中所述,提供了 SimpleMessageGroupFactory,以便您可以選擇最適合您需求的 Collection。您也可以提供自己的工廠實現來建立其他 Collection<Message<?>>

以下示例展示瞭如何使用先前實現和 SimpleSequenceSizeReleaseStrategy 配置聚合器

<int:aggregator input-channel="aggregate"
    output-channel="out" message-store="store" release-strategy="releaser" />

<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
    <property name="messageGroupFactory">
        <bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
            <constructor-arg value="BLOCKING_QUEUE"/>
        </bean>
    </property>
</bean>

<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果過濾器端點涉及聚合器上游的流,序列大小釋放策略(固定或基於 sequenceSize 頭部)將無法實現其目的,因為序列中的某些訊息可能會被過濾器丟棄。在這種情況下,建議選擇另一個 ReleaseStrategy,或者使用從丟棄子流傳送的補償訊息,其內容中包含一些資訊,以便在自定義完整組函式中跳過。有關更多資訊,請參閱 過濾器

關聯策略

CorrelationStrategy 介面定義如下

public interface CorrelationStrategy {

  Object getCorrelationKey(Message<?> message);

}

該方法返回一個 Object,它表示用於將訊息與訊息組關聯的關聯鍵。該鍵必須滿足 Map 中鍵的實現標準,包括 equals()hashCode()

通常,任何 POJO 都可以實現關聯邏輯,並且將訊息對映到方法引數的規則與 ServiceActivator 相同(包括對 @Header 註解的支援)。方法必須返回一個值,並且該值不能為 null

Spring Integration 提供了 CorrelationStrategy 的一個實現:HeaderAttributeCorrelationStrategy。此實現將其中一個訊息頭(其名稱由建構函式引數指定)的值作為關聯鍵返回。預設情況下,關聯策略是一個 HeaderAttributeCorrelationStrategy,它返回 CORRELATION_ID 頭屬性的值。如果您想使用自定義頭名稱進行關聯,可以在 HeaderAttributeCorrelationStrategy 例項上配置它,並將其作為聚合器關聯策略的引用。

鎖登錄檔

對組的更改是執行緒安全的。因此,當您同時傳送具有相同關聯 ID 的訊息時,聚合器中只會處理其中一個,使其有效地成為每個訊息組的單執行緒LockRegistry 用於獲取已解析的關聯 ID 的鎖。預設情況下使用 DefaultLockRegistry(記憶體中)。為了跨伺服器同步更新(在使用共享 MessageGroupStore 時),您必須配置一個共享鎖登錄檔。

避免死鎖

如上所述,當訊息組發生變異(新增或釋放訊息)時,會持有一個鎖。

考慮以下流程

...->aggregator1-> ... ->aggregator2-> ...

如果存在多個執行緒,並且聚合器共享一個公共鎖登錄檔,則可能發生死鎖。這將導致執行緒掛起,jstack <pid> 可能會顯示如下結果

Found one Java-level deadlock:
=============================
"t2":
  waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t1"
"t1":
  waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t2"

有幾種方法可以避免這個問題

  • 確保每個聚合器都有自己的鎖登錄檔(這可以是跨應用程式例項的共享登錄檔,但流中的兩個或更多聚合器必須各自擁有一個獨特的登錄檔)

  • 使用 ExecutorChannelQueueChannel 作為聚合器的輸出通道,以便下游流在新執行緒上執行

  • 從 5.1.1 版本開始,將 releaseLockBeforeSend 聚合器屬性設定為 true

如果由於某種原因,單個聚合器的輸出最終被路由回同一個聚合器,也會導致此問題。當然,上述第一個解決方案在這種情況下不適用。

在 Java DSL 中配置聚合器

有關如何在 Java DSL 中配置聚合器,請參閱 聚合器和重排序器

使用 XML 配置聚合器

Spring Integration 支援透過 <aggregator/> 元素配置聚合器。以下示例展示了一個聚合器示例

<channel id="inputChannel"/>

<int:aggregator id="myAggregator"                          (1)
        auto-startup="true"                                (2)
        input-channel="inputChannel"                       (3)
        output-channel="outputChannel"                     (4)
        discard-channel="throwAwayChannel"                 (5)
        message-store="persistentMessageStore"             (6)
        order="1"                                          (7)
        send-partial-result-on-expiry="false"              (8)
        send-timeout="1000"                                (9)

        correlation-strategy="correlationStrategyBean"     (10)
        correlation-strategy-method="correlate"            (11)
        correlation-strategy-expression="headers['foo']"   (12)

        ref="aggregatorBean"                               (13)
        method="aggregate"                                 (14)

        release-strategy="releaseStrategyBean"             (15)
        release-strategy-method="release"                  (16)
        release-strategy-expression="size() == 5"          (17)

        expire-groups-upon-completion="false"              (18)
        empty-group-min-timeout="60000"                    (19)

        lock-registry="lockRegistry"                       (20)

        group-timeout="60000"                              (21)
        group-timeout-expression="size() ge 2 ? 100 : -1"  (22)
        expire-groups-upon-timeout="true"                  (23)

        scheduler="taskScheduler" >                        (24)
            <expire-transactional/>                        (25)
            <expire-advice-chain/>                         (26)
</aggregator>

<int:channel id="outputChannel"/>

<int:channel id="throwAwayChannel"/>

<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
    <constructor-arg ref="dataSource"/>
</bean>

<bean id="aggregatorBean" class="sample.PojoAggregator"/>

<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>

<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 聚合器的 ID 是可選的。
2 生命週期屬性,表示聚合器是否應在應用程式上下文啟動期間啟動。可選(預設為 'true')。
3 聚合器接收訊息的通道。必需。
4 聚合器傳送聚合結果的通道。可選(因為傳入訊息本身可以在 'replyChannel' 訊息頭中指定回覆通道)。
5 聚合器傳送超時訊息的通道(如果 send-partial-result-on-expiryfalse)。可選。
6 MessageGroupStore 的引用,用於在訊息組完成之前,將其儲存在關聯鍵下。可選。預設情況下,它是易失的記憶體儲存。有關更多資訊,請參閱 訊息儲存
7 當多個控制代碼訂閱到同一個 DirectChannel 時,此聚合器的順序(用於負載均衡目的)。可選。
8 指示過期訊息應在它們包含的 MessageGroup 過期後(參見 MessageGroupStore.expireMessageGroups(long))聚合併發送到 'output-channel' 或 'replyChannel'。過期 MessageGroup 的一種方法是配置 MessageGroupStoreReaper。但是,您也可以透過呼叫 MessageGroupStore.expireMessageGroups(timeout) 來過期 MessageGroup。您可以透過控制匯流排操作或如果您有 MessageGroupStore 例項的引用,透過呼叫 expireMessageGroups(timeout) 來實現。否則,此屬性本身什麼也不做。它僅作為一個指示器,指示是否丟棄或傳送到輸出或回覆通道中那些仍處於即將過期的 MessageGroup 中的任何訊息。可選(預設為 false)。注意:此屬性可能更恰當地稱為 send-partial-result-on-timeout,因為如果 expire-groups-upon-timeout 設定為 false,該組可能實際上不會過期。
9 當向 output-channeldiscard-channel 傳送回覆 Message 時等待的超時間隔。預設為 30 秒。它僅在輸出通道具有某些“傳送”限制(例如具有固定“容量”的 QueueChannel)時應用。在這種情況下,會丟擲 MessageDeliveryException。對於 AbstractSubscribableChannel 實現,send-timeout 被忽略。對於 group-timeout(-expression),來自排程過期任務的 MessageDeliveryException 導致此任務被重新排程。可選。
10 對實現訊息關聯(分組)演算法的 bean 的引用。該 bean 可以是 CorrelationStrategy 介面的實現,也可以是 POJO。在後一種情況下,還必須定義 correlation-strategy-method 屬性。可選(預設情況下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 頭部)。
11 correlation-strategy 引用的 bean 上定義的方法。它實現了關聯決策演算法。可選,有約束(correlation-strategy 必須存在)。
12 表示關聯策略的 SpEL 表示式。示例:"headers['something']"correlation-strategycorrelation-strategy-expression 只能有一個。
13 對應用程式上下文中定義的 bean 的引用。該 bean 必須實現聚合邏輯,如前所述。可選(預設情況下,聚合訊息列表成為輸出訊息的負載)。
14 ref 屬性引用的 bean 上定義的方法。它實現了訊息聚合演算法。可選(取決於是否定義了 ref 屬性)。
15 對實現釋放策略的 bean 的引用。該 bean 可以是 ReleaseStrategy 介面的實現,也可以是 POJO。在後一種情況下,還必須定義 release-strategy-method 屬性。可選(預設情況下,聚合器使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 頭屬性)。
16 release-strategy 屬性引用的 bean 上定義的方法。它實現了完成決策演算法。可選,有約束(release-strategy 必須存在)。
17 表示釋放策略的 SpEL 表示式。表示式的根物件是 MessageGroup。示例:"size() == 5"release-strategyrelease-strategy-expression 只能有一個。
18 當設定為 true 時(預設值為 false),完成的組將從訊息儲存中移除,從而允許具有相同關聯的後續訊息形成新組。預設行為是將與已完成組具有相同關聯的訊息傳送到 discard-channel
19 僅當為 <aggregator>MessageStore 配置了 MessageGroupStoreReaper 時才適用。預設情況下,當 MessageGroupStoreReaper 配置為過期部分組時,空組也會被移除。空組在組正常釋放後存在。空組可以檢測並丟棄遲到的訊息。如果您希望空組以比過期部分組更長的時間表過期,請設定此屬性。空組將在此毫秒數內未被修改後才從 MessageStore 中移除。請注意,過期空組的實際時間也受收割器的 timeout 屬性影響,可能高達此值加上超時時間。
20 org.springframework.integration.util.LockRegistry bean 的引用。它用於根據 groupId 獲取 Lock,以進行 MessageGroup 上的併發操作。預設情況下,使用內部 DefaultLockRegistry。使用分散式 LockRegistry(例如 ZookeeperLockRegistry)可確保只有一個聚合器例項可以併發操作一個組。有關更多資訊,請參閱 Redis 鎖登錄檔Zookeeper 鎖登錄檔
21 一個超時(以毫秒為單位),用於在當前訊息到達時,如果 ReleaseStrategy 未釋放組,則強制 MessageGroup 完成。此屬性為聚合器提供了一個內建的基於時間的釋放策略,當需要在超時時間內(從最後一條訊息到達時開始計時)沒有新訊息到達 MessageGroup 時,發出部分結果(或丟棄組)。要設定從 MessageGroup 建立時間開始計時的超時,請參閱 group-timeout-expression 資訊。當新訊息到達聚合器時,將取消其 MessageGroup 的任何現有 ScheduledFuture<?>。如果 ReleaseStrategy 返回 false(表示不釋放)並且 groupTimeout > 0,則會安排一個新任務來使組過期。我們不建議將此屬性設定為零(或負值)。這樣做會有效地停用聚合器,因為每個訊息組都會立即完成。但是,您可以使用表示式有條件地將其設定為零(或負值)。有關資訊,請參閱 group-timeout-expression。完成期間採取的操作取決於 ReleaseStrategysend-partial-group-on-expiry 屬性。有關更多資訊,請參閱 聚合器和組超時。它與 group-timeout-expression 屬性互斥。
22 SpEL 表示式,它計算為 groupTimeout,其中 MessageGroup 作為 #root 評估上下文物件。用於排程 MessageGroup 以強制完成。如果表示式計算為 null,則不排程完成。如果計算為零,則該組立即在當前執行緒上完成。實際上,這提供了一個動態的 group-timeout 屬性。例如,如果您希望在組建立時間起 10 秒後強制完成 MessageGroup,您可以考慮使用以下 SpEL 表示式:timestamp + 10000 - T(System).currentTimeMillis(),其中 timestampMessageGroup.getTimestamp() 提供,因為這裡的 MessageGroup#root 評估上下文物件。但是請記住,組建立時間可能與第一條到達訊息的時間不同,具體取決於其他組過期屬性的配置。有關更多資訊,請參閱 group-timeout。與 group-timeout 屬性互斥。
23 當一個組因超時(或由 MessageGroupStoreReaper)而完成時,該組預設會過期(完全移除)。遲到的訊息會啟動一個新組。您可以將其設定為 false 以完成該組但保留其元資料,以便丟棄遲到的訊息。空組可以在以後使用 MessageGroupStoreReaperempty-group-min-timeout 屬性一起過期。它預設為 'true'。
24 一個 TaskScheduler bean 引用,用於排程 MessageGroup,以便在 groupTimeout 內沒有新訊息到達 MessageGroup 時強制其完成。如果未提供,則使用 ApplicationContext 中註冊的預設排程程式(taskScheduler)(ThreadPoolTaskScheduler)。如果未指定 group-timeoutgroup-timeout-expression,則此屬性不適用。
25 自 4.1 版本起。它允許為 forceComplete 操作啟動事務。它由 group-timeout(-expression)MessageGroupStoreReaper 啟動,不適用於正常的 addreleasediscard 操作。只允許此子元素或 <expire-advice-chain/>
26 4.1 版本 起。它允許為 forceComplete 操作配置任何 Advice。它由 group-timeout(-expression)MessageGroupStoreReaper 啟動,不適用於正常的 addreleasediscard 操作。只允許此子元素或 <expire-transactional/>。也可以使用 Spring tx 名稱空間在此處配置事務 Advice
過期組

有兩個屬性與組的過期(完全刪除)有關。當一個組過期時,就沒有它的記錄了,如果新訊息以相同的關聯到達,就會啟動一個新組。當一個組完成(沒有過期)時,空組仍然存在,遲到的訊息會被丟棄。空組可以稍後使用 MessageGroupStoreReaper 結合 empty-group-min-timeout 屬性刪除。

expire-groups-upon-completionReleaseStrategy 釋放組時的“正常”完成有關。這預設為 false

如果一個組沒有正常完成,而是因為超時而被釋放或丟棄,則該組通常會過期。從 4.1 版本開始,您可以使用 expire-groups-upon-timeout 來控制此行為。為了向後相容,它預設為 true

當一個組超時時,ReleaseStrategy 會有另一次機會釋放該組。如果它這樣做並且 expire-groups-upon-timeout 為 false,則過期由 expire-groups-upon-completion 控制。如果該組未在超時期間由釋放策略釋放,則過期由 expire-groups-upon-timeout 控制。超時組要麼被丟棄,要麼發生部分釋放(基於 send-partial-result-on-expiry)。

從 5.0 版本開始,空組也會在 empty-group-min-timeout 後安排刪除。如果 expireGroupsUponCompletion == falseminimumTimeoutForEmptyGroups > 0,則在正常或部分序列釋放發生時排程刪除組的任務。

從 5.4 版本開始,聚合器(和重新排序器)可以配置為使孤立組過期(那些在持久訊息儲存中可能不會被釋放的組)。expireTimeout(如果大於 0)表示儲存中比此值更舊的組應該被清除。purgeOrphanedGroups() 方法在啟動時呼叫,並與提供的 expireDuration 一起,在計劃任務中定期呼叫。此方法也可以在任何時候從外部呼叫。過期邏輯完全委託給 forceComplete(MessageGroup) 功能,根據上面提到的過期選項。這種定期清除功能在需要清理訊息儲存中的舊組時非常有用,這些舊組將不再透過常規訊息到達邏輯釋放。在大多數情況下,這在使用持久訊息組儲存時,在應用程式重新啟動後發生。該功能類似於帶有計劃任務的 MessageGroupStoreReaper,但在使用組超時而不是收割器時,提供了一種方便的方法來處理特定元件中的舊組。MessageGroupStore 必須專用於當前關聯端點。否則,一個聚合器可能會清除另一個聚合器的組。對於聚合器,使用此技術過期的組將根據 expireGroupsUponCompletion 屬性被丟棄或作為部分組釋放。

如果自定義聚合器處理程式實現可能在其他 <aggregator> 定義中被引用,我們通常建議使用 ref 屬性。但是,如果自定義聚合器實現僅由 <aggregator> 的單個定義使用,則可以使用內部 bean 定義(從 1.0.3 版本開始)在 <aggregator> 元素中配置聚合 POJO,如下例所示

<aggregator input-channel="input" method="sum" output-channel="output">
    <beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
在相同的 <aggregator> 配置中同時使用 ref 屬性和內部 bean 定義是不允許的,因為它會建立歧義條件。在這種情況下,會丟擲異常。

以下示例展示了聚合器 bean 的實現

public class PojoAggregator {

  public Long add(List<Long> results) {
    long total = 0l;
    for (long partialResult: results) {
      total += partialResult;
    }
    return total;
  }
}

前面示例的完成策略 bean 的實現可能如下所示

public class PojoReleaseStrategy {
...
  public boolean canRelease(List<Long> numbers) {
    int sum = 0;
    for (long number: numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }
}
在適當的情況下,釋放策略方法和聚合器方法可以組合成一個 bean。

前面示例的關聯策略 bean 的實現可能如下所示

public class PojoCorrelationStrategy {
...
  public Long groupNumbersByLastDigit(Long number) {
    return number % 10;
  }
}

前面示例中的聚合器將根據某個標準(在本例中為除以十後的餘數)對數字進行分組,並保留該組,直到有效負載提供的數字之和超過某個值。

在適當的情況下,釋放策略方法、關聯策略方法和聚合器方法可以組合在一個 bean 中。(實際上,它們全部或任意兩個都可以組合。)

聚合器和 Spring 表示式語言 (SpEL)

自 Spring Integration 2.0 起,您可以使用 SpEL 處理各種策略(關聯、釋放和聚合),如果這些釋放策略背後的邏輯相對簡單,我們建議使用 SpEL。假設您有一個遺留元件,它被設計為接收物件陣列。我們知道預設的釋放策略會將所有聚合訊息組裝到 List 中。現在我們有兩個問題。首先,我們需要從列表中提取單個訊息。其次,我們需要提取每條訊息的負載並組裝物件陣列。以下示例解決了這兩個問題

public String[] processRelease(List<Message<String>> messages){
    List<String> stringList = new ArrayList<>();
    for (Message<String> message : messages) {
        stringList.add(message.getPayload());
    }
    return stringList.toArray(new String[]{});
}

然而,使用 SpEL,這樣的要求實際上可以透過一行表示式相對容易地處理,從而省去編寫自定義類並將其配置為 bean 的麻煩。以下示例展示瞭如何實現

<int:aggregator input-channel="aggChannel"
    output-channel="replyChannel"
    expression="#this.![payload].toArray()"/>

在前面的配置中,我們使用 集合投影 表示式從列表中所有訊息的負載中組裝一個新的集合,然後將其轉換為陣列,從而實現與早期 Java 程式碼相同的結果。

在處理自定義釋放和關聯策略時,您可以應用相同的基於表示式的方法。

除了在 correlation-strategy 屬性中定義自定義 CorrelationStrategy 的 bean 外,您還可以將簡單的關聯邏輯實現為 SpEL 表示式,並在 correlation-strategy-expression 屬性中進行配置,如下例所示

correlation-strategy-expression="payload.person.id"

在前面的示例中,我們假設負載具有一個帶有 idperson 屬性,該 id 將用於關聯訊息。

同樣,對於 ReleaseStrategy,您可以將釋放邏輯實現為 SpEL 表示式,並在 release-strategy-expression 屬性中進行配置。評估上下文的根物件是 MessageGroup 本身。訊息列表可以透過表示式中的組的 message 屬性進行引用。

在 5.0 版本之前的版本中,根物件是 Message<?> 的集合,如上一個示例所示
release-strategy-expression="!messages.?[payload==5].empty"

在前面的示例中,SpEL 評估上下文的根物件是 MessageGroup 本身,您宣告一旦此組中存在負載為 5 的訊息,該組就應該被釋放。

聚合器和組超時

從 4.0 版本開始,引入了兩個新的互斥屬性:group-timeoutgroup-timeout-expression。請參閱 使用 XML 配置聚合器。在某些情況下,如果 ReleaseStrategy 在當前訊息到達時未釋放,您可能需要在超時後發出聚合器結果(或丟棄組)。為此,groupTimeout 選項允許排程 MessageGroup 強制完成,如下例所示

<aggregator input-channel="input" output-channel="output"
        send-partial-result-on-expiry="true"
        group-timeout-expression="size() ge 2 ? 10000 : -1"
        release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>

在此示例中,如果聚合器收到序列中的最後一條訊息(由 release-strategy-expression 定義),則可以進行正常釋放。如果該特定訊息未到達,groupTimeout 將在十秒後強制組完成,只要該組至少包含兩條訊息。

強制組完成的結果取決於 ReleaseStrategysend-partial-result-on-expiry。首先,再次諮詢釋放策略,以檢視是否進行正常釋放。儘管組沒有改變,但 ReleaseStrategy 此時可以決定釋放組。如果釋放策略仍然不釋放組,則它會過期。如果 send-partial-result-on-expirytrue,則(部分)MessageGroup 中的現有訊息將作為正常的聚合器回覆訊息傳送到 output-channel。否則,它將被丟棄。

groupTimeout 行為與 MessageGroupStoreReaper 之間存在差異(參見 使用 XML 配置聚合器)。收割器定期為 MessageGroupStore 中的所有 MessageGroup 啟動強制完成。groupTimeout 在新訊息未在 groupTimeout 期間到達時,單獨對每個 MessageGroup 執行此操作。此外,收割器可用於刪除空組(那些為丟棄遲到的訊息而保留的組,如果 expire-groups-upon-completion 為 false)。

從 5.5 版本開始,groupTimeoutExpression 可以評估為 java.util.Date 例項。這在確定基於組建立時間(MessageGroup.getTimestamp())而不是當前訊息到達時間的計劃任務時刻時非常有用,因為當 groupTimeoutExpression 評估為 long 時會計算當前訊息到達時間

group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"

使用註解配置聚合器

以下示例展示了使用註解配置的聚合器

public class Waiter {
  ...

  @Aggregator  (1)
  public Delivery aggregatingMethod(List<OrderItem> items) {
    ...
  }

  @ReleaseStrategy  (2)
  public boolean releaseChecker(List<Message<?>> messages) {
    ...
  }

  @CorrelationStrategy  (3)
  public String correlateBy(OrderItem item) {
    ...
  }
}
1 一個註解,指示此方法應作為聚合器使用。如果此類別用作聚合器,則必須指定此註解。
2 一個註解,指示此方法用作聚合器的釋放策略。如果任何方法上不存在,聚合器將使用 SimpleSequenceSizeReleaseStrategy
3 一個註解,指示此方法應作為聚合器的關聯策略使用。如果沒有指示關聯策略,聚合器將使用基於 CORRELATION_IDHeaderAttributeCorrelationStrategy

XML 元素提供的所有配置選項也適用於 @Aggregator 註解。

聚合器可以從 XML 中顯式引用,或者,如果 @MessageEndpoint 定義在類上,則可以透過類路徑掃描自動檢測。

聚合器元件的註解配置(@Aggregator 等)僅涵蓋簡單的用例,其中大多數預設選項就足夠了。如果您在使用註解配置時需要對這些選項進行更多控制,請考慮為 AggregatingMessageHandler 使用 @Bean 定義,並使用 @ServiceActivator 標記其 @Bean 方法,如下例所示

@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
     AggregatingMessageHandler aggregator =
                       new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                                                 jdbcMessageGroupStore);
     aggregator.setOutputChannel(resultsChannel());
     aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
     aggregator.setTaskScheduler(this.taskScheduler);
     return aggregator;
}

有關更多資訊,請參閱 程式設計模型@Bean 方法上的註解

從 4.2 版本開始,提供了 AggregatorFactoryBean 來簡化 AggregatingMessageHandler 的 Java 配置。

在聚合器中管理狀態:MessageGroupStore

聚合器(以及 Spring Integration 中的其他一些模式)是一種有狀態模式,需要根據一段時間內到達的具有相同關聯鍵的訊息組做出決策。有狀態模式(如 ReleaseStrategy)中介面的設計遵循的原則是,元件(無論是框架定義的還是使用者定義的)應該能夠保持無狀態。所有狀態都由 MessageGroup 攜帶,其管理委託給 MessageGroupStoreMessageGroupStore 介面定義如下

public interface MessageGroupStore {

    int getMessageCountForAllMessageGroups();

    int getMarkedMessageCountForAllMessageGroups();

    int getMessageGroupCount();

    MessageGroup getMessageGroup(Object groupId);

    MessageGroup addMessageToGroup(Object groupId, Message<?> message);

    MessageGroup markMessageGroup(MessageGroup group);

    MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);

    MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);

    void removeMessageGroup(Object groupId);

    void registerMessageGroupExpiryCallback(MessageGroupCallback callback);

    int expireMessageGroups(long timeout);
}

有關更多資訊,請參閱 Javadoc

MessageGroupStore 在等待釋放策略觸發時累積 MessageGroups 中的狀態資訊,並且該事件可能永遠不會發生。因此,為了防止陳舊訊息滯留,併為易失性儲存提供在應用程式關閉時進行清理的鉤子,MessageGroupStore 允許您註冊回撥,以便在其 MessageGroups 過期時應用這些回撥。該介面非常簡單明瞭,如下清單所示

public interface MessageGroupCallback {

    void execute(MessageGroupStore messageGroupStore, MessageGroup group);

}

回撥可以直接訪問儲存和訊息組,以便它可以管理持久狀態(例如,透過完全從儲存中移除組)。

MessageGroupStore 維護這些回撥的列表,當所有時間戳早於作為引數提供的時間(請參閱前面描述的 registerMessageGroupExpiryCallback(..)expireMessageGroups(..) 方法)的訊息時,它會按需應用這些回撥。

當您打算依賴 expireMessageGroups 功能時,在不同的聚合器元件中不使用相同的 MessageGroupStore 例項非常重要。每個 AbstractCorrelatingMessageHandler 都會根據 forceComplete() 回撥註冊自己的 MessageGroupCallback。這樣,每個要過期的組都可能被錯誤的聚合器完成或丟棄。從 5.0.10 版本開始,AbstractCorrelatingMessageHandler 使用 UniqueExpiryCallbackMessageGroupStore 中註冊回撥。MessageGroupStore 反過來會檢查是否存在此類的例項,如果回撥集中已存在,則會記錄一個包含適當訊息的錯誤。這樣,框架就不允許在不同的聚合器/重新排序器中使用 MessageGroupStore 例項,以避免上述過期不由特定關聯處理程式建立的組的副作用。

您可以呼叫 expireMessageGroups 方法並指定超時值。任何早於當前時間減去此值的訊息都將過期並應用回撥。因此,是儲存的使用者定義了訊息組“過期”的含義。

為了方便使用者,Spring Integration 以 MessageGroupStoreReaper 的形式提供了訊息過期包裝器,如下例所示

<bean id="reaper" class="org...MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="30000"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>

收割器是一個 Runnable。在前面的示例中,訊息組儲存的過期方法每十秒呼叫一次。超時本身是 30 秒。

重要的是要理解 MessageGroupStoreReaper 的“timeout”屬性是一個近似值,並且受任務排程器速率的影響,因為此屬性僅在 MessageGroupStoreReaper 任務的下一次排程執行時檢查。例如,如果超時設定為十分鐘,但 MessageGroupStoreReaper 任務安排每小時執行一次,並且 MessageGroupStoreReaper 任務的上次執行發生在超時前一分鐘,則 MessageGroup 在接下來的 59 分鐘內不會過期。因此,我們建議將速率設定為至少等於或短於超時值。

除了收割器之外,當應用程式透過 AbstractCorrelatingMessageHandler 中的生命週期回撥關閉時,還會呼叫過期回撥。

AbstractCorrelatingMessageHandler 註冊其自己的過期回撥,這與聚合器 XML 配置中的布林標誌 send-partial-result-on-expiry 相關聯。如果該標誌設定為 true,那麼當呼叫過期回撥時,組中尚未釋放的任何未標記訊息都可以傳送到輸出通道。

由於 MessageGroupStoreReaper 是從計劃任務中呼叫的,並且可能導致生成訊息(取決於 sendPartialResultOnExpiry 選項)到下游整合流,因此建議提供一個帶有 MessagePublishingErrorHandler 的自定義 TaskScheduler,以透過 errorChannel 處理異常,正如常規聚合器釋放功能所期望的那樣。相同的邏輯適用於組超時功能,該功能也依賴於 TaskScheduler。有關更多資訊,請參閱 錯誤處理

當不同關聯端點使用共享 MessageStore 時,您必須配置適當的 CorrelationStrategy 以確保組 ID 的唯一性。否則,當一個關聯端點釋放或過期來自其他端點的訊息時,可能會發生意外行為。具有相同關聯鍵的訊息儲存在同一訊息組中。

一些 MessageStore 實現允許透過分割槽資料來使用相同的物理資源。例如,JdbcMessageStore 有一個 region 屬性,而 MongoDbMessageStore 有一個 collectionName 屬性。

有關 MessageStore 介面及其實現的更多資訊,請參閱 訊息儲存

Flux 聚合器

在 5.2 版本中,引入了 FluxAggregatorMessageHandler 元件。它基於 Project Reactor 的 Flux.groupBy()Flux.window() 運算子。傳入的訊息被髮送到由該元件建構函式中的 Flux.create() 初始化的 FluxSink。如果未提供 outputChannel 或它不是 ReactiveStreamsSubscribableChannel 的例項,則主 Flux 的訂閱是在 Lifecycle.start() 實現中完成的。否則,它將推遲到由 ReactiveStreamsSubscribableChannel 實現完成的訂閱。訊息使用 CorrelationStrategy 作為組鍵,透過 Flux.groupBy() 進行分組。預設情況下,會查詢訊息的 IntegrationMessageHeaderAccessor.CORRELATION_ID 頭部。

預設情況下,每個關閉的視窗都作為一個 Flux 釋放到要生成的訊息的負載中。此訊息包含視窗中第一條訊息的所有頭部。輸出訊息負載中的此 Flux 必須在下游訂閱和處理。這種邏輯可以透過 FluxAggregatorMessageHandlersetCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>) 配置選項進行自定義(或取代)。例如,如果希望最終訊息中包含 List 的負載,可以像這樣配置 Flux.collectList()

fluxAggregatorMessageHandler.setCombineFunction(
                (messageFlux) ->
                        messageFlux
                                .map(Message::getPayload)
                                .collectList()
                                .map(GenericMessage::new));

FluxAggregatorMessageHandler 中有幾個選項可以選擇合適的視窗策略

  • setBoundaryTrigger(Predicate<Message<?>>) - 傳播到 Flux.windowUntil() 運算子。有關更多資訊,請參閱其 Javadoc。優先於所有其他視窗選項。

  • setWindowSize(int)setWindowSizeFunction(Function<Message<?>, Integer>) - 傳播到 Flux.window(int)windowTimeout(int, Duration)。預設情況下,視窗大小根據組中的第一條訊息及其 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 頭部計算。

  • setWindowTimespan(Duration) - 根據視窗大小配置傳播到 Flux.window(Duration)windowTimeout(int, Duration)

  • setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>) - 一個函式,用於將轉換應用於分組的 Flux,以進行未被公開選項覆蓋的任何自定義視窗操作。

由於此元件是一個 MessageHandler 實現,因此可以簡單地將其用作 @Bean 定義以及 @ServiceActivator 訊息傳遞註解。透過 Java DSL,可以從 .handle() EIP 方法中使用它。下面的示例演示了我們如何在執行時註冊 IntegrationFlow 以及 FluxAggregatorMessageHandler 如何與上游拆分器相關聯

IntegrationFlow fluxFlow =
        (flow) -> flow
                .split()
                .channel(MessageChannels.flux())
                .handle(new FluxAggregatorMessageHandler());

IntegrationFlowContext.IntegrationFlowRegistration registration =
        this.integrationFlowContext.registration(fluxFlow)
                .register();

Flux<Message<?>> window =
        registration.getMessagingTemplate()
                .convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);

訊息組上的條件

從 5.5 版本開始,AbstractCorrelatingMessageHandler(包括其 Java 和 XML DSL)公開了一個 BiFunction<Message<?>, String, String> 實現的 groupConditionSupplier 選項。此函式用於新增到組中的每條訊息,結果條件語句儲存在組中以供將來考慮。ReleaseStrategy 可以諮詢此條件,而不是迭代組中的所有訊息。有關更多資訊,請參閱 GroupConditionProvider JavaDocs 和 訊息組條件

另請參閱 檔案聚合器

© . This site is unofficial and not affiliated with VMware.