配置訊息通道

要建立訊息通道例項,您可以使用 XML 配置中的 <channel/> 元素或 Java 配置中的 DirectChannel 例項,如下所示

  • Java

  • XML

@Bean
public MessageChannel exampleChannel() {
    return new DirectChannel();
}
<int:channel id="exampleChannel"/>

當您使用不帶任何子元素的 <channel/> 元素時,它會建立一個 DirectChannel 例項(一個 SubscribableChannel)。

要建立釋出-訂閱通道,請使用 <publish-subscribe-channel/> 元素(Java 中的 PublishSubscribeChannel),如下所示

  • Java

  • XML

@Bean
public MessageChannel exampleChannel() {
    return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>

您也可以提供各種 <queue/> 子元素來建立任何可輪詢通道型別(如 訊息通道實現 中所述)。以下各節展示了每種通道型別的示例。

DirectChannel 配置

如前所述,DirectChannel 是預設型別。以下列表展示瞭如何定義一個

  • Java

  • XML

@Bean
public MessageChannel directChannel() {
    return new DirectChannel();
}
<int:channel id="directChannel"/>

預設通道具有輪詢式負載均衡器,並且啟用了故障轉移(詳見 DirectChannel)。要停用其中一個或兩者,請新增一個 <dispatcher/> 子元素(DirectChannel 的一個 LoadBalancingStrategy 建構函式),並按如下方式配置屬性

  • Java

  • XML

@Bean
public MessageChannel failFastChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setFailover(false);
    return channel;
}

@Bean
public MessageChannel failFastChannel() {
    return new DirectChannel(null);
}
<int:channel id="failFastChannel">
    <int:dispatcher failover="false"/>
</channel>

<int:channel id="channelWithFixedOrderSequenceFailover">
    <int:dispatcher load-balancer="none"/>
</int:channel>

從版本 6.3 開始,所有基於 UnicastingDispatcherMessageChannel 實現都可以配置一個 Predicate<Exception> failoverStrategy,而不是使用簡單的 failover 選項。這個謂詞根據當前 MessageHandler 丟擲的異常來決定是否故障轉移到下一個 MessageHandler。更復雜的錯誤分析應該使用 ErrorMessageExceptionTypeRouter 來完成。

資料型別通道配置

有時,消費者只能處理特定型別的載荷,這就要求您確保輸入訊息的載荷型別。首先想到的可能是使用訊息過濾器。然而,訊息過濾器只能過濾掉不符合消費者要求的訊息。另一種方法是使用基於內容的路由器,將具有不相容資料型別的訊息路由到特定的轉換器,以強制轉換為所需的資料型別。這種方法可行,但實現相同目標的更簡單方法是應用 資料型別通道 (Datatype Channel) 模式。您可以為每種特定的載荷資料型別使用單獨的資料型別通道。

要建立一個僅接受包含特定載荷型別訊息的資料型別通道,請在通道元素的 datatype 屬性中提供該資料型別的完全限定類名,如下例所示

  • Java

  • XML

@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(Number.class);
    return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>

請注意,型別檢查對於任何可分配給通道資料型別的型別都將透過。換句話說,前例中的 numberChannel 將接受載荷為 java.lang.Integerjava.lang.Double 的訊息。可以以逗號分隔的列表形式提供多種型別,如下例所示

  • Java

  • XML

@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(String.class, Number.class);
    return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>

因此,前例中的 'numberChannel' 只接受資料型別為 java.lang.Number 的訊息。但是,如果訊息的載荷不是所需型別,會發生什麼?這取決於您是否定義了一個名為 integrationConversionService 的 bean,它是 Spring 的 轉換服務 (Conversion Service) 例項。如果未定義,則會立即丟擲 Exception。但是,如果您定義了一個 integrationConversionService bean,它將嘗試用於將訊息的載荷轉換為可接受的型別。

您甚至可以註冊自定義轉換器。例如,假設您向上面配置的 'numberChannel' 傳送一條載荷為 String 的訊息。您可能會按如下方式處理該訊息

MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));

通常,這是一個完全合法的操作。然而,由於我們使用了資料型別通道,此類操作的結果會生成類似如下的異常

Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…

發生異常是因為我們要求載荷型別是 Number,但我們傳送的是 String。因此,我們需要一些東西來將 String 轉換為 Number。為此,我們可以實現一個類似於以下示例的轉換器

public static class StringToIntegerConverter implements Converter<String, Integer> {
    public Integer convert(String source) {
        return Integer.parseInt(source);
    }
}

然後我們可以將其註冊為整合轉換服務 (Integration Conversion Service) 的一個轉換器,如下例所示

  • Java

  • XML

@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
    return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>

<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>

或者在 StringToIntegerConverter 類上標記 @Component 註解以進行自動掃描。

解析 'converter' 元素時,如果尚未定義 integrationConversionService bean,則會建立它。有了該轉換器,send 操作現在就會成功,因為資料型別通道使用該轉換器將 String 載荷轉換為 Integer

有關載荷型別轉換的更多資訊,請參閱 載荷型別轉換

從版本 4.0 開始,integrationConversionServiceDefaultDatatypeChannelMessageConverter 呼叫,後者在應用程式上下文中查詢轉換服務。要使用不同的轉換技術,您可以在通道上指定 message-converter 屬性。這必須是對 MessageConverter 實現的引用。僅使用 fromMessage 方法。它為轉換器提供對訊息頭的訪問權(以防轉換可能需要來自訊息頭的資訊,例如 content-type)。該方法只能返回轉換後的載荷或完整的 Message 物件。如果是後者,轉換器必須注意複製入站訊息中的所有訊息頭。

或者,您可以宣告一個 ID 為 datatypeChannelMessageConverter、型別為 MessageConverter<bean/>,並且該轉換器將被所有帶有 datatype 屬性的通道使用。

QueueChannel 配置

要建立一個 QueueChannel,請使用 <queue/> 子元素。您可以按如下方式指定通道的容量

  • Java

  • XML

@Bean
public PollableChannel queueChannel() {
    return new QueueChannel(25);
}
<int:channel id="queueChannel">
    <queue capacity="25"/>
</int:channel>
如果您未在此 <queue/> 子元素上為 'capacity' 屬性提供值,則生成的佇列是無界的。為避免記憶體不足等問題,我們強烈建議您為有界佇列設定一個明確的值。

持久化 QueueChannel 配置

由於 QueueChannel 提供了緩衝訊息的能力,但預設情況下僅在記憶體中進行,這也引入了系統發生故障時訊息可能丟失的可能性。為了降低這種風險,QueueChannel 可以由 MessageGroupStore 策略介面的持久化實現作為後端支援。有關 MessageGroupStoreMessageStore 的更多詳細資訊,請參閱 訊息儲存

當使用 message-store 屬性時,不允許使用 capacity 屬性。

QueueChannel 收到 Message 時,它將訊息新增到訊息儲存中。當從 QueueChannel 輪詢 Message 時,它將從訊息儲存中移除。

預設情況下,QueueChannel 將其訊息儲存在記憶體佇列中,這可能導致前面提到的訊息丟失場景。然而,Spring Integration 提供了持久化儲存,例如 JdbcChannelMessageStore

您可以透過新增 message-store 屬性來為任何 QueueChannel 配置訊息儲存,如下例所示

<int:channel id="dbBackedChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

(有關 Java/Kotlin 配置選項,請參見下面的示例。)

Spring Integration JDBC 模組還為一些流行的資料庫提供了模式資料定義語言 (DDL)。這些模式位於該模組(spring-integration-jdbc)的 org.springframework.integration.jdbc.store.channel 包中。

一個重要的特性是,對於任何事務性持久儲存(例如 JdbcChannelMessageStore),只要輪詢器配置了事務,從儲存中移除的訊息只有在事務成功完成後才能被永久移除。否則,事務將回滾,且 Message 不會丟失。

隨著與“NoSQL”資料儲存相關的 Spring 專案數量不斷增加,許多其他訊息儲存的實現也應運而生,為這些儲存提供了底層支援。如果您找不到符合您特定需求的實現,也可以提供自己的 MessageGroupStore 介面實現。

自版本 4.0 以來,我們建議在可能的情況下配置 QueueChannel 例項使用 ChannelMessageStore。與通用訊息儲存相比,這些通常為此用途進行了最佳化。如果 ChannelMessageStore 是一個 ChannelPriorityMessageStore,則訊息將按照優先順序順序以 FIFO 方式接收。優先順序的概念由訊息儲存實現決定。例如,以下示例顯示了 MongoDB 通道訊息儲存 (MongoDB Channel Message Store) 的 Java 配置

  • Java

  • Java DSL

  • Kotlin DSL

@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
    MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
    store.setPriorityEnabled(true);
    return store;
}

@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
    return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
    return IntegrationFlow.from((Channels c) ->
            c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
            ....
            .get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
    integrationFlow {
        channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
    }
請注意 MessageGroupQueue 類。這是使用 MessageGroupStore 操作的 BlockingQueue 實現。

定製 QueueChannel 環境的另一個選項由 <int:queue> 子元素的 ref 屬性或其特定建構函式提供。該屬性提供對任何 java.util.Queue 實現的引用。例如,可以按如下方式配置 Hazelcast 分散式 IQueue

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(new Config()
                                           .setProperty("hazelcast.logging.type", "log4j"));
}

@Bean
public PollableChannel distributedQueue() {
    return new QueueChannel(hazelcastInstance()
                              .getQueue("springIntegrationQueue"));
}

PublishSubscribeChannel 配置

要建立一個 PublishSubscribeChannel,請使用 <publish-subscribe-channel/> 元素。使用此元素時,您還可以指定用於釋出訊息的 task-executor(如果未指定,則在傳送者的執行緒中釋出),如下所示

  • Java

  • XML

@Bean
public MessageChannel pubsubChannel() {
    return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>

如果您在 PublishSubscribeChannel 的下游提供重排序器或聚合器,您可以將通道上的 'apply-sequence' 屬性設定為 true。這樣做表示通道在傳遞訊息之前應該設定 sequence-sizesequence-number 訊息頭以及 關聯 ID (correlation ID)。例如,如果有五個訂閱者,sequence-size 將設定為 5,並且訊息將具有介於 15 之間的 sequence-number 頭值。

除了 Executor 之外,您還可以配置一個 ErrorHandler。預設情況下,PublishSubscribeChannel 使用 MessagePublishingErrorHandler 實現將錯誤傳送到 errorChannel 頭中的 MessageChannel 或傳送到全域性 errorChannel 例項。如果未配置 Executor,則 ErrorHandler 會被忽略,異常將直接拋給呼叫者的執行緒。

如果您在 PublishSubscribeChannel 的下游提供 ResequencerAggregator,您可以將通道上的 'apply-sequence' 屬性設定為 true。這樣做表示通道在傳遞訊息之前應該設定 sequence-size 和 sequence-number 訊息頭以及 correlation ID。例如,如果有五個訂閱者,sequence-size 將設定為 5,並且訊息將具有介於 15 之間的 sequence-number 頭值。

以下示例展示瞭如何將 apply-sequence 頭設定為 true

  • Java

  • XML

@Bean
public MessageChannel pubsubChannel() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel();
    channel.setApplySequence(true);
    return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
apply-sequence 的值預設為 false,以便釋出-訂閱通道可以將完全相同的訊息例項傳送到多個出站通道。由於 Spring Integration 強制執行載荷和訊息頭引用的不可變性,當此標誌設定為 true 時,通道會建立新的 Message 例項,這些例項具有相同的載荷引用,但訊息頭值不同。

從版本 5.4.3 開始,PublishSubscribeChannel 還可以配置其 BroadcastingDispatcherrequireSubscribers 選項,以指示當通道沒有訂閱者時,不會靜默地忽略訊息。當沒有訂閱者且此選項設定為 true 時,會丟擲帶有 Dispatcher has no subscribers 訊息的 MessageDispatchingException

ExecutorChannel

要建立 ExecutorChannel,請新增帶有 task-executor 屬性的 <dispatcher> 子元素。該屬性的值可以引用上下文中的任何 TaskExecutor。例如,這樣做可以配置執行緒池以將訊息分發給訂閱的處理程式。如前所述,這樣做會打破發送者和接收者之間的單執行緒執行上下文,以便任何活動的事務上下文不會被處理程式的呼叫共享(即,處理程式可能會丟擲 Exception,但 send 呼叫已經成功返回)。以下示例展示瞭如何使用 dispatcher 元素並在 task-executor 屬性中指定執行器

  • Java

  • XML

@Bean
public MessageChannel executorChannel() {
    return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
    <int:dispatcher task-executor="someExecutor"/>
</int:channel>

load-balancerfailover 選項也在 <dispatcher/> 子元素上可用,如前面的 DirectChannel 配置 中所述。應用相同的預設值。因此,除非為其中一個或兩個屬性提供了明確的配置,否則通道具有啟用了故障轉移的輪詢式負載均衡策略,如下例所示

<int:channel id="executorChannelWithoutFailover">
    <int:dispatcher task-executor="someExecutor" failover="false"/>
</int:channel>

PriorityChannel 配置

要建立 PriorityChannel,請使用 <priority-queue/> 子元素,如下例所示

  • Java

  • XML

@Bean
public PollableChannel priorityChannel() {
    return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
    <int:priority-queue capacity="20"/>
</int:channel>

預設情況下,通道會查閱訊息的 priority 頭。但是,您可以改為提供自定義的 Comparator 引用。另外請注意,PriorityChannel(與其他型別一樣)確實支援 datatype 屬性。與 QueueChannel 一樣,它也支援 capacity 屬性。以下示例演示了所有這些配置

  • Java

  • XML

@Bean
public PollableChannel priorityChannel() {
    PriorityChannel channel = new PriorityChannel(20, widgetComparator());
    channel.setDatatypes(example.Widget.class);
    return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
    <int:priority-queue comparator="widgetComparator"
                    capacity="10"/>
</int:channel>

自版本 4.0 以來,priority-channel 子元素支援 message-store 選項(在這種情況下不允許使用 comparatorcapacity)。訊息儲存必須是 PriorityCapableChannelMessageStore。目前為 RedisJDBCMongoDB 提供了 PriorityCapableChannelMessageStore 的實現。有關更多資訊,請參閱 QueueChannel 配置訊息儲存。您可以在 支援的訊息通道 (Backing Message Channels) 中找到示例配置。

RendezvousChannel 配置

當佇列子元素是 <rendezvous-queue> 時,會建立一個 RendezvousChannel。它不提供任何前面描述的額外配置選項,並且其佇列不接受任何容量值,因為它是一個容量為零的直接切換 (direct handoff) 佇列。以下示例展示瞭如何宣告一個 RendezvousChannel

  • Java

  • XML

@Bean
public PollableChannel rendezvousChannel() {
    return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
    <int:rendezvous-queue/>
</int:channel>

作用域通道配置

任何通道都可以配置 scope 屬性,如下例所示

<int:channel id="threadLocalChannel" scope="thread"/>

通道攔截器配置

訊息通道也可以有攔截器,如 通道攔截器 中所述。可以將 <interceptors/> 子元素新增到 <channel/>(或更具體的元素型別)中。您可以提供 ref 屬性來引用任何實現了 ChannelInterceptor 介面的 Spring 管理物件,如下例所示

<int:channel id="exampleChannel">
    <int:interceptors>
        <ref bean="trafficMonitoringInterceptor"/>
    </int:interceptors>
</int:channel>

通常,我們建議在單獨的位置定義攔截器實現,因為它們通常提供可在多個通道中重用的通用行為。

全域性通道攔截器配置

通道攔截器提供了一種簡潔明瞭的方式來為每個單獨的通道應用橫切行為。如果同一行為需要應用於多個通道,則為每個通道配置同一組攔截器並不是最有效的方式。為了避免重複配置並使攔截器能夠應用於多個通道,Spring Integration 提供了全域性攔截器。考慮以下兩個示例

<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
    <bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>

<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>

每個 <channel-interceptor/> 元素允許您定義一個全域性攔截器,它應用於所有與 pattern 屬性定義的任何模式匹配的通道。在前面的例子中,全域性攔截器應用於 'thing1' 通道以及所有以 'thing2' 或 'input' 開頭但 *不* 以 'thing3' 開頭的其他通道(自版本 5.0 起)。

在模式中新增此語法可能會導致一個潛在(儘管可能不太可能)的問題。如果您有一個名為 !thing1 的 bean,並且在通道攔截器的 pattern 模式中包含 !thing1 模式,它將不再匹配。該模式現在匹配所有 *不* 名為 thing1 的 bean。在這種情況下,您可以在模式中使用 \ 轉義 !。模式 \!thing1 匹配名為 !thing1 的 bean。

當給定通道上有多個攔截器時,order 屬性讓您管理此攔截器注入的位置。例如,通道 'inputChannel' 可以透過本地配置的方式(見下文)擁有單獨的攔截器,如以下示例所示:

<int:channel id="inputChannel">
  <int:interceptors>
    <int:wire-tap channel="logger"/>
  </int:interceptors>
</int:channel>

一個合理的問題是“全域性攔截器相對於透過本地或其他全域性攔截器定義配置的其他攔截器如何注入?”目前的實現提供了一個簡單的機制來定義攔截器的執行順序。order 屬性中的正數確保攔截器在任何現有攔截器 *之後* 注入,而負數確保攔截器在現有攔截器 *之前* 注入。這意味著,在前面的示例中,全域性攔截器在本地配置的 'wire-tap' 攔截器 *之後* 注入(因為其 order 大於 0)。如果存在另一個具有匹配 pattern 的全域性攔截器,其順序將透過比較兩個攔截器的 order 屬性值來確定。要在現有攔截器 *之前* 注入全域性攔截器,請使用 order 屬性的負值。

請注意,orderpattern 屬性都是可選的。order 的預設值是 0,pattern 的預設值是 '*'(匹配所有通道)。

監聽(Wire Tap)

如前所述,Spring Integration 提供了一個簡單的監聽攔截器。您可以在 <interceptors/> 元素內的任何通道上配置監聽器。這樣做對於除錯特別有用,並且可以與 Spring Integration 的日誌通道介面卡結合使用,如下所示:

<int:channel id="in">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:logging-channel-adapter id="logger" level="DEBUG"/>
'logging-channel-adapter' 也接受一個 'expression' 屬性,以便您可以對 'payload' 和 'headers' 變數評估 SpEL 表示式。或者,要日誌完整訊息的 toString() 結果,請為 'log-full-message' 屬性提供值 true。預設情況下,它為 false,因此隻日志 payload。將其設定為 true 可以日誌除 payload 外的所有 headers。'expression' 選項提供了最大的靈活性(例如,expression="payload.user.name")。

關於監聽器和其他類似元件(訊息釋出配置)的常見誤解之一是它們本質上是自動非同步的。預設情況下,作為元件的監聽器不是非同步呼叫的。相反,Spring Integration 專注於一種統一的方法來配置非同步行為:訊息通道。使訊息流的某些部分同步或非同步的是在該流中配置的 Message Channel 型別。這是訊息通道抽象的主要優點之一。從框架建立之初,我們就一直強調訊息通道作為框架的一等公民的需求和價值。它不僅僅是 EIP 模式的一種內部、隱式實現。它作為可配置元件完全暴露給終端使用者。因此,監聽器元件僅負責執行以下任務:

  • 透過監聽一個通道(例如,channelA)來攔截訊息流

  • 捕獲每條訊息

  • 將訊息傳送到另一個通道(例如,channelB

它本質上是橋接模式的一種變體,但它被封裝在通道定義中(因此更容易啟用和停用而不會中斷流)。此外,與橋接不同的是,它基本是派生出另一個訊息流。該流是同步還是非同步的?答案取決於 'channelB' 是哪種型別的訊息通道。我們有以下選項:direct channel(直接通道)、pollable channel(可輪詢通道)和 executor channel(執行器通道)。後兩者打破了執行緒邊界,使透過此類通道的通訊變為非同步,因為訊息從該通道分派到其訂閱的處理程式所使用的執行緒與將訊息傳送到該通道所使用的執行緒不同。這就是決定您的監聽流是同步還是非同步的因素。這與框架中的其他元件(例如訊息釋出器)一致,並透過避免您提前擔心(除了編寫執行緒安全的程式碼)特定程式碼段是應實現為同步還是非同步,增加了統一性和簡單性。透過訊息通道連線兩個程式碼段(例如,元件 A 和元件 B)的實際方式決定了它們的協作是同步還是非同步的。您甚至可能希望將來從同步改為非同步,而訊息通道讓您無需更改程式碼即可快速完成。

關於監聽器的最後一點是,儘管上面提供了預設不非同步的理由,您應該記住通常期望儘快將訊息傳遞出去。因此,使用非同步通道選項作為監聽器的出站通道會非常常見。然而,非同步行為並非預設強制執行。如果我們這樣做,許多用例將會中斷,包括您可能不想破壞事務邊界。也許您使用監聽器模式進行審計,並且確實希望審計訊息在原始事務中傳送。例如,您可能將監聽器連線到 JMS 出站通道介面卡。這樣,您就可以兼得兩全其美:1)傳送 JMS 訊息可以在事務內發生,同時 2)它仍然是“即發即忘”的操作,從而防止對主訊息流造成任何明顯的延遲。

從版本 4.0 開始,當攔截器(例如 WireTap)引用通道時,重要的是避免迴圈引用。您需要從當前攔截器攔截的通道中排除此類通道。這可以透過適當的模式或程式設計方式完成。如果您有一個引用 channel 的自定義 ChannelInterceptor,請考慮實現 VetoCapableInterceptor。這樣,框架會根據提供的模式詢問攔截器是否可以攔截每個候選通道。您還可以在攔截器方法中新增執行時保護,以確保通道不是被攔截器引用的通道。WireTap 使用了這兩種技術。

從版本 4.3 開始,WireTap 增加了接受 channelName 而不是 MessageChannel 例項的建構函式。這對於 Java 配置和使用通道自動建立邏輯時非常方便。目標 MessageChannel bean 會在稍後,在與攔截器的第一次互動時,從提供的 channelName 中解析出來。

通道解析需要 BeanFactory,因此監聽器例項必須是 Spring 管理的 bean。

這種延遲繫結方法還允許簡化使用 Java DSL 配置的典型監聽模式,如以下示例所示:

@Bean
public PollableChannel myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input")
            .get();
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

條件監聽

透過使用 selectorselector-expression 屬性,可以使監聽器具有條件性。selector 引用一個 MessageSelector bean,它可以在執行時決定訊息是否應傳送到監聽通道。類似地,selector-expression 是一個布林型別的 SpEL 表示式,其目的相同:如果表示式評估結果為 true,則將訊息傳送到監聽通道。

全域性監聽配置

可以將全域性監聽器配置為全域性通道攔截器配置的一種特殊情況。為此,配置一個頂層 wire-tap 元素。現在,除了正常的 wire-tap 名稱空間支援外,還支援 patternorder 屬性,它們的工作方式與 channel-interceptor 完全相同。以下示例展示瞭如何配置全域性監聽器:

  • Java

  • XML

@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
    return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全域性監聽器提供了一種方便的方式,可以在外部配置單個通道監聽器,而無需修改現有通道配置。為此,將 pattern 屬性設定為目標通道名稱。例如,您可以使用此技術配置測試用例以驗證通道上的訊息。