輪詢器

本節描述了 Spring Integration 中的輪詢如何工作。

輪詢消費者

當訊息端點(通道介面卡)連線到通道並例項化時,它們會生成以下例項之一

實際實現取決於這些端點連線的通道型別。連線到實現 org.springframework.messaging.SubscribableChannel 介面的通道的通道介面卡會生成 EventDrivenConsumer 例項。另一方面,連線到實現 org.springframework.messaging.PollableChannel 介面(例如 QueueChannel)的通道的通道介面卡會生成 PollingConsumer 例項。

輪詢消費者讓 Spring Integration 元件主動輪詢訊息,而不是以事件驅動的方式處理訊息。

它們在許多訊息傳遞場景中代表了一個關鍵的橫切關注點。在 Spring Integration 中,輪詢消費者基於 Gregor Hohpe 和 Bobby Woolf 所著的《企業整合模式》一書中描述的同名模式。您可以在本書網站上找到該模式的描述。

有關輪詢消費者配置的更多資訊,請參閱訊息端點

可輪詢訊息源

Spring Integration 提供了輪詢消費者模式的第二種變體。當使用入站通道介面卡時,這些介面卡通常被 SourcePollingChannelAdapter 包裝。例如,當從遠端 FTP 伺服器位置檢索訊息時,FTP 入站通道介面卡中描述的介面卡配置了一個輪詢器來定期檢索訊息。因此,當元件配置了輪詢器時,生成的例項是以下型別之一

這意味著輪詢器在入站和出站訊息傳遞場景中都使用。以下是一些使用輪詢器的用例

  • 輪詢某些外部系統,例如 FTP 伺服器、資料庫和 Web 服務

  • 輪詢內部(可輪詢)訊息通道

  • 輪詢內部服務(例如重複執行 Java 類上的方法)

AOP 通知類可以應用於輪詢器,在 advice-chain 中,例如啟動事務的事務通知。從版本 4.1 開始,提供了 PollSkipAdvice。輪詢器使用觸發器來確定下一次輪詢的時間。PollSkipAdvice 可用於抑制(跳過)輪詢,可能是因為存在一些下游條件會阻止訊息被處理。要使用此通知,您必須為其提供 PollSkipStrategy 的實現。從版本 4.2.5 開始,提供了 SimplePollSkipStrategy。要使用它,您可以將例項作為 bean 新增到應用程式上下文中,將其注入 PollSkipAdvice,並將其新增到輪詢器的通知鏈中。要跳過輪詢,請呼叫 skipPolls()。要恢復輪詢,請呼叫 reset()。版本 4.2 在這方面增加了更多靈活性。請參閱條件輪詢器

本章旨在僅對輪詢消費者以及它們如何適應訊息通道(參閱訊息通道)和通道介面卡(參閱通道介面卡)的概念進行高階概述。有關訊息端點(尤其是輪詢消費者)的更多資訊,請參閱訊息端點

延遲確認可輪詢訊息源

從版本 5.0.1 開始,某些模組提供 MessageSource 實現,支援在下游流完成(或將訊息交給另一個執行緒)之前延遲確認。目前這僅限於 AmqpMessageSourceKafkaMessageSource

對於這些訊息源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 標頭(參閱MessageHeaderAccessor API)被新增到訊息中。當與可輪詢訊息源一起使用時,標頭的值是 AcknowledgmentCallback 例項,如以下示例所示

@FunctionalInterface
public interface AcknowledgmentCallback extends SimpleAcknowledgment {

    void acknowledge(Status status);

    @Override
    default void acknowledge() {
        acknowledge(Status.ACCEPT);
    }

    default boolean isAcknowledged() {
        return false;
    }


    default void noAutoAck() {
        throw new UnsupportedOperationException("You cannot disable auto acknowledgment with this implementation");
    }

    default boolean isAutoAck() {
        return true;
    }

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

並非所有訊息源(例如 KafkaMessageSource)都支援 REJECT 狀態。它被視為與 ACCEPT 相同。

應用程式可以隨時確認訊息,如以下示例所示

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

如果 MessageSource 連線到 SourcePollingChannelAdapter,當輪詢器執行緒在下游流完成後返回到介面卡時,介面卡會檢查確認是否已確認,如果沒有,則將其狀態設定為 ACCEPT(如果流丟擲異常則設定為 REJECT)。狀態值在AcknowledgmentCallback.Status 列舉中定義。

Spring Integration 提供 MessageSourcePollingTemplate 來執行 MessageSource 的臨時輪詢。當 MessageHandler 回撥返回(或丟擲異常)時,這也負責在 AcknowledgmentCallback 上設定 ACCEPTREJECT。以下示例展示瞭如何使用 MessageSourcePollingTemplate 進行輪詢

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

在兩種情況下(SourcePollingChannelAdapterMessageSourcePollingTemplate),您可以透過呼叫回撥上的 noAutoAck() 來停用自動確認/拒絕。如果您將訊息交給另一個執行緒並希望稍後確認,則可能會這樣做。並非所有實現都支援此功能,例如 Apache Kafka 不支援,因為偏移提交必須在同一個執行緒上執行。

訊息源的條件輪詢器

本節介紹如何使用條件輪詢器。

背景

輪詢器上的 advice-chain 中的 Advice 物件會通知整個輪詢任務(訊息檢索和處理)。這些“環繞通知”方法無法訪問輪詢的任何上下文——只有輪詢本身。這對於諸如使任務具有事務性或由於某些外部條件跳過輪詢等要求來說是沒問題的,如前所述。如果我們希望根據輪詢的 receive 部分的結果採取一些行動,或者我們希望根據條件調整輪詢器,該怎麼辦?對於這些情況,Spring Integration 提供了“智慧”輪詢。

“智慧”輪詢

版本 5.3 引入了 ReceiveMessageAdvice 介面。advice-chain 中實現此介面的任何 Advice 物件僅應用於 receive() 操作 - MessageSource.receive()PollableChannel.receive(timeout)。因此,它們只能應用於 SourcePollingChannelAdapterPollingConsumer。此類實現以下方法

  • beforeReceive(Object source) 此方法在 Object.receive() 方法之前呼叫。它允許您檢查和重新配置源。返回 false 會取消此輪詢(類似於前面提到的 PollSkipAdvice)。

  • Message<?> afterReceive(Message<?> result, Object source) 此方法在 receive() 方法之後呼叫。同樣,您可以重新配置源或採取任何行動(可能取決於結果,如果源沒有建立訊息,結果可能為 null)。您甚至可以返回不同的訊息

執行緒安全

如果 Advice 修改源,則不應使用 TaskExecutor 配置輪詢器。如果 Advice 修改源,則此類修改不是執行緒安全的,可能會導致意外結果,尤其是在高頻輪詢器的情況下。如果您需要併發處理輪詢結果,請考慮使用下游 ExecutorChannel 而不是向輪詢器新增執行器。

通知鏈順序

您應該瞭解在初始化期間如何處理通知鏈。不實現 ReceiveMessageAdviceAdvice 物件應用於整個輪詢過程,並且在任何 ReceiveMessageAdvice 之前按順序首先呼叫。然後 ReceiveMessageAdvice 物件在源 receive() 方法周圍按順序呼叫。例如,如果您有 Advice 物件 a, b, c, d,其中 bdReceiveMessageAdvice,則物件按以下順序應用:a, c, b, d。此外,如果源已經是 Proxy,則在任何現有 Advice 物件之後呼叫 ReceiveMessageAdvice。如果您希望更改順序,則必須自己連線代理。

SimpleActiveIdleReceiveMessageAdvice

此通知是 ReceiveMessageAdvice 的簡單實現。當與 DynamicPeriodicTrigger 結合使用時,它會根據上一次輪詢是否產生訊息來調整輪詢頻率。輪詢器還必須引用相同的 DynamicPeriodicTrigger

重要提示:非同步移交
SimpleActiveIdleReceiveMessageAdvice 根據 receive() 結果修改觸發器。這僅當在輪詢器執行緒上呼叫通知時才有效。如果輪詢器有 task-executor,則不起作用。要在您希望在輪詢結果之後使用非同步操作的情況下使用此通知,請稍後執行非同步移交,例如透過使用 ExecutorChannel

CompoundTriggerAdvice

此通知允許根據輪詢是否返回訊息來選擇兩個觸發器中的一個。考慮一個使用 CronTrigger 的輪詢器。CronTrigger 例項是不可變的,因此一旦構造就無法更改。考慮一個用例,我們希望使用 cron 表示式每小時觸發一次輪詢,但如果沒有收到訊息,則每分鐘輪詢一次,當檢索到訊息時,恢復使用 cron 表示式。

為此,通知(和輪詢器)使用 CompoundTrigger。觸發器的 primary 觸發器可以是 CronTrigger。當通知檢測到未收到訊息時,它會將輔助觸發器新增到 CompoundTrigger。當呼叫 CompoundTrigger 例項的 nextExecutionTime 方法時,如果存在,它會委託給輔助觸發器。否則,它會委託給主要觸發器。

輪詢器還必須引用相同的 CompoundTrigger

以下示例顯示了每小時 cron 表示式的配置,並帶有回退到每分鐘

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
重要提示:非同步移交
CompoundTriggerAdvice 根據 receive() 結果修改觸發器。這僅當在輪詢器執行緒上呼叫通知時才有效。如果輪詢器有 task-executor,則不起作用。要在您希望在輪詢結果之後使用非同步操作的情況下使用此通知,請稍後執行非同步移交,例如透過使用 ExecutorChannel

僅限 MessageSource 的通知

某些通知可能僅適用於 MessageSource.receive(),它們對 PollableChannel 沒有意義。為此,仍然存在 MessageSourceMutator 介面(ReceiveMessageAdvice 的擴充套件)。有關更多資訊,請參閱入站通道介面卡:輪詢多個伺服器和目錄

© . This site is unofficial and not affiliated with VMware.