回覆管理
MessageListenerAdapter 中現有的支援已允許您的方法具有非 void 返回型別。在這種情況下,呼叫的結果將封裝在一個訊息中,傳送到原始訊息的 ReplyToAddress 標頭中指定的地址,或傳送到偵聽器上配置的預設地址。您可以使用訊息抽象的 @SendTo 註解來設定該預設地址。
假設我們的 processOrder 方法現在應該返回一個 OrderStatus,我們可以將其編寫如下以自動傳送回覆
@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果您需要以傳輸無關的方式設定額外的標頭,您可以返回一個 Message,如下所示:
@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
或者,您可以在 beforeSendReplyMessagePostProcessors 容器工廠屬性中使用 MessagePostProcessor 來新增更多標頭。從版本 2.2.3 開始,被呼叫的 bean/方法會在回覆訊息中提供,這可以在訊息後處理器中使用,將資訊傳回給呼叫方。
factory.setBeforeSendReplyPostProcessors(msg -> {
msg.getMessageProperties().setHeader("calledBean",
msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
msg.getMessageProperties().setHeader("calledMethod",
msg.getMessageProperties().getTargetMethod().getName());
return m;
});
從版本 2.2.5 開始,您可以配置一個 ReplyPostProcessor 來修改回覆訊息,然後傳送;它在設定 correlationId 標頭以匹配請求之後呼叫。
@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
return in.toUpperCase();
}
@Bean
public ReplyPostProcessor echoCustomHeader() {
return (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
};
}
從版本 3.0 開始,您可以在容器工廠而不是註解上配置後處理器。
factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
});
id 引數是偵聽器 id。
註解上的設定將取代工廠設定。
@SendTo 值被假定為一個回覆 exchange 和 routingKey 對,遵循 exchange/routingKey 模式,其中一部分可以省略。有效值如下:
-
thing1/thing2:replyTo交換和routingKey。thing1/:replyTo交換和預設(空)routingKey。thing2或/thing2:replyToroutingKey和預設(空)交換。/或空:replyTo預設交換和預設routingKey。
此外,您可以不帶 value 屬性使用 @SendTo。這種情況等同於空 sendTo 模式。@SendTo 僅在入站訊息沒有 replyToAddress 屬性時使用。
從版本 1.5 開始,@SendTo 值可以是 bean 初始化 SpEL 表示式,如以下示例所示
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}
該表示式必須解析為 String,它可以是簡單的佇列名稱(傳送到預設交換),也可以是 exchange/routingKey 形式,如前一個示例之前所討論的。
#{…} 表示式在初始化期間評估一次。 |
對於動態回覆路由,訊息傳送方應包含 reply_to 訊息屬性,或使用備用執行時 SpEL 表示式(在下一個示例之後描述)。
從版本 1.6 開始,@SendTo 可以是一個 SpEL 表示式,它在執行時針對請求和回覆進行評估,如以下示例所示
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
return processTheFooAndReturnABar(foo);
}
SpEL 表示式的執行時性質由 !{…} 分隔符表示。表示式的評估上下文 #root 物件具有三個屬性
-
request:o.s.amqp.core.Message請求物件。 -
source:轉換後的o.s.messaging.Message<?>。 -
result:方法結果。
該上下文具有一個對映屬性訪問器、一個標準型別轉換器和一個 bean 解析器,允許引用上下文中的其他 bean(例如,@someBeanName.determineReplyQ(request, result))。
總而言之,#{…} 在初始化期間評估一次,其中 #root 物件是應用程式上下文。bean 透過其名稱引用。!{…} 在執行時為每條訊息評估,其中根物件具有前面列出的屬性。bean 透過其名稱(字首為 @)引用。
從版本 2.1 開始,還支援簡單的屬性佔位符(例如,${some.reply.to})。對於早期版本,可以使用以下解決方法,如以下示例所示
@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
...
return ...
}