聚合器和重排序器
聚合器(Aggregator
)在概念上與拆分器(Splitter
)相反。它將一系列獨立訊息聚合成一個單一訊息,因此必然更為複雜。預設情況下,聚合器返回一個包含來自入站訊息的多個負載集合的訊息。重排序器(Resequencer
)也適用同樣的規則。以下示例展示了拆分器-聚合器模式的典型用法
@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}
split()
方法將列表拆分成獨立訊息併發送到 ExecutorChannel
。resequence()
方法根據訊息頭中的序列詳情重新排序訊息。aggregate()
方法收集這些訊息。
但是,您可以透過指定釋放策略(release strategy)和關聯策略(correlation strategy)等來改變預設行為。考慮以下示例
.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
上述示例將具有 myCorrelationKey
訊息頭的訊息關聯起來,並在至少累積十條訊息後釋放它們。
resequence()
EIP 方法也提供了類似的 lambda 配置。