通道介面卡 (Channel Adapter)

通道介面卡是一種訊息端點,用於將單個傳送方或接收方連線到訊息通道。Spring Integration 提供了許多介面卡來支援各種傳輸方式,例如 JMS、檔案、HTTP、Web 服務、郵件等等。本參考指南的後續章節將討論每個介面卡。然而,本章重點介紹簡單但靈活的方法呼叫通道介面卡支援。有入站和出站介面卡,每個都可以使用核心名稱空間中提供的 XML 元素進行配置。只要您有一個可以作為源或目標呼叫的方法,這些介面卡就提供了一種擴充套件 Spring Integration 的簡便方法。

配置入站通道介面卡

inbound-channel-adapter 元素(在 Java 配置中是 SourcePollingChannelAdapter)可以呼叫 Spring 管理物件上的任何方法,並將非空返回值轉換為 Message 後傳送到 MessageChannel。當介面卡的訂閱被啟用時,輪詢器會嘗試從源接收訊息。輪詢器會根據提供的配置使用 TaskScheduler 進行排程。要配置單個通道介面卡的輪詢間隔或 cron 表示式,您可以提供一個帶有排程屬性(如 'fixed-rate' 或 'cron')的 'poller' 元素。以下示例定義了兩個 inbound-channel-adapter 例項

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean
public IntegrationFlow source1() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.fixedRate(5000)))
                ...
                .get();
}

@Bean
public IntegrationFlow source2() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
                ...
                .get();
}
public class SourceService {

    @InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
    Object method1() {
        ...
    }

    @InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
    Object method2() {
        ...
    }
}
@Bean
fun messageSourceFlow() =
    integrationFlow( { GenericMessage<>(...) },
                    { poller { it.fixedRate(5000) } }) {
        ...
    }
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>

<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
    <int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>
如果未提供輪詢器,則必須在上下文中註冊一個預設輪詢器。有關更多詳細資訊,請參閱端點名稱空間支援
輪詢端點的預設觸發器是具有 1 秒固定延遲週期的 PeriodicTrigger 例項。
重要:輪詢器配置

所有 inbound-channel-adapter 型別都由 SourcePollingChannelAdapter 支援,這意味著它們包含一個輪詢器配置,該配置根據 Poller 中指定的配置輪詢 MessageSource(以呼叫產生訊息載荷的自定義方法)。以下示例顯示了兩個輪詢器的配置

<int:poller max-messages-per-poll="1" fixed-rate="1000"/>

<int:poller max-messages-per-poll="10" fixed-rate="1000"/>

在第一個配置中,輪詢任務每次輪詢呼叫一次,並且在每個任務(輪詢)期間,根據 max-messages-per-poll 屬性值呼叫一次方法(這會產生訊息)。在第二個配置中,輪詢任務每次輪詢呼叫 10 次,或者直到返回 'null',因此每次輪詢可能產生十條訊息,而每次輪詢以一秒的間隔發生。然而,如果配置如下例所示會發生什麼

<int:poller fixed-rate="1000"/>

請注意,未指定 max-messages-per-poll。正如我們稍後將介紹的,PollingConsumer(例如,service-activatorfilterrouter 等)中相同的輪詢器配置的 max-messages-per-poll 預設值為 -1,這意味著“不間斷地執行輪詢任務,除非輪詢方法返回 null(可能因為 QueueChannel 中沒有更多訊息)”,然後休眠一秒。

然而,在 SourcePollingChannelAdapter 中略有不同。max-messages-per-poll 的預設值為 1,除非您明確將其設定為負值(例如 -1)。這確保了輪詢器可以對生命週期事件(如啟動和停止)做出反應,並防止如果 MessageSource 自定義方法的實現有可能永遠不返回 null 並且恰好不可中斷,則可能導致無限迴圈。

然而,如果您確定您的方法可以返回 null,並且需要在每次輪詢時輪詢儘可能多的可用源,則應明確將 max-messages-per-poll 設定為負值,如下例所示

<int:poller max-messages-per-poll="-1" fixed-rate="1000"/>

從 5.5 版本開始,max-messages-per-poll 的值為 0 具有特殊含義 - 完全跳過 MessageSource.receive() 呼叫,這可以被認為是暫停此入站通道介面卡,直到 maxMessagesPerPoll 稍後透過 Control Bus 等方式更改為非零值。

從 6.2 版本開始,fixed-delayfixed-rate 可以配置為 ISO 8601 Duration 格式,例如 PT10SP1D 等。此外,底層 PeriodicTriggerinitial-interval 也以與 fixed-delayfixed-rate 類似的格式公開。

另請參閱全域性預設輪詢器以獲取更多資訊。

配置出站通道介面卡

outbound-channel-adapter 元素(在 Java 配置中為 @ServiceActivator)也可以將 MessageChannel 連線到任何 POJO 消費者方法,該方法應使用傳送到該通道的訊息的載荷進行呼叫。以下示例顯示瞭如何定義出站通道介面卡

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
    return f -> f
             .handle(myPojo, "handle");
}
public class MyPojo {

    @ServiceActivator(channel = "channel1")
    void handle(Object payload) {
        ...
    }

}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
    integrationFlow {
        handle(myPojo, "handle")
    }
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>

<beans:bean id="target" class="org.MyPojo"/>

如果被適配的通道是 PollableChannel,則必須提供一個 poller 子元素(在 @ServiceActivator 上使用 @Poller 子註解),如下例所示

  • Java

  • XML

public class MyPojo {

    @ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
    void handle(Object payload) {
        ...
    }

}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
    <int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>

<beans:bean id="target" class="org.MyPojo"/>

如果 POJO 消費者實現可以在其他 <outbound-channel-adapter> 定義中重用,則應使用 ref 屬性。但是,如果消費者實現僅由 <outbound-channel-adapter> 的單個定義引用,則可以將其定義為內部 bean,如下例所示

<int:outbound-channel-adapter channel="channel" method="handle">
    <beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
在同一 <outbound-channel-adapter> 配置中同時使用 ref 屬性和內部處理程式定義是不允許的,因為它會建立模糊條件。這種配置會導致丟擲異常。

任何通道介面卡都可以不帶 channel 引用建立,在這種情況下,它會隱式建立一個 DirectChannel 例項。建立的通道的名稱與 <inbound-channel-adapter><outbound-channel-adapter> 元素的 id 屬性匹配。因此,如果未提供 channel,則 id 是必需的。

通道介面卡表示式和指令碼

與許多其他 Spring Integration 元件一樣,<inbound-channel-adapter><outbound-channel-adapter> 也支援 SpEL 表示式求值。要使用 SpEL,請在 'expression' 屬性中提供表示式字串,而不是提供用於在 bean 上進行方法呼叫的 'ref' 和 'method' 屬性。當表示式求值時,它遵循與方法呼叫相同的契約:對於 <inbound-channel-adapter>,表示式求值結果為非空值時會生成一條訊息;而對於 <outbound-channel-adapter>,表示式必須等同於返回 void 的方法呼叫。

從 Spring Integration 3.0 開始,<int:inbound-channel-adapter/> 也可以使用 SpEL <expression/>(甚至使用 <script/>)子元素進行配置,以應對需要比簡單 'expression' 屬性更復雜的情況。如果透過使用 location 屬性將指令碼提供為 Resource,您還可以設定 refresh-check-delay,這允許定期重新整理資源。如果您希望在每次輪詢時檢查指令碼,則需要協調此設定與輪詢器的觸發器,如下例所示

<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller max-messages-per-poll="1" fixed-delay="5000"/>
    <script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>

另請參閱使用 <expression/> 子元素時 ReloadableResourceBundleExpressionSource 上的 cacheSeconds 屬性。有關表示式的更多資訊,請參閱Spring Expression Language (SpEL)。有關指令碼的資訊,請參閱Groovy 支援指令碼支援

<int:inbound-channel-adapter/> (SourcePollingChannelAdapter) 是一個端點,它透過定期觸發以輪詢底層 MessageSource 來啟動訊息流。由於在輪詢時沒有訊息物件,表示式和指令碼無法訪問根 Message,因此不像大多數其他訊息處理 SpEL 表示式那樣具有可用的 payload 或 headers 屬性。指令碼可以生成並返回包含 headers 和 payload 的完整 Message 物件,或者只返回一個 payload,該 payload 將由框架新增到包含基本 headers 的訊息中。