聚合器
聚合器基本上是分割器的映象,它是一種訊息處理器,接收多條訊息並將它們合併成一條訊息。事實上,聚合器通常是包含分割器的管道中的下游消費者。
從技術上講,聚合器比分割器更復雜,因為它是有狀態的。它必須持有待聚合的訊息,並確定何時完整的訊息組已準備好進行聚合。為此,它需要一個 MessageStore
。
功能
聚合器透過關聯和儲存一組相關訊息來將它們組合起來,直到該組被認為是完整的。此時,聚合器透過處理整個組來建立一條訊息,並將聚合後的訊息作為輸出傳送。
實現聚合器需要提供執行聚合的邏輯(即從多條訊息建立一條訊息)。兩個相關的概念是關聯(correlation)和釋放(release)。
關聯決定了訊息如何分組進行聚合。在 Spring Integration 中,關聯預設是基於 IntegrationMessageHeaderAccessor.CORRELATION_ID
訊息頭進行的。具有相同 IntegrationMessageHeaderAccessor.CORRELATION_ID
的訊息會被分組在一起。但是,你可以自定義關聯策略,以允許使用其他方式指定訊息應如何分組。為此,你可以實現一個 CorrelationStrategy
(本章後面會介紹)。
為了確定訊息組何時準備好進行處理,會諮詢一個 ReleaseStrategy
。聚合器的預設釋放策略是當序列中包含的所有訊息都存在時釋放該組,這是基於 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
頭進行的。你可以透過提供自定義 ReleaseStrategy
實現的引用來覆蓋此預設策略。
程式設計模型
聚合 API 由以下類組成
-
MessageGroupProcessor
介面及其子類:MethodInvokingAggregatingMessageGroupProcessor
和ExpressionEvaluatingMessageGroupProcessor
-
ReleaseStrategy
介面及其預設實現:SimpleSequenceSizeReleaseStrategy
-
CorrelationStrategy
介面及其預設實現:HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
AggregatingMessageHandler
(AbstractCorrelatingMessageHandler
的子類)是一個 MessageHandler
實現,封裝了聚合器(以及其他關聯用例)的通用功能,如下所示
-
將訊息關聯到一個組以進行聚合
-
將這些訊息儲存在
MessageStore
中,直到該組可以被釋放 -
決定何時可以釋放該組
-
將已釋放的組聚合為一條訊息
-
識別並響應已過期的組
決定訊息應如何分組的責任委託給 CorrelationStrategy
例項。決定訊息組是否可以被釋放的責任委託給 ReleaseStrategy
例項。
以下列表簡要展示了基礎 AbstractAggregatingMessageGroupProcessor
(實現 aggregatePayloads
方法的責任留給開發者)
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
將 DefaultAggregatingMessageGroupProcessor
、ExpressionEvaluatingMessageGroupProcessor
和 MethodInvokingMessageGroupProcessor
視為 AbstractAggregatingMessageGroupProcessor
的開箱即用實現。
從 5.2 版本開始,AbstractAggregatingMessageGroupProcessor
可使用 Function<MessageGroup, Map<String, Object>>
策略來合併和計算(聚合)輸出訊息的頭。DefaultAggregateHeadersFunction
實現提供了一種邏輯,該邏輯返回組中所有沒有衝突的頭;組內一個或多個訊息中缺少的頭不被視為衝突。衝突的頭會被忽略。連同新引入的 DelegatingMessageGroupProcessor
,此函式用於任何任意(非 AbstractAggregatingMessageGroupProcessor
)的 MessageGroupProcessor
實現。本質上,框架會將提供的函式注入到 AbstractAggregatingMessageGroupProcessor
例項中,並將所有其他實現包裝到 DelegatingMessageGroupProcessor
中。AbstractAggregatingMessageGroupProcessor
和 DelegatingMessageGroupProcessor
之間的邏輯區別在於,後者不會在呼叫委託策略之前提前計算頭,並且如果委託返回 Message
或 AbstractIntegrationMessageBuilder
,則不會呼叫該函式。在這種情況下,框架假定目標實現已經負責生成填充到返回結果中的適當頭集。Function<MessageGroup, Map<String, Object>>
策略在 XML 配置中可用作 headers-function
引用屬性,在 Java DSL 中可用作 AggregatorSpec.headersFunction()
選項,在普通 Java 配置中可用作 AggregatorFactoryBean.setHeadersFunction()
。
CorrelationStrategy
由 AbstractCorrelatingMessageHandler
持有,並具有基於 IntegrationMessageHeaderAccessor.CORRELATION_ID
訊息頭的預設值,如下例所示
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
關於訊息組的實際處理,預設實現是 DefaultAggregatingMessageGroupProcessor
。它建立一個單一的 Message
,其負載是給定組收到的負載的 List
。這對於帶有分割器、釋出/訂閱通道或接收者列表路由器的簡單分發-聚合實現非常有效。
在此類場景中使用釋出/訂閱通道或接收者列表路由器時,請務必啟用 apply-sequence 標誌。這樣做會新增必要的頭:CORRELATION_ID 、SEQUENCE_NUMBER 和 SEQUENCE_SIZE 。此行為在 Spring Integration 中預設對分割器啟用,但不對釋出/訂閱通道或接收者列表路由器啟用,因為這些元件可以在各種不需要這些頭的上下文中使用。 |
在為應用程式實現特定的聚合器策略時,你可以擴充套件 AbstractAggregatingMessageGroupProcessor
並實現 aggregatePayloads
方法。然而,還有更好的解決方案,與 API 耦合度較低,用於實現聚合邏輯,這些邏輯可以透過 XML 或註解進行配置。
通常,任何 POJO 都可以實現聚合演算法,只要它提供一個接受單個 java.util.List
作為引數的方法(也支援引數化列表)。此方法在聚合訊息時按如下方式呼叫
-
如果引數是
java.util.Collection<T>
並且引數型別 T 可賦值給Message
,則累積用於聚合的整個訊息列表會發送到聚合器。 -
如果引數是非引數化的
java.util.Collection
或引數型別不可賦值給Message
,則方法會接收累積訊息的負載。 -
如果返回型別不可賦值給
Message
,則將其視為框架自動建立的Message
的負載。
為了程式碼的簡潔性並推廣低耦合、可測試性等最佳實踐,實現聚合邏輯的首選方式是透過 POJO,並使用 XML 或註解支援在應用程式中進行配置。 |
從 5.3 版本開始,在處理訊息組後,AbstractCorrelatingMessageHandler
會對具有多個巢狀級別的正確分割器-聚合器場景執行 MessageBuilder.popSequenceDetails()
訊息頭修改。僅當訊息組釋放結果不是訊息集合時才會執行此操作。在這種情況下,目標 MessageGroupProcessor
負責在構建這些訊息時呼叫 MessageBuilder.popSequenceDetails()
。
如果 MessageGroupProcessor
返回一個 Message
,則僅當 sequenceDetails
與組中的第一條訊息匹配時,才會對輸出訊息執行 MessageBuilder.popSequenceDetails()
。(之前僅當從 MessageGroupProcessor
返回純負載或 AbstractIntegrationMessageBuilder
時才會執行此操作。)
此功能可以透過新的 popSequence
boolean
屬性控制,因此在某些關聯詳細資訊未由標準分割器填充的場景中,可以停用 MessageBuilder.popSequenceDetails()
。此屬性本質上撤消了最近的上游 AbstractMessageSplitter
中 applySequence = true
所做的工作。更多資訊請參見分割器。
SimpleMessageGroup.getMessages() 方法返回一個 unmodifiableCollection 。因此,如果聚合 POJO 方法有一個 Collection<Message> 引數,傳入的引數就是那個 Collection 例項;並且當你為聚合器使用 SimpleMessageStore 時,該原始 Collection<Message> 在釋放組後會被清除。因此,如果將 POJO 中的 Collection<Message> 變數傳出聚合器之外,它也會被清除。如果你希望直接釋放該集合以進行進一步處理,你必須構建一個新的 Collection (例如,new ArrayList<Message>(messages) )。從 4.3 版本開始,框架不再將訊息複製到新的集合中,以避免不必要的額外物件建立。 |
在 4.2 版本之前,無法透過 XML 配置提供 MessageGroupProcessor
。聚合只能使用 POJO 方法。現在,如果框架檢測到引用的(或內部的)bean 實現了 MessageProcessor
,則它被用作聚合器的輸出處理器。
如果你希望將來自自定義 MessageGroupProcessor
的物件集合作為訊息的負載釋放,你的類應擴充套件 AbstractAggregatingMessageGroupProcessor
並實現 aggregatePayloads()
。
此外,自 4.2 版本以來,提供了 SimpleMessageGroupProcessor
。它返回組中的訊息集合,如前所述,這使得釋放的訊息會被單獨傳送。
這使得聚合器可以充當訊息屏障,到達的訊息會被 удержи,直到釋放策略觸發,然後該組作為一系列獨立訊息被釋放。
從 6.0 版本開始,上述的分割行為僅在組處理器是 SimpleMessageGroupProcessor
時才有效。否則,對於任何其他返回 Collection<Message>
的 MessageGroupProcessor
實現,只會發出一則回覆訊息,其負載是整個訊息集合。這種邏輯是由聚合器的規範目的決定的——按某個鍵收集請求訊息並生成一個單一的組訊息。
ReleaseStrategy
ReleaseStrategy
介面定義如下
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
通常,任何 POJO 都可以實現完成決策邏輯,只要它提供一個接受單個 java.util.List
作為引數的方法(也支援引數化列表)並返回一個布林值。此方法在每條新訊息到達後被呼叫,以決定該組是否完成,具體如下
-
如果引數是
java.util.List<T>
並且引數型別T
可賦值給Message
,則累積在組中的整個訊息列表會發送到該方法。 -
如果引數是非引數化的
java.util.List
或引數型別不可賦值給Message
,則方法會接收累積訊息的負載。 -
如果訊息組已準備好進行聚合,方法必須返回
true
,否則返回false
。
以下示例展示瞭如何將 @ReleaseStrategy
註解用於型別為 Message
的 List
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例展示瞭如何將 @ReleaseStrategy
註解用於型別為 String
的 List
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
基於前兩個示例中的簽名,基於 POJO 的釋放策略會接收一個未釋放訊息的 Collection
(如果你需要訪問整個 Message
),或者一個負載物件的 Collection
(如果型別引數不是 Message
)。這滿足了大多數用例。但是,如果出於某種原因你需要訪問完整的 MessageGroup
,你應該提供一個 ReleaseStrategy
介面的實現。
處理潛在的大組時,你應該理解這些方法是如何呼叫的,因為在組被釋放之前,釋放策略可能會被多次呼叫。最高效的是 由於這些原因,對於大組,我們建議你實現 |
當組被釋放進行聚合時,其所有未釋放的訊息都會被處理並從組中移除。如果組也已完成(即序列中的所有訊息已到達,或者沒有定義序列),則該組被標記為完成。此組的任何新訊息都會發送到丟棄通道(如果已定義)。將 expire-groups-upon-completion
設定為 true
(預設為 false
)會移除整個組,並且任何新訊息(與已移除組具有相同關聯 ID 的)會形成一個新組。你可以透過使用 MessageGroupStoreReaper
並將 send-partial-result-on-expiry
設定為 true
來釋放部分序列。
為了方便丟棄延遲到達的訊息,聚合器在組被釋放後必須維護關於該組的狀態。這最終可能導致記憶體溢位。為避免此類情況,你應該考慮配置一個 MessageGroupStoreReaper 來移除組的元資料。過期引數應設定為在某個點之後(此後不期望有延遲訊息到達)使組過期。有關配置 reaper 的資訊,請參見在聚合器中管理狀態:MessageGroupStore 。 |
Spring Integration 為 ReleaseStrategy
提供了一個實現:SimpleSequenceSizeReleaseStrategy
。此實現會查閱每條到達訊息的 SEQUENCE_NUMBER
和 SEQUENCE_SIZE
頭,以決定訊息組何時完成並準備好進行聚合。如前所示,它也是預設策略。
在 5.0 版本之前,預設的釋放策略是 SequenceSizeReleaseStrategy ,它在大組處理方面表現不佳。使用該策略時,會檢測並拒絕重複的序列號。此操作可能開銷很大。 |
如果你正在聚合大組,不需要釋放部分組,並且不需要檢測/拒絕重複序列,可以考慮改用 SimpleSequenceSizeReleaseStrategy
——它對於這些用例效率更高,並且自 5.0 版本以來在未指定部分組釋放時是預設策略。
聚合大組
4.3 版本將 SimpleMessageGroup
中訊息的預設 Collection
更改為 HashSet
(之前是 BlockingQueue
)。從大組中移除單條訊息時,這曾開銷很大(需要 O(n) 線性掃描)。雖然雜湊集合通常移除速度更快,但對於大訊息來說可能開銷很大,因為在插入和移除時都需要計算雜湊。如果你的訊息雜湊開銷很大,考慮使用其他集合型別。如使用 MessageGroupFactory
中所討論的,提供了 SimpleMessageGroupFactory
,以便你可以選擇最適合你需求的 Collection
。你也可以提供自己的工廠實現來建立其他 Collection<Message<?>>
。
以下示例展示瞭如何使用先前的實現和 SimpleSequenceSizeReleaseStrategy
配置聚合器
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果過濾器端點參與了聚合器上游的流,序列大小釋放策略(固定的或基於 sequenceSize 頭)將無法實現其目的,因為序列中的某些訊息可能被過濾器丟棄。在這種情況下,建議選擇另一個 ReleaseStrategy ,或者使用從丟棄子流傳送的補償訊息,這些訊息的內容中包含一些資訊,以便在自定義的完整組函式中跳過。更多資訊請參見過濾器。 |
關聯策略
CorrelationStrategy
介面定義如下
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
該方法返回一個 Object
,它表示用於將訊息與訊息組關聯的關聯鍵。該鍵必須滿足在 Map
中用作鍵的標準,即關於 equals()
和 hashCode()
的實現。
通常,任何 POJO 都可以實現關聯邏輯,並且將訊息對映到方法引數(或多個引數)的規則與 ServiceActivator
的規則相同(包括支援 @Header
註解)。該方法必須返回一個值,並且該值不能為空(null
)。
Spring Integration 提供了 CorrelationStrategy
的一個實現:HeaderAttributeCorrelationStrategy
。此實現返回訊息頭中的一個值(該值的名稱由建構函式引數指定)作為關聯鍵(correlation key)。預設情況下,關聯策略是 HeaderAttributeCorrelationStrategy
,它返回 CORRELATION_ID
訊息頭屬性的值。如果您想使用自定義的訊息頭名稱進行關聯,可以在 HeaderAttributeCorrelationStrategy
的例項上進行配置,並將其作為聚合器關聯策略的引用提供。
鎖註冊中心(Lock Registry)
組的更改是執行緒安全的。因此,當您併發傳送具有相同關聯 ID 的訊息時,聚合器只會處理其中一條,使其有效地成為每個訊息組單執行緒。聚合器使用 LockRegistry
來獲取解析後的關聯 ID 的鎖。預設使用 DefaultLockRegistry
(記憶體中)。為了在使用共享 MessageGroupStore
的伺服器之間同步更新,您必須配置一個共享的鎖註冊中心。
避免死鎖
如上所述,當訊息組被修改(新增或釋放訊息)時,會持有鎖。
考慮以下流程
...->aggregator1-> ... ->aggregator2-> ...
如果存在多個執行緒,並且這些聚合器共享一個共同的鎖註冊中心,則可能發生死鎖。這將導致執行緒掛起,並且 jstack <pid>
可能會顯示類似如下的結果
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有幾種方法可以避免此問題
-
確保每個聚合器都有自己的鎖註冊中心(這可以在應用程式例項之間共享,但流程中的兩個或多個聚合器必須各自擁有一個不同的註冊中心)
-
使用
ExecutorChannel
或QueueChannel
作為聚合器的輸出通道,以便下游流程在新執行緒上執行 -
從 5.1.1 版本開始,將
releaseLockBeforeSend
聚合器屬性設定為true
如果由於某種原因,單個聚合器的輸出最終又路由回同一聚合器,也可能導致此問題。當然,上述第一種解決方案在這種情況下不適用。 |
在 Java DSL 中配置聚合器
有關如何在 Java DSL 中配置聚合器,請參閱 聚合器和重排序器(Aggregators and Resequencers)。
使用 XML 配置聚合器
Spring Integration 透過 <aggregator/>
元素支援使用 XML 配置聚合器。以下示例展示了一個聚合器示例
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 | 聚合器的 ID 是可選的。 |
2 | Lifecycle 屬性,指示聚合器是否應在應用程式上下文啟動期間啟動。可選(預設值為 'true')。 |
3 | 聚合器接收訊息的通道。必需。 |
4 | 聚合器傳送聚合結果的通道。可選(因為接收到的訊息本身可以在 'replyChannel' 訊息頭中指定回覆通道)。 |
5 | 聚合器傳送超時訊息的通道(如果 send-partial-result-on-expiry 為 false )。可選。 |
6 | 引用一個 MessageGroupStore ,用於在其關聯鍵下儲存訊息組,直到它們完成為止。可選。預設情況下,它是一個易失性記憶體儲存。有關更多資訊,請參閱 訊息儲存(Message Store)。 |
7 | 當多個處理程式訂閱到同一個 DirectChannel 時,此聚合器的順序(用於負載均衡目的)。可選。 |
8 | 表示過期訊息應在包含它們的 MessageGroup 過期後進行聚合併發送到 'output-channel' 或 'replyChannel'(參見 MessageGroupStore.expireMessageGroups(long) )。一種使 MessageGroup 過期的方法是配置 MessageGroupStoreReaper 。但是,您也可以透過呼叫 MessageGroupStore.expireMessageGroups(timeout) 來使 MessageGroup 過期。您可以透過控制匯流排(Control Bus)操作實現此目的,或者,如果您有 MessageGroupStore 例項的引用,則可以透過呼叫 expireMessageGroups(timeout) 來實現。否則,僅此屬性本身不起作用。它僅作為一種指示,用於指示是否丟棄或傳送到輸出或回覆通道任何仍在即將過期的 MessageGroup 中的訊息。可選(預設值為 false )。注意:此屬性更準確的名稱可能是 send-partial-result-on-timeout ,因為如果 expire-groups-upon-timeout 設定為 false ,該組可能實際上不會過期。 |
9 | 將回復 Message 傳送到 output-channel 或 discard-channel 時等待的超時間隔。預設為 30 秒。僅當輸出通道存在某些“傳送”限制(例如具有固定“容量”的 QueueChannel )時才應用。在這種情況下,將丟擲 MessageDeliveryException 。對於 AbstractSubscribableChannel 實現,將忽略 send-timeout 。對於 group-timeout(-expression) ,來自計劃過期任務的 MessageDeliveryException 會導致該任務重新計劃。可選。 |
10 | 引用實現訊息關聯(分組)演算法的 bean。該 bean 可以是 CorrelationStrategy 介面的實現或 POJO。在後一種情況下,必須同時定義 correlation-strategy-method 屬性。可選(預設情況下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 訊息頭)。 |
11 | 在由 correlation-strategy 引用的 bean 上定義的方法。它實現了關聯決策演算法。可選,但有限制(必須存在 correlation-strategy )。 |
12 | 表示關聯策略的 SpEL 表示式。示例:"headers['something']" 。只允許使用 correlation-strategy 或 correlation-strategy-expression 中的一個。 |
13 | 引用應用程式上下文中定義的 bean。該 bean 必須實現聚合邏輯,如前所述。可選(預設情況下,聚合訊息列表成為輸出訊息的有效載荷)。 |
14 | 在由 ref 屬性引用的 bean 上定義的方法。它實現了訊息聚合演算法。可選(取決於 ref 屬性是否已定義)。 |
15 | 引用實現釋放策略的 bean。該 bean 可以是 ReleaseStrategy 介面的實現或 POJO。在後一種情況下,必須同時定義 release-strategy-method 屬性。可選(預設情況下,聚合器使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 訊息頭屬性)。 |
16 | 在由 release-strategy 屬性引用的 bean 上定義的方法。它實現了完成決策演算法。可選,但有限制(必須存在 release-strategy )。 |
17 | 表示釋放策略的 SpEL 表示式。表示式的根物件是 MessageGroup 。示例:"size() == 5" 。只允許使用 release-strategy 或 release-strategy-expression 中的一個。 |
18 | 當設定為 true 時(預設為 false ),已完成的組將從訊息儲存中移除,從而使具有相同關聯的後續訊息形成一個新組。預設行為是將與已完成組具有相同關聯的訊息傳送到 discard-channel 。 |
19 | 僅在為 <aggregator> 的 MessageStore 配置了 MessageGroupStoreReaper 時才適用。預設情況下,當配置 MessageGroupStoreReaper 來使部分組過期時,空組也會被移除。組正常釋放後會存在空組。空組可以檢測並丟棄延遲到達的訊息。如果您希望以比部分組過期更長的時間間隔使空組過期,請設定此屬性。然後,空組在至少在此毫秒數內未被修改之前不會從 MessageStore 中移除。請注意,空組的實際過期時間也受收割機(reaper)的 timeout 屬性影響,它可能與此值加上超時一樣長。 |
20 | 引用一個 org.springframework.integration.util.LockRegistry bean。它用於根據 groupId 獲取一個 Lock ,以便對 MessageGroup 進行併發操作。預設情況下,使用內部 DefaultLockRegistry 。使用分散式 LockRegistry (如 ZookeeperLockRegistry )可確保聚合器只有一個例項可以併發操作組。有關更多資訊,請參閱 Redis Lock Registry 或 Zookeeper Lock Registry。 |
21 | 一個超時值(毫秒),用於在當前訊息到達時 ReleaseStrategy 未釋放組的情況下強制 MessageGroup 完成。此屬性為聚合器提供了一個內建的基於時間的釋放策略,當需要發射部分結果(或丟棄組)時,如果新訊息在超時時間內(從最後一條訊息到達的時間開始計算)未到達 MessageGroup 。要設定從建立 MessageGroup 的時間開始計算的超時,請參閱 group-timeout-expression 資訊。當新訊息到達聚合器時,任何現有的針對其 MessageGroup 的 ScheduledFuture<?> 都將被取消。如果 ReleaseStrategy 返回 false (表示不釋放)且 groupTimeout > 0 ,則會計劃一個新的任務來使組過期。我們不建議將此屬性設定為零(或負值)。這樣做會有效地停用聚合器,因為每個訊息組都會立即完成。但是,您可以使用表示式有條件地將其設定為零(或負值)。有關資訊,請參閱 group-timeout-expression 。完成期間採取的操作取決於 ReleaseStrategy 和 send-partial-result-on-expiry 屬性。有關更多資訊,請參閱 Aggregator 和 Group Timeout。它與 'group-timeout-expression' 屬性互斥。 |
22 | 評估為 groupTimeout 的 SpEL 表示式,其中 MessageGroup 作為 #root 評估上下文物件。用於計劃強制完成 MessageGroup 。如果表示式評估為 null ,則不計劃完成。如果評估為零,則組會立即在當前執行緒上完成。實際上,這提供了一個動態的 group-timeout 屬性。例如,如果您希望在組建立時間過去 10 秒後強制完成一個 MessageGroup ,您可以考慮使用以下 SpEL 表示式:timestamp + 10000 - T(System).currentTimeMillis() ,其中 timestamp 由 MessageGroup.getTimestamp() 提供,因為這裡的 MessageGroup 是 #root 評估上下文物件。但請記住,組建立時間可能與第一條到達訊息的時間不同,具體取決於其他組過期屬性的配置。有關更多資訊,請參閱 group-timeout 。與 'group-timeout' 屬性互斥。 |
23 | 當組由於超時(或 MessageGroupStoreReaper )而完成時,預設情況下組會過期(完全移除)。延遲到達的訊息會啟動一個新組。您可以將此設定為 false 以完成組但保留其元資料,以便丟棄延遲到達的訊息。空組以後可以使用 MessageGroupStoreReaper 與 empty-group-min-timeout 屬性一起移除。預設為 'true'。 |
24 | 一個 TaskScheduler bean 引用,用於計劃 MessageGroup ,以便如果在 groupTimeout 內沒有新訊息到達 MessageGroup ,則強制完成。如果未提供,則使用在 ApplicationContext 中註冊的預設排程器(taskScheduler )(ThreadPoolTaskScheduler )。如果未指定 group-timeout 或 group-timeout-expression ,則此屬性不適用。 |
25 | 自 4.1 版本起。它允許為 forceComplete 操作啟動事務。它由 group-timeout(-expression) 或 MessageGroupStoreReaper 啟動,不適用於正常的 add 、release 和 discard 操作。只允許使用此子元素或 <expire-advice-chain/> 。 |
26 | 自 4.1 版本起。它允許為 forceComplete 操作配置任何 Advice 。它由 group-timeout(-expression) 或 MessageGroupStoreReaper 啟動,不適用於正常的 add 、release 和 discard 操作。只允許使用此子元素或 <expire-transactional/> 。也可以在此處使用 Spring tx 名稱空間配置事務 Advice 。 |
組的過期(Expiring Groups)
有兩個屬性與組的過期(完全移除)相關。當組過期時,沒有它的記錄,如果新訊息帶有相同的關聯 ID 到達,則會啟動一個新的組。當組完成(未過期)時,空組仍然存在,延遲到達的訊息將被丟棄。空組以後可以使用
如果組未正常完成,而是由於超時而被釋放或丟棄,則通常組會過期。自 4.1 版本起,您可以使用
自 5.0 版本起,空組也會在 從 5.4 版本開始,可以配置聚合器(和重排序器)以使孤立組過期(持久訊息儲存中可能無法以其他方式釋放的組)。 |
如果自定義聚合器處理程式實現在其他 <aggregator>
定義中被引用,我們通常建議使用 ref
屬性。但是,如果自定義聚合器實現僅被單個 <aggregator>
定義使用,您可以使用內部 bean 定義(從 1.0.3 版本開始)在 <aggregator/>
元素內配置聚合 POJO,如下例所示
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
在同一 <aggregator> 配置中同時使用 ref 屬性和內部 bean 定義是不允許的,因為它會建立歧義條件。在這種情況下,會丟擲 Exception。 |
以下示例顯示了聚合器 bean 的實現
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前面示例的完成策略 bean 實現可能如下所示
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
在有意義的地方,釋放策略方法和聚合器方法可以合併到單個 bean 中。 |
上述示例的關聯策略 bean 實現可能如下所示
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前面示例中的聚合器將按某個標準(在本例中是除以十後的餘數)對數字進行分組,並保持該組,直到有效載荷提供的數字總和超過某個值。
在有意義的地方,釋放策略方法、關聯策略方法和聚合器方法可以合併到單個 bean 中。(實際上,它們中的所有方法或任意兩個方法都可以合併。) |
聚合器和 Spring Expression Language (SpEL)
自 Spring Integration 2.0 起,您可以使用 SpEL 處理各種策略(關聯、釋放和聚合)。如果釋出策略背後的邏輯相對簡單,我們建議使用 SpEL。假設您有一個遺留元件,設計用於接收物件陣列。我們知道預設的釋出策略將所有聚合訊息組裝到 List
中。現在我們有兩個問題。首先,我們需要從列表中提取單個訊息。其次,我們需要提取每條訊息的有效載荷並組裝物件陣列。以下示例解決了這兩個問題
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
然而,使用 SpEL,這樣的要求實際上可以透過一行表示式相對容易地處理,從而無需編寫自定義類並將其配置為 bean。以下示例展示瞭如何實現這一點
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在上述配置中,我們使用 集合投影(collection projection) 表示式從列表中所有訊息的有效載荷組裝一個新集合,然後將其轉換為陣列,從而達到與早期 Java 程式碼相同的結果。
處理自定義釋放和關聯策略時,您可以應用相同的基於表示式的方法。
您無需在 correlation-strategy
屬性中定義自定義 CorrelationStrategy
的 bean,而是可以將簡單的關聯邏輯實現為 SpEL 表示式,並將其配置在 correlation-strategy-expression
屬性中,如下例所示
correlation-strategy-expression="payload.person.id"
在上述示例中,我們假設有效載荷具有一個帶有 id
的 person
屬性,該屬性將用於關聯訊息。
同樣,對於 ReleaseStrategy
,您可以將釋放邏輯實現為 SpEL 表示式,並將其配置在 release-strategy-expression
屬性中。評估上下文的根物件是 MessageGroup
本身。可以使用表示式中的組的 message
屬性引用訊息列表。
在 5.0 版本之前的版本中,根物件是 Message<?> 的集合,如前面示例所示 |
release-strategy-expression="!messages.?[payload==5].empty"
在上述示例中,SpEL 評估上下文的根物件是 MessageGroup
本身,您正在宣告,只要該組中存在有效載荷為 5
的訊息,該組就應該被釋放。
Aggregator 和 Group Timeout
自 4.0 版本起,引入了兩個新的互斥屬性:group-timeout
和 group-timeout-expression
。參見 使用 XML 配置聚合器。在某些情況下,如果在當前訊息到達時 ReleaseStrategy
未釋放聚合器結果(或丟棄組),您可能需要在超時後發出聚合器結果。為此,groupTimeout
選項允許計劃強制完成 MessageGroup
,如下例所示
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
在此示例中,如果聚合器接收到由 release-strategy-expression
定義的序列中的最後一條訊息,則可以進行正常釋放。如果該特定訊息未到達,只要組包含至少兩條訊息,groupTimeout
會在十秒後強制組完成。
強制組完成的結果取決於 ReleaseStrategy
和 send-partial-result-on-expiry
。首先,再次諮詢釋放策略,以檢視是否應進行正常釋放。雖然組未更改,但 ReleaseStrategy
此時可以決定釋放組。如果釋放策略在超時期間仍然不釋放組,則組會過期。如果 send-partial-result-on-expiry
為 true
,則(部分)MessageGroup
中存在的現有訊息作為正常聚合器回覆訊息釋放到 output-channel
。否則,它將被丟棄。
groupTimeout
行為與 MessageGroupStoreReaper
存在差異(參見 使用 XML 配置聚合器)。收割機定期啟動對 MessageGroupStore
中所有 MessageGroup
的強制完成。groupTimeout
會針對每個 MessageGroup
單獨執行此操作,前提是在 groupTimeout
期間沒有新訊息到達。此外,收割機可用於移除空組(如果 expire-groups-upon-completion
為 false,則保留空組以便丟棄延遲訊息)。
自 5.5 版本起,groupTimeoutExpression
可以評估為 java.util.Date
例項。這在某些情況下可能很有用,例如根據組建立時間(MessageGroup.getTimestamp()
)確定計劃任務時刻,而不是根據當前訊息到達時間計算(當 groupTimeoutExpression
評估為 long
時)。
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用註解配置聚合器
以下示例顯示了使用註解配置的聚合器
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
1 | 一個註解,指示此方法應作為聚合器使用。如果此類用作聚合器,則必須指定此註解。 |
2 | 一個註解,指示此方法用作聚合器的釋放策略。如果在任何方法上不存在此註解,則聚合器使用 SimpleSequenceSizeReleaseStrategy 。 |
3 | 一個註解,指示此方法應作為聚合器的關聯策略使用。如果未指定關聯策略,則聚合器使用基於 CORRELATION_ID 的 HeaderAttributeCorrelationStrategy 。 |
XML 元素提供的所有配置選項也適用於 @Aggregator
註解。
聚合器可以透過 XML 顯式引用,或者,如果在類上定義了 @MessageEndpoint
,則可以透過類路徑掃描自動檢測。
Aggregator 元件的註解配置(@Aggregator
等)僅涵蓋簡單用例,其中大多數預設選項已足夠。如果在使用註解配置時需要更多地控制這些選項,請考慮為 AggregatingMessageHandler
使用 @Bean
定義,並使用 @ServiceActivator
標記其 @Bean
方法,如下例所示
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
自 4.2 版本起,AggregatorFactoryBean 可用於簡化 AggregatingMessageHandler 的 Java 配置。 |
在聚合器中管理狀態:MessageGroupStore
聚合器(以及 Spring Integration 中的一些其他模式)是一種有狀態模式,需要基於一段時間內到達的具有相同關聯鍵的一組訊息做出決策。有狀態模式(如 ReleaseStrategy
)中介面的設計遵循一個原則:元件(無論是框架定義的還是使用者定義的)應能保持無狀態。所有狀態都由 MessageGroup
攜帶,其管理委託給 MessageGroupStore
。MessageGroupStore
介面定義如下
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
有關更多資訊,請參閱 Javadoc。
MessageGroupStore
在等待釋放策略觸發時在 MessageGroups
中累積狀態資訊,而此事件可能永遠不會發生。因此,為了防止陳舊訊息滯留,併為易失性儲存提供在應用程式關閉時進行清理的鉤子,MessageGroupStore
允許您註冊回撥,以便在 MessageGroups
過期時應用於它們。介面非常直接,如下所示
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回撥可以直接訪問儲存和訊息組,以便管理持久狀態(例如,從儲存中完全移除組)。
MessageGroupStore
維護這些回撥的列表,並根據需要將其應用於時間戳早於作為引數提供的時間的所有訊息(參見前面描述的 registerMessageGroupExpiryCallback(..)
和 expireMessageGroups(..)
方法)。
重要的是,在依賴 expireMessageGroups 功能時,不要在不同的聚合器元件中使用同一個 MessageGroupStore 例項。每個 AbstractCorrelatingMessageHandler 都會基於 forceComplete() 回撥註冊自己的 MessageGroupCallback 。這樣,每個待過期的組都可能被錯誤的聚合器完成或丟棄。自 5.0.10 版本起,AbstractCorrelatingMessageHandler 在 MessageGroupStore 中註冊回撥時使用 UniqueExpiryCallback 。MessageGroupStore 反過來檢查此類的例項是否存在,如果回撥集中已存在一個例項,則會記錄帶有適當訊息的錯誤。這樣,框架禁止在不同的聚合器/重排序器中使用同一個 MessageGroupStore 例項,以避免上述副作用,即由特定關聯處理程式未建立的組被過期。 |
您可以呼叫帶有超時值的 expireMessageGroups
方法。任何早於當前時間減去此值的訊息都會過期並應用回撥。因此,由儲存的使用者定義訊息組“過期”的含義。
作為使用者的便利,Spring Integration 提供了訊息過期功能的包裝器,形式為 MessageGroupStoreReaper
,如下例所示
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
收割機是一個 Runnable
。在上述示例中,訊息組儲存的過期方法每十秒鐘呼叫一次。超時本身是 30 秒。
理解 MessageGroupStoreReaper 的 'timeout' 屬性是一個近似值,並且受任務排程器的速率影響非常重要,因為此屬性僅在 MessageGroupStoreReaper 任務的下一次計劃執行時檢查。例如,如果超時設定為十分鐘,但 MessageGroupStoreReaper 任務計劃每小時執行一次,並且 MessageGroupStoreReaper 任務的最後一次執行發生在超時前一分鐘,則 MessageGroup 在接下來的 59 分鐘內都不會過期。因此,我們建議將速率設定為至少等於超時值或更短。 |
除了收割機之外,在應用程式透過 AbstractCorrelatingMessageHandler
中的生命週期回撥關閉時,也會呼叫過期回撥。
AbstractCorrelatingMessageHandler
註冊了自己的過期回撥,這與聚合器 XML 配置中的布林標誌 send-partial-result-on-expiry
相關聯。如果標誌設定為 true
,則當呼叫過期回撥時,未釋放組中任何未標記的訊息可以傳送到輸出通道。
由於 MessageGroupStoreReaper 是從計劃任務中呼叫的,並可能導致生成訊息(取決於 sendPartialResultOnExpiry 選項)到下游整合流,因此建議提供一個帶有 MessagePublishingErrorHandler 的自定義 TaskScheduler ,以透過 errorChannel 處理異常,這可能是常規聚合器釋放功能所期望的。同樣的邏輯也適用於組超時功能,它也依賴於 TaskScheduler 。有關更多資訊,請參閱 錯誤處理(Error Handling)。 |
當共享 一些 有關 |
Flux 聚合器
在 5.2 版本中,引入了 FluxAggregatorMessageHandler
元件。它基於 Project Reactor 的 Flux.groupBy()
和 Flux.window()
運算子。接收到的訊息被髮送到由該元件建構函式中的 Flux.create()
啟動的 FluxSink
。如果未提供 outputChannel
或它不是 ReactiveStreamsSubscribableChannel
的例項,則對主 Flux
的訂閱是在 Lifecycle.start()
實現中完成的。否則,它會推遲到 ReactiveStreamsSubscribableChannel
實現完成訂閱。訊息透過 Flux.groupBy()
使用 CorrelationStrategy
作為組鍵進行分組。預設情況下,會查詢訊息的 IntegrationMessageHeaderAccessor.CORRELATION_ID
訊息頭。
預設情況下,每個關閉的視窗都作為訊息有效載荷中的 Flux
釋放以產生。此訊息包含視窗中第一條訊息的所有訊息頭。輸出訊息有效載荷中的此 Flux
必須在下游訂閱和處理。這種邏輯可以透過 FluxAggregatorMessageHandler
的 setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)
配置選項進行定製(或取代)。例如,如果我們希望最終訊息中包含有效載荷的 List
,我們可以配置 Flux.collectList()
,如下所示
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
FluxAggregatorMessageHandler
中有幾個選項可用於選擇適當的視窗策略
-
setBoundaryTrigger(Predicate<Message<?>>)
- 傳播到Flux.windowUntil()
運算子。有關更多資訊,請參閱其 JavaDocs。此選項優先於所有其他視窗選項。 -
setWindowSize(int)
和setWindowSizeFunction(Function<Message<?>, Integer>)
- 傳播到Flux.window(int)
或windowTimeout(int, Duration)
。預設情況下,視窗大小是根據組中第一條訊息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
訊息頭計算得出的。 -
setWindowTimespan(Duration)
- 傳播到Flux.window(Duration)
或windowTimeout(int, Duration)
,具體取決於視窗大小配置。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)
- 一個函式,用於將轉換應用於分組的 Flux,以實現任何未透過暴露選項覆蓋的自定義視窗操作。
由於此元件是一個 MessageHandler
實現,它可以簡單地作為 `@Bean` 定義與 `@ServiceActivator` 訊息註解一起使用。使用 Java DSL 時,可以在 .handle()
EIP 方法中使用它。下面的示例演示了我們如何在執行時註冊 IntegrationFlow
以及如何將 FluxAggregatorMessageHandler
與上游的 splitter 進行關聯。
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);