Reactive Streams 支援

Spring Integration 在框架的某些地方以及從不同的方面提供了對 Reactive Streams 互動的支援。我們將在這裡討論其中的大部分內容,並在必要時提供指向目標章節的適當連結以獲取詳細資訊。

前言

回顧一下,Spring Integration 擴充套件了 Spring 程式設計模型,以支援眾所周知的企業整合模式(Enterprise Integration Patterns)。Spring Integration 使得在基於 Spring 的應用程式中進行輕量級訊息傳遞成為可能,並透過宣告式介面卡支援與外部系統的整合。Spring Integration 的主要目標是提供一個簡單的模型來構建企業整合解決方案,同時保持實現可維護、可測試程式碼所需的關注點分離。這個目標透過在目標應用程式中使用第一類公民(例如 messagechannelendpoint)來實現,這些公民允許我們構建一個整合流(管道),在其中(大多數情況下)一個端點將訊息生成到通道中,供另一個端點消費。透過這種方式,我們區分了整合互動模型與目標業務邏輯。這裡的關鍵部分是中間的通道:流的行為取決於其實現,而端點保持不變。

另一方面,Reactive Streams 是一個帶有非阻塞背壓的非同步流處理標準。Reactive Streams 的主要目標是管理流資料跨非同步邊界的交換——例如將元素傳遞給另一個執行緒或執行緒池——同時確保接收端不會被迫快取任意數量的資料。換句話說,背壓是該模型不可或缺的一部分,以允許線上程之間進行調解的佇列是有界的。Reactive Streams 實現(例如 Project Reactor)的目的是在流應用程式的整個處理圖中保留這些優點和特性。Reactive Streams 庫的最終目標是為目標應用程式提供型別、一組運算子和支援 API,使其儘可能透明和順暢地利用現有程式語言結構,但最終解決方案並不像普通函式鏈呼叫那樣是命令式的。它分為兩個階段:定義和執行,執行稍後會在訂閱最終的響應式 publisher 時發生,並且資料需求從定義的底部向上推送,根據需要應用背壓——我們請求儘可能多當前可以處理的事件。響應式應用程式看起來像一個 "流",或者像我們在 Spring Integration 術語中習慣稱呼的 "流"。事實上,Java 9 以來的 Reactive Streams SPI 在 java.util.concurrent.Flow 類中體現。

從這裡看來,Spring Integration 流確實非常適合編寫 Reactive Streams 應用程式,因為我們在端點上應用了一些響應式框架運算子,但事實上問題要廣泛得多,我們需要記住,並非所有端點(例如 JdbcMessageHandler)都可以透明地在響應式流中處理。當然,Spring Integration 對 Reactive Streams 支援的主要目標是允許整個過程完全響應式、按需啟動且支援背壓。這要等到通道介面卡的目標協議和系統提供 Reactive Streams 互動模型才能實現。在下面的章節中,我們將描述 Spring Integration 中為開發響應式應用程式並保留整合流結構提供的元件和方法。

Spring Integration 中所有的 Reactive Streams 互動都使用 Project Reactor 型別實現,例如 MonoFlux

訊息閘道器

與 Reactive Streams 互動的最簡單點是 @MessagingGateway,在這裡我們只需要將閘道器方法的返回型別設定為 Mono<?>——整個整合流在閘道器方法呼叫之後將在對返回的 Mono 例項進行訂閱時執行。有關更多資訊,請參閱 Reactor Mono。框架內部對完全基於 Reactive Streams 相容協議的入站閘道器使用了類似的 Mono 回覆方法(有關更多資訊,請參閱下面的響應式通道介面卡)。傳送-接收操作被包裝在 Mono.defer() 中,並在 replyChannel 頭部可用時鏈式地評估回覆。透過這種方式,特定響應式協議(例如 Netty)的入站元件將充當訂閱者並啟動在 Spring Integration 上執行的響應式流。如果請求載荷是響應式型別,最好在響應式流定義中處理它,將流程推遲到發起者訂閱時。為此,處理程式方法也必須返回響應式型別。有關更多資訊,請參閱下一節。

響應式回覆載荷

當生成回覆的 MessageHandler 為回覆訊息返回響應式型別載荷時,它會以非同步方式處理,併為 outputChannel 提供常規的 MessageChannel 實現(必須設定 asynctrue),並在輸出通道是 ReactiveStreamsSubscribableChannel 實現(例如 FluxMessageChannel)時按需訂閱進行展平。在使用標準的命令式 MessageChannel 的情況下,如果回覆載荷是多值 publisher(有關更多資訊,請參閱 ReactiveAdapter.isMultiValue()),它會被包裝到 Mono.just() 中。結果是,該 Mono 必須在下游顯式訂閱,或者由下游的 FluxMessageChannel 展平。如果 outputChannelReactiveStreamsSubscribableChannel,則無需擔心返回型別和訂閱;所有內容都由框架內部平滑處理。

有關更多資訊,請參閱 非同步服務啟用器

另請參閱 Kotlin Coroutines 以獲取更多資訊。

FluxMessageChannelReactiveStreamsConsumer

FluxMessageChannelMessageChannelPublisher<Message<?>> 的組合實現。一個 Flux,作為一個熱源,在內部被建立,用於接收來自 send() 實現的入站訊息。Publisher.subscribe() 的實現被委託給該內部 Flux。此外,為了按需上游消費,FluxMessageChannel 提供了 ReactiveStreamsSubscribableChannel 契約的實現。為該通道提供的任何上游 Publisher(例如,參閱下面的源輪詢通道介面卡和分割器)在該通道訂閱就緒時會自動訂閱。來自此委託 publisher 的事件被沉入上述內部 Flux 中。

FluxMessageChannel 的消費者必須是 org.reactivestreams.Subscriber 例項,以遵守 Reactive Streams 契約。幸運的是,Spring Integration 中所有的 MessageHandler 實現也都實現了 Project Reactor 的 CoreSubscriber。並且由於中間的 ReactiveStreamsConsumer 實現,整個整合流配置對目標開發者來說是透明的。在這種情況下,流的行為從命令式推模型變為響應式拉模型。ReactiveStreamsConsumer 還可以用於使用 IntegrationReactiveUtils 將任何 MessageChannel 轉換為響應式源,從而使整合流部分響應式化。

有關更多資訊,請參閱 FluxMessageChannel

從版本 5.5 開始,ConsumerEndpointSpec 引入了一個 reactive() 選項,使得流中的端點成為 ReactiveStreamsConsumer,而與輸入通道無關。可選的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> 可以提供,以便透過 Flux.transform() 操作(例如使用 publishOn()doOnNext()retry() 等)自定義輸入通道的源 Flux。此功能透過所有訊息傳遞註解(@ServiceActivator@Splitter 等)的 reactive() 屬性表示為 @Reactive 子註解。

源輪詢通道介面卡

通常,SourcePollingChannelAdapter 依賴於由 TaskScheduler 啟動的任務。根據提供的選項構建輪詢觸發器,用於定期排程任務以輪詢目標資料或事件源。當 outputChannelReactiveStreamsSubscribableChannel 時,使用相同的 Trigger 來確定下一次執行時間,但不是排程任務,而是 SourcePollingChannelAdapter 基於 Flux.generate() 建立一個 Flux<Message<?>>,用於 nextExecutionTime 值,並基於前一步驟的持續時間使用 Mono.delay()。然後使用 Flux.flatMapMany() 輪詢 maxMessagesPerPoll 並將它們沉入輸出 Flux 中。此生成器 Flux 由提供的 ReactiveStreamsSubscribableChannel 訂閱,遵循下游的背壓。從版本 5.5 開始,當 maxMessagesPerPoll == 0 時,源根本不會被呼叫,並且 flatMapMany() 透過 Mono.empty() 結果立即完成,直到 maxMessagesPerPoll 稍後更改為非零值,例如透過控制匯流排(Control Bus)。透過這種方式,任何 MessageSource 實現都可以轉換為響應式熱源。

有關更多資訊,請參閱 輪詢消費者

事件驅動通道介面卡

MessageProducerSupport 是事件驅動通道介面卡的基類,通常其 sendMessage(Message<?>) 用作生成驅動程式 API 中的監聽器回撥。當訊息生產者實現構建一個訊息 Flux 而不是基於監聽器的功能時,此回撥也可以輕鬆地插入到 doOnNext() Reactor 運算子中。事實上,當訊息生產者的 outputChannel 不是 ReactiveStreamsSubscribableChannel 時,框架就會這樣做。然而,為了更好的終端使用者體驗,並允許更多支援背壓的功能,MessageProducerSupport 提供了一個 subscribeToPublisher(Publisher<? extends Message<?>>) API,供目標實現用於當 Publisher<Message<?>>> 是目標系統的資料來源時。通常,它用於 doStart() 實現中,當呼叫目標驅動程式 API 以獲取源資料的 Publisher 時。建議將響應式 MessageProducerSupport 實現與 FluxMessageChannel 作為 outputChannel 結合使用,以實現按需訂閱和下游事件消費。當對 Publisher 的訂閱被取消時,通道介面卡進入停止狀態。對此通道介面卡呼叫 stop() 會停止源 Publisher 的生產。通道介面卡可以透過自動訂閱新建立的源 Publisher 來重新啟動。

從 Message Source 到 Reactive Streams

從版本 5.3 開始,提供了 ReactiveMessageSourceProducer。它是提供的 MessageSource 和到配置的 outputChannel 的事件驅動生產的組合。在內部,它將 MessageSource 包裝到反覆重新訂閱的 Mono 中,生成一個 Flux<Message<?>>,用於在上述 subscribeToPublisher(Publisher<? extends Message<?>>) 中訂閱。對此 Mono 的訂閱是使用 Schedulers.boundedElastic() 完成的,以避免目標 MessageSource 中可能的阻塞。當訊息源返回 null(沒有資料可拉取)時,Mono 會進入 repeatWhenEmpty() 狀態,並根據訂閱者上下文中的 IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration 條目延遲進行後續的重新訂閱。預設情況下,延遲為 1 秒。如果 MessageSource 在頭部生成包含 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 資訊的 message,則在原始 MonodoOnSuccess() 中(如果需要)進行確認,並且如果下游流丟擲帶有失敗 message 的 MessagingException 需要拒絕,則在 doOnError() 中進行拒絕。此 ReactiveMessageSourceProducer 可用於任何需要將輪詢通道介面卡的功能轉換為任何現有 MessageSource<?> 實現的響應式、按需解決方案的用例。

分割器與聚合器

AbstractMessageSplitter 收到一個 Publisher 作為其邏輯時,流程會自然地遍歷 Publisher 中的項,將它們對映到訊息併發送到 outputChannel。如果此通道是 ReactiveStreamsSubscribableChannel,則該通道會按需訂閱 Publisher 的 Flux 包裝器,並且此分割器行為更像是一個 flatMap Reactor 運算子,當我們把入站事件對映到多值輸出 Publisher 時。這在整個整合流由分割器前後的 FluxMessageChannel 構建時最有意義,這樣可以將 Spring Integration 配置與 Reactive Streams 的要求及其事件處理運算子對齊。對於常規通道,Publisher 會轉換為 Iterable,用於標準的迭代和生產分割邏輯。

FluxAggregatorMessageHandler 是特定 Reactive Streams 邏輯實現的另一個示例,可以被視為 Project Reactor 中的 "響應式運算子"。它基於 Flux.groupBy()Flux.window()(或 buffer())運算子。當 FluxAggregatorMessageHandler 建立時,入站訊息會沉入一個啟動的 Flux.create() 中,使其成為一個熱源。該 Flux 由 ReactiveStreamsSubscribableChannel 按需訂閱,或者當 outputChannel 不是響應式時直接在 FluxAggregatorMessageHandler.start() 中訂閱。當整個整合流由該元件前後的 FluxMessageChannel 構建時,此 MessageHandler 具有其強大之處,使整個邏輯支援背壓。

有關更多資訊,請參閱 Stream 和 Flux 分割Flux 聚合器

Java DSL

Java DSL 中的 IntegrationFlow 可以從任何 Publisher 例項開始(請參閱 IntegrationFlow.from(Publisher<Message<T>>))。此外,使用 IntegrationFlowBuilder.toReactivePublisher() 運算子,IntegrationFlow 可以轉換為響應式熱源。在這兩種情況下,內部都使用 FluxMessageChannel;它可以根據其 ReactiveStreamsSubscribableChannel 契約訂閱入站 Publisher,並且它本身就是一個 Publisher<Message<?>>,供下游訂閱者使用。透過動態 IntegrationFlow 註冊,我們可以實現將 Reactive Streams 與此整合流連線到/來自 Publisher 的強大邏輯。

從版本 5.5.6 開始,引入了一個 toReactivePublisher(boolean autoStartOnSubscribe) 運算子變體,用於控制返回的 Publisher<Message<?>> 背後的整個 IntegrationFlow 的生命週期。通常,對響應式 publisher 的訂閱和消費發生在後期的執行時階段,而不是在響應式流組合期間,甚至不是在 ApplicationContext 啟動期間。為了避免在 Publisher<Message<?>> 訂閱點對 IntegrationFlow 進行生命週期管理的樣板程式碼,併為了更好的終端使用者體驗,引入了這個帶有 autoStartOnSubscribe 標誌的新運算子。如果設定為 true,它會將 IntegrationFlow 及其元件標記為 autoStartup = false,這樣 ApplicationContext 就不會自動啟動流中的訊息生產和消費。相反,IntegrationFlowstart() 是從內部的 Flux.doOnSubscribe() 啟動的。無論 autoStartOnSubscribe 的值如何,流都會從 Flux.doOnCancel()Flux.doOnTerminate() 停止——如果沒有任何東西可以消費訊息,生產訊息就沒有意義。

對於完全相反的用例,當 IntegrationFlow 需要呼叫響應式流並在完成後繼續時,IntegrationFlowDefinition 中提供了 fluxTransform() 運算子。此時的流被轉換為 FluxMessageChannel,該通道被傳播到提供的 fluxFunction 中,並在 Flux.transform() 運算子中執行。函式的返回值被包裝在 Mono<Message<?>> 中,用於平鋪對映到輸出 Flux,該 Flux 由另一個 FluxMessageChannel 訂閱,以進行下游流。

有關更多資訊,請參閱 Java DSL 章節

ReactiveMessageHandler

從版本 5.3 開始,框架原生支援 ReactiveMessageHandler。這種型別的訊息處理器專為響應式客戶端設計,它返回一個響應式型別,用於按需訂閱以執行低級別操作,並且不提供任何回覆資料來繼續響應式流組合。當在命令式整合流中使用 ReactiveMessageHandler 時,handleMessage() 的結果會在返回後立即訂閱,僅僅因為在此類流中沒有響應式流組合來遵守背壓。在這種情況下,框架將此 ReactiveMessageHandler 包裝到 ReactiveMessageHandlerAdapter 中——一個簡單的 MessageHandler 實現。然而,當流中包含 ReactiveStreamsConsumer 時(例如,當消費的通道是 FluxMessageChannel 時),此類 ReactiveMessageHandler 會與整個響應式流一起組合,使用 flatMap() Reactor 運算子,以在消費期間遵守背壓。

開箱即用的 ReactiveMessageHandler 實現之一是用於出站通道介面卡的 ReactiveMongoDbStoringMessageHandler。有關更多資訊,請參閱 MongoDB 響應式通道介面卡

從版本 6.1 開始,IntegrationFlowDefinition 暴露了一個方便的 handleReactive(ReactiveMessageHandler) 終端運算子。任何 ReactiveMessageHandler 實現(甚至僅僅是一個使用 Mono API 的簡單 lambda 表示式)都可以用於此運算子。框架會自動訂閱返回的 Mono<Void>。以下是此運算子可能配置的一個簡單示例:

@Bean
public IntegrationFlow wireTapFlow1() {
    return IntegrationFlow.from("tappedChannel1")
            .wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
            .handleReactive((message) -> Mono.just(message).log().then());
}

此運算子的過載版本接受 Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>,以自定義圍繞提供的 ReactiveMessageHandler 的消費者端點。

此外,還提供了基於 ReactiveMessageHandlerSpec 的變體。在大多數情況下,它們用於特定協議的通道介面卡實現。請參閱下一節,其中包含指向目標技術的連結及其相應的響應式通道介面卡。

響應式通道介面卡

當整合目標協議提供 Reactive Streams 解決方案時,在 Spring Integration 中實現通道介面卡就變得簡單直接。

入站、事件驅動的通道介面卡實現是關於(如有必要)將請求包裝成一個延遲的 MonoFlux,並且僅當協議元件發起對監聽器方法返回的 Mono 的訂閱時,才執行傳送(並生成回覆,如果需要)。透過這種方式,我們將響應式流解決方案精確地封裝在這個元件中。當然,下游整合流在輸出通道上訂閱時應遵循 Reactive Streams 規範,並以按需、支援背壓的方式執行。

這並不總是透過整合流中使用的 MessageHandler 處理器本身的性質(或當前實現)可用。當沒有響應式實現時,可以使用執行緒池和佇列或 FluxMessageChannel(見上文),在整合端點之前和之後處理此限制。

一個響應式事件驅動入站通道介面卡的示例

public class CustomReactiveMessageProducer extends MessageProducerSupport {

    private final CustomReactiveSource customReactiveSource;

    public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
        Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
        this.customReactiveSource = customReactiveSource;
    }

    @Override
    protected void doStart() {
        Flux<Message<?>> messageFlux =
            this.customReactiveSource
                .map(event - >
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());

        subscribeToPublisher(messageFlux);
    }
}

用法如下所示

public class MainFlow {
  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .channel(outputChannel)
        .get();
  }
}

或者以宣告方式

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
        .handle(outputChannel)
        .get();
  }
}

或者即使沒有通道介面卡,我們也可以始終使用 Java DSL,如下所示

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
    Flux<Message<?>> myFlux = this.customReactiveSource
                .map(event ->
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());
     return IntegrationFlow.from(myFlux)
        .handle(outputChannel)
        .get();
  }
}

響應式出站通道介面卡實現是關於根據目標協議提供的響應式 API,啟動(或繼續)與外部系統互動的響應式流。入站載荷本身可以是響應式型別,或者作為整個整合流的一個事件,該整合流是上層響應式流的一部分。返回的響應式型別如果是單向的、即發即棄的場景,可以立即訂閱;或者,它會向下遊傳播(請求-回覆場景),以進行進一步的整合流處理或目標業務邏輯中的顯式訂閱,但仍然保持響應式流的語義向下遊傳播。

一個響應式出站通道介面卡的示例

public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

    private final CustomEntityOperations customEntityOperations;

    public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
        Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
        this.customEntityOperations = customEntityOperations;
    }

    @Override
    protected Mono<Void> handleMessageInternal(Message<?> message) {
        return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
                .flatMap(mode -> {
                    switch (mode) {
                        case INSERT:
                            return handleInsert(message);
                        case UPDATE:
                            return handleUpdate(message);
                        default:
                            return Mono.error(new IllegalArgumentException());
                    }
                }).then();
    }

    private Mono<Void> handleInsert(Message<?> message) {
        return this.customEntityOperations.insert(message.getPayload())
                .then();
    }

    private Mono<Void> handleUpdate(Message<?> message) {
        return this.r2dbcEntityOperations.update(message.getPayload())
                .then();
    }

    public enum Type {
        INSERT,
        UPDATE,
    }
}

我們將能夠同時使用這兩種通道介面卡

public class MainFlow {

  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Autowired
  private CustomReactiveMessageHandler customReactiveMessageHandler;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .transform(someOperation)
        .handle(customReactiveMessageHandler)
        .get();
  }
}

目前,Spring Integration 為 WebFlux, RSocket, MongoDb, R2DBC, ZeroMQ, GraphQL, Apache Cassandra 提供了通道介面卡(或閘道器)實現。Redis Stream 通道介面卡 也是響應式的,並使用 Spring Data 的 ReactiveStreamOperations。更多響應式通道介面卡正在開發中,例如基於 Spring for Apache KafkaReactiveKafkaProducerTemplateReactiveKafkaConsumerTemplateApache Kafka 通道介面卡等。對於許多其他非響應式通道介面卡,建議使用執行緒池以避免在響應式流處理過程中阻塞。

響應式到命令式上下文傳播

Context Propagation 庫位於類路徑上時,Project Reactor 可以獲取 ThreadLocal 值(例如 Micrometer ObservationSecurityContextHolder),並將它們儲存到 Subscriber 上下文。反向操作也是可能的,當我們需要填充日誌 MDC 以進行跟蹤,或者讓我們在響應式流中呼叫的服務從作用域中恢復觀察時。有關上下文傳播的特殊運算子的更多資訊,請參閱 Project Reactor 文件。如果我們的整個解決方案是一個單一的響應式流組合,儲存和恢復上下文可以順利進行,因為 Subscriber 上下文從下游向上遊直到組合的開始(FluxMono)都是可見的。但是,如果應用程式在不同的 Flux 例項之間切換,或者切換到命令式處理再切換回來,那麼與 Subscriber 繫結的上下文可能不可用。對於這種情況,Spring Integration 提供了一項額外功能(從版本 6.0.5 開始),可以將 Reactor ContextView 儲存到響應式流生成的 IntegrationMessageHeaderAccessor.REACTOR_CONTEXT 訊息頭中,例如當我們執行直接的 send() 操作時。然後,此訊息頭會在 FluxMessageChannel.subscribeTo() 中使用,以恢復此通道將要發出的 Message 的 Reactor 上下文。目前,此訊息頭由 WebFluxInboundEndpointRSocketInboundGateway 元件填充,但可以在執行響應式到命令式整合的任何解決方案中使用。填充此訊息頭的邏輯如下:

return requestMono
        .flatMap((message) ->
                Mono.deferContextual((context) ->
                        Mono.just(message)
                                .handle((messageToSend, sink) ->
                                        send(messageWithReactorContextIfAny(messageToSend, context)))));
...

private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
    if (!context.isEmpty()) {
        return getMessageBuilderFactory()
                .fromMessage(message)
                .setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
                .build();
    }
    return message;
}

請注意,我們仍然需要使用 handle() 運算子來使 Reactor 從上下文中恢復 ThreadLocal 值。即使它作為訊息頭髮送,框架也無法假設它是否會在下游恢復到 ThreadLocal 值。

要在另一個 FluxMono 組合中從 Message 恢復上下文,可以執行以下邏輯:

Mono.just(message)
        .handle((messageToHandle, sink) -> ...)
        .contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));