訊息通道實現
Spring Integration 提供了不同的訊息通道實現。以下部分簡要描述了每種實現。
PublishSubscribeChannel
PublishSubscribeChannel
實現將其接收到的任何 Message
廣播給所有訂閱的處理器。這最常用於傳送事件訊息,其主要作用是通知(而不是文件訊息,文件訊息通常旨在由單個處理器處理)。請注意,PublishSubscribeChannel
僅用於傳送。由於它在其 send(Message)
方法被呼叫時直接廣播給其訂閱者,消費者無法輪詢訊息(它不實現 PollableChannel
,因此沒有 receive()
方法)。相反,任何訂閱者本身必須是一個 MessageHandler
,並且訂閱者的 handleMessage(Message)
方法會被依次呼叫。
在版本 3.0 之前,在沒有訂閱者的 PublishSubscribeChannel
上呼叫 send
方法會返回 false
。與 MessagingTemplate
結合使用時,會丟擲 MessageDeliveryException
。從版本 3.0 開始,行為已更改,只要存在至少最低數量的訂閱者(並且成功處理訊息),傳送就始終被視為成功。可以透過設定 minSubscribers
屬性來修改此行為,該屬性預設為 0
。
如果您使用 TaskExecutor ,僅使用正確數量的訂閱者是否存在來確定此行為,因為訊息的實際處理是非同步執行的。 |
QueueChannel
QueueChannel
實現包裝了一個佇列。與 PublishSubscribeChannel
不同,QueueChannel
具有點對點語義。換句話說,即使通道有多個消費者,傳送到該通道的任何 Message
也只應由其中一個接收。它提供了一個預設的無參建構函式(提供了實際上無限容量的 Integer.MAX_VALUE
),以及一個接受佇列容量的建構函式,如下所示
public QueueChannel(int capacity)
未達到容量限制的通道將其訊息儲存在內部佇列中,即使沒有接收方準備好處理訊息,send(Message<?>)
方法也會立即返回。如果佇列已滿,傳送方會阻塞直到佇列中有可用空間。或者,如果您使用帶額外超時引數的 send 方法,佇列會阻塞直到有可用空間或超時時間結束,取兩者先發生者。類似地,如果佇列中有可用訊息,receive()
呼叫會立即返回;但如果佇列為空,receive 呼叫可能會阻塞直到有訊息可用或超時時間(如果提供)結束。在任何一種情況下,都可以透過傳遞 0 的超時值來強制立即返回,而不管佇列的狀態如何。然而,請注意,不帶 timeout
引數的 send()
和 receive()
呼叫會無限期阻塞。
PriorityChannel
QueueChannel
強制執行先進先出(FIFO)順序,而 PriorityChannel
是另一種實現,它允許訊息根據優先順序在通道內排序。預設情況下,優先順序由每條訊息中的 priority
頭部確定。但是,對於自定義的優先順序確定邏輯,可以向 PriorityChannel
建構函式提供一個型別為 Comparator<Message<?>>
的比較器。
RendezvousChannel
RendezvousChannel
支援“直接交接”場景,其中傳送方會阻塞直到另一方呼叫通道的 receive()
方法。接收方會阻塞直到傳送方傳送訊息。在內部,此實現與 QueueChannel
非常相似,不同之處在於它使用了 SynchronousQueue
(一個容量為零的 BlockingQueue
實現)。這在傳送方和接收方在不同執行緒中操作,但不適合將訊息非同步放入佇列的情況下效果很好。換句話說,使用 RendezvousChannel
,傳送方知道有接收方已經接受了訊息,而使用 QueueChannel
,訊息會被儲存到內部佇列中,並且可能永遠不會被接收。
請記住,預設情況下,所有這些基於佇列的通道都僅將訊息儲存在記憶體中。需要持久化時,您可以在 'queue' 元素內提供 'message-store' 屬性來引用持久化的 MessageStore 實現,或者用由持久化 broker 支援的通道替換本地通道,例如 JMS 支援的通道或通道介面卡。後一種選項允許您利用任何 JMS 提供程式的訊息持久化實現,如JMS 支援中所述。但是,當不需要佇列緩衝時,最簡單的方法是依賴於下一節討論的 DirectChannel 。 |
RendezvousChannel
對於實現請求-回覆操作也很有用。傳送方可以建立一個臨時的、匿名的 RendezvousChannel
例項,然後在構建 Message
時將其設定為 'replyChannel' 頭部。傳送該 Message
後,傳送方可以立即呼叫 receive
(可選提供超時值),以便在等待回覆 Message
時阻塞。這與 Spring Integration 許多請求-回覆元件內部使用的實現非常相似。
DirectChannel
DirectChannel
具有點對點語義,但在其他方面更類似於 PublishSubscribeChannel
,而不是之前描述的任何基於佇列的通道實現。它實現了 SubscribableChannel
介面而不是 PollableChannel
介面,因此它將訊息直接分派給訂閱者。然而,作為點對點通道,它與 PublishSubscribeChannel
的區別在於它將每條 Message
傳送到一個訂閱的 MessageHandler
。
除了是最簡單的點對點通道選項外,其最重要的特性之一是它允許單個執行緒執行通道“兩端”的操作。例如,如果處理器訂閱了 DirectChannel
,那麼向該通道傳送 Message
會直接在傳送方的執行緒中觸發該處理器的 handleMessage(Message)
方法的呼叫,然後 send()
方法呼叫才能返回。
提供具有這種行為的通道實現的關鍵動機是支援必須跨通道的事務,同時仍然受益於通道提供的抽象和松耦合。如果在事務範圍內呼叫 send()
,處理程式呼叫的結果(例如,更新資料庫記錄)將影響該事務的最終結果(提交或回滾)。
由於 DirectChannel 是最簡單的選項,並且不增加排程和管理輪詢器執行緒所需的任何額外開銷,因此它是 Spring Integration 中的預設通道型別。一般的思路是為應用程式定義通道,考慮其中哪些需要提供緩衝或限制輸入,並將這些通道修改為基於佇列的 PollableChannels 。同樣,如果通道需要廣播訊息,它不應該是 DirectChannel ,而應該是 PublishSubscribeChannel 。稍後,我們將展示如何配置這些通道中的每一個。 |
DirectChannel
內部將訊息分派委託給一個訊息排程器來呼叫其訂閱的訊息處理器,該排程器可以透過 load-balancer
或 load-balancer-ref
屬性(互斥)公開負載均衡策略。當多個訊息處理器訂閱同一通道時,訊息排程器使用負載均衡策略來幫助確定如何在訊息處理器之間分發訊息。為了方便起見,load-balancer
屬性公開了一個列舉值列表,指向預先存在的 LoadBalancingStrategy
實現。round-robin
(迴圈負載均衡)和 none
(用於明確停用負載均衡的情況)是僅有的可用值。將來版本可能會新增其他策略實現。然而,自版本 3.0 起,您可以透過使用 load-balancer-ref
屬性提供自己的 LoadBalancingStrategy
實現並注入它,該屬性應指向一個實現 LoadBalancingStrategy
的 bean,如下例所示
FixedSubscriberChannel
是一個 SubscribableChannel
,它僅支援一個不可取消訂閱的 MessageHandler
訂閱者。這對於高吞吐量效能用例非常有用,當沒有其他訂閱者參與且不需要通道攔截器時。
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
請注意,load-balancer
和 load-balancer-ref
屬性是互斥的。
負載均衡也與布林值屬性 failover
協同工作。如果 failover
的值為 true(預設值),當先前的處理器丟擲異常時,排程器會回退到任何後續處理器(如有必要)。順序由處理器自身定義的可選順序值決定,如果不存在此值,則由處理器訂閱的順序決定。
如果特定情況要求排程器在每次發生錯誤時始終嘗試呼叫第一個處理器,然後按照相同的固定順序序列回退,則不應提供負載均衡策略。換句話說,即使未啟用負載均衡,排程器仍然支援 failover
布林屬性。然而,在沒有負載均衡的情況下,處理器的呼叫總是從第一個開始,按照它們的順序。例如,當對主要、次要、第三等有明確定義時,這種方法效果很好。使用名稱空間支援時,任何端點上的 order
屬性決定順序。
請記住,負載均衡和 failover 僅適用於通道有多個訂閱的訊息處理器的情況。使用名稱空間支援時,這意味著多個端點共享在 input-channel 屬性中定義的同一個通道引用。 |
從版本 5.2 開始,當 failover
為 true 時,當前處理器的失敗以及失敗的訊息會分別記錄在 debug
或 info
日誌級別下(如果已配置)。
ExecutorChannel
ExecutorChannel
是一個點對點通道,它支援與 DirectChannel
相同的排程器配置(負載均衡策略和 failover
布林屬性)。這兩種排程通道型別之間的關鍵區別在於,ExecutorChannel
將分派委託給一個 TaskExecutor
例項來執行。這意味著 send 方法通常不會阻塞,但也意味著處理程式的呼叫可能不會發生在傳送方的執行緒中。因此,它不支援跨越發送方和接收處理器的事務。
傳送方有時可能會阻塞。例如,當使用具有節流客戶端的拒絕策略(例如 ThreadPoolExecutor.CallerRunsPolicy )的 TaskExecutor 時,線上程池達到最大容量且執行器的amp;apos;工作佇列已滿的情況下,傳送方的執行緒隨時可以執行該方法。由於這種情況只會在不可預測的情況下發生,因此不應依賴它進行事務處理。 |
PartitionedChannel
從版本 6.1 開始,提供了 PartitionedChannel
實現。這是 AbstractExecutorChannel
的擴充套件,表示點對點分派邏輯,其中實際的消費在特定執行緒上處理,該執行緒由傳送到此通道的訊息評估出的分割槽鍵確定。此通道與上面提到的 ExecutorChannel
類似,但不同之處在於具有相同分割槽鍵的訊息始終在同一個執行緒中處理,從而保留順序。它不需要外部的 TaskExecutor
,但可以使用自定義的 ThreadFactory
進行配置(例如 Thread.ofVirtual().name("partition-", 0).factory()
)。此工廠用於為每個分割槽向 MessageDispatcher
委託填充單執行緒執行器。預設情況下,IntegrationMessageHeaderAccessor.CORRELATION_ID
訊息頭用作分割槽鍵。此通道可以配置為一個簡單的 bean
@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
該通道將有 3
個分割槽 - 專用執行緒;將使用 partitionKey
頭部來確定訊息將在哪個分割槽中處理。有關詳細資訊,請參閱 PartitionedChannel
類的 Javadocs。
FluxMessageChannel
FluxMessageChannel
是一個 org.reactivestreams.Publisher
實現,用於將傳送的訊息“沉入”內部的 reactor.core.publisher.Flux
中,供下游的響應式訂閱者按需消費。此通道實現既不是 SubscribableChannel
,也不是 PollableChannel
,因此只能使用 org.reactivestreams.Subscriber
例項從此通道消費,同時遵守響應式流的背壓特性。另一方面,FluxMessageChannel
實現了 ReactiveStreamsSubscribableChannel
介面,其 subscribeTo(Publisher<Message<?>>)
契約允許接收來自響應式源釋出者的事件,將響應式流橋接到整合流中。為了實現整個整合流的完全響應式行為,必須在流中的所有端點之間放置此類通道。
有關與 Reactive Streams 互動的更多資訊,請參閱Reactive Streams 支援。
Scoped Channel
Spring Integration 1.0 提供了一個 ThreadLocalChannel
實現,但在 2.0 版本中已刪除。現在,處理相同需求更通用的方法是向通道新增 scope
屬性。該屬性的值可以是上下文中可用的作用域名稱。例如,在 Web 環境中,某些作用域是可用的,並且任何自定義作用域實現都可以向上下文註冊。以下示例展示了將執行緒本地作用域應用於通道,包括作用域本身的註冊
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
上例中定義的通道內部也委託給一個佇列,但該通道繫結到當前執行緒,因此佇列的內容也類似地繫結。這樣,傳送訊息到通道的執行緒稍後可以接收相同的訊息,而其他執行緒則無法訪問它們。雖然執行緒作用域的通道很少需要,但在使用 DirectChannel
例項來強制執行單個執行緒操作,但任何回覆訊息應傳送到“終端”通道的情況下,它們可能會很有用。如果該終端通道是執行緒作用域的,則原始傳送執行緒可以從終端通道收集其回覆。
現在,由於任何通道都可以設定作用域,除了執行緒本地作用域外,您還可以定義自己的作用域。