延遲器
延遲器是一個簡單的端點,它允許訊息流按一定間隔進行延遲。當訊息被延遲時,原始傳送者不會被阻塞。相反,延遲的訊息會由 org.springframework.scheduling.TaskScheduler 例項進行排程,在延遲時間過後傳送到輸出通道。這種方法即使對於相當長的延遲也具有可伸縮性,因為它不會導致大量傳送執行緒被阻塞。相反,在典型情況下,會使用執行緒池來實際執行訊息的釋放。本節包含配置延遲器的幾個示例。
配置延遲器
<delayer> 元素用於延遲兩個訊息通道之間的訊息流。與其他端點一樣,你可以提供 'input-channel' 和 'output-channel' 屬性,但延遲器還具有 'default-delay' 和 'expression' 屬性(以及 'expression' 元素),它們決定了每條訊息應該延遲的毫秒數。以下示例將所有訊息延遲三秒。
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
如果你需要為每條訊息確定延遲,你還可以使用 'expression' 屬性提供 SpEL 表示式,如下所示:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay(d -> d
.messageGroupId("delayer.messageGroupId")
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}
@Bean
fun flow() =
integrationFlow("input") {
delay {
messageGroupId("delayer.messageGroupId")
defaultDelay(3000L)
delayExpression("headers['delay']")
}
channel("output")
}
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
DelayHandler handler = new DelayHandler("delayer.messageGroupId");
handler.setDefaultDelay(3_000L);
handler.setDelayExpressionString("headers['delay']");
handler.setOutputChannelName("output");
return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
default-delay="3000" expression="headers['delay']"/>
在前面的示例中,三秒的延遲僅適用於當表示式對給定入站訊息計算結果為 null 時。如果你只想將延遲應用於表示式評估結果有效的訊息,你可以使用 0(預設值)作為 'default-delay'。對於任何延遲為 0(或更少)的訊息,訊息會立即在呼叫執行緒上傳送。
XML 解析器使用 <beanName>.messageGroupId 作為訊息組 ID。 |
延遲處理器支援表示毫秒間隔的表示式評估結果(任何 Object,其 toString() 方法產生的值可以解析為 Long)以及表示絕對時間的 java.util.Date 例項。在第一種情況下,毫秒是從當前時間開始計算的,例如,值 5000 會將訊息從延遲器接收時延遲至少五秒。對於 Date 例項,訊息直到該 Date 物件表示的時間才會被釋放。等於非正延遲或過去日期的值不會導致延遲。相反,它會直接在原始傳送者的執行緒上傳送到輸出通道。如果表示式評估結果不是 Date 且無法解析為 Long,則應用預設延遲(如果有 — 預設值為 0)。 |
表示式評估可能會由於各種原因丟擲評估異常,包括無效表示式或其他情況。預設情況下,此類異常會被忽略(儘管在 DEBUG 級別記錄),並且延遲器會回退到預設延遲(如果有)。你可以透過設定 ignore-expression-failures 屬性來修改此行為。預設情況下,此屬性設定為 true,延遲器行為如前所述。但是,如果你不想忽略表示式評估異常並將其拋給延遲器的呼叫者,請將 ignore-expression-failures 屬性設定為 false。 |
|
在前面的示例中,延遲表示式被指定為
因此,如果訊息頭可能被省略,並且你希望回退到預設延遲,通常更有效(且推薦)使用索引器語法而不是點屬性訪問器語法,因為檢測 null 比捕獲異常更快。 |
延遲器委託給 Spring 的 TaskScheduler 抽象例項。延遲器使用的預設排程器是 Spring Integration 在啟動時提供的 ThreadPoolTaskScheduler 例項。請參閱 配置任務排程器。如果你想委託給不同的排程器,可以透過延遲器元素的 'scheduler' 屬性提供一個引用,如下例所示:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果配置外部 ThreadPoolTaskScheduler,可以將其屬性 waitForTasksToCompleteOnShutdown 設定為 true。這允許在應用程式關閉時,已經處於執行狀態(釋放訊息)的“延遲”任務成功完成。在 Spring Integration 2.2 之前,此屬性在 <delayer> 元素上可用,因為 DelayHandler 可以在後臺建立自己的排程器。自 2.2 版本起,延遲器需要一個外部排程器例項,並且 waitForTasksToCompleteOnShutdown 已被刪除。你應該使用排程器自己的配置。 |
ThreadPoolTaskScheduler 有一個 errorHandler 屬性,可以注入 org.springframework.util.ErrorHandler 的某個實現。此處理程式允許處理來自排程任務執行緒傳送延遲訊息的 Exception。預設情況下,它使用 org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler,你可以在日誌中看到堆疊跟蹤。你可能需要考慮使用 org.springframework.integration.channel.MessagePublishingErrorHandler,它將 ErrorMessage 傳送到 error-channel,可以是從失敗訊息的頭部獲取的通道,也可以是預設的 error-channel。此錯誤處理在事務回滾(如果存在)之後執行。請參閱 釋出失敗。 |
延遲器和訊息儲存
DelayHandler 將延遲訊息持久化到所提供的 MessageStore 中的訊息組中。('groupId' 基於 <delayer> 元素必需的 'id' 屬性。另請參閱 DelayHandler.setMessageGroupId(String)。)延遲訊息在排程任務將訊息傳送到 output-channel 之前立即從 MessageStore 中刪除。如果提供的 MessageStore 是持久化的(例如 JdbcMessageStore),它提供了在應用程式關閉時不會丟失訊息的能力。應用程式啟動後,DelayHandler 從 MessageStore 中的訊息組讀取訊息,並根據訊息的原始到達時間(如果延遲是數字)重新排程它們。對於延遲訊息頭是 Date 的訊息,在重新排程時使用該 Date。如果延遲訊息在 MessageStore 中停留的時間超過其“延遲”,它將在啟動後立即傳送。messageGroupId 是必需的,不能依賴可能生成的 DelayHandler bean 名稱。這樣,在應用程式重新啟動後,DelayHandler 可能會獲得新的生成的 bean 名稱。因此,延遲訊息可能會從重新排程中丟失,因為它們的組不再由應用程式管理。
<delayer> 可以用兩個互斥的元素中的任意一個進行豐富:<transactional> 和 <advice-chain>。這些 AOP 建議列表應用於代理的內部 DelayHandler.ReleaseMessageHandler,後者負責在延遲後在排程任務的 Thread 上釋放訊息。例如,當一個下游訊息流丟擲異常並且 ReleaseMessageHandler 的事務回滾時,可以使用它。在這種情況下,延遲訊息保留在持久化的 MessageStore 中。你可以在 <advice-chain> 中使用任何自定義的 org.aopalliance.aop.Advice 實現。<transactional> 元素定義了一個簡單的建議鏈,其中只包含事務建議。以下示例顯示了 <delayer> 中的 advice-chain:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
DelayHandler 可以作為 JMX MBean 匯出,並帶有受管操作(getDelayedMessageCount 和 reschedulePersistedMessages),這允許在執行時重新排程延遲的持久化訊息 — 例如,如果 TaskScheduler 之前已停止。這些操作可以透過 Control Bus 命令呼叫,如下例所示:
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("'delayer.handler'.reschedulePersistedMessages").build();
controlBusChannel.send(delayerReschedulingMessage);
| 有關訊息儲存、JMX 和控制匯流排的更多資訊,請參閱 系統管理。 |
從 5.3.7 版本開始,如果在將訊息儲存到 MessageStore 時存在活動事務,則釋放任務會在 TransactionSynchronization.afterCommit() 回撥中排程。這是為了防止競態條件,即排程釋放可能在事務提交之前執行,從而導致找不到訊息。在這種情況下,訊息將在延遲之後或事務提交之後釋放,以較晚者為準。
釋出失敗
從 5.0.8 版本開始,延遲器有兩個新屬性:
-
maxAttempts(預設 5) -
retryDelay(預設 1 秒)
當訊息被釋放時,如果下游流失敗,釋放將在 retryDelay 之後重試。如果達到 maxAttempts,訊息將被丟棄(除非釋放是事務性的,在這種情況下,訊息將保留在儲存中,但不再安排釋放,直到應用程式重新啟動,或者如上所述呼叫 reschedulePersistedMessages() 方法)。
此外,你可以配置 delayedMessageErrorChannel;當釋出失敗時,ErrorMessage 會發送到該通道,其中包含異常作為有效負載,並具有 originalMessage 屬性。ErrorMessage 包含一個訊息頭 IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,其中包含當前嘗試次數。
如果錯誤流消費了錯誤訊息並正常退出,則不採取進一步操作;如果釋放是事務性的,事務將提交,並且訊息將從儲存中刪除。如果錯誤流丟擲異常,則將重試釋出,最多達到 maxAttempts,如上所述。