請求/回覆訊息
AmqpTemplate 還提供了多種 sendAndReceive 方法,它們接受與前面描述的單向傳送操作(exchange、routingKey 和 Message)相同的引數選項。這些方法對於請求-回覆場景非常有用,因為它們在傳送前處理必要的 reply-to 屬性配置,並可以偵聽為此目的內部建立的專用佇列上的回覆訊息。
類似的請求-回覆方法也適用於對請求和回覆都應用 MessageConverter 的情況。這些方法被命名為 convertSendAndReceive。有關更多詳細資訊,請參閱 AmqpTemplate 的 Javadoc。
從 1.5.0 版本開始,每個 sendAndReceive 方法變體都有一個接受 CorrelationData 的過載版本。結合正確配置的連線工廠,這使得操作的傳送端能夠接收發布者確認。有關更多資訊,請參閱關聯的釋出者確認和返回以及 RabbitOperations 的 Javadoc。
從 2.0 版本開始,這些方法(convertSendAndReceiveAsType)有一些變體,它們接受一個額外的 ParameterizedTypeReference 引數來轉換複雜的返回型別。模板必須配置 SmartMessageConverter。有關更多資訊,請參閱使用 RabbitTemplate 從 Message 轉換。
從 2.1 版本開始,您可以使用 noLocalReplyConsumer 選項配置 RabbitTemplate,以控制回覆消費者是否設定 noLocal 標誌。預設情況下,此選項為 false。
回覆超時
預設情況下,傳送和接收方法在五秒後超時並返回 null。您可以透過設定 replyTimeout 屬性來修改此行為。從 1.5 版本開始,如果您將 mandatory 屬性設定為 true(或 mandatory-expression 對特定訊息評估為 true),如果訊息無法傳遞到佇列,則會丟擲 AmqpMessageReturnedException。此異常具有 returnedMessage、replyCode 和 replyText 屬性,以及用於傳送的 exchange 和 routingKey。
此功能使用釋出者返回。您可以透過在 CachingConnectionFactory 上將 publisherReturns 設定為 true 來啟用它(請參閱釋出者確認和返回)。此外,您不能在 RabbitTemplate 中註冊自己的 ReturnCallback。 |
從 2.1.2 版本開始,已新增 replyTimedOut 方法,允許子類收到超時通知,以便它們可以清理任何保留的狀態。
從 2.0.11 和 2.1.3 版本開始,當您使用預設的 DirectReplyToMessageListenerContainer 時,可以透過設定模板的 replyErrorHandler 屬性來新增錯誤處理程式。此錯誤處理程式在任何失敗的交付(例如延遲迴復和沒有關聯頭的訊息)時被呼叫。傳入的異常是 ListenerExecutionFailedException,它具有 failedMessage 屬性。
RabbitMQ 直接回復
從 3.4.0 版本開始,RabbitMQ 伺服器支援直接回復。這消除了固定回覆佇列的主要原因(避免為每個請求建立臨時佇列的需要)。從 Spring AMQP 1.4.1 版本開始,預設情況下(如果伺服器支援)使用直接回復,而不是建立臨時回覆佇列。當未提供 replyQueue(或將其名稱設定為 amq.rabbitmq.reply-to)時,RabbitTemplate 會自動檢測是否支援直接回復,並使用它或回退到使用臨時回覆佇列。使用直接回復時,不需要 reply-listener,也不應配置它。 |
回覆偵聽器仍然支援命名佇列(除了 amq.rabbitmq.reply-to),允許控制回覆併發等。
從 1.6 版本開始,如果您希望為每個回覆使用臨時、排他、自動刪除的佇列,請將 useTemporaryReplyQueues 屬性設定為 true。如果您設定了 replyAddress,此屬性將被忽略。
您可以透過繼承 RabbitTemplate 並覆蓋 useDirectReplyTo() 來檢查不同的條件,從而更改決定是否使用直接回復的標準。該方法僅在傳送第一個請求時呼叫一次。
在 2.0 版本之前,RabbitTemplate 為每個請求建立一個新的消費者,並在收到回覆(或超時)時取消消費者。現在,模板改為使用 DirectReplyToMessageListenerContainer,允許消費者重用。模板仍然負責關聯回覆,因此不會出現延遲迴復發送給不同傳送者的風險。如果您想恢復到以前的行為,請將 useDirectReplyToContainer(使用 XML 配置時為 direct-reply-to-container)屬性設定為 false。
AsyncRabbitTemplate 沒有這樣的選項。當使用直接回復時,它總是為回覆使用 DirectReplyToContainer。
從 2.3.7 版本開始,模板有一個新的屬性 useChannelForCorrelation。當此屬性為 true 時,伺服器不必將關聯 ID 從請求訊息頭複製到回覆訊息。相反,用於傳送請求的通道用於將回復與請求關聯起來。
使用回覆佇列進行訊息關聯
當使用固定回覆佇列(除了 amq.rabbitmq.reply-to)時,您必須提供關聯資料,以便將回復與請求關聯起來。請參閱 RabbitMQ 遠端過程呼叫 (RPC)。預設情況下,標準的 correlationId 屬性用於儲存關聯資料。但是,如果您希望使用自定義屬性來儲存關聯資料,您可以在 <rabbit-template/> 上設定 correlation-key 屬性。明確將屬性設定為 correlationId 與省略該屬性相同。客戶端和伺服器必須使用相同的頭部進行關聯資料。
Spring AMQP 1.1 版本為此資料使用了名為 spring_reply_correlation 的自定義屬性。如果您希望在當前版本中恢復此行為(可能為了保持與使用 1.1 的另一個應用程式的相容性),您必須將屬性設定為 spring_reply_correlation。 |
預設情況下,模板生成自己的關聯 ID(忽略任何使用者提供的值)。如果您希望使用自己的關聯 ID,請將 RabbitTemplate 例項的 userCorrelationId 屬性設定為 true。
| 關聯 ID 必須是唯一的,以避免請求返回錯誤回覆的可能性。 |
回覆監聽器容器
當使用 3.4.0 版本之前的 RabbitMQ 版本時,每個回覆都會使用一個新的臨時佇列。但是,可以在模板上配置單個回覆佇列,這更高效,並且還允許您在該佇列上設定引數。在這種情況下,您還必須提供一個 <reply-listener/> 子元素。此元素為回覆佇列提供一個監聽器容器,模板作為監聽器。允許在 <listener-container/> 上使用的所有訊息監聽器容器配置屬性都允許在此元素上使用,除了 connection-factory 和 message-converter,它們繼承自模板的配置。
如果您執行應用程式的多個例項或使用多個 RabbitTemplate 例項,則 必須 為每個例項使用唯一的回覆佇列。RabbitMQ 無法從佇列中選擇訊息,因此,如果它們都使用相同的佇列,每個例項將競爭回覆,並且不一定會收到自己的回覆。 |
以下示例定義了一個帶有連線工廠的 Rabbit 模板
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
雖然容器和模板共享一個連線工廠,但它們不共享一個通道。因此,請求和回覆不會在同一事務中執行(如果事務性)。
在 1.5.0 版本之前,reply-address 屬性不可用。回覆總是透過使用預設交換和 reply-queue 名稱作為路由鍵進行路由。這仍然是預設值,但您現在可以指定新的 reply-address 屬性。reply-address 可以包含形式為 <exchange>/<routingKey> 的地址,回覆將路由到指定的交換並路由到綁定了路由鍵的佇列。reply-address 優先於 reply-queue。當只使用 reply-address 時,<reply-listener> 必須配置為單獨的 <listener-container> 元件。reply-address 和 reply-queue(或 <listener-container> 上的 queues 屬性)在邏輯上必須引用同一個佇列。 |
透過此配置,SimpleListenerContainer 用於接收回復,RabbitTemplate 作為 MessageListener。當使用 <rabbit:template/> 名稱空間元素定義模板時,如前一個示例所示,解析器定義容器並將模板作為監聽器連線。
當模板不使用固定 replyQueue(或使用直接回復——參見RabbitMQ 直接回復)時,不需要監聽器容器。在使用 RabbitMQ 3.4.0 或更高版本時,直接 reply-to 是首選機制。 |
如果您將 RabbitTemplate 定義為 <bean/>,或使用 @Configuration 類將其定義為 @Bean,或以程式設計方式建立模板,您需要自己定義和連接回復監聽器容器。如果您未能這樣做,模板將永遠不會收到回覆,最終會超時並返回 null 作為對 sendAndReceive 方法呼叫的回覆。
從 1.5 版本開始,RabbitTemplate 會檢測是否已將其配置為 MessageListener 以接收回復。如果未配置,嘗試使用回覆地址傳送和接收訊息將因 IllegalStateException 而失敗(因為永遠不會收到回覆)。
此外,如果使用簡單的 replyAddress(佇列名稱),回覆監聽器容器會驗證它是否正在監聽具有相同名稱的佇列。如果回覆地址是交換和路由鍵,則無法執行此檢查,並且會寫入除錯日誌訊息。
當您自己連接回復監聽器和模板時,重要的是確保模板的 replyAddress 和容器的 queues(或 queueNames)屬性引用相同的佇列。模板將回復地址插入到出站訊息的 replyTo 屬性中。 |
以下列表顯示瞭如何手動連線 bean 的示例
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
一個完整的 RabbitTemplate 與固定回覆佇列連線,以及處理請求並返回回覆的“遠端”監聽器容器的示例顯示在此測試用例中。
當回覆超時(replyTimeout)時,sendAndReceive() 方法返回 null。 |
在 1.3.6 版本之前,超時訊息的延遲迴復只會被記錄。現在,如果收到延遲迴復,它將被拒絕(模板丟擲 AmqpRejectAndDontRequeueException)。如果回覆佇列配置為將拒絕的訊息傳送到死信交換,則可以檢索回覆以供以後分析。為此,將一個佇列繫結到配置的死信交換,其路由鍵等於回覆佇列的名稱。
有關配置死信的更多資訊,請參閱 RabbitMQ 死信文件。您還可以檢視 FixedReplyQueueDeadLetterTests 測試用例以獲取示例。
非同步 Rabbit 模板
1.6 版本引入了 AsyncRabbitTemplate。它具有與 AmqpTemplate 上類似的 sendAndReceive(和 convertSendAndReceive)方法。但是,它們不會阻塞,而是返回一個 CompletableFuture。
sendAndReceive 方法返回一個 RabbitMessageFuture。convertSendAndReceive 方法返回一個 RabbitConverterFuture。
您可以透過在 future 上呼叫 get() 同步地檢索結果,或者您可以註冊一個回撥,該回調會非同步地與結果一起呼叫。以下列表顯示了兩種方法
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
如果設定了 mandatory 並且訊息無法傳遞,future 會丟擲 ExecutionException,其原因是 AmqpMessageReturnedException,它封裝了返回的訊息和有關返回的資訊。
如果設定了 enableConfirms,future 具有一個名為 confirm 的屬性,它本身是一個 CompletableFuture<Boolean>,其中 true 表示成功釋出。如果確認 future 為 false,RabbitFuture 還有一個名為 nackCause 的屬性,其中包含失敗的原因(如果可用)。
| 如果釋出者確認在回覆之後收到,則會被丟棄,因為回覆意味著成功釋出。 |
您可以在模板上設定 receiveTimeout 屬性以使回覆超時(預設為 30000 - 30 秒)。如果發生超時,future 將使用 AmqpReplyTimeoutException 完成。
該模板實現了 SmartLifecycle。在有待處理回覆時停止模板會導致待處理的 Future 例項被取消。
從 2.0 版本開始,非同步模板現在支援直接回復,而不是配置的回覆佇列。要啟用此功能,請使用以下建構函式之一
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
請參閱RabbitMQ 直接回復,以將直接回復與同步 RabbitTemplate 一起使用。
2.0 版本引入了這些方法的變體(convertSendAndReceiveAsType),它們接受一個額外的 ParameterizedTypeReference 引數來轉換複雜的返回型別。您必須使用 SmartMessageConverter 配置底層 RabbitTemplate。有關更多資訊,請參閱使用 RabbitTemplate 從 Message 轉換。