非同步消費者

Spring AMQP 還透過使用 @RabbitListener 註解支援帶註解的監聽器端點,並提供開放的基礎設施以程式設計方式註冊端點。這是目前為止設定非同步消費者最便捷的方式。有關更多詳細資訊,請參閱 註解驅動的監聽器端點

預取(prefetch)預設值以前是 1,這可能導致高效消費者的利用率不足。從 2.0 版本開始,預設預取值現在是 250,這應該在大多數常見場景中讓消費者保持忙碌狀態,從而提高吞吐量。

然而,在某些情況下,預取值應該較低:

  • 對於大型訊息,特別是當處理速度較慢時(訊息可能在客戶端程序中累積大量記憶體)

  • 當需要嚴格的訊息排序時(在這種情況下,預取值應重新設定為 1)

  • 其他特殊情況

此外,在低流量訊息和多個消費者(包括單個監聽器容器例項內的併發)的情況下,您可能希望降低預取值,以使訊息在消費者之間分佈更均勻。

有關預取值的更多背景資訊,請參閱這篇關於 RabbitMQ 中消費者利用率 的文章,以及這篇關於 排隊理論 的文章。

訊息監聽器

對於非同步 Message 接收,涉及到一個專用元件(不是 AmqpTemplate)。該元件是 Message 消費回撥的容器。我們將在本節稍後討論容器及其屬性。不過,首先我們應該看看回調,因為那是您的應用程式程式碼與訊息系統整合的地方。回撥有幾種選項,從 MessageListener 介面的實現開始,如下面的列表所示:

public interface MessageListener {
    void onMessage(Message message);
}

如果您的回撥邏輯出於任何原因依賴於 AMQP Channel 例項,您可以改用 ChannelAwareMessageListener。它看起來相似,但多了一個引數。下面的列表顯示了 ChannelAwareMessageListener 介面定義:

public interface ChannelAwareMessageListener {
    void onMessage(Message message, Channel channel) throws Exception;
}
在 2.1 版本中,此介面從包 o.s.amqp.rabbit.core 移動到 o.s.amqp.rabbit.listener.api

MessageListenerAdapter

如果您希望在應用程式邏輯和訊息 API 之間保持更嚴格的分離,您可以依賴框架提供的介面卡實現。這通常被稱為“訊息驅動 POJO”支援。

1.5 版本引入了一種更靈活的 POJO 訊息傳遞機制,即 @RabbitListener 註解。有關更多資訊,請參閱 註解驅動的監聽器端點

使用介面卡時,您只需提供介面卡本身應呼叫的例項的引用。以下示例顯示瞭如何操作:

MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");

您可以對介面卡進行子類化,並提供 getListenerMethodName() 的實現,以根據訊息動態選擇不同的方法。此方法有兩個引數,originalMessageextractedMessage,後者是任何轉換的結果。預設情況下,配置了 SimpleMessageConverter。有關更多資訊以及其他可用轉換器的資訊,請參閱 SimpleMessageConverter

從 1.4.2 版本開始,原始訊息具有 consumerQueueconsumerTag 屬性,可用於確定訊息是從哪個佇列接收的。

從 1.5 版本開始,您可以配置一個從消費者佇列或標籤到方法名稱的對映,以動態選擇要呼叫的方法。如果對映中沒有條目,我們將回退到預設監聽器方法。預設監聽器方法(如果未設定)是 handleMessage

從 2.0 版本開始,提供了一個方便的 FunctionalInterface。以下列表顯示了 FunctionalInterface 的定義:

@FunctionalInterface
public interface ReplyingMessageListener<T, R> {

    R handleMessage(T t);

}

此介面透過使用 Java 8 lambda 方便了介面卡的配置,如下面的示例所示:

new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
    ...
    return result;
}));

從 2.2 版本開始,buildListenerArguments(Object) 已被棄用,並引入了新的 buildListenerArguments(Object, Channel, Message)。新方法幫助監聽器獲取 ChannelMessage 引數以執行更多操作,例如在手動確認模式下呼叫 channel.basicReject(long, boolean)。以下列表顯示了最基本的示例:

public class ExtendedListenerAdapter extends MessageListenerAdapter {

    @Override
    protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
        return new Object[]{extractedMessage, channel, message};
    }

}

現在,如果您需要接收“通道”和“訊息”,您可以像配置 MessageListenerAdapter 一樣配置 ExtendedListenerAdapter。監聽器的引數應設定為 buildListenerArguments(Object, Channel, Message) 的返回值,如下面的監聽器示例所示:

public void handleMessage(Object object, Channel channel, Message message) throws IOException {
    ...
}

容器

既然您已經瞭解了 Message 監聽回撥的各種選項,我們可以將注意力轉向容器。基本上,容器處理“主動”職責,以便監聽器回撥可以保持被動。容器是“生命週期”元件的一個示例。它提供了啟動和停止的方法。配置容器時,您實質上彌合了 AMQP 佇列和 MessageListener 例項之間的鴻溝。您必須提供 ConnectionFactory 的引用以及該監聽器應從中消費訊息的佇列名稱或佇列例項。

在 2.0 版本之前,只有一個監聽器容器,即 SimpleMessageListenerContainer。現在有了第二個容器,即 DirectMessageListenerContainer。容器之間的差異以及您在選擇使用哪個容器時可能應用的條件在 選擇容器 中進行了描述。

以下列表顯示了最基本的示例,它透過使用 SimpleMessageListenerContainer 工作:

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));

作為“主動”元件,最常見的是使用 bean 定義建立監聽器容器,以便它可以在後臺執行。以下示例顯示瞭如何使用 XML 執行此操作:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

以下列表顯示了使用 XML 執行此操作的另一種方式:

<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

前面的兩個示例都建立了一個 DirectMessageListenerContainer(注意 type 屬性 — 它預設為 simple)。

或者,您可能更喜歡使用 Java 配置,它看起來與前面的程式碼片段相似:

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

    @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public MessageListener exampleListener() {
        return new MessageListener() {
            public void onMessage(Message message) {
                System.out.println("received: " + message);
            }
        };
    }
}

消費者優先順序

從 RabbitMQ 3.2 版本開始,broker 現在支援消費者優先順序(請參閱 在 RabbitMQ 中使用消費者優先順序)。這可以透過在消費者上設定 x-priority 引數來啟用。SimpleMessageListenerContainer 現在支援設定消費者引數,如下面的示例所示:

container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));

為了方便,名稱空間在 listener 元素上提供了 priority 屬性,如下面的示例所示:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>

從 1.3 版本開始,您可以在執行時修改容器監聽的佇列。請參閱 監聽器容器佇列

auto-delete 佇列

當容器配置為監聽 auto-delete 佇列時,如果佇列具有 x-expires 選項,或者在 Broker 上配置了 Time-To-Live 策略,則當容器停止時(即當最後一個消費者被取消時),broker 會刪除該佇列。在 1.3 版本之前,容器無法重新啟動,因為佇列丟失了。RabbitAdmin 僅在連線關閉或開啟時自動重新宣告佇列等,這在容器停止和啟動時不會發生。

從 1.3 版本開始,容器在啟動期間使用 RabbitAdmin 重新宣告任何缺失的佇列。

您還可以使用條件宣告(請參閱 條件宣告)以及 auto-startup="false" 的 admin,將佇列宣告推遲到容器啟動時。以下示例顯示瞭如何操作:

<rabbit:queue id="otherAnon" declared-by="containerAdmin" />

<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="otherAnon" key="otherAnon" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container id="container2" auto-startup="false">
    <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>

<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
    auto-startup="false" />

在這種情況下,佇列和交換由 containerAdmin 宣告,該 containerAdmin 具有 auto-startup="false",因此元素不會在上下文初始化期間宣告。同樣,容器也因此沒有啟動。當容器稍後啟動時,它會使用其對 containerAdmin 的引用來宣告元素。

© . This site is unofficial and not affiliated with VMware.