註解驅動的監聽器端點
非同步接收訊息最簡單的方法是使用帶註解的監聽器端點基礎設施。簡而言之,它允許您將受管 bean 的方法公開為 Rabbit 監聽器端點。以下示例展示瞭如何使用 @RabbitListener 註解
@Component
public class MyService {
@RabbitListener(queues = "myQueue")
public void processOrder(String data) {
...
}
}
前述示例的目的是,無論何時 myQueue 佇列上有可用訊息,都會相應地呼叫 processOrder 方法(在本例中,帶訊息的負載)。
帶註解的端點基礎設施透過使用 RabbitListenerContainerFactory,在每個帶註解的方法的幕後建立一個訊息監聽器容器。
在前述示例中,myQueue 必須已經存在並繫結到某個交換器。只要應用程式上下文中存在 RabbitAdmin,就可以自動宣告和繫結佇列。
註解屬性(queues 等)可以指定屬性佔位符(${some.property})或 SpEL 表示式(#{someExpression})。有關為什麼可能使用 SpEL 而不是屬性佔位符的示例,請參閱監聽多個佇列。以下列表顯示了宣告 Rabbit 監聽器的三個示例 |
@Component
public class MyService {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
key = "orderRoutingKey")
)
public void processOrder(Order order) {
...
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)
public void processInvoice(Invoice invoice) {
...
}
@RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true"))
public String handleWithSimpleDeclare(String data) {
...
}
}
在第一個示例中,如果需要,會自動宣告一個佇列 myQueue(持久的)以及交換器,並使用路由鍵繫結到交換器。在第二個示例中,宣告並綁定了一個匿名(獨佔、自動刪除)佇列;佇列名稱由框架使用 Base64UrlNamingStrategy 建立。您不能使用此技術宣告代理命名的佇列;它們需要宣告為 bean 定義;請參閱容器和代理命名的佇列。可以提供多個 QueueBinding 條目,使監聽器監聽多個佇列。在第三個示例中,如果必要,宣告一個從屬性 my.queue 中檢索名稱的佇列,並使用佇列名稱作為路由鍵繫結到預設交換器。
自 2.0 版本以來,@Exchange 註解支援任何交換器型別,包括自定義型別。有關更多資訊,請參閱AMQP 概念。
當您需要更高階的配置時,可以使用普通的 @Bean 定義。
請注意第一個示例中交換器上的 ignoreDeclarationExceptions。例如,這允許繫結到可能具有不同設定(例如 internal)的現有交換器。預設情況下,現有交換器的屬性必須匹配。
從 2.0 版本開始,您現在可以將佇列繫結到具有多個路由鍵的交換器,如下例所示
...
key = { "red", "yellow" }
...
您還可以在 @QueueBinding 註解中為佇列、交換器和繫結指定引數,如下例所示
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "auto.headers", autoDelete = "true",
arguments = @Argument(name = "x-message-ttl", value = "10000",
type = "java.lang.Integer")),
exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "thing1", value = "somevalue"),
@Argument(name = "thing2")
})
)
public String handleWithHeadersExchange(String foo) {
...
}
請注意,佇列的 x-message-ttl 引數設定為 10 秒。由於引數型別不是 String,我們必須指定其型別——在本例中為 Integer。與所有此類宣告一樣,如果佇列已存在,則引數必須與佇列上的引數匹配。對於 header 交換器,我們將繫結引數設定為匹配具有 thing1 header 設定為 somevalue 的訊息,並且 thing2 header 必須存在並具有任何值。x-match 引數表示兩個條件都必須滿足。
引數名稱、值和型別可以是屬性佔位符(${…})或 SpEL 表示式(#{…})。name 必須解析為 String。type 的表示式必須解析為 Class 或類的完全限定名。value 必須解析為可以透過 DefaultConversionService 轉換為該型別的東西(例如前述示例中的 x-message-ttl)。
如果名稱解析為 null 或空 String,則忽略該 @Argument。