訊息端點
本章的第一部分涵蓋了一些背景理論,並揭示了驅動 Spring Integration 各種訊息元件的底層 API 的許多細節。如果您想真正理解幕後發生的事情,這些資訊可能會有所幫助。但是,如果您想透過簡化的基於名稱空間的配置來快速啟動和執行各種元素,請隨意跳到 端點名稱空間支援。
如概述中所述,訊息端點負責將各種訊息元件連線到通道。在接下來的幾章中,我們將介紹許多不同的訊息消費元件。其中一些還能夠傳送回覆訊息。傳送訊息非常簡單。如前所述的 訊息通道,您可以將訊息傳送到訊息通道。但是,接收訊息則稍微複雜一些。主要原因是消費者有兩種型別:輪詢消費者 和 事件驅動消費者。
在這兩者中,事件驅動消費者要簡單得多。它們無需管理和排程單獨的輪詢執行緒,本質上是帶有回撥方法的監聽器。當連線到 Spring Integration 的可訂閱訊息通道時,這個簡單的選項非常有效。但是,當連線到緩衝、可輪詢訊息通道時,某些元件必須排程和管理輪詢執行緒。Spring Integration 提供了兩種不同的端點實現來適應這兩種型別的消費者。因此,消費者本身只需要實現回撥介面。當需要輪詢時,端點充當消費者例項的容器。其好處類似於使用容器來託管訊息驅動 bean,但由於這些消費者是執行在 ApplicationContext 中的 Spring 管理物件,因此它更類似於 Spring 自己的 MessageListener 容器。
訊息處理器
Spring Integration 的 MessageHandler 介面由框架中的許多元件實現。換句話說,這不是公共 API 的一部分,您通常不會直接實現 MessageHandler。然而,它被訊息消費者用於實際處理所消費的訊息,因此瞭解這個策略介面有助於理解消費者的整體角色。該介面定義如下
public interface MessageHandler {
void handleMessage(Message<?> message);
}
儘管其簡單,但此介面為後續章節中介紹的大多陣列件(路由器、轉換器、拆分器、聚合器、服務啟用器等)奠定了基礎。這些元件對它們處理的訊息執行的功能截然不同,但實際接收訊息的要求是相同的,並且輪詢和事件驅動行為的選擇也是相同的。Spring Integration 提供了兩種端點實現,它們託管這些基於回撥的處理器,並允許它們連線到訊息通道。
事件驅動消費者
由於事件驅動消費者端點更簡單,我們首先介紹它。您可能還記得 SubscribableChannel 介面提供了一個 subscribe() 方法,並且該方法接受一個 MessageHandler 引數(如 SubscribableChannel 中所示)。以下清單顯示了 subscribe 方法的定義
subscribableChannel.subscribe(messageHandler);
由於訂閱到通道的處理器不需要主動輪詢該通道,因此這是一個事件驅動的消費者,Spring Integration 提供的實現接受一個 SubscribableChannel 和一個 MessageHandler,如下例所示
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
輪詢消費者
Spring Integration 也提供了一個 PollingConsumer,它的例項化方式相同,只是通道必須實現 PollableChannel,如下例所示
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
輪詢消費者還有許多其他配置選項。以下示例顯示如何設定觸發器
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
PeriodicTrigger 通常使用簡單的間隔(Duration)定義,但也支援 initialDelay 屬性和布林 fixedRate 屬性(預設值為 false,即沒有固定延遲)。以下示例設定了這兩個屬性
PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
前例中三項設定的結果是一個觸發器,它等待五秒鐘,然後每秒觸發一次。
CronTrigger 需要一個有效的 cron 表示式。有關詳細資訊,請參閱 Javadoc。以下示例設定了一個新的 CronTrigger
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
上一個示例中定義的觸發器的結果是一個在週一到週五每十秒觸發一次的觸發器。
輪詢端點的預設觸發器是一個 PeriodicTrigger 例項,具有 1 秒的固定延遲週期。 |
除了觸發器,您還可以指定另外兩個與輪詢相關的配置屬性:maxMessagesPerPoll 和 receiveTimeout。以下示例展示瞭如何設定這兩個屬性
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
maxMessagesPerPoll 屬性指定在給定輪詢操作中接收的最大訊息數。這意味著輪詢器會繼續呼叫 receive() 而不等待,直到返回 null 或達到最大值。例如,如果一個輪詢器有一個十秒間隔的觸發器和 maxMessagesPerPoll 設定為 25,並且它正在輪詢一個佇列中有 100 條訊息的通道,那麼所有 100 條訊息可以在 40 秒內檢索。它獲取 25 條,等待十秒,獲取接下來的 25 條,依此類推。如果 maxMessagesPerPoll 配置為負值,則在單個輪詢週期內呼叫 MessageSource.receive() 直到它返回 null。從 5.5 版本開始,0 值具有特殊含義 - 完全跳過 MessageSource.receive() 呼叫,這可以被視為暫停此輪詢端點,直到 maxMessagesPerPoll 在以後更改為非零值,例如透過控制匯流排。
receiveTimeout 屬性指定輪詢器在呼叫接收操作時如果沒有可用訊息應該等待的時間。例如,考慮兩個表面上看起來相似但實際上完全不同的選項:第一個具有 5 秒的間隔觸發器和 50 毫秒的接收超時,而第二個具有 50 毫秒的間隔觸發器和 5 秒的接收超時。第一個選項可能比其在通道上接受的訊息晚 4950 毫秒接收訊息(如果該訊息在其輪詢呼叫返回後立即到達)。另一方面,第二個配置從不會錯過超過 50 毫秒的訊息。區別在於第二個選項需要一個執行緒等待。但是,因此,它可以更快地響應到達的訊息。這種技術,稱為“長輪詢”,可用於在輪詢源上模擬事件驅動行為。
輪詢消費者還可以委託給 Spring TaskExecutor,如下例所示
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外,PollingConsumer 有一個名為 adviceChain 的屬性。此屬性允許您指定一個 AOP 增強鏈(List),用於處理包括事務在內的其他橫切關注點。這些增強應用於 doPoll() 方法。有關更深入的資訊,請參閱 端點名稱空間支援 下的 AOP 增強鏈和事務支援部分。另請參閱 @Poller 註解的 Javadoc 和相應的 訊息註解支援 部分。Java DSL 還提供了 .poller() 端點配置選項及其相應的 Pollers 工廠。
前面的示例展示了依賴查詢。但是,請記住,這些消費者通常被配置為 Spring bean 定義。事實上,Spring Integration 還提供了一個名為 ConsumerEndpointFactoryBean 的 FactoryBean,它根據通道型別建立適當的消費者型別。此外,Spring Integration 具有完整的 XML 名稱空間支援,可以進一步隱藏這些細節。名稱空間配置在本指南中隨著每種元件型別的引入而出現。
許多 MessageHandler 實現可以生成回覆訊息。如前所述,傳送訊息與接收訊息相比微不足道。然而,何時以及傳送多少回覆訊息取決於處理程式型別。例如,聚合器等待一定數量的訊息到達,並且通常配置為拆分器的下游消費者,拆分器可以為它處理的每條訊息生成多個回覆。當使用名稱空間配置時,您不必嚴格瞭解所有細節。但是,仍然值得了解的是,其中幾個元件共享一個共同的基類 AbstractReplyProducingMessageHandler,並且它提供了一個 setOutputChannel(..) 方法。 |
端點名稱空間支援
在本參考手冊中,您可以找到端點元素(如路由器、轉換器、服務啟用器等)的特定配置示例。其中大多數支援 input-channel 屬性,許多支援 output-channel 屬性。解析後,這些端點元素會生成 PollingConsumer 或 EventDrivenConsumer 的例項,具體取決於所引用 input-channel 的型別:分別是 PollableChannel 或 SubscribableChannel。當通道可輪詢時,輪詢行為基於端點元素的 poller 子元素及其屬性。
以下列出了 poller 的所有可用配置選項
<int:poller cron="" (1)
default="false" (2)
error-channel="" (3)
fixed-delay="" (4)
fixed-rate="" (5)
initial-delay="" (6)
id="" (7)
max-messages-per-poll="" (8)
receive-timeout="" (9)
ref="" (10)
task-executor="" (11)
time-unit="MILLISECONDS" (12)
trigger=""> (13)
<int:advice-chain /> (14)
<int:transactional /> (15)
</int:poller>
| 1 | 透過使用 Cron 表示式配置輪詢器。底層實現使用 org.springframework.scheduling.support.CronTrigger。如果設定了此屬性,則不得指定以下任何屬性:fixed-delay、trigger、fixed-rate 和 ref。 |
| 2 | 透過將此屬性設定為 true,您可以定義一個且僅一個全域性預設輪詢器。如果在應用程式上下文中定義了多個預設輪詢器,則會丟擲異常。任何連線到 PollableChannel (PollingConsumer) 或任何未明確配置輪詢器的 SourcePollingChannelAdapter 都將使用全域性預設輪詢器。它預設為 false。可選。 |
| 3 | 如果此輪詢器呼叫發生故障,則標識錯誤訊息傳送到的通道。要完全抑制異常,您可以提供對 nullChannel 的引用。可選。 |
| 4 | 固定延遲觸發器在底層使用 PeriodicTrigger。數值以 time-unit 為單位,也可以是持續時間格式(從 6.2 版開始),例如 PT10S、P1D。如果設定了此屬性,則不得指定以下任何屬性:fixed-rate、trigger、cron 和 ref。 |
| 5 | 固定速率觸發器在底層使用 PeriodicTrigger。數值以 time-unit 為單位,也可以是持續時間格式(從 6.2 版開始),例如 PT10S、P1D。如果設定了此屬性,則不得指定以下任何屬性:fixed-delay、trigger、cron 和 ref。 |
| 6 | PeriodicTrigger 的初始延遲(從 6.2 版開始)。數值以 time-unit 為單位,也可以是持續時間格式,例如 PT10S、P1D。 |
| 7 | 指代輪詢器底層 bean 定義的 ID,該定義型別為 org.springframework.integration.scheduling.PollerMetadata。對於頂級輪詢器元素,id 屬性是必需的,除非它是預設輪詢器 (default="true")。 |
| 8 | 有關更多資訊,請參閱 配置入站通道介面卡。如果未指定,預設值取決於上下文。如果您使用 PollingConsumer,此屬性預設為 -1。但是,如果您使用 SourcePollingChannelAdapter,則 max-messages-per-poll 屬性預設為 1。可選。 |
| 9 | 值設定在底層類 PollerMetadata 上。如果未指定,預設為 1000(毫秒)。可選。 |
| 10 | 對另一個頂級輪詢器的 bean 引用。ref 屬性不能出現在頂級 poller 元素上。但是,如果設定了此屬性,則不得指定以下任何屬性:fixed-rate、trigger、cron 和 fixed-delay。 |
| 11 | 提供了引用自定義任務執行器的能力。有關更多資訊,請參閱 TaskExecutor 支援。可選。 |
| 12 | 此屬性指定底層 org.springframework.scheduling.support.PeriodicTrigger 上的 java.util.concurrent.TimeUnit 列舉值。因此,此屬性只能與 fixed-delay 或 fixed-rate 屬性結合使用。如果與 cron 或 trigger 引用屬性結合使用,則會導致失敗。PeriodicTrigger 支援的最小粒度是毫秒。因此,唯一可用的選項是毫秒和秒。如果未提供此值,任何 fixed-delay 或 fixed-rate 值都將被解釋為毫秒。基本上,此列舉為基於秒的間隔觸發器值提供了便利。對於小時、日和月設定,我們建議改用 cron 觸發器。 |
| 13 | 對實現 org.springframework.scheduling.Trigger 介面的任何 Spring 配置 bean 的引用。但是,如果設定了此屬性,則不得指定以下任何屬性:fixed-delay、fixed-rate、cron 和 ref。可選。 |
| 14 | 允許指定額外的 AOP 建議來處理額外的橫切關注點。有關更多資訊,請參閱 事務。可選。 |
| 15 | 輪詢器可以進行事務性處理。有關更多資訊,請參閱 AOP 增強鏈。可選。 |
示例
一個簡單的基於間隔的輪詢器,間隔為 1 秒,可以按如下方式配置
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作為使用 fixed-rate 屬性的替代方法,您也可以使用 fixed-delay 屬性。
對於基於 Cron 表示式的輪詢器,請改用 cron 屬性,如下例所示
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果輸入通道是 PollableChannel,則需要輪詢器配置。具體來說,如前所述,trigger 是 PollingConsumer 類的一個必需屬性。因此,如果您在輪詢消費者端點配置中省略 poller 子元素,則可能會丟擲異常。如果您嘗試在連線到非輪詢通道的元素上配置輪詢器,也可能會丟擲異常。
也可以建立頂級輪詢器,在這種情況下,只需要一個 ref 屬性,如下例所示
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
ref 屬性僅允許在內部輪詢器定義上使用。在頂級輪詢器上定義此屬性會導致在應用程式上下文初始化期間丟擲配置異常。 |
全域性預設輪詢器
為了進一步簡化配置,您可以定義一個全域性預設輪詢器。XML DSL 中的單個頂級輪詢器元件可以將 default 屬性設定為 true。對於 Java 配置,在這種情況下必須宣告一個名稱為 PollerMetadata.DEFAULT_POLLER 的 PollerMetadata bean。在這種情況下,任何輸入通道為 PollableChannel 且在同一 ApplicationContext 中定義但未明確配置 poller 的端點都將使用該預設輪詢器。以下示例顯示了這樣一個輪詢器以及使用它的轉換器
-
Java DSL
-
Java
-
Kotlin DSL
-
XML
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
事務支援
Spring Integration 還為輪詢器提供事務支援,以便每個接收和轉發操作都可以作為原子工作單元執行。要為輪詢器配置事務,請新增 <transactional/> 子元素。以下示例顯示了可用的屬性
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有關更多資訊,請參閱 輪詢器事務支援。
AOP 增強鏈
由於 Spring 事務支援依賴於代理機制和 TransactionInterceptor(AOP 增強)來處理由輪詢器啟動的訊息流的事務行為,因此有時您必須提供額外的增強來處理與輪詢器相關的其他橫切行為。為此,poller 定義了一個 advice-chain 元素,允許您新增更多實現 MethodInterceptor 介面的增強。以下示例顯示瞭如何為 poller 定義 advice-chain
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有關如何實現 MethodInterceptor 介面的更多資訊,請參閱 Spring Framework 參考指南的 AOP 部分。增強鏈也可以應用於沒有任何事務配置的輪詢器,讓您增強由輪詢器啟動的訊息流的行為。
當使用增強鏈時,不能指定 <transactional/> 子元素。相反,宣告一個 <tx:advice/> bean 並將其新增到 <advice-chain/> 中。有關完整的配置詳細資訊,請參閱 輪詢器事務支援。 |
TaskExecutor 支援
輪詢執行緒可以由 Spring TaskExecutor 抽象的任何例項執行。這為端點或端點組啟用了併發。從 Spring 3.0 開始,核心 Spring 框架具有一個 task 名稱空間,其 <executor/> 元素支援建立簡單的執行緒池執行器。該元素接受常見併發設定的屬性,例如池大小和佇列容量。配置執行緒池執行器可以對端點在負載下的效能產生重大影響。這些設定適用於每個端點,因為端點的效能是需要考慮的主要因素之一(另一個主要因素是端點訂閱的通道上的預期流量)。要為使用 XML 名稱空間支援配置的輪詢端點啟用併發,請在其 <poller/> 元素上提供 task-executor 引用,然後提供以下示例中顯示的一個或多個屬性
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果您不提供 task-executor,則消費者的處理程式將在呼叫者的執行緒中呼叫。請注意,呼叫者通常是預設的 TaskScheduler(請參閱 配置 Task 排程器)。您還應該記住,task-executor 屬性可以透過指定 bean 名稱來提供對 Spring TaskExecutor 介面的任何實現的引用。前面顯示的 executor 元素是為了方便而提供的。
如前面 輪詢消費者背景部分 中所述,您還可以透過模擬事件驅動行為的方式配置輪詢消費者。透過長的接收超時和觸發器中短的間隔,即使在輪詢訊息源上,您也可以確保對到達的訊息做出非常及時的反應。請注意,這僅適用於具有帶超時的阻塞等待呼叫的源。例如,檔案輪詢器不會阻塞。每個 receive() 呼叫會立即返回,要麼包含新檔案,要麼不包含。因此,即使輪詢器包含一個長的 receive-timeout,該值在這種情況下也永遠不會被使用。另一方面,當使用 Spring Integration 自己的基於佇列的通道時,超時值確實有機會參與。以下示例顯示了輪詢消費者如何幾乎即時地接收訊息
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
使用這種方法不會帶來太多開銷,因為它在內部不過是一個定時等待執行緒,它不需要像例如一個頻繁執行的無限 while 迴圈那樣多的 CPU 資源使用。
執行時更改輪詢速率
當使用 fixed-delay 或 fixed-rate 屬性配置輪詢器時,預設實現使用 PeriodicTrigger 例項。PeriodicTrigger 是核心 Spring Framework 的一部分。它只接受間隔作為建構函式引數。因此,它無法在執行時更改。
但是,您可以定義自己的 org.springframework.scheduling.Trigger 介面實現。您甚至可以使用 PeriodicTrigger 作為起點。然後您可以為間隔(週期)新增一個 setter,或者您甚至可以在觸發器本身中嵌入自己的節流邏輯。period 屬性在每次呼叫 nextExecutionTime 時用於排程下一次輪詢。要在輪詢器中使用此自定義觸發器,請在您的應用程式上下文中宣告自定義觸發器的 bean 定義,並使用 trigger 屬性(該屬性引用自定義觸發器 bean 例項)將依賴項注入到您的輪詢器配置中。您現在可以獲取對觸發器 bean 的引用,並在輪詢之間更改輪詢間隔。
有關示例,請參閱 Spring Integration Samples 專案。它包含一個名為 dynamic-poller 的示例,該示例使用自定義觸發器並演示了在執行時更改輪詢間隔的能力。
該示例提供了一個實現 org.springframework.scheduling.Trigger 介面的自定義觸發器。該示例的觸發器基於 Spring 的 PeriodicTrigger 實現。但是,自定義觸發器的欄位不是 final 的,並且屬性具有顯式的 getter 和 setter,允許您在執行時動態更改輪詢週期。
但是,需要注意的是,由於 Trigger 方法是 nextExecutionTime(),因此對動態觸發器的任何更改都不會立即生效,而是在下次輪詢時,基於現有配置。無法強制觸發器在其當前配置的下次執行時間之前觸發。 |
有效負載型別轉換
在本參考手冊中,您還可以看到各種端點的具體配置和實現示例,這些端點接受訊息或任何任意 Object 作為輸入引數。在 Object 的情況下,此類引數被對映到訊息有效負載或有效負載或訊息頭的一部分(當使用 Spring 表示式語言時)。但是,端點方法的輸入引數型別有時與有效負載或其部分的型別不匹配。在這種情況下,我們需要執行型別轉換。Spring Integration 提供了一種方便的方式,可以在其自身的名為 integrationConversionService 的轉換服務 bean 例項中註冊型別轉換器(透過使用 Spring ConversionService)。一旦使用 Spring Integration 基礎設施定義了第一個轉換器,該 bean 就會自動建立。要註冊轉換器,您可以實現 org.springframework.core.convert.converter.Converter、org.springframework.core.convert.converter.GenericConverter 或 org.springframework.core.convert.converter.ConverterFactory。
Converter 實現最簡單,它將一種型別轉換為另一種型別。為了更復雜,例如轉換為類層次結構,您可以實現 GenericConverter 和可能的 ConditionalConverter。它們使您可以完全訪問 from 和 to 型別描述符,從而實現複雜的轉換。例如,如果您有一個名為 Something 的抽象類作為轉換的目標(引數型別、通道資料型別等),並且您有兩個名為 Thing1 和 Thing 的具體實現,並且您希望根據輸入型別轉換為其中一個,那麼 GenericConverter 將是一個很好的選擇。有關更多資訊,請參閱這些介面的 Javadoc
當您實現轉換器後,您可以使用方便的名稱空間支援註冊它,如下例所示
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,您可以使用內部 bean,如下例所示
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
從 Spring Integration 4.0 開始,您可以使用註解來建立上述配置,如下例所示
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,您可以使用 @Configuration 註解,如下例所示
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
|
在配置應用程式上下文時,Spring Framework 允許您新增一個 相反, 但是,如果您確實想使用 Spring
在這種情況下, |
內容型別轉換
從 5.0 版本開始,預設情況下,方法呼叫機制基於 org.springframework.messaging.handler.invocation.InvocableHandlerMethod 基礎設施。其 HandlerMethodArgumentResolver 實現(例如 PayloadArgumentResolver 和 MessageMethodArgumentResolver)可以使用 MessageConverter 抽象將傳入的 payload 轉換為目標方法引數型別。轉換可以基於 contentType 訊息頭。為此,Spring Integration 提供了 ConfigurableCompositeMessageConverter,它委託給一系列已註冊的轉換器,直到其中一個返回非空結果。預設情況下,此轉換器提供(嚴格按順序)
-
如果類路徑中存在 Jackson 處理器,則為
MappingJackson2MessageConverter
有關其目的和適當的 contentType 轉換值,請參閱 Javadoc(連結在前面的列表中)。使用 ConfigurableCompositeMessageConverter 是因為它可以使用任何其他 MessageConverter 實現,包括或不包括前面提到的預設轉換器。它還可以作為適當的 bean 在應用程式上下文中註冊,覆蓋預設轉換器,如下例所示
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
這兩個新的轉換器在預設轉換器之前註冊到複合轉換器中。您也可以不使用 ConfigurableCompositeMessageConverter,而是透過註冊一個名為 integrationArgumentResolverMessageConverter 的 bean(透過設定 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME 屬性)來提供您自己的 MessageConverter。
當使用 SpEL 方法呼叫時,基於 MessageConverter 的(包括 contentType 訊息頭)轉換不可用。在這種情況下,只有前面 有效負載型別轉換 中提到的常規類到類轉換可用。 |
非同步輪詢
如果您希望輪詢是非同步的,輪詢器可以選擇性地指定一個 task-executor 屬性,該屬性指向任何 TaskExecutor bean 的現有例項(Spring 3.0 透過 task 名稱空間提供了方便的名稱空間配置)。但是,在配置帶有 TaskExecutor 的輪詢器時,您必須瞭解某些事情。
問題在於存在兩種配置,即輪詢器和 TaskExecutor。它們必須相互協調。否則,您最終可能會造成人為的記憶體洩漏。
考慮以下配置
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
上述配置展示了一個不協調的配置。
預設情況下,任務執行器有一個無界任務佇列。輪詢器會不斷排程新任務,即使所有執行緒都被阻塞,等待新訊息到達或超時。考慮到有 20 個執行緒執行任務,超時時間為 5 秒,它們以每秒 4 個任務的速度執行。然而,新任務以每秒 20 個任務的速度排程,因此任務執行器中的內部佇列以每秒 16 個任務的速度增長(在程序空閒時),所以我們有記憶體洩漏。
處理這種情況的一種方法是設定任務執行器的 queue-capacity 屬性。即使是 0 也是一個合理的值。您還可以透過設定 Task Executor 的 rejection-policy 屬性(例如,設定為 DISCARD)來管理無法排隊的訊息。換句話說,在配置 TaskExecutor 時,您必須瞭解某些細節。有關此主題的更多詳細資訊,請參閱 Spring 參考手冊中的 “任務執行和排程”。
端點內部 Bean
許多端點是複合 bean。這包括所有消費者和所有輪詢入站通道介面卡。消費者(輪詢或事件驅動)委託給 MessageHandler。輪詢介面卡透過委託給 MessageSource 獲取訊息。通常,獲取對委託 bean 的引用很有用,也許是為了在執行時更改配置或進行測試。這些 bean 可以透過眾所周知的名稱從 ApplicationContext 中獲取。MessageHandler 例項以類似於 someConsumer.handler(其中“consumer”是端點 id 屬性的值)的 bean ID 註冊到應用程式上下文。MessageSource 例項以類似於 somePolledAdapter.source 的 bean ID 註冊,其中“somePolledAdapter”是介面卡的 ID。
上述內容僅適用於框架元件本身。您可以改為使用內部 bean 定義,如下例所示
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
該 bean 被視為任何宣告的內部 bean,並且未在應用程式上下文中註冊。如果您希望以其他方式訪問此 bean,請將其宣告為頂級 bean 並帶有 id,然後改用 ref 屬性。有關更多資訊,請參閱 Spring 文件。