RSocket 支援
RSocket Spring Integration 模組 (spring-integration-rsocket) 允許執行 RSocket 應用程式協議。
專案需要此依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:7.0.0"
此模組從 5.2 版本開始提供,基於 Spring Messaging 基礎,並實現了其 RSocket 元件,例如 RSocketRequester、RSocketMessageHandler 和 RSocketStrategies。有關 RSocket 協議、術語和元件的更多資訊,請參閱 Spring Framework RSocket 支援。
在透過通道介面卡啟動整合流處理之前,我們需要在伺服器和客戶端之間建立 RSocket 連線。為此,Spring Integration RSocket 支援提供了 ServerRSocketConnector 和 ClientRSocketConnector 作為 AbstractRSocketConnector 的實現。
ServerRSocketConnector 根據提供的 io.rsocket.transport.ServerTransport 在主機和埠上暴露一個監聽器,用於接受來自客戶端的連線。內部的 RSocketServer 例項可以透過 setServerConfigurer() 進行自定義,也可以配置其他選項,例如用於有效負載資料和頭部元資料的 RSocketStrategies 和 MimeType。當客戶端請求者提供 setupRoute(參見下面的 ClientRSocketConnector)時,連線的客戶端將作為 RSocketRequester 儲存,其鍵由 clientRSocketKeyStrategy BiFunction<Map<String, Object>, DataBuffer, Object> 確定。預設情況下,連線資料用作鍵,並轉換為 UTF-8 字元集的字串值。此 RSocketRequester 登錄檔可用於應用程式邏輯中,以確定特定的客戶端連線以與其互動,或向所有連線的客戶端釋出相同的訊息。當客戶端建立連線時,ServerRSocketConnector 會發出 RSocketConnectedEvent。這與 Spring Messaging 模組中 @ConnectMapping 註解提供的功能類似。對映模式 * 表示接受所有客戶端路由。RSocketConnectedEvent 可用於透過 DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER 頭部來區分不同的路由。
典型的伺服器配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
所有選項,包括 RSocketStrategies bean 和 RSocketConnectedEvent 的 @EventListener,都是可選的。有關更多資訊,請參閱 ServerRSocketConnector JavaDocs。
從 5.2.1 版本開始,ServerRSocketMessageHandler 被提取到一個公共的頂級類中,以便與現有 RSocket 伺服器進行可能的連線。當 ServerRSocketConnector 提供一個外部 ServerRSocketMessageHandler 例項時,它不會在內部建立 RSocket 伺服器,而只是將所有處理邏輯委託給提供的例項。此外,ServerRSocketMessageHandler 可以配置一個 messageMappingCompatible 標誌,以便同時處理 RSocket 控制器的 @MessageMapping,完全取代標準 RSocketMessageHandler 提供的功能。這在混合配置中很有用,當經典的 @MessageMapping 方法與 RSocket 通道介面卡一起存在於同一個應用程式中,並且應用程式中存在外部配置的 RSocket 伺服器時。
ClientRSocketConnector 用作基於透過提供的 ClientTransport 連線的 RSocket 的 RSocketRequester 的持有者。RSocketConnector 可以透過提供的 RSocketConnectorConfigurer 進行自定義。setupRoute(帶可選模板變數)和帶元資料的 setupData 也可以在此元件上配置。
典型的客戶端配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
這些選項(包括 RSocketStrategies bean)中的大多數是可選的。請注意我們如何連線到在任意埠上本地啟動的 RSocket 伺服器。有關 setupData 用例,請參閱 ServerRSocketConnector.clientRSocketKeyStrategy。另請參閱 ClientRSocketConnector 及其 AbstractRSocketConnector 超類 Javadocs 以獲取更多資訊。
ClientRSocketConnector 和 ServerRSocketConnector 都負責將入站通道介面卡對映到它們的 path 配置,以路由傳入的 RSocket 請求。有關更多資訊,請參閱下一節。
RSocket 入站閘道器
RSocketInboundGateway 負責接收 RSocket 請求並生成響應(如果有)。它需要一個 path 對映陣列,該對映可以類似於 MVC 請求對映或 @MessageMapping 語義的模式。此外,(從 5.2.2 版本開始),可以在 RSocketInboundGateway 上配置一組互動模型(參見 RSocketInteractionModel),以透過特定的幀型別限制此端點的 RSocket 請求。預設情況下,支援所有互動模型。此 bean,根據其 IntegrationRSocketEndpoint 實現(ReactiveMessageHandler 的擴充套件),由 ServerRSocketConnector 或 ClientRSocketConnector 自動檢測,用於內部 IntegrationRSocketMessageHandler 中傳入請求的路由邏輯。可以將 AbstractRSocketConnector 提供給 RSocketInboundGateway 以進行顯式端點註冊。這樣,該 AbstractRSocketConnector 上的自動檢測選項將停用。RSocketStrategies 也可以注入到 RSocketInboundGateway 中,或者從提供的 AbstractRSocketConnector 中獲取,從而覆蓋任何顯式注入。解碼器從這些 RSocketStrategies 中使用,根據提供的 requestElementType 解碼請求有效負載。如果傳入 Message 中未提供 RSocketPayloadReturnValueHandler.RESPONSE_HEADER 頭部,RSocketInboundGateway 將請求視為 fireAndForget RSocket 互動模型。在這種情況下,RSocketInboundGateway 執行一個簡單的 send 操作到 outputChannel。否則,使用 RSocketPayloadReturnValueHandler.RESPONSE_HEADER 頭部中的 MonoProcessor 值向 RSocket 傳送回覆。為此,RSocketInboundGateway 在 outputChannel 上執行 sendAndReceiveMessageReactive 操作。根據 MessagingRSocket 邏輯,要傳送到下游的訊息的 payload 始終是 Flux。當處於 fireAndForget RSocket 互動模型時,訊息具有純轉換的 payload。回覆 payload 可以是純物件或 Publisher - RSocketInboundGateway 會根據 RSocketStrategies 中提供的編碼器將它們正確轉換為 RSocket 響應。
從 5.3 版本開始,decodeFluxAsUnit 選項(預設為 false)已新增到 RSocketInboundGateway 中。預設情況下,傳入的 Flux 會被轉換,使其每個事件都單獨解碼。這與當前 @MessageMapping 語義的行為完全相同。要恢復以前的行為或根據應用程式要求將整個 Flux 解碼為單個單元,必須將 decodeFluxAsUnit 設定為 true。但是,目標解碼邏輯取決於所選的 Decoder,例如 StringDecoder 需要在流中存在換行符(預設情況下)以指示位元組緩衝區結束。
有關如何配置 RSocketInboundGateway 端點並處理下游有效負載的示例,請參閱 使用 Java 配置 RSocket 端點。
RSocket 出站閘道器
RSocketOutboundGateway 是一個 AbstractReplyProducingMessageHandler,用於向 RSocket 傳送請求並根據 RSocket 回覆(如果有)生成回覆。低階 RSocket 協議互動被委託給從提供的 ClientRSocketConnector 或伺服器端請求訊息中的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER 頭部解析出的 RSocketRequester。伺服器端的目標 RSocketRequester 可以從 RSocketConnectedEvent 中解析,或者使用 ServerRSocketConnector.getClientRSocketRequester() API 根據透過 ServerRSocketConnector.setClientRSocketKeyStrategy() 為連線請求對映選擇的某些業務鍵進行解析。有關更多資訊,請參閱 ServerRSocketConnector JavaDocs。
傳送請求的 route 必須顯式配置(與路徑變數一起)或透過針對請求訊息求值的 SpEL 表示式進行配置。
RSocket 互動模型可以透過 RSocketInteractionModel 選項或相應的表示式設定提供。預設情況下,對於常見的閘道器用例,使用 requestResponse。
當請求訊息有效負載是 Publisher 時,可以提供 publisherElementType 選項,以便根據目標 RSocketRequester 中提供的 RSocketStrategies 對其元素進行編碼。此選項的表示式可以評估為 ParameterizedTypeReference。有關資料及其型別的更多資訊,請參閱 RSocketRequester.RequestSpec.data() JavaDocs。
RSocket 請求還可以透過 metadata 進行增強。為此,可以在 RSocketOutboundGateway 上配置針對請求訊息的 metadataExpression。此表示式必須評估為 Map<Object, MimeType>。
當 interactionModel 不是 fireAndForget 時,必須提供 expectedResponseType。它預設為 String.class。此選項的表示式可以評估為 ParameterizedTypeReference。有關回複數據及其型別的更多資訊,請參閱 RSocketRequester.RetrieveSpec.retrieveMono() 和 RSocketRequester.RetrieveSpec.retrieveFlux() JavaDocs。
來自 RSocketOutboundGateway 的回覆 payload 始終是一個 Mono(即使對於 fireAndForget 互動模型也是 Mono<Void>),這使得此元件始終是 async。此 Mono 在生成到常規通道的 outputChannel 之前訂閱,或者由 FluxMessageChannel 按需處理。requestStream 或 requestChannel 互動模型的 Flux 響應也包裝在回覆 Mono 中。它可以透過 FluxMessageChannel 和直通服務啟用器在下游展平。
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或者在目標應用程式邏輯中顯式訂閱。
預期的響應型別也可以配置(或透過表示式求值)為 void,將此閘道器視為出站通道介面卡。但是,仍然需要配置 outputChannel(即使它只是一個 NullChannel)以啟動對返回 Mono 的訂閱。
有關如何配置 RSocketOutboundGateway 端點並處理下游有效負載的示例,請參閱 使用 Java 配置 RSocket 端點。
RSocket 名稱空間支援
Spring Integration 提供了 rsocket 名稱空間和相應的模式定義。要將其包含在配置中,請在應用程式上下文配置檔案中新增以下名稱空間宣告:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
入站
要使用 XML 配置 Spring Integration RSocket 入站通道介面卡,您需要使用 int-rsocket 名稱空間中適當的 inbound-gateway 元件。以下示例顯示瞭如何配置它:
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
ClientRSocketConnector 和 ServerRSocketConnector 應配置為通用的 <bean> 定義。
出站
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
有關所有這些 XML 屬性的說明,請參閱 spring-integration-rsocket.xsd。
使用 Java 配置 RSocket 端點
以下示例顯示瞭如何使用 Java 配置 RSocket 入站端點:
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
在此配置中假定存在 ClientRSocketConnector 或 ServerRSocketConnector,這意味著在該“echo”路徑上自動檢測此端點。請注意 @Transformer 簽名,它完全響應式地處理 RSocket 請求並生成響應式回覆。
以下示例顯示瞭如何使用 Java DSL 配置 RSocket 入站閘道器:
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
在此配置中假定存在 ClientRSocketConnector 或 ServerRSocketConnector,這意味著在該“/uppercase”路徑上自動檢測此端點,並且預期的互動模型為“請求通道”。
以下示例顯示瞭如何使用 Java 配置 RSocket 出站閘道器:
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
setClientRSocketConnector() 僅在客戶端需要。在伺服器端,請求訊息中必須提供帶有 RSocketRequester 值的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER 頭部。
以下示例顯示瞭如何使用 Java DSL 配置 RSocket 出站閘道器:
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
有關如何在上述流的開頭使用上述提到的 Function 介面的更多資訊,請參閱 作為閘道器的 IntegrationFlow。