延遲器

延遲器 (delayer) 是一種簡單的端點,允許訊息流延遲一定的間隔。當訊息被延遲時,原始傳送者不會阻塞。相反,延遲的訊息會使用 org.springframework.scheduling.TaskScheduler 例項進行排程,以便在延遲時間過後傳送到輸出通道。這種方法即使對於相當長的延遲也具有可伸縮性,因為它不會導致大量傳送者執行緒被阻塞。相反,在典型情況下,會使用執行緒池來實際執行訊息釋放。本節包含配置延遲器的一些示例。

配置延遲器

<delayer> 元素用於延遲兩個訊息通道之間的訊息流。與其他端點一樣,你可以提供 'input-channel' 和 'output-channel' 屬性,但延遲器還有 'default-delay' 和 'expression' 屬性(以及 'expression' 元素),用於確定每條訊息應延遲的毫秒數。以下示例將所有訊息延遲三秒

<int:delayer id="delayer" input-channel="input"
             default-delay="3000" output-channel="output"/>

如果你需要為每條訊息確定延遲時間,還可以使用 'expression' 屬性提供 SpEL 表示式,如下面的表示式所示

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from("input")
            .delay(d -> d
                    .messageGroupId("delayer.messageGroupId")
                    .defaultDelay(3_000L)
                    .delayExpression("headers['delay']"))
            .channel("output")
            .get();
}
@Bean
fun flow() =
    integrationFlow("input") {
        delay {
            messageGroupId("delayer.messageGroupId")
            defaultDelay(3000L)
            delayExpression("headers['delay']")
        }
        channel("output")
    }
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
    DelayHandler handler = new DelayHandler("delayer.messageGroupId");
    handler.setDefaultDelay(3_000L);
    handler.setDelayExpressionString("headers['delay']");
    handler.setOutputChannelName("output");
    return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
             default-delay="3000" expression="headers['delay']"/>

在前面的示例中,只有當給定入站訊息的表示式評估結果為 null 時,才會應用三秒的延遲。如果你只想對錶達式評估結果有效的訊息應用延遲,可以使用 0 作為 'default-delay'(這是預設值)。對於任何延遲為 0(或更少)的訊息,訊息會立即在呼叫執行緒上傳送。

XML 解析器使用訊息組 ID <beanName>.messageGroupId
延遲處理器支援表示毫秒間隔的表示式評估結果(任何其 toString() 方法生成可解析為 Long 值)以及表示絕對時間的 java.util.Date 例項。在第一種情況下,毫秒數從當前時間開始計算(例如,值 5000 會將訊息從延遲器接收到時起延遲至少五秒)。使用 Date 例項時,訊息直到該 Date 物件表示的時間才會釋放。一個非正延遲或過去的 Date 值不會導致延遲。相反,它會直接在原始傳送者的執行緒上傳送到輸出通道。如果表示式評估結果不是 Date 且無法解析為 Long,則應用預設延遲(如果有,預設值為 0)。
表示式評估可能因各種原因丟擲評估異常,包括無效表示式或其他條件。預設情況下,此類異常會被忽略(儘管會在 DEBUG 級別記錄日誌),並且延遲器會回退到預設延遲(如果有)。你可以透過設定 ignore-expression-failures 屬性來修改此行為。預設情況下,此屬性設定為 true,並且延遲器行為如前所述。但是,如果你不想忽略表示式評估異常並將它們拋給延遲器的呼叫者,請將 ignore-expression-failures 屬性設定為 false

在前面的示例中,延遲表示式指定為 headers['delay']。這是 SpEL Indexer 語法,用於訪問 Map 元素(MessageHeaders 實現了 Map)。它呼叫:headers.get("delay")。對於簡單的 map 元素名稱(不包含 '.'),你也可以使用 SpEL 的“點訪問器”語法,如前面所示的 header 表示式可以指定為 headers.delay。但是,如果 header 丟失,則會得到不同的結果。

 org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
		   Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'

在第一種情況下,表示式評估為 null。第二種情況會導致類似如下的結果

因此,如果 header 有可能被省略並且你想回退到預設延遲,通常使用索引器語法而不是點屬性訪問器語法更有效率(並且推薦),因為檢測 null 比捕獲異常更快。

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    scheduler="exampleTaskScheduler"/>

<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
延遲器委託給 Spring 的 TaskScheduler 抽象的例項。延遲器使用的預設排程器是 Spring Integration 在啟動時提供的 ThreadPoolTaskScheduler 例項。參見 配置 Task Scheduler。如果你想委託給不同的排程器,可以透過延遲器元素的 'scheduler' 屬性提供引用,如下面的示例所示
如果你配置了外部的 ThreadPoolTaskScheduler,可以在此屬性上設定 waitForTasksToCompleteOnShutdown = true。這允許在應用程式關閉時,已經處於執行狀態(正在釋放訊息)的“延遲”任務成功完成。在 Spring Integration 2.2 之前,此屬性可以在 <delayer> 元素上使用,因為 DelayHandler 可以在後臺建立自己的排程器。從 2.2 開始,延遲器需要一個外部排程器例項,並且 waitForTasksToCompleteOnShutdown 被刪除。你應該使用排程器本身的配置。

ThreadPoolTaskScheduler 有一個屬性 errorHandler,可以注入 org.springframework.util.ErrorHandler 的實現。此處理器允許處理從傳送延遲訊息的排程任務執行緒中丟擲的 Exception。預設情況下,它使用 org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler,你可以在日誌中看到堆疊跟蹤。你可能希望考慮使用 org.springframework.integration.channel.MessagePublishingErrorHandler,它會將 ErrorMessage 傳送到 error-channel,可以從失敗訊息的 header 中獲取,或者傳送到預設的 error-channel。此錯誤處理在事務回滾後執行(如果存在事務)。參見 釋放失敗

延遲器與訊息儲存

DelayHandler 將延遲的訊息持久化到提供的 MessageStore 中的訊息組中。('groupId' 基於 <delayer> 元素所需的 'id' 屬性。另請參閱 DelayHandler.setMessageGroupId(String)。)延遲的訊息在 DelayHandler 將訊息傳送到 output-channel 之前,立即被排程任務從 MessageStore 中移除。如果提供的 MessageStore 是持久的(例如 JdbcMessageStore),它提供了在應用程式關閉時不會丟失訊息的能力。應用程式啟動後,DelayHandler 從其在 MessageStore 中的訊息組讀取訊息,並根據訊息的原始到達時間(如果延遲是數字)重新排程它們。對於延遲 header 是 Date 的訊息,在重新排程時會使用該 Date。如果延遲訊息在 MessageStore 中停留時間超過其 'delay',它將在啟動後立即傳送。messageGroupId 是必需的,不能依賴於可能生成的 DelayHandler bean 名稱。這樣,應用程式重啟後,DelayHandler 可能會獲得一個新的生成的 bean 名稱。因此,延遲的訊息可能會因為它們的組不再由應用程式管理而丟失重新排程。

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    message-store="jdbcMessageStore">
    <int:advice-chain>
        <beans:ref bean="customAdviceBean"/>
        <tx:advice>
            <tx:attributes>
                <tx:method name="*" read-only="true"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>
</int:delayer>

<delayer> 可以使用兩個互斥的元素之一進行增強:<transactional><advice-chain>。這些 AOP advice 的 List 應用於代理的內部 DelayHandler.ReleaseMessageHandler,它負責在延遲後,在排程任務的 Thread 上釋放訊息。例如,當後繼訊息流丟擲異常且 ReleaseMessageHandler 的事務回滾時,可以使用它。在這種情況下,延遲的訊息會保留在持久化的 MessageStore 中。你可以在 <advice-chain> 中使用任何自定義的 org.aopalliance.aop.Advice 實現。<transactional> 元素定義了一個簡單的 advice 鏈,它只包含事務 advice。以下示例展示了 <delayer> 中的 advice-chain

Message<String> delayerReschedulingMessage =
    MessageBuilder.withPayload("'delayer.handler'.reschedulePersistedMessages").build();
controlBusChannel.send(delayerReschedulingMessage);
DelayHandler 可以作為 JMX MBean 匯出,包含管理操作(getDelayedMessageCountreschedulePersistedMessages),這允許在執行時重新排程延遲的持久化訊息——例如,如果 TaskScheduler 之前已經停止。這些操作可以透過 Control Bus 命令呼叫,如下面的示例所示

有關訊息儲存、JMX 和控制匯流排的更多資訊,請參閱 系統管理

從版本 5.3.7 開始,如果在將訊息儲存到 MessageStore 中時事務處於活動狀態,則釋放任務將在 TransactionSynchronization.afterCommit() 回撥中排程。這是必要的,以防止競態條件,即排程釋放可能在事務提交之前執行,從而找不到訊息。在這種情況下,訊息將在延遲後或事務提交後釋放,以較晚者為準。

釋放失敗

  • 從版本 5.0.8 開始,延遲器新增了兩個屬性

  • maxAttempts(預設值 5)

retryDelay(預設值 1 秒)

釋放訊息時,如果後繼流失敗,將在 retryDelay 後重試釋放。如果達到 maxAttempts,訊息將被丟棄(除非釋放是事務性的,在這種情況下訊息將保留在儲存中,但不再計劃釋放,直到應用程式重新啟動或呼叫 reschedulePersistedMessages() 方法,如上所述)。

此外,你可以配置一個 delayedMessageErrorChannel;當釋放失敗時,會將一個 ErrorMessage 傳送到該通道,異常作為載荷,幷包含 originalMessage 屬性。ErrorMessage 包含一個 header IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,其中包含當前計數。