嚴格訊息排序
本節介紹入站和出站訊息的訊息排序。
入站
如果需要嚴格的入站訊息排序,必須將入站監聽器容器的 prefetchCount
屬性配置為 1
。這是因為,如果訊息失敗並被重新投遞,它會在現有預取訊息之後到達。自 Spring AMQP 2.0 版本以來,prefetchCount
預設設定為 250
以提高效能。嚴格排序要求會以降低效能為代價。
出站
考慮以下整合流
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
假設我們向閘道器傳送訊息 A、B 和 C。儘管訊息 A、B、C 很可能按順序傳送,但不能保證。這是因為模板為每個傳送操作從快取中“借用”一個通道,並且不能保證每個訊息都使用相同的通道。一種解決方案是在分發器之前啟動事務,但在 RabbitMQ 中事務開銷很大,可能導致效能降低數百倍。
為了更高效地解決此問題,從 5.1 版本開始,Spring Integration 提供了 BoundRabbitChannelAdvice
,它是一個 HandleMessageAdvice
。參見 處理訊息 Advice。當應用於分發器之前時,它確保所有下游操作在同一通道上執行,並且可以選擇等待所有已傳送訊息的釋出者確認(如果連線工廠配置為進行確認)。以下示例展示瞭如何使用 BoundRabbitChannelAdvice
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
請注意,advice 和出站介面卡中使用了相同的 RabbitTemplate
(它實現了 RabbitOperations
)。advice 在模板的 invoke
方法內執行下游流,以便所有操作都在同一通道上執行。如果提供了可選的超時時間,當流完成後,advice 會呼叫 waitForConfirmsOrDie
方法,如果在指定時間內未收到確認,則會丟擲異常。
下游流中不得存線上程移交(例如 QueueChannel 、ExecutorChannel 等)。 |