回覆管理

MessageListenerAdapter 中已有的支援允許你的方法具有非 void 返回型別。在這種情況下,方法呼叫的結果會被封裝在一個訊息中,傳送到原始訊息的 ReplyToAddress 頭部指定的地址,或者傳送到監聽器上配置的預設地址。你可以透過使用訊息抽象的 @SendTo 註解來設定該預設地址。

假設我們的 processOrder 方法現在應該返回 OrderStatus,我們可以如下編寫程式碼以自動傳送回覆

@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // order processing
    return status;
}

如果你需要以傳輸獨立的方式設定額外的頭部,你可以返回一個 Message 代替,示例如下

@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // order processing
    return MessageBuilder
        .withPayload(status)
        .setHeader("code", 1234)
        .build();
}

另外,你可以在 beforeSendReplyMessagePostProcessors 容器工廠屬性中使用 MessagePostProcessor 新增更多頭部。從 2.2.3 版本開始,被呼叫的 bean/方法在回覆訊息中可用,這可以在訊息後處理器中使用,以便將資訊反饋給呼叫者

factory.setBeforeSendReplyPostProcessors(msg -> {
    msg.getMessageProperties().setHeader("calledBean",
            msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
    msg.getMessageProperties().setHeader("calledMethod",
            msg.getMessageProperties().getTargetMethod().getName());
    return m;
});

從 2.2.5 版本開始,你可以配置 ReplyPostProcessor 來修改回覆訊息傳送前的內容;它在 correlationId 頭部設定為匹配請求之後被呼叫。

@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
    return in.toUpperCase();
}

@Bean
public ReplyPostProcessor echoCustomHeader() {
    return (req, resp) -> {
        resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
        return resp;
    };
}

從 3.0 版本開始,你可以在容器工廠上配置後處理器,而不是在註解上。

factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
    resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
    return resp;
});

id 引數是監聽器的 id。

註解上的設定將覆蓋工廠設定。

@SendTo 的值被假定為一個回覆交換機和路由鍵對,遵循 exchange/routingKey 模式,其中一個部分可以省略。有效值如下

  • thing1/thing2: 回覆交換機和路由鍵。 thing1/: 回覆交換機和預設(空)路由鍵。 thing2/thing2: 回覆路由鍵和預設(空)交換機。 / 或空: 回覆預設交換機和預設路由鍵。

另外,你可以使用不帶 value 屬性的 @SendTo。這種情況等同於一個空的 sendTo 模式。只有在入站訊息沒有 replyToAddress 屬性時,才使用 @SendTo。

從 1.5 版本開始,@SendTo 的值可以是一個 bean 初始化 SpEL 表示式,如下例所示

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
    return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
    return "test.sendTo.reply.spel";
}

表示式必須解析為一個 String,它可以是一個簡單的佇列名(傳送到預設交換機),或者如前面示例之前討論的 exchange/routingKey 形式。

#{…​} 表示式在初始化期間只評估一次。

對於動態回覆路由,訊息傳送者應包含一個 reply_to 訊息屬性或使用備用的執行時 SpEL 表示式(在下一個示例後描述)。

從 1.6 版本開始,@SendTo 可以是一個 SpEL 表示式,它在執行時針對請求和回覆進行評估,如下例所示

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
    return processTheFooAndReturnABar(foo);
}

SpEL 表示式的執行時特性透過 !{…​} 分隔符表示。表示式的評估上下文 #root 物件有三個屬性

  • request: o.s.amqp.core.Message 請求物件。

  • source: 轉換後的 o.s.messaging.Message

  • result: 方法結果。

上下文包含一個 map 屬性訪問器、一個標準型別轉換器和一個 bean 解析器,允許引用上下文中的其他 bean(例如,@someBeanName.determineReplyQ(request, result))。

總之,#{…​} 在初始化期間評估一次,#root 物件是應用上下文。Bean 按其名稱引用。!{…​} 在執行時為每條訊息評估,根物件具有前面列出的屬性。Bean 按其名稱引用,字首為 @

從 2.1 版本開始,也支援簡單的屬性佔位符(例如,${some.reply.to})。在早期版本中,可以使用以下方法作為變通方案,如下例所示

@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
    ...
    return ...
}