註解驅動的監聽器端點

非同步接收訊息最簡單的方法是使用帶註解的監聽器端點基礎設施。簡而言之,它允許您將受管 bean 的方法公開為 Rabbit 監聽器端點。以下示例展示瞭如何使用 @RabbitListener 註解

@Component
public class MyService {

    @RabbitListener(queues = "myQueue")
    public void processOrder(String data) {
        ...
    }

}

前述示例的目的是,無論何時 myQueue 佇列上有可用訊息,都會相應地呼叫 processOrder 方法(在本例中,帶訊息的負載)。

帶註解的端點基礎設施透過使用 RabbitListenerContainerFactory,在每個帶註解的方法的幕後建立一個訊息監聽器容器。

在前述示例中,myQueue 必須已經存在並繫結到某個交換器。只要應用程式上下文中存在 RabbitAdmin,就可以自動宣告和繫結佇列。

註解屬性(queues 等)可以指定屬性佔位符(${some.property})或 SpEL 表示式(#{someExpression})。有關為什麼可能使用 SpEL 而不是屬性佔位符的示例,請參閱監聽多個佇列。以下列表顯示了宣告 Rabbit 監聽器的三個示例
@Component
public class MyService {

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "myQueue", durable = "true"),
        exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
        key = "orderRoutingKey")
  )
  public void processOrder(Order order) {
    ...
  }

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "auto.exch"),
        key = "invoiceRoutingKey")
  )
  public void processInvoice(Invoice invoice) {
    ...
  }

  @RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true"))
  public String handleWithSimpleDeclare(String data) {
      ...
  }

}

在第一個示例中,如果需要,會自動宣告一個佇列 myQueue(持久的)以及交換器,並使用路由鍵繫結到交換器。在第二個示例中,宣告並綁定了一個匿名(獨佔、自動刪除)佇列;佇列名稱由框架使用 Base64UrlNamingStrategy 建立。您不能使用此技術宣告代理命名的佇列;它們需要宣告為 bean 定義;請參閱容器和代理命名的佇列。可以提供多個 QueueBinding 條目,使監聽器監聽多個佇列。在第三個示例中,如果必要,宣告一個從屬性 my.queue 中檢索名稱的佇列,並使用佇列名稱作為路由鍵繫結到預設交換器。

自 2.0 版本以來,@Exchange 註解支援任何交換器型別,包括自定義型別。有關更多資訊,請參閱AMQP 概念

當您需要更高階的配置時,可以使用普通的 @Bean 定義。

請注意第一個示例中交換器上的 ignoreDeclarationExceptions。例如,這允許繫結到可能具有不同設定(例如 internal)的現有交換器。預設情況下,現有交換器的屬性必須匹配。

從 2.0 版本開始,您現在可以將佇列繫結到具有多個路由鍵的交換器,如下例所示

...
    key = { "red", "yellow" }
...

您還可以在 @QueueBinding 註解中為佇列、交換器和繫結指定引數,如下例所示

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "auto.headers", autoDelete = "true",
                        arguments = @Argument(name = "x-message-ttl", value = "10000",
                                                type = "java.lang.Integer")),
        exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
        arguments = {
                @Argument(name = "x-match", value = "all"),
                @Argument(name = "thing1", value = "somevalue"),
                @Argument(name = "thing2")
        })
)
public String handleWithHeadersExchange(String foo) {
    ...
}

請注意,佇列的 x-message-ttl 引數設定為 10 秒。由於引數型別不是 String,我們必須指定其型別——在本例中為 Integer。與所有此類宣告一樣,如果佇列已存在,則引數必須與佇列上的引數匹配。對於 header 交換器,我們將繫結引數設定為匹配具有 thing1 header 設定為 somevalue 的訊息,並且 thing2 header 必須存在並具有任何值。x-match 引數表示兩個條件都必須滿足。

引數名稱、值和型別可以是屬性佔位符(${…​})或 SpEL 表示式(#{…​})。name 必須解析為 Stringtype 的表示式必須解析為 Class 或類的完全限定名。value 必須解析為可以透過 DefaultConversionService 轉換為該型別的東西(例如前述示例中的 x-message-ttl)。

如果名稱解析為 null 或空 String,則忽略該 @Argument

© . This site is unofficial and not affiliated with VMware.