嚴格訊息排序

本節介紹入站和出站訊息的訊息排序。

入站

如果需要嚴格的入站訊息排序,必須將入站監聽器容器的 prefetchCount 屬性配置為 1。這是因為,如果訊息失敗並被重新投遞,它會在現有預取訊息之後到達。自 Spring AMQP 2.0 版本以來,prefetchCount 預設設定為 250 以提高效能。嚴格排序要求會以降低效能為代價。

出站

考慮以下整合流

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

假設我們向閘道器傳送訊息 A、B 和 C。儘管訊息 A、B、C 很可能按順序傳送,但不能保證。這是因為模板為每個傳送操作從快取中“借用”一個通道,並且不能保證每個訊息都使用相同的通道。一種解決方案是在分發器之前啟動事務,但在 RabbitMQ 中事務開銷很大,可能導致效能降低數百倍。

為了更高效地解決此問題,從 5.1 版本開始,Spring Integration 提供了 BoundRabbitChannelAdvice,它是一個 HandleMessageAdvice。參見 處理訊息 Advice。當應用於分發器之前時,它確保所有下游操作在同一通道上執行,並且可以選擇等待所有已傳送訊息的釋出者確認(如果連線工廠配置為進行確認)。以下示例展示瞭如何使用 BoundRabbitChannelAdvice

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

請注意,advice 和出站介面卡中使用了相同的 RabbitTemplate(它實現了 RabbitOperations)。advice 在模板的 invoke 方法內執行下游流,以便所有操作都在同一通道上執行。如果提供了可選的超時時間,當流完成後,advice 會呼叫 waitForConfirmsOrDie 方法,如果在指定時間內未收到確認,則會丟擲異常。

下游流中不得存線上程移交(例如 QueueChannelExecutorChannel 等)。