請求/回覆訊息傳遞

AmqpTemplate 還提供了多種 sendAndReceive 方法,這些方法接受與前面描述的單向傳送操作相同的引數選項(exchangeroutingKeyMessage)。 這些方法對於請求/回覆場景非常有用,因為它們在傳送前處理必要的 reply-to 屬性的配置,並且可以監聽為該目的在內部建立的獨佔佇列上的回覆訊息。

類似的請求/回覆方法也可用,其中 MessageConverter 同時應用於請求和回覆。 這些方法名為 convertSendAndReceive。 詳情請參閱 AmqpTemplate 的 Javadoc

從版本 1.5.0 開始,每個 sendAndReceive 方法變體都有一個過載版本,接受 CorrelationData。 結合正確配置的連線工廠,這使得可以接收操作傳送端的釋出者確認。 更多資訊請參閱 相關釋出者確認和返回 以及 RabbitOperations 的 Javadoc

從版本 2.0 開始,這些方法有變體(convertSendAndReceiveAsType),它們接受一個額外的 ParameterizedTypeReference 引數,用於轉換複雜的返回型別。 必須為模板配置一個 SmartMessageConverter。 更多資訊請參閱 使用 RabbitTemplateMessage 轉換

從版本 2.1 開始,您可以配置 RabbitTemplatenoLocalReplyConsumer 選項,以控制回覆消費者的 noLocal 標誌。 預設情況下,此標誌為 false

回覆超時

預設情況下,傳送和接收方法會在五秒後超時並返回 null。 您可以透過設定 replyTimeout 屬性來修改此行為。 從版本 1.5 開始,如果您將 mandatory 屬性設定為 true(或 mandatory-expression 對特定訊息評估為 true),如果訊息無法傳送到佇列,將丟擲 AmqpMessageReturnedException 異常。 此異常包含 returnedMessagereplyCodereplyText 屬性,以及用於傳送的 exchangeroutingKey

此功能使用釋出者返回。 您可以透過在 CachingConnectionFactory 上將 publisherReturns 設定為 true 來啟用它(請參閱 釋出者確認和返回)。 另外,您不得在 RabbitTemplate 中註冊自己的 ReturnCallback

從版本 2.1.2 開始,添加了一個 replyTimedOut 方法,允許子類獲知超時,以便它們可以清理任何保留的狀態。

從版本 2.0.11 和 2.1.3 開始,當您使用預設的 DirectReplyToMessageListenerContainer 時,您可以透過設定模板的 replyErrorHandler 屬性來新增錯誤處理器。 此錯誤處理器會在任何失敗的傳送(例如延遲迴復以及沒有關聯頭接收到的訊息)時被呼叫。 傳入的異常是 ListenerExecutionFailedException,它包含一個 failedMessage 屬性。

RabbitMQ Direct reply-to

從版本 3.4.0 開始,RabbitMQ 伺服器支援 direct reply-to。 這消除了固定回覆佇列的主要原因(避免為每個請求建立臨時佇列的需要)。 從 Spring AMQP 版本 1.4.1 開始,預設使用 direct reply-to(如果伺服器支援),而不是建立臨時回覆佇列。 當未提供 replyQueue(或將其名稱設定為 amq.rabbitmq.reply-to)時,RabbitTemplate 會自動檢測是否支援 direct reply-to,然後使用它或回退到使用臨時回覆佇列。 使用 direct reply-to 時,不需要配置 reply-listener,也不應進行配置。

回覆監聽器仍然支援命名佇列(除了 amq.rabbitmq.reply-to),允許控制回覆併發等。

從版本 1.6 開始,如果您希望為每個回覆使用一個臨時的、獨佔的、自動刪除的佇列,請將 useTemporaryReplyQueues 屬性設定為 true。 如果您設定了 replyAddress,此屬性將被忽略。

您可以透過繼承 RabbitTemplate 並重寫 useDirectReplyTo() 方法來改變使用 direct reply-to 的判定標準。 此方法僅在第一次傳送請求時被呼叫一次。

在版本 2.0 之前,RabbitTemplate 為每個請求建立一個新的消費者,並在收到回覆(或超時)時取消該消費者。 現在模板改為使用 DirectReplyToMessageListenerContainer,允許消費者被重用。 模板仍然負責關聯回覆,因此不必擔心延遲迴復發送給不同的傳送者。 如果您想恢復到之前的行為,請將 useDirectReplyToContainer(使用 XML 配置時為 direct-reply-to-container)屬性設定為 false。

AsyncRabbitTemplate 沒有此選項。 在使用 direct reply-to 時,它始終使用 DirectReplyToContainer 來處理回覆。

從版本 2.3.7 開始,模板添加了一個新的屬性 useChannelForCorrelation。 當此屬性為 true 時,伺服器無需從請求訊息頭複製關聯 ID 到回覆訊息。 相反,用於傳送請求的通道被用來將回復與請求關聯起來。

使用回覆佇列進行訊息關聯

當使用固定回覆佇列(除了 amq.rabbitmq.reply-to)時,必須提供關聯資料以便將回復與請求關聯起來。 請參閱 RabbitMQ 遠端過程呼叫 (RPC)。 預設情況下,標準 correlationId 屬性用於存放關聯資料。 但是,如果您希望使用自定義屬性存放關聯資料,可以在 <rabbit-template/> 上設定 correlation-key 屬性。 顯式地將屬性設定為 correlationId 與省略該屬性效果相同。 客戶端和伺服器必須使用相同的頭來存放關聯資料。

Spring AMQP 版本 1.1 使用一個名為 spring_reply_correlation 的自定義屬性來存放此資料。 如果您希望在當前版本中恢復此行為(可能為了保持與使用 1.1 的其他應用程式的相容性),必須將屬性設定為 spring_reply_correlation

預設情況下,模板生成自己的關聯 ID(忽略任何使用者提供的值)。 如果您希望使用自己的關聯 ID,請將 RabbitTemplate 例項的 userCorrelationId 屬性設定為 true

關聯 ID 必須是唯一的,以避免為請求返回錯誤回覆的可能性。

回覆監聽器容器

當使用早於版本 3.4.0 的 RabbitMQ 時,每個回覆會使用一個新的臨時佇列。 然而,可以在模板上配置一個單一的回覆佇列,這更有效率,並且允許您在該佇列上設定引數。 但在這種情況下,您還必須提供一個 <reply-listener/> 子元素。 此元素為回覆佇列提供了一個監聽器容器,模板作為監聽器。 訊息監聽器容器配置 中允許用於 <listener-container/> 的所有屬性都允許用於此元素,除了 connection-factorymessage-converter,它們繼承自模板的配置。

如果您執行應用程式的多個例項或使用多個 RabbitTemplate 例項,則**必須**為每個例項使用一個唯一的回覆佇列。 RabbitMQ 沒有從佇列中選擇訊息的能力,因此,如果它們都使用同一個佇列,每個例項將競爭回覆,並且不一定能收到自己的回覆。

以下示例定義了一個帶有連線工廠的 rabbit template

<rabbit:template id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-queue="replies"
        reply-address="replyEx/routeReply">
    <rabbit:reply-listener/>
</rabbit:template>

雖然容器和模板共享一個連線工廠,但它們不共享通道。 因此,請求和回覆不會在同一事務內執行(如果啟用了事務)。

在版本 1.5.0 之前,reply-address 屬性不可用。 回覆總是使用預設交換器和 reply-queue 名稱作為路由鍵進行路由。 這仍然是預設行為,但您現在可以指定新的 reply-address 屬性。 reply-address 可以包含格式為 <exchange>/<routingKey> 的地址,回覆將被路由到指定的交換器,並路由到使用該路由鍵繫結的佇列。 reply-address 優先於 reply-queue。 當只使用 reply-address 時,<reply-listener> 必須配置為一個獨立的 <listener-container> 元件。 reply-addressreply-queue(或 <listener-container> 上的 queues 屬性)在邏輯上必須指向同一個佇列。

使用此配置,SimpleListenerContainer 用於接收回復,RabbitTemplate 作為 MessageListener。 使用 <rabbit:template/> 名稱空間元素定義模板時,如前例所示,解析器會定義容器並將模板注入為監聽器。

當模板不使用固定的 replyQueue(或使用 direct reply-to,請參閱 RabbitMQ Direct reply-to)時,不需要監聽器容器。 Direct reply-to 是使用 RabbitMQ 3.4.0 或更高版本時的首選機制。

如果您將 RabbitTemplate 定義為一個 <bean/>,或者使用 `@Configuration` 類將其定義為 `@Bean`,或者透過程式設計方式建立模板,則需要自己定義並連接回復監聽器容器。 如果未能這樣做,模板將永遠收不到回覆,最終超時並對 sendAndReceive 方法呼叫返回 null。

從版本 1.5 開始,RabbitTemplate 會檢測是否已將其配置為 MessageListener 來接收回復。 如果沒有,嘗試使用回覆地址傳送和接收訊息將失敗,並丟擲 IllegalStateException(因為回覆永遠不會被接收)。

此外,如果使用簡單的 replyAddress(佇列名稱),回覆監聽器容器會驗證它是否正在監聽同名的佇列。 如果回覆地址是交換器和路由鍵,則無法執行此檢查,並且會寫入一條除錯日誌訊息。

當您自己連接回復監聽器和模板時,務必確保模板的 replyAddress 和容器的 queues(或 queueNames)屬性引用同一個佇列。 模板會將回復地址插入到 outbound 訊息的 replyTo 屬性中。

以下列表顯示瞭如何手動連線 beans 的示例

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="connectionFactory" />
    <property name="exchange" value="foo.exchange" />
    <property name="routingKey" value="foo" />
    <property name="replyQueue" ref="replyQ" />
    <property name="replyTimeout" value="600000" />
    <property name="useDirectReplyToContainer" value="false" />
</bean>

<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <constructor-arg ref="connectionFactory" />
    <property name="queues" ref="replyQ" />
    <property name="messageListener" ref="amqpTemplate" />
</bean>

<rabbit:queue id="replyQ" name="my.reply.queue" />
    @Bean
    public RabbitTemplate amqpTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(msgConv());
        rabbitTemplate.setReplyAddress(replyQueue().getName());
        rabbitTemplate.setReplyTimeout(60000);
        rabbitTemplate.setUseDirectReplyToContainer(false);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueues(replyQueue());
        container.setMessageListener(amqpTemplate());
        return container;
    }

    @Bean
    public Queue replyQueue() {
        return new Queue("my.reply.queue");
    }

一個完整的示例,顯示了與固定回覆佇列連線的 RabbitTemplate,以及一個處理請求並返回回覆的“遠端”監聽器容器,見 此測試用例

當回覆超時(replyTimeout)時,sendAndReceive() 方法返回 null。

在版本 1.3.6 之前,對於超時的訊息,延遲迴復只會記錄日誌。 現在,如果收到延遲迴復,它將被拒絕(模板會丟擲 AmqpRejectAndDontRequeueException 異常)。 如果回覆佇列配置為將拒絕的訊息傳送到死信交換器,則可以檢索該回復以供後續分析。 為此,請將一個佇列繫結到配置的死信交換器,並使用與回覆佇列名稱相同的路由鍵。

更多關於配置死信的資訊,請參閱 RabbitMQ 死信文件。 您也可以檢視 FixedReplyQueueDeadLetterTests 測試用例以獲取示例。

非同步 Rabbit 模板

版本 1.6 引入了 AsyncRabbitTemplate。 它具有類似於 AmqpTemplatesendAndReceive(和 convertSendAndReceive)方法。 但是,它們不阻塞,而是返回一個 CompletableFuture

sendAndReceive 方法返回一個 RabbitMessageFutureconvertSendAndReceive 方法返回一個 RabbitConverterFuture

您可以稍後透過在 future 上呼叫 get() 同步檢索結果,也可以註冊一個回撥,該回調會非同步地接收結果。 以下列表顯示了這兩種方法

@Autowired
private AsyncRabbitTemplate template;

...

public void doSomeWorkAndGetResultLater() {

    ...

    CompletableFuture<String> future = this.template.convertSendAndReceive("foo");

    // do some more work

    String reply = null;
    try {
        reply = future.get(10, TimeUnit.SECONDS);
    }
    catch (ExecutionException e) {
        ...
    }

    ...

}

public void doSomeWorkAndGetResultAsync() {

    ...

    RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            // success
        }
        else {
            // failure
        }
    });

    ...

}

如果設定了 mandatory 並且訊息無法傳送,future 將丟擲 ExecutionException,其原因為 AmqpMessageReturnedException,該異常封裝了返回的訊息以及有關返回的資訊。

如果設定了 enableConfirms,future 有一個名為 confirm 的屬性,它本身是一個 CompletableFuture<Boolean>,其中 true 表示釋出成功。 如果 confirm future 為 false,則 RabbitFuture 還有一個名為 nackCause 的屬性,其中包含失敗的原因(如果可用)。

如果在收到回覆後才收到釋出者確認,則會丟棄該確認,因為回覆意味著釋出成功。

您可以設定模板的 receiveTimeout 屬性來使回覆超時(預設為 30000 毫秒 - 30 秒)。 發生超時時,future 將以 AmqpReplyTimeoutException 完成。

模板實現了 SmartLifecycle 介面。 在有待處理的回覆時停止模板會導致待處理的 Future 例項被取消。

從版本 2.0 開始,非同步模板現在支援 direct reply-to,而不是配置的回覆佇列。 要啟用此功能,請使用以下建構函式之一

public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)

public AsyncRabbitTemplate(RabbitTemplate template)

有關如何在同步 RabbitTemplate 中使用 direct reply-to,請參閱 RabbitMQ Direct reply-to

版本 2.0 引入了這些方法的變體(convertSendAndReceiveAsType),它們接受一個額外的 ParameterizedTypeReference 引數,用於轉換複雜的返回型別。 您必須使用 SmartMessageConverter 配置底層的 RabbitTemplate。 更多資訊請參閱 使用 RabbitTemplateMessage 轉換

使用 AMQP 的 Spring Remoting

Spring remoting 不再受支援,因為該功能已從 Spring Framework 中移除。

請改用 RabbitTemplate(客戶端)的 sendAndReceive 操作和 `@RabbitListener`。