響應式流支援

Spring Integration 在框架的某些地方以及從不同方面為 響應式流 互動提供支援。我們將在此處討論其中大部分內容,並在必要時提供指向目標章節的適當連結以獲取詳細資訊。

前言

回顧一下,Spring Integration 擴充套件了 Spring 程式設計模型以支援眾所周知的企業整合模式。Spring Integration 支援基於 Spring 的應用程式中的輕量級訊息傳遞,並透過宣告式介面卡支援與外部系統的整合。Spring Integration 的主要目標是為構建企業整合解決方案提供一個簡單的模型,同時保持對可維護、可測試程式碼至關重要的關注點分離。此目標在目標應用程式中透過 messagechannelendpoint 等一流公民實現,這些公民允許我們構建整合流(管道),其中(在大多數情況下)一個端點將訊息生成到通道中,供另一個端點消費。透過這種方式,我們將整合互動模型與目標業務邏輯區分開來。這裡的關鍵部分是中間的通道:流行為取決於其實現,而端點保持不變。

另一方面,響應式流是具有非阻塞背壓的非同步流處理標準。響應式流的主要目標是管理跨非同步邊界(例如將元素傳遞給另一個執行緒或執行緒池)的流資料交換,同時確保接收方不會被迫緩衝任意數量的資料。換句話說,背壓是此模型不可或缺的一部分,以便允許線上程之間進行調解的佇列是有界的。響應式流實現(例如 Project Reactor)的目的是在流應用程式的整個處理圖中保留這些優點和特性。響應式流庫的最終目標是以透明和流暢的方式為目標應用程式提供型別、運算子集和支援 API,就像使用可用的程式語言結構一樣,但最終的解決方案不像正常的函式鏈呼叫那樣是命令式的。它分為兩個階段:定義和執行,執行在稍後訂閱最終的響應式釋出者期間發生,並且資料需求從定義的底部推到頂部,根據需要應用背壓——我們請求當前可以處理的儘可能多的事件。響應式應用程式看起來像一個 "stream",或者正如我們在 Spring Integration 術語中習慣的那樣——"flow"。事實上,自 Java 9 以來的 Reactive Streams SPI 在 java.util.concurrent.Flow 類中呈現。

從這裡看,當我們在端點上應用一些響應式框架運算子時,Spring Integration 流似乎非常適合編寫響應式流應用程式,但實際上問題要廣泛得多,我們需要記住並非所有端點(例如 JdbcMessageHandler)都可以透明地在響應式流中處理。當然,Spring Integration 中響應式流支援的主要目標是允許整個過程完全響應式、按需啟動並準備好背壓。在通道介面卡的目標協議和系統提供響應式流互動模型之前,這將是不可能的。在下面的部分中,我們將描述 Spring Integration 中為開發響應式應用程式同時保留整合流結構而提供的元件和方法。

Spring Integration 中的所有響應式流互動都使用 Project Reactor 型別(例如 MonoFlux)實現。

訊息閘道器

與響應式流互動的最簡單點是 @MessagingGateway,我們只需將閘道器方法的返回型別設定為 Mono<?> ——當對返回的 Mono 例項進行訂閱時,閘道器方法呼叫背後的整個整合流將執行。有關更多資訊,請參閱 Reactor Mono。框架內部對完全基於響應式流相容協議的入站閘道器也使用了類似的 Mono 回覆方法(有關更多資訊,請參閱下面的 響應式通道介面卡)。傳送和接收操作被包裝在一個 Mono.defer() 中,並在 replyChannel 頭部可用時鏈式評估回覆。這樣,特定響應式協議(例如 Netty)的入站元件將作為 Spring Integration 上執行的響應式流的訂閱者和發起者。如果請求負載是響應式型別,最好在響應式流定義中處理它,將程序推遲到發起者訂閱。為此,處理程式方法也必須返回響應式型別。有關更多資訊,請參閱下一節。

響應式回覆負載

當產生回覆的 MessageHandler 返回一個響應式型別負載作為回覆訊息時,它會以非同步方式處理,其中為 outputChannel 提供了常規的 MessageChannel 實現(async 必須設定為 true),並在輸出通道是 ReactiveStreamsSubscribableChannel 實現(例如 FluxMessageChannel)時透過按需訂閱進行扁平化。對於標準命令式 MessageChannel 用例,如果回覆負載是**多值**釋出者(有關更多資訊,請參閱 ReactiveAdapter.isMultiValue()),它將被包裝到 Mono.just() 中。結果是,Mono 必須由下游顯式訂閱或由 FluxMessageChannel 下游扁平化。對於 outputChannelReactiveStreamsSubscribableChannel,無需擔心返回型別和訂閱;一切都在框架內部順利處理。

有關更多資訊,請參閱 非同步服務啟用器

另請參閱 Kotlin 協程 以獲取更多資訊。

FluxMessageChannelReactiveStreamsConsumer

FluxMessageChannelMessageChannelPublisher<Message<?>> 的組合實現。Flux 作為熱源,在內部建立,用於從 send() 實現中彙集傳入訊息。Publisher.subscribe() 實現委託給該內部 Flux。此外,為了按需上游消費,FluxMessageChannel 提供了 ReactiveStreamsSubscribableChannel 契約的實現。為該通道提供的任何上游 Publisher(例如,參見下面的源輪詢通道介面卡和拆分器)在該通道準備好訂閱時自動訂閱。來自此委託釋出者的事件彙集到上述內部 Flux 中。

FluxMessageChannel 的消費者必須是 org.reactivestreams.Subscriber 例項,以遵循響應式流契約。幸運的是,Spring Integration 中的所有 MessageHandler 實現也實現了 Project Reactor 的 CoreSubscriber。多虧了中間的 ReactiveStreamsConsumer 實現,整個整合流配置對目標開發人員保持透明。在這種情況下,流行為從命令式推送模型變為響應式拉取模型。ReactiveStreamsConsumer 還可以用於使用 IntegrationReactiveUtils 將任何 MessageChannel 轉換為響應式源,從而使整合流部分響應式。

有關更多資訊,請參閱 FluxMessageChannel

從版本 5.5 開始,ConsumerEndpointSpec 引入了 reactive() 選項,使流中的端點成為 ReactiveStreamsConsumer,而與輸入通道無關。可選的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> 可以用於透過 Flux.transform() 操作自定義輸入通道的源 Flux,例如使用 publishOn()doOnNext()retry() 等。此功能透過其 reactive() 屬性在所有訊息傳遞註解(@ServiceActivator@Splitter 等)中表示為 @Reactive 子註解。

源輪詢通道介面卡

通常,SourcePollingChannelAdapter 依賴於由 TaskScheduler 啟動的任務。輪詢觸發器是根據提供的選項構建的,用於定期排程任務以輪詢目標資料來源或事件。當 outputChannelReactiveStreamsSubscribableChannel 時,相同的 Trigger 用於確定下一次執行時間,但 SourcePollingChannelAdapter 不排程任務,而是根據 nextExecutionTime 值的 Flux.generate() 和從上一步的持續時間的 Mono.delay() 建立一個 Flux<Message<?>>。然後使用 Flux.flatMapMany() 輪詢 maxMessagesPerPoll 並將其彙集到輸出 Flux 中。此生成器 Flux 由提供的 ReactiveStreamsSubscribableChannel 訂閱,遵循下游的背壓。從版本 5.5 開始,當 maxMessagesPerPoll == 0 時,源根本不被呼叫,並且 flatMapMany() 透過 Mono.empty() 結果立即完成,直到 maxMessagesPerPoll 在稍後時間(例如透過控制匯流排)更改為非零值。這樣,任何 MessageSource 實現都可以轉換為響應式熱源。

有關更多資訊,請參閱 輪詢消費者

事件驅動通道介面卡

MessageProducerSupport 是事件驅動通道介面卡的基類,通常,其 sendMessage(Message<?>) 用作生產驅動 API 中的監聽器回撥。當訊息生產者實現構建訊息的 Flux 而不是基於監聽器的功能時,此回撥也可以輕鬆插入 doOnNext() Reactor 運算子。事實上,當訊息生產者的 outputChannel 不是 ReactiveStreamsSubscribableChannel 時,框架中就是這樣做的。然而,為了改善終端使用者體驗並允許更多準備好背壓的功能,MessageProducerSupport 提供了一個 subscribeToPublisher(Publisher<? extends Message<?>>) API,供目標實現使用,當 Publisher<Message<?>>> 是目標系統的資料來源時。通常,它在 doStart() 實現中用於呼叫目標驅動 API 以獲取源資料的 Publisher。建議將響應式 MessageProducerSupport 實現與 FluxMessageChannel 作為 outputChannel 結合使用,以實現按需訂閱和下游事件消費。當取消對 Publisher 的訂閱時,通道介面卡進入停止狀態。對此通道介面卡呼叫 stop() 會完成從源 Publisher 的生產。通道介面卡可以重新啟動,並自動訂閱到新建立的源 Publisher

訊息源到響應式流

從版本 5.3 開始,提供了 ReactiveMessageSourceProducer。它結合了提供的 MessageSource 和事件驅動的生產到配置的 outputChannel 中。它內部將 MessageSource 包裝到重複重新訂閱的 Mono 中,生成一個 Flux<Message<?>>,用於訂閱到上述的 subscribeToPublisher(Publisher<? extends Message<?>>)。此 Mono 的訂閱使用 Schedulers.boundedElastic() 完成,以避免目標 MessageSource 中可能發生的阻塞。當訊息源返回 null(沒有資料可拉取)時,Mono 將進入 repeatWhenEmpty() 狀態,並根據訂閱者上下文中的 IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration 條目進行後續重新訂閱的 delay。預設情況下,它是 1 秒。如果 MessageSource 在訊息頭中產生帶有 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 資訊的訊息,則在原始 MonodoOnSuccess() 中確認(如果需要),並在下游流丟擲帶有要拒絕的失敗訊息的 MessagingException 時在 doOnError() 中拒絕。此 ReactiveMessageSourceProducer 可用於任何需要將輪詢通道介面卡的功能轉換為任何現有 MessageSource<?> 實現的響應式、按需解決方案的用例。

拆分器和聚合器

AbstractMessageSplitter 獲得一個 Publisher 來執行其邏輯時,該過程會自然地遍歷 Publisher 中的專案,將它們對映到訊息,以便傳送到 outputChannel。如果此通道是 ReactiveStreamsSubscribableChannel,則 PublisherFlux 包裝器將按需從該通道訂閱,並且此拆分器行為更像一個 flatMap Reactor 運算子,我們將傳入事件對映到多值輸出 Publisher。當整個整合流在拆分器之前和之後都使用 FluxMessageChannel 構建時,這將最有意義,從而使 Spring Integration 配置與響應式流要求及其事件處理運算子保持一致。對於常規通道,Publisher 將轉換為 Iterable,用於標準的迭代和生產拆分邏輯。

FluxAggregatorMessageHandler 是特定響應式流邏輯實現的另一個示例,可以將其視為 Project Reactor 中的 "reactive operator"。它基於 Flux.groupBy()Flux.window()(或 buffer())運算子。傳入的訊息匯入在建立 FluxAggregatorMessageHandler 時啟動的 Flux.create() 中,使其成為熱源。此 FluxReactiveStreamsSubscribableChannel 按需訂閱,或者當 outputChannel 不是響應式時,直接在 FluxAggregatorMessageHandler.start() 中訂閱。當整個整合流在此元件之前和之後都使用 FluxMessageChannel 構建時,此 MessageHandler 發揮其作用,使整個邏輯準備好背壓。

有關更多資訊,請參閱 流和 Flux 拆分Flux 聚合器

Java DSL

Java DSL 中的 IntegrationFlow 可以從任何 Publisher 例項開始(請參閱 IntegrationFlow.from(Publisher<Message<T>>))。此外,透過 IntegrationFlowBuilder.toReactivePublisher() 運算子,IntegrationFlow 可以轉換為響應式熱源。在這兩種情況下,內部都使用 FluxMessageChannel;它可以根據其 ReactiveStreamsSubscribableChannel 契約訂閱入站 Publisher,並且它本身是下游訂閱者的 Publisher<Message<?>>。透過動態 IntegrationFlow 註冊,我們可以實現強大的邏輯,將響應式流與此整合流結合起來,與 Publisher 進行橋接。

從版本 5.5.6 開始,提供了 toReactivePublisher(boolean autoStartOnSubscribe) 運算子變體,用於控制返回的 Publisher<Message<?>> 背後的整個 IntegrationFlow 的生命週期。通常,響應式釋出者的訂閱和消費發生在稍後的執行時階段,而不是在響應式流組合期間,甚至不是在 ApplicationContext 啟動期間。為了避免在 Publisher<Message<?>> 訂閱點上進行 IntegrationFlow 生命週期管理的樣板程式碼,併為了更好的終端使用者體驗,引入了帶有 autoStartOnSubscribe 標誌的新運算子。它將(如果為 trueIntegrationFlow 及其元件標記為 autoStartup = false,因此 ApplicationContext 不會自動啟動流中訊息的生產和消費。相反,IntegrationFlowstart() 是從內部 Flux.doOnSubscribe() 啟動的。無論 autoStartOnSubscribe 值如何,流都從 Flux.doOnCancel()Flux.doOnTerminate() 停止——如果沒有任何東西可以消費訊息,那麼生產訊息就沒有意義。

對於完全相反的用例,當 IntegrationFlow 應該呼叫響應式流並在完成後繼續時,在 IntegrationFlowDefinition 中提供了 fluxTransform() 運算子。此時的流將轉換為 FluxMessageChannel,該通道傳播到提供的 fluxFunction,在 Flux.transform() 運算子中執行。函式的結果被包裝到 Mono<Message<?>> 中,用於扁平化到輸出 Flux 中,該 Flux 由另一個 FluxMessageChannel 訂閱以進行下游流。

有關更多資訊,請參閱 Java DSL 章​​節

ReactiveMessageHandler

從版本 5.3 開始,框架原生支援 ReactiveMessageHandler。此型別的訊息處理程式專為響應式客戶端設計,這些客戶端返回響應式型別以按需訂閱低階操作執行,並且不提供任何回覆資料以繼續響應式流組合。當 ReactiveMessageHandler 在命令式整合流中使用時,handleMessage() 結果在返回後立即訂閱,因為此類流中沒有響應式流組合來遵守背壓。在這種情況下,框架將此 ReactiveMessageHandler 包裝到 ReactiveMessageHandlerAdapter 中——一個 MessageHandler 的普通實現。然而,當 ReactiveStreamsConsumer 參與流時(例如,當要消費的通道是 FluxMessageChannel 時),此 ReactiveMessageHandler 將與整個響應式流結合,使用 flatMap() Reactor 運算子,以在消費期間遵守背壓。

一個開箱即用的 ReactiveMessageHandler 實現是用於出站通道介面卡的 ReactiveMongoDbStoringMessageHandler。有關更多資訊,請參閱 MongoDB 響應式通道介面卡

從版本 6.1 開始,IntegrationFlowDefinition 公開了一個方便的 handleReactive(ReactiveMessageHandler) 終端運算子。任何 ReactiveMessageHandler 實現(即使是使用 Mono API 的簡單 lambda)都可以用於此運算子。框架會自動訂閱返回的 Mono<Void>。以下是此運算子可能配置的簡單示例

@Bean
public IntegrationFlow wireTapFlow1() {
    return IntegrationFlow.from("tappedChannel1")
            .wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
            .handleReactive((message) -> Mono.just(message).log().then());
}

此運算子的過載版本接受 Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> 來定製圍繞提供的 ReactiveMessageHandler 的消費者端點。

此外,還提供了基於 ReactiveMessageHandlerSpec 的變體。在大多數情況下,它們用於特定協議的通道介面卡實現。請參閱下一節,其中包含指向具有相應響應式通道介面卡的目標技術的連結。

響應式通道介面卡

當整合目標協議提供響應式流解決方案時,在 Spring Integration 中實現通道介面卡將變得非常簡單。

入站、事件驅動的通道介面卡實現是將請求(如有必要)包裝到延遲的 MonoFlux 中,並且僅當協議元件啟動對監聽器方法返回的 Mono 的訂閱時才執行 send(並生成回覆,如有)。透過這種方式,我們將響應式流解決方案精確地封裝在此元件中。當然,訂閱到輸出通道的下游整合流應遵循響應式流規範,並以按需、背壓就緒的方式執行。

這並非總是可用的,因為整合流中使用的 MessageHandler 處理器本身的性質(或當前實現)所限。當沒有響應式實現時,可以使用執行緒池和佇列或 FluxMessageChannel(參見上文)在整合端點之前和之後處理此限制。

響應式**事件驅動**入站通道介面卡示例

public class CustomReactiveMessageProducer extends MessageProducerSupport {

    private final CustomReactiveSource customReactiveSource;

    public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
        Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
        this.customReactiveSource = customReactiveSource;
    }

    @Override
    protected void doStart() {
        Flux<Message<?>> messageFlux =
            this.customReactiveSource
                .map(event - >
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());

        subscribeToPublisher(messageFlux);
    }
}

用法如下

public class MainFlow {
  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .channel(outputChannel)
        .get();
  }
}

或者以宣告的方式

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
        .handle(outputChannel)
        .get();
  }
}

或者甚至不使用通道介面卡,我們始終可以透過以下方式使用 Java DSL

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
    Flux<Message<?>> myFlux = this.customReactiveSource
                .map(event ->
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());
     return IntegrationFlow.from(myFlux)
        .handle(outputChannel)
        .get();
  }
}

響應式出站通道介面卡實現是關於啟動(或繼續)響應式流,以根據目標協議提供的響應式 API 與外部系統互動。入站有效負載本身可以是響應式型別,也可以是整個整合流的事件,該事件是頂部響應式流的一部分。如果我們在單向、即發即棄場景中,則返回的響應式型別可以立即訂閱,或者它向下遊傳播(請求-回覆場景),以用於進一步的整合流或目標業務邏輯中的顯式訂閱,但仍保留響應式流語義。

響應式出站通道介面卡示例

public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

    private final CustomEntityOperations customEntityOperations;

    public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
        Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
        this.customEntityOperations = customEntityOperations;
    }

    @Override
    protected Mono<Void> handleMessageInternal(Message<?> message) {
        return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
                .flatMap(mode -> {
                    switch (mode) {
                        case INSERT:
                            return handleInsert(message);
                        case UPDATE:
                            return handleUpdate(message);
                        default:
                            return Mono.error(new IllegalArgumentException());
                    }
                }).then();
    }

    private Mono<Void> handleInsert(Message<?> message) {
        return this.customEntityOperations.insert(message.getPayload())
                .then();
    }

    private Mono<Void> handleUpdate(Message<?> message) {
        return this.r2dbcEntityOperations.update(message.getPayload())
                .then();
    }

    public enum Type {
        INSERT,
        UPDATE,
    }
}

我們將能夠使用這兩個通道介面卡

public class MainFlow {

  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Autowired
  private CustomReactiveMessageHandler customReactiveMessageHandler;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .transform(someOperation)
        .handle(customReactiveMessageHandler)
        .get();
  }
}

目前,Spring Integration 為 WebFluxRSocketMongoDbR2DBCZeroMQGraphQLApache Cassandra 提供了通道介面卡(或閘道器)實現。Redis Stream 通道介面卡 也是響應式的,並使用 Spring Data 的 ReactiveStreamOperations。更多響應式通道介面卡正在開發中,例如用於 Kafka 中的 Apache Kafka,基於 Spring for Apache KafkaReactiveKafkaProducerTemplateReactiveKafkaConsumerTemplate 等。對於許多其他非響應式通道介面卡,建議使用執行緒池,以避免在響應式流處理期間阻塞。

響應式到命令式上下文傳播

Context Propagation 庫位於類路徑上時,Project Reactor 可以獲取 ThreadLocal 值(例如 Micrometer ObservationSecurityContextHolder)並將它們儲存到 Subscriber 上下文中。反向操作也是可能的,當我們為了跟蹤需要填充日誌 MDC 或讓從響應式流呼叫的服務從作用域恢復觀察時。有關其用於上下文傳播的特殊運算子的更多資訊,請參閱 Project Reactor 文件。儲存和恢復上下文工作順利,如果我們的整個解決方案是單個響應式流組合,因為 Subscriber 上下文從下游一直到組合的開始(FluxMono)都是可見的。但是,如果應用程式在不同的 Flux 例項之間切換或切換到命令式處理再切換回來,則繫結到 Subscriber 的上下文可能不可用。對於這種情況,Spring Integration 提供了額外的功能(從版本 6.0.5 開始),將 Reactor ContextView 儲存到從響應式流生成的 IntegrationMessageHeaderAccessor.REACTOR_CONTEXT 訊息頭中,例如當我們執行直接 send() 操作時。然後,此頭在 FluxMessageChannel.subscribeTo() 中用於恢復此通道將發出的 Message 的 Reactor 上下文。目前,此頭由 WebFluxInboundEndpointRSocketInboundGateway 元件填充,但可用於執行響應式到命令式整合的任何解決方案中。填充此頭的邏輯如下

return requestMono
        .flatMap((message) ->
                Mono.deferContextual((context) ->
                        Mono.just(message)
                                .handle((messageToSend, sink) ->
                                        send(messageWithReactorContextIfAny(messageToSend, context)))));
...

private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
    if (!context.isEmpty()) {
        return getMessageBuilderFactory()
                .fromMessage(message)
                .setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
                .build();
    }
    return message;
}

請注意,我們仍然需要使用 handle() 運算子來使 Reactor 從上下文中恢復 ThreadLocal 值。即使它作為頭髮送,框架也無法假設它是否會恢復到下游的 ThreadLocal 值。

為了從另一個 FluxMono 組合中的 Message 恢復上下文,可以執行此邏輯

Mono.just(message)
        .handle((messageToHandle, sink) -> ...)
        .contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));
© . This site is unofficial and not affiliated with VMware.