輪詢消費者
AmqpTemplate
本身可以用於輪詢式地接收 Message
。預設情況下,如果沒有可用的訊息,會立即返回 null
,不會阻塞。從版本 1.5 開始,你可以設定一個以毫秒為單位的 receiveTimeout
,接收方法會阻塞最多這麼長時間,等待訊息。小於零的值表示無限期阻塞(或者至少直到與 Broker 的連線丟失)。版本 1.6 引入了 receive
方法的變體,允許在每次呼叫時傳入超時時間。
由於每個訊息的接收操作都會建立一個新的 QueueingConsumer ,這種技術不太適用於高吞吐量的環境。對於這些用例,請考慮使用非同步消費者或將 receiveTimeout 設定為零。 |
從版本 2.4.8 開始,在使用非零超時時,可以指定傳遞給用於將消費者與通道關聯的 basicConsume
方法的引數。例如:template.addConsumerArg("x-priority", 10)
。
有四個簡單的 receive
方法可用。與傳送端的 Exchange
一樣,有一個方法要求直接在模板本身上設定了預設佇列屬性,還有一個方法接受在執行時傳入佇列引數。版本 1.6 引入了接受 timeoutMillis
引數的變體,以便在每次請求時覆蓋 receiveTimeout
。以下列表顯示了這四個方法的定義
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
與傳送訊息類似,AmqpTemplate
提供了一些方便的方法用於接收 POJO 而不是 Message
例項,並且實現提供了一種自定義用於建立返回的 Object
的 MessageConverter
的方式:以下列表顯示了這些方法
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
從版本 2.0 開始,這些方法有一些變體,它們接受一個額外的 ParameterizedTypeReference
引數來轉換複雜型別。模板必須配置一個 SmartMessageConverter
。有關更多資訊,請參閱 使用 RabbitTemplate
從 Message
進行轉換。
與 sendAndReceive
方法類似,從版本 1.3 開始,AmqpTemplate
提供了一些方便的 receiveAndReply
方法,用於同步接收、處理和回覆訊息。以下列表顯示了這些方法的定義
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
AmqpTemplate
實現負責 receive
和 reply
階段。在大多數情況下,您只需提供 ReceiveAndReplyCallback
的實現來處理接收到的訊息並根據需要構建回覆物件或訊息。請注意,ReceiveAndReplyCallback
可以返回 null
。在這種情況下,不會發送回復,並且 receiveAndReply
的工作方式與 receive
方法相同。這允許同一個佇列用於混合的訊息,其中一些可能不需要回復。
只有當提供的回撥不是 ReceiveAndReplyMessageCallback
的例項時,才會應用自動訊息(請求和回覆)轉換,後者提供了原始訊息交換契約。
ReplyToAddressCallback
對於需要自定義邏輯來根據接收到的訊息和 ReceiveAndReplyCallback
的回覆在執行時確定 replyTo
地址的情況非常有用。預設情況下,請求訊息中的 replyTo
資訊用於路由回覆。
以下列表顯示了一個基於 POJO 的接收和回覆示例
boolean received =
this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {
public Invoice handle(Order order) {
return processOrder(order);
}
});
if (received) {
log.info("We received an order!");
}