請求/回覆訊息傳遞
AmqpTemplate
還提供了多種 sendAndReceive
方法,這些方法接受與前面描述的單向傳送操作相同的引數選項(exchange
、routingKey
和 Message
)。 這些方法對於請求/回覆場景非常有用,因為它們在傳送前處理必要的 reply-to
屬性的配置,並且可以監聽為該目的在內部建立的獨佔佇列上的回覆訊息。
類似的請求/回覆方法也可用,其中 MessageConverter
同時應用於請求和回覆。 這些方法名為 convertSendAndReceive
。 詳情請參閱 AmqpTemplate
的 Javadoc。
從版本 1.5.0 開始,每個 sendAndReceive
方法變體都有一個過載版本,接受 CorrelationData
。 結合正確配置的連線工廠,這使得可以接收操作傳送端的釋出者確認。 更多資訊請參閱 相關釋出者確認和返回 以及 RabbitOperations
的 Javadoc。
從版本 2.0 開始,這些方法有變體(convertSendAndReceiveAsType
),它們接受一個額外的 ParameterizedTypeReference
引數,用於轉換複雜的返回型別。 必須為模板配置一個 SmartMessageConverter
。 更多資訊請參閱 使用 RabbitTemplate
從 Message
轉換。
從版本 2.1 開始,您可以配置 RabbitTemplate
的 noLocalReplyConsumer
選項,以控制回覆消費者的 noLocal
標誌。 預設情況下,此標誌為 false
。
回覆超時
預設情況下,傳送和接收方法會在五秒後超時並返回 null。 您可以透過設定 replyTimeout
屬性來修改此行為。 從版本 1.5 開始,如果您將 mandatory
屬性設定為 true
(或 mandatory-expression
對特定訊息評估為 true
),如果訊息無法傳送到佇列,將丟擲 AmqpMessageReturnedException
異常。 此異常包含 returnedMessage
、replyCode
和 replyText
屬性,以及用於傳送的 exchange
和 routingKey
。
此功能使用釋出者返回。 您可以透過在 CachingConnectionFactory 上將 publisherReturns 設定為 true 來啟用它(請參閱 釋出者確認和返回)。 另外,您不得在 RabbitTemplate 中註冊自己的 ReturnCallback 。 |
從版本 2.1.2 開始,添加了一個 replyTimedOut
方法,允許子類獲知超時,以便它們可以清理任何保留的狀態。
從版本 2.0.11 和 2.1.3 開始,當您使用預設的 DirectReplyToMessageListenerContainer
時,您可以透過設定模板的 replyErrorHandler
屬性來新增錯誤處理器。 此錯誤處理器會在任何失敗的傳送(例如延遲迴復以及沒有關聯頭接收到的訊息)時被呼叫。 傳入的異常是 ListenerExecutionFailedException
,它包含一個 failedMessage
屬性。
RabbitMQ Direct reply-to
從版本 3.4.0 開始,RabbitMQ 伺服器支援 direct reply-to。 這消除了固定回覆佇列的主要原因(避免為每個請求建立臨時佇列的需要)。 從 Spring AMQP 版本 1.4.1 開始,預設使用 direct reply-to(如果伺服器支援),而不是建立臨時回覆佇列。 當未提供 replyQueue (或將其名稱設定為 amq.rabbitmq.reply-to )時,RabbitTemplate 會自動檢測是否支援 direct reply-to,然後使用它或回退到使用臨時回覆佇列。 使用 direct reply-to 時,不需要配置 reply-listener ,也不應進行配置。 |
回覆監聽器仍然支援命名佇列(除了 amq.rabbitmq.reply-to
),允許控制回覆併發等。
從版本 1.6 開始,如果您希望為每個回覆使用一個臨時的、獨佔的、自動刪除的佇列,請將 useTemporaryReplyQueues
屬性設定為 true
。 如果您設定了 replyAddress
,此屬性將被忽略。
您可以透過繼承 RabbitTemplate
並重寫 useDirectReplyTo()
方法來改變使用 direct reply-to 的判定標準。 此方法僅在第一次傳送請求時被呼叫一次。
在版本 2.0 之前,RabbitTemplate
為每個請求建立一個新的消費者,並在收到回覆(或超時)時取消該消費者。 現在模板改為使用 DirectReplyToMessageListenerContainer
,允許消費者被重用。 模板仍然負責關聯回覆,因此不必擔心延遲迴復發送給不同的傳送者。 如果您想恢復到之前的行為,請將 useDirectReplyToContainer
(使用 XML 配置時為 direct-reply-to-container
)屬性設定為 false。
AsyncRabbitTemplate
沒有此選項。 在使用 direct reply-to 時,它始終使用 DirectReplyToContainer
來處理回覆。
從版本 2.3.7 開始,模板添加了一個新的屬性 useChannelForCorrelation
。 當此屬性為 true
時,伺服器無需從請求訊息頭複製關聯 ID 到回覆訊息。 相反,用於傳送請求的通道被用來將回復與請求關聯起來。
使用回覆佇列進行訊息關聯
當使用固定回覆佇列(除了 amq.rabbitmq.reply-to
)時,必須提供關聯資料以便將回復與請求關聯起來。 請參閱 RabbitMQ 遠端過程呼叫 (RPC)。 預設情況下,標準 correlationId
屬性用於存放關聯資料。 但是,如果您希望使用自定義屬性存放關聯資料,可以在 <rabbit-template/> 上設定 correlation-key
屬性。 顯式地將屬性設定為 correlationId
與省略該屬性效果相同。 客戶端和伺服器必須使用相同的頭來存放關聯資料。
Spring AMQP 版本 1.1 使用一個名為 spring_reply_correlation 的自定義屬性來存放此資料。 如果您希望在當前版本中恢復此行為(可能為了保持與使用 1.1 的其他應用程式的相容性),必須將屬性設定為 spring_reply_correlation 。 |
預設情況下,模板生成自己的關聯 ID(忽略任何使用者提供的值)。 如果您希望使用自己的關聯 ID,請將 RabbitTemplate
例項的 userCorrelationId
屬性設定為 true
。
關聯 ID 必須是唯一的,以避免為請求返回錯誤回覆的可能性。 |
回覆監聽器容器
當使用早於版本 3.4.0 的 RabbitMQ 時,每個回覆會使用一個新的臨時佇列。 然而,可以在模板上配置一個單一的回覆佇列,這更有效率,並且允許您在該佇列上設定引數。 但在這種情況下,您還必須提供一個 <reply-listener/> 子元素。 此元素為回覆佇列提供了一個監聽器容器,模板作為監聽器。 訊息監聽器容器配置 中允許用於 <listener-container/> 的所有屬性都允許用於此元素,除了 connection-factory
和 message-converter
,它們繼承自模板的配置。
如果您執行應用程式的多個例項或使用多個 RabbitTemplate 例項,則**必須**為每個例項使用一個唯一的回覆佇列。 RabbitMQ 沒有從佇列中選擇訊息的能力,因此,如果它們都使用同一個佇列,每個例項將競爭回覆,並且不一定能收到自己的回覆。 |
以下示例定義了一個帶有連線工廠的 rabbit template
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
雖然容器和模板共享一個連線工廠,但它們不共享通道。 因此,請求和回覆不會在同一事務內執行(如果啟用了事務)。
在版本 1.5.0 之前,reply-address 屬性不可用。 回覆總是使用預設交換器和 reply-queue 名稱作為路由鍵進行路由。 這仍然是預設行為,但您現在可以指定新的 reply-address 屬性。 reply-address 可以包含格式為 <exchange>/<routingKey> 的地址,回覆將被路由到指定的交換器,並路由到使用該路由鍵繫結的佇列。 reply-address 優先於 reply-queue 。 當只使用 reply-address 時,<reply-listener> 必須配置為一個獨立的 <listener-container> 元件。 reply-address 和 reply-queue (或 <listener-container> 上的 queues 屬性)在邏輯上必須指向同一個佇列。 |
使用此配置,SimpleListenerContainer
用於接收回復,RabbitTemplate
作為 MessageListener
。 使用 <rabbit:template/>
名稱空間元素定義模板時,如前例所示,解析器會定義容器並將模板注入為監聽器。
當模板不使用固定的 replyQueue (或使用 direct reply-to,請參閱 RabbitMQ Direct reply-to)時,不需要監聽器容器。 Direct reply-to 是使用 RabbitMQ 3.4.0 或更高版本時的首選機制。 |
如果您將 RabbitTemplate
定義為一個 <bean/>
,或者使用 `@Configuration` 類將其定義為 `@Bean`,或者透過程式設計方式建立模板,則需要自己定義並連接回復監聽器容器。 如果未能這樣做,模板將永遠收不到回覆,最終超時並對 sendAndReceive
方法呼叫返回 null。
從版本 1.5 開始,RabbitTemplate
會檢測是否已將其配置為 MessageListener
來接收回復。 如果沒有,嘗試使用回覆地址傳送和接收訊息將失敗,並丟擲 IllegalStateException
(因為回覆永遠不會被接收)。
此外,如果使用簡單的 replyAddress
(佇列名稱),回覆監聽器容器會驗證它是否正在監聽同名的佇列。 如果回覆地址是交換器和路由鍵,則無法執行此檢查,並且會寫入一條除錯日誌訊息。
當您自己連接回復監聽器和模板時,務必確保模板的 replyAddress 和容器的 queues (或 queueNames )屬性引用同一個佇列。 模板會將回復地址插入到 outbound 訊息的 replyTo 屬性中。 |
以下列表顯示瞭如何手動連線 beans 的示例
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
一個完整的示例,顯示了與固定回覆佇列連線的 RabbitTemplate
,以及一個處理請求並返回回覆的“遠端”監聽器容器,見 此測試用例。
當回覆超時(replyTimeout )時,sendAndReceive() 方法返回 null。 |
在版本 1.3.6 之前,對於超時的訊息,延遲迴復只會記錄日誌。 現在,如果收到延遲迴復,它將被拒絕(模板會丟擲 AmqpRejectAndDontRequeueException
異常)。 如果回覆佇列配置為將拒絕的訊息傳送到死信交換器,則可以檢索該回復以供後續分析。 為此,請將一個佇列繫結到配置的死信交換器,並使用與回覆佇列名稱相同的路由鍵。
更多關於配置死信的資訊,請參閱 RabbitMQ 死信文件。 您也可以檢視 FixedReplyQueueDeadLetterTests
測試用例以獲取示例。
非同步 Rabbit 模板
版本 1.6 引入了 AsyncRabbitTemplate
。 它具有類似於 AmqpTemplate
的 sendAndReceive
(和 convertSendAndReceive
)方法。 但是,它們不阻塞,而是返回一個 CompletableFuture
。
sendAndReceive
方法返回一個 RabbitMessageFuture
。 convertSendAndReceive
方法返回一個 RabbitConverterFuture
。
您可以稍後透過在 future 上呼叫 get()
同步檢索結果,也可以註冊一個回撥,該回調會非同步地接收結果。 以下列表顯示了這兩種方法
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
如果設定了 mandatory
並且訊息無法傳送,future 將丟擲 ExecutionException
,其原因為 AmqpMessageReturnedException
,該異常封裝了返回的訊息以及有關返回的資訊。
如果設定了 enableConfirms
,future 有一個名為 confirm
的屬性,它本身是一個 CompletableFuture<Boolean>
,其中 true
表示釋出成功。 如果 confirm future 為 false
,則 RabbitFuture
還有一個名為 nackCause
的屬性,其中包含失敗的原因(如果可用)。
如果在收到回覆後才收到釋出者確認,則會丟棄該確認,因為回覆意味著釋出成功。 |
您可以設定模板的 receiveTimeout
屬性來使回覆超時(預設為 30000
毫秒 - 30 秒)。 發生超時時,future 將以 AmqpReplyTimeoutException
完成。
模板實現了 SmartLifecycle
介面。 在有待處理的回覆時停止模板會導致待處理的 Future
例項被取消。
從版本 2.0 開始,非同步模板現在支援 direct reply-to,而不是配置的回覆佇列。 要啟用此功能,請使用以下建構函式之一
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
有關如何在同步 RabbitTemplate
中使用 direct reply-to,請參閱 RabbitMQ Direct reply-to。
版本 2.0 引入了這些方法的變體(convertSendAndReceiveAsType
),它們接受一個額外的 ParameterizedTypeReference
引數,用於轉換複雜的返回型別。 您必須使用 SmartMessageConverter
配置底層的 RabbitTemplate
。 更多資訊請參閱 使用 RabbitTemplate
從 Message
轉換。