RSocket 支援

RSocket Spring Integration 模組(spring-integration-rsocket)支援執行 RSocket 應用協議

您需要在專案中包含此依賴

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
    <version>6.4.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.4.4"

此模組從版本 5.2 開始提供,並基於 Spring Messaging 基礎,其 RSocket 元件實現包括 RSocketRequesterRSocketMessageHandlerRSocketStrategies。有關 RSocket 協議、術語和元件的更多資訊,請參閱 Spring Framework RSocket 支援

在透過通道介面卡啟動整合流處理之前,我們需要在伺服器和客戶端之間建立 RSocket 連線。為此,Spring Integration RSocket 支援提供了 ServerRSocketConnectorClientRSocketConnector 作為 AbstractRSocketConnector 的實現。

ServerRSocketConnector 根據提供的 io.rsocket.transport.ServerTransport 在主機和埠上公開一個監聽器,用於接受來自客戶端的連線。內部 RSocketServer 例項可以透過 setServerConfigurer() 進行自定義,還可以配置其他選項,例如用於有效載荷資料和頭部元資料的 RSocketStrategiesMimeType。當客戶端請求者提供 setupRoute(見下面的 ClientRSocketConnector)時,連線的客戶端會以 RSocketRequester 的形式儲存,其鍵由 clientRSocketKeyStrategy BiFunction<Map<String, Object>, DataBuffer, Object> 確定。預設情況下,連線資料被轉換為 UTF-8 字元集的字串作為鍵。此 RSocketRequester 登錄檔可在應用程式邏輯中用於確定特定客戶端連線以與之互動,或將相同訊息釋出到所有連線的客戶端。當客戶端建立連線時,ServerRSocketConnector 會發出一個 RSocketConnectedEvent。這類似於 Spring Messaging 模組中 @ConnectMapping 註解提供的功能。對映模式 * 意味著接受所有客戶端路由。RSocketConnectedEvent 可用於透過 DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER 頭部區分不同的路由。

典型的伺服器配置可能如下所示

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
    ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
    serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
    serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
    serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
    serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
                                    + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
    return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
	...
}

所有選項,包括 RSocketStrategies bean 和用於 RSocketConnectedEvent@EventListener 都是可選的。更多資訊請參閱 ServerRSocketConnector 的 JavaDocs。

從版本 5.2.1 開始,ServerRSocketMessageHandler 被提取為公共的頂級類,以便可能連線到現有的 RSocket 伺服器。當 ServerRSocketConnector 提供了 ServerRSocketMessageHandler 的外部例項時,它不會在內部建立 RSocket 伺服器,而只是將所有處理邏輯委託給提供的例項。此外,ServerRSocketMessageHandler 可以配置一個 messageMappingCompatible 標誌,以便處理 RSocket 控制器的 @MessageMapping,完全替代標準 RSocketMessageHandler 提供的功能。這在混合配置中可能非常有用,即經典 @MessageMapping 方法與 RSocket 通道介面卡同時存在於同一個應用程式中,並且應用程式中存在外部配置的 RSocket 伺服器。

ClientRSocketConnector 用作基於透過提供的 ClientTransport 連線的 RSocketRSocketRequester 的持有者。RSocketConnector 可以透過提供的 RSocketConnectorConfigurer 進行自定義。在此元件上還可以配置 setupRoute(帶有可選的模板變數)以及帶有元資料的 setupData

典型的客戶端配置可能如下所示

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
    ClientRSocketConnector clientRSocketConnector =
            new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
    clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
    clientRSocketConnector.setSetupRoute("clientConnect/{user}");
    clientRSocketConnector.setSetupRouteVariables("myUser");
    return clientRSocketConnector;
}

這些選項(包括 RSocketStrategies bean)大多數是可選的。注意我們是如何連線到在任意埠上本地啟動的 RSocket 伺服器的。有關 setupData 用例,請參閱 ServerRSocketConnector.clientRSocketKeyStrategy。有關更多資訊,另請參閱 ClientRSocketConnector 及其超類 AbstractRSocketConnector 的 JavaDocs。

ClientRSocketConnectorServerRSocketConnector 都負責將入站通道介面卡對映到其 path 配置,以便路由傳入的 RSocket 請求。更多資訊請參閱下一節。

RSocket 入站閘道器

RSocketInboundGateway 負責接收 RSocket 請求並生成響應(如果有)。它需要一個 path 對映陣列,這些對映可以是類似於 MVC 請求對映或 @MessageMapping 語義的模式。此外(從版本 5.2.2 開始),可以在 RSocketInboundGateway 上配置一組互動模型(參見 RSocketInteractionModel),以按特定的幀型別限制 RSocket 請求到達此端點。預設情況下,支援所有互動模型。這樣的 bean,根據其 IntegrationRSocketEndpoint 實現(ReactiveMessageHandler 的擴充套件),會被 ServerRSocketConnectorClientRSocketConnector 自動檢測到,用於內部 IntegrationRSocketMessageHandler 中處理傳入請求的路由邏輯。可以向 RSocketInboundGateway 提供 AbstractRSocketConnector 以進行顯式端點註冊。這樣,在該 AbstractRSocketConnector 上會停用自動檢測選項。RSocketStrategies 也可以注入到 RSocketInboundGateway 中,或者它們可以從提供的 AbstractRSocketConnector 獲取,從而覆蓋任何顯式注入。解碼器從這些 RSocketStrategies 中使用,根據提供的 requestElementType 解碼請求負載。如果傳入的 Message 中未提供 RSocketPayloadReturnValueHandler.RESPONSE_HEADER 頭部,則 RSocketInboundGateway 將請求視為 fireAndForget RSocket 互動模型。在這種情況下,RSocketInboundGateway 執行簡單的 send 操作到 outputChannel。否則,使用 RSocketPayloadReturnValueHandler.RESPONSE_HEADER 頭部中的 MonoProcessor 值向 RSocket 傳送回覆。為此,RSocketInboundGatewayoutputChannel 執行 sendAndReceiveMessageReactive 操作。要傳送到下游的訊息的 payload 始終是 Flux,根據 MessagingRSocket 邏輯。當處於 fireAndForget RSocket 互動模型時,訊息具有簡單的轉換後的 payload。回覆的 payload 可以是一個普通物件或一個 Publisher - RSocketInboundGateway 會根據 RSocketStrategies 中提供的編碼器將它們適當地轉換為 RSocket 響應。

從版本 5.3 開始,RSocketInboundGateway 添加了一個 decodeFluxAsUnit 選項(預設為 false)。預設情況下,傳入的 Flux 被轉換為其每個事件單獨解碼。這與當前 @MessageMapping 語義下的行為完全一致。要恢復之前的行為或根據應用程式要求將整個 Flux 解碼為一個單元,必須將 decodeFluxAsUnit 設定為 true。然而,目標解碼邏輯取決於選擇的 Decoder,例如 StringDecoder 需要流中存在換行符(預設)來指示位元組緩衝區結束。

有關如何配置 RSocketInboundGateway 端點以及如何處理下游負載的示例,請參閱使用 Java 配置 RSocket 端點

RSocket 出站閘道器

RSocketOutboundGateway 是一個 AbstractReplyProducingMessageHandler,用於向 RSocket 傳送請求並根據 RSocket 響應(如果有)生成回覆。低級別 RSocket 協議互動委託給從提供的 ClientRSocketConnector 或從伺服器端請求訊息中的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER 頭部解析的 RSocketRequester。伺服器端的目標 RSocketRequester 可以從 RSocketConnectedEvent 或透過 ServerRSocketConnector.setClientRSocketKeyStrategy() 為連線請求對映選擇的某些業務鍵使用 ServerRSocketConnector.getClientRSocketRequester() API 解析。更多資訊請參閱 ServerRSocketConnector 的 JavaDocs。

傳送請求的 route 必須顯式配置( junto 與路徑變數)或透過針對請求訊息進行評估的 SpEL 表示式配置。

RSocket 互動模型可以透過 RSocketInteractionModel 選項或相應的表示式設定提供。預設情況下,對於常見閘道器用例,使用 requestResponse

當請求訊息負載是 Publisher 時,可以提供一個 publisherElementType 選項,用於根據目標 RSocketRequester 中提供的 RSocketStrategies 編碼其元素。此選項的表示式可以評估為 ParameterizedTypeReference。有關資料及其型別的更多資訊,請參閱 RSocketRequester.RequestSpec.data() 的 JavaDocs。

RSocket 請求還可以透過 metadata 進行增強。為此,可以在 RSocketOutboundGateway 上配置一個針對請求訊息的 metadataExpression。這樣的表示式必須評估為 Map<Object, MimeType>

interactionModel 不是 fireAndForget 時,必須提供一個 expectedResponseType。預設為 String.class。此選項的表示式可以評估為 ParameterizedTypeReference。有關回複數據及其型別的更多資訊,請參閱 RSocketRequester.RetrieveSpec.retrieveMono()RSocketRequester.RetrieveSpec.retrieveFlux() 的 JavaDocs。

RSocketOutboundGateway 的回覆 payload 是一個 Mono(即使對於 fireAndForget 互動模型也是 Mono<Void>),這使得此元件始終是 async 的。對於常規通道,此 Mono 在生成到 outputChannel 之前被訂閱,或者由 FluxMessageChannel 按需處理。對於 requestStreamrequestChannel 互動模型的 Flux 響應,也會被包裝到回覆 Mono 中。它可以透過 FluxMessageChannel 和直通服務啟用器在下游進行展平

@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
    return payload;
}

或在目標應用邏輯中顯式訂閱。

預期的響應型別也可以配置為(或透過表示式評估為)void,將此閘道器視為出站通道介面卡。但是,仍然必須配置 outputChannel(即使只是 NullChannel)以啟動對返回的 Mono 的訂閱。

有關如何配置 RSocketOutboundGateway 端點以及如何處理下游負載的示例,請參閱使用 Java 配置 RSocket 端點

RSocket 名稱空間支援

Spring Integration 提供了 rsocket 名稱空間和相應的模式定義。要在配置中包含它,請在應用程式上下文配置檔案中新增以下名稱空間宣告

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/rsocket
    https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
    ...
</beans>

入站

要使用 XML 配置 Spring Integration RSocket 入站通道介面卡,您需要使用 int-rsocket 名稱空間中的適當 inbound-gateway 元件。以下示例顯示瞭如何配置它

<int-rsocket:inbound-gateway id="inboundGateway"
                             path="testPath"
                             interaction-models="requestStream,requestChannel"
                             rsocket-connector="clientRSocketConnector"
                             request-channel="requestChannel"
                             rsocket-strategies="rsocketStrategies"
                             request-element-type="byte[]"/>

ClientRSocketConnectorServerRSocketConnector 應配置為通用的 <bean> 定義。

出站

<int-rsocket:outbound-gateway id="outboundGateway"
                              client-rsocket-connector="clientRSocketConnector"
                              auto-startup="false"
                              interaction-model="fireAndForget"
                              route-expression="'testRoute'"
                              request-channel="requestChannel"
                              publisher-element-type="byte[]"
                              expected-response-type="java.util.Date"
                              metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>

有關所有這些 XML 屬性的說明,請參閱 spring-integration-rsocket.xsd

使用 Java 配置 RSocket 端點

以下示例顯示瞭如何使用 Java 配置 RSocket 入站端點

@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
    RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
    rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
    return rsocketInboundGateway;
}

@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
    return payload.next().map(String::toUpperCase);
}

此配置中假定存在 ClientRSocketConnectorServerRSocketConnector,這意味著對此類端點在“echo”路徑上進行自動檢測。請注意 @Transformer 的簽名,它對 RSocket 請求進行完全響應式處理並生成響應式回覆。

以下示例顯示瞭如何使用 Java DSL 配置 RSocket 入站閘道器

@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
    return IntegrationFlow
        .from(RSockets.inboundGateway("/uppercase")
                   .interactionModels(RSocketInteractionModel.requestChannel))
        .<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
        .get();
}

此配置中假定存在 ClientRSocketConnectorServerRSocketConnector,這意味著對此類端點在“/uppercase”路徑上進行自動檢測,並且預期的互動模型為“request channel”。

以下示例顯示瞭如何使用 Java 配置 RSocket 出站閘道器

@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
    RSocketOutboundGateway rsocketOutboundGateway =
            new RSocketOutboundGateway(
                    new FunctionExpression<Message<?>>((m) ->
                        m.getHeaders().get("route_header")));
    rsocketOutboundGateway.setInteractionModelExpression(
            new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    return rsocketOutboundGateway;
}

僅客戶端需要 setClientRSocketConnector()。在伺服器端,請求訊息中必須提供帶有 RSocketRequester 值的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER 頭部。

以下示例顯示瞭如何使用 Java DSL 配置 RSocket 出站閘道器

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlow
        .from(Function.class)
        .handle(RSockets.outboundGateway("/uppercase")
            .interactionModel(RSocketInteractionModel.requestResponse)
            .expectedResponseType(String.class)
            .clientRSocketConnector(clientRSocketConnector))
        .get();
}

有關如何在上述流程開始時使用提到的 Function 介面的更多資訊,請參閱IntegrationFlow 用作閘道器