回覆管理
MessageListenerAdapter 中已有的支援允許你的方法具有非 void 返回型別。在這種情況下,方法呼叫的結果會被封裝在一個訊息中,傳送到原始訊息的 ReplyToAddress 頭部指定的地址,或者傳送到監聽器上配置的預設地址。你可以透過使用訊息抽象的 @SendTo 註解來設定該預設地址。
假設我們的 processOrder 方法現在應該返回 OrderStatus,我們可以如下編寫程式碼以自動傳送回覆
@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果你需要以傳輸獨立的方式設定額外的頭部,你可以返回一個 Message 代替,示例如下
@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
另外,你可以在 beforeSendReplyMessagePostProcessors 容器工廠屬性中使用 MessagePostProcessor 新增更多頭部。從 2.2.3 版本開始,被呼叫的 bean/方法在回覆訊息中可用,這可以在訊息後處理器中使用,以便將資訊反饋給呼叫者
factory.setBeforeSendReplyPostProcessors(msg -> {
msg.getMessageProperties().setHeader("calledBean",
msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
msg.getMessageProperties().setHeader("calledMethod",
msg.getMessageProperties().getTargetMethod().getName());
return m;
});
從 2.2.5 版本開始,你可以配置 ReplyPostProcessor 來修改回覆訊息傳送前的內容;它在 correlationId 頭部設定為匹配請求之後被呼叫。
@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
return in.toUpperCase();
}
@Bean
public ReplyPostProcessor echoCustomHeader() {
return (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
};
}
從 3.0 版本開始,你可以在容器工廠上配置後處理器,而不是在註解上。
factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
});
id 引數是監聽器的 id。
註解上的設定將覆蓋工廠設定。
@SendTo 的值被假定為一個回覆交換機和路由鍵對,遵循 exchange/routingKey 模式,其中一個部分可以省略。有效值如下
-
thing1/thing2
: 回覆交換機和路由鍵。thing1/
: 回覆交換機和預設(空)路由鍵。thing2
或/thing2
: 回覆路由鍵和預設(空)交換機。/
或空: 回覆預設交換機和預設路由鍵。
另外,你可以使用不帶 value 屬性的 @SendTo。這種情況等同於一個空的 sendTo 模式。只有在入站訊息沒有 replyToAddress 屬性時,才使用 @SendTo。
從 1.5 版本開始,@SendTo 的值可以是一個 bean 初始化 SpEL 表示式,如下例所示
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}
表示式必須解析為一個 String,它可以是一個簡單的佇列名(傳送到預設交換機),或者如前面示例之前討論的 exchange/routingKey 形式。
#{…} 表示式在初始化期間只評估一次。 |
對於動態回覆路由,訊息傳送者應包含一個 reply_to 訊息屬性或使用備用的執行時 SpEL 表示式(在下一個示例後描述)。
從 1.6 版本開始,@SendTo 可以是一個 SpEL 表示式,它在執行時針對請求和回覆進行評估,如下例所示
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
return processTheFooAndReturnABar(foo);
}
SpEL 表示式的執行時特性透過 !{…}
分隔符表示。表示式的評估上下文 #root
物件有三個屬性
-
request
: o.s.amqp.core.Message 請求物件。 -
source
: 轉換後的 o.s.messaging.Message>。 -
result
: 方法結果。
上下文包含一個 map 屬性訪問器、一個標準型別轉換器和一個 bean 解析器,允許引用上下文中的其他 bean(例如,@someBeanName.determineReplyQ(request, result)
)。
總之,#{…}
在初始化期間評估一次,#root
物件是應用上下文。Bean 按其名稱引用。!{…}
在執行時為每條訊息評估,根物件具有前面列出的屬性。Bean 按其名稱引用,字首為 @
。
從 2.1 版本開始,也支援簡單的屬性佔位符(例如,${some.reply.to}
)。在早期版本中,可以使用以下方法作為變通方案,如下例所示
@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
...
return ...
}