RSocket
本節描述了 Spring Framework 對 RSocket 協議的支援。
概述
RSocket 是一種應用協議,用於透過 TCP、WebSocket 和其他位元組流傳輸進行多路複用、雙工通訊,使用以下互動模型之一:
-
Request-Response(請求-響應)——傳送一條訊息並接收一條響應。 -
Request-Stream(請求-流)——傳送一條訊息並接收一個訊息流。 -
Channel(通道)——雙向傳送訊息流。 -
Fire-and-Forget(即發即棄)——傳送一個單向訊息。
一旦建立初始連線,“客戶端”與“伺服器”的區別就消失了,雙方變得對稱,每一方都可以發起上述互動之一。這就是為什麼協議中將參與方稱為“請求者”和“響應者”,而上述互動被稱為“請求流”或簡稱“請求”。
以下是 RSocket 協議的主要特性和優勢:
-
Reactive Streams(響應式流)語義跨網路邊界——對於像
Request-Stream和Channel這樣的流式請求,背壓訊號在請求者和響應者之間傳播,允許請求者在源頭減慢響應者,從而減少對網路層擁塞控制的依賴,以及在網路層或任何層進行緩衝的需求。 -
請求節流——此功能被命名為“租約”(Leasing),源於
LEASE幀,該幀可以從每一端傳送,以限制另一端在給定時間內允許的總請求數。租約會定期續訂。 -
會話恢復——這旨在應對連線丟失,並需要維護一些狀態。狀態管理對應用程式是透明的,並且與背壓結合良好,背壓可以在可能的情況下停止生產者並減少所需的狀態量。
-
大型訊息的碎片化和重組。
-
保持活動(心跳)。
RSocket 在多種語言中都有實現。Java 庫構建在 Project Reactor 之上,並使用 Reactor Netty 作為傳輸層。這意味著應用程式中 Reactive Streams Publisher 的訊號會透過 RSocket 跨網路透明傳播。
協議
RSocket 的優勢之一是它在網路上有明確定義的行為,並且有易於閱讀的規範以及一些協議擴充套件。因此,閱讀規範是一個好主意,它獨立於語言實現和更高級別的框架 API。本節提供了一個簡潔的概述,以建立一些上下文。
連線
最初,客戶端透過一些低階流式傳輸(如 TCP 或 WebSocket)連線到伺服器,並向伺服器傳送一個 SETUP 幀以設定連線引數。
伺服器可以拒絕 SETUP 幀,但通常在傳送(對於客戶端)和接收(對於伺服器)之後,雙方都可以開始發出請求,除非 SETUP 指示使用租約語義來限制請求數量,在這種情況下,雙方必須等待來自另一端的 LEASE 幀以允許發出請求。
發起請求
一旦建立連線,雙方都可以透過 REQUEST_RESPONSE、REQUEST_STREAM、REQUEST_CHANNEL 或 REQUEST_FNF 幀中的一個發起請求。每個幀都攜帶一個從請求者到響應者的訊息。
然後,響應者可以返回帶有響應訊息的 PAYLOAD 幀,在 REQUEST_CHANNEL 的情況下,請求者也可以傳送帶有更多請求訊息的 PAYLOAD 幀。
當請求涉及訊息流(例如 Request-Stream 和 Channel)時,響應者必須遵守來自請求者的需求訊號。需求表示為訊息的數量。初始需求在 REQUEST_STREAM 和 REQUEST_CHANNEL 幀中指定。後續需求透過 REQUEST_N 幀發出訊號。
雙方還可以透過 METADATA_PUSH 幀傳送元資料通知,這些通知不涉及任何單個請求,而是涉及整個連線。
訊息格式
RSocket 訊息包含資料和元資料。元資料可用於傳送路由、安全令牌等。資料和元資料可以採用不同的格式。它們的 MIME 型別在 SETUP 幀中宣告,並適用於給定連線上的所有請求。
雖然所有訊息都可以有元資料,但通常路由等元資料是按請求的,因此僅包含在請求的第一條訊息中,即與 REQUEST_RESPONSE、REQUEST_STREAM、REQUEST_CHANNEL 或 REQUEST_FNF 幀中的一個。
協議擴充套件定義了用於應用程式的通用元資料格式
Java 實現
RSocket 的Java 實現構建在 Project Reactor 之上。TCP 和 WebSocket 的傳輸層構建在 Reactor Netty 之上。作為一個 Reactive Streams 庫,Reactor 簡化了協議的實現工作。對於應用程式來說,使用帶有宣告式運算子和透明背壓支援的 Flux 和 Mono 是一個自然的契合。
RSocket Java 中的 API 有意地保持最小化和基礎。它專注於協議功能,並將應用程式程式設計模型(例如,RPC 程式碼生成與其他)作為一個更高級別的獨立關注點。
主要契約 io.rsocket.RSocket 建模了四種請求互動型別,其中 Mono 表示單個訊息的承諾,Flux 表示訊息流,而 io.rsocket.Payload 則是實際訊息,可訪問資料和元資料作為位元組緩衝區。RSocket 契約是對稱使用的。對於請求,應用程式獲得一個 RSocket 來執行請求。對於響應,應用程式實現 RSocket 來處理請求。
這並非旨在提供全面介紹。在大多數情況下,Spring 應用程式無需直接使用其 API。但是,獨立於 Spring 檢視或試驗 RSocket 可能很重要。RSocket Java 倉庫包含許多示例應用程式,展示了其 API 和協議功能。
Spring 支援
spring-messaging 模組包含以下內容:
-
RSocketRequester——流式 API,透過
io.rsocket.RSocket發出請求,支援資料和元資料的編碼/解碼。 -
帶註解的響應器——帶有
@MessageMapping和@RSocketExchange註解的處理程式方法,用於響應。 -
RSocket 介面——將 RSocket 服務宣告為帶有
@RSocketExchange方法的 Java 介面,用作請求者或響應器。
spring-web 模組包含 Encoder 和 Decoder 實現,如 Jackson CBOR/JSON 和 Protobuf,RSocket 應用程式可能會用到。它還包含 PathPatternParser,可以插入以實現高效的路由匹配。
Spring Boot 2.2 支援透過 TCP 或 WebSocket 啟動 RSocket 伺服器,包括在 WebFlux 伺服器中透過 WebSocket 暴露 RSocket 的選項。它還提供客戶端支援以及 RSocketRequester.Builder 和 RSocketStrategies 的自動配置。有關更多詳細資訊,請參閱 Spring Boot 參考中的RSocket 部分。
Spring Security 5.2 提供 RSocket 支援。
Spring Integration 5.2 提供入站和出站閘道器,用於與 RSocket 客戶端和伺服器進行互動。有關更多詳細資訊,請參閱 Spring Integration 參考手冊。
Spring Cloud Gateway 支援 RSocket 連線。
RSocketRequester
RSocketRequester 提供了一個流式 API 來執行 RSocket 請求,接受並返回用於資料和元資料的物件,而不是低階資料緩衝區。它可以對稱使用,用於從客戶端發出請求以及從伺服器發出請求。
客戶端請求者
在客戶端獲取 RSocketRequester 是為了連線到伺服器,這涉及傳送一個帶有連線設定的 RSocket SETUP 幀。RSocketRequester 提供了一個構建器,有助於準備 io.rsocket.core.RSocketConnector,包括 SETUP 幀的連線設定。
這是使用預設設定連線的最基本方式
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
以上操作不會立即連線。當發出請求時,將透明地建立並使用共享連線。
連線設定
RSocketRequester.Builder 提供以下內容來定製初始的 SETUP 幀:
-
dataMimeType(MimeType)——設定連線上資料的 MIME 型別。 -
metadataMimeType(MimeType)——設定連線上元資料的 MIME 型別。 -
setupData(Object)——包含在SETUP中的資料。 -
setupRoute(String, Object…)——包含在SETUP元資料中的路由。 -
setupMetadata(Object, MimeType)——包含在SETUP中的其他元資料。
對於資料,預設的 MIME 型別是從第一個配置的 Decoder 派生出來的。對於元資料,預設的 MIME 型別是複合元資料,它允許每個請求有多個元資料值和 MIME 型別對。通常,兩者都不需要更改。
SETUP 幀中的資料和元資料是可選的。在伺服器端,@ConnectMapping 方法可用於處理連線的開始和 SETUP 幀的內容。元資料可用於連線級別的安全性。
策略
RSocketRequester.Builder 接受 RSocketStrategies 來配置請求者。您需要使用它來為資料和元資料值的(反)序列化提供編碼器和解碼器。預設情況下,只註冊了 spring-core 中用於 String、byte[] 和 ByteBuffer 的基本編解碼器。新增 spring-web 可以訪問更多可以按如下方式註冊的編解碼器:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies 旨在重用。在某些場景下,例如,同一個應用程式中的客戶端和伺服器,最好在 Spring 配置中宣告它。
客戶端響應器
RSocketRequester.Builder 可用於配置對來自伺服器的請求的響應器。
您可以使用帶註解的處理程式進行客戶端響應,它基於與伺服器上使用的相同基礎設施,但透過程式設計方式註冊,如下所示:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) (1)
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) (3)
.tcp("localhost", 7000);
| 1 | 如果存在 spring-web,請使用 PathPatternRouteMatcher 以實現高效的路由匹配。 |
| 2 | 從帶有 @MessageMapping 和/或 @ConnectMapping 方法的類建立響應器。 |
| 3 | 註冊響應器。 |
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) (1)
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } (3)
.tcp("localhost", 7000)
| 1 | 如果存在 spring-web,請使用 PathPatternRouteMatcher 以實現高效的路由匹配。 |
| 2 | 從帶有 @MessageMapping 和/或 @ConnectMapping 方法的類建立響應器。 |
| 3 | 註冊響應器。 |
請注意,上述只是為客戶端響應器的程式設計註冊而設計的快捷方式。對於替代場景,即客戶端響應器在 Spring 配置中,您仍然可以將 RSocketMessageHandler 宣告為 Spring bean,然後按如下方式應用:
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
對於上述情況,您可能還需要在 RSocketMessageHandler 中使用 setHandlerPredicate 來切換到不同的策略來檢測客戶端響應器,例如,基於自定義註解(如 @RSocketClientResponder)而不是預設的 @Controller。這在同一個應用程式中包含客戶端和伺服器或多個客戶端的場景中是必需的。
另請參閱帶註解的響應器,瞭解更多關於程式設計模型的資訊。
高階
RSocketRequesterBuilder 提供了一個回撥,以公開底層的 io.rsocket.core.RSocketConnector,用於進一步配置 keepalive 間隔、會話恢復、攔截器等選項。您可以在該級別配置選項,如下所示:
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
伺服器請求者
從伺服器向連線的客戶端發出請求,只需從伺服器獲取連線客戶端的請求者即可。
在帶註解的響應器中,@ConnectMapping 和 @MessageMapping 方法支援 RSocketRequester 引數。使用它來訪問連線的請求器。請記住,@ConnectMapping 方法本質上是 SETUP 幀的處理程式,必須在請求開始之前進行處理。因此,開始時的請求必須與處理解耦。例如:
-
Java
-
Kotlin
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { (1)
// ...
});
return ... (2)
}
| 1 | 非同步啟動請求,獨立於處理。 |
| 2 | 執行處理並返回完成的 Mono<Void>。 |
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
// ...
}
}
/// ... (2)
}
| 1 | 非同步啟動請求,獨立於處理。 |
| 2 | 在掛起函式中執行處理。 |
請求
-
Java
-
Kotlin
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlux(AirportLocation.class); (3)
| 1 | 指定一個路由以包含在請求訊息的元資料中。 |
| 2 | 為請求訊息提供資料。 |
| 3 | 宣告預期的響應。 |
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlow<AirportLocation>() (3)
| 1 | 指定一個路由以包含在請求訊息的元資料中。 |
| 2 | 為請求訊息提供資料。 |
| 3 | 宣告預期的響應。 |
互動型別是根據輸入和輸出的基數隱式確定的。上面的例子是一個 Request-Stream,因為傳送一個值並接收一個值流。在大多數情況下,只要輸入和輸出的選擇與 RSocket 互動型別以及響應者預期的輸入和輸出型別匹配,您就不需要考慮這個問題。唯一無效的組合示例是多對一。
data(Object) 方法也接受任何 Reactive Streams Publisher,包括 Flux 和 Mono,以及任何在 ReactiveAdapterRegistry 中註冊的其他值生產者。對於產生相同型別值的多值 Publisher(如 Flux),請考慮使用過載的 data 方法之一,以避免在每個元素上進行型別檢查和 Encoder 查詢。
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
data(Object) 步驟是可選的。對於不傳送資料的請求,跳過它。
-
Java
-
Kotlin
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
如果使用複合元資料(預設)並且值受註冊的 Encoder 支援,則可以新增額外的元資料值。例如:
-
Java
-
Kotlin
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
對於 Fire-and-Forget,使用返回 Mono<Void> 的 send() 方法。請注意,Mono 僅表示訊息已成功傳送,而不表示已處理。
對於 Metadata-Push,使用 sendMetadata() 方法,返回值為 Mono<Void>。
帶註解的響應器
RSocket 響應器可以作為 @MessageMapping 和 @ConnectMapping 方法實現。@MessageMapping 方法處理單個請求,而 @ConnectMapping 方法處理連線級別的事件(設定和元資料推送)。帶註解的響應器是對稱支援的,用於從伺服器端響應和從客戶端響應。
伺服器響應器
要在伺服器端使用帶註解的響應器,請將 RSocketMessageHandler 新增到您的 Spring 配置中,以檢測帶有 @MessageMapping 和 @ConnectMapping 方法的 @Controller bean:
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
routeMatcher = PathPatternRouteMatcher()
}
}
然後透過 Java RSocket API 啟動 RSocket 伺服器,並按如下方式為響應器插入 RSocketMessageHandler:
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitSingle()
RSocketMessageHandler 預設支援複合和路由元資料。如果您需要切換到不同的 MIME 型別或註冊額外的元資料 MIME 型別,可以設定其MetadataExtractor。
您需要設定 Encoder 和 Decoder 例項,以便支援元資料和資料格式。您可能需要 spring-web 模組來獲取編解碼器實現。
預設情況下,使用 SimpleRouteMatcher 透過 AntPathMatcher 進行路由匹配。我們建議插入 spring-web 中的 PathPatternRouteMatcher 以實現高效的路由匹配。RSocket 路由可以是分層的,但不是 URL 路徑。預設情況下,這兩個路由匹配器都配置為使用“.”作為分隔符,並且沒有像 HTTP URL 那樣的 URL 解碼。
RSocketMessageHandler 可以透過 RSocketStrategies 進行配置,如果您需要在同一程序中的客戶端和伺服器之間共享配置,這可能會很有用:
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
rSocketStrategies = rsocketStrategies()
}
@Bean
fun rsocketStrategies() = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.routeMatcher(PathPatternRouteMatcher())
.build()
}
客戶端響應器
客戶端的帶註解響應器需要在 RSocketRequester.Builder 中進行配置。有關詳細資訊,請參閱客戶端響應器。
@MessageMapping
-
Java
-
Kotlin
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
@Controller
class RadarsController {
@MessageMapping("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
上述 @MessageMapping 方法響應路由為 "locate.radars.within" 的 Request-Stream 互動。它支援靈活的方法簽名,並可選擇使用以下方法引數:
| 方法引數 | 描述 |
|---|---|
|
請求的有效載荷。這可以是具體的值,也可以是像 注意:註解的使用是可選的。一個不是簡單型別且不是其他任何受支援引數的方法引數,被假定為預期的有效載荷。 |
|
用於向遠端端發出請求的請求者。 |
|
根據對映模式中的變數從路由中提取的值,例如, |
|
已註冊用於提取的元資料值,如MetadataExtractor中所述。 |
|
所有已註冊用於提取的元資料值,如MetadataExtractor中所述。 |
返回值預計為一個或多個將序列化為響應有效負載的物件。這可以是非同步型別(如 Mono 或 Flux)、具體值,或者 void 或無值非同步型別(如 Mono<Void>)。
@MessageMapping 方法支援的 RSocket 互動型別由輸入(即 @Payload 引數)和輸出的基數決定,其中基數表示以下內容:
| 基數 | 描述 |
|---|---|
1 |
可以是顯式值,也可以是單值非同步型別,例如 |
多 |
一個多值非同步型別,例如 |
0 |
對於輸入,這意味著方法沒有 對於輸出,這是 |
下表顯示了所有輸入和輸出基數組合以及相應的互動型別
| 輸入基數 | 輸出基數 | 互動型別 |
|---|---|---|
0, 1 |
0 |
即發即棄,請求-響應 |
0, 1 |
1 |
請求-響應 |
0, 1 |
多 |
請求-流 |
多 |
0,1,多 |
請求-通道 |
@RSocketExchange
作為 @MessageMapping 的替代方案,您還可以使用 @RSocketExchange 方法處理請求。此類方法在RSocket 介面上宣告,可以透過 RSocketServiceProxyFactory 用作請求者,或由響應器實現。
例如,作為響應者處理請求:
-
Java
-
Kotlin
public interface RadarsService {
@RSocketExchange("locate.radars.within")
Flux<AirportLocation> radars(MapRequest request);
}
@Controller
public class RadarsController implements RadarsService {
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
interface RadarsService {
@RSocketExchange("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation>
}
@Controller
class RadarsController : RadarsService {
override fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
@RSocketExchange 和 @MessageMapping 之間存在一些差異,因為前者需要保持適用於請求者和響應者。例如,雖然 @MessageMapping 可以宣告為處理任意數量的路由,並且每個路由都可以是模式,但 @RSocketExchange 必須宣告一個單一的、具體的路由。在支援的元資料相關方法引數上也有細微差別,請參閱@MessageMapping 和RSocket 介面以獲取支援引數列表。
@RSocketExchange 可以在型別級別使用,為給定 RSocket 服務介面的所有路由指定一個公共字首。
@ConnectMapping
@ConnectMapping 處理 RSocket 連線開始時的 SETUP 幀,以及透過 METADATA_PUSH 幀(即 io.rsocket.RSocket 中的 metadataPush(Payload))進行的任何後續元資料推送通知。
@ConnectMapping 方法支援與@MessageMapping 相同的引數,但基於 SETUP 和 METADATA_PUSH 幀中的元資料和資料。@ConnectMapping 可以有一個模式來將處理範圍縮小到元資料中包含路由的特定連線,或者如果沒有宣告模式,則所有連線都匹配。
@ConnectMapping 方法不能返回資料,並且必須宣告 void 或 Mono<Void> 作為返回值。如果處理程式為新連線返回錯誤,則連線將被拒絕。處理程式不得被阻止以向連線的 RSocketRequester 發出請求。有關詳細資訊,請參閱伺服器請求器。
MetadataExtractor
響應器必須解釋元資料。複合元資料允許獨立格式化的元資料值(例如,用於路由、安全、跟蹤),每個值都有自己的 MIME 型別。應用程式需要一種方法來配置要支援的元資料 MIME 型別,以及一種訪問提取值的方法。
MetadataExtractor 是一個契約,用於接收序列化的元資料並返回解碼後的名稱-值對,這些名稱-值對可以透過名稱像頭一樣訪問,例如透過帶註解的處理程式方法中的 @Header。
DefaultMetadataExtractor 可以提供 Decoder 例項來解碼元資料。它開箱即用地支援"message/x.rsocket.routing.v0",它將其解碼為 String 並以 "route" 鍵儲存。對於任何其他 MIME 型別,您需要提供 Decoder 並按如下方式註冊 MIME 型別:
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")
複合元資料非常適合組合獨立的元資料值。但是,請求者可能不支援複合元資料,或者可能選擇不使用它。為此,DefaultMetadataExtractor 可能需要自定義邏輯來將解碼後的值對映到輸出對映。下面是一個使用 JSON 作為元資料的示例:
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
outputMap.putAll(jsonMap)
}
透過 RSocketStrategies 配置 MetadataExtractor 時,您可以讓 RSocketStrategies.Builder 使用配置的解碼器建立提取器,然後簡單地使用回撥來自定義註冊,如下所示:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
import org.springframework.messaging.rsocket.metadataToExtract
val strategies = RSocketStrategies.builder()
.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
registry.metadataToExtract<Foo>(fooMimeType, "foo")
// ...
}
.build()
RSocket 介面
Spring Framework 允許您將 RSocket 服務定義為帶有 @RSocketExchange 方法的 Java 介面。您可以將此類介面傳遞給 RSocketServiceProxyFactory 以建立代理,該代理透過 RSocketRequester 執行請求。您還可以將介面實現為處理請求的響應器。
首先建立帶有 @RSocketExchange 方法的介面
interface RadarService {
@RSocketExchange("radars")
Flux<AirportLocation> getRadars(@Payload MapRequest request);
// more RSocket exchange methods...
}
現在你可以建立一個代理,在方法呼叫時執行請求
RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();
RadarService service = factory.createClient(RadarService.class);
您還可以實現該介面以作為響應器處理請求。請參閱帶註解的響應器。
方法引數
帶有註解的 RSocket 交換方法支援靈活的方法簽名和以下方法引數:
| 方法引數 | 描述 |
|---|---|
|
新增一個路由變數,與 |
|
設定請求的輸入有效載荷。這可以是一個具體值,也可以是任何能夠透過 |
|
輸入有效載荷中元資料條目的值。這可以是任何 |
|
元資料條目的 |
返回值
帶註解的 RSocket 交換方法支援返回值,這些返回值可以是具體值,也可以是任何可以透過 ReactiveAdapterRegistry 適配為 Reactive Streams Publisher 的值生產者。
預設情況下,具有同步(阻塞)方法簽名的 RSocket 服務方法的行為取決於底層 RSocket ClientTransport 的響應超時設定以及 RSocket keep-alive 設定。RSocketServiceProxyFactory.Builder 確實公開了一個 blockTimeout 選項,它也允許您配置阻塞響應的最長時間,但我們建議在 RSocket 級別配置超時值以獲得更多控制。