非同步消費者
Spring AMQP 還支援透過使用 @RabbitListener 註解實現註解驅動的監聽器端點,並提供了開放的基礎設施以程式設計方式註冊端點。這是目前為止設定非同步消費者最便捷的方式。有關更多詳細資訊,請參閱 註解驅動的監聽器端點。 |
prefetch 的預設值以前是 1,這可能導致高效消費者的利用率不足。從 2.0 版本開始,預設的 prefetch 值現在是 250,這應該能在大多數常見場景下保持消費者忙碌,從而提高吞吐量。 然而,在某些場景下,prefetch 值應該設定得較低
此外,在訊息量低且消費者數量多(包括單個監聽器容器例項內的併發)的情況下,您可能希望降低 prefetch 值,以便在消費者之間實現更均勻的訊息分發。 請參閱 訊息監聽器容器配置。 關於 prefetch 的更多背景資訊,請參閱這篇關於 RabbitMQ 中的消費者利用率 的文章,以及這篇關於 排隊論 的文章。 |
訊息監聽器
對於非同步 Message
接收,涉及一個專門的元件(而不是 AmqpTemplate
)。該元件是一個用於 Message
消費回撥的容器。我們將在本節後面討論容器及其屬性。但是,首先我們應該看一下回調,因為這是應用程式程式碼與訊息系統整合的地方。回撥有幾種選擇,首先是 MessageListener
介面的實現,如下所示
public interface MessageListener {
void onMessage(Message message);
}
如果您的回撥邏輯因任何原因依賴於 AMQP Channel 例項,則可以改用 ChannelAwareMessageListener
。它看起來類似,但有一個額外的引數。以下列表顯示了 ChannelAwareMessageListener
介面的定義
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
在 2.1 版本中,此介面從包 o.s.amqp.rabbit.core 移至 o.s.amqp.rabbit.listener.api 。 |
MessageListenerAdapter
如果您希望在應用程式邏輯和訊息 API 之間保持更嚴格的分離,可以依賴框架提供的介面卡實現。這通常被稱為“訊息驅動 POJO”支援。
1.5 版本引入了一種更靈活的 POJO 訊息機制,即 @RabbitListener 註解。有關更多資訊,請參閱 註解驅動的監聽器端點。 |
使用介面卡時,您只需提供介面卡本身應呼叫的例項引用。以下示例展示瞭如何實現
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
您可以繼承介面卡並提供 getListenerMethodName()
的實現,以根據訊息動態選擇不同的方法。此方法有兩個引數:originalMessage
和 extractedMessage
,後者是任何轉換的結果。預設情況下,配置了一個 SimpleMessageConverter
。有關更多資訊以及其他可用轉換器的資訊,請參閱 SimpleMessageConverter
。
從 1.4.2 版本開始,原始訊息具有 consumerQueue
和 consumerTag
屬性,可用於確定接收訊息的佇列。
從 1.5 版本開始,您可以配置一個將消費者佇列或標籤對映到方法名的 Map,以動態選擇要呼叫的方法。如果 Map 中沒有對應的條目,則回退到預設監聽器方法。預設監聽器方法(如果未設定)為 handleMessage
。
從 2.0 版本開始,提供了一個方便的 FunctionalInterface
。以下列表顯示了 FunctionalInterface
的定義
@FunctionalInterface
public interface ReplyingMessageListener<T, R> {
R handleMessage(T t);
}
該介面透過使用 Java 8 lambda 表示式簡化了介面卡的配置,如下例所示
new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
...
return result;
}));
從 2.2 版本開始,buildListenerArguments(Object)
方法已被棄用,取而代之的是新的 buildListenerArguments(Object, Channel, Message)
方法。新方法幫助監聽器獲取 Channel
和 Message
引數以便執行更多操作,例如在手動確認模式下呼叫 channel.basicReject(long, boolean)
。以下列表顯示了最基本的示例
public class ExtendedListenerAdapter extends MessageListenerAdapter {
@Override
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
return new Object[]{extractedMessage, channel, message};
}
}
現在,如果您需要接收“channel”和“message”引數,可以將 ExtendedListenerAdapter
配置得與 MessageListenerAdapter
相同。監聽器的引數應設定為 buildListenerArguments(Object, Channel, Message)
的返回值,如下面的監聽器示例所示
public void handleMessage(Object object, Channel channel, Message message) throws IOException {
...
}
容器
現在您已經瞭解了 Message
監聽回撥的各種選項,我們可以將注意力轉向容器。基本上,容器處理“主動”的職責,以便監聽器回撥可以保持被動。容器是一個“生命週期”元件的示例。它提供了啟動和停止的方法。配置容器時,您實際上是在連線 AMQP 佇列和 MessageListener
例項。您必須提供對 ConnectionFactory
以及該監聽器應從中消費訊息的佇列名稱或 Queue 例項的引用。
在 2.0 版本之前,只有一個監聽器容器,即 SimpleMessageListenerContainer
。現在有了第二個容器,即 DirectMessageListenerContainer
。容器之間的差異以及選擇使用哪個容器時可能應用的準則在 選擇容器 中有詳細描述。
以下列表顯示了最基本的示例,它透過使用 SimpleMessageListenerContainer
工作
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));
作為一個“主動”元件,最常見的做法是使用 bean 定義建立監聽器容器,以便它可以在後臺執行。以下示例展示了使用 XML 配置的一種方式
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
以下列表展示了使用 XML 配置的另一種方式
<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
上面兩個示例都建立了一個 DirectMessageListenerContainer
(注意 type
屬性 - 它預設為 simple
)。
或者,您可能更喜歡使用 Java 配置,它看起來與上面的程式碼片段類似
@Configuration
public class ExampleAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public MessageListener exampleListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
};
}
}
消費者優先順序
從 RabbitMQ 3.2 版本開始,代理現在支援消費者優先順序(請參閱 在 RabbitMQ 中使用消費者優先順序)。這可以透過在消費者上設定 x-priority
引數來啟用。SimpleMessageListenerContainer
現在支援設定消費者引數,如下例所示
container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));
為了方便起見,名稱空間在 listener
元素上提供了 priority
屬性,如下例所示
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>
從 1.3 版本開始,您可以在執行時修改容器監聽的佇列。請參閱 監聽器容器佇列。
auto-delete
佇列
當容器配置為監聽 auto-delete
佇列、佇列設定了 x-expires
選項,或者在代理上配置了 Time-To-Live 策略時,當容器停止時(即最後一個消費者取消時),佇列會被代理移除。在 1.3 版本之前,由於佇列丟失,容器無法重新啟動。RabbitAdmin
僅在連線關閉或開啟時自動重新宣告佇列等,而容器停止和啟動時不會發生這種情況。
從 1.3 版本開始,容器在啟動期間會使用 RabbitAdmin
重新宣告所有丟失的佇列。
您還可以使用條件宣告(請參閱 條件宣告)以及 auto-startup="false"
的 admin,將佇列宣告推遲到容器啟動時進行。以下示例展示瞭如何實現
<rabbit:queue id="otherAnon" declared-by="containerAdmin" />
<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>
<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
auto-startup="false" />
在這種情況下,佇列和交換器由 containerAdmin
宣告,該 admin 設定了 auto-startup="false"
,因此元素在上下文初始化期間不會被宣告。同樣,容器也不會因為同樣的原因而啟動。當容器稍後啟動時,它會使用對 containerAdmin
的引用來宣告這些元素。