檢測空閒的非同步消費者

雖然非同步消費者效率很高,但檢測它們何時空閒是一個問題——使用者可能希望在一段時間沒有訊息到達時採取一些行動。

從 1.6 版本開始,現在可以配置監聽器容器,使其在一段時間沒有訊息投遞時釋出 `ListenerContainerIdleEvent` 事件。當容器空閒時,每隔 `idleEventInterval` 毫秒就會發布一次事件。

要配置此功能,請在容器上設定 `idleEventInterval`。以下示例展示瞭如何在 XML 和 Java 中(對於 `SimpleMessageListenerContainer` 和 `SimpleRabbitListenerContainerFactory`)進行配置。

<rabbit:listener-container connection-factory="connectionFactory"
        ...
        idle-event-interval="60000"
        ...
        >
    <rabbit:listener id="container1" queue-names="foo" ref="myListener" method="handle" />
</rabbit:listener-container>
@Bean
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    ...
    container.setIdleEventInterval(60000L);
    ...
    return container;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setIdleEventInterval(60000L);
    ...
    return factory;
}

在每種情況下,當容器空閒時,每分鐘釋出一次事件。

事件消費

您可以透過實現 `ApplicationListener` 來捕獲空閒事件——可以是通用監聽器,也可以是僅接收此特定事件的監聽器。您還可以使用 Spring Framework 4.2 中引入的 `@EventListener`。

以下示例將 `@RabbitListener` 和 `@EventListener` 合併到一個類中。您需要理解應用程式監聽器會收到所有容器的事件,因此如果您想根據哪個容器空閒來採取特定行動,可能需要檢查監聽器 ID。您也可以為此目的使用 `@EventListener` 的 `condition` 屬性。

事件有四個屬性

  • source:監聽器容器例項

  • id:監聽器 ID(或容器 Bean 名稱)

  • idleTime:事件釋出時容器已空閒的時間

  • queueNames:容器監聽的佇列名稱

以下示例展示瞭如何使用 `@RabbitListener` 和 `@EventListener` 註解建立監聽器

public class Listener {

    @RabbitListener(id="someId", queues="#{queue.name}")
    public String listen(String foo) {
        return foo.toUpperCase();
    }

    @EventListener(condition = "event.listenerId == 'someId'")
    public void onApplicationEvent(ListenerContainerIdleEvent event) {
        ...
    }

}
事件監聽器會看到所有容器的事件。因此,在前面的示例中,我們根據監聽器 ID 來過濾接收到的事件。
如果您希望使用空閒事件來停止監聽器容器,則不應在呼叫監聽器的執行緒上呼叫 `container.stop()`。這樣做總是會導致延遲和不必要的日誌訊息。相反,您應該將事件傳遞給另一個執行緒,然後由該執行緒來停止容器。