嚴格的訊息排序

本節描述了入站和出站訊息的訊息排序。

入站

如果需要嚴格排序入站訊息,則必須將入站偵聽器容器的 prefetchCount 屬性配置為 1。這是因為,如果訊息失敗並被重新傳遞,它會在現有的預取訊息之後到達。自 Spring AMQP 2.0 版以來,為了提高效能,prefetchCount 預設為 250。嚴格的排序要求會以降低效能為代價。

出站

考慮以下整合流

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

假設我們向閘道器傳送訊息 ABC。雖然訊息 ABC 很可能會按順序傳送,但無法保證。這是因為模板為每個傳送操作從快取中“借用”一個通道,並且不能保證每個訊息都使用相同的通道。一個解決方案是在拆分器之前啟動一個事務,但在 RabbitMQ 中事務開銷很大,並且可能將效能降低數百倍。

為了更有效地解決此問題,從 5.1 版本開始,Spring Integration 提供了 BoundRabbitChannelAdvice,它是一個 HandleMessageAdvice。參見 處理訊息通知。當在拆分器之前應用時,它確保所有下游操作都在同一通道上執行,並且可以選擇等待所有傳送訊息的釋出者確認被接收(如果連線工廠配置了確認)。以下示例展示瞭如何使用 BoundRabbitChannelAdvice

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

請注意,在通知和出站介面卡中使用了相同的 RabbitTemplate(它實現了 RabbitOperations)。通知在模板的 invoke 方法中執行下游流,以便所有操作都在同一通道上執行。如果提供了可選的超時,當流完成時,通知會呼叫 waitForConfirmsOrDie 方法,如果在指定時間內未收到確認,則會丟擲異常。

下游流中不得有執行緒交接(QueueChannelExecutorChannel 等)。
© . This site is unofficial and not affiliated with VMware.