訊息通道實現
Spring Integration 提供了不同的訊息通道實現。以下各節簡要描述了每種實現。
PublishSubscribeChannel
PublishSubscribeChannel 實現將其收到的任何 Message 廣播給所有訂閱的處理程式。這最常用於傳送事件訊息,其主要作用是通知(與文件訊息相反,文件訊息通常旨在由單個處理程式處理)。請注意,PublishSubscribeChannel 僅用於傳送。由於當其 send(Message) 方法被呼叫時,它直接廣播給其訂閱者,因此消費者無法輪詢訊息(它不實現 PollableChannel,因此沒有 receive() 方法)。相反,任何訂閱者本身都必須是 MessageHandler,並且訂閱者的 handleMessage(Message) 方法將依次被呼叫。
在 3.0 版本之前,對沒有訂閱者的 PublishSubscribeChannel 呼叫 send 方法會返回 false。與 MessagingTemplate 結合使用時,會丟擲 MessageDeliveryException。從 3.0 版本開始,行為已更改為:如果至少存在最小數量的訂閱者(並且成功處理了訊息),則 send 始終被視為成功。此行為可以透過設定 minSubscribers 屬性來修改,該屬性預設為 0。
如果您使用 TaskExecutor,則僅使用正確數量訂閱者的存在來確定,因為訊息的實際處理是非同步執行的。 |
QueueChannel
QueueChannel 實現封裝了一個佇列。與 PublishSubscribeChannel 不同,QueueChannel 具有點對點語義。換句話說,即使通道有多個消費者,也只有一個消費者應該接收發送到該通道的任何 Message。它提供了一個預設的無引數建構函式(提供一個本質上無限容量的 Integer.MAX_VALUE),以及一個接受佇列容量的建構函式,如下所示:
public QueueChannel(int capacity)
尚未達到容量限制的通道將其訊息儲存在其內部佇列中,並且 send(Message<?>) 方法會立即返回,即使沒有接收器準備好處理訊息。如果佇列已達到容量,則傳送方會阻塞,直到佇列中有可用空間。或者,如果您使用帶有附加超時引數的 send 方法,則佇列會阻塞,直到有可用空間或超時時間結束(以先發生者為準)。同樣,如果佇列中有訊息可用,則 receive() 呼叫會立即返回;但是,如果佇列為空,則 receive 呼叫可能會阻塞,直到有訊息可用或超時(如果提供)結束。在任何一種情況下,都可以透過傳遞超時值 0 來強制立即返回,無論佇列的狀態如何。但請注意,不帶 timeout 引數的 send() 和 receive() 版本會無限期阻塞。
PriorityChannel
QueueChannel 強制先進先出(FIFO)排序,而 PriorityChannel 是另一種實現,它允許訊息根據優先順序在通道內排序。預設情況下,優先順序由每條訊息中的 priority 訊息頭確定。但是,對於自定義優先順序確定邏輯,可以將型別為 Comparator<Message<?>> 的比較器提供給 PriorityChannel 建構函式。
RendezvousChannel
RendezvousChannel 實現了“直接交接”場景,即傳送方會阻塞,直到另一方呼叫通道的 receive() 方法。另一方會阻塞,直到傳送方傳送訊息。在內部,此實現與 QueueChannel 非常相似,只是它使用 SynchronousQueue(一個容量為零的 BlockingQueue 實現)。這在傳送方和接收方在不同執行緒中操作但不適合非同步將訊息放入佇列的情況下非常有效。換句話說,使用 RendezvousChannel,傳送方知道某個接收方已接受了訊息,而使用 QueueChannel,訊息會儲存到內部佇列中,並且可能永遠不會被接收。
請記住,所有這些基於佇列的通道預設情況下都只在記憶體中儲存訊息。當需要持久化時,您可以在“queue”元素中提供一個“message-store”屬性來引用一個持久化的 MessageStore 實現,或者您可以用一個由持久化代理支援的通道替換本地通道,例如 JMS 支援的通道或通道介面卡。後一種選項允許您利用任何 JMS 提供程式的實現來持久化訊息,如 JMS 支援 中所述。然而,當不需要在佇列中進行緩衝時,最簡單的方法是依賴下一節中討論的 DirectChannel。 |
RendezvousChannel 也可用於實現請求-回覆操作。傳送方可以建立一個臨時的、匿名的 RendezvousChannel 例項,然後在構建 Message 時將其設定為“replyChannel”頭。傳送該 Message 後,傳送方可以立即呼叫 receive(可選地提供一個超時值),以便在等待回覆 Message 時阻塞。這與許多 Spring Integration 的請求-回覆元件內部使用的實現非常相似。
DirectChannel
DirectChannel 具有點對點語義,但它更類似於 PublishSubscribeChannel,而不是前面描述的任何基於佇列的通道實現。它實現 SubscribableChannel 介面而不是 PollableChannel 介面,因此它直接將訊息分派給訂閱者。然而,作為點對點通道,它與 PublishSubscribeChannel 的不同之處在於,它將每條 Message 傳送給單個訂閱的 MessageHandler。
除了是最簡單的點對點通道選項之外,其最重要的特性之一是它允許單個執行緒執行通道“兩端”的操作。例如,如果處理程式訂閱了 DirectChannel,那麼向該通道傳送 Message 會直接在傳送方的執行緒中觸發該處理程式的 handleMessage(Message) 方法的呼叫,然後 send() 方法呼叫才能返回。
提供這種行為的通道實現的關鍵動機是支援必須跨通道的事務,同時仍然受益於通道提供的抽象和松耦合。如果 send() 呼叫在事務範圍內被呼叫,則處理程式呼叫的結果(例如,更新資料庫記錄)在確定該事務的最終結果(提交或回滾)中起作用。
由於 DirectChannel 是最簡單的選項,並且不會增加排程和管理輪詢器執行緒所需的任何額外開銷,因此它是 Spring Integration 中的預設通道型別。一般的想法是為應用程式定義通道,考慮哪些通道需要提供緩衝或限制輸入,然後將這些通道修改為基於佇列的 PollableChannels。同樣,如果通道需要廣播訊息,它不應該是 DirectChannel,而應該是 PublishSubscribeChannel。稍後我們將展示如何配置這些通道。 |
DirectChannel 內部委託給一個訊息排程器來呼叫其訂閱的訊息處理程式,並且該排程器可以具有由 load-balancer 或 load-balancer-ref 屬性(互斥)公開的負載均衡策略。訊息排程器使用負載均衡策略來幫助確定當多個訊息處理程式訂閱同一通道時,訊息如何在訊息處理程式之間分發。為了方便,load-balancer 屬性公開了一個列舉值,指向 LoadBalancingStrategy 的預實現。round-robin(迴圈均衡處理程式)和 none(在需要顯式停用負載均衡的情況下)是僅有的可用值。其他策略實現可能會在未來版本中新增。然而,自 3.0 版本以來,您可以提供自己的 LoadBalancingStrategy 實現,並透過使用 load-balancer-ref 屬性注入它,該屬性應指向實現 LoadBalancingStrategy 的 bean,如以下示例所示:
FixedSubscriberChannel 是一種 SubscribableChannel,它只支援單個 MessageHandler 訂閱者,且該訂閱者無法取消訂閱。這在不涉及其他訂閱者且不需要通道攔截器的高吞吐量效能用例中非常有用。
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
請注意,load-balancer 和 load-balancer-ref 屬性是互斥的。
負載均衡還與布林型 failover 屬性結合使用。如果 failover 值為 true(預設值),當前面的處理程式丟擲異常時,排程程式會回退到任何後續處理程式(如有必要)。順序由處理程式本身定義的可選順序值決定,如果沒有此類值,則由處理程式的訂閱順序決定。
如果某些情況需要排程程式始終嘗試呼叫第一個處理程式,然後在每次發生錯誤時按相同的固定順序序列回退,則不應提供負載均衡策略。換句話說,即使沒有啟用負載均衡,排程程式仍然支援 failover 布林屬性。但是,如果沒有負載均衡,處理程式的呼叫始終從第一個開始,根據其順序。例如,當明確定義了主要、次要、第三等等時,這種方法效果很好。當使用名稱空間支援時,任何端點上的 order 屬性決定了順序。
請記住,負載均衡和 failover 僅在通道有多個訂閱訊息處理程式時才適用。當使用名稱空間支援時,這意味著多個端點共享在 input-channel 屬性中定義的相同通道引用。 |
從 5.2 版本開始,當 failover 為 true 時,當前處理程式的失敗以及失敗訊息將分別在 debug 或 info 日誌級別記錄(如果已配置)。
ExecutorChannel
ExecutorChannel 是一個點對點通道,支援與 DirectChannel 相同的排程器配置(負載均衡策略和 failover 布林屬性)。這兩種排程通道型別之間的主要區別在於 ExecutorChannel 將排程委託給 TaskExecutor 例項來執行。這意味著 send 方法通常不會阻塞,但也意味著處理程式呼叫可能不會發生在傳送方的執行緒中。因此,它不支援跨傳送方和接收處理程式的事務。
傳送方有時會阻塞。例如,當使用具有拒絕策略(例如 ThreadPoolExecutor.CallerRunsPolicy)來限制客戶端的 TaskExecutor 時,當執行緒池達到最大容量且執行器的工作佇列已滿時,傳送方的執行緒可以隨時執行該方法。由於這種情況只會在不可預測的情況下發生,因此您不應依賴它來處理事務。 |
PartitionedChannel
從 6.1 版本開始,提供了 PartitionedChannel 實現。這是 AbstractExecutorChannel 的擴充套件,表示點對點分派邏輯,其中實際消費在特定執行緒上處理,該執行緒由從傳送到此通道的訊息評估出的分割槽鍵確定。此通道與上面提到的 ExecutorChannel 類似,但不同之處在於,具有相同分割槽鍵的訊息始終在同一執行緒中處理,從而保留了順序。它不需要外部 TaskExecutor,但可以使用自定義 ThreadFactory(例如 Thread.ofVirtual().name("partition-", 0).factory())進行配置。此工廠用於為每個分割槽將單執行緒執行器填充到 MessageDispatcher 委託中。預設情況下,IntegrationMessageHeaderAccessor.CORRELATION_ID 訊息頭用作分割槽鍵。此通道可以配置為簡單的 bean:
@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
該通道將有 3 個分割槽 - 專用執行緒;將使用 partitionKey 訊息頭來確定訊息將在哪個分割槽中處理。有關更多資訊,請參閱 PartitionedChannel 類的 Javadoc。
FluxMessageChannel
FluxMessageChannel 是 org.reactivestreams.Publisher 的實現,用於將傳送的訊息“下沉”到內部 reactor.core.publisher.Flux 中,以便下游的響應式訂閱者按需消費。此通道實現既不是 SubscribableChannel,也不是 PollableChannel,因此只有 org.reactivestreams.Subscriber 例項才能用於從該通道消費,並遵循響應式流的背壓特性。另一方面,FluxMessageChannel 實現了 ReactiveStreamsSubscribableChannel,其 subscribeTo(Publisher<Message<?>>) 契約允許從響應式源釋出者接收事件,將響應式流橋接到整合流中。為了實現整個整合流的完全響應式行為,這樣的通道必須放置在流中的所有端點之間。
有關與響應式流互動的更多資訊,請參閱 響應式流支援。
作用域通道
Spring Integration 1.0 提供了 ThreadLocalChannel 實現,但在 2.0 中已將其移除。現在,處理相同要求的更通用方法是向通道新增 scope 屬性。該屬性的值可以是上下文中可用的作用域的名稱。例如,在 Web 環境中,某些作用域是可用的,並且任何自定義作用域實現都可以註冊到上下文中。以下示例展示了將執行緒區域性作用域應用於通道,包括作用域本身的註冊:
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
上例中定義的通道在內部也委託給一個佇列,但該通道繫結到當前執行緒,因此佇列的內容也同樣繫結。這樣,傳送到通道的執行緒以後可以接收相同的訊息,但其他執行緒無法訪問它們。雖然執行緒作用域通道很少需要,但它們在以下情況下可能很有用:當使用 DirectChannel 例項來強制單執行緒操作,但任何回覆訊息都應傳送到“終端”通道時。如果該終端通道是執行緒作用域的,則原始傳送執行緒可以從終端通道收集其回覆。
現在,由於任何通道都可以具有作用域,您可以除了執行緒區域性作用域之外定義自己的作用域。