配置訊息通道
要建立訊息通道例項,你可以使用 XML 的 <channel/> 元素或 Java 配置的 DirectChannel 例項,如下所示
-
Java
-
XML
@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
<int:channel id="exampleChannel"/>
當你使用 <channel/> 元素而沒有任何子元素時,它會建立一個 DirectChannel 例項(一個 SubscribableChannel)。
要建立一個釋出-訂閱通道,請使用 <publish-subscribe-channel/> 元素(Java 中的 PublishSubscribeChannel),如下所示
-
Java
-
XML
@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>
你還可以提供各種 <queue/> 子元素來建立任何可輪詢的通道型別(如 訊息通道實現 中所述)。以下各節展示了每種通道型別的示例。
DirectChannel 配置
如前所述,DirectChannel 是預設型別。以下列表展示瞭如何定義一個
-
Java
-
XML
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
<int:channel id="directChannel"/>
預設通道具有迴圈負載均衡器,並且啟用了故障轉移(有關詳細資訊,請參見 DirectChannel)。要停用其中一個或兩個功能,請新增一個 <dispatcher/> 子元素(DirectChannel 的 LoadBalancingStrategy 建構函式),並按如下方式配置屬性
-
Java
-
XML
@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}
@Bean
public MessageChannel failFastChannel() {
return new DirectChannel(null);
}
<int:channel id="failFastChannel">
<int:dispatcher failover="false"/>
</channel>
<int:channel id="channelWithFixedOrderSequenceFailover">
<int:dispatcher load-balancer="none"/>
</int:channel>
從 6.3 版本開始,所有基於 UnicastingDispatcher 的 MessageChannel 實現都可以配置 Predicate<Exception> failoverStrategy,而不是簡單的 failover 選項。此謂詞根據當前 MessageHandler 丟擲的異常來決定是否故障轉移到下一個 MessageHandler。更復雜的錯誤分析應使用 ErrorMessageExceptionTypeRouter 完成。
資料型別通道配置
有時,消費者只能處理特定型別的有效負載,這會強制你確保輸入訊息的有效負載型別。首先想到的是使用訊息過濾器。但是,訊息過濾器所能做的只是過濾掉不符合消費者要求的訊息。另一種方法是使用基於內容的路由器,並將具有不相容資料型別的訊息路由到特定的轉換器,以強制轉換為所需的資料型別。這會奏效,但實現相同目的的一種更簡單的方法是應用 資料型別通道 模式。你可以為每種特定的有效負載資料型別使用單獨的資料型別通道。
要建立一個僅接受包含特定有效負載型別的訊息的資料型別通道,請在通道元素的 datatype 屬性中提供該資料型別的完全限定類名,如以下示例所示
-
Java
-
XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>
請注意,型別檢查會透過任何可分配給通道資料型別的型別。換句話說,前面示例中的 numberChannel 將接受其有效負載為 java.lang.Integer 或 java.lang.Double 的訊息。可以提供多個型別作為逗號分隔列表,如以下示例所示
-
Java
-
XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
因此,前面示例中的 'numberChannel' 僅接受資料型別為 java.lang.Number 的訊息。但是,如果訊息的有效負載不是所需型別,會發生什麼?這取決於你是否定義了一個名為 integrationConversionService 的 bean,它是 Spring 的 Conversion Service 的例項。如果沒有,則會立即丟擲 Exception。但是,如果你定義了 integrationConversionService bean,它將嘗試將訊息的有效負載轉換為可接受的型別。
你甚至可以註冊自定義轉換器。例如,假設你向我們上面配置的 'numberChannel' 傳送一個帶有 String 有效負載的訊息。你可以按如下方式處理訊息
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
通常,這將是一個完全合法的操作。但是,由於我們使用資料型別通道,此類操作的結果將生成類似於以下的異常
Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…
發生異常是因為我們要求有效負載型別為 Number,但我們傳送了 String。所以我們需要一些東西將 String 轉換為 Number。為此,我們可以實現一個類似於以下示例的轉換器
public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
然後我們可以將其註冊為整合轉換服務的轉換器,如以下示例所示
-
Java
-
XML
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt() {
return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>
<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>
或者在 StringToIntegerConverter 類上,當它被 @Component 註解標記用於自動掃描時。
解析 'converter' 元素時,如果尚未定義 integrationConversionService bean,則會建立它。有了該轉換器,send 操作現在將成功,因為資料型別通道使用該轉換器將 String 有效負載轉換為 Integer。
有關有效負載型別轉換的更多資訊,請參見 有效負載型別轉換。
從 4.0 版本開始,integrationConversionService 由 DefaultDatatypeChannelMessageConverter 呼叫,它會在應用程式上下文中查詢轉換服務。要使用不同的轉換技術,你可以在通道上指定 message-converter 屬性。這必須是對 MessageConverter 實現的引用。僅使用 fromMessage 方法。它為轉換器提供了訪問訊息頭的許可權(以防轉換可能需要來自頭的資訊,例如 content-type)。該方法只能返回轉換後的有效負載或完整的 Message 物件。如果是後者,轉換器必須小心地從入站訊息中複製所有頭。
或者,你可以宣告一個 ID 為 datatypeChannelMessageConverter 的 MessageConverter 型別的 <bean/>,並且該轉換器將由所有具有 datatype 的通道使用。
QueueChannel 配置
要建立 QueueChannel,請使用 <queue/> 子元素。你可以按如下方式指定通道的容量
-
Java
-
XML
@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
<int:channel id="queueChannel">
<queue capacity="25"/>
</int:channel>
如果你未在此 <queue/> 子元素上為 'capacity' 屬性提供值,則生成的佇列是無界的。為避免記憶體不足等問題,我們強烈建議你為有界佇列設定一個明確的值。 |
持久化 QueueChannel 配置
由於 QueueChannel 提供了緩衝訊息的能力,但預設情況下僅在記憶體中執行此操作,因此它也帶來了在系統故障時訊息可能丟失的可能性。為了降低此風險,QueueChannel 可以由 MessageGroupStore 策略介面的持久化實現支援。有關 MessageGroupStore 和 MessageStore 的更多詳細資訊,請參見 訊息儲存。
當使用 message-store 屬性時,不允許使用 capacity 屬性。 |
當 QueueChannel 接收到 Message 時,它會將訊息新增到訊息儲存中。當 Message 從 QueueChannel 中輪詢時,它會從訊息儲存中移除。
預設情況下,QueueChannel 將其訊息儲存在記憶體佇列中,這可能導致前面提到的訊息丟失場景。但是,Spring Integration 提供了持久化儲存,例如 JdbcChannelMessageStore。
你可以透過新增 message-store 屬性來為任何 QueueChannel 配置訊息儲存,如以下示例所示
<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
(有關 Java/Kotlin 配置選項,請參見下面的示例。)
Spring Integration JDBC 模組還為許多流行的資料庫提供了模式資料定義語言 (DDL)。這些模式位於該模組 (spring-integration-jdbc) 的 org.springframework.integration.jdbc.store.channel 包中。
一個重要的特性是,對於任何事務性持久化儲存(例如 JdbcChannelMessageStore),只要輪詢器配置了事務,從儲存中刪除的訊息只有在事務成功完成時才能永久刪除。否則,事務將回滾,並且 Message 不會丟失。 |
隨著越來越多的與“NoSQL”資料儲存相關的 Spring 專案為這些儲存提供底層支援,訊息儲存的許多其他實現也可用。如果你找不到滿足你特定需求的實現,你還可以提供自己的 MessageGroupStore 介面實現。
從 4.0 版本開始,建議 QueueChannel 例項儘可能配置為使用 ChannelMessageStore。與通用訊息儲存相比,這些通常針對此用途進行了最佳化。如果 ChannelMessageStore 是 ChannelPriorityMessageStore,則訊息將按優先順序順序以 FIFO 方式接收。優先順序的概念由訊息儲存實現確定。例如,以下示例顯示了 MongoDB 通道訊息儲存 的 Java 配置
-
Java
-
Java DSL
-
Kotlin DSL
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
return IntegrationFlow.from((Channels c) ->
c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
....
.get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
integrationFlow {
channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
}
注意 MessageGroupQueue 類。這是一個用於使用 MessageGroupStore 操作的 BlockingQueue 實現。 |
自定義 QueueChannel 環境的另一個選項由 <int:queue> 子元素或其特定建構函式的 ref 屬性提供。此屬性提供對任何 java.util.Queue 實現的引用。例如,Hazelcast 分散式 IQueue 可以按如下方式配置
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}
@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel 配置
要建立 PublishSubscribeChannel,請使用 <publish-subscribe-channel/> 元素。使用此元素時,你還可以指定用於釋出訊息的 task-executor(如果未指定,則在傳送者的執行緒中釋出),如下所示
-
Java
-
XML
@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
如果你在 PublishSubscribeChannel 下游提供重新排序器或聚合器,則可以將通道上的 'apply-sequence' 屬性設定為 true。這樣做表示通道應在傳遞訊息之前設定 sequence-size 和 sequence-number 訊息頭以及關聯 ID。例如,如果有五個訂閱者,則 sequence-size 將設定為 5,並且訊息將具有從 1 到 5 的 sequence-number 頭值。
除了 Executor,你還可以配置 ErrorHandler。預設情況下,PublishSubscribeChannel 使用 MessagePublishingErrorHandler 實現將錯誤傳送到來自 errorChannel 頭或全域性 errorChannel 例項的 MessageChannel。如果未配置 Executor,則 ErrorHandler 將被忽略,並且異常將直接拋給呼叫者的執行緒。
如果你在 PublishSubscribeChannel 下游提供 Resequencer 或 Aggregator,則可以將通道上的 'apply-sequence' 屬性設定為 true。這樣做表示通道應在傳遞訊息之前設定 sequence-size 和 sequence-number 訊息頭以及關聯 ID。例如,如果有五個訂閱者,則 sequence-size 將設定為 5,並且訊息將具有從 1 到 5 的 sequence-number 頭值。
以下示例顯示瞭如何將 apply-sequence 頭設定為 true
-
Java
-
XML
@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(true);
return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
apply-sequence 的值預設為 false,以便釋出-訂閱通道可以將完全相同的訊息例項傳送到多個出站通道。由於 Spring Integration 強制有效負載和頭引用的不變性,當該標誌設定為 true 時,通道會建立具有相同有效負載引用但不同頭值的新 Message 例項。 |
從 5.4.3 版本開始,PublishSubscribeChannel 還可以配置其 BroadcastingDispatcher 的 requireSubscribers 選項,以指示當沒有訂閱者時,此通道不會默默地忽略訊息。當沒有訂閱者並且此選項設定為 true 時,將丟擲帶有 Dispatcher has no subscribers 訊息的 MessageDispatchingException。
ExecutorChannel
要建立 ExecutorChannel,請新增帶有 task-executor 屬性的 <dispatcher> 子元素。該屬性的值可以引用上下文中的任何 TaskExecutor。例如,這樣做可以配置一個執行緒池,用於將訊息分派給訂閱的處理程式。如前所述,這樣做會中斷髮送方和接收方之間的單執行緒執行上下文,以便任何活動事務上下文都不會被處理程式的呼叫共享(即,處理程式可能會丟擲 Exception,但 send 呼叫已成功返回)。以下示例演示瞭如何使用 dispatcher 元素並在 task-executor 屬性中指定執行器
-
Java
-
XML
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
|
|
PriorityChannel 配置
要建立 PriorityChannel,請使用 <priority-queue/> 子元素,如以下示例所示
-
Java
-
XML
@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
<int:priority-queue capacity="20"/>
</int:channel>
預設情況下,通道會檢視訊息的 priority 頭。但是,你也可以提供自定義的 Comparator 引用。此外,請注意 PriorityChannel(與其他型別一樣)確實支援 datatype 屬性。與 QueueChannel 一樣,它也支援 capacity 屬性。以下示例演示了所有這些
-
Java
-
XML
@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
<int:priority-queue comparator="widgetComparator"
capacity="10"/>
</int:channel>
從 4.0 版本開始,priority-channel 子元素支援 message-store 選項(在這種情況下不允許使用 comparator 和 capacity)。訊息儲存必須是 PriorityCapableChannelMessageStore。目前為 Redis、JDBC 和 MongoDB 提供了 PriorityCapableChannelMessageStore 的實現。有關詳細資訊,請參見 QueueChannel 配置 和 訊息儲存。你可以在 支援訊息通道 中找到示例配置。
RendezvousChannel 配置
當佇列子元素為 <rendezvous-queue> 時,將建立 RendezvousChannel。它不提供任何前面描述的額外配置選項,並且其佇列不接受任何容量值,因為它是一個零容量的直接傳遞佇列。以下示例顯示瞭如何宣告 RendezvousChannel
-
Java
-
XML
@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
<int:rendezvous-queue/>
</int:channel>
通道攔截器配置
訊息通道也可以有攔截器,如 通道攔截器 中所述。<interceptors/> 子元素可以新增到 <channel/>(或更具體的元素型別)中。你可以提供 ref 屬性來引用實現 ChannelInterceptor 介面的任何 Spring 管理物件,如以下示例所示
<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
通常,我們建議在單獨的位置定義攔截器實現,因為它們通常提供可以在多個通道之間重用的通用行為。
全域性通道攔截器配置
通道攔截器提供了一種乾淨簡潔的方式,可以為每個單獨的通道應用橫切行為。如果相同的行為應該應用於多個通道,那麼為每個通道配置相同的攔截器集將不是最有效的方式。為了避免重複配置,同時還使攔截器能夠應用於多個通道,Spring Integration 提供了全域性攔截器。考慮以下兩組示例
<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>
<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
每個 <channel-interceptor/> 元素都允許你定義一個全域性攔截器,該攔截器應用於所有與 pattern 屬性定義的任何模式匹配的通道。在前面的例子中,全域性攔截器應用於 'thing1' 通道以及所有以 'thing2' 或 'input' 開頭的其他通道,但不應用於以 'thing3' 開頭的通道(從 5.0 版本開始)。
在模式中新增此語法會導致一個可能的問題(儘管可能不太可能)。如果你有一個名為 !thing1 的 bean,並且你的通道攔截器的 pattern 模式中包含了 !thing1 的模式,它將不再匹配。該模式現在匹配所有不名為 thing1 的 bean。在這種情況下,你可以在模式中用 \ 轉義 !。模式 \!thing1 將匹配名為 !thing1 的 bean。 |
order 屬性允許你在給定通道上存在多個攔截器時管理此攔截器的注入位置。例如,通道 'inputChannel' 可以本地配置單獨的攔截器,如以下示例所示
<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
一個合理的問題是“全域性攔截器是如何相對於本地配置的其他攔截器或透過其他全域性攔截器定義注入的?”當前實現提供了一個簡單的機制來定義攔截器執行的順序。order 屬性中的正數確保攔截器在任何現有攔截器之後注入,而負數確保攔截器在現有攔截器之前注入。這意味著,在前面的示例中,全域性攔截器在本地配置的 'wire-tap' 攔截器之後注入(因為它的 order 大於 0)。如果存在另一個具有匹配 pattern 的全域性攔截器,其順序將透過比較兩個攔截器的 order 屬性值來確定。要在現有攔截器之前注入全域性攔截器,請對 order 屬性使用負值。
請注意,order 和 pattern 屬性都是可選的。order 的預設值為 0,pattern 的預設值為 '*'(匹配所有通道)。 |
訊息竊聽器
如前所述,Spring Integration 提供了一個簡單的訊息竊聽器攔截器。你可以在 <interceptors/> 元素內的任何通道上配置訊息竊聽器。這樣做對於除錯特別有用,並且可以與 Spring Integration 的日誌通道介面卡結合使用,如下所示
<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG"/>
'logging-channel-adapter' 也接受一個 'expression' 屬性,這樣你就可以針對 'payload' 和 'headers' 變數評估 SpEL 表示式。或者,要記錄完整的訊息 toString() 結果,請為 'log-full-message' 屬性提供 true 值。預設情況下,它為 false,因此只記錄有效負載。將其設定為 true 可以記錄所有頭以及有效負載。'expression' 選項提供了最大的靈活性(例如,expression="payload.user.name")。 |
關於訊息竊聽器和其他類似元件(訊息釋出配置)的一個常見誤解是它們本質上是自動非同步的。預設情況下,訊息竊聽器作為元件不會非同步呼叫。相反,Spring Integration 專注於一種統一的非同步行為配置方法:訊息通道。訊息流的某些部分是同步還是非同步取決於在該流中配置的訊息通道的型別。這是訊息通道抽象的主要優點之一。從框架誕生之初,我們就一直強調訊息通道作為框架的一等公民的必要性和價值。它不僅僅是 EIP 模式的內部隱式實現。它完全作為可配置元件暴露給終端使用者。因此,訊息竊聽器元件只負責執行以下任務
-
透過竊聽通道(例如
channelA)攔截訊息流 -
抓取每條訊息
-
將訊息傳送到另一個通道(例如
channelB)
它本質上是橋接模式的一種變體,但它封裝在通道定義中(因此更容易啟用和停用而不會中斷流程)。此外,與橋接不同,它基本上分叉了另一個訊息流。該流是同步的還是非同步的?答案取決於 'channelB' 是哪種型別的訊息通道。我們有以下選項:直接通道、可輪詢通道和執行器通道。後兩者打破了執行緒邊界,使得透過此類通道的通訊是非同步的,因為訊息從該通道分派到其訂閱的處理程式發生在與用於將訊息傳送到該通道的執行緒不同的執行緒上。這就是使你的訊息竊聽流程同步或非同步的原因。它與框架中的其他元件(例如訊息釋出者)保持一致,並透過讓你無需提前擔心(除了編寫執行緒安全程式碼)特定程式碼片段是應該實現為同步還是非同步,從而增加了一致性和簡單性。兩個程式碼片段(例如元件 A 和元件 B)透過訊息通道的實際連線是使它們的協作同步或非同步的原因。你甚至可能希望將來從同步更改為非同步,而訊息通道允許你快速完成,而無需觸及程式碼。
關於訊息竊聽器的最後一點是,儘管上面提供了預設情況下不非同步的理由,但你應該記住,通常希望儘快交出訊息。因此,將非同步通道選項用作訊息竊聽器的出站通道將是相當常見的。但是,非同步行為並非預設強制執行。如果我們這樣做,許多用例將會中斷,包括你可能不想打破事務邊界。也許你將訊息竊聽模式用於審計目的,並且你確實希望審計訊息在原始事務中傳送。例如,你可能將訊息竊聽器連線到 JMS 出站通道介面卡。這樣,你就可以兩全其美:1)JMS 訊息的傳送可以在事務中發生,而 2)它仍然是“即發即棄”操作,從而防止主訊息流中出現任何明顯的延遲。
從 4.0 版本開始,當攔截器(例如 WireTap 類)引用通道時,避免迴圈引用非常重要。你需要將此類通道從被當前攔截器攔截的通道中排除。這可以透過適當的模式或程式設計方式完成。如果你有一個引用 channel 的自定義 ChannelInterceptor,請考慮實現 VetoCapableInterceptor。這樣,框架會根據提供的模式詢問攔截器是否可以攔截每個候選通道。你還可以在攔截器方法中新增執行時保護,以確保通道不是攔截器引用的通道。WireTap 使用了這兩種技術。 |
從 4.3 版本開始,WireTap 具有額外的建構函式,它們接受 channelName 而不是 MessageChannel 例項。這對於 Java 配置和使用通道自動建立邏輯時非常方便。目標 MessageChannel bean 會在稍後,在與攔截器的第一次互動時,從提供的 channelName 中解析。
通道解析需要 BeanFactory,因此訊息竊聽器例項必須是一個 Spring 管理的 bean。 |
這種後期繫結方法還允許簡化 Java DSL 配置的典型訊息竊聽模式,如以下示例所示
@Bean
public PollableChannel myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input")
.get();
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
條件訊息竊聽器
訊息竊聽器可以透過使用 selector 或 selector-expression 屬性實現條件化。selector 引用一個 MessageSelector bean,它可以在執行時確定訊息是否應該進入竊聽通道。類似地,selector-expression 是一個布林 SpEL 表示式,它執行相同的目的:如果表示式評估為 true,則訊息傳送到竊聽通道。
全域性訊息竊聽器配置
可以配置全域性訊息竊聽器作為 全域性通道攔截器配置 的特例。為此,請配置一個頂級 wire-tap 元素。現在,除了正常的 wire-tap 名稱空間支援外,還支援 pattern 和 order 屬性,並且它們的工作方式與 channel-interceptor 完全相同。以下示例演示瞭如何配置全域性訊息竊聽器
-
Java
-
XML
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全域性訊息竊聽器提供了一種方便的方式,可以在不修改現有通道配置的情況下外部配置單通道訊息竊聽器。為此,請將 pattern 屬性設定為目標通道名稱。例如,你可以使用此技術來配置測試用例,以驗證通道上的訊息。 |