訊息端點
本章第一部分介紹了一些背景理論,並揭示了驅動 Spring Integration 各種訊息元件的底層 API 的許多內容。如果您想真正理解幕後發生的事情,這些資訊會很有幫助。但是,如果您想快速開始使用基於簡化名稱空間的各種元素配置,現在可以跳到端點名稱空間支援部分。
如概述中所述,訊息端點負責將各種訊息元件連線到通道。接下來的幾章中,我們將介紹許多不同的訊息消費者元件。其中一些元件也能夠傳送回覆訊息。傳送訊息相當簡單。如前文訊息通道中所示,您可以將訊息傳送到訊息通道。然而,接收訊息則稍微複雜一些。主要原因是消費者有兩種型別:輪詢消費者和事件驅動消費者。
在這兩者中,事件驅動消費者要簡單得多。它們無需管理和排程單獨的輪詢執行緒,本質上是帶有回撥方法的監聽器。當連線到 Spring Integration 的可訂閱訊息通道時,這種簡單選項工作得很好。然而,當連線到緩衝、可輪詢訊息通道時,某些元件必須排程和管理輪詢執行緒。Spring Integration 提供了兩種不同的端點實現來適應這兩種型別的消費者。因此,消費者本身只需實現回撥介面即可。需要輪詢時,端點充當消費者例項的容器。其好處類似於使用容器託管訊息驅動 Bean,但由於這些消費者是執行在 ApplicationContext
中的 Spring 管理物件,它更接近於 Spring 自己的 MessageListener
容器。
訊息處理器
Spring Integration 的 MessageHandler
介面由框架內的許多元件實現。換句話說,這不是公共 API 的一部分,您通常不會直接實現 MessageHandler
。然而,它被訊息消費者用於實際處理消費的訊息,因此瞭解此策略介面有助於理解消費者扮演的整體角色。該介面定義如下
public interface MessageHandler {
void handleMessage(Message<?> message);
}
儘管介面簡單,但它為後續章節中介紹的大多陣列件(路由器、轉換器、分割器、聚合器、服務啟用器等)提供了基礎。這些元件處理訊息的功能各不相同,但實際接收訊息的要求是相同的,並且輪詢和事件驅動行為的選擇也是相同的。Spring Integration 提供了兩種端點實現,它們託管這些基於回撥的處理程式,並使它們能夠連線到訊息通道。
事件驅動消費者
由於事件驅動消費者是兩者中較簡單的一個,我們首先介紹它。您可能記得 SubscribableChannel
介面提供了一個 subscribe()
方法,並且該方法接受一個 MessageHandler
引數(如SubscribableChannel
中所示)。以下列表顯示了 subscribe
方法的定義
subscribableChannel.subscribe(messageHandler);
由於訂閱到通道的處理程式不必主動輪詢該通道,這是一個事件驅動消費者,並且 Spring Integration 提供的實現接受 SubscribableChannel
和 MessageHandler
,如下例所示
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
輪詢消費者
Spring Integration 也提供了 PollingConsumer
,其例項化方式與 EventDrivenConsumer 相同,只是通道必須實現 PollableChannel
,如下例所示
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
輪詢消費者還有許多其他配置選項。以下示例展示瞭如何設定觸發器
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
PeriodicTrigger
通常使用簡單的間隔(Duration
)定義,但也支援 initialDelay
屬性和布林型 fixedRate
屬性(預設為 false
,即無固定延遲)。以下示例同時設定了這兩個屬性
PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
前例中三個設定的結果是,觸發器等待五秒鐘,然後每秒觸發一次。
CronTrigger
需要有效的 cron 表示式。詳情請參閱 Javadoc。以下示例設定了一個新的 CronTrigger
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
前例中定義的觸發器的結果是,觸發器在週一到週五每十秒觸發一次。
輪詢端點的預設觸發器是一個 PeriodicTrigger 例項,具有 1 秒的固定延遲週期。 |
除了觸發器之外,您還可以指定另外兩個與輪詢相關的配置屬性:maxMessagesPerPoll
和 receiveTimeout
。以下示例展示瞭如何設定這兩個屬性
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
maxMessagesPerPoll
屬性指定在給定輪詢操作中接收的最大訊息數。這意味著輪詢器會持續呼叫 receive()
而不等待,直到返回 null
或達到最大值。例如,如果一個輪詢器有一個十秒間隔的觸發器和 maxMessagesPerPoll
設定為 25
,並且它正在輪詢一個佇列中有 100 條訊息的通道,那麼所有 100 條訊息可以在 40 秒內被檢索。它會獲取 25 條,等待十秒,然後獲取接下來的 25 條,依此類推。如果 maxMessagesPerPoll
配置為負值,則在單個輪詢週期內會持續呼叫 MessageSource.receive()
,直到它返回 null
。從 5.5 版本開始,值 0
具有特殊含義——完全跳過 MessageSource.receive()
呼叫,這可以被視為暫停此輪詢端點,直到稍後(例如透過 Control Bus)將 maxMessagesPerPoll
更改為非零值。
receiveTimeout
屬性指定當輪詢器呼叫接收操作時,如果沒有可用訊息應等待的時間。例如,考慮兩種表面看起來相似但實際上非常不同的選項:第一種的間隔觸發器為 5 秒,接收超時為 50 毫秒;而第二種的間隔觸發器為 50 毫秒,接收超時為 5 秒。第一種配置可能會比訊息到達通道後延遲最多 4950 毫秒才接收到該訊息(如果該訊息在其一次輪詢呼叫返回後立即到達)。另一方面,第二種配置的訊息延遲不會超過 50 毫秒。區別在於第二種選項需要一個執行緒等待。然而,結果是它能夠更快地響應到達的訊息。這種技術被稱為“長輪詢”,可以用來模擬輪詢源上的事件驅動行為。
輪詢消費者也可以委託給 Spring 的 TaskExecutor
,如下例所示
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外,PollingConsumer
有一個名為 adviceChain
的屬性。此屬性允許您指定一個 AOP 通知 List
,用於處理包括事務在內的附加橫切關注點。這些通知應用於 doPoll()
方法周圍。有關更深入的資訊,請參閱端點名稱空間支援下的 AOP 通知鏈和事務支援部分。另請參閱 @Poller
註解的 Javadoc 以及相關的訊息註解支援部分。Java DSL 也提供了 .poller()
端點配置選項及其相應的 Pollers
工廠。
前面的示例展示了依賴查詢。但是,請記住這些消費者通常被配置為 Spring bean 定義。實際上,Spring Integration 還提供了一個名為 ConsumerEndpointFactoryBean
的 FactoryBean
,它根據通道型別建立適當的消費者型別。此外,Spring Integration 具有完整的 XML 名稱空間支援,可以進一步隱藏這些細節。本指南在介紹每種元件型別時都會重點介紹基於名稱空間的配置。
許多 MessageHandler 實現可以生成回覆訊息。如前所述,與接收訊息相比,傳送訊息微不足道。然而,何時傳送以及傳送多少回覆訊息取決於處理程式型別。例如,聚合器會等待一定數量的訊息到達,並且通常被配置為分割器的下游消費者,分割器可以為它處理的每條訊息生成多個回覆。使用名稱空間配置時,您不需要嚴格瞭解所有細節。但是,瞭解其中一些元件共享一個公共基類 AbstractReplyProducingMessageHandler ,並且它提供了一個 setOutputChannel(..) 方法,可能仍然值得。 |
端點名稱空間支援
在本參考手冊中,您可以找到端點元素的具體配置示例,例如路由器、轉換器、服務啟用器等。其中大多數支援 input-channel
屬性,許多支援 output-channel
屬性。解析後,這些端點元素會生成 PollingConsumer
或 EventDrivenConsumer
的例項,具體取決於引用的 input-channel
型別:分別是 PollableChannel
或 SubscribableChannel
。當通道是可輪詢的時,輪詢行為基於端點元素的 poller
子元素及其屬性。
以下列出了 poller
的所有可用配置選項
<int:poller cron="" (1)
default="false" (2)
error-channel="" (3)
fixed-delay="" (4)
fixed-rate="" (5)
initial-delay="" (6)
id="" (7)
max-messages-per-poll="" (8)
receive-timeout="" (9)
ref="" (10)
task-executor="" (11)
time-unit="MILLISECONDS" (12)
trigger=""> (13)
<int:advice-chain /> (14)
<int:transactional /> (15)
</int:poller>
1 | 提供了使用 Cron 表示式配置輪詢器的能力。底層實現使用了 org.springframework.scheduling.support.CronTrigger 。如果設定了此屬性,則不能指定以下任何屬性:fixed-delay 、trigger 、fixed-rate 和 ref 。 |
2 | 透過將此屬性設定為 true ,您可以定義恰好一個全域性預設輪詢器。如果在應用上下文中定義了多個預設輪詢器,則會引發異常。連線到 PollableChannel (PollingConsumer )的任何端點或任何未顯式配置輪詢器的 SourcePollingChannelAdapter 都將使用全域性預設輪詢器。它預設為 false 。可選。 |
3 | 指定當此輪詢器呼叫發生故障時傳送錯誤訊息的通道。要完全抑制異常,您可以提供對 nullChannel 的引用。可選。 |
4 | 固定延遲觸發器底層使用了 PeriodicTrigger 。數值以 time-unit 為單位,或可以採用持續時間格式(從 6.2 版本開始),例如 PT10S 、P1D 。如果設定了此屬性,則不能指定以下任何屬性:fixed-rate 、trigger 、cron 和 ref 。 |
5 | 固定速率觸發器底層使用了 PeriodicTrigger 。數值以 time-unit 為單位,或可以採用持續時間格式(從 6.2 版本開始),例如 PT10S 、P1D 。如果設定了此屬性,則不能指定以下任何屬性:fixed-delay 、trigger 、cron 和 ref 。 |
6 | PeriodicTrigger 的初始延遲(從 6.2 版本開始)。數值以 time-unit 為單位,或可以採用持續時間格式,例如 PT10S 、P1D 。 |
7 | 指向輪詢器底層 bean 定義的 ID,其型別為 org.springframework.integration.scheduling.PollerMetadata 。頂級輪詢器元素必須指定 id 屬性,除非它是預設輪詢器(default="true" )。 |
8 | 有關更多資訊,請參閱配置入站通道介面卡。如果未指定,預設值取決於上下文。如果您使用 PollingConsumer ,此屬性預設為 -1 。然而,如果您使用 SourcePollingChannelAdapter ,max-messages-per-poll 屬性預設為 1 。可選。 |
9 | 該值設定在底層類 PollerMetadata 上。如果未指定,預設為 1000(毫秒)。可選。 |
10 | 對另一個頂級輪詢器的 bean 引用。頂級 poller 元素上不得出現 ref 屬性。然而,如果設定了此屬性,則不能指定以下任何屬性:fixed-rate 、trigger 、cron 和 fixed-delay 。 |
11 | 提供了引用自定義任務執行器的能力。有關更多資訊,請參閱TaskExecutor 支援。可選。 |
12 | 此屬性指定了底層 org.springframework.scheduling.support.PeriodicTrigger 上的 java.util.concurrent.TimeUnit 列舉值。因此,此屬性只能與 fixed-delay 或 fixed-rate 屬性結合使用。如果與 cron 或 trigger 引用屬性結合使用,則會導致失敗。PeriodicTrigger 支援的最小粒度是毫秒。因此,唯一可用的選項是毫秒和秒。如果未提供此值,則任何 fixed-delay 或 fixed-rate 值都將被解釋為毫秒。基本上,此列舉提供了秒級間隔觸發器值的便利。對於按小時、按天和按月的設定,我們建議使用 cron 觸發器。 |
13 | 對實現了 org.springframework.scheduling.Trigger 介面的任何 Spring 配置的 bean 的引用。然而,如果設定了此屬性,則不能指定以下任何屬性:fixed-delay 、fixed-rate 、cron 和 ref 。可選。 |
14 | 允許指定額外的 AOP 通知來處理附加的橫切關注點。有關更多資訊,請參閱事務。可選。 |
15 | 輪詢器可以設定為事務性的。有關更多資訊,請參閱AOP 通知鏈。可選。 |
示例
一個簡單的、基於間隔的輪詢器,間隔為 1 秒,可以按如下方式配置
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作為使用 fixed-rate
屬性的替代方案,您也可以使用 fixed-delay
屬性。
對於基於 Cron 表示式的輪詢器,請改用 cron
屬性,如下例所示
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果輸入通道是 PollableChannel
,則需要配置輪詢器。具體來說,如前所述,trigger
是 PollingConsumer
類的一個必需屬性。因此,如果您遺漏輪詢消費者端點配置的 poller
子元素,可能會丟擲異常。如果您嘗試在連線到非可輪詢通道的元素上配置輪詢器,也可能會丟擲異常。
也可以建立頂級輪詢器,在這種情況下只需要 ref
屬性,如下例所示
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
ref 屬性僅允許在內部輪詢器定義上使用。在頂級輪詢器上定義此屬性會導致在應用上下文初始化期間丟擲配置異常。 |
全域性預設輪詢器
為了進一步簡化配置,您可以定義一個全域性預設輪詢器。XML DSL 中的單個頂級輪詢器元件可以將 default
屬性設定為 true
。對於 Java 配置,在這種情況下必須宣告一個名稱為 PollerMetadata.DEFAULT_POLLER
的 PollerMetadata
bean。在這種情況下,同一 ApplicationContext
中定義的任何輸入通道為 PollableChannel
且未顯式配置 poller
的端點都會使用該預設設定。以下示例展示了這樣一個輪詢器以及使用它的轉換器
-
Java DSL
-
Java
-
Kotlin DSL
-
XML
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
事務支援
Spring Integration 還為輪詢器提供事務支援,以便每個接收並轉發操作都可以作為一個原子工作單元執行。要為輪詢器配置事務,請新增 <transactional/>
子元素。以下示例展示了可用屬性
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有關更多資訊,請參閱輪詢器事務支援。
AOP 通知鏈
由於 Spring 事務支援依賴於代理機制,透過 TransactionInterceptor
(AOP 通知)來處理由輪詢器(poller)啟動的訊息流的事務行為,因此有時您必須提供額外的通知(advice)來處理與輪詢器相關的其他橫切行為。為此,poller
定義了一個 advice-chain
元素,允許您新增更多實現了 MethodInterceptor
介面的類作為通知。以下示例展示瞭如何為 poller
定義一個 advice-chain
。
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有關如何實現 MethodInterceptor
介面的更多資訊,請參閱 Spring Framework Reference Guide 的 AOP 部分。通知鏈也可以應用於沒有任何事務配置的輪詢器,從而讓您增強由輪詢器啟動的訊息流的行為。
使用通知鏈時,不能指定 <transactional/> 子元素。取而代之的是,宣告一個 <tx:advice/> bean 並將其新增到 <advice-chain/> 中。完整的配置詳情請參見 輪詢器事務支援。 |
TaskExecutor 支援
輪詢執行緒可以由 Spring 的 TaskExecutor
抽象的任何例項執行。這使得端點或端點組能夠實現併發。自 Spring 3.0 起,核心 Spring Framework 提供了一個 task
名稱空間,其 <executor/>
元素支援建立簡單的執行緒池執行器。該元素接受常見併發設定的屬性,例如 pool-size 和 queue-capacity。配置一個執行緒池執行器可以顯著影響端點在負載下的效能。這些設定對每個端點都可用,因為端點的效能是需要考慮的主要因素之一(另一個主要因素是端點訂閱的通道的預期訊息量)。為了啟用 XML 名稱空間支援配置的輪詢端點的併發性,在其 <poller/>
元素上提供 task-executor
引用,然後提供以下示例中顯示的一個或多個屬性。
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果您不提供任務執行器,消費者的處理程式將在呼叫者的執行緒中呼叫。請注意,呼叫者通常是預設的 TaskScheduler
(參見 配置 Task Scheduler)。您還應該記住,task-executor
屬性可以透過指定 bean 名稱來引用 Spring 的 TaskExecutor
介面的任何實現。前面顯示的 executor
元素是為了方便而提供的。
正如前面在輪詢消費者背景部分提到的,您也可以配置輪詢消費者,使其模擬事件驅動行為。透過較長的接收超時時間和較短的觸發器間隔,即使是在輪詢的訊息源上,您也可以確保對到達訊息作出非常及時的反應。請注意,這僅適用於具有阻塞等待呼叫並帶超時的源。例如,檔案輪詢器不阻塞。每次 receive()
呼叫會立即返回,要麼包含新檔案,要麼不包含。因此,即使輪詢器包含較長的 receive-timeout
,該值在這種情況下也永遠不會被使用。另一方面,當使用 Spring Integration 自有的基於佇列的通道時,超時值確實有機會發揮作用。以下示例展示了輪詢消費者如何幾乎即時地接收訊息。
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
使用這種方法不會產生太多開銷,因為在內部,它不過是一個定時等待執行緒,它並不像(例如)一個無休止的忙等(thrashing, infinite while loop)那樣需要大量 CPU 資源。
執行時改變輪詢速率
當使用 fixed-delay
或 fixed-rate
屬性配置輪詢器時,預設實現使用 PeriodicTrigger
例項。PeriodicTrigger
是核心 Spring Framework 的一部分。它只接受間隔作為建構函式引數。因此,它不能在執行時改變。
但是,您可以定義自己的 org.springframework.scheduling.Trigger
介面實現。您甚至可以使用 PeriodicTrigger
作為起點。然後您可以為間隔(週期)新增一個 setter,或者您甚至可以在觸發器本身中嵌入自己的節流邏輯。period
屬性在每次呼叫 nextExecutionTime
時用於安排下一次輪詢。要在輪詢器中使用此自定義觸發器,請在您的應用程式上下文中宣告自定義觸發器的 bean 定義,並使用 trigger
屬性將依賴項注入到您的輪詢器配置中,該屬性引用自定義觸發器 bean 例項。現在,您可以獲取觸發器 bean 的引用,並在輪詢之間更改輪詢間隔。
有關示例,請參閱 Spring Integration Samples 專案。它包含一個名為 dynamic-poller
的示例,該示例使用自定義觸發器並演示了在執行時更改輪詢間隔的能力。
該示例提供了一個實現 org.springframework.scheduling.Trigger
介面的自定義觸發器。該示例的觸發器基於 Spring 的 PeriodicTrigger
實現。然而,自定義觸發器的欄位不是 final 的,並且屬性具有顯式的 getter 和 setter,允許您在執行時動態更改輪詢週期。
但是,重要的是要注意,由於 Trigger 方法是 nextExecutionTime() ,因此對動態觸發器的任何更改都不會立即生效,而是基於現有配置在下一次輪詢時才會生效。不可能強制觸發器在其當前配置的下一次執行時間之前觸發。 |
載荷型別轉換
在本參考手冊中,您還可以看到各種端點的特定配置和實現示例,這些端點接受訊息或任意 Object
作為輸入引數。對於 Object
的情況,此類引數被對映到訊息載荷或載荷的一部分或訊息頭(當使用 Spring 表示式語言時)。但是,端點方法的輸入引數型別有時與載荷或其部分的型別不匹配。在這種情況下,我們需要執行型別轉換。Spring Integration 提供了一種便捷的方式來在其自身的名為 integrationConversionService
的轉換服務 bean 例項中註冊型別轉換器(透過使用 Spring 的 ConversionService
)。一旦透過 Spring Integration 基礎設施定義了第一個轉換器,就會自動建立該 bean。要註冊轉換器,您可以實現 org.springframework.core.convert.converter.Converter
、org.springframework.core.convert.converter.GenericConverter
或 org.springframework.core.convert.converter.ConverterFactory
。
Converter
實現是最簡單的,用於從單一型別轉換為另一種型別。為了更復雜的需求,例如轉換為類層次結構,您可以實現 GenericConverter
,並且可能實現 ConditionalConverter
。這些介面讓您可以完全訪問 from
和 to
型別描述符,從而實現複雜的轉換。例如,如果您有一個名為 Something
的抽象類作為您轉換的目標(引數型別、通道資料型別等),並且有兩個具體實現 Thing1
和 Thing
,您希望根據輸入型別轉換為其中一個,那麼 GenericConverter
將是一個很好的選擇。有關更多資訊,請參閱這些介面的 Javadoc。
實現轉換器後,您可以使用便捷的名稱空間支援註冊它,如下例所示:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,您可以使用內部 bean,如下例所示:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
從 Spring Integration 4.0 開始,您可以使用註解來建立上述配置,如下例所示:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,您可以使用 @Configuration
註解,如下例所示:
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
配置應用程式上下文時,Spring Framework 允許您新增一個 相比之下, 但是,如果您確實希望將 Spring 的
在這種情況下, |
內容型別轉換
從 5.0 版本開始,預設情況下,方法呼叫機制基於 org.springframework.messaging.handler.invocation.InvocableHandlerMethod
基礎設施。它的 HandlerMethodArgumentResolver
實現(如 PayloadArgumentResolver
和 MessageMethodArgumentResolver
)可以使用 MessageConverter
抽象將傳入的 payload
轉換為目標方法引數型別。轉換可以基於 contentType
訊息頭。為此,Spring Integration 提供了 ConfigurableCompositeMessageConverter
,它會將委託給一個註冊的轉換器列表,直到其中一個返回非空結果。預設情況下,此轉換器提供(按嚴格順序):
-
如果在 classpath 中存在 Jackson 處理器,則為
MappingJackson2MessageConverter
。
有關它們的用途和適當的 contentType
轉換值,請參閱 Javadoc(連結在前述列表中)。使用 ConfigurableCompositeMessageConverter
是因為它可以提供任何其他 MessageConverter
實現,包括或不包括前面提到的預設轉換器。它也可以在應用程式上下文中註冊為適當的 bean,覆蓋預設轉換器,如下例所示:
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
這兩個新的轉換器在複合轉換器中註冊,並且位於預設轉換器之前。您也可以不使用 ConfigurableCompositeMessageConverter
,而是透過註冊一個名為 integrationArgumentResolverMessageConverter
(透過設定 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
屬性)的 bean 來提供您自己的 MessageConverter
。
基於 MessageConverter 的(包括 contentType 頭)轉換在使用 SpEL 方法呼叫時不可用。在這種情況下,只有上面在 載荷型別轉換 中提到的常規類到類轉換可用。 |
非同步輪詢
如果您希望輪詢是非同步的,輪詢器可以選擇指定一個 task-executor
屬性,該屬性指向任何現有的 TaskExecutor
bean 例項(Spring 3.0 透過 task
名稱空間提供便捷的名稱空間配置)。但是,在配置帶有 TaskExecutor
的輪詢器時,您必須瞭解一些事項。
問題在於存在兩種配置:輪詢器和 TaskExecutor
。它們必須相互協調。否則,您可能會導致人為的記憶體洩漏。
考慮以下配置:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
上述配置展示了一種不協調的配置。
預設情況下,任務執行器具有無界任務佇列。輪詢器會不斷排程新任務,即使所有執行緒都在阻塞,等待新訊息到達或超時過期。考慮到有 20 個執行緒執行任務,超時時間為五秒,它們的執行速率為每秒 4 次。但是,新任務的排程速率為每秒 20 次,因此任務執行器的內部佇列以每秒 16 次的速度增長(當程序空閒時),所以我們存在記憶體洩漏。
處理這個問題的一種方法是設定任務執行器的 queue-capacity
屬性。即使是 0 也是一個合理的值。您也可以透過設定 Task Executor 的 rejection-policy
屬性(例如,設定為 DISCARD
)來管理如何處理無法排隊的消息。換句話說,在配置 TaskExecutor
時,您必須理解某些細節。有關此主題的更多詳細資訊,請參閱 Spring 參考手冊中的“任務執行和排程”。
端點內部 Bean
許多端點是複合 Bean。這包括所有消費者和所有輪詢入站通道介面卡。消費者(輪詢或事件驅動)委託給 MessageHandler
。輪詢介面卡透過委託給 MessageSource
獲取訊息。通常,獲取委託 bean 的引用很有用,也許是為了在執行時更改配置或用於測試。這些 bean 可以透過眾所周知的名稱從 ApplicationContext
中獲取。MessageHandler
例項在應用程式上下文中註冊的 bean ID 類似於 someConsumer.handler
(其中 'consumer' 是端點 id
屬性的值)。MessageSource
例項註冊的 bean ID 類似於 somePolledAdapter.source
,其中 'somePolledAdapter' 是介面卡的 ID。
上述內容僅適用於框架元件本身。您可以改用內部 bean 定義,如下例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
該 bean 被視為宣告的任何內部 bean,並且未在應用程式上下文中註冊。如果您希望以其他方式訪問此 bean,請將其宣告在頂層並提供一個 id
,然後改用 ref
屬性。有關更多資訊,請參閱 Spring 文件。