Poller

本節介紹 Spring Integration 中的輪詢工作方式。

輪詢消費者

當訊息端點(通道介面卡)連線到通道並例項化時,它們會生成以下例項之一

實際實現取決於這些端點連線到的通道型別。連線到實現了 org.springframework.messaging.SubscribableChannel 介面的通道的通道介面卡會生成 EventDrivenConsumer 的例項。另一方面,連線到實現了 org.springframework.messaging.PollableChannel 介面(例如 QueueChannel)的通道的通道介面卡會生成 PollingConsumer 的例項。

輪詢消費者允許 Spring Integration 元件主動輪詢訊息,而不是以事件驅動的方式處理訊息。

它們在許多訊息傳遞場景中是一個關鍵的橫切關注點。在 Spring Integration 中,輪詢消費者基於同名的模式,該模式在 Gregor Hohpe 和 Bobby Woolf 所著的《企業整合模式》一書中有所描述。您可以在 該書的網站上找到該模式的描述。

有關輪詢消費者配置的更多資訊,請參閱 訊息端點

可輪詢訊息源

Spring Integration 提供了輪詢消費者模式的第二種變體。當使用入站通道介面卡時,這些介面卡通常被 SourcePollingChannelAdapter 包裝。例如,當從遠端 FTP 伺服器位置檢索訊息時,FTP 入站通道介面卡 中描述的介面卡會配置一個 poller 來定期檢索訊息。因此,當元件配置了 pollers 時,生成的例項屬於以下型別之一

這意味著 pollers 在入站和出站訊息傳遞場景中都會使用。以下是使用 pollers 的一些用例

  • 輪詢某些外部系統,例如 FTP 伺服器、資料庫和 Web Services

  • 輪詢內部(可輪詢)訊息通道

  • 輪詢內部服務(例如重複執行 Java 類中的方法)

AOP advice 類可以應用於 pollers 的 advice-chain 中,例如事務 advice 用於啟動事務。從版本 4.1 開始,提供了 PollSkipAdvice。Pollers 使用 triggers 來確定下一次輪詢的時間。可以使用 PollSkipAdvice 來抑制(跳過)一次輪詢,也許是因為存在某些下游條件會阻止訊息被處理。要使用此 advice,您必須提供一個 PollSkipStrategy 的實現。從版本 4.2.5 開始,提供了 SimplePollSkipStrategy。要使用它,您可以將其例項作為 bean 新增到應用上下文,注入到 PollSkipAdvice 中,並將其新增到 poller 的 advice 鏈中。要跳過輪詢,請呼叫 skipPolls()。要恢復輪詢,請呼叫 reset()。版本 4.2 在這方面增加了更多靈活性。請參閱 條件輪詢器

本章旨在提供輪詢消費者的高階概述,以及它們如何融入訊息通道(參見 訊息通道)和通道介面卡(參見 通道介面卡)的概念。有關訊息端點(特別是輪詢消費者)的更多資訊,請參閱 訊息端點

延遲確認可輪詢訊息源

從版本 5.0.1 開始,某些模組提供了支援延遲確認的 MessageSource 實現,直到下游流完成(或將訊息交給另一個執行緒)。目前這僅限於 AmqpMessageSourceKafkaMessageSource

對於這些訊息源,會將 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 頭部(參見 MessageHeaderAccessor API)新增到訊息中。當與可輪詢訊息源一起使用時,此頭部的值是 AcknowledgmentCallback 的例項,如下例所示

@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    boolean isAcknowledged();

    void noAutoAck();

    default boolean isAutoAck();

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

並非所有訊息源(例如 KafkaMessageSource)都支援 REJECT 狀態。它被視為與 ACCEPT 相同。

應用可以在任何時候確認訊息,如下例所示

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

如果 MessageSource 連線到 SourcePollingChannelAdapter,當 poller 執行緒在下游流完成後返回到介面卡時,介面卡會檢查確認是否已完成,如果未完成,則將其狀態設定為 ACCEPT(如果流丟擲異常,則設定為 REJECT)。狀態值在 AcknowledgmentCallback.Status 列舉中定義。

Spring Integration 提供了 MessageSourcePollingTemplate 來執行對 MessageSource 的即時輪詢。當 MessageHandler 回撥返回(或丟擲異常)時,這也負責在 AcknowledgmentCallback 上設定 ACCEPTREJECT。以下示例展示瞭如何使用 MessageSourcePollingTemplate 進行輪詢

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

在這兩種情況(SourcePollingChannelAdapterMessageSourcePollingTemplate)下,您都可以透過呼叫回撥上的 noAutoAck() 方法來停用自動 ack/nack。如果您將訊息交給另一個執行緒並希望稍後進行確認,則可能會這樣做。並非所有實現都支援此功能(例如,Apache Kafka 不支援,因為偏移量提交必須在同一執行緒上執行)。

訊息源的條件輪詢器

本節介紹如何使用條件輪詢器。

背景

Poller 的 advice-chain 中的 Advice 物件會建議整個輪詢任務(包括訊息檢索和處理)。這些“around advice”方法無法訪問輪詢的任何上下文——只能訪問輪詢本身。這對於諸如使任務具有事務性或由於某些外部條件跳過輪詢(如前所述)等要求而言是可以接受的。如果我們希望根據輪詢的 receive 部分的結果採取某些操作,或者如果我們要根據條件調整 poller,該怎麼辦?對於這些情況,Spring Integration 提供了“智慧”輪詢。

“智慧”輪詢

版本 5.3 引入了 ReceiveMessageAdvice 介面。advice-chain 中任何實現了此介面的 Advice 物件僅應用於 receive() 操作 - MessageSource.receive()PollableChannel.receive(timeout)。因此,它們只能應用於 SourcePollingChannelAdapterPollingConsumer。此類實現了以下方法

  • beforeReceive(Object source) 此方法在 Object.receive() 方法之前呼叫。它允許您檢查和重新配置源。返回 false 將取消此輪詢(類似於前面提到的 PollSkipAdvice)。

  • Message<?> afterReceive(Message<?> result, Object source) 此方法在 receive() 方法之後呼叫。同樣,您可以重新配置源或採取任何操作(可能取決於結果,如果源未建立訊息,結果可能為 null)。您甚至可以返回不同的訊息

執行緒安全

如果 Advice 修改了源,則不應為 poller 配置 TaskExecutor。如果 Advice 修改了源,這種修改不是執行緒安全的,可能會導致意外結果,尤其是在高頻率的 pollers 中。如果您需要併發處理輪詢結果,請考慮使用下游的 ExecutorChannel,而不是向 poller 新增 executor。

Advice 鏈排序

您應該瞭解 advice 鏈在初始化期間的處理方式。未實現 ReceiveMessageAdviceAdvice 物件應用於整個輪詢過程,並在任何 ReceiveMessageAdvice 之前按順序全部呼叫。然後,ReceiveMessageAdvice 物件圍繞源的 receive() 方法按順序呼叫。例如,如果您有 Advice 物件 a, b, c, d,其中 bdReceiveMessageAdvice,則這些物件按以下順序應用:a, c, b, d。此外,如果源已經是一個 Proxy,則 ReceiveMessageAdvice 會在任何現有 Advice 物件之後呼叫。如果您希望更改順序,則必須自己配置代理。

SimpleActiveIdleReceiveMessageAdvice

此 advice 是 ReceiveMessageAdvice 的簡單實現。當與 DynamicPeriodicTrigger 結合使用時,它會根據前一次輪詢是否收到訊息來調整輪詢頻率。poller 還必須引用同一個 DynamicPeriodicTrigger

重要:非同步移交
SimpleActiveIdleReceiveMessageAdvice 會根據 receive() 結果修改 trigger。這僅在 advice 在 poller 執行緒上呼叫時有效。如果 poller 有 task-executor 則無效。要在希望在輪詢結果後使用非同步操作時使用此 advice,請稍後進行非同步移交,例如透過使用 ExecutorChannel

CompoundTriggerAdvice

此 advice 允許根據輪詢是否返回訊息來選擇兩個 triggers 中的一個。考慮一個使用 CronTrigger 的 poller。CronTrigger 例項是不可變的,因此一旦構造就無法更改。考慮一個用例:我們想使用 cron 表示式每小時觸發一次輪詢,但如果未收到訊息,則每分鐘輪詢一次,並在檢索到訊息時恢復使用 cron 表示式。

Advice(和 poller)為此目的使用 CompoundTrigger。觸發器的 primary trigger 可以是 CronTrigger。當 advice 檢測到未收到訊息時,它會將 secondary trigger 新增到 CompoundTrigger。當呼叫 CompoundTrigger 例項的 nextExecutionTime 方法時,如果存在 secondary trigger,則委託給它。否則,委託給 primary trigger。

Poller 還必須引用同一個 CompoundTrigger

以下示例展示了每小時 cron 表示式配置,並帶有回退到每分鐘輪詢的設定

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
重要:非同步移交
CompoundTriggerAdvice 會根據 receive() 結果修改 trigger。這僅在 advice 在 poller 執行緒上呼叫時有效。如果 poller 有 task-executor 則無效。要在希望在輪詢結果後使用非同步操作時使用此 advice,請稍後進行非同步移交,例如透過使用 ExecutorChannel

僅限 MessageSource 的 Advice

有些 advices 可能僅適用於 MessageSource.receive(),而對於 PollableChannel 則沒有意義。為此,仍然存在 MessageSourceMutator 介面(ReceiveMessageAdvice 的擴充套件)。更多資訊請參閱 入站通道介面卡:輪詢多個伺服器和目錄