請求/回覆訊息

AmqpTemplate 還提供了多種 sendAndReceive 方法,它們接受與前面描述的單向傳送操作(exchangeroutingKeyMessage)相同的引數選項。這些方法對於請求-回覆場景非常有用,因為它們在傳送前處理必要的 reply-to 屬性配置,並可以偵聽為此目的內部建立的專用佇列上的回覆訊息。

類似的請求-回覆方法也適用於對請求和回覆都應用 MessageConverter 的情況。這些方法被命名為 convertSendAndReceive。有關更多詳細資訊,請參閱 AmqpTemplate 的 Javadoc

從 1.5.0 版本開始,每個 sendAndReceive 方法變體都有一個接受 CorrelationData 的過載版本。結合正確配置的連線工廠,這使得操作的傳送端能夠接收發布者確認。有關更多資訊,請參閱關聯的釋出者確認和返回以及 RabbitOperations 的 Javadoc

從 2.0 版本開始,這些方法(convertSendAndReceiveAsType)有一些變體,它們接受一個額外的 ParameterizedTypeReference 引數來轉換複雜的返回型別。模板必須配置 SmartMessageConverter。有關更多資訊,請參閱使用 RabbitTemplateMessage 轉換

從 2.1 版本開始,您可以使用 noLocalReplyConsumer 選項配置 RabbitTemplate,以控制回覆消費者是否設定 noLocal 標誌。預設情況下,此選項為 false

回覆超時

預設情況下,傳送和接收方法在五秒後超時並返回 null。您可以透過設定 replyTimeout 屬性來修改此行為。從 1.5 版本開始,如果您將 mandatory 屬性設定為 true(或 mandatory-expression 對特定訊息評估為 true),如果訊息無法傳遞到佇列,則會丟擲 AmqpMessageReturnedException。此異常具有 returnedMessagereplyCodereplyText 屬性,以及用於傳送的 exchangeroutingKey

此功能使用釋出者返回。您可以透過在 CachingConnectionFactory 上將 publisherReturns 設定為 true 來啟用它(請參閱釋出者確認和返回)。此外,您不能在 RabbitTemplate 中註冊自己的 ReturnCallback

從 2.1.2 版本開始,已新增 replyTimedOut 方法,允許子類收到超時通知,以便它們可以清理任何保留的狀態。

從 2.0.11 和 2.1.3 版本開始,當您使用預設的 DirectReplyToMessageListenerContainer 時,可以透過設定模板的 replyErrorHandler 屬性來新增錯誤處理程式。此錯誤處理程式在任何失敗的交付(例如延遲迴復和沒有關聯頭的訊息)時被呼叫。傳入的異常是 ListenerExecutionFailedException,它具有 failedMessage 屬性。

RabbitMQ 直接回復

從 3.4.0 版本開始,RabbitMQ 伺服器支援直接回復。這消除了固定回覆佇列的主要原因(避免為每個請求建立臨時佇列的需要)。從 Spring AMQP 1.4.1 版本開始,預設情況下(如果伺服器支援)使用直接回復,而不是建立臨時回覆佇列。當未提供 replyQueue(或將其名稱設定為 amq.rabbitmq.reply-to)時,RabbitTemplate 會自動檢測是否支援直接回復,並使用它或回退到使用臨時回覆佇列。使用直接回復時,不需要 reply-listener,也不應配置它。

回覆偵聽器仍然支援命名佇列(除了 amq.rabbitmq.reply-to),允許控制回覆併發等。

從 1.6 版本開始,如果您希望為每個回覆使用臨時、排他、自動刪除的佇列,請將 useTemporaryReplyQueues 屬性設定為 true。如果您設定了 replyAddress,此屬性將被忽略。

您可以透過繼承 RabbitTemplate 並覆蓋 useDirectReplyTo() 來檢查不同的條件,從而更改決定是否使用直接回復的標準。該方法僅在傳送第一個請求時呼叫一次。

在 2.0 版本之前,RabbitTemplate 為每個請求建立一個新的消費者,並在收到回覆(或超時)時取消消費者。現在,模板改為使用 DirectReplyToMessageListenerContainer,允許消費者重用。模板仍然負責關聯回覆,因此不會出現延遲迴復發送給不同傳送者的風險。如果您想恢復到以前的行為,請將 useDirectReplyToContainer(使用 XML 配置時為 direct-reply-to-container)屬性設定為 false。

AsyncRabbitTemplate 沒有這樣的選項。當使用直接回復時,它總是為回覆使用 DirectReplyToContainer

從 2.3.7 版本開始,模板有一個新的屬性 useChannelForCorrelation。當此屬性為 true 時,伺服器不必將關聯 ID 從請求訊息頭複製到回覆訊息。相反,用於傳送請求的通道用於將回復與請求關聯起來。

使用回覆佇列進行訊息關聯

當使用固定回覆佇列(除了 amq.rabbitmq.reply-to)時,您必須提供關聯資料,以便將回復與請求關聯起來。請參閱 RabbitMQ 遠端過程呼叫 (RPC)。預設情況下,標準的 correlationId 屬性用於儲存關聯資料。但是,如果您希望使用自定義屬性來儲存關聯資料,您可以在 <rabbit-template/> 上設定 correlation-key 屬性。明確將屬性設定為 correlationId 與省略該屬性相同。客戶端和伺服器必須使用相同的頭部進行關聯資料。

Spring AMQP 1.1 版本為此資料使用了名為 spring_reply_correlation 的自定義屬性。如果您希望在當前版本中恢復此行為(可能為了保持與使用 1.1 的另一個應用程式的相容性),您必須將屬性設定為 spring_reply_correlation

預設情況下,模板生成自己的關聯 ID(忽略任何使用者提供的值)。如果您希望使用自己的關聯 ID,請將 RabbitTemplate 例項的 userCorrelationId 屬性設定為 true

關聯 ID 必須是唯一的,以避免請求返回錯誤回覆的可能性。

回覆監聽器容器

當使用 3.4.0 版本之前的 RabbitMQ 版本時,每個回覆都會使用一個新的臨時佇列。但是,可以在模板上配置單個回覆佇列,這更高效,並且還允許您在該佇列上設定引數。在這種情況下,您還必須提供一個 <reply-listener/> 子元素。此元素為回覆佇列提供一個監聽器容器,模板作為監聽器。允許在 <listener-container/> 上使用的所有訊息監聽器容器配置屬性都允許在此元素上使用,除了 connection-factorymessage-converter,它們繼承自模板的配置。

如果您執行應用程式的多個例項或使用多個 RabbitTemplate 例項,則 必須 為每個例項使用唯一的回覆佇列。RabbitMQ 無法從佇列中選擇訊息,因此,如果它們都使用相同的佇列,每個例項將競爭回覆,並且不一定會收到自己的回覆。

以下示例定義了一個帶有連線工廠的 Rabbit 模板

<rabbit:template id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-queue="replies"
        reply-address="replyEx/routeReply">
    <rabbit:reply-listener/>
</rabbit:template>

雖然容器和模板共享一個連線工廠,但它們不共享一個通道。因此,請求和回覆不會在同一事務中執行(如果事務性)。

在 1.5.0 版本之前,reply-address 屬性不可用。回覆總是透過使用預設交換和 reply-queue 名稱作為路由鍵進行路由。這仍然是預設值,但您現在可以指定新的 reply-address 屬性。reply-address 可以包含形式為 <exchange>/<routingKey> 的地址,回覆將路由到指定的交換並路由到綁定了路由鍵的佇列。reply-address 優先於 reply-queue。當只使用 reply-address 時,<reply-listener> 必須配置為單獨的 <listener-container> 元件。reply-addressreply-queue(或 <listener-container> 上的 queues 屬性)在邏輯上必須引用同一個佇列。

透過此配置,SimpleListenerContainer 用於接收回復,RabbitTemplate 作為 MessageListener。當使用 <rabbit:template/> 名稱空間元素定義模板時,如前一個示例所示,解析器定義容器並將模板作為監聽器連線。

當模板不使用固定 replyQueue(或使用直接回復——參見RabbitMQ 直接回復)時,不需要監聽器容器。在使用 RabbitMQ 3.4.0 或更高版本時,直接 reply-to 是首選機制。

如果您將 RabbitTemplate 定義為 <bean/>,或使用 @Configuration 類將其定義為 @Bean,或以程式設計方式建立模板,您需要自己定義和連接回復監聽器容器。如果您未能這樣做,模板將永遠不會收到回覆,最終會超時並返回 null 作為對 sendAndReceive 方法呼叫的回覆。

從 1.5 版本開始,RabbitTemplate 會檢測是否已將其配置為 MessageListener 以接收回復。如果未配置,嘗試使用回覆地址傳送和接收訊息將因 IllegalStateException 而失敗(因為永遠不會收到回覆)。

此外,如果使用簡單的 replyAddress(佇列名稱),回覆監聽器容器會驗證它是否正在監聽具有相同名稱的佇列。如果回覆地址是交換和路由鍵,則無法執行此檢查,並且會寫入除錯日誌訊息。

當您自己連接回復監聽器和模板時,重要的是確保模板的 replyAddress 和容器的 queues(或 queueNames)屬性引用相同的佇列。模板將回復地址插入到出站訊息的 replyTo 屬性中。

以下列表顯示瞭如何手動連線 bean 的示例

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="connectionFactory" />
    <property name="exchange" value="foo.exchange" />
    <property name="routingKey" value="foo" />
    <property name="replyQueue" ref="replyQ" />
    <property name="replyTimeout" value="600000" />
    <property name="useDirectReplyToContainer" value="false" />
</bean>

<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <constructor-arg ref="connectionFactory" />
    <property name="queues" ref="replyQ" />
    <property name="messageListener" ref="amqpTemplate" />
</bean>

<rabbit:queue id="replyQ" name="my.reply.queue" />
    @Bean
    public RabbitTemplate amqpTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(msgConv());
        rabbitTemplate.setReplyAddress(replyQueue().getName());
        rabbitTemplate.setReplyTimeout(60000);
        rabbitTemplate.setUseDirectReplyToContainer(false);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueues(replyQueue());
        container.setMessageListener(amqpTemplate());
        return container;
    }

    @Bean
    public Queue replyQueue() {
        return new Queue("my.reply.queue");
    }

一個完整的 RabbitTemplate 與固定回覆佇列連線,以及處理請求並返回回覆的“遠端”監聽器容器的示例顯示在此測試用例中。

當回覆超時(replyTimeout)時,sendAndReceive() 方法返回 null。

在 1.3.6 版本之前,超時訊息的延遲迴復只會被記錄。現在,如果收到延遲迴復,它將被拒絕(模板丟擲 AmqpRejectAndDontRequeueException)。如果回覆佇列配置為將拒絕的訊息傳送到死信交換,則可以檢索回覆以供以後分析。為此,將一個佇列繫結到配置的死信交換,其路由鍵等於回覆佇列的名稱。

有關配置死信的更多資訊,請參閱 RabbitMQ 死信文件。您還可以檢視 FixedReplyQueueDeadLetterTests 測試用例以獲取示例。

非同步 Rabbit 模板

1.6 版本引入了 AsyncRabbitTemplate。它具有與 AmqpTemplate 上類似的 sendAndReceive(和 convertSendAndReceive)方法。但是,它們不會阻塞,而是返回一個 CompletableFuture

sendAndReceive 方法返回一個 RabbitMessageFutureconvertSendAndReceive 方法返回一個 RabbitConverterFuture

您可以透過在 future 上呼叫 get() 同步地檢索結果,或者您可以註冊一個回撥,該回調會非同步地與結果一起呼叫。以下列表顯示了兩種方法

@Autowired
private AsyncRabbitTemplate template;

...

public void doSomeWorkAndGetResultLater() {

    ...

    CompletableFuture<String> future = this.template.convertSendAndReceive("foo");

    // do some more work

    String reply = null;
    try {
        reply = future.get(10, TimeUnit.SECONDS);
    }
    catch (ExecutionException e) {
        ...
    }

    ...

}

public void doSomeWorkAndGetResultAsync() {

    ...

    RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            // success
        }
        else {
            // failure
        }
    });

    ...

}

如果設定了 mandatory 並且訊息無法傳遞,future 會丟擲 ExecutionException,其原因是 AmqpMessageReturnedException,它封裝了返回的訊息和有關返回的資訊。

如果設定了 enableConfirms,future 具有一個名為 confirm 的屬性,它本身是一個 CompletableFuture<Boolean>,其中 true 表示成功釋出。如果確認 future 為 falseRabbitFuture 還有一個名為 nackCause 的屬性,其中包含失敗的原因(如果可用)。

如果釋出者確認在回覆之後收到,則會被丟棄,因為回覆意味著成功釋出。

您可以在模板上設定 receiveTimeout 屬性以使回覆超時(預設為 30000 - 30 秒)。如果發生超時,future 將使用 AmqpReplyTimeoutException 完成。

該模板實現了 SmartLifecycle。在有待處理回覆時停止模板會導致待處理的 Future 例項被取消。

從 2.0 版本開始,非同步模板現在支援直接回復,而不是配置的回覆佇列。要啟用此功能,請使用以下建構函式之一

public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)

public AsyncRabbitTemplate(RabbitTemplate template)

請參閱RabbitMQ 直接回復,以將直接回復與同步 RabbitTemplate 一起使用。

2.0 版本引入了這些方法的變體(convertSendAndReceiveAsType),它們接受一個額外的 ParameterizedTypeReference 引數來轉換複雜的返回型別。您必須使用 SmartMessageConverter 配置底層 RabbitTemplate。有關更多資訊,請參閱使用 RabbitTemplateMessage 轉換

使用 AMQP 的 Spring Remoting

Spring Remoting 不再受支援,因為該功能已從 Spring Framework 中移除。

請改用 RabbitTemplate(客戶端)的 sendAndReceive 操作和 @RabbitListener

© . This site is unofficial and not affiliated with VMware.