Poller
本節介紹 Spring Integration 中的輪詢工作方式。
輪詢消費者
當訊息端點(通道介面卡)連線到通道並例項化時,它們會生成以下例項之一
實際實現取決於這些端點連線到的通道型別。連線到實現了 org.springframework.messaging.SubscribableChannel
介面的通道的通道介面卡會生成 EventDrivenConsumer
的例項。另一方面,連線到實現了 org.springframework.messaging.PollableChannel
介面(例如 QueueChannel
)的通道的通道介面卡會生成 PollingConsumer
的例項。
輪詢消費者允許 Spring Integration 元件主動輪詢訊息,而不是以事件驅動的方式處理訊息。
它們在許多訊息傳遞場景中是一個關鍵的橫切關注點。在 Spring Integration 中,輪詢消費者基於同名的模式,該模式在 Gregor Hohpe 和 Bobby Woolf 所著的《企業整合模式》一書中有所描述。您可以在 該書的網站上找到該模式的描述。
有關輪詢消費者配置的更多資訊,請參閱 訊息端點。
可輪詢訊息源
Spring Integration 提供了輪詢消費者模式的第二種變體。當使用入站通道介面卡時,這些介面卡通常被 SourcePollingChannelAdapter
包裝。例如,當從遠端 FTP 伺服器位置檢索訊息時,FTP 入站通道介面卡 中描述的介面卡會配置一個 poller 來定期檢索訊息。因此,當元件配置了 pollers 時,生成的例項屬於以下型別之一
這意味著 pollers 在入站和出站訊息傳遞場景中都會使用。以下是使用 pollers 的一些用例
-
輪詢某些外部系統,例如 FTP 伺服器、資料庫和 Web Services
-
輪詢內部(可輪詢)訊息通道
-
輪詢內部服務(例如重複執行 Java 類中的方法)
AOP advice 類可以應用於 pollers 的 advice-chain 中,例如事務 advice 用於啟動事務。從版本 4.1 開始,提供了 PollSkipAdvice 。Pollers 使用 triggers 來確定下一次輪詢的時間。可以使用 PollSkipAdvice 來抑制(跳過)一次輪詢,也許是因為存在某些下游條件會阻止訊息被處理。要使用此 advice,您必須提供一個 PollSkipStrategy 的實現。從版本 4.2.5 開始,提供了 SimplePollSkipStrategy 。要使用它,您可以將其例項作為 bean 新增到應用上下文,注入到 PollSkipAdvice 中,並將其新增到 poller 的 advice 鏈中。要跳過輪詢,請呼叫 skipPolls() 。要恢復輪詢,請呼叫 reset() 。版本 4.2 在這方面增加了更多靈活性。請參閱 條件輪詢器。 |
延遲確認可輪詢訊息源
從版本 5.0.1 開始,某些模組提供了支援延遲確認的 MessageSource
實現,直到下游流完成(或將訊息交給另一個執行緒)。目前這僅限於 AmqpMessageSource
和 KafkaMessageSource
。
對於這些訊息源,會將 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
頭部(參見 MessageHeaderAccessor
API)新增到訊息中。當與可輪詢訊息源一起使用時,此頭部的值是 AcknowledgmentCallback
的例項,如下例所示
@FunctionalInterface
public interface AcknowledgmentCallback {
void acknowledge(Status status);
boolean isAcknowledged();
void noAutoAck();
default boolean isAutoAck();
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
並非所有訊息源(例如 KafkaMessageSource
)都支援 REJECT
狀態。它被視為與 ACCEPT
相同。
應用可以在任何時候確認訊息,如下例所示
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
如果 MessageSource
連線到 SourcePollingChannelAdapter
,當 poller 執行緒在下游流完成後返回到介面卡時,介面卡會檢查確認是否已完成,如果未完成,則將其狀態設定為 ACCEPT
(如果流丟擲異常,則設定為 REJECT
)。狀態值在 AcknowledgmentCallback.Status
列舉中定義。
Spring Integration 提供了 MessageSourcePollingTemplate
來執行對 MessageSource
的即時輪詢。當 MessageHandler
回撥返回(或丟擲異常)時,這也負責在 AcknowledgmentCallback
上設定 ACCEPT
或 REJECT
。以下示例展示瞭如何使用 MessageSourcePollingTemplate
進行輪詢
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
在這兩種情況(SourcePollingChannelAdapter
和 MessageSourcePollingTemplate
)下,您都可以透過呼叫回撥上的 noAutoAck()
方法來停用自動 ack/nack。如果您將訊息交給另一個執行緒並希望稍後進行確認,則可能會這樣做。並非所有實現都支援此功能(例如,Apache Kafka 不支援,因為偏移量提交必須在同一執行緒上執行)。
訊息源的條件輪詢器
本節介紹如何使用條件輪詢器。
背景
Poller 的 advice-chain
中的 Advice
物件會建議整個輪詢任務(包括訊息檢索和處理)。這些“around advice”方法無法訪問輪詢的任何上下文——只能訪問輪詢本身。這對於諸如使任務具有事務性或由於某些外部條件跳過輪詢(如前所述)等要求而言是可以接受的。如果我們希望根據輪詢的 receive
部分的結果採取某些操作,或者如果我們要根據條件調整 poller,該怎麼辦?對於這些情況,Spring Integration 提供了“智慧”輪詢。
“智慧”輪詢
版本 5.3 引入了 ReceiveMessageAdvice
介面。advice-chain
中任何實現了此介面的 Advice
物件僅應用於 receive()
操作 - MessageSource.receive()
和 PollableChannel.receive(timeout)
。因此,它們只能應用於 SourcePollingChannelAdapter
或 PollingConsumer
。此類實現了以下方法
-
beforeReceive(Object source)
此方法在Object.receive()
方法之前呼叫。它允許您檢查和重新配置源。返回false
將取消此輪詢(類似於前面提到的PollSkipAdvice
)。 -
Message<?> afterReceive(Message<?> result, Object source)
此方法在receive()
方法之後呼叫。同樣,您可以重新配置源或採取任何操作(可能取決於結果,如果源未建立訊息,結果可能為null
)。您甚至可以返回不同的訊息
執行緒安全
如果 |
Advice 鏈排序
您應該瞭解 advice 鏈在初始化期間的處理方式。未實現 |
SimpleActiveIdleReceiveMessageAdvice
此 advice 是 ReceiveMessageAdvice
的簡單實現。當與 DynamicPeriodicTrigger
結合使用時,它會根據前一次輪詢是否收到訊息來調整輪詢頻率。poller 還必須引用同一個 DynamicPeriodicTrigger
。
重要:非同步移交
SimpleActiveIdleReceiveMessageAdvice 會根據 receive() 結果修改 trigger。這僅在 advice 在 poller 執行緒上呼叫時有效。如果 poller 有 task-executor 則無效。要在希望在輪詢結果後使用非同步操作時使用此 advice,請稍後進行非同步移交,例如透過使用 ExecutorChannel 。 |
CompoundTriggerAdvice
此 advice 允許根據輪詢是否返回訊息來選擇兩個 triggers 中的一個。考慮一個使用 CronTrigger
的 poller。CronTrigger
例項是不可變的,因此一旦構造就無法更改。考慮一個用例:我們想使用 cron 表示式每小時觸發一次輪詢,但如果未收到訊息,則每分鐘輪詢一次,並在檢索到訊息時恢復使用 cron 表示式。
Advice(和 poller)為此目的使用 CompoundTrigger
。觸發器的 primary
trigger 可以是 CronTrigger
。當 advice 檢測到未收到訊息時,它會將 secondary trigger 新增到 CompoundTrigger
。當呼叫 CompoundTrigger
例項的 nextExecutionTime
方法時,如果存在 secondary trigger,則委託給它。否則,委託給 primary trigger。
Poller 還必須引用同一個 CompoundTrigger
。
以下示例展示了每小時 cron 表示式配置,並帶有回退到每分鐘輪詢的設定
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
重要:非同步移交
CompoundTriggerAdvice 會根據 receive() 結果修改 trigger。這僅在 advice 在 poller 執行緒上呼叫時有效。如果 poller 有 task-executor 則無效。要在希望在輪詢結果後使用非同步操作時使用此 advice,請稍後進行非同步移交,例如透過使用 ExecutorChannel 。 |
僅限 MessageSource 的 Advice
有些 advices 可能僅適用於 MessageSource.receive()
,而對於 PollableChannel
則沒有意義。為此,仍然存在 MessageSourceMutator
介面(ReceiveMessageAdvice
的擴充套件)。更多資訊請參閱 入站通道介面卡:輪詢多個伺服器和目錄。