嚴格的訊息排序
本節描述了入站和出站訊息的訊息排序。
入站
如果需要嚴格排序入站訊息,則必須將入站偵聽器容器的 prefetchCount 屬性配置為 1。這是因為,如果訊息失敗並被重新傳遞,它會在現有的預取訊息之後到達。自 Spring AMQP 2.0 版以來,為了提高效能,prefetchCount 預設為 250。嚴格的排序要求會以降低效能為代價。
出站
考慮以下整合流
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
假設我們向閘道器傳送訊息 A、B 和 C。雖然訊息 A、B、C 很可能會按順序傳送,但無法保證。這是因為模板為每個傳送操作從快取中“借用”一個通道,並且不能保證每個訊息都使用相同的通道。一個解決方案是在拆分器之前啟動一個事務,但在 RabbitMQ 中事務開銷很大,並且可能將效能降低數百倍。
為了更有效地解決此問題,從 5.1 版本開始,Spring Integration 提供了 BoundRabbitChannelAdvice,它是一個 HandleMessageAdvice。參見 處理訊息通知。當在拆分器之前應用時,它確保所有下游操作都在同一通道上執行,並且可以選擇等待所有傳送訊息的釋出者確認被接收(如果連線工廠配置了確認)。以下示例展示瞭如何使用 BoundRabbitChannelAdvice
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
請注意,在通知和出站介面卡中使用了相同的 RabbitTemplate(它實現了 RabbitOperations)。通知在模板的 invoke 方法中執行下游流,以便所有操作都在同一通道上執行。如果提供了可選的超時,當流完成時,通知會呼叫 waitForConfirmsOrDie 方法,如果在指定時間內未收到確認,則會丟擲異常。
下游流中不得有執行緒交接(QueueChannel、ExecutorChannel 等)。 |