輪詢消費者
AmqpTemplate 本身可用於輪詢 Message 接收。預設情況下,如果沒有訊息可用,會立即返回 null。不會有阻塞。從 1.5 版開始,你可以設定一個以毫秒為單位的 receiveTimeout,接收方法會阻塞長達該時間,等待訊息。小於零的值表示無限期阻塞(或至少直到與代理的連線丟失)。1.6 版引入了 receive 方法的變體,允許在每次呼叫時傳入超時時間。
由於接收操作為每個訊息建立一個新的 QueueingConsumer,因此該技術不適用於高吞吐量環境。對於這些用例,請考慮使用非同步消費者或將 receiveTimeout 設定為零。 |
從 2.4.8 版開始,當使用非零超時時,你可以指定傳遞給 basicConsume 方法的引數,用於將消費者與通道關聯。例如:template.addConsumerArg("x-priority", 10)。
有四種簡單的 receive 方法可用。與傳送側的 Exchange 一樣,有一個方法要求直接在模板本身設定了預設佇列屬性,還有一個方法接受執行時佇列引數。1.6 版引入了接受 timeoutMillis 的變體,以在每次請求時覆蓋 receiveTimeout。以下列表顯示了這四種方法的定義。
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
與傳送訊息的情況一樣,AmqpTemplate 具有一些便利方法,用於接收 POJO 而不是 Message 例項,並且實現提供了一種自定義用於建立返回的 Object 的 MessageConverter 的方法:以下列表顯示了這些方法。
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
從 2.0 版開始,這些方法有額外的變體,接受一個 ParameterizedTypeReference 引數來轉換複雜型別。模板必須配置 SmartMessageConverter。有關更多資訊,請參閱 使用 RabbitTemplate 從 Message 轉換。
與 sendAndReceive 方法類似,從 1.3 版開始,AmqpTemplate 具有幾個便利的 receiveAndReply 方法,用於同步接收、處理和回覆訊息。以下列表顯示了這些方法的定義。
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
AmqpTemplate 實現負責 receive 和 reply 階段。在大多數情況下,你只需提供 ReceiveAndReplyCallback 的實現來執行接收訊息的業務邏輯,並在需要時構建回覆物件或訊息。請注意,ReceiveAndReplyCallback 可以返回 null。在這種情況下,不傳送回覆,receiveAndReply 的工作方式類似於 receive 方法。這允許同一個佇列用於混合訊息,其中一些可能不需要回復。
只有當提供的回撥不是 ReceiveAndReplyMessageCallback 的例項時,才會應用自動訊息(請求和回覆)轉換,ReceiveAndReplyMessageCallback 提供了原始訊息交換契約。
ReplyToAddressCallback 對於需要自定義邏輯以根據收到的訊息和來自 ReceiveAndReplyCallback 的回覆在執行時確定 replyTo 地址的情況很有用。預設情況下,請求訊息中的 replyTo 資訊用於路由回覆。
以下列表顯示了一個基於 POJO 的接收和回覆示例。
boolean received =
this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {
public Invoice handle(Order order) {
return processOrder(order);
}
});
if (received) {
log.info("We received an order!");
}