通道介面卡

通道介面卡是一個訊息端點,它能夠將單個傳送方或接收方連線到訊息通道。Spring Integration 提供了許多介面卡來支援各種傳輸方式,例如 JMS、檔案、HTTP、Web 服務、郵件等。本參考指南的後續章節將討論每個介面卡。然而,本章重點介紹簡單但靈活的方法呼叫通道介面卡支援。有入站和出站介面卡,每個都可以使用核心名稱空間中提供的 XML 元素進行配置。只要您有一個可以作為源或目標的呼叫方法,這些介面卡就提供了一種擴充套件 Spring Integration 的簡便方法。

配置入站通道介面卡

一個 inbound-channel-adapter 元素(在 Java 配置中是一個 SourcePollingChannelAdapter)可以呼叫 Spring 管理物件上的任何方法,並在將方法的輸出轉換為 Message 後,將非空返回值傳送到 MessageChannel。當介面卡的訂閱被啟用時,輪詢器會嘗試從源接收訊息。輪詢器根據提供的配置透過 TaskScheduler 進行排程。要配置單個通道介面卡的輪詢間隔或 cron 表示式,您可以提供一個帶有排程屬性(例如“fixed-rate”或“cron”)的“poller”元素。以下示例定義了兩個 inbound-channel-adapter 例項。

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean
public IntegrationFlow source1() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.fixedRate(5000)))
                ...
                .get();
}

@Bean
public IntegrationFlow source2() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
                ...
                .get();
}
public class SourceService {

    @InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
    Object method1() {
        ...
    }

    @InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
    Object method2() {
        ...
    }
}
@Bean
fun messageSourceFlow() =
    integrationFlow( { GenericMessage<>(...) },
                    { poller { it.fixedRate(5000) } }) {
        ...
    }
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>

<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
    <int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>
如果未提供輪詢器,則必須在上下文中註冊一個預設輪詢器。有關更多詳細資訊,請參閱端點名稱空間支援
輪詢端點的預設觸發器是一個 PeriodicTrigger 例項,具有 1 秒的固定延遲週期。
重要提示:輪詢器配置

所有 inbound-channel-adapter 型別都由一個 SourcePollingChannelAdapter 提供支援,這意味著它們包含一個輪詢器配置,該配置根據輪詢器中指定的配置輪詢 MessageSource(以呼叫生成訊息有效負載的自定義方法)。以下示例顯示了兩個輪詢器的配置:

<int:poller max-messages-per-poll="1" fixed-rate="1000"/>

<int:poller max-messages-per-poll="10" fixed-rate="1000"/>

在第一個配置中,輪詢任務每輪詢一次呼叫一次,並且在每個任務(輪詢)期間,根據 max-messages-per-poll 屬性值,方法(產生訊息)呼叫一次。在第二個配置中,輪詢任務每輪詢呼叫 10 次,或者直到它返回“null”,從而可能每輪詢產生 10 條訊息,而每次輪詢每秒發生一次。但是,如果配置如下所示,會發生什麼情況?

<int:poller fixed-rate="1000"/>

請注意,沒有指定 max-messages-per-poll。正如我們稍後將介紹的,PollingConsumer(例如 service-activatorfilterrouter 等)中相同的輪詢器配置的 max-messages-per-poll 預設值為 -1,這意味著“不間斷地執行輪詢任務,除非輪詢方法返回 null(可能因為 QueueChannel 中沒有更多訊息)”,然後休眠一秒。

然而,在 SourcePollingChannelAdapter 中,情況略有不同。max-messages-per-poll 的預設值是 1,除非您明確將其設定為負值(例如 -1)。這確保了輪詢器可以響應生命週期事件(例如啟動和停止),並防止如果 MessageSource 的自定義方法的實現有可能永遠不返回 null 並且恰好是不可中斷的,它可能會無限迴圈。

但是,如果您確信您的方法可以返回 null,並且您需要為每次輪詢儘可能多的源進行輪詢,則應將 max-messages-per-poll 顯式設定為負值,如下例所示:

<int:poller max-messages-per-poll="-1" fixed-rate="1000"/>

從版本 5.5 開始,max-messages-per-poll 的值 0 具有特殊含義——完全跳過 MessageSource.receive() 呼叫,這可以被視為此入站通道介面卡暫停,直到 maxMessagesPerPoll 在稍後的時間更改為非零值,例如透過控制匯流排。

從版本 6.2 開始,fixed-delayfixed-rate 可以配置為 ISO 8601 Duration 格式,例如 PT10S, P1D 等。此外,底層 PeriodicTriggerinitial-interval 也以與 fixed-delayfixed-rate 相似的值格式公開。

另請參閱全域性預設輪詢器以獲取更多資訊。

配置出站通道介面卡

一個 outbound-channel-adapter 元素(Java 配置中的 @ServiceActivator)也可以將 MessageChannel 連線到任何 POJO 消費者方法,該方法應該使用傳送到該通道的訊息的有效負載進行呼叫。以下示例顯示瞭如何定義出站通道介面卡:

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
    return f -> f
             .handle(myPojo, "handle");
}
public class MyPojo {

    @ServiceActivator(channel = "channel1")
    void handle(Object payload) {
        ...
    }

}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
    integrationFlow {
        handle(myPojo, "handle")
    }
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>

<beans:bean id="target" class="org.MyPojo"/>

如果被適配的通道是 PollableChannel,則必須提供一個 poller 子元素(@ServiceActivator 上的 @Poller 子註解),如下例所示:

  • Java

  • XML

public class MyPojo {

    @ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
    void handle(Object payload) {
        ...
    }

}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
    <int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>

<beans:bean id="target" class="org.MyPojo"/>

如果 POJO 消費者實現可以在其他 <outbound-channel-adapter> 定義中重用,則應使用 ref 屬性。但是,如果消費者實現僅被單個 <outbound-channel-adapter> 定義引用,則可以將其定義為內部 bean,如下例所示:

<int:outbound-channel-adapter channel="channel" method="handle">
    <beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
在相同的 <outbound-channel-adapter> 配置中同時使用 ref 屬性和內部處理程式定義是不允許的,因為它會建立模糊條件。這種配置會導致丟擲異常。

任何通道介面卡都可以不帶 channel 引用而建立,在這種情況下,它會隱式建立一個 DirectChannel 例項。建立的通道的名稱與 <inbound-channel-adapter><outbound-channel-adapter> 元素的 id 屬性匹配。因此,如果未提供 channel,則 id 是必需的。

通道介面卡表示式和指令碼

與許多其他 Spring Integration 元件一樣,<inbound-channel-adapter><outbound-channel-adapter> 也支援 SpEL 表示式求值。要使用 SpEL,請在“expression”屬性中提供表示式字串,而不是提供用於對 bean 進行方法呼叫的“ref”和“method”屬性。當表示式求值時,它遵循與方法呼叫相同的契約:<inbound-channel-adapter> 的表示式在求值結果為非空值時生成訊息,而 <outbound-channel-adapter> 的表示式必須等同於返回 void 的方法呼叫。

從 Spring Integration 3.0 開始,當需要比簡單“expression”屬性更復雜的處理時,<int:inbound-channel-adapter/> 還可以配置一個 SpEL <expression/>(甚至是一個 <script/>)子元素。如果您透過使用 location 屬性將指令碼作為 Resource 提供,您還可以設定 refresh-check-delay,這允許資源定期重新整理。如果您希望在每次輪詢時檢查指令碼,則需要將此設定與輪詢器的觸發器協調,如下例所示:

<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller max-messages-per-poll="1" fixed-delay="5000"/>
    <script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>

另請參閱使用 <expression/> 子元素時的 ReloadableResourceBundleExpressionSource 上的 cacheSeconds 屬性。有關表示式的更多資訊,請參閱Spring Expression Language (SpEL)。有關指令碼,請參閱Groovy 支援指令碼支援

<int:inbound-channel-adapter/> (SourcePollingChannelAdapter) 是一個端點,它透過定期觸發以輪詢底層 MessageSource 來啟動訊息流。由於在輪詢時沒有訊息物件,表示式和指令碼無法訪問根 Message,因此在大多數其他訊息 SpEL 表示式中沒有可用的有效負載或標頭屬性。指令碼可以生成並返回一個完整的帶有標頭和有效負載的 Message 物件,或者只返回一個有效負載,由框架將其新增到帶有基本標頭的訊息中。
© . This site is unofficial and not affiliated with VMware.